diff --git a/server/cmd/server/main.go b/server/cmd/server/main.go
index f8c6bb9..f941997 100644
--- a/server/cmd/server/main.go
+++ b/server/cmd/server/main.go
@@ -9,6 +9,7 @@ import (
 	"server/internal/config"
 	"server/internal/db"
 	"server/internal/logging"
+	"server/internal/repo"
 	"time"
 )
 
@@ -79,9 +80,48 @@ func main() {
 		}
 		return s
 	}()
+
+	// Recover unfinished export jobs on startup
+	log.Println("[服务启动] 开始恢复未完成的导出任务...")
+	recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt)
+	log.Println("[服务启动] 任务恢复完成")
+
 	srv := &http.Server{Addr: addr, Handler: r, ReadTimeout: 15 * time.Second, WriteTimeout: 60 * time.Second}
 	log.Println("server listening on ", addr)
 	log.Fatal(srv.ListenAndServe())
 }
 
+// recoverRunningJobs resumes export jobs that were left unfinished when the service was interrupted
+func recoverRunningJobs(meta, marketing, marketingAuth, resellerDB, ymt *sql.DB) {
+	rrepo := repo.NewExportRepo()
+
+	// Query all jobs still in the running state
+	jobs, err := rrepo.GetRunningJobs(meta)
+	if err != nil {
+		log.Printf("[任务恢复] 查询运行中任务失败: %v", err)
+		return
+	}
+
+	if len(jobs) == 0 {
+		log.Println("[任务恢复] 没有需要恢复的任务")
+		return
+	}
+
+	log.Printf("[任务恢复] 发现 %d 个需要恢复的任务\n", len(jobs))
+
+	// Create an ExportsAPI instance used to resume the jobs
+	exportsAPI := &api.ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt}
+
+	// Kick off recovery for each job
+	for _, job := range jobs {
+		jobID := job.ID
+		log.Printf("[任务恢复] 启动任务恢复 - ID: %d\n", jobID)
+		// Resume each job concurrently in its own goroutine
+		go exportsAPI.RunJobByID(jobID)
+	}
+
+	// Give the goroutines a moment to start
+	time.Sleep(1 * time.Second)
+}
+
 // buildDSN deprecated; use cfg.YMTDB.DSN()/cfg.MarketingDB.DSN()
diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go
index d36c227..e96ba0a 100644
--- a/server/internal/api/exports.go
+++ b/server/internal/api/exports.go
@@ -22,13 +22,13 @@ import (
 )
 
 type ExportsAPI struct {
-	meta      *sql.DB
-	marketing *sql.DB
-	ymt       *sql.DB
+	Meta      *sql.DB
+	Marketing *sql.DB
+	YMT       *sql.DB
 }
 
 func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
-	api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}
+	api := &ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt}
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		p := strings.TrimPrefix(r.URL.Path, "/api/exports")
 		if r.Method == http.MethodPost && p == "" {
@@ -88,7 +88,7 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
 	var main string
 	var ds string
 	rrepo := repo.NewExportRepo()
-	ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
+	ds, main, fs, err := rrepo.GetTemplateMeta(a.Meta, p.TemplateID)
 	if err != nil {
 		fail(w, r, http.StatusBadRequest, "invalid template")
 		return
@@ -320,15 +320,111 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
 			owner = n
 		}
 	}
-	id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
+	id, err := rrepo.InsertJob(a.Meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
 	if err != nil {
 		fail(w, r, http.StatusInternalServerError, err.Error())
 		return
 	}
-	go a.runJob(uint64(id), dataDB, q, args, filtered, hdrs, p.FileFormat)
+	go a.RunJobByID(uint64(id))
 	ok(w, r, map[string]interface{}{"id": id})
 }
 
+// RunJobByID loads the job's details from the database by job ID and executes the export
+func (a *ExportsAPI) RunJobByID(jobID uint64) {
+	rrepo := repo.NewExportRepo()
+
+	// Increment the restart counter
+	rrepo.IncrementRestartCount(a.Meta, jobID)
+
+	// Reset progress to 0 since the export is starting over
+	rrepo.ResetJobProgress(a.Meta, jobID)
+
+	// 
获取任务详情 + jobDetail, err := rrepo.GetJob(a.Meta, strconv.FormatUint(jobID, 10)) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "get_job_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + // 获取模板信息和任务过滤条件 + tplID, filtersJSON, err := rrepo.GetJobFilters(a.Meta, jobID) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "get_job_filters_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + var filters map[string]interface{} + json.Unmarshal(filtersJSON, &filters) + + // 获取模板字段信息 + ds, mainTable, fields, err := rrepo.GetTemplateMeta(a.Meta, tplID) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "get_template_meta_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + // 获取数据库连接 + dataDB := a.selectDataDB(ds) + + // 构建 SQL + wl := Whitelist() + req := exporter.BuildRequest{MainTable: mainTable, Datasource: ds, Fields: fields, Filters: filters} + q, args, usedFields, err := rrepo.BuildWithFields(req, wl) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "build_sql_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + if len(usedFields) > 0 { + fields = usedFields + } + + // 构建列标题 + labels := FieldLabels() + hdrs := make([]string, len(fields)) + for i, tf := range fields { + if v, ok := labels[tf]; ok { + hdrs[i] = v + } else { + hdrs[i] = tf + } + } + + // 列头去重 + { + cnt := map[string]int{} + for _, h := range hdrs { + cnt[h]++ + } + for i := range hdrs { + if cnt[hdrs[i]] > 1 { + parts := strings.Split(fields[i], ".") + if len(parts) == 2 && parts[0] != mainTable { + hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i] + } + } + } + } + + a.runJob(jobID, dataDB, q, args, fields, hdrs, jobDetail.FileFormat) +} + func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) { defer func() { if r := recover(); r != nil { @@ -340,7 +436,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "format": fileFormat, }) log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields) - repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{ + repo.NewExportRepo().MarkFailed(a.Meta, id, "export_panic", map[string]interface{}{ "error": utils.ToString(r), "fields": fields, "format": fileFormat, @@ -352,9 +448,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var jobMain string rrepo := repo.NewExportRepo() { - tplID, _, _ := rrepo.GetJobFilters(a.meta, id) + tplID, _, _ := rrepo.GetJobFilters(a.Meta, id) if tplID > 0 { - ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID) + ds, mt, _, _ := rrepo.GetTemplateMeta(a.Meta, tplID) jobDS = ds if mt != "" { jobMain = mt @@ -367,7 +463,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, // 检查预估行数,如果超过阈值且格式是xlsx,强制改为csv if fileFormat == "xlsx" { var rowEstimate int64 - estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) + estRow := a.Meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) _ = estRow.Scan(&rowEstimate) if rowEstimate > constants.ExportThresholds.XlsxMaxRows { logging.JSON("INFO", map[string]interface{}{ @@ -381,7 +477,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } } - rrepo.StartJob(a.meta, id) + rrepo.StartJob(a.Meta, id) if fileFormat == "csv" { newBaseWriter := 
func() (exporter.RowWriter, error) { return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) @@ -390,12 +486,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, { var tplID uint64 var filtersJSON []byte - row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) _ = row.Scan(&tplID, &filtersJSON) var tplDS string var main string var fieldsJSON []byte - tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) + tr := a.Meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) _ = tr.Scan(&tplDS, &main, &fieldsJSON) var fs []string var fl map[string]interface{} @@ -415,12 +511,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var total int64 // 如果 row_estimate 为 0,在分块导出开始时重新估算 var currentEst int64 - row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) _ = row.Scan(¤tEst) if currentEst == 0 { estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl) if estChunk > 0 { - rrepo.UpdateRowEstimate(a.meta, id, estChunk) + rrepo.UpdateRowEstimate(a.Meta, id, estChunk) } } skipChunk := false @@ -452,10 +548,10 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) } chunkBase := total - onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil } + onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil } onRoll := func(path string, size int64, partRows int64) error { files = append(files, path) - rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) + rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size) return nil } cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) @@ -470,7 +566,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "args": cargs, }) log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq) - rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "csv_chunk_stream_error", map[string]interface{}{ "error": e.Error(), "datasource": jobDS, "sql": cq, @@ -479,15 +575,15 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, return } total += cnt - rrepo.UpdateProgress(a.meta, id, total) + rrepo.UpdateProgress(a.Meta, id, total) } if total == 0 { total = rrepo.Count(db, q, args) } if len(files) >= 1 { - rrepo.ZipAndRecord(a.meta, id, files, total) + rrepo.ZipAndRecord(a.Meta, id, files, total) } - rrepo.MarkCompleted(a.meta, id, total) + rrepo.MarkCompleted(a.Meta, id, total) return } } @@ -499,22 +595,22 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var est int64 { var filtersJSON []byte - row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id) _ = row.Scan(&filtersJSON) var fl map[string]interface{} 
json.Unmarshal(filtersJSON, &fl) est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) - rrepo.UpdateRowEstimate(a.meta, id, est) + rrepo.UpdateRowEstimate(a.Meta, id, est) } batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat)) files2 := []string{} cur := rrepo.NewCursor(jobDS, jobMain) newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) } - onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil } + onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, totalRows); return nil } onRoll := func(path string, size int64, partRows int64) error { files2 = append(files2, path) - rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) + rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size) return nil } count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) @@ -529,7 +625,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "sql": q, }) log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields)) - rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "csv_direct_stream_error", map[string]interface{}{ "error": err.Error(), "datasource": jobDS, "fields": fields, @@ -538,9 +634,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, return } if len(files2) >= 1 { - rrepo.ZipAndRecord(a.meta, id, files2, count) + rrepo.ZipAndRecord(a.Meta, id, files2, count) } - rrepo.MarkCompleted(a.meta, id, count) + rrepo.MarkCompleted(a.Meta, id, count) return } } @@ -549,12 +645,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, { var tplID uint64 var filtersJSON []byte - row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) _ = row.Scan(&tplID, &filtersJSON) var tplDS string var main string var fieldsJSON []byte - tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) + tr := a.Meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) _ = tr.Scan(&tplDS, &main, &fieldsJSON) var fs []string var fl map[string]interface{} @@ -574,12 +670,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var total int64 // 如果 row_estimate 为 0,在分块导出开始时重新估算 var currentEst int64 - row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) _ = row.Scan(¤tEst) if currentEst == 0 { estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl) if estChunk > 0 { - rrepo.UpdateRowEstimate(a.meta, id, estChunk) + rrepo.UpdateRowEstimate(a.Meta, id, estChunk) } } skipChunk := false @@ -618,17 +714,17 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) } // 进度回调按全局累计行数更新,避免跨分片出现数值回退 chunkBase := total - onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil } + onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, 
id, chunkBase+totalRows); return nil } onRoll := func(path string, size int64, partRows int64) error { files = append(files, path) - rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) + rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size) return nil } cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) if e != nil { logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs}) log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e) - rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "xlsx_chunk_stream_error", map[string]interface{}{ "error": e.Error(), "datasource": jobDS, "sql": cq, @@ -637,15 +733,15 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, return } total += cnt - rrepo.UpdateProgress(a.meta, id, total) + rrepo.UpdateProgress(a.Meta, id, total) } if total == 0 { total = rrepo.Count(db, q, args) } if len(files) >= 1 { - rrepo.ZipAndRecord(a.meta, id, files, total) + rrepo.ZipAndRecord(a.Meta, id, files, total) } - rrepo.MarkCompleted(a.meta, id, total) + rrepo.MarkCompleted(a.Meta, id, total) return } } @@ -656,25 +752,25 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var est2 int64 { var filtersJSON []byte - row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id) _ = row.Scan(&filtersJSON) var fl map[string]interface{} json.Unmarshal(filtersJSON, &fl) est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) - rrepo.UpdateRowEstimate(a.meta, id, est2) + rrepo.UpdateRowEstimate(a.Meta, id, est2) } x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1") if err != nil { logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()}) log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err) - rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "xlsx_writer_creation_failed", map[string]interface{}{ "error": err.Error(), "stage": "xlsx_direct", }) return } _ = x.WriteHeader(cols) - rrepo.UpdateProgress(a.meta, id, 0) + rrepo.UpdateProgress(a.Meta, id, 0) // 记录查询执行前的参数类型信息 argTypes := make([]string, len(args)) for i, arg := range args { @@ -703,7 +799,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "args": args, }) log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err) - rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "xlsx_query_failed", map[string]interface{}{ "error": err.Error(), "datasource": jobDS, "sql": q, @@ -722,18 +818,18 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "error": err.Error(), }) log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err) - rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "xlsx_columns_failed", map[string]interface{}{ "error": err.Error(), }) return } if len(actualCols) != len(cols) { logging.JSON("WARN", map[string]interface{}{ - "event": 
"export_column_count_mismatch", - "job_id": id, - "stage": "xlsx_direct", + "event": "export_column_count_mismatch", + "job_id": id, + "stage": "xlsx_direct", "expected_cols": len(cols), - "actual_cols": len(actualCols), + "actual_cols": len(actualCols), }) log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct column_mismatch expected=%d actual=%d", id, len(cols), len(actualCols)) } @@ -756,7 +852,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "count": count, }) log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count) - rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "xlsx_scan_failed", map[string]interface{}{ "error": err.Error(), "count": count, }) @@ -783,7 +879,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, count++ tick++ if tick%200 == 0 { - rrepo.UpdateProgress(a.meta, id, count) + rrepo.UpdateProgress(a.Meta, id, count) } } // 如果查询到了数据,记录一条包含首行数据的日志,便于确认导出前 SQL 是否返回结果 @@ -802,24 +898,24 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } else if count == 0 { // 如果查询返回0行,记录详细信息以便排查 logging.JSON("WARN", map[string]interface{}{ - "event": "export_zero_rows", - "job_id": id, - "datasource": jobDS, - "stage": "xlsx_direct", - "sql": q, - "args": args, - "arg_types": argTypes, - "final_sql": renderSQL(q, args), + "event": "export_zero_rows", + "job_id": id, + "datasource": jobDS, + "stage": "xlsx_direct", + "sql": q, + "args": args, + "arg_types": argTypes, + "final_sql": renderSQL(q, args), "expected_cols": len(cols), - "actual_cols": len(actualCols), + "actual_cols": len(actualCols), }) log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct zero_rows sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args)) } p, size, _ := x.Close() log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()}) - a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now()) - rrepo.ZipAndRecord(a.meta, id, []string{p}, count) - rrepo.MarkCompleted(a.meta, id, count) + a.Meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now()) + rrepo.ZipAndRecord(a.Meta, id, []string{p}, count) + rrepo.MarkCompleted(a.Meta, id, count) return } logging.JSON("ERROR", map[string]interface{}{ @@ -828,7 +924,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "format": fileFormat, }) log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat) - rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{ + rrepo.MarkFailed(a.Meta, id, "unsupported_format", map[string]interface{}{ "format": fileFormat, }) } @@ -838,7 +934,7 @@ func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr str id, _ := strconv.ParseUint(idStr, 10, 64) var tplID uint64 var filtersJSON []byte - row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) + row := a.Meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) if err := row.Scan(&tplID, &filtersJSON); err != nil { 
fail(w, r, http.StatusNotFound, "not found") return @@ -846,7 +942,7 @@ func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr str var ds string var main string var fieldsJSON []byte - tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) + tr := a.Meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) _ = tr.Scan(&ds, &main, &fieldsJSON) var fs []string var fl map[string]interface{} @@ -861,27 +957,27 @@ func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr str } dataDB := a.selectDataDB(ds) final := repo.NewExportRepo().Count(dataDB, q, args) - repo.NewExportRepo().MarkCompleted(a.meta, id, final) + repo.NewExportRepo().MarkCompleted(a.Meta, id, final) ok(w, r, map[string]interface{}{"id": id, "final_rows": final}) } func (a *ExportsAPI) selectDataDB(ds string) *sql.DB { if ds == "ymt" { - return a.ymt + return a.YMT } - return a.marketing + return a.Marketing } // moved to repo layer: repo.ZipAndRecord func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) { rrepo := repo.NewExportRepo() - d, err := rrepo.GetJob(a.meta, id) + d, err := rrepo.GetJob(a.Meta, id) if err != nil { fail(w, r, http.StatusNotFound, "not found") return } - flist, _ := rrepo.ListJobFiles(a.meta, id) + flist, _ := rrepo.ListJobFiles(a.Meta, id) files := []map[string]interface{}{} for _, f := range flist { files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64}) @@ -965,12 +1061,12 @@ func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) { rrepo := repo.NewExportRepo() var jid uint64 _, _ = fmt.Sscan(id, &jid) - tplID, filters, err := rrepo.GetJobFilters(a.meta, jid) + tplID, filters, err := rrepo.GetJobFilters(a.Meta, jid) if err != nil { fail(w, r, http.StatusNotFound, "not found") return } - ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID) + ds, main, fs, err := rrepo.GetTemplateMeta(a.Meta, tplID) if err != nil { fail(w, r, http.StatusBadRequest, "template not found") return @@ -1023,7 +1119,7 @@ func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) { func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) { rrepo := repo.NewExportRepo() - uri, err := rrepo.GetLatestFileURI(a.meta, id) + uri, err := rrepo.GetLatestFileURI(a.Meta, id) if err != nil { // fallback: try to serve local storage file by job id // search for files named export_job__*.zip/xlsx/csv @@ -1438,7 +1534,7 @@ func decodeOrderKey(s string) string { } func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) { - a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id) + a.Meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? 
AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id) w.Write([]byte("ok")) } @@ -1503,8 +1599,8 @@ func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) { offset := (page - 1) * size rrepo := repo.NewExportRepo() var totalCount int64 - totalCount = rrepo.CountJobs(a.meta, tplID, "") - itemsRaw, err := rrepo.ListJobs(a.meta, tplID, "", size, offset) + totalCount = rrepo.CountJobs(a.Meta, tplID, "") + itemsRaw, err := rrepo.ListJobs(a.Meta, tplID, "", size, offset) if err != nil { failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error") return diff --git a/server/internal/api/exports.go.bak b/server/internal/api/exports.go.bak new file mode 100644 index 0000000..f708a1a --- /dev/null +++ b/server/internal/api/exports.go.bak @@ -0,0 +1,1864 @@ +package api + +import ( + "database/sql" + "encoding/json" + "fmt" + "io" + "log" + "math/big" + "net/http" + "os" + "path/filepath" + "server/internal/constants" + "server/internal/exporter" + "server/internal/logging" + "server/internal/repo" + "server/internal/utils" + "server/internal/ymtcrypto" + "strconv" + "strings" + "time" +) + +type ExportsAPI struct { + Meta *sql.DB + Marketing *sql.DB + YMT *sql.DB +} + +func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler { + api := &ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt} + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + p := strings.TrimPrefix(r.URL.Path, "/api/exports") + if r.Method == http.MethodPost && p == "" { + api.create(w, r) + return + } + if r.Method == http.MethodGet && p == "" { + api.list(w, r) + return + } + if strings.HasPrefix(p, "/") { + id := strings.TrimPrefix(p, "/") + if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") { + if strings.HasSuffix(p, "/sql") { + id = strings.TrimSuffix(id, "/sql") + api.getSQL(w, r, id) + return + } + api.get(w, r, id) + return + } + if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") { + id = strings.TrimSuffix(id, "/download") + api.download(w, r, id) + return + } + if r.Method == http.MethodPost && strings.HasSuffix(p, "/recompute") { + id = strings.TrimSuffix(id, "/recompute") + api.recompute(w, r, id) + return + } + if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") { + id = strings.TrimSuffix(id, "/cancel") + api.cancel(w, r, id) + return + } + } + w.WriteHeader(http.StatusNotFound) + }) +} + +type ExportPayload struct { + TemplateID uint64 `json:"template_id"` + RequestedBy uint64 `json:"requested_by"` + Permission map[string]interface{} `json:"permission"` + Options map[string]interface{} `json:"options"` + FileFormat string `json:"file_format"` + Filters map[string]interface{} `json:"filters"` + Datasource string `json:"datasource"` +} + +func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) { + b, _ := io.ReadAll(r.Body) + var p ExportPayload + json.Unmarshal(b, &p) + r = WithPayload(r, p) + var main string + var ds string + rrepo := repo.NewExportRepo() + ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID) + if err != nil { + fail(w, r, http.StatusBadRequest, "invalid template") + return + } + if p.Datasource != "" { + ds = p.Datasource + } + // ensure filters map initialized + if p.Filters == nil { + p.Filters = map[string]interface{}{} + } + // merge permission scope into filters to enforce boundary + p.Filters = mergePermissionIntoFilters(p.Datasource, main, p.Permission, p.Filters) + // 注意:不再从 URL 参数 userId 或 current_user_id 自动转换为 creator_in 过滤 + // 
current_user_id 仅用于记录导出任务的 owner,不用于数据过滤 + // support multiple merchantId in query: e.g., merchantId=1,2,3 → filters.merchant_id_in + { + midStr := r.URL.Query().Get("merchantId") + if midStr != "" { + parts := strings.Split(midStr, ",") + ids := make([]interface{}, 0, len(parts)) + for _, s := range parts { + s = strings.TrimSpace(s) + if s == "" { + continue + } + if n, err := strconv.ParseUint(s, 10, 64); err == nil { + ids = append(ids, n) + } + } + if len(ids) > 0 { + if _, exists := p.Filters["merchant_id_in"]; !exists { + p.Filters["merchant_id_in"] = ids + } + } + } + } + + // DEBUG LOGGING + logging.JSON("INFO", map[string]interface{}{ + "event": "export_filters_debug", + "filters": p.Filters, + "has_creator_in": hasNonEmptyIDs(p.Filters["creator_in"]), + "has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]), + }) + if ds == "marketing" && (main == "order" || main == "order_info") { + if v, ok := p.Filters["create_time_between"]; ok { + switch t := v.(type) { + case []interface{}: + if len(t) != 2 { + fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值") + return + } + case []string: + if len(t) != 2 { + fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值") + return + } + default: + fail(w, r, http.StatusBadRequest, "create_time_between 格式错误") + return + } + } else { + fail(w, r, http.StatusBadRequest, "缺少时间过滤:必须提供 create_time_between") + return + } + } + filtered := make([]string, 0, len(fs)) + tv := 0 + if v, ok := p.Filters["type_eq"]; ok { + switch t := v.(type) { + case float64: + tv = int(t) + case int: + tv = t + case string: + s := strings.TrimSpace(t) + for i := 0; i < len(s); i++ { + c := s[i] + if c >= '0' && c <= '9' { + tv = tv*10 + int(c-'0') + } + } + } + } + // Normalize template fields preserving order + normalized := make([]string, 0, len(fs)) + for _, tf := range fs { + if ds == "ymt" && strings.HasPrefix(tf, "order_info.") { + tf = strings.Replace(tf, "order_info.", "order.", 1) + } + if ds == "marketing" && tf == "order_voucher.channel_batch_no" { + tf = "order_voucher.channel_activity_id" + } + normalized = append(normalized, tf) + } + // 移除 YMT 无效字段(key批次) + if ds == "ymt" { + tmp := make([]string, 0, len(normalized)) + for _, tf := range normalized { + if tf == "order.key_batch_id" || tf == "order.key_batch_name" { + continue + } + tmp = append(tmp, tf) + } + normalized = tmp + } + // 不再使用白名单过滤,直接使用所有字段 + filtered = normalized + // 易码通立减金:保留 order_voucher.grant_time,移除红包领取时间列,避免“领取时间”为空 + if ds == "ymt" && tv == 3 { + deduped := make([]string, 0, len(filtered)) + removed := []string{} + for _, tf := range filtered { + if tf == "order_cash.receive_time" { + removed = append(removed, tf) + continue + } + deduped = append(deduped, tf) + } + if len(removed) > 0 { + logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_receive_time", "removed": removed, "reason": "立减金保留 order_voucher.grant_time"}) + } + filtered = deduped + } + // 营销系统:非直充类型(type!=1)时移除recharge_time、card_code、account字段 + if ds == "marketing" && tv != 1 { + deduped := make([]string, 0, len(filtered)) + removed := []string{} + for _, tf := range filtered { + if tf == "order.recharge_time" || tf == "order.card_code" || tf == "order.account" { + removed = append(removed, tf) + continue + } + deduped = append(deduped, tf) + } + if len(removed) > 0 { + logging.JSON("INFO", map[string]interface{}{"event": "fields_filtered_non_direct_charge", "removed": removed, "reason": "非直充类型不导出充值时间、卡密和账号"}) + } + filtered = deduped + } + labels := FieldLabels() + 
// 字段匹配校验(数量与顺序) + if len(filtered) != len(fs) { + logging.JSON("ERROR", map[string]interface{}{"event": "field_count_mismatch", "template_count": len(fs), "final_count": len(filtered)}) + } + // relax: creator_in 非必填,若权限中提供其他边界将被合并为等值过滤 + req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters} + q, args, usedFields, err := rrepo.BuildWithFields(req, nil) // 取消白名单过滤,前端选择多少字段就导出多少 + if err != nil { + r = WithSQL(r, q) + fail(w, r, http.StatusBadRequest, err.Error()) + return + } + // 使用实际使用的字段列表(解决白名单过滤后列数不匹配问题) + if len(usedFields) > 0 { + filtered = usedFields + } + r = WithSQL(r, q) + logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)}) + log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args)) + dataDB := a.selectDataDB(ds) + score, sugg, err := rrepo.Explain(dataDB, q, args) + if err != nil { + fail(w, r, http.StatusBadRequest, err.Error()) + return + } + sugg = append(sugg, exporter.IndexSuggestions(req)...) + if score < constants.ExportThresholds.PassScoreThreshold { + fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score)) + return + } + // 估算行数(优先使用分块统计,失败或结果为 0 时回退到精确 COUNT) + var estimate int64 + estimate = rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters) + if estimate <= 0 { + logging.JSON("WARN", map[string]interface{}{ + "event": "estimate_zero_fallback", + "datasource": ds, + "main_table": main, + "filters": p.Filters, + "stage": "fast_chunked", + "estimate": estimate, + }) + // 使用完整导出 SQL 做一次精确统计,避免分表/索引等原因导致估算为 0 + estimate = exporter.CountRows(dataDB, q, args) + logging.JSON("INFO", map[string]interface{}{ + "event": "estimate_exact_count", + "datasource": ds, + "main_table": main, + "filters": p.Filters, + "sql": q, + "args": args, + "estimate": estimate, + }) + } + hdrs := make([]string, len(filtered)) + for i, tf := range filtered { + if v, ok := labels[tf]; ok { + hdrs[i] = v + } else { + hdrs[i] = tf + } + } + // 列头去重:如果仍有重复的列头(中文标签),对非主表字段添加前缀 + { + cnt := map[string]int{} + for _, h := range hdrs { + cnt[h]++ + } + for i := range hdrs { + if cnt[hdrs[i]] > 1 { + parts := strings.Split(filtered[i], ".") + if len(parts) == 2 && parts[0] != main { + hdrs[i] = tableLabel(parts[0]) + "." 
+ hdrs[i] + } + } + } + } + // owner from query current_user_id or userId if provided + owner := uint64(0) + ownStr := r.URL.Query().Get("current_user_id") + if ownStr == "" { + ownStr = r.URL.Query().Get("userId") + } + if ownStr != "" { + first := strings.TrimSpace(strings.Split(ownStr, ",")[0]) + if n, err := strconv.ParseUint(first, 10, 64); err == nil { + owner = n + } + } + id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat) + if err != nil { + fail(w, r, http.StatusInternalServerError, err.Error()) + return + } + go a.runJobByID(uint64(id)) + ok(w, r, map[string]interface{}{"id": id}) +} + +// runJobByID 通过任务ID从数据库读取信息并执行任务 +func (a *ExportsAPI) RunJobByID(jobID uint64) { + rrepo := repo.NewExportRepo() + + // 增加重启计数 + rrepo.IncrementRestartCount(a.meta, jobID) + + // 获取任务详情 + jobDetail, err := rrepo.GetJob(a.meta, strconv.FormatUint(jobID, 10)) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "get_job_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + // 获取模板信息和任务过滤条件 + tplID, filtersJSON, err := rrepo.GetJobFilters(a.meta, jobID) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "get_job_filters_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + var filters map[string]interface{} + json.Unmarshal(filtersJSON, &filters) + + // 获取模板字段信息 + ds, mainTable, fields, err := rrepo.GetTemplateMeta(a.meta, tplID) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "get_template_meta_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + // 获取数据库连接 + dataDB := a.selectDataDB(ds) + + // 构建 SQL + wl := Whitelist() + req := exporter.BuildRequest{MainTable: mainTable, Datasource: ds, Fields: fields, Filters: filters} + q, args, usedFields, err := rrepo.BuildWithFields(req, wl) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "build_sql_failed", + "job_id": jobID, + "error": err.Error(), + }) + return + } + + if len(usedFields) > 0 { + fields = usedFields + } + + // 构建列标题 + labels := FieldLabels() + hdrs := make([]string, len(fields)) + for i, tf := range fields { + if v, ok := labels[tf]; ok { + hdrs[i] = v + } else { + hdrs[i] = tf + } + } + + // 列头去重 + { + cnt := map[string]int{} + for _, h := range hdrs { + cnt[h]++ + } + for i := range hdrs { + if cnt[hdrs[i]] > 1 { + parts := strings.Split(fields[i], ".") + if len(parts) == 2 && parts[0] != mainTable { + hdrs[i] = tableLabel(parts[0]) + "." 
+ hdrs[i] + } + } + } + } + + a.runJob(jobID, dataDB, q, args, fields, hdrs, jobDetail.FileFormat) +} + +func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) { + defer func() { + if r := recover(); r != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_panic", + "job_id": id, + "error": utils.ToString(r), + "fields": fields, + "format": fileFormat, + }) + log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields) + repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{ + "error": utils.ToString(r), + "fields": fields, + "format": fileFormat, + }) + } + }() + // load datasource once for transform decisions + var jobDS string + var jobMain string + rrepo := repo.NewExportRepo() + { + tplID, _, _ := rrepo.GetJobFilters(a.meta, id) + if tplID > 0 { + ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID) + jobDS = ds + if mt != "" { + jobMain = mt + } else { + jobMain = "order" + } + } + } + + // 检查预估行数,如果超过阈值且格式是xlsx,强制改为csv + if fileFormat == "xlsx" { + var rowEstimate int64 + estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) + _ = estRow.Scan(&rowEstimate) + if rowEstimate > constants.ExportThresholds.XlsxMaxRows { + logging.JSON("INFO", map[string]interface{}{ + "event": "force_csv_format", + "job_id": id, + "row_estimate": rowEstimate, + "threshold": constants.ExportThresholds.XlsxMaxRows, + "reason": "row_estimate exceeds xlsx max rows, forcing csv format", + }) + fileFormat = "csv" + } + } + + rrepo.StartJob(a.meta, id) + if fileFormat == "csv" { + newBaseWriter := func() (exporter.RowWriter, error) { + return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) + } + files := []string{} + { + var tplID uint64 + var filtersJSON []byte + row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) + _ = row.Scan(&tplID, &filtersJSON) + var tplDS string + var main string + var fieldsJSON []byte + tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) + _ = tr.Scan(&tplDS, &main, &fieldsJSON) + var fs []string + var fl map[string]interface{} + json.Unmarshal(fieldsJSON, &fs) + json.Unmarshal(filtersJSON, &fl) + wl := Whitelist() + var chunks [][2]string + if v, ok := fl["create_time_between"]; ok { + if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 { + chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays) + } + if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 { + chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays) + } + } + if len(chunks) > 0 { + var total int64 + // 如果 row_estimate 为 0,在分块导出开始时重新估算 + var currentEst int64 + row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) + _ = row.Scan(¤tEst) + if currentEst == 0 { + estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl) + if estChunk > 0 { + rrepo.UpdateRowEstimate(a.meta, id, estChunk) + } + } + skipChunk := false + if tplDS == "marketing" && main == "order" { + for _, f := range fs { + if strings.HasPrefix(f, "order_voucher.") { + skipChunk = true + break + } + } + if !skipChunk { + if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok { + skipChunk = true + } + } + } + if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold { + cur := rrepo.NewCursor(tplDS, main) + batch := 
constants.ChooseBatchSize(0, constants.FileFormatCSV) + for _, rg := range chunks { + fl["create_time_between"] = []string{rg[0], rg[1]} + req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl} + cq, cargs, err := exporter.BuildSQL(req, wl) + if err != nil { + continue + } + logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)}) + log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs)) + newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } + transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) } + chunkBase := total + onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil } + onRoll := func(path string, size int64, partRows int64) error { + files = append(files, path) + rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) + return nil + } + cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) + if e != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_stream_error", + "job_id": id, + "stage": "csv_chunk", + "error": e.Error(), + "datasource": jobDS, + "sql": cq, + "args": cargs, + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq) + rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{ + "error": e.Error(), + "datasource": jobDS, + "sql": cq, + "args": cargs, + }) + return + } + total += cnt + rrepo.UpdateProgress(a.meta, id, total) + } + if total == 0 { + total = rrepo.Count(db, q, args) + } + if len(files) >= 1 { + rrepo.ZipAndRecord(a.meta, id, files, total) + } + rrepo.MarkCompleted(a.meta, id, total) + return + } + } + } + log.Printf("job_id=%d sql=%s args=%v", id, q, args) + logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)}) + log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args)) + { + var est int64 + { + var filtersJSON []byte + row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id) + _ = row.Scan(&filtersJSON) + var fl map[string]interface{} + json.Unmarshal(filtersJSON, &fl) + est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) + rrepo.UpdateRowEstimate(a.meta, id, est) + } + batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat)) + files2 := []string{} + cur := rrepo.NewCursor(jobDS, jobMain) + newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } + transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) } + onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil } + onRoll := func(path string, size int64, partRows int64) error { + files2 = append(files2, path) + rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) + return nil + } + count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_stream_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + "datasource": jobDS, + "fields": fields, + "sql": q, + }) + 
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields)) + rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{ + "error": err.Error(), + "datasource": jobDS, + "fields": fields, + "sql": q, + }) + return + } + if len(files2) >= 1 { + rrepo.ZipAndRecord(a.meta, id, files2, count) + } + rrepo.MarkCompleted(a.meta, id, count) + return + } + } + if fileFormat == "xlsx" { + files := []string{} + { + var tplID uint64 + var filtersJSON []byte + row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) + _ = row.Scan(&tplID, &filtersJSON) + var tplDS string + var main string + var fieldsJSON []byte + tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) + _ = tr.Scan(&tplDS, &main, &fieldsJSON) + var fs []string + var fl map[string]interface{} + json.Unmarshal(fieldsJSON, &fs) + json.Unmarshal(filtersJSON, &fl) + wl := Whitelist() + var chunks [][2]string + if v, ok := fl["create_time_between"]; ok { + if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 { + chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays) + } + if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 { + chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays) + } + } + if len(chunks) > 0 { + var total int64 + // 如果 row_estimate 为 0,在分块导出开始时重新估算 + var currentEst int64 + row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) + _ = row.Scan(¤tEst) + if currentEst == 0 { + estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl) + if estChunk > 0 { + rrepo.UpdateRowEstimate(a.meta, id, estChunk) + } + } + skipChunk := false + if tplDS == "marketing" && main == "order" { + for _, f := range fs { + if strings.HasPrefix(f, "order_voucher.") { + skipChunk = true + break + } + } + if !skipChunk { + if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok { + skipChunk = true + } + } + } + if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold { + cur := rrepo.NewCursor(tplDS, main) + batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX) + for _, rg := range chunks { + fl["create_time_between"] = []string{rg[0], rg[1]} + req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl} + cq, cargs, err := rrepo.Build(req, wl) + if err != nil { + continue + } + logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)}) + log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs)) + newWriter := func() (exporter.RowWriter, error) { + xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1") + if e == nil { + _ = xw.WriteHeader(cols) + } + return xw, e + } + transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) } + // 进度回调按全局累计行数更新,避免跨分片出现数值回退 + chunkBase := total + onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil } + onRoll := func(path string, size int64, partRows int64) error { + files = append(files, path) + rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) + return nil + } + cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, 
onRoll, onProgress) + if e != nil { + logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs}) + log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e) + rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{ + "error": e.Error(), + "datasource": jobDS, + "sql": cq, + "args": cargs, + }) + return + } + total += cnt + rrepo.UpdateProgress(a.meta, id, total) + } + if total == 0 { + total = rrepo.Count(db, q, args) + } + if len(files) >= 1 { + rrepo.ZipAndRecord(a.meta, id, files, total) + } + rrepo.MarkCompleted(a.meta, id, total) + return + } + } + } + log.Printf("job_id=%d sql=%s args=%v", id, q, args) + logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)}) + log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args)) + var est2 int64 + { + var filtersJSON []byte + row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id) + _ = row.Scan(&filtersJSON) + var fl map[string]interface{} + json.Unmarshal(filtersJSON, &fl) + est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) + rrepo.UpdateRowEstimate(a.meta, id, est2) + } + x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1") + if err != nil { + logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()}) + log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err) + rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{ + "error": err.Error(), + "stage": "xlsx_direct", + }) + return + } + _ = x.WriteHeader(cols) + rrepo.UpdateProgress(a.meta, id, 0) + // 记录查询执行前的参数类型信息 + argTypes := make([]string, len(args)) + for i, arg := range args { + argTypes[i] = fmt.Sprintf("%T", arg) + } + logging.JSON("INFO", map[string]interface{}{ + "event": "export_query_before_execute", + "job_id": id, + "stage": "xlsx_direct", + "datasource": jobDS, + "sql": q, + "args": args, + "arg_types": argTypes, + "final_sql": renderSQL(q, args), + }) + log.Printf("[EXPORT_DEBUG] job_id=%d stage=xlsx_direct before_query sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args)) + rows, err := db.Query(q, args...) 
+ if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_query_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + "datasource": jobDS, + "sql": q, + "args": args, + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err) + rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{ + "error": err.Error(), + "datasource": jobDS, + "sql": q, + "args": args, + }) + return + } + defer rows.Close() + // 动态获取实际列数 + actualCols, err := rows.Columns() + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_columns_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err) + rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{ + "error": err.Error(), + }) + return + } + if len(actualCols) != len(cols) { + logging.JSON("WARN", map[string]interface{}{ + "event": "export_column_count_mismatch", + "job_id": id, + "stage": "xlsx_direct", + "expected_cols": len(cols), + "actual_cols": len(actualCols), + }) + log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct column_mismatch expected=%d actual=%d", id, len(cols), len(actualCols)) + } + out := make([]interface{}, len(actualCols)) + dest := make([]interface{}, len(actualCols)) + for i := range out { + dest[i] = &out[i] + } + var count int64 + var tick int64 + var firstRow []string + firstRowCaptured := false + for rows.Next() { + if err := rows.Scan(dest...); err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_scan_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + "count": count, + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count) + rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{ + "error": err.Error(), + "count": count, + }) + return + } + vals := make([]string, len(cols)) + for i := range out { + if b, ok := out[i].([]byte); ok { + vals[i] = string(b) + } else if out[i] == nil { + vals[i] = "" + } else { + vals[i] = utils.ToString(out[i]) + } + } + // 仅记录第一行原始数据到日志中,方便排查是否有查询结果 + if !firstRowCaptured { + firstRow = make([]string, len(vals)) + copy(firstRow, vals) + firstRowCaptured = true + } + vals = transformRow(jobDS, fields, vals) + x.WriteRow(vals) + count++ + tick++ + if tick%200 == 0 { + rrepo.UpdateProgress(a.meta, id, count) + } + } + // 如果查询到了数据,记录一条包含首行数据的日志,便于确认导出前 SQL 是否返回结果 + if count > 0 && firstRowCaptured { + logging.JSON("INFO", map[string]interface{}{ + "event": "export_first_row_sample", + "job_id": id, + "datasource": jobDS, + "total_rows": count, + "first_row": firstRow, + "sql": q, + "args": args, + "final_sql": renderSQL(q, args), + "fields_order": fields, + }) + } else if count == 0 { + // 如果查询返回0行,记录详细信息以便排查 + logging.JSON("WARN", map[string]interface{}{ + "event": "export_zero_rows", + "job_id": id, + "datasource": jobDS, + "stage": "xlsx_direct", + "sql": q, + "args": args, + "arg_types": argTypes, + "final_sql": renderSQL(q, args), + "expected_cols": len(cols), + "actual_cols": len(actualCols), + }) + log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct zero_rows sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args)) + } + p, size, _ := x.Close() + log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", 
[]interface{}{id, p, count, size, time.Now(), time.Now()}) + a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now()) + rrepo.ZipAndRecord(a.meta, id, []string{p}, count) + rrepo.MarkCompleted(a.meta, id, count) + return + } + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_format_unsupported", + "job_id": id, + "format": fileFormat, + }) + log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat) + rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{ + "format": fileFormat, + }) +} + +// recompute final rows for a job and correct export_jobs.total_rows +func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) { + id, _ := strconv.ParseUint(idStr, 10, 64) + var tplID uint64 + var filtersJSON []byte + row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id) + if err := row.Scan(&tplID, &filtersJSON); err != nil { + fail(w, r, http.StatusNotFound, "not found") + return + } + var ds string + var main string + var fieldsJSON []byte + tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID) + _ = tr.Scan(&ds, &main, &fieldsJSON) + var fs []string + var fl map[string]interface{} + json.Unmarshal(fieldsJSON, &fs) + json.Unmarshal(filtersJSON, &fl) + wl := Whitelist() + req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl} + q, args, err := exporter.BuildSQL(req, wl) + if err != nil { + fail(w, r, http.StatusBadRequest, err.Error()) + return + } + dataDB := a.selectDataDB(ds) + final := repo.NewExportRepo().Count(dataDB, q, args) + repo.NewExportRepo().MarkCompleted(a.meta, id, final) + ok(w, r, map[string]interface{}{"id": id, "final_rows": final}) +} + +func (a *ExportsAPI) selectDataDB(ds string) *sql.DB { + if ds == "ymt" { + return a.ymt + } + return a.marketing +} + +// moved to repo layer: repo.ZipAndRecord + +func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) { + rrepo := repo.NewExportRepo() + d, err := rrepo.GetJob(a.meta, id) + if err != nil { + fail(w, r, http.StatusNotFound, "not found") + return + } + flist, _ := rrepo.ListJobFiles(a.meta, id) + files := []map[string]interface{}{} + for _, f := range flist { + files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64}) + } + evalStatus := "通过" + if d.ExplainScore.Int64 < 60 { + evalStatus = "禁止" + } + desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", d.ExplainScore.Int64, d.TotalRows.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[d.ExplainScore.Int64 >= 60]) + if d.ExplainJSON.Valid && d.ExplainJSON.String != "" { + var arr []map[string]interface{} + if err := json.Unmarshal([]byte(d.ExplainJSON.String), &arr); err == nil { + segs := []string{} + for _, r := range arr { + getStr := func(field string) string { + if v, ok := r[field]; ok { + if mm, ok := v.(map[string]interface{}); ok { + if b, ok := mm["Valid"].(bool); ok && !b { + return "" + } + if s, ok := mm["String"].(string); ok { + return s + } + } + } + return "" + } + getInt := func(field string) int64 { + if v, ok := r[field]; ok { + if mm, ok := v.(map[string]interface{}); ok { + if b, ok := mm["Valid"].(bool); ok && !b { + return 0 + } + if f, ok := mm["Int64"].(float64); ok { + return int64(f) + } + } + } + 
return 0 + } + getFloat := func(field string) float64 { + if v, ok := r[field]; ok { + if mm, ok := v.(map[string]interface{}); ok { + if b, ok := mm["Valid"].(bool); ok && !b { + return 0 + } + if f, ok := mm["Float64"].(float64); ok { + return f + } + } + } + return 0 + } + tbl := getStr("Table") + typ := getStr("Type") + if typ == "" { + typ = getStr("SelectType") + } + key := getStr("Key") + rowsN := getInt("Rows") + filt := getFloat("Filtered") + extra := getStr("Extra") + if tbl == "" && typ == "" && rowsN == 0 && extra == "" { + continue + } + s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt) + if extra != "" { + s += ", 额外:" + extra + } + segs = append(segs, s) + } + if len(segs) > 0 { + desc = strings.Join(segs, ";") + } + } + } + ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc}) +} + +func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) { + rrepo := repo.NewExportRepo() + var jid uint64 + _, _ = fmt.Sscan(id, &jid) + tplID, filters, err := rrepo.GetJobFilters(a.meta, jid) + if err != nil { + fail(w, r, http.StatusNotFound, "not found") + return + } + ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID) + if err != nil { + fail(w, r, http.StatusBadRequest, "template not found") + return + } + var fl map[string]interface{} + json.Unmarshal(filters, &fl) + wl := Whitelist() + req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl} + q, args, err := rrepo.Build(req, wl) + if err != nil { + failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error") + return + } + formatArg := func(a interface{}) string { + switch t := a.(type) { + case nil: + return "NULL" + case []byte: + s := string(t) + s = strings.ReplaceAll(s, "'", "''") + return "'" + s + "'" + case string: + s := strings.ReplaceAll(t, "'", "''") + return "'" + s + "'" + case int: + return strconv.Itoa(t) + case int64: + return strconv.FormatInt(t, 10) + case float64: + return strconv.FormatFloat(t, 'f', -1, 64) + case time.Time: + return "'" + t.Format("2006-01-02 15:04:05") + "'" + default: + return fmt.Sprintf("%v", t) + } + } + var sb strings.Builder + var ai int + for i := 0; i < len(q); i++ { + c := q[i] + if c == '?' 
+
+func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
+  rrepo := repo.NewExportRepo()
+  var jid uint64
+  _, _ = fmt.Sscan(id, &jid)
+  tplID, filters, err := rrepo.GetJobFilters(a.meta, jid)
+  if err != nil {
+    fail(w, r, http.StatusNotFound, "not found")
+    return
+  }
+  ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
+  if err != nil {
+    fail(w, r, http.StatusBadRequest, "template not found")
+    return
+  }
+  var fl map[string]interface{}
+  json.Unmarshal(filters, &fl)
+  wl := Whitelist()
+  req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
+  q, args, err := rrepo.Build(req, wl)
+  if err != nil {
+    failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
+    return
+  }
+  formatArg := func(a interface{}) string {
+    switch t := a.(type) {
+    case nil:
+      return "NULL"
+    case []byte:
+      s := string(t)
+      s = strings.ReplaceAll(s, "'", "''")
+      return "'" + s + "'"
+    case string:
+      s := strings.ReplaceAll(t, "'", "''")
+      return "'" + s + "'"
+    case int:
+      return strconv.Itoa(t)
+    case int64:
+      return strconv.FormatInt(t, 10)
+    case float64:
+      return strconv.FormatFloat(t, 'f', -1, 64)
+    case time.Time:
+      return "'" + t.Format("2006-01-02 15:04:05") + "'"
+    default:
+      return fmt.Sprintf("%v", t)
+    }
+  }
+  var sb strings.Builder
+  var ai int
+  for i := 0; i < len(q); i++ {
+    c := q[i]
+    if c == '?' && ai < len(args) {
+      sb.WriteString(formatArg(args[ai]))
+      ai++
+    } else {
+      sb.WriteByte(c)
+    }
+  }
+  ok(w, r, map[string]interface{}{"sql": q, "final_sql": sb.String()})
+}
+
+func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
+  rrepo := repo.NewExportRepo()
+  uri, err := rrepo.GetLatestFileURI(a.meta, id)
+  if err != nil {
+    // fallback: try to serve local storage file by job id
+    // search for files named export_job__*.zip/xlsx/csv
+    dir := "storage"
+    entries, e := os.ReadDir(dir)
+    if e == nil {
+      best := ""
+      var bestInfo os.FileInfo
+      for _, ent := range entries {
+        name := ent.Name()
+        if strings.HasPrefix(name, "export_job_"+id+"_") && (strings.HasSuffix(name, ".zip") || strings.HasSuffix(name, ".xlsx") || strings.HasSuffix(name, ".csv")) {
+          info, _ := os.Stat(filepath.Join(dir, name))
+          if info != nil {
+            if best == "" || info.ModTime().After(bestInfo.ModTime()) {
+              best = name
+              bestInfo = info
+            }
+          }
+        }
+      }
+      if best != "" {
+        http.ServeFile(w, r, filepath.Join(dir, best))
+        return
+      }
+    }
+    fail(w, r, http.StatusNotFound, "not found")
+    return
+  }
+  http.ServeFile(w, r, uri)
+}
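transformRow below rewrites enum codes into display labels in place, field by field. As a quick illustration of the calling convention, here is a hypothetical _test.go case for this package (test name and values are mine; it only exercises mappings hard-coded in the function body, not the constants.* tables, and assumes "testing" and "reflect" are imported):

func TestTransformRowHardcodedLabels(t *testing.T) {
  got := transformRow("marketing", []string{"order.use_coupon", "order.deliver_status"}, []string{"1", "2"})
  want := []string{"是", "已投递"}
  if !reflect.DeepEqual(got, want) {
    t.Fatalf("transformRow = %v, want %v", got, want)
  }
}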
+
+func transformRow(ds string, fields []string, vals []string) []string {
+  payStatusIdx := -1
+  for i := range fields {
+    if fields[i] == "order.pay_status" {
+      payStatusIdx = i
+      break
+    }
+  }
+  isPaid := func() bool {
+    if payStatusIdx < 0 || payStatusIdx >= len(vals) {
+      return true
+    }
+    return constants.IsPaidStatus(ds, vals[payStatusIdx])
+  }()
+  for i := range fields {
+    if i >= len(vals) {
+      break
+    }
+    f := fields[i]
+    v := vals[i]
+
+    // ==================== 枚举转换 ====================
+    // order.type - 订单类型
+    if f == "order.type" {
+      if n := parseIntVal(v); n >= 0 {
+        if ds == "ymt" {
+          if label, ok := constants.YMTOrderType[n]; ok {
+            vals[i] = label
+          }
+        } else {
+          if label, ok := constants.MarketingOrderType[n]; ok {
+            vals[i] = label
+          }
+        }
+      }
+      continue
+    }
+    // order.status - 订单状态
+    if f == "order.status" {
+      if n := parseIntVal(v); n >= 0 {
+        if ds == "ymt" {
+          if label, ok := constants.YMTOrderStatus[n]; ok {
+            vals[i] = label
+          }
+        } else {
+          if label, ok := constants.MarketingOrderStatus[n]; ok {
+            vals[i] = label
+          }
+        }
+      }
+      continue
+    }
+    // order.pay_type - 支付方式
+    if f == "order.pay_type" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.MarketingPayType[n]; ok {
+          vals[i] = label
+        } else if n == 0 {
+          vals[i] = ""
+        }
+      }
+      continue
+    }
+    // order.pay_status - 支付状态
+    if f == "order.pay_status" {
+      if n := parseIntVal(v); n >= 0 {
+        if ds == "ymt" {
+          if label, ok := constants.YMTPayStatus[n]; ok {
+            vals[i] = label
+          }
+        } else {
+          if label, ok := constants.MarketingPayStatus[n]; ok {
+            vals[i] = label
+          }
+        }
+      }
+      continue
+    }
+    // order.use_coupon - 是否使用优惠券
+    if f == "order.use_coupon" {
+      switch v {
+      case "1":
+        vals[i] = "是"
+      case "2", "0":
+        vals[i] = "否"
+      }
+      continue
+    }
+    // order.deliver_status - 投递状态
+    if f == "order.deliver_status" {
+      switch v {
+      case "1":
+        vals[i] = "待投递"
+      case "2":
+        vals[i] = "已投递"
+      case "3":
+        vals[i] = "投递失败"
+      }
+      continue
+    }
+    // order.is_inner - 供应商类型
+    if f == "order.is_inner" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.YMTIsInner[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // order_voucher.channel / voucher.channel - 立减金渠道
+    if f == "order_voucher.channel" || f == "voucher.channel" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.OrderVoucherChannel[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // order_voucher.status - 立减金状态
+    if f == "order_voucher.status" {
+      if n := parseIntVal(v); n >= 0 {
+        if ds == "ymt" {
+          if label, ok := constants.YMTOrderVoucherStatus[n]; ok {
+            vals[i] = label
+          }
+        } else {
+          if label, ok := constants.MarketingOrderVoucherStatus[n]; ok {
+            vals[i] = label
+          }
+        }
+      }
+      continue
+    }
+    // order_voucher.receive_mode / voucher.receive_mode - 领取方式
+    if f == "order_voucher.receive_mode" || f == "voucher.receive_mode" {
+      if n := parseIntVal(v); n >= 0 {
+        if ds == "ymt" {
+          if label, ok := constants.YMTVoucherReceiveMode[n]; ok {
+            vals[i] = label
+          }
+        } else {
+          if label, ok := constants.OrderVoucherReceiveMode[n]; ok {
+            vals[i] = label
+          }
+        }
+      }
+      continue
+    }
+    // voucher.is_webview - 打开方式
+    if f == "voucher.is_webview" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.VoucherOpenMode[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // goods_voucher_subject_config.type - 主体类型
+    if f == "goods_voucher_subject_config.type" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.VoucherSubjectType[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // order_cash.channel - 红包渠道
+    if f == "order_cash.channel" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.OrderCashChannel[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // order_cash.receive_status - 红包领取状态
+    if f == "order_cash.receive_status" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.OrderCashReceiveStatus[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // order_cash.status - 红包状态(营销系统)
+    if f == "order_cash.status" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.MarketingOrderCashStatus[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // order_digit.order_type - 数字订单类型
+    if f == "order_digit.order_type" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.OrderDigitOrderType[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // activity.settlement_type / plan.settlement_type - 结算方式
+    if f == "activity.settlement_type" || f == "plan.settlement_type" {
+      if n := parseIntVal(v); n >= 0 {
+        if ds == "ymt" {
+          if label, ok := constants.YMTSettlementType[n]; ok {
+            vals[i] = label
+          }
+        } else {
+          if label, ok := constants.MarketingSettlementType[n]; ok {
+            vals[i] = label
+          }
+        }
+      }
+      continue
+    }
+    // plan.send_method - 发放方式
+    if f == "plan.send_method" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.MarketingSendMethod[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // code_batch.period_type - 周期类型
+    if f == "code_batch.period_type" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.MarketingPeriodType[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // code_batch.recharge_type - 充值类型
+    if f == "code_batch.recharge_type" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.MarketingRechargeType[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // key_batch.style - key码样式
+    if f == "key_batch.style" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.KeyBatchStyle[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+    // merchant_key_send.status - key码API发放状态
+    if f == "merchant_key_send.status" {
+      if n := parseIntVal(v); n >= 0 {
+        if label, ok := constants.MerchantKeySendStatus[n]; ok {
+          vals[i] = label
+        }
+      }
+      continue
+    }
+
+    // ==================== 特殊字段转换 ====================
+    // 解密/转换订单 key
+    if f == "order.key" {
+      if ds == "ymt" {
+        key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
+        if key == "" {
+          key = "z9DoIVLuDYEN/qsgweRA4A=="
+        }
+        if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
+          vals[i] = dec
+        }
+      } else {
+        vals[i] = decodeOrderKey(vals[i])
+      }
+    }
+    // voucher_batch.provider: 将渠道编码转换为中文名称
+    if f == "voucher_batch.provider" {
+      switch strings.TrimSpace(vals[i]) {
+      // 老编码
+      case "lsxd":
+        vals[i] = "蓝色兄弟"
+      case "fjxw":
+        vals[i] = "福建兴旺"
+      case "fzxy":
+        vals[i] = "福州兴雅"
+      case "fzyt":
+        vals[i] = "福州悦途"
+      // 新编码:微信立减金渠道
+      case "voucher_wechat_lsxd":
+        vals[i] = "蓝色兄弟"
+      case "voucher_wechat_fjxw":
+        vals[i] = "福建兴旺"
+      case "voucher_wechat_fzxy":
+        vals[i] = "福州兴雅"
+      case "voucher_wechat_fzyt":
+        vals[i] = "福州悦途"
+      case "voucher_wechat_zjky":
+        vals[i] = "浙江卡赢"
+      case "voucher_wechat_zjky2":
+        vals[i] = "浙江卡赢2"
+      case "voucher_wechat_zjwsxx":
+        vals[i] = "浙江喔刷"
+      case "voucher_wechat_gzynd":
+        vals[i] = "广州亿纳德"
+      case "voucher_wechat_fjhrxxfw":
+        vals[i] = "福建省宏仁信息服务"
+      case "voucher_wechat_fzqmkj":
+        vals[i] = "福州启蒙科技有限公司"
+      case "voucher_wechat_fzydxx":
+        vals[i] = "福州元朵信息科技有限公司"
+      case "voucher_wechat_xyhxx":
+        vals[i] = "新沂薪伙原信息科技有限公司"
+      }
+    }
+    // activity.channels: 解析 JSON 并转成可读渠道名
+    if f == "activity.channels" {
+      if vals[i] == "" || vals[i] == "0" {
+        vals[i] = "无"
+        continue
+      }
+      if !isPaid {
+        vals[i] = "无"
+        continue
+      }
+      var arr []map[string]interface{}
+      if err := json.Unmarshal([]byte(vals[i]), &arr); err != nil {
+        vals[i] = "无"
+        continue
+      }
+      names := make([]string, 0, len(arr))
+      for _, item := range arr {
+        if v, ok := item["pay_name"].(string); ok && strings.TrimSpace(v) != "" {
+          names = append(names, v)
+          continue
+        }
+        if v, ok := item["name"].(string); ok && strings.TrimSpace(v) != "" {
+          names = append(names, v)
+        }
+      }
+      if len(names) == 0 {
+        vals[i] = "无"
+      } else {
+        vals[i] = strings.Join(names, ",")
+      }
+    }
+  }
+  return vals
+}
+
+func decodeOrderKey(s string) string {
+  if s == "" {
+    return s
+  }
+  if len(s) > 2 && s[len(s)-2:] == "_1" {
+    s = s[:len(s)-2]
+  }
+  var n big.Int
+  if _, ok := n.SetString(s, 10); !ok {
+    return s
+  }
+  base := []rune{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
+  baseCount := big.NewInt(int64(len(base)))
+  zero := big.NewInt(0)
+  var out []rune
+  for n.Cmp(zero) > 0 {
+    var mod big.Int
+    mod.Mod(&n, baseCount)
+    out = append(out, base[mod.Int64()])
+    n.Div(&n, baseCount)
+  }
+  for len(out) < 16 {
+    out = append(out, base[0])
+  }
+  for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
+    out[i], out[j] = out[j], out[i]
+  }
+  return string(out)
+}
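decodeOrderKey above maps a decimal order id onto a 16-character key over a 58-symbol alphabet (I, O, i, o excluded), padding with 'A' and reversing. For orientation only, a minimal inverse sketch (my own helper, not part of this change; the 'A' padding maps back to leading zeros, which this sketch simply drops; it relies on the math/big and strings imports already used in this file):

func orderKeyToDecimal(key string) (string, bool) {
  const alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz0123456789"
  n := new(big.Int)
  b := big.NewInt(int64(len(alphabet)))
  for _, c := range key {
    idx := strings.IndexRune(alphabet, c)
    if idx < 0 {
      return "", false // not a key produced by decodeOrderKey
    }
    n.Mul(n, b)
    n.Add(n, big.NewInt(int64(idx)))
  }
  return n.String(), true
}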
+
+func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
+  a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
+  w.Write([]byte("ok"))
+}
+
+func renderSQL(q string, args []interface{}) string {
+  formatArg := func(a interface{}) string {
+    switch t := a.(type) {
+    case nil:
+      return "NULL"
+    case []byte:
+      s := string(t)
+      s = strings.ReplaceAll(s, "'", "''")
+      return "'" + s + "'"
+    case string:
+      s := strings.ReplaceAll(t, "'", "''")
+      return "'" + s + "'"
+    case int:
+      return strconv.Itoa(t)
+    case int64:
+      return strconv.FormatInt(t, 10)
+    case float64:
+      return strconv.FormatFloat(t, 'f', -1, 64)
+    case time.Time:
+      return "'" + t.Format("2006-01-02 15:04:05") + "'"
+    default:
+      return fmt.Sprintf("%v", t)
+    }
+  }
+  var sb strings.Builder
+  var ai int
+  for i := 0; i < len(q); i++ {
+    c := q[i]
+    if c == '?' && ai < len(args) {
+      sb.WriteString(formatArg(args[ai]))
+      ai++
+    } else {
+      sb.WriteByte(c)
+    }
+  }
+  return sb.String()
+}
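renderSQL is a display helper: it splices literal values into the ? placeholders so a job's final SQL can be inspected or logged; the quoting is simple (single quotes doubled), so the rendered string is meant for display rather than execution. A hypothetical _test.go case for this package (name and values are illustrative):

func TestRenderSQLInlinesArgs(t *testing.T) {
  got := renderSQL("SELECT id FROM export_jobs WHERE status=? AND id>?", []interface{}{"running", int64(100)})
  want := "SELECT id FROM export_jobs WHERE status='running' AND id>100"
  if got != want {
    t.Fatalf("renderSQL = %q, want %q", got, want)
  }
}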
+func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
+  q := r.URL.Query()
+  page := 1
+  size := 15
+  if p := q.Get("page"); p != "" {
+    if n, err := strconv.Atoi(p); err == nil && n > 0 {
+      page = n
+    }
+  }
+  if s := q.Get("page_size"); s != "" {
+    if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
+      size = n
+    }
+  }
+  tplIDStr := q.Get("template_id")
+  var tplID uint64
+  if tplIDStr != "" {
+    if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
+      tplID = n
+    }
+  }
+  offset := (page - 1) * size
+  rrepo := repo.NewExportRepo()
+  var totalCount int64
+  totalCount = rrepo.CountJobs(a.meta, tplID, "")
+  itemsRaw, err := rrepo.ListJobs(a.meta, tplID, "", size, offset)
+  if err != nil {
+    failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
+    return
+  }
+  items := []map[string]interface{}{}
+  for _, it := range itemsRaw {
+    id, tid, req := it.ID, it.TemplateID, it.RequestedBy
+    status, fmtstr := it.Status, it.FileFormat
+    estimate, total := it.RowEstimate, it.TotalRows
+    createdAt, updatedAt := it.CreatedAt, it.UpdatedAt
+    score, explainRaw := it.ExplainScore, it.ExplainJSON
+    evalStatus := "通过"
+    if score.Int64 < 60 {
+      evalStatus = "禁止"
+    }
+    desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
+    if explainRaw.Valid && explainRaw.String != "" {
+      var arr []map[string]interface{}
+      if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil {
+        segs := []string{}
+        for _, r := range arr {
+          getStr := func(field string) string {
+            if v, ok := r[field]; ok {
+              if mm, ok := v.(map[string]interface{}); ok {
+                if b, ok := mm["Valid"].(bool); ok && !b {
+                  return ""
+                }
+                if s, ok := mm["String"].(string); ok {
+                  return s
+                }
+              }
+            }
+            return ""
+          }
+          getInt := func(field string) int64 {
+            if v, ok := r[field]; ok {
+              if mm, ok := v.(map[string]interface{}); ok {
+                if b, ok := mm["Valid"].(bool); ok && !b {
+                  return 0
+                }
+                if f, ok := mm["Int64"].(float64); ok {
+                  return int64(f)
+                }
+              }
+            }
+            return 0
+          }
+          getFloat := func(field string) float64 {
+            if v, ok := r[field]; ok {
+              if mm, ok := v.(map[string]interface{}); ok {
+                if b, ok := mm["Valid"].(bool); ok && !b {
+                  return 0
+                }
+                if f, ok := mm["Float64"].(float64); ok {
+                  return f
+                }
+              }
+            }
+            return 0
+          }
+          tbl := getStr("Table")
+          typ := getStr("Type")
+          if typ == "" {
+            typ = getStr("SelectType")
+          }
+          key := getStr("Key")
+          rowsN := getInt("Rows")
+          filt := getFloat("Filtered")
+          extra := getStr("Extra")
+          if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
+            continue
+          }
+          s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
+          if extra != "" {
+            s += ", 额外:" + extra
+          }
+          segs = append(segs, s)
+        }
+        if len(segs) > 0 {
+          desc = strings.Join(segs, ";")
+        }
+      }
+    }
+    m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
+    items = append(items, m)
+  }
+  ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
+}
+
+// mergePermissionIntoFilters injects permission scope into filters in a canonical way
+func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} {
+  if filters == nil {
+    filters = map[string]interface{}{}
+  }
+  // 先处理 plan_id_eq 和 reseller_id_eq 的设置
+  if v, ok := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); ok {
+    filters["reseller_id_eq"] = v
+  }
+  if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok {
+    filters["plan_id_eq"] = v
+  }
+  // 如果传递了 plan_id_eq 或 reseller_id_eq 且不为空,则删除已有的 creator_in 并跳过设置(适用于所有数据源)
+  if main == "order" || main == "order_info" {
+    hasPlanOrReseller := false
+    if v, ok := filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 {
+      hasPlanOrReseller = true
+    }
+    if v, ok := filters["reseller_id_eq"]; ok && v != nil && v != "" && v != 0 {
+      hasPlanOrReseller = true
+    }
+    if hasPlanOrReseller {
+      // 删除已有的 creator_in
+      delete(filters, "creator_in")
+      delete(filters, "creator_ids")
+      goto skipCreator
+    }
+  }
+  // if creator_in already present, keep it
+  if hasNonEmptyIDs(filters["creator_in"]) {
+    return filters
+  }
+  // try known keys (明确排除 current_user_id,它仅用于记录 owner,不用于数据过滤)
+  {
+    candidates := []string{"creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
+    ids := []interface{}{}
+    for _, k := range candidates {
+      if perm == nil {
+        break
+      }
+      // 明确排除 current_user_id 字段(即使不在 candidates 列表中,也显式检查以确保安全)
+      if k == "current_user_id" {
+        continue
+      }
+      if v, ok := perm[k]; ok {
+        ids = normalizeIDs(v)
+        if len(ids) > 0 {
+          break
+        }
+      }
+    }
+    // also check filters incoming alternative keys and normalize into creator_in
+    // 明确排除 current_user_id 字段
+    if len(ids) == 0 {
+      alt := []string{"creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
+      for _, k := range alt {
+        // 明确排除 current_user_id 字段
+        if k == "current_user_id" {
+          continue
+        }
+        if v, ok := filters[k]; ok {
+          ids = normalizeIDs(v)
+          if len(ids) > 0 {
+            break
+          }
+        }
+      }
+    }
+    // 额外检查:如果 permission 或 filters 中直接有 current_user_id,明确排除它
+    if perm != nil {
+      delete(perm, "current_user_id")
+    }
+    delete(filters, "current_user_id")
+    if len(ids) > 0 {
+      filters["creator_in"] = ids
+    }
+  }
+skipCreator:
+  // account
+  if v, ok := pickFirst(perm, filters, []string{"account", "account_no"}); ok {
+    filters["account_eq"] = v
+  }
+  // out_trade_no
+  if v, ok := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); ok {
+    filters["out_trade_no_eq"] = v
+  }
+  return filters
+}
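A worked example of the precedence implemented above, written as a hypothetical _test.go case for this package (test name and values are illustrative): when an order export carries a reseller scope, the creator-based scope is dropped rather than combined.

func TestMergePermissionResellerScopeWinsForOrders(t *testing.T) {
  perm := map[string]interface{}{"reseller_id": "88", "current_user_id": 5}
  filters := map[string]interface{}{"creator_in": []interface{}{"7"}}
  got := mergePermissionIntoFilters("marketing", "order", perm, filters)
  if got["reseller_id_eq"] != "88" {
    t.Fatalf("reseller_id_eq = %v, want 88", got["reseller_id_eq"])
  }
  if _, ok := got["creator_in"]; ok {
    t.Fatalf("creator_in should be dropped when a reseller scope is present")
  }
}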
+
+func normalizeIDs(v interface{}) []interface{} {
+  out := []interface{}{}
+  switch t := v.(type) {
+  case []interface{}:
+    for _, x := range t {
+      if s := utils.ToString(x); s != "" {
+        out = append(out, s)
+      }
+    }
+  case []string:
+    for _, s := range t {
+      s2 := strings.TrimSpace(s)
+      if s2 != "" {
+        out = append(out, s2)
+      }
+    }
+  case []int:
+    for _, n := range t {
+      out = append(out, n)
+    }
+  case []int64:
+    for _, n := range t {
+      out = append(out, n)
+    }
+  case string:
+    // support comma-separated string
+    parts := strings.Split(t, ",")
+    for _, s := range parts {
+      s2 := strings.TrimSpace(s)
+      if s2 != "" {
+        out = append(out, s2)
+      }
+    }
+  default:
+    if s := utils.ToString(t); s != "" {
+      out = append(out, s)
+    }
+  }
+  return out
+}
+
+func hasNonEmptyIDs(v interface{}) bool {
+  arr := normalizeIDs(v)
+  return len(arr) > 0
+}
+
+func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) {
+  for _, k := range keys {
+    if perm != nil {
+      if v, ok := perm[k]; ok {
+        arr := normalizeIDs(v)
+        if len(arr) > 0 {
+          return arr[0], true
+        }
+        if s := utils.ToString(v); s != "" {
+          return s, true
+        }
+      }
+    }
+    if v, ok := filters[k]; ok {
+      arr := normalizeIDs(v)
+      if len(arr) > 0 {
+        return arr[0], true
+      }
+      if s := utils.ToString(v); s != "" {
+        return s, true
+      }
+    }
+  }
+  return nil, false
+}
+
+// parseIntVal 尝试将字符串解析为整数,失败返回-1
+func parseIntVal(s string) int {
+  if s == "" {
+    return -1
+  }
+  n := 0
+  for _, c := range s {
+    if c < '0' || c > '9' {
+      return -1
+    }
+    n = n*10 + int(c-'0')
+  }
+  return n
+}
diff --git a/server/internal/repo/export_repo.go b/server/internal/repo/export_repo.go
index f9665bf..f9e860e 100644
--- a/server/internal/repo/export_repo.go
+++ b/server/internal/repo/export_repo.go
@@ -388,6 +388,58 @@ func (r *ExportQueryRepo) GetJob(metaDB *sql.DB, jobID string) (JobDetail, error
   return detail, err
 }
 
+// GetRunningJobs 获取所有运行中的任务(用于服务重启恢复)
+func (r *ExportQueryRepo) GetRunningJobs(metaDB *sql.DB) ([]JobDetail, error) {
+  querySQL := `SELECT id, template_id, status, requested_by, total_rows,
+    file_format, started_at, finished_at, created_at, updated_at,
+    explain_score, explain_json
+    FROM export_jobs WHERE status=?`
+
+  rows, err := metaDB.Query(querySQL, string(constants.JobStatusRunning))
+  if err != nil {
+    return nil, err
+  }
+  defer rows.Close()
+
+  var jobs []JobDetail
+  for rows.Next() {
+    var detail JobDetail
+    if err := rows.Scan(
+      &detail.ID, &detail.TemplateID, &detail.Status, &detail.RequestedBy,
+      &detail.TotalRows, &detail.FileFormat, &detail.StartedAt, &detail.FinishedAt,
+      &detail.CreatedAt, &detail.UpdatedAt, &detail.ExplainScore, &detail.ExplainJSON,
+    ); err != nil {
+      continue
+    }
+    jobs = append(jobs, detail)
+  }
+  return jobs, nil
+}
+
+// IncrementRestartCount 增加任务重启计数
+func (r *ExportQueryRepo) IncrementRestartCount(metaDB *sql.DB, jobID uint64) {
+  now := time.Now()
+  _, err := metaDB.Exec(
+    "UPDATE export_jobs SET restart_count = COALESCE(restart_count,0) + 1, updated_at=? WHERE id=?",
+    now, jobID,
+  )
+  if err != nil {
+    logging.DBError("increment_restart_count", jobID, err)
+  }
+}
+
+// ResetJobProgress 重置任务进度为0(用于任务恢复)
+func (r *ExportQueryRepo) ResetJobProgress(metaDB *sql.DB, jobID uint64) {
+  now := time.Now()
+  _, err := metaDB.Exec(
+    "UPDATE export_jobs SET total_rows=0, updated_at=? WHERE id=?",
+    now, jobID,
+  )
+  if err != nil {
+    logging.DBError("reset_job_progress", jobID, err)
+  }
+}
+
 // ListJobFiles 获取任务文件列表
 func (r *ExportQueryRepo) ListJobFiles(metaDB *sql.DB, jobID string) ([]JobFile, error) {
   rows, err := metaDB.Query(