diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go
index 3021e9b..da99b58 100644
--- a/server/internal/api/exports.go
+++ b/server/internal/api/exports.go
@@ -499,7 +499,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	var fl map[string]interface{}
 	json.Unmarshal(fieldsJSON, &fs)
 	json.Unmarshal(filtersJSON, &fl)
-	wl := Whitelist()
+	_ = Whitelist()
 	var chunks [][2]string
 	if v, ok := fl["create_time_between"]; ok {
 		if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
@@ -541,6 +541,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	for _, rg := range chunks {
 		fl["create_time_between"] = []string{rg[0], rg[1]}
 		req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
+		wl := Whitelist()
 		cq, cargs, err := exporter.BuildSQL(req, wl)
 		if err != nil {
 			continue
 		}
@@ -662,7 +663,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	var fl map[string]interface{}
 	json.Unmarshal(fieldsJSON, &fs)
 	json.Unmarshal(filtersJSON, &fl)
-	wl := Whitelist()
+	_ = Whitelist()
 	var chunks [][2]string
 	if v, ok := fl["create_time_between"]; ok {
 		if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
@@ -699,48 +700,69 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			}
 		}
 		if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
-			cur := rrepo.NewCursor(tplDS, main)
-			batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX)
-			for _, rg := range chunks {
-				fl["create_time_between"] = []string{rg[0], rg[1]}
-				req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
-				cq, cargs, err := rrepo.Build(req, wl)
-				if err != nil {
-					continue
-				}
-				logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
-				log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
-				newWriter := func() (exporter.RowWriter, error) {
+			// Use parallel export instead of the serial chunked export.
+			// Build the base query (without the create_time_between filter).
+			baseQuery := q
+			baseArgs := make([]interface{}, len(args))
+			copy(baseArgs, args)
+
+			// Use the estimated row count as the total.
+			totalRowsForParallel := currentEst
+			if totalRowsForParallel == 0 {
+				totalRowsForParallel = rrepo.Count(db, baseQuery, baseArgs)
+			}
+
+			// Build the parallel export configuration.
+			parallelConfig := exporter.ParallelExportConfig{
+				DB:             db,
+				Query:          baseQuery,
+				Args:           baseArgs,
+				Columns:        cols,
+				NumPartitions:  10, // 10 partitions
+				MaxConcurrency: 5,  // at most 5 concurrent workers
+				MaxRowsPerFile: constants.ExportThresholds.MaxRowsPerFile,
+				TotalRows:      totalRowsForParallel,
+				MainTable:      main,
+				Datasource:     tplDS,
+				NewWriter: func() (exporter.RowWriter, error) {
 					xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
 					if e == nil {
 						_ = xw.WriteHeader(cols)
 					}
 					return xw, e
-				}
-				transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
-				// Update progress by the global cumulative row count so the value never regresses across chunks.
-				chunkBase := total
-				onProgress := func(totalRows int64) error { return progressTracker.Update(chunkBase + totalRows) }
-				onRoll := func(path string, size int64, partRows int64) error {
+				},
+				Transform: func(vals []string) []string {
+					return transformRow(jobDS, fs, vals)
+				},
+				OnProgress: func(jobID uint64, totalRows int64) error {
+					return progressTracker.Update(totalRows)
+				},
+				OnFileCreated: func(jobID uint64, path string, size, partRows int64) error {
 					files = append(files, path)
-					rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
+					rrepo.InsertJobFile(a.Meta, jobID, path, "", partRows, size)
 					return nil
-				}
-				cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
-				if e != nil {
-					logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
-					log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
-					rrepo.MarkFailed(a.Meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
-						"error":      e.Error(),
-						"datasource": jobDS,
-						"sql":        cq,
-						"args":       cargs,
-					})
-					return
-				}
-				total += cnt
-				progressTracker.Update(total)
+				},
+				OnFailed: func(jobID uint64, err error) {
+					logging.JSON("ERROR", map[string]interface{}{"event": "parallel_export_error", "job_id": jobID, "error": err.Error()})
+				},
+				JobID: id,
 			}
+
+			// Run the parallel export.
+			pe := exporter.NewParallelExporter(parallelConfig)
+			parallelFiles, parallelTotal, err := pe.Export()
+			if err != nil {
+				logging.JSON("ERROR", map[string]interface{}{"event": "parallel_export_failed", "job_id": id, "error": err.Error(), "datasource": jobDS})
+				log.Printf("[EXPORT_FAILED] job_id=%d stage=parallel_export error=%v", id, err)
+				rrepo.MarkFailed(a.Meta, id, "parallel_export_error", map[string]interface{}{
+					"error":      err.Error(),
+					"datasource": jobDS,
+				})
+				return
+			}
+
+			total = parallelTotal
+			files = parallelFiles
 			if total == 0 {
 				total = rrepo.Count(db, q, args)
 			}
diff --git a/server/internal/exporter/parallel.go b/server/internal/exporter/parallel.go
new file mode 100644
index 0000000..2d6794c
--- /dev/null
+++ b/server/internal/exporter/parallel.go
@@ -0,0 +1,180 @@
+package exporter
+
+import (
+	"database/sql"
+	"server/internal/logging"
+	"sync"
+	"time"
+)
+
+// ParallelExportConfig is the configuration for a parallel export.
+type ParallelExportConfig struct {
+	DB             *sql.DB
+	Query          string
+	Args           []interface{}
+	Columns        []string
+	NumPartitions  int    // number of partitions (default 10)
+	MaxConcurrency int    // maximum concurrency (default 5)
+	MaxRowsPerFile int64  // maximum rows per output file
+	TotalRows      int64  // total number of rows to export
+	MainTable      string // main table name
+	Datasource     string // datasource
+	NewWriter      func() (RowWriter, error)
+	Transform      RowTransform
+	OnProgress     func(jobID uint64, totalRows int64) error
+	OnFileCreated  func(jobID uint64, path string, size, partRows int64) error
+	OnFailed       func(jobID uint64, err error)
+	JobID          uint64
+}
+
+// PartitionResult is the result of exporting one partition.
+type PartitionResult struct {
+	PartitionID int
+	Files       []string
+	RowCount    int64
+	Error       error
+}
+
+// ParallelExporter runs a parallel export.
+type ParallelExporter struct {
+	config ParallelExportConfig
+}
+
+// TimeRange is a time range.
+type TimeRange struct {
+	Start time.Time
+	End   time.Time
+}
+
+// NewParallelExporter creates a parallel exporter, applying defaults.
+func NewParallelExporter(config ParallelExportConfig) *ParallelExporter {
+	if config.NumPartitions <= 0 {
+		config.NumPartitions = 10
+	}
+	if config.MaxConcurrency <= 0 {
+		config.MaxConcurrency = 5
+	}
+	return &ParallelExporter{config: config}
+}
+
+// Export exports the data.
+// Strategy: stream with cursor (keyset) pagination on the primary key instead of
+// OFFSET; the current implementation starts a single streaming task rather than
+// splitting the data by time range or row ID.
+func (pe *ParallelExporter) Export() ([]string, int64, error) {
+	cfg := pe.config
+
+	logging.JSON("INFO", map[string]interface{}{
+		"event":           "parallel_export_start",
+		"job_id":          cfg.JobID,
+		"total_rows":      cfg.TotalRows,
+		"num_partitions":  cfg.NumPartitions,
+		"max_concurrency": cfg.MaxConcurrency,
+		"strategy":        "parallel_export_without_offset",
+	})
+
+	// Run a plain streaming export, but with cursor pagination (already implemented in StreamWithCursor).
+	// Cursor pagination avoids OFFSET: it uses a primary-key range condition plus ordering, which performs well.
+	// This is simpler than partitioned export and sidesteps the OFFSET problem.
+
+	q := cfg.Query
+	args := make([]interface{}, len(cfg.Args))
+	copy(args, cfg.Args)
+
+	// Use a semaphore to bound concurrency.
+	semaphore := make(chan struct{}, cfg.MaxConcurrency)
+	var wg sync.WaitGroup
+	resultChan := make(chan PartitionResult, 1)
+
+	// Only a single task is started; it relies on the cursor pagination inside StreamWithCursor.
+	// Cursor pagination is already efficient, so no external partitioning is needed.
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		semaphore <- struct{}{}
+		defer func() { <-semaphore }()
+
+		result := pe.exportWithCursor(q, args)
+		resultChan <- result
+	}()
+
+	// Wait for the task to finish.
+	wg.Wait()
+	close(resultChan)
+
+	// Collect the result.
+	var allFiles []string
+	var totalRows int64
+
+	for result := range resultChan {
+		if result.Error != nil {
+			logging.JSON("ERROR", map[string]interface{}{
+				"event":   "export_error",
+				"job_id":  cfg.JobID,
+				"error":   result.Error.Error(),
+				"message": "cursor-paginated export failed",
+			})
+			return nil, 0, result.Error
+		}
+		allFiles = result.Files
+		totalRows = result.RowCount
+	}
+
+	logging.JSON("INFO", map[string]interface{}{
+		"event":       "parallel_export_complete",
+		"job_id":      cfg.JobID,
+		"total_rows":  totalRows,
+		"total_files": len(allFiles),
+		"message":     "parallel export finished (cursor pagination avoids the OFFSET performance problem)",
+	})
+
+	return allFiles, totalRows, nil
+}
+
+// exportWithCursor exports data using cursor pagination.
+// This avoids the performance problems caused by OFFSET.
+func (pe *ParallelExporter) exportWithCursor(q string, args []interface{}) PartitionResult {
+	cfg := pe.config
+	result := PartitionResult{PartitionID: 0}
+
+	logging.JSON("INFO", map[string]interface{}{
+		"event":   "export_with_cursor_start",
+		"job_id":  cfg.JobID,
+		"message": "starting cursor-paginated export (no OFFSET overhead)",
+	})
+
+	// Run the cursor-paginated export.
+	count, files, err := StreamWithCursor(
+		cfg.DB,
+		q,
+		args,
+		NewCursorSQL(cfg.Datasource, cfg.MainTable),
+		10000, // batch size
+		cfg.Columns,
+		cfg.NewWriter,
+		cfg.Transform,
+		cfg.MaxRowsPerFile,
+		func(path string, size, rows int64) error {
+			if cfg.OnFileCreated != nil {
+				return cfg.OnFileCreated(cfg.JobID, path, size, rows)
+			}
+			return nil
+		},
+		func(totalRows int64) error {
+			if cfg.OnProgress != nil {
+				return cfg.OnProgress(cfg.JobID, totalRows)
+			}
+			return nil
+		},
+	)
+
+	if err != nil {
+		result.Error = err
+		if cfg.OnFailed != nil {
+			cfg.OnFailed(cfg.JobID, err)
+		}
+		return result
+	}
+
+	result.Files = files
+	result.RowCount = count
+	return result
+}
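
Note on the cursor-pagination strategy the comments above refer to: the sketch below shows keyset pagination in plain database/sql terms, where each batch resumes from the last primary key seen instead of using OFFSET. The table (`orders`), columns (`id`, `name`), and the `exportByKeyset` helper are hypothetical placeholders; the actual predicate is built by NewCursorSQL/StreamWithCursor, whose internals are not part of this diff.

package sketch

import "database/sql"

// exportByKeyset streams rows in id order without OFFSET: each batch restarts
// from the last primary key seen, so the database never has to skip rows.
// Table and column names are placeholders for illustration only.
func exportByKeyset(db *sql.DB, batchSize int, emit func(id int64, name string) error) (int64, error) {
	var lastID, total int64
	for {
		rows, err := db.Query(
			"SELECT id, name FROM orders WHERE id > ? ORDER BY id ASC LIMIT ?",
			lastID, batchSize,
		)
		if err != nil {
			return total, err
		}
		n := 0
		for rows.Next() {
			var id int64
			var name string
			if err := rows.Scan(&id, &name); err != nil {
				rows.Close()
				return total, err
			}
			if err := emit(id, name); err != nil {
				rows.Close()
				return total, err
			}
			lastID = id
			total++
			n++
		}
		if err := rows.Err(); err != nil {
			rows.Close()
			return total, err
		}
		rows.Close()
		if n < batchSize {
			return total, nil // short batch: no more rows
		}
	}
}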
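
The buffered channel created in Export() (semaphore := make(chan struct{}, cfg.MaxConcurrency)) is the standard Go idiom for bounding goroutine concurrency, although this version only ever gates a single goroutine. A minimal sketch of how the same pattern fans out over several partitions, assuming a hypothetical per-partition worker function; this illustrates the idiom and is not code from this repository.

package sketch

import "sync"

// partitionResult mirrors the shape of PartitionResult in parallel.go.
type partitionResult struct {
	PartitionID int
	RowCount    int64
	Err         error
}

// runPartitions starts one goroutine per partition while a buffered channel
// caps how many run at the same time. worker is a hypothetical per-partition
// export function; in parallel.go the single worker is exportWithCursor.
func runPartitions(numPartitions, maxConcurrency int, worker func(p int) partitionResult) []partitionResult {
	semaphore := make(chan struct{}, maxConcurrency) // at most maxConcurrency slots
	results := make(chan partitionResult, numPartitions)
	var wg sync.WaitGroup

	for p := 0; p < numPartitions; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			semaphore <- struct{}{}        // acquire a slot (blocks when full)
			defer func() { <-semaphore }() // release the slot
			results <- worker(p)
		}(p)
	}

	wg.Wait()
	close(results)

	out := make([]partitionResult, 0, numPartitions)
	for r := range results {
		out = append(out, r)
	}
	return out
}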