package api

import (
	"database/sql"
	"encoding/json"
	"io"
	"log"
	"net/http"
	"strconv"
	"strings"
	"time"

	"marketing-system-data-tool/server/internal/exporter"
)

// ExportsAPI implements the /api/exports endpoints.
//
// meta is the database holding export_templates, export_jobs and
// export_job_files; marketing is the database the generated export queries
// are explained and executed against.
type ExportsAPI struct {
	meta      *sql.DB
	marketing *sql.DB
}

// ExportsHandler returns the HTTP handler for the export endpoints:
//
//	POST /api/exports                create a job
//	GET  /api/exports/{id}           job status and produced files
//	GET  /api/exports/{id}/download  most recent file of a job
//	POST /api/exports/{id}/cancel    cancel a queued or running job
//
// Every other method/path combination is answered with 404.
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
	h := &ExportsAPI{meta: meta, marketing: marketing}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := strings.TrimPrefix(r.URL.Path, "/api/exports")
		if r.Method == http.MethodPost && p == "" {
			h.create(w, r)
			return
		}
		if !strings.HasPrefix(p, "/") {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		id := strings.TrimPrefix(p, "/")
		switch {
		case r.Method == http.MethodGet && strings.HasSuffix(p, "/download"):
			h.download(w, r, strings.TrimSuffix(id, "/download"))
		case r.Method == http.MethodGet:
			h.get(w, r, id)
		case r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel"):
			h.cancel(w, r, strings.TrimSuffix(id, "/cancel"))
		default:
			w.WriteHeader(http.StatusNotFound)
		}
	})
}

// ExportPayload is the JSON body accepted by POST /api/exports.
type ExportPayload struct {
	TemplateID  uint64                 `json:"template_id"`
	RequestedBy uint64                 `json:"requested_by"`
	Permission  map[string]interface{} `json:"permission"`
	Options     map[string]interface{} `json:"options"`
	FileFormat  string                 `json:"file_format"`
	Filters     map[string]interface{} `json:"filters"`
}

// exportsReply writes a plain-text response with the given status code.
func exportsReply(w http.ResponseWriter, status int, msg string) {
	w.WriteHeader(status)
	w.Write([]byte(msg))
}

// create handles POST /api/exports.
//
// It loads the template named by the payload, builds the export query from
// the template's fields and the payload's filters, runs EXPLAIN on it, stores
// a new export_jobs row with status "queued" and starts runJob in a
// background goroutine. On success the response is {"id":<job id>}.
//
// A malformed body, an unknown template, a query that cannot be built or an
// EXPLAIN failure yield 400; metadata failures yield 500.
//
// file_format is not validated here: an unsupported value is stored and the
// job is then marked "failed" by runJob.
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		exportsReply(w, http.StatusBadRequest, "invalid request body")
		return
	}
	var p ExportPayload
	if err := json.Unmarshal(body, &p); err != nil {
		exportsReply(w, http.StatusBadRequest, "invalid json: "+err.Error())
		return
	}

	var mainTable string
	var fieldsJSON []byte
	err = a.meta.QueryRow("SELECT main_table, fields_json FROM export_templates WHERE id=?", p.TemplateID).Scan(&mainTable, &fieldsJSON)
	if err != nil {
		exportsReply(w, http.StatusBadRequest, "invalid template")
		return
	}

	// An empty/NULL fields_json is passed on as "no fields" and left for
	// BuildSQL to judge; only content that is present but undecodable is
	// rejected here.
	var fields []string
	if len(fieldsJSON) > 0 {
		if err := json.Unmarshal(fieldsJSON, &fields); err != nil {
			exportsReply(w, http.StatusInternalServerError, "invalid template fields")
			return
		}
	}

	req := exporter.BuildRequest{MainTable: mainTable, Fields: fields, Filters: p.Filters}
	query, args, err := exporter.BuildSQL(req, whitelist())
	if err != nil {
		exportsReply(w, http.StatusBadRequest, err.Error())
		return
	}

	explainRows, score, err := exporter.RunExplain(a.marketing, query, args)
	if err != nil {
		exportsReply(w, http.StatusBadRequest, err.Error())
		return
	}

	res, err := a.meta.Exec("INSERT INTO export_jobs (template_id, status, requested_by, permission_scope_json, filters_json, options_json, explain_json, explain_score, file_format, created_at) VALUES (?,?,?,?,?,?,?,?,?,?)", p.TemplateID, "queued", p.RequestedBy, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(explainRows), score, p.FileFormat, time.Now())
	if err != nil {
		exportsReply(w, http.StatusInternalServerError, err.Error())
		return
	}
	id, err := res.LastInsertId()
	if err != nil {
		exportsReply(w, http.StatusInternalServerError, err.Error())
		return
	}

	// NOTE(review): the goroutine is neither tracked nor cancellable; it
	// runs until the export finishes or fails.
	go a.runJob(uint64(id), query, args, fields, p.FileFormat)

	w.Header().Set("Content-Type", "application/json")
	w.Write([]byte("{\"id\":" + strconv.FormatInt(id, 10) + "}"))
}

// runJob executes one export job and records its outcome in export_jobs.
//
// The job is marked "running", the query q is streamed into a CSV or XLSX
// file depending on format, and the job ends as "completed" (with total_rows)
// or "failed". Any other format fails the job immediately. Because this runs
// in a background goroutine, errors are logged rather than returned.
func (a *ExportsAPI) runJob(id uint64, q string, args []interface{}, cols []string, format string) {
	a.execJob(id, "UPDATE export_jobs SET status=?, started_at=? WHERE id=?", "running", time.Now(), id)

	var (
		count int64
		err   error
	)
	switch format {
	case "csv":
		count, err = a.exportCSV(id, q, args, cols)
	case "xlsx":
		count, err = a.exportXLSX(id, q, args, cols)
	default:
		log.Printf("export job %d: unsupported file format %q", id, format)
		a.failJob(id)
		return
	}
	if err != nil {
		log.Printf("export job %d: %v", id, err)
		a.failJob(id)
		return
	}
	a.execJob(id, "UPDATE export_jobs SET status=?, finished_at=?, total_rows=? WHERE id=?", "completed", time.Now(), count, id)
}

// exportCSV streams the result of q into a new CSV file and registers the
// file in export_job_files. It returns the number of data rows written.
func (a *ExportsAPI) exportCSV(id uint64, q string, args []interface{}, cols []string) (int64, error) {
	w, err := exporter.NewCSVWriter("storage", "export")
	if err != nil {
		return 0, err
	}
	w.WriteHeader(cols)
	count, err := a.streamRows(q, args, len(cols), func(rec []string) { w.WriteRow(rec) })

	// Close on every path so the writer is not left open after a failed
	// query or scan. NOTE(review): assumes the third result of Close is an
	// error — confirm against the exporter package.
	path, size, cerr := w.Close()
	if err != nil {
		return 0, err
	}
	if cerr != nil {
		return 0, cerr
	}

	_, err = a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at) VALUES (?,?,?,?,?)", id, path, count, size, time.Now())
	if err != nil {
		return 0, err
	}
	return count, nil
}

// exportXLSX streams the result of q into a new XLSX file (sheet "Sheet1")
// and registers the file in export_job_files. It returns the number of data
// rows written.
func (a *ExportsAPI) exportXLSX(id uint64, q string, args []interface{}, cols []string) (int64, error) {
	x, path, err := exporter.NewXLSXWriter("storage", "export", "Sheet1")
	if err != nil {
		return 0, err
	}
	x.WriteHeader(cols)
	count, err := a.streamRows(q, args, len(cols), func(rec []string) { x.WriteRow(rec) })

	// Close on every path so the writer is not left open after a failed
	// query or scan. NOTE(review): assumes the third result of Close is an
	// error — confirm against the exporter package.
	uri, size, cerr := x.Close(path)
	if err != nil {
		return 0, err
	}
	if cerr != nil {
		return 0, cerr
	}

	_, err = a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at) VALUES (?,?,?,?,?)", id, uri, count, size, time.Now())
	if err != nil {
		return 0, err
	}
	return count, nil
}

// streamRows runs q against the marketing database and passes every result
// row, converted to strings, to emit. ncols is the number of columns the
// query selects. It returns the number of rows emitted and the first query,
// scan or iteration error.
func (a *ExportsAPI) streamRows(q string, args []interface{}, ncols int, emit func([]string)) (int64, error) {
	rows, err := a.marketing.Query(q, args...)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	vals := make([]interface{}, ncols)
	dest := make([]interface{}, ncols)
	for i := range vals {
		dest[i] = &vals[i]
	}

	var count int64
	for rows.Next() {
		if err := rows.Scan(dest...); err != nil {
			return count, err
		}
		// A fresh slice per row: the writer may keep a reference to it.
		rec := make([]string, ncols)
		for i, v := range vals {
			rec[i] = toString(v)
		}
		emit(rec)
		count++
	}
	// Next also returns false when iteration aborts; without this check a
	// truncated result set would be reported as a successful export.
	return count, rows.Err()
}

// failJob marks the job as "failed" and stamps finished_at.
func (a *ExportsAPI) failJob(id uint64) {
	a.execJob(id, "UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
}

// execJob runs a metadata statement on behalf of a background job and logs
// the error, since there is no caller to return it to.
func (a *ExportsAPI) execJob(id uint64, query string, args ...interface{}) {
	if _, err := a.meta.Exec(query, args...); err != nil {
		log.Printf("export job %d: updating metadata: %v", id, err)
	}
}

// get handles GET /api/exports/{id}: it responds with the job row and its
// files as a JSON object.
//
// NULL total_rows, started_at and finished_at are reported as their zero
// values. A job that cannot be read yields 404; a failure while listing its
// files or encoding the response yields 500.
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
	var (
		jid, templateID, requestedBy uint64
		status, fileFormat           string
		totalRows                    sql.NullInt64
		startedAt, finishedAt        sql.NullTime
		createdAt, updatedAt         time.Time
	)
	err := a.meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at FROM export_jobs WHERE id=?", id).
		Scan(&jid, &templateID, &status, &requestedBy, &totalRows, &fileFormat, &startedAt, &finishedAt, &createdAt, &updatedAt)
	if err != nil {
		exportsReply(w, http.StatusNotFound, "not found")
		return
	}

	rows, err := a.meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", id)
	if err != nil {
		// rows is nil here; iterating it would panic.
		exportsReply(w, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	// Non-nil so that a job without files encodes as [] rather than null.
	files := []map[string]interface{}{}
	for rows.Next() {
		var uri, sheet sql.NullString
		var rc, sz sql.NullInt64
		if err := rows.Scan(&uri, &sheet, &rc, &sz); err != nil {
			exportsReply(w, http.StatusInternalServerError, err.Error())
			return
		}
		files = append(files, map[string]interface{}{
			"storage_uri": uri.String,
			"sheet_name":  sheet.String,
			"row_count":   rc.Int64,
			"size_bytes":  sz.Int64,
		})
	}
	if err := rows.Err(); err != nil {
		exportsReply(w, http.StatusInternalServerError, err.Error())
		return
	}

	body, err := json.Marshal(map[string]interface{}{
		"id":           jid,
		"template_id":  templateID,
		"status":       status,
		"requested_by": requestedBy,
		"file_format":  fileFormat,
		"total_rows":   totalRows.Int64,
		"started_at":   startedAt.Time,
		"finished_at":  finishedAt.Time,
		"created_at":   createdAt,
		"updated_at":   updatedAt,
		"files":        files,
	})
	if err != nil {
		exportsReply(w, http.StatusInternalServerError, err.Error())
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(body)
}

// download handles GET /api/exports/{id}/download by serving the most
// recently registered file of the job, or 404 when none can be read.
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
	var uri string
	err := a.meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", id).Scan(&uri)
	if err != nil {
		exportsReply(w, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, uri)
}

// cancel handles POST /api/exports/{id}/cancel by setting the status of a
// queued or running job to "canceled". It responds "ok" whether or not a row
// matched, and 500 when the update itself fails.
//
// NOTE(review): runJob never reads the status, so a job that is already
// executing keeps running and its final status update replaces "canceled".
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
	_, err := a.meta.Exec("UPDATE export_jobs SET status=? WHERE id=? AND status IN ('queued','running')", "canceled", id)
	if err != nil {
		exportsReply(w, http.StatusInternalServerError, err.Error())
		return
	}
	w.Write([]byte("ok"))
}

// toString renders a scanned column value as text for an export cell.
//
// nil (SQL NULL) becomes the empty string. bool and time.Time are handled
// explicitly because database/sql can deliver them when scanning into
// *interface{}; previously they were exported as empty cells. Types not
// listed still yield the empty string.
func toString(v interface{}) string {
	switch t := v.(type) {
	case nil:
		return ""
	case []byte:
		return string(t)
	case string:
		return t
	case int64:
		return strconv.FormatInt(t, 10)
	case int:
		return strconv.Itoa(t)
	case float64:
		return strconv.FormatFloat(t, 'f', -1, 64)
	case bool:
		return strconv.FormatBool(t)
	case time.Time:
		// NOTE(review): RFC 3339 chosen as an unambiguous default; confirm
		// the timestamp format export consumers expect.
		return t.Format(time.RFC3339Nano)
	default:
		return ""
	}
}