MarketingSystemDataTool/server/internal/api/exports.go

988 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package api
import (
"archive/zip"
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"marketing-system-data-tool/server/internal/exporter"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
// ExportsAPI serves the /api/exports endpoints. Job bookkeeping
// (export_templates, export_jobs, export_job_files) lives in the metadata
// DB; the actual export queries run against a data DB chosen per job.
type ExportsAPI struct {
	meta      *sql.DB // metadata store for templates, jobs and produced files
	marketing *sql.DB // marketing data source used for export queries
}
// ExportsHandler wires the /api/exports routes onto an ExportsAPI backed by
// the given metadata and marketing database handles.
//
// Routes:
//
//	POST /api/exports                 create a job
//	GET  /api/exports                 list jobs
//	GET  /api/exports/{id}            job detail
//	GET  /api/exports/{id}/sql        rebuilt SQL for the job
//	GET  /api/exports/{id}/download   latest produced file
//	POST /api/exports/{id}/cancel     cancel a queued/running job
//
// Anything else answers 404.
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
	api := &ExportsAPI{meta: meta, marketing: marketing}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rest := strings.TrimPrefix(r.URL.Path, "/api/exports")
		if rest == "" {
			switch r.Method {
			case http.MethodPost:
				api.create(w, r)
				return
			case http.MethodGet:
				api.list(w, r)
				return
			}
		} else if strings.HasPrefix(rest, "/") {
			id := strings.TrimPrefix(rest, "/")
			switch {
			case r.Method == http.MethodGet && strings.HasSuffix(rest, "/download"):
				api.download(w, r, strings.TrimSuffix(id, "/download"))
				return
			case r.Method == http.MethodGet && strings.HasSuffix(rest, "/sql"):
				api.getSQL(w, r, strings.TrimSuffix(id, "/sql"))
				return
			case r.Method == http.MethodGet:
				api.get(w, r, id)
				return
			case r.Method == http.MethodPost && strings.HasSuffix(rest, "/cancel"):
				api.cancel(w, r, strings.TrimSuffix(id, "/cancel"))
				return
			}
		}
		w.WriteHeader(http.StatusNotFound)
	})
}
// ExportPayload is the JSON request body for POST /api/exports.
type ExportPayload struct {
	TemplateID  uint64                 `json:"template_id"`  // export_templates row to build the query from
	RequestedBy uint64                 `json:"requested_by"` // user id recorded on the job
	Permission  map[string]interface{} `json:"permission"`   // permission scope, stored verbatim as JSON
	Options     map[string]interface{} `json:"options"`      // free-form options, stored verbatim as JSON
	FileFormat  string                 `json:"file_format"`  // "csv" or "xlsx"
	Filters     map[string]interface{} `json:"filters"`      // filter values fed into exporter.BuildSQL
	Datasource  string                 `json:"datasource"`   // optional override of the template's datasource
}
// create handles POST /api/exports: it validates the payload and template,
// builds the export SQL, gates it behind an EXPLAIN score threshold,
// estimates the result size, persists a queued export_jobs row and starts
// the export asynchronously. Responds with the new job id.
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
	// BUG FIX: body read and JSON decode errors were silently ignored,
	// letting a garbage payload surface later as "invalid template".
	b, err := io.ReadAll(r.Body)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "invalid body")
		return
	}
	var p ExportPayload
	if err := json.Unmarshal(b, &p); err != nil {
		fail(w, r, http.StatusBadRequest, "invalid payload")
		return
	}
	r = WithPayload(r, p)
	var main string
	var ds string
	var fields []byte
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), "SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?", []interface{}{p.TemplateID})
	row := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?", p.TemplateID)
	if err := row.Scan(&ds, &main, &fields); err != nil {
		fail(w, r, http.StatusBadRequest, "invalid template")
		return
	}
	// The payload may override the template's datasource.
	if p.Datasource != "" {
		ds = p.Datasource
	}
	var fs []string
	// Best effort: malformed fields_json yields an empty field list, which
	// BuildSQL rejects below.
	json.Unmarshal(fields, &fs)
	wl := whitelist()
	req := exporter.BuildRequest{MainTable: main, Fields: fs, Filters: p.Filters}
	q, args, err := exporter.BuildSQL(req, wl)
	if err != nil {
		r = WithSQL(r, q)
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	r = WithSQL(r, q)
	dataDB := a.selectDataDB(ds)
	expRows, score, err := exporter.RunExplain(dataDB, q, args)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	// Queries scoring below the threshold are rejected outright.
	const passThreshold = 60
	if score < passThreshold {
		fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score))
		return
	}
	// Estimate the row count: prefer an exact COUNT(1) over the same WHERE
	// clause; fall back to summing EXPLAIN row estimates (the driving
	// "order" table, when present, wins as the single best estimate).
	var estimate int64
	func() {
		idx := strings.Index(q, " FROM ")
		if idx > 0 {
			cq := "SELECT COUNT(1)" + q[idx:]
			row := dataDB.QueryRow(cq, args...)
			var cnt int64
			if err := row.Scan(&cnt); err == nil {
				estimate = cnt
				return
			}
		}
		// BUG FIX: the loop variable was named r, shadowing *http.Request r.
		for _, er := range expRows {
			if er.Table.Valid && er.Table.String == "order" && er.Rows.Valid {
				estimate = er.Rows.Int64
				break
			}
			if er.Rows.Valid {
				estimate += er.Rows.Int64
			}
		}
	}()
	// Map field names to human-readable column headers where known.
	labels := fieldLabels()
	hdrs := make([]string, len(fs))
	for i, tf := range fs {
		if v, ok := labels[tf]; ok {
			hdrs[i] = v
		} else {
			hdrs[i] = tf
		}
	}
	ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"
	ejArgs := []interface{}{p.TemplateID, "queued", p.RequestedBy, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(expRows), score, estimate, p.FileFormat, time.Now(), time.Now()}
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), ejSQL, ejArgs)
	res, err := a.meta.Exec(ejSQL, ejArgs...)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	id, _ := res.LastInsertId()
	// Run the export in the background; the caller polls for status.
	go a.runJob(uint64(id), dataDB, q, args, hdrs, p.FileFormat)
	ok(w, r, map[string]interface{}{"id": id})
}
// runJob executes an export job asynchronously. It marks the job running,
// streams the query result into CSV or XLSX files (splitting output every
// 300k rows and zipping multi-part output), records every produced file in
// export_job_files, and finally marks the job completed or failed. An
// unknown file format fails the job immediately. runJob never returns an
// error; all outcomes are reflected in the export_jobs row.
//
// When the job's filters carry a create_time_between range, the range is
// split into 10-day windows and exported window by window to keep each
// query small.
// NOTE(review): adjacent windows share a boundary timestamp, so rows
// exactly on a boundary may be exported twice if BuildSQL filters
// inclusively on both ends — confirm against the filter semantics.
//
// Fixes in this revision: the parameter previously named fmt shadowed the
// fmt package; several failure-path UPDATEs used the malformed clause
// "WHERE id?" and silently matched nothing; two duplicate pagination loops
// and an unbatched fallback after the unconditional return of the batched
// CSV path were unreachable and have been removed.
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, cols []string, format string) {
	log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, started_at=? WHERE id= ?", []interface{}{"running", time.Now(), id})
	a.meta.Exec("UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id= ?", "running", time.Now(), time.Now(), id)
	if format == "csv" {
		w, err := exporter.NewCSVWriter("storage", "export")
		if err != nil {
			a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
			return
		}
		w.WriteHeader(cols)
		const maxRowsPerFile = 300000
		files := []string{}
		{
			// Re-load the job's template and filters to decide whether the
			// export can be chunked by create_time windows. Scan/unmarshal
			// errors are tolerated: they simply leave chunks empty.
			var tplID uint64
			var filtersJSON []byte
			row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
			_ = row.Scan(&tplID, &filtersJSON)
			var main string
			var fieldsJSON []byte
			tr := a.meta.QueryRow("SELECT main_table, fields_json FROM export_templates WHERE id=?", tplID)
			_ = tr.Scan(&main, &fieldsJSON)
			var fs []string
			var fl map[string]interface{}
			json.Unmarshal(fieldsJSON, &fs)
			json.Unmarshal(filtersJSON, &fl)
			wl := whitelist()
			var chunks [][2]string
			if v, ok := fl["create_time_between"]; ok {
				// JSON round-trips give []interface{}; []string is tolerated too.
				if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
					chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
				}
				if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
					chunks = splitByDays(arrs[0], arrs[1], 10)
				}
			}
			if len(chunks) > 0 {
				out := make([]interface{}, len(cols))
				dest := make([]interface{}, len(cols))
				for i := range out {
					dest[i] = &out[i]
				}
				var count int64     // rows written across all files
				var partCount int64 // rows written to the current file
				var tick int64      // heartbeat counter
				for _, rg := range chunks {
					// Rebuild the SQL for this time window only.
					fl["create_time_between"] = []string{rg[0], rg[1]}
					req := exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}
					cq, cargs, err := exporter.BuildSQL(req, wl)
					if err != nil {
						continue
					}
					batch := 1000
					for off := 0; ; off += batch {
						sub := "SELECT * FROM (" + cq + ") AS sub LIMIT ? OFFSET ?"
						args2 := append(append([]interface{}{}, cargs...), batch, off)
						rows2, err := db.Query(sub, args2...)
						if err != nil {
							break
						}
						fetched := false
						for rows2.Next() {
							fetched = true
							if err := rows2.Scan(dest...); err != nil {
								rows2.Close()
								// BUG FIX: was the malformed "WHERE id?".
								a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
								return
							}
							vals := make([]string, len(cols))
							for i := range out {
								if b, ok := out[i].([]byte); ok {
									vals[i] = string(b)
								} else if out[i] == nil {
									vals[i] = ""
								} else {
									vals[i] = toString(out[i])
								}
							}
							w.WriteRow(vals)
							count++
							partCount++
							tick++
							// Progress heartbeat every 50 rows.
							if tick%50 == 0 {
								a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
							}
							// Roll over to a fresh file every maxRowsPerFile rows.
							if partCount >= maxRowsPerFile {
								path, size, _ := w.Close()
								files = append(files, path)
								a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
								w, err = exporter.NewCSVWriter("storage", "export")
								if err != nil {
									rows2.Close()
									a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
									return
								}
								w.WriteHeader(cols)
								partCount = 0
							}
						}
						rows2.Close()
						if !fetched {
							break
						}
					}
				}
				path, size, _ := w.Close()
				if partCount > 0 || len(files) == 0 {
					files = append(files, path)
					a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
				}
				if count == 0 {
					// Nothing was streamed; fall back to a COUNT over the full
					// query so total_rows is still meaningful.
					row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
					var c int64
					_ = row.Scan(&c)
					count = c
				}
				if len(files) >= 1 {
					zipPath, zipSize := createZip(id, files)
					a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
				}
				a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
				return
			}
		}
		log.Printf("job_id=%d sql=%s args=%v", id, q, args)
		// No chunking possible: page through the full query 1000 rows at a
		// time, splitting the output every 300k rows.
		{
			const maxRowsPerFile = 300000
			out := make([]interface{}, len(cols))
			dest := make([]interface{}, len(cols))
			for i := range out {
				dest[i] = &out[i]
			}
			var count int64
			var partCount int64
			var tick int64
			batch := 1000
			files2 := []string{}
			for off := 0; ; off += batch {
				sub := "SELECT * FROM (" + q + ") AS sub LIMIT ? OFFSET ?"
				args2 := append(append([]interface{}{}, args...), batch, off)
				rows3, err := db.Query(sub, args2...)
				if err != nil {
					a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
					return
				}
				fetched := false
				for rows3.Next() {
					fetched = true
					if err := rows3.Scan(dest...); err != nil {
						rows3.Close()
						// BUG FIX: was the malformed "WHERE id?".
						a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
						return
					}
					vals := make([]string, len(cols))
					for i := range out {
						if b, ok := out[i].([]byte); ok {
							vals[i] = string(b)
						} else if out[i] == nil {
							vals[i] = ""
						} else {
							vals[i] = toString(out[i])
						}
					}
					w.WriteRow(vals)
					count++
					partCount++
					tick++
					if tick%50 == 0 {
						a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
					}
					if partCount >= maxRowsPerFile {
						path2, size2, _ := w.Close()
						files2 = append(files2, path2)
						a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path2, partCount, size2, time.Now(), time.Now())
						w, err = exporter.NewCSVWriter("storage", "export")
						if err != nil {
							rows3.Close()
							a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
							return
						}
						w.WriteHeader(cols)
						partCount = 0
					}
				}
				rows3.Close()
				if !fetched {
					break
				}
			}
			path, size, _ := w.Close()
			if partCount > 0 || len(files2) == 0 {
				files2 = append(files2, path)
				a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
			}
			if count == 0 {
				row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
				var c int64
				_ = row.Scan(&c)
				count = c
			}
			if len(files2) >= 1 {
				zipPath, zipSize := createZip(id, files2)
				a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
			}
			a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
			return
		}
	}
	if format == "xlsx" {
		const maxRowsPerFile = 300000
		files := []string{}
		x, path, err := exporter.NewXLSXWriter("storage", "export", "Sheet1")
		if err != nil {
			a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
			return
		}
		x.WriteHeader(cols)
		{
			// Same chunk-by-create_time strategy as the CSV branch.
			var tplID uint64
			var filtersJSON []byte
			row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
			_ = row.Scan(&tplID, &filtersJSON)
			var main string
			var fieldsJSON []byte
			tr := a.meta.QueryRow("SELECT main_table, fields_json FROM export_templates WHERE id=?", tplID)
			_ = tr.Scan(&main, &fieldsJSON)
			var fs []string
			var fl map[string]interface{}
			json.Unmarshal(fieldsJSON, &fs)
			json.Unmarshal(filtersJSON, &fl)
			wl := whitelist()
			var chunks [][2]string
			if v, ok := fl["create_time_between"]; ok {
				if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
					chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
				}
				if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
					chunks = splitByDays(arrs[0], arrs[1], 10)
				}
			}
			if len(chunks) > 0 {
				out := make([]interface{}, len(cols))
				dest := make([]interface{}, len(cols))
				for i := range out {
					dest[i] = &out[i]
				}
				var count int64
				var partCount int64
				var tick int64
				for _, rg := range chunks {
					fl["create_time_between"] = []string{rg[0], rg[1]}
					req := exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}
					cq, cargs, err := exporter.BuildSQL(req, wl)
					if err != nil {
						continue
					}
					batch := 1000
					for off := 0; ; off += batch {
						sub := "SELECT * FROM (" + cq + ") AS sub LIMIT ? OFFSET ?"
						args2 := append(append([]interface{}{}, cargs...), batch, off)
						rows2, err := db.Query(sub, args2...)
						if err != nil {
							break
						}
						fetched := false
						for rows2.Next() {
							fetched = true
							if err := rows2.Scan(dest...); err != nil {
								rows2.Close()
								// BUG FIX: was the malformed "WHERE id?".
								a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
								return
							}
							vals := make([]string, len(cols))
							for i := range out {
								if b, ok := out[i].([]byte); ok {
									vals[i] = string(b)
								} else if out[i] == nil {
									vals[i] = ""
								} else {
									vals[i] = toString(out[i])
								}
							}
							x.WriteRow(vals)
							count++
							partCount++
							tick++
							if tick%50 == 0 {
								a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
							}
							if partCount >= maxRowsPerFile {
								p, size, _ := x.Close(path)
								files = append(files, p)
								a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, partCount, size, time.Now(), time.Now())
								x, path, err = exporter.NewXLSXWriter("storage", "export", "Sheet1")
								if err != nil {
									rows2.Close()
									a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
									return
								}
								x.WriteHeader(cols)
								partCount = 0
							}
						}
						rows2.Close()
						if !fetched {
							break
						}
					}
				}
				p, size, _ := x.Close(path)
				if partCount > 0 || len(files) == 0 {
					files = append(files, p)
					a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, partCount, size, time.Now(), time.Now())
				}
				if count == 0 {
					row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
					var c int64
					_ = row.Scan(&c)
					count = c
				}
				// NOTE(review): the CSV branch zips for >= 1 file but this
				// branch only for > 1 — confirm whether that asymmetry is
				// intentional. Behavior preserved here.
				if len(files) > 1 {
					zipPath, zipSize := createZip(id, files)
					a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
				}
				a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
				return
			}
		}
		log.Printf("job_id=%d sql=%s args=%v", id, q, args)
		// No chunking: stream the whole query into a single workbook.
		rows, err := db.Query(q, args...)
		if err != nil {
			a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
			return
		}
		defer rows.Close()
		out := make([]interface{}, len(cols))
		dest := make([]interface{}, len(cols))
		for i := range out {
			dest[i] = &out[i]
		}
		var count int64
		var tick int64
		for rows.Next() {
			if err := rows.Scan(dest...); err != nil {
				a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
				return
			}
			vals := make([]string, len(cols))
			for i := range out {
				if b, ok := out[i].([]byte); ok {
					vals[i] = string(b)
				} else if out[i] == nil {
					vals[i] = ""
				} else {
					vals[i] = toString(out[i])
				}
			}
			x.WriteRow(vals)
			count++
			tick++
			if tick%50 == 0 {
				a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
			}
		}
		p, size, _ := x.Close(path)
		log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
		a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
		log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", []interface{}{"completed", time.Now(), count, time.Now(), id})
		a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
		return
	}
	// Unknown file format: fail the job immediately.
	a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", "failed", time.Now(), time.Now(), id)
}
// selectDataDB picks the database an export query should run against:
// datasource "ymt" maps to the metadata connection; everything else uses
// the marketing connection.
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
	switch ds {
	case "ymt":
		return a.meta
	default:
		return a.marketing
	}
}
// splitByDays partitions the time range [startStr, endStr] into consecutive
// windows of at most stepDays days, using the "2006-01-02 15:04:05" layout
// for both parsing and formatting. If either bound fails to parse, the end
// is not strictly after the start, or stepDays is non-positive, the
// (whitespace-trimmed) inputs are returned unchanged as a single range.
//
// NOTE(review): adjacent windows share their boundary instant; if the
// consumer filters inclusively on both ends, rows exactly on a boundary may
// be selected twice — confirm against the filter semantics.
func splitByDays(startStr, endStr string, stepDays int) [][2]string {
	const layout = "2006-01-02 15:04:05"
	start := strings.TrimSpace(startStr)
	end := strings.TrimSpace(endStr)
	from, errFrom := time.Parse(layout, start)
	to, errTo := time.Parse(layout, end)
	if errFrom != nil || errTo != nil || !to.After(from) || stepDays <= 0 {
		return [][2]string{{start, end}}
	}
	window := time.Duration(stepDays) * 24 * time.Hour
	var ranges [][2]string
	for lo := from; lo.Before(to); {
		hi := lo.Add(window)
		if hi.After(to) {
			hi = to // clamp the final (possibly partial) window
		}
		ranges = append(ranges, [2]string{lo.Format(layout), hi.Format(layout)})
		lo = hi
	}
	return ranges
}
// get handles GET /api/exports/{id}: returns the job row plus the list of
// files it has produced so far. Responds 404 when the job does not exist.
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
	row := a.meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at FROM export_jobs WHERE id=?", id)
	var m = map[string]interface{}{}
	var jid uint64
	var templateID uint64
	var status string
	var requestedBy uint64
	var totalRows sql.NullInt64
	var fileFormat string
	var startedAt, finishedAt sql.NullTime
	var createdAt, updatedAt time.Time
	if err := row.Scan(&jid, &templateID, &status, &requestedBy, &totalRows, &fileFormat, &startedAt, &finishedAt, &createdAt, &updatedAt); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	m["id"] = jid
	m["template_id"] = templateID
	m["status"] = status
	m["requested_by"] = requestedBy
	m["file_format"] = fileFormat
	// Null columns are flattened to their zero values in the response.
	m["total_rows"] = totalRows.Int64
	m["started_at"] = startedAt.Time
	m["finished_at"] = finishedAt.Time
	m["created_at"] = createdAt
	m["updated_at"] = updatedAt
	// Attach produced files; a query failure just yields an empty list.
	// BUG FIX: the Query error was discarded, so a failure left rows nil
	// and rows.Next() panicked.
	files := []map[string]interface{}{}
	rows, err := a.meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", id)
	if err == nil {
		for rows.Next() {
			var uri, sheet sql.NullString
			var rc, sz sql.NullInt64
			rows.Scan(&uri, &sheet, &rc, &sz)
			files = append(files, map[string]interface{}{"storage_uri": uri.String, "sheet_name": sheet.String, "row_count": rc.Int64, "size_bytes": sz.Int64})
		}
		rows.Close()
	}
	m["files"] = files
	ok(w, r, m)
}
// getSQL handles GET /api/exports/{id}/sql: it rebuilds the SQL for a job
// from its stored template and filters and returns both the parameterized
// statement and a best-effort interpolated form for debugging.
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
	// Load the job's filters and the template it was built from.
	row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
	var tplID uint64
	var filters []byte
	if err := row.Scan(&tplID, &filters); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	tr := a.meta.QueryRow("SELECT main_table, fields_json FROM export_templates WHERE id=?", tplID)
	var main string
	var fields []byte
	if err := tr.Scan(&main, &fields); err != nil {
		fail(w, r, http.StatusBadRequest, "template not found")
		return
	}
	var fs []string
	var fl map[string]interface{}
	// Best effort: malformed JSON leaves empty inputs for BuildSQL to reject.
	json.Unmarshal(fields, &fs)
	json.Unmarshal(filters, &fl)
	wl := whitelist()
	req := exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}
	q, args, err := exporter.BuildSQL(req, wl)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	// formatArg renders one bind argument as a SQL literal (single quotes
	// doubled, times formatted as "2006-01-02 15:04:05"). Debug display
	// only — not a general-purpose escaper.
	// BUG FIX: the parameter was previously named "a", shadowing the receiver.
	formatArg := func(v interface{}) string {
		switch t := v.(type) {
		case nil:
			return "NULL"
		case []byte:
			s := string(t)
			s = strings.ReplaceAll(s, "'", "''")
			return "'" + s + "'"
		case string:
			s := strings.ReplaceAll(t, "'", "''")
			return "'" + s + "'"
		case int:
			return strconv.Itoa(t)
		case int64:
			return strconv.FormatInt(t, 10)
		case float64:
			return strconv.FormatFloat(t, 'f', -1, 64)
		case time.Time:
			return "'" + t.Format("2006-01-02 15:04:05") + "'"
		default:
			return fmt.Sprintf("%v", t)
		}
	}
	// Substitute each ? placeholder with its rendered argument.
	// NOTE(review): a literal '?' inside a quoted string in q would be
	// substituted too — acceptable for a debug view.
	var sb strings.Builder
	var ai int
	for i := 0; i < len(q); i++ {
		c := q[i]
		if c == '?' && ai < len(args) {
			sb.WriteString(formatArg(args[ai]))
			ai++
		} else {
			sb.WriteByte(c)
		}
	}
	ok(w, r, map[string]interface{}{"sql": q, "final_sql": sb.String()})
}
// download serves the most recently recorded file for a job (which, for
// multi-part exports, is the zip bundle inserted last). 404 when the job
// has produced no files.
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
	var uri string
	err := a.meta.
		QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", id).
		Scan(&uri)
	if err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, uri)
}
// cancel handles POST /api/exports/{id}/cancel: it flips a queued or
// running job to "canceled"; finished jobs are left untouched. The plain
// "ok" body is kept for backward compatibility with existing clients.
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
	// BUG FIX: the Exec error was silently discarded, reporting success even
	// when the update never ran.
	if _, err := a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", "canceled", time.Now(), id); err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	w.Write([]byte("ok"))
}
func toString(v interface{}) string {
switch t := v.(type) {
case []byte:
return string(t)
case string:
return t
case int64:
return strconv.FormatInt(t, 10)
case int:
return strconv.Itoa(t)
case float64:
return strconv.FormatFloat(t, 'f', -1, 64)
case bool:
if t {
return "1"
}
return "0"
case time.Time:
return t.Format("2006-01-02 15:04:05")
default:
return fmt.Sprintf("%v", t)
}
}
// list handles GET /api/exports: a paginated listing of export jobs,
// optionally filtered by template_id, with a human-readable EXPLAIN
// evaluation status and description attached to each item.
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
	q := r.URL.Query()
	// Pagination defaults: page 1, 15 per page, page size capped at 100.
	page := 1
	size := 15
	if p := q.Get("page"); p != "" {
		if n, err := strconv.Atoi(p); err == nil && n > 0 {
			page = n
		}
	}
	if s := q.Get("page_size"); s != "" {
		if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
			size = n
		}
	}
	// Optional template filter; an unparsable value is ignored (tplID 0).
	tplIDStr := q.Get("template_id")
	var tplID uint64
	if tplIDStr != "" {
		if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
			tplID = n
		}
	}
	offset := (page - 1) * size
	// Total count for the pager; a scan failure leaves it at zero.
	var totalCount int64
	if tplID > 0 {
		row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ?", tplID)
		_ = row.Scan(&totalCount)
	} else {
		row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs")
		_ = row.Scan(&totalCount)
	}
	var rows *sql.Rows
	var err error
	if tplID > 0 {
		rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, size, offset)
	} else {
		rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs ORDER BY id DESC LIMIT ? OFFSET ?", size, offset)
	}
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()
	items := []map[string]interface{}{}
	for rows.Next() {
		var id, tid, req uint64
		var status, fmtstr string
		var estimate, total sql.NullInt64
		var createdAt, updatedAt sql.NullTime
		var score sql.NullInt64
		var explainRaw sql.NullString
		if err := rows.Scan(&id, &tid, &status, &req, &estimate, &total, &fmtstr, &createdAt, &updatedAt, &score, &explainRaw); err != nil {
			// Skip unscannable rows rather than aborting the listing.
			continue
		}
		// Jobs scoring below 60 are flagged as blocked ("禁止"); a NULL
		// score reads as 0 and is therefore also flagged.
		evalStatus := "通过"
		if score.Int64 < 60 {
			evalStatus = "禁止"
		}
		// Fallback description built from the stored score/estimate.
		// NOTE(review): separator characters between the fields appear to
		// have been lost (cf. the score message in create) — confirm against
		// the original source before relying on the exact wording.
		desc := fmt.Sprintf("评分:%d估算行数:%d%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
		// When EXPLAIN JSON is stored, replace the generic summary with a
		// per-table description. The JSON is the marshaled form of sql.Null*
		// wrapper structs, hence the {Valid, String/Int64/Float64} shape
		// unpacked by the closures below.
		if explainRaw.Valid && explainRaw.String != "" {
			var arr []map[string]interface{}
			if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil {
				segs := []string{}
				for _, r := range arr {
					// getStr extracts a sql.NullString-shaped field.
					getStr := func(field string) string {
						if v, ok := r[field]; ok {
							if mm, ok := v.(map[string]interface{}); ok {
								if b, ok := mm["Valid"].(bool); ok && !b {
									return ""
								}
								if s, ok := mm["String"].(string); ok {
									return s
								}
							}
						}
						return ""
					}
					// getInt extracts a sql.NullInt64-shaped field (JSON
					// numbers arrive as float64).
					getInt := func(field string) int64 {
						if v, ok := r[field]; ok {
							if mm, ok := v.(map[string]interface{}); ok {
								if b, ok := mm["Valid"].(bool); ok && !b {
									return 0
								}
								if f, ok := mm["Int64"].(float64); ok {
									return int64(f)
								}
							}
						}
						return 0
					}
					// getFloat extracts a sql.NullFloat64-shaped field.
					getFloat := func(field string) float64 {
						if v, ok := r[field]; ok {
							if mm, ok := v.(map[string]interface{}); ok {
								if b, ok := mm["Valid"].(bool); ok && !b {
									return 0
								}
								if f, ok := mm["Float64"].(float64); ok {
									return f
								}
							}
						}
						return 0
					}
					tbl := getStr("Table")
					typ := getStr("Type")
					if typ == "" {
						typ = getStr("SelectType")
					}
					key := getStr("Key")
					rowsN := getInt("Rows")
					filt := getFloat("Filtered")
					extra := getStr("Extra")
					// Skip EXPLAIN rows that carry no information at all.
					if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
						continue
					}
					s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
					if extra != "" {
						s += ", 额外:" + extra
					}
					segs = append(segs, s)
				}
				if len(segs) > 0 {
					// NOTE(review): joined with "" — a separator such as "; "
					// may have been lost; verify before changing.
					desc = strings.Join(segs, "")
				}
			}
		}
		m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
		items = append(items, m)
	}
	ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}
func createZip(jobID uint64, files []string) (string, int64) {
baseDir := "storage/export"
_ = os.MkdirAll(baseDir, 0755)
zipPath := filepath.Join(baseDir, fmt.Sprintf("job_%d_%d.zip", jobID, time.Now().Unix()))
zf, err := os.Create(zipPath)
if err != nil {
return zipPath, 0
}
defer zf.Close()
zw := zip.NewWriter(zf)
for _, p := range files {
f, err := os.Open(p)
if err != nil {
continue
}
fi, _ := f.Stat()
w, err := zw.Create(filepath.Base(p))
if err != nil {
f.Close()
continue
}
_, _ = io.Copy(w, f)
_ = fi
f.Close()
}
_ = zw.Close()
st, err := os.Stat(zipPath)
if err != nil {
return zipPath, 0
}
return zipPath, st.Size()
}