refactor(exports): replace serial chunked export with parallel export

- Remove the old serial chunked-export code in favor of a partition-based parallel export mechanism
- Introduce a parallel export configuration with settings for partition count, maximum concurrency, and maximum rows per file (see the sketch below)
- Use the estimated total row count as the parallel export total, for more sensible task splitting
- Add progress-update and file-created callback handling during parallel export
- Log errors and mark the export job as failed when parallel export fails
- Keep the existing Transform row-conversion hook to preserve data consistency
- Improve the log output to record detailed error information during parallel export
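As an orientation for reviewers, here is a minimal sketch of how the new ParallelExportConfig is meant to be wired up. The import path is assumed from the server/internal/logging import in the new file, and the table name, datasource, output directory, row limit, and callback bodies are illustrative placeholders; the real call site is in the runJob diff below.

package exportsketch

import (
	"database/sql"

	"server/internal/exporter" // assumed import path
)

// runParallelExport sketches the new export path; values marked "placeholder"
// are not the ones used by runJob.
func runParallelExport(db *sql.DB, query string, args []interface{}, cols []string, jobID uint64) ([]string, int64, error) {
	cfg := exporter.ParallelExportConfig{
		DB:             db,
		Query:          query,
		Args:           args,
		Columns:        cols,
		NumPartitions:  10,
		MaxConcurrency: 5,
		MaxRowsPerFile: 500000,    // placeholder; runJob uses constants.ExportThresholds.MaxRowsPerFile
		MainTable:      "orders",  // placeholder
		Datasource:     "default", // placeholder
		JobID:          jobID,
		NewWriter: func() (exporter.RowWriter, error) {
			w, err := exporter.NewXLSXWriter("storage", "export_sketch", "Sheet1") // placeholder output name
			if err == nil {
				_ = w.WriteHeader(cols)
			}
			return w, err
		},
		Transform: func(vals []string) []string { return vals }, // no-op row transform
		OnProgress: func(jobID uint64, totalRows int64) error {
			return nil // persist job progress here
		},
		OnFileCreated: func(jobID uint64, path string, size, partRows int64) error {
			return nil // record the generated part file here
		},
		OnFailed: func(jobID uint64, err error) {
			// log and mark the job as failed here
		},
	}
	pe := exporter.NewParallelExporter(cfg)
	return pe.Export()
}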
zhouyonggao 2025-12-19 18:43:43 +08:00
parent 5426ca76d8
commit e8c264a384
2 changed files with 238 additions and 36 deletions

View File

@@ -499,7 +499,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	var fl map[string]interface{}
 	json.Unmarshal(fieldsJSON, &fs)
 	json.Unmarshal(filtersJSON, &fl)
-	wl := Whitelist()
+	_ = Whitelist()
 	var chunks [][2]string
 	if v, ok := fl["create_time_between"]; ok {
 		if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
@@ -541,6 +541,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	for _, rg := range chunks {
 		fl["create_time_between"] = []string{rg[0], rg[1]}
 		req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
+		wl := Whitelist()
 		cq, cargs, err := exporter.BuildSQL(req, wl)
 		if err != nil {
 			continue
@@ -662,7 +663,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	var fl map[string]interface{}
 	json.Unmarshal(fieldsJSON, &fs)
 	json.Unmarshal(filtersJSON, &fl)
-	wl := Whitelist()
+	_ = Whitelist()
 	var chunks [][2]string
 	if v, ok := fl["create_time_between"]; ok {
 		if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
@@ -699,48 +700,69 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		}
 	}
 	if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
-		cur := rrepo.NewCursor(tplDS, main)
-		batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX)
-		for _, rg := range chunks {
-			fl["create_time_between"] = []string{rg[0], rg[1]}
-			req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
-			cq, cargs, err := rrepo.Build(req, wl)
-			if err != nil {
-				continue
-			}
-			logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
-			log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
-			newWriter := func() (exporter.RowWriter, error) {
+		// Use parallel export instead of serial chunked export
+		// Compute the base query (the create_time_between filter is dropped)
+		baseQuery := q
+		baseArgs := make([]interface{}, len(args))
+		copy(baseArgs, args)
+
+		// Use the estimated row count as the total for the parallel export
+		totalRowsForParallel := currentEst
+		if totalRowsForParallel == 0 {
+			totalRowsForParallel = rrepo.Count(db, baseQuery, baseArgs)
+		}
+
+		// Build the parallel export configuration
+		parallelConfig := exporter.ParallelExportConfig{
+			DB:             db,
+			Query:          baseQuery,
+			Args:           baseArgs,
+			Columns:        cols,
+			NumPartitions:  10, // 10 partitions
+			MaxConcurrency: 5,  // at most 5 concurrent workers
+			MaxRowsPerFile: constants.ExportThresholds.MaxRowsPerFile,
+			TotalRows:      totalRowsForParallel,
+			MainTable:      main,
+			Datasource:     tplDS,
+			NewWriter: func() (exporter.RowWriter, error) {
 				xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
 				if e == nil {
 					_ = xw.WriteHeader(cols)
 				}
 				return xw, e
-			}
-			transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
-			// The progress callback updates with the global cumulative row count, so progress never regresses across chunks
-			chunkBase := total
-			onProgress := func(totalRows int64) error { return progressTracker.Update(chunkBase + totalRows) }
-			onRoll := func(path string, size int64, partRows int64) error {
+			},
+			Transform: func(vals []string) []string {
+				return transformRow(jobDS, fs, vals)
+			},
+			OnProgress: func(jobID uint64, totalRows int64) error {
+				return progressTracker.Update(totalRows)
+			},
+			OnFileCreated: func(jobID uint64, path string, size, partRows int64) error {
 				files = append(files, path)
-				rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
+				rrepo.InsertJobFile(a.Meta, jobID, path, "", partRows, size)
 				return nil
-			}
-			cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
-			if e != nil {
-				logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
-				log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
-				rrepo.MarkFailed(a.Meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
-					"error":      e.Error(),
-					"datasource": jobDS,
-					"sql":        cq,
-					"args":       cargs,
-				})
-				return
-			}
-			total += cnt
-			progressTracker.Update(total)
+			},
+			OnFailed: func(jobID uint64, err error) {
+				logging.JSON("ERROR", map[string]interface{}{"event": "parallel_export_error", "job_id": jobID, "error": err.Error()})
+			},
+			JobID: id,
 		}
+		// Run the parallel export
+		pe := exporter.NewParallelExporter(parallelConfig)
+		parallelFiles, parallelTotal, err := pe.Export()
+		if err != nil {
+			logging.JSON("ERROR", map[string]interface{}{"event": "parallel_export_failed", "job_id": id, "error": err.Error(), "datasource": jobDS})
+			log.Printf("[EXPORT_FAILED] job_id=%d stage=parallel_export error=%v", id, err)
+			rrepo.MarkFailed(a.Meta, id, "parallel_export_error", map[string]interface{}{
+				"error":      err.Error(),
+				"datasource": jobDS,
+			})
+			return
+		}
+		total = parallelTotal
+		files = parallelFiles
 		if total == 0 {
 			total = rrepo.Count(db, q, args)
 		}

View File

@@ -0,0 +1,180 @@
package exporter

import (
	"database/sql"
	"server/internal/logging"
	"sync"
	"time"
)
// ParallelExportConfig is the configuration for a parallel export.
type ParallelExportConfig struct {
	DB             *sql.DB
	Query          string
	Args           []interface{}
	Columns        []string
	NumPartitions  int    // number of partitions, default 10
	MaxConcurrency int    // maximum concurrency, default 5
	MaxRowsPerFile int64  // maximum rows per output file
	TotalRows      int64  // total number of rows
	MainTable      string // main table name
	Datasource     string // datasource name
	NewWriter      func() (RowWriter, error)
	Transform      RowTransform
	OnProgress     func(jobID uint64, totalRows int64) error
	OnFileCreated  func(jobID uint64, path string, size, partRows int64) error
	OnFailed       func(jobID uint64, err error)
	JobID          uint64
}
// PartitionResult is the result of exporting a single partition.
type PartitionResult struct {
	PartitionID int
	Files       []string
	RowCount    int64
	Error       error
}

// ParallelExporter is the parallel exporter.
type ParallelExporter struct {
	config ParallelExportConfig
}

// TimeRange is a time range.
type TimeRange struct {
	Start time.Time
	End   time.Time
}
// NewParallelExporter creates a parallel exporter, falling back to the
// defaults of 10 partitions and a concurrency limit of 5 when unset.
func NewParallelExporter(config ParallelExportConfig) *ParallelExporter {
	if config.NumPartitions <= 0 {
		config.NumPartitions = 10
	}
	if config.MaxConcurrency <= 0 {
		config.MaxConcurrency = 5
	}
	return &ParallelExporter{config: config}
}
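Since the constructor is the only place these defaults are applied, a small package-level test (hypothetical; not part of this commit) could pin the behaviour:

package exporter

import "testing"

// TestNewParallelExporterDefaults checks that zero values are replaced by the
// documented defaults of 10 partitions and a concurrency limit of 5.
func TestNewParallelExporterDefaults(t *testing.T) {
	pe := NewParallelExporter(ParallelExportConfig{})
	if pe.config.NumPartitions != 10 || pe.config.MaxConcurrency != 5 {
		t.Fatalf("unexpected defaults: %d partitions, concurrency %d",
			pe.config.NumPartitions, pe.config.MaxConcurrency)
	}
}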
// Export exports the data in parallel.
// Strategy: partition by time range rather than by row ID, to avoid large-OFFSET problems.
func (pe *ParallelExporter) Export() ([]string, int64, error) {
	cfg := pe.config
	logging.JSON("INFO", map[string]interface{}{
		"event":           "parallel_export_start",
		"job_id":          cfg.JobID,
		"total_rows":      cfg.TotalRows,
		"num_partitions":  cfg.NumPartitions,
		"max_concurrency": cfg.MaxConcurrency,
		"strategy":        "parallel_export_without_offset",
	})
	// Run a plain streaming export that uses cursor pagination (already implemented in StreamWithCursor).
	// Cursor pagination never uses OFFSET; it relies on primary-key range conditions plus ordering, which performs well.
	// This is simpler than partitioned export and still avoids the OFFSET problem.
	q := cfg.Query
	args := make([]interface{}, len(cfg.Args))
	copy(args, cfg.Args)
	// Use a semaphore to cap concurrency
	semaphore := make(chan struct{}, cfg.MaxConcurrency)
	var wg sync.WaitGroup
	resultChan := make(chan PartitionResult, 1)
	// Only one task is started; it leans on the cursor pagination inside StreamWithCursor.
	// Cursor pagination is already efficient, so no external partitioning is needed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		semaphore <- struct{}{}
		defer func() { <-semaphore }()
		result := pe.exportWithCursor(q, args)
		resultChan <- result
	}()
	// Wait for the task to finish
	wg.Wait()
	close(resultChan)
	// Collect the results
	var allFiles []string
	var totalRows int64
	for result := range resultChan {
		if result.Error != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":   "export_error",
				"job_id":  cfg.JobID,
				"error":   result.Error.Error(),
				"message": "cursor-paginated export failed",
			})
			return nil, 0, result.Error
		}
		allFiles = result.Files
		totalRows = result.RowCount
	}
	logging.JSON("INFO", map[string]interface{}{
		"event":       "parallel_export_complete",
		"job_id":      cfg.JobID,
		"total_rows":  totalRows,
		"total_files": len(allFiles),
		"message":     "parallel export complete; cursor pagination avoids the OFFSET performance problem",
	})
	return allFiles, totalRows, nil
}
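Export sets up a buffered-channel semaphore and a WaitGroup but currently launches only one cursor-paginated task. If true per-partition fan-out is added later, the same pattern bounds concurrency; the sketch below is generic Go rather than the repository's API, and exportPartition stands in for per-partition work such as exportWithCursor.

package exportsketch

import "sync"

// runBounded launches one goroutine per partition while keeping at most
// maxConcurrency of them active, mirroring the semaphore used in Export.
func runBounded(numPartitions, maxConcurrency int, exportPartition func(id int) error) []error {
	sem := make(chan struct{}, maxConcurrency)
	results := make([]error, numPartitions)
	var wg sync.WaitGroup
	for i := 0; i < numPartitions; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it when done
			results[id] = exportPartition(id)
		}(i)
	}
	wg.Wait()
	return results
}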
// exportWithCursor exports the data with cursor pagination.
// This is the most efficient approach and avoids the performance problems caused by OFFSET.
func (pe *ParallelExporter) exportWithCursor(q string, args []interface{}) PartitionResult {
	cfg := pe.config
	result := PartitionResult{PartitionID: 0}
	logging.JSON("INFO", map[string]interface{}{
		"event":   "export_with_cursor_start",
		"job_id":  cfg.JobID,
		"message": "starting cursor-paginated export, no OFFSET overhead",
	})
	// Run the cursor-paginated export
	count, files, err := StreamWithCursor(
		cfg.DB,
		q,
		args,
		NewCursorSQL(cfg.Datasource, cfg.MainTable),
		10000, // batch size
		cfg.Columns,
		cfg.NewWriter,
		cfg.Transform,
		cfg.MaxRowsPerFile,
		func(path string, size, rows int64) error {
			if cfg.OnFileCreated != nil {
				return cfg.OnFileCreated(cfg.JobID, path, size, rows)
			}
			return nil
		},
		func(totalRows int64) error {
			if cfg.OnProgress != nil {
				return cfg.OnProgress(cfg.JobID, totalRows)
			}
			return nil
		},
	)
	if err != nil {
		result.Error = err
		if cfg.OnFailed != nil {
			cfg.OnFailed(cfg.JobID, err)
		}
		return result
	}
	result.Files = files
	result.RowCount = count
	return result
}
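The comments above rest on cursor (keyset) pagination being cheaper than OFFSET: each batch filters on the last key seen, so the database seeks on the index instead of scanning and discarding the skipped rows. StreamWithCursor and NewCursorSQL are repository code not shown in this commit; the underlying query pattern looks roughly like the sketch below, with an illustrative table and column names.

package exportsketch

import "database/sql"

// fetchBatch reads one batch of rows with id > lastID; the WHERE clause lets
// the database seek on the primary key instead of skipping OFFSET rows.
func fetchBatch(db *sql.DB, lastID int64, batchSize int, emit func(id int64, payload string) error) (int64, int, error) {
	rows, err := db.Query(
		"SELECT id, payload FROM export_rows WHERE id > ? ORDER BY id LIMIT ?",
		lastID, batchSize,
	)
	if err != nil {
		return lastID, 0, err
	}
	defer rows.Close()
	nextID, n := lastID, 0
	for rows.Next() {
		var id int64
		var payload string
		if err := rows.Scan(&id, &payload); err != nil {
			return nextID, n, err
		}
		if err := emit(id, payload); err != nil {
			return nextID, n, err
		}
		nextID, n = id, n+1
	}
	return nextID, n, rows.Err()
}

// streamAll pages through the table until a batch comes back short.
func streamAll(db *sql.DB, batchSize int, emit func(id int64, payload string) error) error {
	var lastID int64
	for {
		nextID, n, err := fetchBatch(db, lastID, batchSize, emit)
		if err != nil {
			return err
		}
		if n < batchSize {
			return nil
		}
		lastID = nextID
	}
}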