package exporter import ( "database/sql" "server/internal/logging" "sync" "time" ) // ParallelExportConfig 并行导出配置 type ParallelExportConfig struct { DB *sql.DB Query string Args []interface{} Columns []string NumPartitions int // 分片数(默认10) MaxConcurrency int // 最大并发数(默认5) MaxRowsPerFile int64 // 单文件最大行数 TotalRows int64 // 总数据行数 MainTable string // 主表名 Datasource string // 数据源 NewWriter func() (RowWriter, error) Transform RowTransform OnProgress func(jobID uint64, totalRows int64) error OnFileCreated func(jobID uint64, path string, size, partRows int64) error OnFailed func(jobID uint64, err error) JobID uint64 } // PartitionResult 分片导出结果 type PartitionResult struct { PartitionID int Files []string RowCount int64 Error error } // ParallelExporter 并行导出器 type ParallelExporter struct { config ParallelExportConfig } // TimeRange 时间范围 type TimeRange struct { Start time.Time End time.Time } // NewParallelExporter 创建并行导出器 func NewParallelExporter(config ParallelExportConfig) *ParallelExporter { if config.NumPartitions <= 0 { config.NumPartitions = 10 } if config.MaxConcurrency <= 0 { config.MaxConcurrency = 5 } return &ParallelExporter{config: config} } // Export 并行导出数据 // 策略:按时间范围分片,而不是按行ID,避免大OFFSET问题 func (pe *ParallelExporter) Export() ([]string, int64, error) { cfg := pe.config logging.JSON("INFO", map[string]interface{}{ "event": "parallel_export_start", "job_id": cfg.JobID, "total_rows": cfg.TotalRows, "num_partitions": cfg.NumPartitions, "max_concurrency": cfg.MaxConcurrency, "strategy": "parallel_export_without_offset", }) // 直接执行普通的流式导出,但使用游标分页(已在StreamWithCursor中实现) // 游标分页不使用OFFSET,而是基于主键的范围条件+排序,性能很好 // 这比分片导出更简单,也避免了OFFSET问题 q := cfg.Query args := make([]interface{}, len(cfg.Args)) copy(args, cfg.Args) // 使用信号量控制并发数 semaphore := make(chan struct{}, cfg.MaxConcurrency) var wg sync.WaitGroup resultChan := make(chan PartitionResult, 1) // 只需要启动一个任务,利用StreamWithCursor内部的游标分页 // 游标分页已经很高效了,不需要外部分片 wg.Add(1) go func() { defer wg.Done() semaphore <- struct{}{} defer func() { <-semaphore }() result := pe.exportWithCursor(q, args) resultChan <- result }() // 等待任务完成 wg.Wait() close(resultChan) // 收集结果 var allFiles []string var totalRows int64 for result := range resultChan { if result.Error != nil { logging.JSON("ERROR", map[string]interface{}{ "event": "export_error", "job_id": cfg.JobID, "error": result.Error.Error(), "message": "使用游标分页导出失败", }) return nil, 0, result.Error } allFiles = result.Files totalRows = result.RowCount } logging.JSON("INFO", map[string]interface{}{ "event": "parallel_export_complete", "job_id": cfg.JobID, "total_rows": totalRows, "total_files": len(allFiles), "message": "并行导出完成(使用游标分页避免OFFSET性能问题)", }) return allFiles, totalRows, nil } // exportWithCursor 使用游标分页方式导出数据 // 这是最高效的方法,避免OFFSET导致的性能问题 func (pe *ParallelExporter) exportWithCursor(q string, args []interface{}) PartitionResult { cfg := pe.config result := PartitionResult{PartitionID: 0} logging.JSON("INFO", map[string]interface{}{ "event": "export_with_cursor_start", "job_id": cfg.JobID, "message": "开始使用游标分页导出(无OFFSET开销)", }) // 执行游标分页导出 count, files, err := StreamWithCursor( cfg.DB, q, args, NewCursorSQL(cfg.Datasource, cfg.MainTable), 10000, // 批次大小 cfg.Columns, cfg.NewWriter, cfg.Transform, cfg.MaxRowsPerFile, func(path string, size, rows int64) error { if cfg.OnFileCreated != nil { return cfg.OnFileCreated(cfg.JobID, path, size, rows) } return nil }, func(totalRows int64) error { if cfg.OnProgress != nil { return cfg.OnProgress(cfg.JobID, totalRows) } return nil }, ) if err != nil { result.Error = err if cfg.OnFailed != nil { cfg.OnFailed(cfg.JobID, err) } return result } result.Files = files result.RowCount = count return result }