feat(exports): resume unfinished export jobs on service startup

- Automatically query and resume all export jobs still in running status when the service starts
- Add a restart counter and progress reset for export jobs so retries behave correctly
- Extract a method that re-runs an export job by its ID out of the existing export execution flow
- Fix export-job database access to consistently use the struct's DB connection fields
- Improve export file handling and status updates for more stable job recovery
- Add a query for running export jobs to support recovery
- Improve error logging so problems during job recovery are easier to diagnose
zhouyonggao 2025-12-19 17:44:29 +08:00
parent cfaad63e9a
commit 48979b41d1
4 changed files with 2129 additions and 77 deletions
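The recovery flow added here reads as follows: on startup, look up every export job still marked as running and hand each one back to the normal execution path, keyed only by its job ID; RunJobByID then bumps the restart counter, resets progress to zero, rebuilds the SQL from the stored template and filters, and re-runs the export. The sketch below is a condensed illustration of that flow only — JobStore and Runner are simplified stand-in interfaces, not the project's real types (the actual implementation uses repo.ExportQueryRepo and api.ExportsAPI, shown in the diffs that follow).

package recovery

import (
	"log"
	"time"
)

// JobStore is a simplified stand-in for the repo layer introduced in this commit.
type JobStore interface {
	GetRunningJobs() ([]uint64, error) // IDs of jobs still marked "running"
}

// Runner is a simplified stand-in for api.ExportsAPI. RunJobByID itself increments
// the restart counter and resets progress before rebuilding the SQL and re-exporting.
type Runner interface {
	RunJobByID(jobID uint64)
}

// recoverRunningJobs mirrors the startup path added to main.go below.
func recoverRunningJobs(store JobStore, runner Runner) {
	ids, err := store.GetRunningJobs()
	if err != nil {
		log.Printf("[recovery] failed to query running jobs: %v", err)
		return
	}
	for _, id := range ids {
		go runner.RunJobByID(id) // each job restarts in its own goroutine
	}
	// Brief grace period so the goroutines get scheduled before startup continues.
	time.Sleep(time.Second)
}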

View File

@@ -9,6 +9,7 @@ import (
 	"server/internal/config"
 	"server/internal/db"
 	"server/internal/logging"
+	"server/internal/repo"
 	"time"
 )
@@ -79,9 +80,48 @@ func main() {
 		}
 		return s
 	}()
+	// Recover unfinished jobs on startup
+	log.Println("[startup] resuming unfinished export jobs...")
+	recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt)
+	log.Println("[startup] export job recovery finished")
 	srv := &http.Server{Addr: addr, Handler: r, ReadTimeout: 15 * time.Second, WriteTimeout: 60 * time.Second}
 	log.Println("server listening on ", addr)
 	log.Fatal(srv.ListenAndServe())
 }
+// recoverRunningJobs resumes export jobs that were still unfinished when the service stopped
+func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB) {
+	rrepo := repo.NewExportRepo()
+	// Query all jobs in running status
+	jobs, err := rrepo.GetRunningJobs(meta)
+	if err != nil {
+		log.Printf("[recovery] failed to query running jobs: %v", err)
+		return
+	}
+	if len(jobs) == 0 {
+		log.Println("[recovery] no jobs to recover")
+		return
+	}
+	log.Printf("[recovery] found %d jobs to recover\n", len(jobs))
+	// Build an ExportsAPI instance for recovery
+	exportsAPI := &api.ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt}
+	// Kick off recovery for each job
+	for _, job := range jobs {
+		jobID := job.ID
+		log.Printf("[recovery] restarting job - ID: %d\n", jobID)
+		// Each job is recovered concurrently in its own goroutine
+		go exportsAPI.RunJobByID(jobID)
+	}
+	// Give the goroutines a moment to start
+	time.Sleep(1 * time.Second)
+}
 // buildDSN deprecated; use cfg.YMTDB.DSN()/cfg.MarketingDB.DSN()

View File

@@ -22,13 +22,13 @@ import (
 )
 type ExportsAPI struct {
-	meta *sql.DB
-	marketing *sql.DB
-	ymt *sql.DB
+	Meta *sql.DB
+	Marketing *sql.DB
+	YMT *sql.DB
 }
 func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
-	api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}
+	api := &ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt}
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		p := strings.TrimPrefix(r.URL.Path, "/api/exports")
 		if r.Method == http.MethodPost && p == "" {
@@ -88,7 +88,7 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
 	var main string
 	var ds string
 	rrepo := repo.NewExportRepo()
-	ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
+	ds, main, fs, err := rrepo.GetTemplateMeta(a.Meta, p.TemplateID)
 	if err != nil {
 		fail(w, r, http.StatusBadRequest, "invalid template")
 		return
@@ -320,15 +320,111 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
 			owner = n
 		}
 	}
-	id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
+	id, err := rrepo.InsertJob(a.Meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
 	if err != nil {
 		fail(w, r, http.StatusInternalServerError, err.Error())
 		return
 	}
-	go a.runJob(uint64(id), dataDB, q, args, filtered, hdrs, p.FileFormat)
+	go a.RunJobByID(uint64(id))
 	ok(w, r, map[string]interface{}{"id": id})
 }
+// RunJobByID loads a job's metadata from the database by its ID and executes the export
+func (a *ExportsAPI) RunJobByID(jobID uint64) {
+	rrepo := repo.NewExportRepo()
+	// Increment the restart counter
+	rrepo.IncrementRestartCount(a.Meta, jobID)
+	// Reset progress to 0, since the export starts over from scratch
+	rrepo.ResetJobProgress(a.Meta, jobID)
+	// Load job details
+	jobDetail, err := rrepo.GetJob(a.Meta, strconv.FormatUint(jobID, 10))
+	if err != nil {
+		logging.JSON("ERROR", map[string]interface{}{
+			"event": "get_job_failed",
+			"job_id": jobID,
+			"error": err.Error(),
+		})
+		return
+	}
+	// Load the template ID and the job's filter conditions
+	tplID, filtersJSON, err := rrepo.GetJobFilters(a.Meta, jobID)
+	if err != nil {
+		logging.JSON("ERROR", map[string]interface{}{
+			"event": "get_job_filters_failed",
+			"job_id": jobID,
+			"error": err.Error(),
+		})
+		return
+	}
+	var filters map[string]interface{}
+	json.Unmarshal(filtersJSON, &filters)
+	// Load the template field metadata
+	ds, mainTable, fields, err := rrepo.GetTemplateMeta(a.Meta, tplID)
+	if err != nil {
+		logging.JSON("ERROR", map[string]interface{}{
+			"event": "get_template_meta_failed",
+			"job_id": jobID,
+			"error": err.Error(),
+		})
+		return
+	}
+	// Pick the data source connection
+	dataDB := a.selectDataDB(ds)
+	// Build the SQL
+	wl := Whitelist()
+	req := exporter.BuildRequest{MainTable: mainTable, Datasource: ds, Fields: fields, Filters: filters}
+	q, args, usedFields, err := rrepo.BuildWithFields(req, wl)
+	if err != nil {
+		logging.JSON("ERROR", map[string]interface{}{
+			"event": "build_sql_failed",
+			"job_id": jobID,
+			"error": err.Error(),
+		})
+		return
+	}
+	if len(usedFields) > 0 {
+		fields = usedFields
+	}
+	// Build column headers
+	labels := FieldLabels()
+	hdrs := make([]string, len(fields))
+	for i, tf := range fields {
+		if v, ok := labels[tf]; ok {
+			hdrs[i] = v
+		} else {
+			hdrs[i] = tf
+		}
+	}
+	// De-duplicate column headers
+	{
+		cnt := map[string]int{}
+		for _, h := range hdrs {
+			cnt[h]++
+		}
+		for i := range hdrs {
+			if cnt[hdrs[i]] > 1 {
+				parts := strings.Split(fields[i], ".")
+				if len(parts) == 2 && parts[0] != mainTable {
+					hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
+				}
+			}
+		}
+	}
+	a.runJob(jobID, dataDB, q, args, fields, hdrs, jobDetail.FileFormat)
+}
 func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
 	defer func() {
 		if r := recover(); r != nil {
@@ -340,7 +436,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				"format": fileFormat,
 			})
 			log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
-			repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
+			repo.NewExportRepo().MarkFailed(a.Meta, id, "export_panic", map[string]interface{}{
 				"error": utils.ToString(r),
 				"fields": fields,
 				"format": fileFormat,
@@ -352,9 +448,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	var jobMain string
 	rrepo := repo.NewExportRepo()
 	{
-		tplID, _, _ := rrepo.GetJobFilters(a.meta, id)
+		tplID, _, _ := rrepo.GetJobFilters(a.Meta, id)
 		if tplID > 0 {
-			ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
+			ds, mt, _, _ := rrepo.GetTemplateMeta(a.Meta, tplID)
 			jobDS = ds
 			if mt != "" {
 				jobMain = mt
@@ -367,7 +463,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	// Check the estimated row count: if it exceeds the threshold and the format is xlsx, force csv instead
 	if fileFormat == "xlsx" {
 		var rowEstimate int64
-		estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
+		estRow := a.Meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
 		_ = estRow.Scan(&rowEstimate)
 		if rowEstimate > constants.ExportThresholds.XlsxMaxRows {
 			logging.JSON("INFO", map[string]interface{}{
@@ -381,7 +477,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		}
 	}
-	rrepo.StartJob(a.meta, id)
+	rrepo.StartJob(a.Meta, id)
 	if fileFormat == "csv" {
 		newBaseWriter := func() (exporter.RowWriter, error) {
 			return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
@@ -390,12 +486,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		{
 			var tplID uint64
 			var filtersJSON []byte
-			row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
+			row := a.Meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
 			_ = row.Scan(&tplID, &filtersJSON)
 			var tplDS string
 			var main string
 			var fieldsJSON []byte
-			tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
+			tr := a.Meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
 			_ = tr.Scan(&tplDS, &main, &fieldsJSON)
 			var fs []string
 			var fl map[string]interface{}
@@ -415,12 +511,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			var total int64
 			// If row_estimate is 0, re-estimate at the start of the chunked export
 			var currentEst int64
-			row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
+			row := a.Meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
 			_ = row.Scan(&currentEst)
 			if currentEst == 0 {
 				estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
 				if estChunk > 0 {
-					rrepo.UpdateRowEstimate(a.meta, id, estChunk)
+					rrepo.UpdateRowEstimate(a.Meta, id, estChunk)
 				}
 			}
 			skipChunk := false
@@ -452,10 +548,10 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
 				transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
 				chunkBase := total
-				onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
+				onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil }
 				onRoll := func(path string, size int64, partRows int64) error {
 					files = append(files, path)
-					rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
+					rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
 					return nil
 				}
 				cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
@@ -470,7 +566,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 						"args": cargs,
 					})
 					log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq)
-					rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{
+					rrepo.MarkFailed(a.Meta, id, "csv_chunk_stream_error", map[string]interface{}{
 						"error": e.Error(),
 						"datasource": jobDS,
 						"sql": cq,
@@ -479,15 +575,15 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 					return
 				}
 				total += cnt
-				rrepo.UpdateProgress(a.meta, id, total)
+				rrepo.UpdateProgress(a.Meta, id, total)
 			}
 			if total == 0 {
 				total = rrepo.Count(db, q, args)
 			}
 			if len(files) >= 1 {
-				rrepo.ZipAndRecord(a.meta, id, files, total)
+				rrepo.ZipAndRecord(a.Meta, id, files, total)
 			}
-			rrepo.MarkCompleted(a.meta, id, total)
+			rrepo.MarkCompleted(a.Meta, id, total)
 			return
 		}
 	}
@@ -499,22 +595,22 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		var est int64
 		{
 			var filtersJSON []byte
-			row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
+			row := a.Meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
 			_ = row.Scan(&filtersJSON)
 			var fl map[string]interface{}
 			json.Unmarshal(filtersJSON, &fl)
 			est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
-			rrepo.UpdateRowEstimate(a.meta, id, est)
+			rrepo.UpdateRowEstimate(a.Meta, id, est)
 		}
 		batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat))
 		files2 := []string{}
 		cur := rrepo.NewCursor(jobDS, jobMain)
 		newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
 		transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
-		onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
+		onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, totalRows); return nil }
 		onRoll := func(path string, size int64, partRows int64) error {
 			files2 = append(files2, path)
-			rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
+			rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
 			return nil
 		}
 		count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
@@ -529,7 +625,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				"sql": q,
 			})
 			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields))
-			rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{
+			rrepo.MarkFailed(a.Meta, id, "csv_direct_stream_error", map[string]interface{}{
 				"error": err.Error(),
 				"datasource": jobDS,
 				"fields": fields,
@@ -538,9 +634,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			return
 		}
 		if len(files2) >= 1 {
-			rrepo.ZipAndRecord(a.meta, id, files2, count)
+			rrepo.ZipAndRecord(a.Meta, id, files2, count)
 		}
-		rrepo.MarkCompleted(a.meta, id, count)
+		rrepo.MarkCompleted(a.Meta, id, count)
 		return
 	}
 }
@@ -549,12 +645,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		{
 			var tplID uint64
 			var filtersJSON []byte
-			row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
+			row := a.Meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
 			_ = row.Scan(&tplID, &filtersJSON)
 			var tplDS string
 			var main string
 			var fieldsJSON []byte
-			tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
+			tr := a.Meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
 			_ = tr.Scan(&tplDS, &main, &fieldsJSON)
 			var fs []string
 			var fl map[string]interface{}
@@ -574,12 +670,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			var total int64
 			// If row_estimate is 0, re-estimate at the start of the chunked export
 			var currentEst int64
-			row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
+			row := a.Meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
 			_ = row.Scan(&currentEst)
 			if currentEst == 0 {
 				estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
 				if estChunk > 0 {
-					rrepo.UpdateRowEstimate(a.meta, id, estChunk)
+					rrepo.UpdateRowEstimate(a.Meta, id, estChunk)
 				}
 			}
 			skipChunk := false
@@ -618,17 +714,17 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
 				// The progress callback reports the global cumulative row count, so the number never regresses across chunks
 				chunkBase := total
-				onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
+				onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil }
 				onRoll := func(path string, size int64, partRows int64) error {
 					files = append(files, path)
-					rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
+					rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
 					return nil
 				}
 				cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
 				if e != nil {
 					logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
 					log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
-					rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
+					rrepo.MarkFailed(a.Meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
 						"error": e.Error(),
 						"datasource": jobDS,
 						"sql": cq,
@@ -637,15 +733,15 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 					return
 				}
 				total += cnt
-				rrepo.UpdateProgress(a.meta, id, total)
+				rrepo.UpdateProgress(a.Meta, id, total)
 			}
 			if total == 0 {
 				total = rrepo.Count(db, q, args)
 			}
 			if len(files) >= 1 {
-				rrepo.ZipAndRecord(a.meta, id, files, total)
+				rrepo.ZipAndRecord(a.Meta, id, files, total)
 			}
-			rrepo.MarkCompleted(a.meta, id, total)
+			rrepo.MarkCompleted(a.Meta, id, total)
 			return
 		}
 	}
@@ -656,25 +752,25 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		var est2 int64
 		{
 			var filtersJSON []byte
-			row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
+			row := a.Meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
 			_ = row.Scan(&filtersJSON)
 			var fl map[string]interface{}
 			json.Unmarshal(filtersJSON, &fl)
 			est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
-			rrepo.UpdateRowEstimate(a.meta, id, est2)
+			rrepo.UpdateRowEstimate(a.Meta, id, est2)
 		}
 		x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
 		if err != nil {
 			logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
 			log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
-			rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
+			rrepo.MarkFailed(a.Meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
 				"error": err.Error(),
 				"stage": "xlsx_direct",
 			})
 			return
 		}
 		_ = x.WriteHeader(cols)
-		rrepo.UpdateProgress(a.meta, id, 0)
+		rrepo.UpdateProgress(a.Meta, id, 0)
 		// Record the argument types before executing the query
 		argTypes := make([]string, len(args))
 		for i, arg := range args {
@@ -703,7 +799,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				"args": args,
 			})
 			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err)
-			rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{
+			rrepo.MarkFailed(a.Meta, id, "xlsx_query_failed", map[string]interface{}{
 				"error": err.Error(),
 				"datasource": jobDS,
 				"sql": q,
@@ -722,7 +818,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				"error": err.Error(),
 			})
 			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err)
-			rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{
+			rrepo.MarkFailed(a.Meta, id, "xlsx_columns_failed", map[string]interface{}{
 				"error": err.Error(),
 			})
 			return
@@ -756,7 +852,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				"count": count,
 			})
 			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count)
-			rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{
+			rrepo.MarkFailed(a.Meta, id, "xlsx_scan_failed", map[string]interface{}{
 				"error": err.Error(),
 				"count": count,
 			})
@@ -783,7 +879,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			count++
 			tick++
 			if tick%200 == 0 {
-				rrepo.UpdateProgress(a.meta, id, count)
+				rrepo.UpdateProgress(a.Meta, id, count)
 			}
 		}
 		// If the query returned data, log a line containing the first row so it is easy to confirm the SQL produced results before exporting
@@ -817,9 +913,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		}
 		p, size, _ := x.Close()
 		log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
-		a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
-		rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
-		rrepo.MarkCompleted(a.meta, id, count)
+		a.Meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
+		rrepo.ZipAndRecord(a.Meta, id, []string{p}, count)
+		rrepo.MarkCompleted(a.Meta, id, count)
 		return
 	}
 	logging.JSON("ERROR", map[string]interface{}{
@@ -828,7 +924,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		"format": fileFormat,
 	})
 	log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat)
-	rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
+	rrepo.MarkFailed(a.Meta, id, "unsupported_format", map[string]interface{}{
 		"format": fileFormat,
 	})
 }
@@ -838,7 +934,7 @@ func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr str
 	id, _ := strconv.ParseUint(idStr, 10, 64)
 	var tplID uint64
 	var filtersJSON []byte
-	row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
+	row := a.Meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
 	if err := row.Scan(&tplID, &filtersJSON); err != nil {
 		fail(w, r, http.StatusNotFound, "not found")
 		return
@@ -846,7 +942,7 @@ func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr str
 	var ds string
 	var main string
 	var fieldsJSON []byte
-	tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
+	tr := a.Meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
 	_ = tr.Scan(&ds, &main, &fieldsJSON)
 	var fs []string
 	var fl map[string]interface{}
@@ -861,27 +957,27 @@ func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr str
 	}
 	dataDB := a.selectDataDB(ds)
 	final := repo.NewExportRepo().Count(dataDB, q, args)
-	repo.NewExportRepo().MarkCompleted(a.meta, id, final)
+	repo.NewExportRepo().MarkCompleted(a.Meta, id, final)
 	ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
 }
 func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
 	if ds == "ymt" {
-		return a.ymt
+		return a.YMT
 	}
-	return a.marketing
+	return a.Marketing
 }
 // moved to repo layer: repo.ZipAndRecord
 func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
 	rrepo := repo.NewExportRepo()
-	d, err := rrepo.GetJob(a.meta, id)
+	d, err := rrepo.GetJob(a.Meta, id)
 	if err != nil {
 		fail(w, r, http.StatusNotFound, "not found")
 		return
 	}
-	flist, _ := rrepo.ListJobFiles(a.meta, id)
+	flist, _ := rrepo.ListJobFiles(a.Meta, id)
 	files := []map[string]interface{}{}
 	for _, f := range flist {
 		files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64})
@@ -965,12 +1061,12 @@ func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
 	rrepo := repo.NewExportRepo()
 	var jid uint64
 	_, _ = fmt.Sscan(id, &jid)
-	tplID, filters, err := rrepo.GetJobFilters(a.meta, jid)
+	tplID, filters, err := rrepo.GetJobFilters(a.Meta, jid)
 	if err != nil {
 		fail(w, r, http.StatusNotFound, "not found")
 		return
 	}
-	ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
+	ds, main, fs, err := rrepo.GetTemplateMeta(a.Meta, tplID)
 	if err != nil {
 		fail(w, r, http.StatusBadRequest, "template not found")
 		return
@@ -1023,7 +1119,7 @@ func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
 func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
 	rrepo := repo.NewExportRepo()
-	uri, err := rrepo.GetLatestFileURI(a.meta, id)
+	uri, err := rrepo.GetLatestFileURI(a.Meta, id)
 	if err != nil {
 		// fallback: try to serve local storage file by job id
 		// search for files named export_job_<id>_*.zip/xlsx/csv
@@ -1438,7 +1534,7 @@ func decodeOrderKey(s string) string {
 }
 func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
-	a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
+	a.Meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
 	w.Write([]byte("ok"))
 }
@@ -1503,8 +1599,8 @@ func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
 	offset := (page - 1) * size
 	rrepo := repo.NewExportRepo()
 	var totalCount int64
-	totalCount = rrepo.CountJobs(a.meta, tplID, "")
-	itemsRaw, err := rrepo.ListJobs(a.meta, tplID, "", size, offset)
+	totalCount = rrepo.CountJobs(a.Meta, tplID, "")
+	itemsRaw, err := rrepo.ListJobs(a.Meta, tplID, "", size, offset)
 	if err != nil {
 		failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
 		return
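Since RunJobByID is exported and takes nothing but the job ID, the same path that recovers jobs at startup could also back a manual re-run. The handler below is purely hypothetical and not part of this commit; it assumes an import path of server/internal/api, matching the server/internal/* layout used above.

package admin

import (
	"net/http"
	"strconv"

	"server/internal/api" // assumed import path; not shown in this diff
)

// rerunHandler is a hypothetical hook that re-runs an export job on demand.
// RunJobByID resets progress, increments the restart counter, rebuilds the SQL and re-exports.
func rerunHandler(exportsAPI *api.ExportsAPI) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		id, err := strconv.ParseUint(r.URL.Query().Get("id"), 10, 64)
		if err != nil {
			http.Error(w, "bad id", http.StatusBadRequest)
			return
		}
		go exportsAPI.RunJobByID(id)
		w.Write([]byte("ok"))
	}
}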

File diff suppressed because it is too large.

View File

@@ -388,6 +388,58 @@ func (r *ExportQueryRepo) GetJob(metaDB *sql.DB, jobID string) (JobDetail, error
 	return detail, err
 }
+// GetRunningJobs returns all jobs in running status (used to resume work after a service restart)
+func (r *ExportQueryRepo) GetRunningJobs(metaDB *sql.DB) ([]JobDetail, error) {
+	querySQL := `SELECT id, template_id, status, requested_by, total_rows,
+		file_format, started_at, finished_at, created_at, updated_at,
+		explain_score, explain_json
+		FROM export_jobs WHERE status=?`
+	rows, err := metaDB.Query(querySQL, string(constants.JobStatusRunning))
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var jobs []JobDetail
+	for rows.Next() {
+		var detail JobDetail
+		if err := rows.Scan(
+			&detail.ID, &detail.TemplateID, &detail.Status, &detail.RequestedBy,
+			&detail.TotalRows, &detail.FileFormat, &detail.StartedAt, &detail.FinishedAt,
+			&detail.CreatedAt, &detail.UpdatedAt, &detail.ExplainScore, &detail.ExplainJSON,
+		); err != nil {
+			continue
+		}
+		jobs = append(jobs, detail)
+	}
+	return jobs, nil
+}
+// IncrementRestartCount increments a job's restart counter
+func (r *ExportQueryRepo) IncrementRestartCount(metaDB *sql.DB, jobID uint64) {
+	now := time.Now()
+	_, err := metaDB.Exec(
+		"UPDATE export_jobs SET restart_count = COALESCE(restart_count,0) + 1, updated_at=? WHERE id=?",
+		now, jobID,
+	)
+	if err != nil {
+		logging.DBError("increment_restart_count", jobID, err)
+	}
+}
+// ResetJobProgress resets a job's progress to 0 (used when a job is recovered)
+func (r *ExportQueryRepo) ResetJobProgress(metaDB *sql.DB, jobID uint64) {
+	now := time.Now()
+	_, err := metaDB.Exec(
+		"UPDATE export_jobs SET total_rows=0, updated_at=? WHERE id=?",
+		now, jobID,
+	)
+	if err != nil {
+		logging.DBError("reset_job_progress", jobID, err)
+	}
+}
 // ListJobFiles lists the files produced by a job
 func (r *ExportQueryRepo) ListJobFiles(metaDB *sql.DB, jobID string) ([]JobFile, error) {
 	rows, err := metaDB.Query(
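IncrementRestartCount updates a restart_count column on export_jobs; the COALESCE(restart_count,0) suggests the column may be nullable or newly introduced. No schema change is visible in this view (one file's diff is suppressed above), so if the column is not created elsewhere, a migration along these lines would be needed — a minimal sketch, assuming MySQL and that the column does not yet exist:

package migrations

import "database/sql"

// ensureRestartCountColumn is a hypothetical helper, not part of this commit.
// It adds the nullable restart_count column that IncrementRestartCount updates;
// existing rows stay NULL, which the COALESCE(restart_count,0) in the UPDATE handles.
func ensureRestartCountColumn(metaDB *sql.DB) error {
	_, err := metaDB.Exec("ALTER TABLE export_jobs ADD COLUMN restart_count INT NULL")
	return err
}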