package exporter
|
||
|
||
import (
|
||
"database/sql"
|
||
"server/internal/logging"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// ParallelExportConfig configures a parallel export run.
type ParallelExportConfig struct {
	DB      *sql.DB       // database handle the export reads from
	Query   string        // base SQL query to export
	Args    []interface{} // query arguments (copied before use)
	Columns []string      // output column names, passed through to the writer

	NumPartitions  int   // number of partitions (defaults to 10 in NewParallelExporter)
	MaxConcurrency int   // maximum concurrent workers (defaults to 5 in NewParallelExporter)
	MaxRowsPerFile int64 // maximum rows written to a single output file
	TotalRows      int64 // total row count; used for progress logging
	MainTable      string // main table name, used to build the cursor SQL
	Datasource     string // datasource identifier, used to build the cursor SQL

	// NewWriter returns a fresh writer for each output file.
	NewWriter func() (RowWriter, error)
	// Transform is the per-row transform handed to StreamWithCursor.
	Transform RowTransform

	// OnProgress, if non-nil, receives the running row count during export.
	OnProgress func(jobID uint64, totalRows int64) error
	// OnFileCreated, if non-nil, is invoked after each output file is produced.
	OnFileCreated func(jobID uint64, path string, size, partRows int64) error
	// OnFailed, if non-nil, is invoked when the export fails.
	OnFailed func(jobID uint64, err error)

	JobID uint64 // job identifier threaded through logs and callbacks
}
|
||
|
||
// PartitionResult holds the outcome of exporting one partition.
type PartitionResult struct {
	PartitionID int      // partition index (always 0 in the single cursor-paginated path)
	Files       []string // paths of the output files produced by this partition
	RowCount    int64    // number of rows exported by this partition
	Error       error    // non-nil if this partition's export failed
}
|
||
|
||
// ParallelExporter exports query results to files, currently via a single
// cursor-paginated stream (see Export for the strategy rationale).
type ParallelExporter struct {
	config ParallelExportConfig
}
|
||
|
||
// TimeRange represents a time interval with a start and an end instant.
//
// NOTE(review): not referenced by the cursor-pagination export path visible
// in this file — presumably retained for a partition-by-time-range strategy;
// confirm before removing.
type TimeRange struct {
	Start time.Time
	End   time.Time
}
|
||
|
||
// NewParallelExporter 创建并行导出器
|
||
func NewParallelExporter(config ParallelExportConfig) *ParallelExporter {
|
||
if config.NumPartitions <= 0 {
|
||
config.NumPartitions = 10
|
||
}
|
||
if config.MaxConcurrency <= 0 {
|
||
config.MaxConcurrency = 5
|
||
}
|
||
return &ParallelExporter{config: config}
|
||
}
|
||
|
||
// Export 并行导出数据
|
||
// 策略:按时间范围分片,而不是按行ID,避免大OFFSET问题
|
||
func (pe *ParallelExporter) Export() ([]string, int64, error) {
|
||
cfg := pe.config
|
||
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "parallel_export_start",
|
||
"job_id": cfg.JobID,
|
||
"total_rows": cfg.TotalRows,
|
||
"num_partitions": cfg.NumPartitions,
|
||
"max_concurrency": cfg.MaxConcurrency,
|
||
"strategy": "parallel_export_without_offset",
|
||
})
|
||
|
||
// 直接执行普通的流式导出,但使用游标分页(已在StreamWithCursor中实现)
|
||
// 游标分页不使用OFFSET,而是基于主键的范围条件+排序,性能很好
|
||
// 这比分片导出更简单,也避免了OFFSET问题
|
||
|
||
q := cfg.Query
|
||
args := make([]interface{}, len(cfg.Args))
|
||
copy(args, cfg.Args)
|
||
|
||
// 使用信号量控制并发数
|
||
semaphore := make(chan struct{}, cfg.MaxConcurrency)
|
||
var wg sync.WaitGroup
|
||
resultChan := make(chan PartitionResult, 1)
|
||
|
||
// 只需要启动一个任务,利用StreamWithCursor内部的游标分页
|
||
// 游标分页已经很高效了,不需要外部分片
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
semaphore <- struct{}{}
|
||
defer func() { <-semaphore }()
|
||
|
||
result := pe.exportWithCursor(q, args)
|
||
resultChan <- result
|
||
}()
|
||
|
||
// 等待任务完成
|
||
wg.Wait()
|
||
close(resultChan)
|
||
|
||
// 收集结果
|
||
var allFiles []string
|
||
var totalRows int64
|
||
|
||
for result := range resultChan {
|
||
if result.Error != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_error",
|
||
"job_id": cfg.JobID,
|
||
"error": result.Error.Error(),
|
||
"message": "使用游标分页导出失败",
|
||
})
|
||
return nil, 0, result.Error
|
||
}
|
||
allFiles = result.Files
|
||
totalRows = result.RowCount
|
||
}
|
||
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "parallel_export_complete",
|
||
"job_id": cfg.JobID,
|
||
"total_rows": totalRows,
|
||
"total_files": len(allFiles),
|
||
"message": "并行导出完成(使用游标分页避免OFFSET性能问题)",
|
||
})
|
||
|
||
return allFiles, totalRows, nil
|
||
}
|
||
|
||
// exportWithCursor 使用游标分页方式导出数据
|
||
// 这是最高效的方法,避免OFFSET导致的性能问题
|
||
func (pe *ParallelExporter) exportWithCursor(q string, args []interface{}) PartitionResult {
|
||
cfg := pe.config
|
||
result := PartitionResult{PartitionID: 0}
|
||
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "export_with_cursor_start",
|
||
"job_id": cfg.JobID,
|
||
"message": "开始使用游标分页导出(无OFFSET开销)",
|
||
})
|
||
|
||
// 执行游标分页导出
|
||
count, files, err := StreamWithCursor(
|
||
cfg.DB,
|
||
q,
|
||
args,
|
||
NewCursorSQL(cfg.Datasource, cfg.MainTable),
|
||
10000, // 批次大小
|
||
cfg.Columns,
|
||
cfg.NewWriter,
|
||
cfg.Transform,
|
||
cfg.MaxRowsPerFile,
|
||
func(path string, size, rows int64) error {
|
||
if cfg.OnFileCreated != nil {
|
||
return cfg.OnFileCreated(cfg.JobID, path, size, rows)
|
||
}
|
||
return nil
|
||
},
|
||
func(totalRows int64) error {
|
||
if cfg.OnProgress != nil {
|
||
return cfg.OnProgress(cfg.JobID, totalRows)
|
||
}
|
||
return nil
|
||
},
|
||
)
|
||
|
||
if err != nil {
|
||
result.Error = err
|
||
if cfg.OnFailed != nil {
|
||
cfg.OnFailed(cfg.JobID, err)
|
||
}
|
||
return result
|
||
}
|
||
|
||
result.Files = files
|
||
result.RowCount = count
|
||
return result
|
||
}
|