From c4f674ec5b0c103374473119825299ba7ba0a017 Mon Sep 17 00:00:00 2001 From: zhouyonggao <1971162852@qq.com> Date: Mon, 15 Dec 2025 16:18:25 +0800 Subject: [PATCH] =?UTF-8?q?refactor(api):=20=E4=BC=98=E5=8C=96CSV=E5=AF=BC?= =?UTF-8?q?=E5=87=BA=E9=80=BB=E8=BE=91=E4=BB=A5=E5=87=8F=E5=B0=91=E9=87=8D?= =?UTF-8?q?=E5=A4=8D=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在exports.go中重构CSV导出逻辑,通过引入newBaseWriter函数减少重复代码,提高代码可读性和维护性。同时,确保在导出过程中正确处理文件头信息,提升导出功能的稳定性和准确性。 --- server/internal/api/exports.go | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go index e3b3027..4eae75b 100644 --- a/server/internal/api/exports.go +++ b/server/internal/api/exports.go @@ -432,12 +432,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } rrepo.StartJob(a.meta, id) if fmt == "csv" { - w, err := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) - if err != nil { - a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id) - return + newBaseWriter := func() (exporter.RowWriter, error) { + return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) } - w.WriteHeader(cols) const maxRowsPerFile = 300000 files := []string{} { @@ -502,13 +499,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)}) log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs)) - newWriter := func() (exporter.RowWriter, error) { - w2, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) - if e == nil { - _ = w2.WriteHeader(cols) - } - return w2, e - } + newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) } chunkBase := total onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil } @@ -554,13 +545,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, batch := chooseBatch(est, fmt) files2 := []string{} cur := rrepo.NewCursor(jobDS, jobMain) - newWriter := func() (exporter.RowWriter, error) { - w, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) - if e == nil { - _ = w.WriteHeader(cols) - } - return w, e - } + newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) } onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil } onRoll := func(path string, size int64, partRows int64) error { @@ -579,6 +564,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.MarkCompleted(a.meta, id, count) return } + w, err := newBaseWriter() + if err != nil { + a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id) + return + } + _ = w.WriteHeader(cols) rows, err := db.Query(q, args...) if err != nil { a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)