MarketingSystemDataTool/server/internal/api/exports.go

351 lines
13 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"marketing-system-data-tool/server/internal/exporter"
"net/http"
"strconv"
"strings"
"time"
)
// ExportsAPI groups the database handles shared by the /api/exports handlers.
type ExportsAPI struct {
	// meta is the handle the handlers query for export_templates, export_jobs
	// and export_job_files; selectDataDB also returns it for the "ymt" datasource.
	meta *sql.DB
	// marketing is the handle selectDataDB returns for every other datasource.
	marketing *sql.DB
}
// ExportsHandler returns the http.Handler serving everything under /api/exports.
//
// Routing, based on the path remainder after the "/api/exports" prefix:
//
//	POST ""                -> create
//	GET  ""                -> list
//	GET  "/{id}/download"  -> download
//	GET  "/{id}"           -> get
//	POST "/{id}/cancel"    -> cancel
//
// Anything else is answered with 404 and an empty body.
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
	api := &ExportsAPI{meta: meta, marketing: marketing}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rest := strings.TrimPrefix(r.URL.Path, "/api/exports")
		isGet := r.Method == http.MethodGet
		isPost := r.Method == http.MethodPost

		// Collection-level routes.
		if rest == "" {
			switch {
			case isPost:
				api.create(w, r)
				return
			case isGet:
				api.list(w, r)
				return
			}
		}

		// Item-level routes; the ID is whatever follows the leading slash,
		// minus the action suffix when one is present.
		if strings.HasPrefix(rest, "/") {
			id := strings.TrimPrefix(rest, "/")
			switch {
			case isGet && strings.HasSuffix(rest, "/download"):
				api.download(w, r, strings.TrimSuffix(id, "/download"))
				return
			case isGet:
				api.get(w, r, id)
				return
			case isPost && strings.HasSuffix(rest, "/cancel"):
				api.cancel(w, r, strings.TrimSuffix(id, "/cancel"))
				return
			}
		}

		w.WriteHeader(http.StatusNotFound)
	})
}
// ExportPayload is the JSON request body decoded by the create handler.
type ExportPayload struct {
	// TemplateID is the export_templates row the job is built from.
	TemplateID uint64 `json:"template_id"`
	// RequestedBy is stored verbatim in export_jobs.requested_by.
	RequestedBy uint64 `json:"requested_by"`
	// Permission is serialized into export_jobs.permission_scope_json.
	Permission map[string]interface{} `json:"permission"`
	// Options is serialized into export_jobs.options_json.
	Options map[string]interface{} `json:"options"`
	// FileFormat selects the writer used by the job ("csv" or "xlsx").
	FileFormat string `json:"file_format"`
	// Filters is handed to the SQL builder and stored in export_jobs.filters_json.
	Filters map[string]interface{} `json:"filters"`
	// Datasource, when non-empty, overrides the datasource stored on the template.
	Datasource string `json:"datasource"`
}
// create handles POST /api/exports.
//
// Steps:
//  1. Decode the JSON body into an ExportPayload (400 on a read or decode error).
//  2. Load datasource, main table and field list from export_templates
//     (400 "invalid template" when the row cannot be read).
//  3. Build the export SQL through exporter.BuildSQL using the whitelist.
//  4. Run EXPLAIN on the selected data DB and reject plans scoring below 60.
//  5. Insert a "queued" row into export_jobs and start runJob in a goroutine.
//
// On success the response carries the new job ID.
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "invalid request body")
		return
	}
	var p ExportPayload
	if err := json.Unmarshal(body, &p); err != nil {
		// Previously a malformed body fell through with a zero payload and
		// surfaced as a misleading "invalid template".
		fail(w, r, http.StatusBadRequest, "invalid request body")
		return
	}
	r = WithPayload(r, p)

	const tplSQL = "SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?"
	var (
		ds        string
		mainTable string
		fields    []byte
	)
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), tplSQL, []interface{}{p.TemplateID})
	if err := a.meta.QueryRow(tplSQL, p.TemplateID).Scan(&ds, &mainTable, &fields); err != nil {
		fail(w, r, http.StatusBadRequest, "invalid template")
		return
	}
	// A datasource in the payload takes precedence over the template's.
	if p.Datasource != "" {
		ds = p.Datasource
	}

	var fs []string
	if err := json.Unmarshal(fields, &fs); err != nil {
		fail(w, r, http.StatusBadRequest, "invalid template fields")
		return
	}

	req := exporter.BuildRequest{MainTable: mainTable, Fields: fs, Filters: p.Filters}
	q, args, err := exporter.BuildSQL(req, whitelist())
	// Attach the SQL to the request in both the success and the error case.
	r = WithSQL(r, q)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}

	dataDB := a.selectDataDB(ds)
	expRows, score, err := exporter.RunExplain(dataDB, q, args)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	const passThreshold = 60
	if score < passThreshold {
		fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d请优化索引或缩小查询范围", score))
		return
	}

	// Row estimate: the first EXPLAIN row for table "order" that has a row
	// count replaces the running total and ends the scan; every row before
	// it contributes its count to the sum.
	var estimate int64
	for _, er := range expRows {
		if er.Table.Valid && er.Table.String == "order" && er.Rows.Valid {
			estimate = er.Rows.Int64
			break
		}
		if er.Rows.Valid {
			estimate += er.Rows.Int64
		}
	}

	// One timestamp so created_at and updated_at are identical on insert.
	now := time.Now()
	ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"
	ejArgs := []interface{}{p.TemplateID, "queued", p.RequestedBy, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(expRows), score, estimate, p.FileFormat, now, now}
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), ejSQL, ejArgs)
	res, err := a.meta.Exec(ejSQL, ejArgs...)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	id, err := res.LastInsertId()
	if err != nil {
		// Without an ID the job can be neither run nor reported to the caller.
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}

	// The export runs in the background; the caller polls the job by ID.
	go a.runJob(uint64(id), dataDB, q, args, fs, p.FileFormat)
	ok(w, r, map[string]interface{}{"id": id})
}
// exportJobSink adapts one concrete exporter writer (CSV or XLSX) to the
// operations runJob needs, so both formats share a single streaming loop.
type exportJobSink struct {
	// writeHeader emits the column header row.
	writeHeader func(cols []string)
	// writeRow emits one data row.
	writeRow func(vals []string)
	// finish closes the writer and returns the stored path and byte size.
	// Both are kept as interface{} because they are only forwarded to Exec.
	finish func() (interface{}, interface{}, error)
}

// runJob executes export job id: it marks the job "running", streams the
// result of q (run against db with args) into a CSV or XLSX file, records the
// file in export_job_files and marks the job "completed".
//
// Any failure (unsupported format, writer creation, query, scan, row
// iteration, writer close, file-record insert) marks the job "failed".
// cols supplies the header row and the number of columns scanned per row.
//
// Progress (total_rows) is flushed every 500 rows for both formats.
//
// NOTE(review): the cancel handler only updates the status column; nothing in
// this function reads it back, so a canceled job still runs to the end and is
// overwritten with "completed" or "failed".
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, cols []string, format string) {
	const startSQL = "UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id= ?"
	started := time.Now()
	startArgs := []interface{}{"running", started, started, id}
	log.Printf("job_id=%d sql=%s args=%v", id, startSQL, startArgs)
	_ = a.execExportJobSQL(id, startSQL, startArgs...) // best effort; failure is logged by the helper

	var sink exportJobSink
	switch format {
	case "csv":
		w, err := exporter.NewCSVWriter("storage", "export")
		if err != nil {
			a.failExportJob(id, err)
			return
		}
		sink = exportJobSink{
			writeHeader: func(c []string) { w.WriteHeader(c) },
			writeRow:    func(v []string) { w.WriteRow(v) },
			finish: func() (interface{}, interface{}, error) {
				p, n, cerr := w.Close()
				return p, n, cerr
			},
		}
	case "xlsx":
		x, path, err := exporter.NewXLSXWriter("storage", "export", "Sheet1")
		if err != nil {
			a.failExportJob(id, err)
			return
		}
		sink = exportJobSink{
			writeHeader: func(c []string) { x.WriteHeader(c) },
			writeRow:    func(v []string) { x.WriteRow(v) },
			finish: func() (interface{}, interface{}, error) {
				p, n, cerr := x.Close(path)
				return p, n, cerr
			},
		}
	default:
		a.failExportJob(id, fmt.Errorf("unsupported file format %q", format))
		return
	}

	sink.writeHeader(cols)
	count, err := a.streamExportRows(id, db, q, args, cols, sink)
	if err != nil {
		// Close the writer so its file handle is released; the job is failing
		// anyway, so the close result is irrelevant.
		_, _, _ = sink.finish()
		a.failExportJob(id, err)
		return
	}

	path, size, err := sink.finish()
	if err != nil {
		a.failExportJob(id, err)
		return
	}

	finished := time.Now()
	const fileSQL = "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)"
	fileArgs := []interface{}{id, path, count, size, finished, finished}
	log.Printf("job_id=%d sql=%s args=%v", id, fileSQL, fileArgs)
	if err := a.execExportJobSQL(id, fileSQL, fileArgs...); err != nil {
		// Without the file record the download handler cannot find the file.
		a.failExportJob(id, err)
		return
	}

	const doneSQL = "UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?"
	doneArgs := []interface{}{"completed", finished, count, finished, id}
	log.Printf("job_id=%d sql=%s args=%v", id, doneSQL, doneArgs)
	_ = a.execExportJobSQL(id, doneSQL, doneArgs...) // failure is logged by the helper
}

// streamExportRows runs q and writes every result row to sink, returning the
// number of rows written. It reports query, scan and iteration errors.
func (a *ExportsAPI) streamExportRows(id uint64, db *sql.DB, q string, args []interface{}, cols []string, sink exportJobSink) (int64, error) {
	log.Printf("job_id=%d sql=%s args=%v", id, q, args)
	rows, err := db.Query(q, args...)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	// Scan into generic cells; dest holds a pointer to each cell.
	out := make([]interface{}, len(cols))
	dest := make([]interface{}, len(cols))
	for i := range out {
		dest[i] = &out[i]
	}

	var count int64
	for rows.Next() {
		if err := rows.Scan(dest...); err != nil {
			return count, err
		}
		// A fresh slice per row: the writers' retention behavior is not
		// visible from here, so the buffer is not reused.
		vals := make([]string, len(cols))
		for i, cell := range out {
			vals[i] = toString(cell)
		}
		sink.writeRow(vals)
		count++
		if count%500 == 0 {
			_ = a.execExportJobSQL(id, "UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
		}
	}
	// Next returning false may mean an error rather than end of data.
	return count, rows.Err()
}

// failExportJob logs cause and marks job id as "failed".
func (a *ExportsAPI) failExportJob(id uint64, cause error) {
	log.Printf("job_id=%d export failed: %v", id, cause)
	now := time.Now()
	_ = a.execExportJobSQL(id, "UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", "failed", now, now, id)
}

// execExportJobSQL executes a bookkeeping statement on the meta DB and logs
// (and returns) any error instead of dropping it silently.
func (a *ExportsAPI) execExportJobSQL(id uint64, query string, args ...interface{}) error {
	if _, err := a.meta.Exec(query, args...); err != nil {
		log.Printf("job_id=%d sql=%s err=%v", id, query, err)
		return err
	}
	return nil
}
// selectDataDB maps a datasource name to the database handle export queries
// run against: "ymt" yields the meta handle, any other value (including the
// empty string) yields the marketing handle.
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
	switch ds {
	case "ymt":
		return a.meta
	default:
		return a.marketing
	}
}
// get handles GET /api/exports/{id}: it returns the export_jobs row for id
// together with every export_job_files row attached to it.
//
// Responses:
//   - 404 "not found" when the job row cannot be read.
//   - 500 when the file rows cannot be queried, scanned or iterated.
//
// NULL total_rows / started_at / finished_at are reported as their zero values.
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
	var (
		jid, templateID, requestedBy uint64
		status, fileFormat           string
		totalRows                    sql.NullInt64
		startedAt, finishedAt        sql.NullTime
		createdAt, updatedAt         time.Time
	)
	row := a.meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at FROM export_jobs WHERE id=?", id)
	if err := row.Scan(&jid, &templateID, &status, &requestedBy, &totalRows, &fileFormat, &startedAt, &finishedAt, &createdAt, &updatedAt); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}

	// The query error used to be dropped, which made rows nil and the
	// following rows.Next() panic.
	rows, err := a.meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", id)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	// Non-nil empty slice so the JSON field is [] rather than null.
	files := []map[string]interface{}{}
	for rows.Next() {
		var uri, sheet sql.NullString
		var rc, sz sql.NullInt64
		if err := rows.Scan(&uri, &sheet, &rc, &sz); err != nil {
			fail(w, r, http.StatusInternalServerError, err.Error())
			return
		}
		files = append(files, map[string]interface{}{
			"storage_uri": uri.String,
			"sheet_name":  sheet.String,
			"row_count":   rc.Int64,
			"size_bytes":  sz.Int64,
		})
	}
	if err := rows.Err(); err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}

	ok(w, r, map[string]interface{}{
		"id":           jid,
		"template_id":  templateID,
		"status":       status,
		"requested_by": requestedBy,
		"file_format":  fileFormat,
		"total_rows":   totalRows.Int64,
		"started_at":   startedAt.Time,
		"finished_at":  finishedAt.Time,
		"created_at":   createdAt,
		"updated_at":   updatedAt,
		"files":        files,
	})
}
// download handles GET /api/exports/{id}/download. It looks up the most
// recently inserted export_job_files row for the job (highest id) and serves
// the file at its storage_uri. A failed lookup is answered with 404 "not found".
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
	const latestFileSQL = "SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1"

	var storagePath string
	if scanErr := a.meta.QueryRow(latestFileSQL, id).Scan(&storagePath); scanErr != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, storagePath)
}
// cancel handles POST /api/exports/{id}/cancel. It sets the job's status to
// "canceled" when the job is currently "queued" or "running" and replies with
// the plain-text body "ok". A job in any other state (or an unknown id) is
// left untouched and still answered with "ok".
//
// A database error is now reported as 500 instead of being hidden behind "ok".
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
	_, err := a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", "canceled", time.Now(), id)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	// A write error means the client went away; there is nothing left to do.
	_, _ = w.Write([]byte("ok"))
}
// toString renders a scanned database cell as text for the export writers.
//
//   - nil becomes the empty string.
//   - []byte and string are returned as-is.
//   - int64, int and float64 use strconv (floats in plain decimal notation
//     with the shortest exact representation).
//   - bool becomes "true" / "false".
//   - time.Time is formatted as "2006-01-02 15:04:05".
//   - Every other type falls back to fmt.Sprint. These used to be rendered as
//     an empty string, silently blanking the exported cell.
func toString(v interface{}) string {
	switch t := v.(type) {
	case nil:
		return ""
	case []byte:
		return string(t)
	case string:
		return t
	case int64:
		return strconv.FormatInt(t, 10)
	case int:
		return strconv.Itoa(t)
	case float64:
		return strconv.FormatFloat(t, 'f', -1, 64)
	case bool:
		return strconv.FormatBool(t)
	case time.Time:
		return t.Format("2006-01-02 15:04:05")
	default:
		return fmt.Sprint(t)
	}
}
// list handles GET /api/exports and returns one page of export jobs, newest
// first.
//
// Query parameters:
//   - page: 1-based page number; defaults to 1 when missing or invalid.
//   - page_size: 1..100; defaults to 15 when missing, invalid or out of range.
//   - template_id: when it parses to a value > 0, only that template's jobs
//     are listed.
//
// Each item carries the job columns plus eval_status / eval_desc derived from
// explain_score (a NULL score counts as 0). The response also reports the
// total matching row count, the page and the page size.
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
	// Minimum explain_score for a job to be labeled as allowed.
	const passThreshold = 60

	params := r.URL.Query()
	page, size := 1, 15
	if n, err := strconv.Atoi(params.Get("page")); err == nil && n > 0 {
		page = n
	}
	if n, err := strconv.Atoi(params.Get("page_size")); err == nil && n > 0 && n <= 100 {
		size = n
	}
	var tplID uint64
	if n, err := strconv.ParseUint(params.Get("template_id"), 10, 64); err == nil {
		tplID = n
	}
	offset := (page - 1) * size

	// The optional template filter is shared by the count and page queries.
	where := ""
	var filterArgs []interface{}
	if tplID > 0 {
		where = " WHERE template_id = ?"
		filterArgs = append(filterArgs, tplID)
	}

	// The total is best effort: on failure it is logged and reported as 0
	// rather than failing the whole listing.
	var totalCount int64
	if err := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs"+where, filterArgs...).Scan(&totalCount); err != nil {
		log.Printf("list exports: count failed: %v", err)
	}

	pageSQL := "SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score FROM export_jobs" + where + " ORDER BY id DESC LIMIT ? OFFSET ?"
	rows, err := a.meta.Query(pageSQL, append(filterArgs, size, offset)...)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	// Non-nil empty slice so an empty page serializes as [] rather than null.
	items := []map[string]interface{}{}
	for rows.Next() {
		var id, tid, req uint64
		var status, fmtstr string
		var estimate, total, score sql.NullInt64
		var createdAt, updatedAt sql.NullTime
		if err := rows.Scan(&id, &tid, &status, &req, &estimate, &total, &fmtstr, &createdAt, &updatedAt, &score); err != nil {
			// Skip the unreadable row but leave a trace of it.
			log.Printf("list exports: scan failed: %v", err)
			continue
		}
		evalStatus, verdict := "通过", "允许执行"
		if score.Int64 < passThreshold {
			evalStatus, verdict = "禁止", "禁止执行"
		}
		desc := fmt.Sprintf("评分:%d估算行数:%d%s", score.Int64, estimate.Int64, verdict)
		items = append(items, map[string]interface{}{
			"id":           id,
			"template_id":  tid,
			"status":       status,
			"requested_by": req,
			"row_estimate": estimate.Int64,
			"total_rows":   total.Int64,
			"file_format":  fmtstr,
			"created_at":   createdAt.Time,
			"updated_at":   updatedAt.Time,
			"eval_status":  evalStatus,
			"eval_desc":    desc,
		})
	}
	// Next returning false may mean an iteration error; do not present a
	// truncated page as a successful result.
	if err := rows.Err(); err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}

	ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}