package api

import (
	"archive/zip"
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math/big"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"marketing-system-data-tool/server/internal/exporter"
)

// ExportsAPI serves the /api/exports endpoints. meta holds job/template
// bookkeeping; marketing holds the business data that gets exported.
type ExportsAPI struct {
	meta      *sql.DB
	marketing *sql.DB
}

// ExportsHandler wires up the export job routes:
//
//	POST /api/exports                 create a job
//	GET  /api/exports                 list jobs
//	GET  /api/exports/{id}            job detail
//	GET  /api/exports/{id}/sql        show the generated SQL
//	GET  /api/exports/{id}/download   download the newest job file
//	POST /api/exports/{id}/cancel     cancel a queued/running job
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
	api := &ExportsAPI{meta: meta, marketing: marketing}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := strings.TrimPrefix(r.URL.Path, "/api/exports")
		if p == "" {
			switch r.Method {
			case http.MethodPost:
				api.create(w, r)
			case http.MethodGet:
				api.list(w, r)
			default:
				w.WriteHeader(http.StatusNotFound)
			}
			return
		}
		if strings.HasPrefix(p, "/") {
			id := strings.TrimPrefix(p, "/")
			switch {
			case r.Method == http.MethodGet && strings.HasSuffix(p, "/sql"):
				api.getSQL(w, r, strings.TrimSuffix(id, "/sql"))
			case r.Method == http.MethodGet && strings.HasSuffix(p, "/download"):
				api.download(w, r, strings.TrimSuffix(id, "/download"))
			case r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel"):
				api.cancel(w, r, strings.TrimSuffix(id, "/cancel"))
			case r.Method == http.MethodGet:
				api.get(w, r, id)
			default:
				w.WriteHeader(http.StatusNotFound)
			}
			return
		}
		w.WriteHeader(http.StatusNotFound)
	})
}

// ensureOwnerColumn best-effort adds the owner_id column used for per-user
// filtering. The error is deliberately ignored: once the column exists the
// ALTER fails harmlessly on every later call.
func (a *ExportsAPI) ensureOwnerColumn() {
	_, _ = a.meta.Exec("ALTER TABLE export_jobs ADD COLUMN owner_id BIGINT UNSIGNED NOT NULL DEFAULT 0")
}

// ExportPayload is the JSON body of POST /api/exports.
type ExportPayload struct {
	TemplateID  uint64                 `json:"template_id"`
	RequestedBy uint64                 `json:"requested_by"`
	Permission  map[string]interface{} `json:"permission"`
	Options     map[string]interface{} `json:"options"`
	FileFormat  string                 `json:"file_format"`
	Filters     map[string]interface{} `json:"filters"`
	Datasource  string                 `json:"datasource"`
}

// create validates the payload against its template, builds the export SQL,
// gates it behind an EXPLAIN score, records the job row, and kicks off the
// export in a background goroutine.
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
	a.ensureOwnerColumn()

	body, err := io.ReadAll(r.Body)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "invalid body")
		return
	}
	var p ExportPayload
	if err := json.Unmarshal(body, &p); err != nil {
		fail(w, r, http.StatusBadRequest, "invalid json")
		return
	}
	r = WithPayload(r, p)

	var main, ds string
	var fields []byte
	tplSQL := "SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?"
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), tplSQL, []interface{}{p.TemplateID})
	if err := a.meta.QueryRow(tplSQL, p.TemplateID).Scan(&ds, &main, &fields); err != nil {
		fail(w, r, http.StatusBadRequest, "invalid template")
		return
	}
	// The payload may override the template's datasource.
	if p.Datasource != "" {
		ds = p.Datasource
	}

	var fs []string
	json.Unmarshal(fields, &fs)
	wl := whitelist()
	q, args, err := exporter.BuildSQL(exporter.BuildRequest{MainTable: main, Fields: fs, Filters: p.Filters}, wl)
	if err != nil {
		r = WithSQL(r, q)
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	r = WithSQL(r, q)

	dataDB := a.selectDataDB(ds)
	expRows, score, err := exporter.RunExplain(dataDB, q, args)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	const passThreshold = 60
	if score < passThreshold {
		fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score))
		return
	}

	// Estimate the row count: prefer an exact COUNT(1); fall back to the
	// EXPLAIN row estimates when the count query fails.
	var estimate int64
	func() {
		if idx := strings.Index(q, " FROM "); idx > 0 {
			var cnt int64
			if err := dataDB.QueryRow("SELECT COUNT(1)"+q[idx:], args...).Scan(&cnt); err == nil {
				estimate = cnt
				return
			}
		}
		for _, er := range expRows {
			// An "order" table row, when present, dominates the estimate.
			if er.Table.Valid && er.Table.String == "order" && er.Rows.Valid {
				estimate = er.Rows.Int64
				break
			}
			if er.Rows.Valid {
				estimate += er.Rows.Int64
			}
		}
	}()

	// Column headers: configured label when one exists, raw field otherwise.
	labels := fieldLabels()
	hdrs := make([]string, len(fs))
	for i, tf := range fs {
		if v, ok := labels[tf]; ok {
			hdrs[i] = v
		} else {
			hdrs[i] = tf
		}
	}

	// Owner comes from the query string (?userId=) when provided.
	owner := uint64(0)
	if uidStr := r.URL.Query().Get("userId"); uidStr != "" {
		if n, err := strconv.ParseUint(uidStr, 10, 64); err == nil {
			owner = n
		}
	}

	ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
	ejArgs := []interface{}{p.TemplateID, "queued", p.RequestedBy, owner, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(expRows), score, estimate, p.FileFormat, time.Now(), time.Now()}
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), ejSQL, ejArgs)
	res, err := a.meta.Exec(ejSQL, ejArgs...)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	id, _ := res.LastInsertId()
	go a.runJob(uint64(id), dataDB, q, args, fs, hdrs, p.FileFormat)
	ok(w, r, map[string]interface{}{"id": id})
}

// maxRowsPerFile caps rows per output part before rotating to a new file.
const maxRowsPerFile = 300000

// exportBatchSize is the LIMIT used while paging through the data query.
const exportBatchSize = 1000

// markFailed stamps the job as failed.
func (a *ExportsAPI) markFailed(id uint64) {
	a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
}

// markCompleted stamps the job as completed with its final row count.
func (a *ExportsAPI) markCompleted(id uint64, count int64) {
	a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
}

// recordFile registers one produced artifact (part file or zip) for the job.
func (a *ExportsAPI) recordFile(id uint64, path string, rowCount, size int64) {
	a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, rowCount, size, time.Now(), time.Now())
}

// updateProgress refreshes total_rows while the export is streaming.
func (a *ExportsAPI) updateProgress(id uint64, count int64) {
	a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
}

// jobChunks reloads the job's template and filters and, when the filters
// contain a create_time_between range, splits it into 10-day windows so
// huge exports can be queried slice by slice.
func (a *ExportsAPI) jobChunks(id uint64) (main string, fs []string, fl map[string]interface{}, chunks [][2]string) {
	var tplID uint64
	var filtersJSON []byte
	_ = a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id).Scan(&tplID, &filtersJSON)
	var fieldsJSON []byte
	_ = a.meta.QueryRow("SELECT main_table, fields_json FROM export_templates WHERE id=?", tplID).Scan(&main, &fieldsJSON)
	json.Unmarshal(fieldsJSON, &fs)
	json.Unmarshal(filtersJSON, &fl)
	if v, ok := fl["create_time_between"]; ok {
		if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
			chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
		}
		if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
			chunks = splitByDays(arrs[0], arrs[1], 10)
		}
	}
	return
}

// rowStrings renders one scanned row into strings; []byte becomes string,
// NULL becomes "".
func rowStrings(out []interface{}) []string {
	vals := make([]string, len(out))
	for i := range out {
		if b, isBytes := out[i].([]byte); isBytes {
			vals[i] = string(b)
		} else if out[i] == nil {
			vals[i] = ""
		} else {
			vals[i] = toString(out[i])
		}
	}
	return vals
}

// drainQuery pages through query exportBatchSize rows at a time, scanning
// each row into dest and invoking handle for it. A failed page query is
// fatal only when queryErrFatal is set (chunk slices skip to the next
// slice instead, matching the previous behavior).
func drainQuery(db *sql.DB, query string, qargs []interface{}, dest []interface{}, queryErrFatal bool, handle func() error) error {
	for off := 0; ; off += exportBatchSize {
		sub := "SELECT * FROM (" + query + ") AS sub LIMIT ? OFFSET ?"
		pageArgs := append(append([]interface{}{}, qargs...), exportBatchSize, off)
		rows, err := db.Query(sub, pageArgs...)
		if err != nil {
			if queryErrFatal {
				return err
			}
			return nil
		}
		fetched := false
		for rows.Next() {
			fetched = true
			if err := rows.Scan(dest...); err != nil {
				rows.Close()
				return err
			}
			if err := handle(); err != nil {
				rows.Close()
				return err
			}
		}
		rows.Close()
		if !fetched {
			return nil
		}
	}
}

// runJob executes the export in the background: it streams query results
// into CSV or XLSX part files (rotated every maxRowsPerFile rows), records
// each part, zips them, and finalizes the job row.
//
// Fixes vs the previous version: the duplicated/dead paging blocks were
// removed, the broken "WHERE id?" failure updates corrected, and
// transformRow is applied exactly once per row (it is not idempotent for
// order.key). The parameter previously named fmt (which shadowed the fmt
// package) is renamed format.
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, format string) {
	log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, started_at=? WHERE id= ?", []interface{}{"running", time.Now(), id})
	a.meta.Exec("UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id= ?", "running", time.Now(), time.Now(), id)
	switch format {
	case "csv":
		a.runCSVJob(id, db, q, args, fields, cols)
	case "xlsx":
		a.runXLSXJob(id, db, q, args, fields, cols)
	default:
		// Unknown format: nothing can be produced.
		a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", "failed", time.Now(), time.Now(), id)
	}
}

// runCSVJob streams the export into rotated CSV part files, then zips them.
func (a *ExportsAPI) runCSVJob(id uint64, db *sql.DB, q string, args []interface{}, fields, cols []string) {
	w, err := exporter.NewCSVWriter("storage", "export")
	if err != nil {
		a.markFailed(id)
		return
	}
	w.WriteHeader(cols)

	out := make([]interface{}, len(cols))
	dest := make([]interface{}, len(cols))
	for i := range out {
		dest[i] = &out[i]
	}
	var files []string
	var count, partCount, tick int64

	// writeRow emits one row and rotates the part file when it is full.
	writeRow := func(vals []string) error {
		w.WriteRow(vals)
		count++
		partCount++
		tick++
		if tick%50 == 0 {
			a.updateProgress(id, count)
		}
		if partCount >= maxRowsPerFile {
			path, size, _ := w.Close()
			files = append(files, path)
			a.recordFile(id, path, partCount, size)
			var werr error
			if w, werr = exporter.NewCSVWriter("storage", "export"); werr != nil {
				return werr
			}
			w.WriteHeader(cols)
			partCount = 0
		}
		return nil
	}
	drain := func(query string, qargs []interface{}, tf []string, queryErrFatal bool) error {
		return drainQuery(db, query, qargs, dest, queryErrFatal, func() error {
			return writeRow(transformRow(tf, rowStrings(out)))
		})
	}

	main, fs, fl, chunks := a.jobChunks(id)
	if len(chunks) > 0 {
		wl := whitelist()
		for _, rg := range chunks {
			fl["create_time_between"] = []string{rg[0], rg[1]}
			cq, cargs, berr := exporter.BuildSQL(exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}, wl)
			if berr != nil {
				continue
			}
			if derr := drain(cq, cargs, fs, false); derr != nil {
				a.markFailed(id)
				return
			}
		}
	} else {
		log.Printf("job_id=%d sql=%s args=%v", id, q, args)
		if derr := drain(q, args, fields, true); derr != nil {
			a.markFailed(id)
			return
		}
	}

	// Flush the last (possibly partial) part; keep it even when empty so a
	// zero-row export still yields a downloadable file.
	path, size, _ := w.Close()
	if partCount > 0 || len(files) == 0 {
		files = append(files, path)
		a.recordFile(id, path, partCount, size)
	}
	if count == 0 {
		// Best-effort total so the job row is still meaningful.
		var c int64
		_ = db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...).Scan(&c)
		count = c
	}
	if len(files) >= 1 {
		zipPath, zipSize := createZip(id, files)
		a.recordFile(id, zipPath, count, zipSize)
	}
	a.markCompleted(id, count)
}

// runXLSXJob streams the export into rotated XLSX part files, then zips them.
func (a *ExportsAPI) runXLSXJob(id uint64, db *sql.DB, q string, args []interface{}, fields, cols []string) {
	x, path, err := exporter.NewXLSXWriter("storage", "export", "Sheet1")
	if err != nil {
		a.markFailed(id)
		return
	}
	x.WriteHeader(cols)

	out := make([]interface{}, len(cols))
	dest := make([]interface{}, len(cols))
	for i := range out {
		dest[i] = &out[i]
	}
	var files []string
	var count, partCount, tick int64

	// writeRow emits one row and rotates the workbook when it is full.
	writeRow := func(vals []string) error {
		x.WriteRow(vals)
		count++
		partCount++
		tick++
		if tick%50 == 0 {
			a.updateProgress(id, count)
		}
		if partCount >= maxRowsPerFile {
			p, size, _ := x.Close(path)
			files = append(files, p)
			a.recordFile(id, p, partCount, size)
			var werr error
			if x, path, werr = exporter.NewXLSXWriter("storage", "export", "Sheet1"); werr != nil {
				return werr
			}
			x.WriteHeader(cols)
			partCount = 0
		}
		return nil
	}
	drain := func(query string, qargs []interface{}, tf []string, queryErrFatal bool) error {
		return drainQuery(db, query, qargs, dest, queryErrFatal, func() error {
			return writeRow(transformRow(tf, rowStrings(out)))
		})
	}

	main, fs, fl, chunks := a.jobChunks(id)
	if len(chunks) > 0 {
		wl := whitelist()
		for _, rg := range chunks {
			fl["create_time_between"] = []string{rg[0], rg[1]}
			cq, cargs, berr := exporter.BuildSQL(exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}, wl)
			if berr != nil {
				continue
			}
			if derr := drain(cq, cargs, fs, false); derr != nil {
				a.markFailed(id)
				return
			}
		}
		if count == 0 {
			// All chunk slices came back empty; fall back to the full query.
			if derr := drain(q, args, fields, false); derr != nil {
				a.markFailed(id)
				return
			}
		}
	} else {
		log.Printf("job_id=%d sql=%s args=%v", id, q, args)
		if derr := drain(q, args, fields, true); derr != nil {
			a.markFailed(id)
			return
		}
	}

	p, size, _ := x.Close(path)
	if partCount > 0 || len(files) == 0 {
		files = append(files, p)
		a.recordFile(id, p, partCount, size)
	}
	if count == 0 {
		var c int64
		_ = db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...).Scan(&c)
		count = c
	}
	if len(files) >= 1 {
		zipPath, zipSize := createZip(id, files)
		a.recordFile(id, zipPath, count, zipSize)
	}
	a.markCompleted(id, count)
}

// selectDataDB maps a datasource name to its connection; "ymt" lives in the
// meta database, everything else in marketing.
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
	if ds == "ymt" {
		return a.meta
	}
	return a.marketing
}

// splitByDays splits [startStr, endStr] (layout "2006-01-02 15:04:05") into
// consecutive windows of at most stepDays days. Unparseable or degenerate
// input yields the original range as a single window.
func splitByDays(startStr, endStr string, stepDays int) [][2]string {
	const layout = "2006-01-02 15:04:05"
	s, e := strings.TrimSpace(startStr), strings.TrimSpace(endStr)
	st, err1 := time.Parse(layout, s)
	en, err2 := time.Parse(layout, e)
	if err1 != nil || err2 != nil || !en.After(st) || stepDays <= 0 {
		return [][2]string{{s, e}}
	}
	step := time.Duration(stepDays) * 24 * time.Hour
	var out [][2]string
	for cur := st; cur.Before(en); {
		nxt := cur.Add(step)
		if nxt.After(en) {
			nxt = en
		}
		out = append(out, [2]string{cur.Format(layout), nxt.Format(layout)})
		cur = nxt
	}
	return out
}

// get returns one job plus the files it has produced so far.
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
	row := a.meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at FROM export_jobs WHERE id=?", id)
	var (
		jid, templateID, requestedBy uint64
		status, fileFormat           string
		totalRows                    sql.NullInt64
		startedAt, finishedAt        sql.NullTime
		createdAt, updatedAt         time.Time
	)
	if err := row.Scan(&jid, &templateID, &status, &requestedBy, &totalRows, &fileFormat, &startedAt, &finishedAt, &createdAt, &updatedAt); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	m := map[string]interface{}{
		"id": jid, "template_id": templateID, "status": status,
		"requested_by": requestedBy, "file_format": fileFormat,
		"total_rows": totalRows.Int64, "started_at": startedAt.Time,
		"finished_at": finishedAt.Time, "created_at": createdAt, "updated_at": updatedAt,
	}
	files := []map[string]interface{}{}
	// Guard the Query error: the old code ranged over a nil *sql.Rows on
	// failure, which panics.
	rows, err := a.meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", id)
	if err == nil {
		for rows.Next() {
			var uri, sheet sql.NullString
			var rc, sz sql.NullInt64
			rows.Scan(&uri, &sheet, &rc, &sz)
			files = append(files, map[string]interface{}{"storage_uri": uri.String, "sheet_name": sheet.String, "row_count": rc.Int64, "size_bytes": sz.Int64})
		}
		rows.Close()
	}
	m["files"] = files
	ok(w, r, m)
}

// getSQL rebuilds the job's query from its stored template and filters and
// returns both the parameterized SQL and a best-effort inlined version.
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
	var tplID uint64
	var filters []byte
	if err := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id).Scan(&tplID, &filters); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	var main string
	var fields []byte
	if err := a.meta.QueryRow("SELECT main_table, fields_json FROM export_templates WHERE id=?", tplID).Scan(&main, &fields); err != nil {
		fail(w, r, http.StatusBadRequest, "template not found")
		return
	}
	var fs []string
	var fl map[string]interface{}
	json.Unmarshal(fields, &fs)
	json.Unmarshal(filters, &fl)
	q, args, err := exporter.BuildSQL(exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}, whitelist())
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	// formatArg renders one bind argument as a SQL literal. Display only:
	// execution always goes through placeholders.
	formatArg := func(v interface{}) string {
		switch t := v.(type) {
		case nil:
			return "NULL"
		case []byte:
			return "'" + strings.ReplaceAll(string(t), "'", "''") + "'"
		case string:
			return "'" + strings.ReplaceAll(t, "'", "''") + "'"
		case int:
			return strconv.Itoa(t)
		case int64:
			return strconv.FormatInt(t, 10)
		case float64:
			return strconv.FormatFloat(t, 'f', -1, 64)
		case time.Time:
			return "'" + t.Format("2006-01-02 15:04:05") + "'"
		default:
			return fmt.Sprintf("%v", t)
		}
	}
	var sb strings.Builder
	ai := 0
	for i := 0; i < len(q); i++ {
		if q[i] == '?' && ai < len(args) {
			sb.WriteString(formatArg(args[ai]))
			ai++
		} else {
			sb.WriteByte(q[i])
		}
	}
	ok(w, r, map[string]interface{}{"sql": q, "final_sql": sb.String()})
}

// download serves the most recently recorded file for the job (normally the
// final zip archive).
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
	row := a.meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", id)
	var uri string
	if err := row.Scan(&uri); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, uri)
}

// transformRow applies per-field display transforms; currently only
// order.key values are decoded. NOTE: not idempotent — call once per row.
func transformRow(fields []string, vals []string) []string {
	for i, f := range fields {
		if i >= len(vals) {
			break
		}
		if f == "order.key" {
			vals[i] = decodeOrderKey(vals[i])
		}
	}
	return vals
}

// decodeOrderKey converts a numeric order key (optionally suffixed "_1")
// into its 16-character representation over a 58-symbol alphabet (no I, O,
// i, o). Non-numeric input is returned unchanged.
func decodeOrderKey(s string) string {
	if s == "" {
		return s
	}
	if len(s) > 2 && strings.HasSuffix(s, "_1") {
		s = s[:len(s)-2]
	}
	var n big.Int
	if _, ok := n.SetString(s, 10); !ok {
		return s
	}
	alphabet := []rune("ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz0123456789")
	base := big.NewInt(int64(len(alphabet)))
	zero := big.NewInt(0)
	var out []rune
	for n.Cmp(zero) > 0 {
		var mod big.Int
		mod.Mod(&n, base)
		out = append(out, alphabet[mod.Int64()])
		n.Div(&n, base)
	}
	// Left-pad (before the reversal) to a fixed 16-character width.
	for len(out) < 16 {
		out = append(out, alphabet[0])
	}
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return string(out)
}

// cancel marks a queued or running job as canceled; the status guard leaves
// finished jobs untouched.
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
	a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", "canceled", time.Now(), id)
	w.Write([]byte("ok"))
}

// toString renders an arbitrary scanned value as the string written into
// export files; booleans become "1"/"0", times use the standard layout.
func toString(v interface{}) string {
	switch t := v.(type) {
	case []byte:
		return string(t)
	case string:
		return t
	case int64:
		return strconv.FormatInt(t, 10)
	case int:
		return strconv.Itoa(t)
	case float64:
		return strconv.FormatFloat(t, 'f', -1, 64)
	case bool:
		if t {
			return "1"
		}
		return "0"
	case time.Time:
		return t.Format("2006-01-02 15:04:05")
	default:
		return fmt.Sprintf("%v", t)
	}
}

// list returns a page of export jobs, optionally filtered by template and
// owner, with a human-readable EXPLAIN summary per job.
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
	a.ensureOwnerColumn()
	q := r.URL.Query()
	page, size := 1, 15
	if p := q.Get("page"); p != "" {
		if n, err := strconv.Atoi(p); err == nil && n > 0 {
			page = n
		}
	}
	if s := q.Get("page_size"); s != "" {
		if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
			size = n
		}
	}
	var tplID uint64
	if v := q.Get("template_id"); v != "" {
		if n, err := strconv.ParseUint(v, 10, 64); err == nil {
			tplID = n
		}
	}
	uidStr := q.Get("userId")
	offset := (page - 1) * size

	// Build the WHERE clause once and share it between COUNT and SELECT.
	where := ""
	var condArgs []interface{}
	switch {
	case tplID > 0 && uidStr != "":
		where = " WHERE template_id = ? AND owner_id = ?"
		condArgs = []interface{}{tplID, uidStr}
	case tplID > 0:
		where = " WHERE template_id = ?"
		condArgs = []interface{}{tplID}
	case uidStr != "":
		where = " WHERE owner_id = ?"
		condArgs = []interface{}{uidStr}
	}
	var totalCount int64
	_ = a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs"+where, condArgs...).Scan(&totalCount)

	listArgs := append(append([]interface{}{}, condArgs...), size, offset)
	rows, err := a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs"+where+" ORDER BY id DESC LIMIT ? OFFSET ?", listArgs...)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	items := []map[string]interface{}{}
	for rows.Next() {
		var id, tid, req uint64
		var status, fmtstr string
		var estimate, total, score sql.NullInt64
		var createdAt, updatedAt sql.NullTime
		var explainRaw sql.NullString
		if err := rows.Scan(&id, &tid, &status, &req, &estimate, &total, &fmtstr, &createdAt, &updatedAt, &score, &explainRaw); err != nil {
			continue
		}
		evalStatus := "通过"
		if score.Int64 < 60 {
			evalStatus = "禁止"
		}
		desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
		if explainRaw.Valid && explainRaw.String != "" {
			if segs := explainSummary(explainRaw.String); len(segs) > 0 {
				desc = strings.Join(segs, ";")
			}
		}
		items = append(items, map[string]interface{}{
			"id": id, "template_id": tid, "status": status, "requested_by": req,
			"row_estimate": estimate.Int64, "total_rows": total.Int64,
			"file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time,
			"eval_status": evalStatus, "eval_desc": desc,
		})
	}
	ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}

// explainSummary renders stored EXPLAIN rows (JSON-encoded sql.Null* maps)
// into human-readable segments; rows with no useful data are skipped.
// Returns nil when the raw JSON cannot be parsed.
func explainSummary(raw string) []string {
	var arr []map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &arr); err != nil {
		return nil
	}
	segs := []string{}
	for _, row := range arr {
		// The Null* wrappers serialize as {"Valid":bool,"String"/"Int64"/
		// "Float64":value}; these helpers unwrap them field by field.
		getStr := func(field string) string {
			if v, ok := row[field]; ok {
				if mm, ok := v.(map[string]interface{}); ok {
					if b, ok := mm["Valid"].(bool); ok && !b {
						return ""
					}
					if s, ok := mm["String"].(string); ok {
						return s
					}
				}
			}
			return ""
		}
		getInt := func(field string) int64 {
			if v, ok := row[field]; ok {
				if mm, ok := v.(map[string]interface{}); ok {
					if b, ok := mm["Valid"].(bool); ok && !b {
						return 0
					}
					if f, ok := mm["Int64"].(float64); ok {
						return int64(f)
					}
				}
			}
			return 0
		}
		getFloat := func(field string) float64 {
			if v, ok := row[field]; ok {
				if mm, ok := v.(map[string]interface{}); ok {
					if b, ok := mm["Valid"].(bool); ok && !b {
						return 0
					}
					if f, ok := mm["Float64"].(float64); ok {
						return f
					}
				}
			}
			return 0
		}
		tbl := getStr("Table")
		typ := getStr("Type")
		if typ == "" {
			typ = getStr("SelectType")
		}
		key := getStr("Key")
		rowsN := getInt("Rows")
		filt := getFloat("Filtered")
		extra := getStr("Extra")
		if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
			continue
		}
		s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
		if extra != "" {
			s += ", 额外:" + extra
		}
		segs = append(segs, s)
	}
	return segs
}

// createZip bundles the produced part files into one archive under
// storage/export and returns its path and size (0 on stat failure).
// Unreadable part files are skipped rather than aborting the archive.
func createZip(jobID uint64, files []string) (string, int64) {
	baseDir := "storage/export"
	_ = os.MkdirAll(baseDir, 0755)
	zipPath := filepath.Join(baseDir, fmt.Sprintf("job_%d_%d.zip", jobID, time.Now().Unix()))
	zf, err := os.Create(zipPath)
	if err != nil {
		return zipPath, 0
	}
	defer zf.Close()
	zw := zip.NewWriter(zf)
	for _, p := range files {
		f, err := os.Open(p)
		if err != nil {
			continue
		}
		entry, err := zw.Create(filepath.Base(p))
		if err != nil {
			f.Close()
			continue
		}
		_, _ = io.Copy(entry, f)
		f.Close()
	}
	_ = zw.Close()
	st, err := os.Stat(zipPath)
	if err != nil {
		return zipPath, 0
	}
	return zipPath, st.Size()
}