// MarketingSystemDataExportTool/server/internal/api/exports.go

package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"net/http"
"os"
"path/filepath"
"server/internal/exporter"
"server/internal/logging"
"server/internal/repo"
"server/internal/ymtcrypto"
"strconv"
"strings"
"time"
)
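// ExportsAPI implements the export-job endpoints. meta stores job and
// template bookkeeping; marketing and ymt are the two selectable data
// sources (see selectDataDB).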
type ExportsAPI struct {
meta *sql.DB
marketing *sql.DB
ymt *sql.DB
}
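// ExportsHandler routes /api/exports requests:
//   POST /api/exports                -> create
//   GET  /api/exports                -> list
//   GET  /api/exports/{id}           -> get
//   GET  /api/exports/{id}/sql       -> getSQL
//   GET  /api/exports/{id}/download  -> download
//   POST /api/exports/{id}/recompute -> recompute
//   POST /api/exports/{id}/cancel    -> cancel
// Anything else returns 404.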
func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
if r.Method == http.MethodPost && p == "" {
api.create(w, r)
return
}
if r.Method == http.MethodGet && p == "" {
api.list(w, r)
return
}
if strings.HasPrefix(p, "/") {
id := strings.TrimPrefix(p, "/")
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
if strings.HasSuffix(p, "/sql") {
id = strings.TrimSuffix(id, "/sql")
api.getSQL(w, r, id)
return
}
api.get(w, r, id)
return
}
if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") {
id = strings.TrimSuffix(id, "/download")
api.download(w, r, id)
return
}
if r.Method == http.MethodPost && strings.HasSuffix(p, "/recompute") {
id = strings.TrimSuffix(id, "/recompute")
api.recompute(w, r, id)
return
}
if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") {
id = strings.TrimSuffix(id, "/cancel")
api.cancel(w, r, id)
return
}
}
w.WriteHeader(http.StatusNotFound)
})
}
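// ExportPayload is the request body for creating an export job. Permission
// and Filters are merged by mergePermissionIntoFilters; Datasource, when set,
// overrides the template's datasource. An illustrative body (the values below
// are examples only, not taken from a real template):
//
// {
//   "template_id": 12,
//   "requested_by": 15,
//   "file_format": "csv",
//   "datasource": "marketing",
//   "filters": {"create_time_between": ["2024-01-01 00:00:00", "2024-01-10 23:59:59"]},
//   "permission": {"creator_in": [15]},
//   "options": {}
// }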
type ExportPayload struct {
TemplateID uint64 `json:"template_id"`
RequestedBy uint64 `json:"requested_by"`
Permission map[string]interface{} `json:"permission"`
Options map[string]interface{} `json:"options"`
FileFormat string `json:"file_format"`
Filters map[string]interface{} `json:"filters"`
Datasource string `json:"datasource"`
}
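// create handles POST /api/exports: it loads the template, merges the
// caller's permission scope and the userId/merchantId query parameters into
// the filters, applies the field whitelist with per-datasource field
// rewrites, builds the SQL, runs EXPLAIN and rejects scores below 60,
// estimates the row count, records the job, and starts runJob in the
// background.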
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
var p ExportPayload
if err := json.Unmarshal(b, &p); err != nil {
fail(w, r, http.StatusBadRequest, "invalid JSON payload")
return
}
r = WithPayload(r, p)
rrepo := repo.NewExportRepo()
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
if err != nil {
fail(w, r, http.StatusBadRequest, "invalid template")
return
}
if p.Datasource != "" {
ds = p.Datasource
}
wl := Whitelist()
// ensure filters map initialized
if p.Filters == nil {
p.Filters = map[string]interface{}{}
}
// merge permission scope into filters to enforce boundary
p.Filters = mergePermissionIntoFilters(p.Datasource, main, p.Permission, p.Filters)
// support multiple userId in query: e.g., userId=15,25 → filters.creator_in
{
uidStr := r.URL.Query().Get("userId")
if uidStr != "" {
parts := strings.Split(uidStr, ",")
ids := make([]interface{}, 0, len(parts))
for _, s := range parts {
s = strings.TrimSpace(s)
if s == "" {
continue
}
if n, err := strconv.ParseUint(s, 10, 64); err == nil {
ids = append(ids, n)
}
}
if len(ids) > 0 {
// Keep the permission boundary authoritative: only take creator_in from the
// userId query parameter when mergePermissionIntoFilters did not set it.
if _, exists := p.Filters["creator_in"]; !exists {
p.Filters["creator_in"] = ids
}
}
}
}
// support multiple merchantId in query: e.g., merchantId=1,2,3 → filters.merchant_id_in
{
midStr := r.URL.Query().Get("merchantId")
if midStr != "" {
parts := strings.Split(midStr, ",")
ids := make([]interface{}, 0, len(parts))
for _, s := range parts {
s = strings.TrimSpace(s)
if s == "" {
continue
}
if n, err := strconv.ParseUint(s, 10, 64); err == nil {
ids = append(ids, n)
}
}
if len(ids) > 0 {
if _, exists := p.Filters["merchant_id_in"]; !exists {
p.Filters["merchant_id_in"] = ids
}
}
}
}
// DEBUG LOGGING
logging.JSON("INFO", map[string]interface{}{
"event": "export_filters_debug",
"filters": p.Filters,
"has_creator_in": hasNonEmptyIDs(p.Filters["creator_in"]),
"has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]),
})
if ds == "marketing" && (main == "order" || main == "order_info") {
if v, ok := p.Filters["create_time_between"]; ok {
switch t := v.(type) {
case []interface{}:
if len(t) != 2 {
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
return
}
case []string:
if len(t) != 2 {
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
return
}
default:
fail(w, r, http.StatusBadRequest, "create_time_between 格式错误")
return
}
} else {
fail(w, r, http.StatusBadRequest, "缺少时间过滤:必须提供 create_time_between")
return
}
}
filtered := make([]string, 0, len(fs))
tv := 0
if v, ok := p.Filters["type_eq"]; ok {
switch t := v.(type) {
case float64:
tv = int(t)
case int:
tv = t
case string:
if n, err := strconv.Atoi(strings.TrimSpace(t)); err == nil {
tv = n
}
}
}
for _, tf := range fs {
if ds == "ymt" && strings.HasPrefix(tf, "order_info.") {
tf = strings.Replace(tf, "order_info.", "order.", 1)
}
if ds == "marketing" && tf == "order_voucher.channel_batch_no" {
tf = "order_voucher.channel_activity_id"
}
if ds == "ymt" && tv == 2 {
if strings.HasPrefix(tf, "order_voucher.") || strings.HasPrefix(tf, "goods_voucher_batch.") || strings.HasPrefix(tf, "goods_voucher_subject_config.") {
continue
}
}
if wl[tf] {
filtered = append(filtered, tf)
}
}
if ds == "ymt" {
present := map[string]bool{}
for _, f := range filtered {
present[f] = true
}
if present["merchant.name"] && present["order.merchant_name"] {
out := make([]string, 0, len(filtered))
for _, f := range filtered {
if f == "order.merchant_name" {
continue
}
out = append(out, f)
}
filtered = out
}
}
// relax: creator_in is optional; other boundaries supplied in permission are merged in as equality filters
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters}
q, args, err := rrepo.Build(req, wl)
if err != nil {
r = WithSQL(r, q)
fail(w, r, http.StatusBadRequest, err.Error())
return
}
r = WithSQL(r, q)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args))
dataDB := a.selectDataDB(ds)
score, sugg, err := rrepo.Explain(dataDB, q, args)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
return
}
sugg = append(sugg, exporter.IndexSuggestions(req)...)
const passThreshold = 60
if score < passThreshold {
fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d请优化索引或缩小查询范围", score))
return
}
estimate := rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters)
labels := FieldLabels()
hdrs := make([]string, len(filtered))
for i, tf := range filtered {
if v, ok := labels[tf]; ok {
hdrs[i] = v
} else {
hdrs[i] = tf
}
}
// owner from query userId if provided
owner := uint64(0)
if uidStr := r.URL.Query().Get("userId"); uidStr != "" {
first := strings.TrimSpace(strings.Split(uidStr, ",")[0])
if n, err := strconv.ParseUint(first, 10, 64); err == nil {
owner = n
}
}
id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
if err != nil {
fail(w, r, http.StatusInternalServerError, err.Error())
return
}
go a.runJob(uint64(id), dataDB, q, args, filtered, hdrs, p.FileFormat)
ok(w, r, map[string]interface{}{"id": id})
}
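// runJob executes an export job asynchronously. For csv and xlsx it first
// tries a chunked path: when the filters carry create_time_between, the range
// is split into 10-day windows and streamed with a cursor once the row
// estimate exceeds 50,000 (marketing order exports that select order_voucher
// fields or filter on order_voucher_channel_activity_id_eq skip chunking).
// Output rolls over at 300,000 rows per file; finished files are zipped and
// recorded, progress is persisted as rows are written, and the job ends
// marked completed or failed. A panic marks the job failed via the deferred
// recover.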
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) {
defer func() {
if r := recover(); r != nil {
repo.NewExportRepo().MarkFailed(a.meta, id)
logging.JSON("ERROR", map[string]interface{}{"event": "export_panic", "job_id": id, "error": toString(r)})
}
}()
// load datasource once for transform decisions
var jobDS string
var jobMain string
rrepo := repo.NewExportRepo()
{
tplID, _, _ := rrepo.GetJobFilters(a.meta, id)
if tplID > 0 {
ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
jobDS = ds
if mt != "" {
jobMain = mt
} else {
jobMain = "order"
}
}
}
rrepo.StartJob(a.meta, id)
if fmt == "csv" {
w, err := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
return
}
w.WriteHeader(cols)
const maxRowsPerFile = 300000
files := []string{}
{
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
}
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
chunks = splitByDays(arrs[0], arrs[1], 10)
}
}
if len(chunks) > 0 {
var total int64
// If row_estimate is 0, re-estimate it when the chunked export starts
var currentEst int64
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = row.Scan(&currentEst)
if currentEst == 0 {
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
if estChunk > 0 {
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
}
}
skipChunk := false
if tplDS == "marketing" && main == "order" {
for _, f := range fs {
if strings.HasPrefix(f, "order_voucher.") {
skipChunk = true
break
}
}
if !skipChunk {
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
skipChunk = true
}
}
}
if !skipChunk && currentEst > 50000 {
cur := rrepo.NewCursor(tplDS, main)
batch := chooseBatch(0, "csv")
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := exporter.BuildSQL(req, wl)
if err != nil {
continue
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) {
w2, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
if e == nil {
_ = w2.WriteHeader(cols)
}
return w2, e
}
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, maxRowsPerFile, onRoll, onProgress)
if e != nil {
rrepo.MarkFailed(a.meta, id)
return
}
total += cnt
rrepo.UpdateProgress(a.meta, id, total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
}
if len(files) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files, total)
}
rrepo.MarkCompleted(a.meta, id, total)
return
}
}
}
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
{
const maxRowsPerFile = 300000
var est int64
{
var filtersJSON []byte
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&filtersJSON)
var fl map[string]interface{}
json.Unmarshal(filtersJSON, &fl)
est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est)
}
batch := chooseBatch(est, fmt)
files2 := []string{}
cur := rrepo.NewCursor(jobDS, jobMain)
newWriter := func() (exporter.RowWriter, error) {
w, e := exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
if e == nil {
_ = w.WriteHeader(cols)
}
return w, e
}
transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files2 = append(files2, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, maxRowsPerFile, onRoll, onProgress)
if err != nil {
rrepo.MarkFailed(a.meta, id)
return
}
if len(files2) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files2, count)
}
rrepo.MarkCompleted(a.meta, id, count)
return
}
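// NOTE: the streaming block above returns on every path, so the direct
// db.Query fallback below is effectively unreachable for CSV jobs.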
rows, err := db.Query(q, args...)
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
return
}
defer rows.Close()
out := make([]interface{}, len(cols))
dest := make([]interface{}, len(cols))
for i := range out {
dest[i] = &out[i]
}
var count int64
var tick int64
for rows.Next() {
if err := rows.Scan(dest...); err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
return
}
vals := make([]string, len(cols))
for i := range out {
if b, ok := out[i].([]byte); ok {
vals[i] = string(b)
} else if out[i] == nil {
vals[i] = ""
} else {
vals[i] = toString(out[i])
}
}
w.WriteRow(vals)
count++
tick++
if tick%1000 == 0 {
rrepo.UpdateProgress(a.meta, id, count)
}
}
path, size, _ := w.Close()
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, path, count, size, time.Now(), time.Now()})
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, count, size, time.Now(), time.Now())
log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", []interface{}{"completed", time.Now(), count, time.Now(), id})
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
return
}
if fmt == "xlsx" {
const maxRowsPerFile = 300000
files := []string{}
{
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
}
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
chunks = splitByDays(arrs[0], arrs[1], 10)
}
}
if len(chunks) > 0 {
var total int64
// If row_estimate is 0, re-estimate it when the chunked export starts
var currentEst int64
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = row.Scan(&currentEst)
if currentEst == 0 {
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
if estChunk > 0 {
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
}
}
skipChunk := false
if tplDS == "marketing" && main == "order" {
for _, f := range fs {
if strings.HasPrefix(f, "order_voucher.") {
skipChunk = true
break
}
}
if !skipChunk {
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
skipChunk = true
}
}
}
if !skipChunk && currentEst > 50000 {
cur := rrepo.NewCursor(tplDS, main)
batch := chooseBatch(0, "xlsx")
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := rrepo.Build(req, wl)
if err != nil {
continue
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) {
xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
if e == nil {
_ = xw.WriteHeader(cols)
}
return xw, e
}
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
// Report progress as the global cumulative row count so the value never regresses across chunks
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, maxRowsPerFile, onRoll, onProgress)
if e != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error()})
log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
rrepo.MarkFailed(a.meta, id)
return
}
total += cnt
rrepo.UpdateProgress(a.meta, id, total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
}
if len(files) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files, total)
}
rrepo.MarkCompleted(a.meta, id, total)
return
}
}
}
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
var est2 int64
{
var filtersJSON []byte
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&filtersJSON)
var fl map[string]interface{}
json.Unmarshal(filtersJSON, &fl)
est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est2)
}
x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
return
}
_ = x.WriteHeader(cols)
rrepo.UpdateProgress(a.meta, id, 0)
rows, err := db.Query(q, args...)
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
return
}
defer rows.Close()
out := make([]interface{}, len(cols))
dest := make([]interface{}, len(cols))
for i := range out {
dest[i] = &out[i]
}
var count int64
var tick int64
for rows.Next() {
if err := rows.Scan(dest...); err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
return
}
vals := make([]string, len(cols))
for i := range out {
if b, ok := out[i].([]byte); ok {
vals[i] = string(b)
} else if out[i] == nil {
vals[i] = ""
} else {
vals[i] = toString(out[i])
}
}
vals = transformRow(jobDS, fields, vals)
x.WriteRow(vals)
count++
tick++
if tick%200 == 0 {
rrepo.UpdateProgress(a.meta, id, count)
}
}
p, size, _ := x.Close()
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
rrepo.MarkCompleted(a.meta, id, count)
return
}
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", "failed", time.Now(), time.Now(), id)
}
// recompute final rows for a job and correct export_jobs.total_rows
func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) {
id, _ := strconv.ParseUint(idStr, 10, 64)
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
if err := row.Scan(&tplID, &filtersJSON); err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
var ds string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&ds, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := exporter.BuildSQL(req, wl)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
return
}
dataDB := a.selectDataDB(ds)
final := repo.NewExportRepo().Count(dataDB, q, args)
repo.NewExportRepo().MarkCompleted(a.meta, id, final)
ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
}
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
if ds == "ymt" {
return a.ymt
}
return a.marketing
}
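// splitByDays splits the [startStr, endStr] range into windows of stepDays
// days using the "2006-01-02 15:04:05" layout. The last window is clamped to
// endStr and adjacent windows share their boundary timestamp. If either bound
// fails to parse, the range is inverted, or stepDays <= 0, the original pair
// is returned unchanged. For example, 2024-01-01 .. 2024-01-25 with a 10-day
// step yields 01-01→01-11, 01-11→01-21 and 01-21→01-25.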
func splitByDays(startStr, endStr string, stepDays int) [][2]string {
layout := "2006-01-02 15:04:05"
s, es := strings.TrimSpace(startStr), strings.TrimSpace(endStr)
st, err1 := time.Parse(layout, s)
en, err2 := time.Parse(layout, es)
if err1 != nil || err2 != nil || !en.After(st) || stepDays <= 0 {
return [][2]string{{s, es}}
}
var out [][2]string
cur := st
step := time.Duration(stepDays) * 24 * time.Hour
for cur.Before(en) {
nxt := cur.Add(step)
if nxt.After(en) {
nxt = en
}
out = append(out, [2]string{cur.Format(layout), nxt.Format(layout)})
cur = nxt
}
return out
}
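// chooseBatch picks the cursor batch size: a fixed 5000 for xlsx, otherwise
// 10000 when the estimate is unknown or below 50k rows, 20000 below 200k,
// 50000 below 2M, and 100000 from 2M up.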
func chooseBatch(estimate int64, fmt string) int {
if fmt == "xlsx" {
return 5000
}
if estimate <= 0 {
return 10000
}
if estimate < 50000 {
return 10000
}
if estimate < 200000 {
return 20000
}
if estimate < 500000 {
return 50000
}
if estimate >= 2000000 {
return 100000
}
return 50000
}
// moved to repo layer: repo.ZipAndRecord
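// get handles GET /api/exports/{id}: it returns the job record, its output
// files, and a human-readable evaluation built from the stored EXPLAIN JSON
// (a score of 60 or more is treated as allowed to run).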
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
d, err := rrepo.GetJob(a.meta, id)
if err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
flist, _ := rrepo.ListJobFiles(a.meta, id)
files := []map[string]interface{}{}
for _, f := range flist {
files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64})
}
evalStatus := "通过"
if d.ExplainScore.Int64 < 60 {
evalStatus = "禁止"
}
desc := fmt.Sprintf("评分:%d估算行数:%d%s", d.ExplainScore.Int64, d.TotalRows.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[d.ExplainScore.Int64 >= 60])
if d.ExplainJSON.Valid && d.ExplainJSON.String != "" {
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(d.ExplainJSON.String), &arr); err == nil {
segs := []string{}
for _, r := range arr {
getStr := func(field string) string {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return ""
}
if s, ok := mm["String"].(string); ok {
return s
}
}
}
return ""
}
getInt := func(field string) int64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Int64"].(float64); ok {
return int64(f)
}
}
}
return 0
}
getFloat := func(field string) float64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Float64"].(float64); ok {
return f
}
}
}
return 0
}
tbl := getStr("Table")
typ := getStr("Type")
if typ == "" {
typ = getStr("SelectType")
}
key := getStr("Key")
rowsN := getInt("Rows")
filt := getFloat("Filtered")
extra := getStr("Extra")
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
continue
}
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
if extra != "" {
s += ", 额外:" + extra
}
segs = append(segs, s)
}
if len(segs) > 0 {
desc = strings.Join(segs, "")
}
}
}
ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc})
}
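// getSQL handles GET /api/exports/{id}/sql: it rebuilds the job's query from
// its template and stored filters and returns both the parameterized SQL and
// a rendered form with the arguments inlined (for inspection only).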
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
var jid uint64
_, _ = fmt.Sscan(id, &jid)
tplID, filters, err := rrepo.GetJobFilters(a.meta, jid)
if err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
if err != nil {
fail(w, r, http.StatusBadRequest, "template not found")
return
}
var fl map[string]interface{}
json.Unmarshal(filters, &fl)
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := rrepo.Build(req, wl)
if err != nil {
failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
return
}
ok(w, r, map[string]interface{}{"sql": q, "final_sql": renderSQL(q, args)})
}
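// download handles GET /api/exports/{id}/download: it serves the latest file
// recorded for the job, falling back to the newest export_job_<id>_* .zip,
// .xlsx or .csv file found in the local storage directory.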
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
uri, err := rrepo.GetLatestFileURI(a.meta, id)
if err != nil {
// fallback: try to serve local storage file by job id
// search for files named export_job_<id>_*.zip/xlsx/csv
dir := "storage"
entries, e := os.ReadDir(dir)
if e == nil {
best := ""
var bestInfo os.FileInfo
for _, ent := range entries {
name := ent.Name()
if strings.HasPrefix(name, "export_job_"+id+"_") && (strings.HasSuffix(name, ".zip") || strings.HasSuffix(name, ".xlsx") || strings.HasSuffix(name, ".csv")) {
info, _ := os.Stat(filepath.Join(dir, name))
if info != nil {
if best == "" || info.ModTime().After(bestInfo.ModTime()) {
best = name
bestInfo = info
}
}
}
}
if best != "" {
http.ServeFile(w, r, filepath.Join(dir, best))
return
}
}
fail(w, r, http.StatusNotFound, "not found")
return
}
http.ServeFile(w, r, uri)
}
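// transformRow applies per-datasource post-processing to one scanned row.
// Currently only order.key is rewritten: for the ymt datasource it is
// SM4-decrypted with YMT_KEY_DECRYPT_KEY_B64 (falling back to a built-in
// key), otherwise it is decoded with decodeOrderKey.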
func transformRow(ds string, fields []string, vals []string) []string {
for i := range fields {
if i >= len(vals) {
break
}
f := fields[i]
if f == "order.key" {
if ds == "ymt" {
key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
if key == "" {
key = "z9DoIVLuDYEN/qsgweRA4A=="
}
if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
vals[i] = dec
}
} else {
vals[i] = decodeOrderKey(vals[i])
}
}
}
return vals
}
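// decodeOrderKey converts a decimal-encoded order key back into its
// 58-character alphabet form: it strips a trailing "_1" marker, parses the
// remainder as a big integer, emits characters by repeated division by 58,
// pads to at least 16 characters (the padding becomes leading 'A's after the
// final reversal), and reverses the result. Non-numeric input is returned
// unchanged.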
func decodeOrderKey(s string) string {
if s == "" {
return s
}
if len(s) > 2 && s[len(s)-2:] == "_1" {
s = s[:len(s)-2]
}
var n big.Int
if _, ok := n.SetString(s, 10); !ok {
return s
}
base := []rune{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
baseCount := big.NewInt(int64(len(base)))
zero := big.NewInt(0)
var out []rune
for n.Cmp(zero) > 0 {
var mod big.Int
mod.Mod(&n, baseCount)
out = append(out, base[mod.Int64()])
n.Div(&n, baseCount)
}
for len(out) < 16 {
out = append(out, base[0])
}
for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
out[i], out[j] = out[j], out[i]
}
return string(out)
}
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", "canceled", time.Now(), id)
w.Write([]byte("ok"))
}
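// toString converts scanned database or JSON values to strings: byte slices
// and strings pass through, numbers are formatted without an exponent,
// booleans become "1"/"0", and times use the "2006-01-02 15:04:05" layout.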
func toString(v interface{}) string {
switch t := v.(type) {
case []byte:
return string(t)
case string:
return t
case int64:
return strconv.FormatInt(t, 10)
case int:
return strconv.Itoa(t)
case float64:
return strconv.FormatFloat(t, 'f', -1, 64)
case bool:
if t {
return "1"
}
return "0"
case time.Time:
return t.Format("2006-01-02 15:04:05")
default:
return fmt.Sprintf("%v", t)
}
}
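// renderSQL inlines args into the '?' placeholders of q with naive quoting.
// The result is used only for logging and inspection, never executed.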
func renderSQL(q string, args []interface{}) string {
formatArg := func(a interface{}) string {
switch t := a.(type) {
case nil:
return "NULL"
case []byte:
s := string(t)
s = strings.ReplaceAll(s, "'", "''")
return "'" + s + "'"
case string:
s := strings.ReplaceAll(t, "'", "''")
return "'" + s + "'"
case int:
return strconv.Itoa(t)
case int64:
return strconv.FormatInt(t, 10)
case float64:
return strconv.FormatFloat(t, 'f', -1, 64)
case time.Time:
return "'" + t.Format("2006-01-02 15:04:05") + "'"
default:
return fmt.Sprintf("%v", t)
}
}
var sb strings.Builder
var ai int
for i := 0; i < len(q); i++ {
c := q[i]
if c == '?' && ai < len(args) {
sb.WriteString(formatArg(args[ai]))
ai++
} else {
sb.WriteByte(c)
}
}
return sb.String()
}
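// list handles GET /api/exports: a paginated job listing (page, page_size
// capped at 100) optionally filtered by template_id and userId, where each
// item carries the same EXPLAIN-based evaluation text as get.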
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
page := 1
size := 15
if p := q.Get("page"); p != "" {
if n, err := strconv.Atoi(p); err == nil && n > 0 {
page = n
}
}
if s := q.Get("page_size"); s != "" {
if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
size = n
}
}
tplIDStr := q.Get("template_id")
var tplID uint64
if tplIDStr != "" {
if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
tplID = n
}
}
offset := (page - 1) * size
rrepo := repo.NewExportRepo()
uidStr := q.Get("userId")
totalCount := rrepo.CountJobs(a.meta, tplID, uidStr)
itemsRaw, err := rrepo.ListJobs(a.meta, tplID, uidStr, size, offset)
if err != nil {
failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
return
}
items := []map[string]interface{}{}
for _, it := range itemsRaw {
id, tid, req := it.ID, it.TemplateID, it.RequestedBy
status, fmtstr := it.Status, it.FileFormat
estimate, total := it.RowEstimate, it.TotalRows
createdAt, updatedAt := it.CreatedAt, it.UpdatedAt
score, explainRaw := it.ExplainScore, it.ExplainJSON
evalStatus := "通过"
if score.Int64 < 60 {
evalStatus = "禁止"
}
desc := fmt.Sprintf("评分:%d估算行数:%d%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
if explainRaw.Valid && explainRaw.String != "" {
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil {
segs := []string{}
for _, r := range arr {
getStr := func(field string) string {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return ""
}
if s, ok := mm["String"].(string); ok {
return s
}
}
}
return ""
}
getInt := func(field string) int64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Int64"].(float64); ok {
return int64(f)
}
}
}
return 0
}
getFloat := func(field string) float64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Float64"].(float64); ok {
return f
}
}
}
return 0
}
tbl := getStr("Table")
typ := getStr("Type")
if typ == "" {
typ = getStr("SelectType")
}
key := getStr("Key")
rowsN := getInt("Rows")
filt := getFloat("Filtered")
extra := getStr("Extra")
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
continue
}
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
if extra != "" {
s += ", 额外:" + extra
}
segs = append(segs, s)
}
if len(segs) > 0 {
desc = strings.Join(segs, "")
}
}
}
m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
items = append(items, m)
}
ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}
// mergePermissionIntoFilters injects permission scope into filters in a canonical way
func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} {
if filters == nil {
filters = map[string]interface{}{}
}
// if creator_in already present, keep it
if hasNonEmptyIDs(filters["creator_in"]) {
return filters
}
// try known keys
candidates := []string{"creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
ids := []interface{}{}
for _, k := range candidates {
if perm == nil {
break
}
if v, ok := perm[k]; ok {
ids = normalizeIDs(v)
if len(ids) > 0 {
break
}
}
}
// also check alternative keys on the incoming filters and normalize them into creator_in
if len(ids) == 0 {
alt := []string{"creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
for _, k := range alt {
if v, ok := filters[k]; ok {
ids = normalizeIDs(v)
if len(ids) > 0 {
break
}
}
}
}
if len(ids) > 0 {
filters["creator_in"] = ids
}
// map alternative permission boundaries to supported filters
// reseller/merchant -> reseller_id_eq
if v, ok := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); ok {
filters["reseller_id_eq"] = v
}
// plan/activity -> plan_id_eq
if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok {
filters["plan_id_eq"] = v
}
// account
if v, ok := pickFirst(perm, filters, []string{"account", "account_no"}); ok {
filters["account_eq"] = v
}
// out_trade_no
if v, ok := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); ok {
filters["out_trade_no_eq"] = v
}
return filters
}
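// normalizeIDs flattens the supported ID shapes (slices of interface{},
// string, int or int64, a comma-separated string, or a single scalar) into a
// []interface{} of non-empty values.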
func normalizeIDs(v interface{}) []interface{} {
out := []interface{}{}
switch t := v.(type) {
case []interface{}:
for _, x := range t {
if s := toString(x); s != "" {
out = append(out, s)
}
}
case []string:
for _, s := range t {
s2 := strings.TrimSpace(s)
if s2 != "" {
out = append(out, s2)
}
}
case []int:
for _, n := range t {
out = append(out, n)
}
case []int64:
for _, n := range t {
out = append(out, n)
}
case string:
// support comma-separated string
parts := strings.Split(t, ",")
for _, s := range parts {
s2 := strings.TrimSpace(s)
if s2 != "" {
out = append(out, s2)
}
}
default:
if s := toString(t); s != "" {
out = append(out, s)
}
}
return out
}
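// hasNonEmptyIDs reports whether v normalizes to at least one ID.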
func hasNonEmptyIDs(v interface{}) bool {
arr := normalizeIDs(v)
return len(arr) > 0
}
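// pickFirst returns the first non-empty value found under any of keys,
// checking perm before filters.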
func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) {
for _, k := range keys {
if perm != nil {
if v, ok := perm[k]; ok {
arr := normalizeIDs(v)
if len(arr) > 0 {
return arr[0], true
}
if s := toString(v); s != "" {
return s, true
}
}
}
if v, ok := filters[k]; ok {
arr := normalizeIDs(v)
if len(arr) > 0 {
return arr[0], true
}
if s := toString(v); s != "" {
return s, true
}
}
}
return nil, false
}