diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go index 18322b2..3021e9b 100644 --- a/server/internal/api/exports.go +++ b/server/internal/api/exports.go @@ -1062,7 +1062,7 @@ func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) { } } } - ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc}) + ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": repo.GetProgress(d.ID), "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc}) } func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) { @@ -1617,7 +1617,7 @@ func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) { for _, it := range itemsRaw { id, tid, req := it.ID, it.TemplateID, it.RequestedBy status, fmtstr := it.Status, it.FileFormat - estimate, total := it.RowEstimate, it.TotalRows + estimate := it.RowEstimate createdAt, updatedAt := it.CreatedAt, it.UpdatedAt score, explainRaw := it.ExplainScore, it.ExplainJSON evalStatus := "通过" @@ -1692,7 +1692,7 @@ func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) { } } } - m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc} + m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": repo.GetProgress(id), "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc} items = append(items, m) } ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size}) diff --git a/server/internal/exporter/util.go b/server/internal/exporter/util.go index d9e05dd..c9289cf 100644 --- a/server/internal/exporter/util.go +++ b/server/internal/exporter/util.go @@ -2,6 +2,7 @@ package exporter import ( "archive/zip" + "compress/flate" "fmt" "io" "os" @@ -10,7 +11,7 @@ import ( "time" ) -// ZipFiles 将分片文件打包为zip并返回路径与大小,同时清理源xlsx分片文件 +// ZipFiles 将分片文件打包为压缩zip并返回路径与大小,同时清理源xlsx分片文件 func ZipFiles(jobID uint64, files []string) (string, int64) { baseDir := "storage/export" _ = os.MkdirAll(baseDir, 0755) @@ -21,12 +22,24 @@ func ZipFiles(jobID uint64, files []string) (string, int64) { } defer zf.Close() zw := zip.NewWriter(zf) + // 注册 DEFLATE 压缩器 + zw.RegisterCompressor(zip.Deflate, func(out io.Writer) (io.WriteCloser, error) { + w, err := flate.NewWriter(out, flate.DefaultCompression) + if err != nil { + return nil, err + } + return w, nil + }) for _, p := range files { f, err := os.Open(p) if err != nil { continue } - w, err := zw.Create(filepath.Base(p)) + // 创建压缩文件头,指定使用 DEFLATE 压缩 + w, err := zw.CreateHeader(&zip.FileHeader{ + Name: filepath.Base(p), + Method: zip.Deflate, // 使用 DEFLATE 压缩 + }) if err != nil { f.Close() continue diff --git a/server/internal/repo/export_repo.go b/server/internal/repo/export_repo.go index 86ddc84..8416b28 100644 --- a/server/internal/repo/export_repo.go +++ b/server/internal/repo/export_repo.go @@ -7,6 +7,7 @@ import ( "server/internal/constants" "server/internal/exporter" "server/internal/logging" + "sync" "time" ) @@ -20,6 +21,44 @@ func NewExportRepo() *ExportQueryRepo { return &ExportQueryRepo{} } +// ==================== 实时进度缓存 ==================== + +// ProgressCache 实时进度缓存 +type ProgressCache struct { + mu sync.RWMutex + progress map[uint64]int64 // jobID -> totalRows +} + +var ( + progressCache = &ProgressCache{ + progress: make(map[uint64]int64), + } +) + +// SetProgress 设置实时进度 +func SetProgress(jobID uint64, totalRows int64) { + progressCache.mu.Lock() + defer progressCache.mu.Unlock() + progressCache.progress[jobID] = totalRows +} + +// GetProgress 获取实时进度 +func GetProgress(jobID uint64) int64 { + progressCache.mu.RLock() + defer progressCache.mu.RUnlock() + if rows, ok := progressCache.progress[jobID]; ok { + return rows + } + return 0 +} + +// ClearProgress 清除所有进度缓存(可选,死了的休惑自动清理) +func ClearProgress(jobID uint64) { + progressCache.mu.Lock() + defer progressCache.mu.Unlock() + delete(progressCache.progress, jobID) +} + // ==================== SQL构建 ==================== // Build 构建SQL查询 @@ -289,6 +328,8 @@ func (r *ExportQueryRepo) MarkCompleted(metaDB *sql.DB, jobID uint64, totalRows if err != nil { logging.DBError("mark_completed", jobID, err) } + // 导出完成时清除缓存,释放内存 + ClearProgress(jobID) } // InsertJobFile 插入任务文件记录 @@ -464,40 +505,17 @@ func NewProgressTracker(jobID uint64, metaDB *sql.DB) *ProgressTracker { } } -// Update 更新进度,并在必要时同步到数据库 +// Update 更新进度到缓存(不同步数据库) func (pt *ProgressTracker) Update(totalRows int64) error { pt.totalRows = totalRows - - // 检查是否需要同步到数据库 - rowDiff := totalRows - pt.lastSyncRows - timeDiff := time.Since(pt.lastSyncTime).Milliseconds() - - // 满足任一条件就同步:行数差异超过阈值 或 时间超过限制 - if rowDiff >= pt.syncInterval || timeDiff > pt.timeLimitMS { - return pt.Sync() - } + // 立即写入缓存,前端可以实时查询 + SetProgress(pt.jobID, totalRows) return nil } -// Sync 强制同步当前进度到数据库 +// Sync 仅执行最终同步(应用于导出完成时) func (pt *ProgressTracker) Sync() error { - if pt.metaDB == nil { - return nil - } - - // 使用 GREATEST 防止进度倒退 - now := time.Now() - _, err := pt.metaDB.Exec( - `UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=? WHERE id=?`, - pt.totalRows, now, pt.jobID, - ) - if err != nil { - logging.DBError("progress_tracker_sync", pt.jobID, err) - return err - } - - pt.lastSyncRows = pt.totalRows - pt.lastSyncTime = time.Now() + // 取消同步数据库操作,缓存已在 Update 中实时更新 return nil }