MarketingSystemDataExportTool/server/internal/exporter/parallel.go


package exporter

import (
	"database/sql"
	"sync"
	"time"

	"server/internal/logging"
)

// ParallelExportConfig configures a parallel export.
type ParallelExportConfig struct {
	DB             *sql.DB
	Query          string
	Args           []interface{}
	Columns        []string
	NumPartitions  int    // number of partitions; defaults to 10
	MaxConcurrency int    // maximum concurrency; defaults to 5
	MaxRowsPerFile int64  // maximum rows per output file
	TotalRows      int64  // total number of rows to export
	MainTable      string // main table name
	Datasource     string // datasource name
	NewWriter      func() (RowWriter, error)
	Transform      RowTransform
	OnProgress     func(jobID uint64, totalRows int64) error
	OnFileCreated  func(jobID uint64, path string, size, partRows int64) error
	OnFailed       func(jobID uint64, err error)
	JobID          uint64
}
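
// As a hedged illustration of the callback fields above (the wiring is
// hypothetical, not part of this package's API), a caller might log progress
// and file creation through the package's structured logger like so:
func exampleCallbacks() (
	onProgress func(jobID uint64, totalRows int64) error,
	onFileCreated func(jobID uint64, path string, size, partRows int64) error,
) {
	onProgress = func(jobID uint64, totalRows int64) error {
		logging.JSON("INFO", map[string]interface{}{
			"event":  "export_progress",
			"job_id": jobID,
			"rows":   totalRows,
		})
		return nil
	}
	onFileCreated = func(jobID uint64, path string, size, partRows int64) error {
		logging.JSON("INFO", map[string]interface{}{
			"event":  "export_file_created",
			"job_id": jobID,
			"path":   path,
			"bytes":  size,
			"rows":   partRows,
		})
		return nil
	}
	return onProgress, onFileCreated
}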

// PartitionResult is the result of exporting a single partition.
type PartitionResult struct {
	PartitionID int
	Files       []string
	RowCount    int64
	Error       error
}

// ParallelExporter runs a parallel data export.
type ParallelExporter struct {
	config ParallelExportConfig
}

// TimeRange describes a time range, used for time-based partitioning.
type TimeRange struct {
	Start time.Time
	End   time.Time
}

// NewParallelExporter creates a parallel exporter, applying defaults for
// NumPartitions (10) and MaxConcurrency (5) when they are unset.
func NewParallelExporter(config ParallelExportConfig) *ParallelExporter {
	if config.NumPartitions <= 0 {
		config.NumPartitions = 10
	}
	if config.MaxConcurrency <= 0 {
		config.MaxConcurrency = 5
	}
	return &ParallelExporter{config: config}
}

// Export exports the data.
// Strategy: rely on cursor (keyset) pagination rather than OFFSET-based
// paging, avoiding the cost of large OFFSETs; see the inline comments below.
func (pe *ParallelExporter) Export() ([]string, int64, error) {
	cfg := pe.config
	logging.JSON("INFO", map[string]interface{}{
		"event":           "parallel_export_start",
		"job_id":          cfg.JobID,
		"total_rows":      cfg.TotalRows,
		"num_partitions":  cfg.NumPartitions,
		"max_concurrency": cfg.MaxConcurrency,
		"strategy":        "parallel_export_without_offset",
	})

	// Run a plain streaming export, but with cursor pagination (already
	// implemented in StreamWithCursor). Cursor pagination uses a primary-key
	// range condition plus ORDER BY instead of OFFSET, which performs well.
	// This is simpler than partitioned export and still avoids the OFFSET
	// problem.
	q := cfg.Query
	args := make([]interface{}, len(cfg.Args))
	copy(args, cfg.Args)

	// A semaphore bounds concurrency.
	semaphore := make(chan struct{}, cfg.MaxConcurrency)
	var wg sync.WaitGroup
	resultChan := make(chan PartitionResult, 1)

	// Only one task is needed: StreamWithCursor's internal cursor pagination
	// is efficient enough that external partitioning is unnecessary. (With a
	// single task the semaphore never blocks.)
	wg.Add(1)
	go func() {
		defer wg.Done()
		semaphore <- struct{}{}
		defer func() { <-semaphore }()
		result := pe.exportWithCursor(q, args)
		resultChan <- result
	}()

	// Wait for the task to finish.
	wg.Wait()
	close(resultChan)

	// Collect the results.
	var allFiles []string
	var totalRows int64
	for result := range resultChan {
		if result.Error != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":   "export_error",
				"job_id":  cfg.JobID,
				"error":   result.Error.Error(),
				"message": "cursor-paginated export failed",
			})
			return nil, 0, result.Error
		}
		allFiles = result.Files
		totalRows = result.RowCount
	}

	logging.JSON("INFO", map[string]interface{}{
		"event":       "parallel_export_complete",
		"job_id":      cfg.JobID,
		"total_rows":  totalRows,
		"total_files": len(allFiles),
		"message":     "parallel export complete; cursor pagination avoided OFFSET overhead",
	})
	return allFiles, totalRows, nil
}
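
// streamKeysetSketch is a minimal sketch of the cursor (keyset) pagination
// idea described in the comments above. It is illustrative only: the real
// implementation is StreamWithCursor, and the "events" table and "id" column
// here are assumptions. Each batch filters on a primary-key range and orders
// by that key, so every page is a cheap index range scan, whereas
// "LIMIT n OFFSET m" must scan and discard m rows on every page.
func streamKeysetSketch(db *sql.DB, batchSize int, handle func(id int64) error) error {
	lastID := int64(0)
	for {
		rows, err := db.Query(
			"SELECT id FROM events WHERE id > ? ORDER BY id LIMIT ?",
			lastID, batchSize,
		)
		if err != nil {
			return err
		}
		n := 0
		for rows.Next() {
			var id int64
			if err := rows.Scan(&id); err != nil {
				rows.Close()
				return err
			}
			if err := handle(id); err != nil {
				rows.Close()
				return err
			}
			lastID = id // advance the cursor past the last row seen
			n++
		}
		if err := rows.Err(); err != nil {
			rows.Close()
			return err
		}
		rows.Close()
		if n < batchSize {
			return nil // short batch: the table is exhausted
		}
	}
}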

// exportWithCursor exports data using cursor (keyset) pagination.
// This is the most efficient approach: it avoids the performance problems
// caused by OFFSET.
func (pe *ParallelExporter) exportWithCursor(q string, args []interface{}) PartitionResult {
	cfg := pe.config
	result := PartitionResult{PartitionID: 0}
	logging.JSON("INFO", map[string]interface{}{
		"event":   "export_with_cursor_start",
		"job_id":  cfg.JobID,
		"message": "starting cursor-paginated export; no OFFSET overhead",
	})

	// Run the cursor-paginated export.
	count, files, err := StreamWithCursor(
		cfg.DB,
		q,
		args,
		NewCursorSQL(cfg.Datasource, cfg.MainTable),
		10000, // batch size
		cfg.Columns,
		cfg.NewWriter,
		cfg.Transform,
		cfg.MaxRowsPerFile,
		func(path string, size, rows int64) error {
			if cfg.OnFileCreated != nil {
				return cfg.OnFileCreated(cfg.JobID, path, size, rows)
			}
			return nil
		},
		func(totalRows int64) error {
			if cfg.OnProgress != nil {
				return cfg.OnProgress(cfg.JobID, totalRows)
			}
			return nil
		},
	)
	if err != nil {
		result.Error = err
		if cfg.OnFailed != nil {
			cfg.OnFailed(cfg.JobID, err)
		}
		return result
	}
	result.Files = files
	result.RowCount = count
	return result
}
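
// exampleParallelExport is a hedged usage sketch, not called anywhere in this
// package. The query, column list, table, datasource, and job ID are
// hypothetical; RowWriter comes from elsewhere in this package, and Transform
// is left nil here on the assumption that StreamWithCursor tolerates that.
func exampleParallelExport(db *sql.DB, newWriter func() (RowWriter, error)) ([]string, int64, error) {
	pe := NewParallelExporter(ParallelExportConfig{
		DB:             db,
		Query:          "SELECT id, name, created_at FROM campaigns", // hypothetical query
		Columns:        []string{"id", "name", "created_at"},
		MaxRowsPerFile: 500000,
		MainTable:      "campaigns", // hypothetical main table
		Datasource:     "marketing", // hypothetical datasource
		NewWriter:      newWriter,
		JobID:          1,
	})
	// NumPartitions and MaxConcurrency are left at zero, so Export runs with
	// the defaults (10 and 5) applied by NewParallelExporter.
	return pe.Export()
}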