fix(exports): 增强导出任务错误日志记录
- 在导出任务恢复时记录详细的错误信息,包括任务ID、错误内容、字段和格式 - 导出流处理失败时增加错误日志,包含任务阶段、数据源、SQL语句和参数 - 在直接生成xlsx失败时,记录错误详情和导出字段数量 - 统一使用日志系统记录导出失败事件,方便排查问题 - 确保所有异常情况下调用MarkFailed标记任务失败
This commit is contained in:
parent
2f2cef905f
commit
d89f6fffad
|
|
@ -343,8 +343,15 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
|
||||||
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) {
|
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
|
logging.JSON("ERROR", map[string]interface{}{
|
||||||
|
"event": "export_panic",
|
||||||
|
"job_id": id,
|
||||||
|
"error": utils.ToString(r),
|
||||||
|
"fields": fields,
|
||||||
|
"format": fmt,
|
||||||
|
})
|
||||||
|
log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
|
||||||
repo.NewExportRepo().MarkFailed(a.meta, id)
|
repo.NewExportRepo().MarkFailed(a.meta, id)
|
||||||
logging.JSON("ERROR", map[string]interface{}{"event": "export_panic", "job_id": id, "error": utils.ToString(r)})
|
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// load datasource once for transform decisions
|
// load datasource once for transform decisions
|
||||||
|
|
@ -442,6 +449,16 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
|
||||||
}
|
}
|
||||||
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
||||||
if e != nil {
|
if e != nil {
|
||||||
|
logging.JSON("ERROR", map[string]interface{}{
|
||||||
|
"event": "export_stream_error",
|
||||||
|
"job_id": id,
|
||||||
|
"stage": "csv_chunk",
|
||||||
|
"error": e.Error(),
|
||||||
|
"datasource": jobDS,
|
||||||
|
"sql": cq,
|
||||||
|
"args": cargs,
|
||||||
|
})
|
||||||
|
log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq)
|
||||||
rrepo.MarkFailed(a.meta, id)
|
rrepo.MarkFailed(a.meta, id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -486,6 +503,16 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
|
||||||
}
|
}
|
||||||
count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logging.JSON("ERROR", map[string]interface{}{
|
||||||
|
"event": "export_stream_error",
|
||||||
|
"job_id": id,
|
||||||
|
"stage": "xlsx_direct",
|
||||||
|
"error": err.Error(),
|
||||||
|
"datasource": jobDS,
|
||||||
|
"fields": fields,
|
||||||
|
"sql": q,
|
||||||
|
})
|
||||||
|
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields))
|
||||||
rrepo.MarkFailed(a.meta, id)
|
rrepo.MarkFailed(a.meta, id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue