refactor(export): 优化导出任务进度更新,减少数据库写入

- 引入内存进度跟踪器ProgressTracker管理进度状态
- 将多处调用rrepo.UpdateProgress替换为ProgressTracker.Update
- 增加周期性和时间间隔同步限制,降低数据库写入频率
- 实现导出完成时的最终同步以保证进度准确更新
- 统一管理各类型导出任务的进度更新逻辑
- 保持导出流程原有功能不变,提高性能和稳定性
This commit is contained in:
zhouyonggao 2025-12-19 17:49:41 +08:00
parent 48979b41d1
commit d96271edf9
2 changed files with 94 additions and 7 deletions

View File

@ -426,6 +426,8 @@ func (a *ExportsAPI) RunJobByID(jobID uint64) {
}
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
// 创建进度跟踪器自动管理数据库写入
progressTracker := repo.NewProgressTracker(id, a.Meta)
defer func() {
if r := recover(); r != nil {
logging.JSON("ERROR", map[string]interface{}{
@ -548,7 +550,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil }
// 使用进度跟踪器减少数据库写入
progressTracker := repo.NewProgressTracker(id, a.Meta)
onProgress := func(totalRows int64) error { return progressTracker.Update(chunkBase + totalRows) }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
@ -575,7 +579,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
return
}
total += cnt
rrepo.UpdateProgress(a.Meta, id, total)
progressTracker.Update(total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
@ -583,6 +587,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
if len(files) >= 1 {
rrepo.ZipAndRecord(a.Meta, id, files, total)
}
progressTracker.FinalSync() // 分块处理完成时最终同步
rrepo.MarkCompleted(a.Meta, id, total)
return
}
@ -607,7 +612,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
cur := rrepo.NewCursor(jobDS, jobMain)
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, totalRows); return nil }
onProgress := func(totalRows int64) error { return progressTracker.Update(totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files2 = append(files2, path)
rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
@ -636,6 +641,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
if len(files2) >= 1 {
rrepo.ZipAndRecord(a.Meta, id, files2, count)
}
progressTracker.FinalSync() // CSV直接导出完成时最终同步
rrepo.MarkCompleted(a.Meta, id, count)
return
}
@ -714,7 +720,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
// 进度回调按全局累计行数更新,避免跨分片出现数值回退
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil }
onProgress := func(totalRows int64) error { return progressTracker.Update(chunkBase + totalRows) }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
@ -733,7 +739,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
return
}
total += cnt
rrepo.UpdateProgress(a.Meta, id, total)
progressTracker.Update(total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
@ -741,6 +747,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
if len(files) >= 1 {
rrepo.ZipAndRecord(a.Meta, id, files, total)
}
progressTracker.FinalSync() // 分块处理完成时最终同步
rrepo.MarkCompleted(a.Meta, id, total)
return
}
@ -770,7 +777,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
return
}
_ = x.WriteHeader(cols)
rrepo.UpdateProgress(a.Meta, id, 0)
progressTracker.Update(0)
// 记录查询执行前的参数类型信息
argTypes := make([]string, len(args))
for i, arg := range args {
@ -879,7 +886,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
count++
tick++
if tick%200 == 0 {
rrepo.UpdateProgress(a.Meta, id, count)
progressTracker.Update(count)
}
}
// 如果查询到了数据,记录一条包含首行数据的日志,便于确认导出前 SQL 是否返回结果
@ -915,6 +922,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
a.Meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
rrepo.ZipAndRecord(a.Meta, id, []string{p}, count)
progressTracker.FinalSync() // XLSX直接导出完成时最终同步
rrepo.MarkCompleted(a.Meta, id, count)
return
}

View File

@ -440,6 +440,85 @@ func (r *ExportQueryRepo) ResetJobProgress(metaDB *sql.DB, jobID uint64) {
}
}
// ProgressTracker 内存中的进度跟踪器,减少数据库写入
type ProgressTracker struct {
jobID uint64
totalRows int64
lastSyncRows int64
lastSyncTime time.Time
metaDB *sql.DB
syncInterval int64 // 每多少行同步一次默认10000
timeLimitMS int64 // 最长多久同步一次毫秒默认5000
}
// NewProgressTracker 创建进度跟踪器
func NewProgressTracker(jobID uint64, metaDB *sql.DB) *ProgressTracker {
return &ProgressTracker{
jobID: jobID,
totalRows: 0,
lastSyncRows: 0,
lastSyncTime: time.Now(),
metaDB: metaDB,
syncInterval: 10000, // 每10000行同步一次
timeLimitMS: 5000, // 最长5秒同步一次
}
}
// Update 更新进度,并在必要时同步到数据库
func (pt *ProgressTracker) Update(totalRows int64) error {
pt.totalRows = totalRows
// 检查是否需要同步到数据库
rowDiff := totalRows - pt.lastSyncRows
timeDiff := time.Since(pt.lastSyncTime).Milliseconds()
// 满足任一条件就同步:行数差异超过阈值 或 时间超过限制
if rowDiff >= pt.syncInterval || timeDiff > pt.timeLimitMS {
return pt.Sync()
}
return nil
}
// Sync 强制同步当前进度到数据库
func (pt *ProgressTracker) Sync() error {
if pt.metaDB == nil {
return nil
}
// 使用 GREATEST 防止进度倒退
now := time.Now()
_, err := pt.metaDB.Exec(
`UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=? WHERE id=?`,
pt.totalRows, now, pt.jobID,
)
if err != nil {
logging.DBError("progress_tracker_sync", pt.jobID, err)
return err
}
pt.lastSyncRows = pt.totalRows
pt.lastSyncTime = time.Now()
return nil
}
// FinalSync 导出完成时的最终同步
func (pt *ProgressTracker) FinalSync() error {
if pt.metaDB == nil {
return nil
}
now := time.Now()
_, err := pt.metaDB.Exec(
`UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id=?`,
pt.totalRows, now, pt.jobID,
)
if err != nil {
logging.DBError("progress_tracker_final_sync", pt.jobID, err)
return err
}
return nil
}
// ListJobFiles 获取任务文件列表
func (r *ExportQueryRepo) ListJobFiles(metaDB *sql.DB, jobID string) ([]JobFile, error) {
rows, err := metaDB.Query(