refactor(api): 优化CSV导出逻辑以减少重复代码

在exports.go中重构CSV导出逻辑,通过引入newBaseWriter函数减少重复代码,提高代码可读性和维护性。同时,确保在导出过程中正确处理文件头信息,提升导出功能的稳定性和准确性。
This commit is contained in:
zhouyonggao 2025-12-15 16:18:25 +08:00
parent 0e32026f46
commit c4f674ec5b
1 changed files with 10 additions and 19 deletions

View File

@ -432,12 +432,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
}
rrepo.StartJob(a.meta, id)
if fmt == "csv" {
w, err := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
return
newBaseWriter := func() (exporter.RowWriter, error) {
return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
}
w.WriteHeader(cols)
const maxRowsPerFile = 300000
files := []string{}
{
@ -502,13 +499,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) {
w2, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
if e == nil {
_ = w2.WriteHeader(cols)
}
return w2, e
}
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
@ -554,13 +545,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
batch := chooseBatch(est, fmt)
files2 := []string{}
cur := rrepo.NewCursor(jobDS, jobMain)
newWriter := func() (exporter.RowWriter, error) {
w, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
if e == nil {
_ = w.WriteHeader(cols)
}
return w, e
}
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
@ -579,6 +564,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
rrepo.MarkCompleted(a.meta, id, count)
return
}
w, err := newBaseWriter()
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
return
}
_ = w.WriteHeader(cols)
rows, err := db.Query(q, args...)
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)