diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go index e3b3027..4eae75b 100644 --- a/server/internal/api/exports.go +++ b/server/internal/api/exports.go @@ -432,12 +432,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } rrepo.StartJob(a.meta, id) if fmt == "csv" { - w, err := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) - if err != nil { - a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id) - return + newBaseWriter := func() (exporter.RowWriter, error) { + return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) } - w.WriteHeader(cols) const maxRowsPerFile = 300000 files := []string{} { @@ -502,13 +499,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)}) log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs)) - newWriter := func() (exporter.RowWriter, error) { - w2, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) - if e == nil { - _ = w2.WriteHeader(cols) - } - return w2, e - } + newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) } chunkBase := total onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil } @@ -554,13 +545,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, batch := chooseBatch(est, fmt) files2 := []string{} cur := rrepo.NewCursor(jobDS, jobMain) - newWriter := func() (exporter.RowWriter, error) { - w, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) - if e == nil { - _ = w.WriteHeader(cols) - } - return w, e - } + newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) } onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil } onRoll := func(path string, size int64, partRows int64) error { @@ -579,6 +564,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.MarkCompleted(a.meta, id, count) return } + w, err := newBaseWriter() + if err != nil { + a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id) + return + } + _ = w.WriteHeader(cols) rows, err := db.Query(q, args...) if err != nil { a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)