From d96271edf9180eeb1c0c16dcdce5660dcbcc4f8a Mon Sep 17 00:00:00 2001
From: zhouyonggao <1971162852@qq.com>
Date: Fri, 19 Dec 2025 17:49:41 +0800
Subject: [PATCH] refactor(export): optimize export job progress updates to
 reduce database writes
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Introduce an in-memory ProgressTracker to manage progress state
- Replace the scattered rrepo.UpdateProgress calls with ProgressTracker.Update
- Throttle database writes behind row-count and time-interval thresholds
- Perform a final sync when an export completes so the recorded progress is exact
- Unify the progress-update logic across all export task types
- Keep the export flow functionally unchanged while improving performance and stability
---
 server/internal/api/exports.go      | 20 +++++++++++++-------
 server/internal/repo/export_repo.go | 79 +++++++++++++++++++++++++++++
 2 files changed, 92 insertions(+), 7 deletions(-)

diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go
index e96ba0a..18322b2 100644
--- a/server/internal/api/exports.go
+++ b/server/internal/api/exports.go
@@ -426,6 +426,8 @@ func (a *ExportsAPI) RunJobByID(jobID uint64) {
 }
 
 func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
+	// Create one progress tracker up front; it manages all database writes for this job
+	progressTracker := repo.NewProgressTracker(id, a.Meta)
 	defer func() {
 		if r := recover(); r != nil {
 			logging.JSON("ERROR", map[string]interface{}{
@@ -548,7 +550,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
 			transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
 			chunkBase := total
-			onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil }
+			onProgress := func(totalRows int64) error { return progressTracker.Update(chunkBase + totalRows) }
 			onRoll := func(path string, size int64, partRows int64) error {
 				files = append(files, path)
 				rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
@@ -575,7 +577,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				return
 			}
 			total += cnt
-			rrepo.UpdateProgress(a.Meta, id, total)
+			progressTracker.Update(total)
 		}
 		if total == 0 {
 			total = rrepo.Count(db, q, args)
@@ -583,6 +585,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		if len(files) >= 1 {
 			rrepo.ZipAndRecord(a.Meta, id, files, total)
 		}
+		progressTracker.FinalSync() // final sync once chunked processing completes
 		rrepo.MarkCompleted(a.Meta, id, total)
 		return
 	}
@@ -607,7 +610,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		cur := rrepo.NewCursor(jobDS, jobMain)
 		newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
 		transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
-		onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, totalRows); return nil }
+		onProgress := func(totalRows int64) error { return progressTracker.Update(totalRows) }
 		onRoll := func(path string, size int64, partRows int64) error {
 			files2 = append(files2, path)
 			rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
@@ -636,6 +639,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		if len(files2) >= 1 {
 			rrepo.ZipAndRecord(a.Meta, id, files2, count)
 		}
+		progressTracker.FinalSync() // final sync once direct CSV export completes
 		rrepo.MarkCompleted(a.Meta, id, count)
 		return
 	}
@@ -714,7 +718,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 			transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
 			// The progress callback reports the global cumulative row count, so the value never regresses across shards
 			chunkBase := total
-			onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.Meta, id, chunkBase+totalRows); return nil }
+			onProgress := func(totalRows int64) error { return progressTracker.Update(chunkBase + totalRows) }
 			onRoll := func(path string, size int64, partRows int64) error {
 				files = append(files, path)
 				rrepo.InsertJobFile(a.Meta, id, path, "", partRows, size)
@@ -733,7 +737,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 				return
 			}
 			total += cnt
-			rrepo.UpdateProgress(a.Meta, id, total)
+			progressTracker.Update(total)
 		}
 		if total == 0 {
 			total = rrepo.Count(db, q, args)
@@ -741,6 +745,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		if len(files) >= 1 {
 			rrepo.ZipAndRecord(a.Meta, id, files, total)
 		}
+		progressTracker.FinalSync() // final sync once chunked processing completes
 		rrepo.MarkCompleted(a.Meta, id, total)
 		return
 	}
@@ -770,7 +775,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		return
 	}
 	_ = x.WriteHeader(cols)
-	rrepo.UpdateProgress(a.Meta, id, 0)
+	progressTracker.Update(0)
 	// Record parameter type information before the query executes
 	argTypes := make([]string, len(args))
 	for i, arg := range args {
@@ -879,7 +884,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 		count++
 		tick++
 		if tick%200 == 0 {
-			rrepo.UpdateProgress(a.Meta, id, count)
+			progressTracker.Update(count)
 		}
 	}
 	// If the query returned data, log one line containing the first row, to confirm the export SQL produced results
@@ -915,6 +920,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
 	log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
 	a.Meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
 	rrepo.ZipAndRecord(a.Meta, id, []string{p}, count)
+	progressTracker.FinalSync() // final sync once direct XLSX export completes
 	rrepo.MarkCompleted(a.Meta, id, count)
 	return
 }
diff --git a/server/internal/repo/export_repo.go b/server/internal/repo/export_repo.go
index f9e860e..467b63c 100644
--- a/server/internal/repo/export_repo.go
+++ b/server/internal/repo/export_repo.go
@@ -440,6 +440,85 @@ func (r *ExportQueryRepo) ResetJobProgress(metaDB *sql.DB, jobID uint64) {
 	}
 }
 
+// ProgressTracker tracks progress in memory to reduce database writes
+type ProgressTracker struct {
+	jobID        uint64
+	totalRows    int64
+	lastSyncRows int64
+	lastSyncTime time.Time
+	metaDB       *sql.DB
+	syncInterval int64 // sync once every N rows (default 10000)
+	timeLimitMS  int64 // maximum time between syncs in milliseconds (default 5000)
+}
+
+// NewProgressTracker creates a progress tracker
+func NewProgressTracker(jobID uint64, metaDB *sql.DB) *ProgressTracker {
+	return &ProgressTracker{
+		jobID:        jobID,
+		totalRows:    0,
+		lastSyncRows: 0,
+		lastSyncTime: time.Now(),
+		metaDB:       metaDB,
+		syncInterval: 10000, // sync every 10000 rows
+		timeLimitMS:  5000,  // sync at least every 5 seconds
+	}
+}
+
+// Update records the latest progress and syncs it to the database when due
+func (pt *ProgressTracker) Update(totalRows int64) error {
+	pt.totalRows = totalRows
+
+	// Check whether a database sync is due
+	rowDiff := totalRows - pt.lastSyncRows
+	timeDiff := time.Since(pt.lastSyncTime).Milliseconds()
+
+	// Sync when either condition holds: the row delta reaches the threshold, or the time limit has elapsed
+	if rowDiff >= pt.syncInterval || timeDiff > pt.timeLimitMS {
+		return pt.Sync()
+	}
+	return nil
+}
+
+// Sync forces the current progress to be written to the database
+func (pt *ProgressTracker) Sync() error {
+	if pt.metaDB == nil {
+		return nil
+	}
+
+	// Use GREATEST so the recorded progress never moves backwards
+	now := time.Now()
+	_, err := pt.metaDB.Exec(
+		`UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=? WHERE id=?`,
+		pt.totalRows, now, pt.jobID,
+	)
+	if err != nil {
+		logging.DBError("progress_tracker_sync", pt.jobID, err)
+		return err
+	}
+
+	pt.lastSyncRows = pt.totalRows
+	pt.lastSyncTime = time.Now()
+	return nil
+}
+
+// FinalSync performs the final sync when an export completes
+func (pt *ProgressTracker) FinalSync() error {
+	if pt.metaDB == nil {
+		return nil
+	}
+
+	now := time.Now()
+	_, err := pt.metaDB.Exec(
+		`UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id=?`,
+		pt.totalRows, now, pt.jobID,
+	)
+	if err != nil {
+		logging.DBError("progress_tracker_final_sync", pt.jobID, err)
+		return err
+	}
+	return nil
+}
+
 // ListJobFiles returns the list of files for a job
 func (r *ExportQueryRepo) ListJobFiles(metaDB *sql.DB, jobID string) ([]JobFile, error) {
 	rows, err := metaDB.Query(
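
Review note: the sketch below is not part of the patch. It is a minimal,
standalone illustration of the throttling rule ProgressTracker applies, using
the same thresholds (10000 rows / 5000 ms); throttledProgress and its flush
callback are hypothetical stand-ins for the tracker and its UPDATE against
export_jobs.

package main

import (
	"fmt"
	"time"
)

// throttledProgress mirrors ProgressTracker.Update: flush when the row delta
// reaches syncInterval, or when timeLimit has elapsed since the last flush,
// whichever comes first.
type throttledProgress struct {
	lastRows     int64
	lastSync     time.Time
	syncInterval int64
	timeLimit    time.Duration
	flush        func(rows int64) // stand-in for the database write
}

func (t *throttledProgress) update(rows int64) {
	if rows-t.lastRows >= t.syncInterval || time.Since(t.lastSync) > t.timeLimit {
		t.flush(rows)
		t.lastRows = rows
		t.lastSync = time.Now()
	}
}

func main() {
	writes := 0
	t := &throttledProgress{
		lastSync:     time.Now(),
		syncInterval: 10000,
		timeLimit:    5 * time.Second,
		flush:        func(rows int64) { writes++ },
	}
	// One million per-row progress callbacks collapse into roughly 100 writes
	// on a fast producer; on a slow one the 5-second ceiling bounds staleness.
	for rows := int64(1); rows <= 1000000; rows++ {
		t.update(rows)
	}
	fmt.Printf("callbacks=1000000 writes=%d\n", writes)
}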