// MarketingSystemDataExportTool/server/internal/api/exports.go

package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"net/http"
"os"
"path/filepath"
"server/internal/constants"
"server/internal/exporter"
"server/internal/logging"
"server/internal/repo"
"server/internal/utils"
"server/internal/ymtcrypto"
"strconv"
"strings"
"time"
)
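// ExportsAPI serves the /api/exports endpoints. meta holds export job and
// template metadata; marketing and ymt are the data sources the export
// queries actually run against.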
type ExportsAPI struct {
meta *sql.DB
marketing *sql.DB
ymt *sql.DB
}
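// ExportsHandler routes the export endpoints:
//
//	POST /api/exports                 create a job
//	GET  /api/exports                 list jobs
//	GET  /api/exports/{id}            job detail
//	GET  /api/exports/{id}/sql        rebuilt SQL for the job
//	GET  /api/exports/{id}/download   latest exported file
//	POST /api/exports/{id}/recompute  recount total_rows
//	POST /api/exports/{id}/cancel     cancel a queued/running job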
func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
if r.Method == http.MethodPost && p == "" {
api.create(w, r)
return
}
if r.Method == http.MethodGet && p == "" {
api.list(w, r)
return
}
if strings.HasPrefix(p, "/") {
id := strings.TrimPrefix(p, "/")
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
if strings.HasSuffix(p, "/sql") {
id = strings.TrimSuffix(id, "/sql")
api.getSQL(w, r, id)
return
}
api.get(w, r, id)
return
}
if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") {
id = strings.TrimSuffix(id, "/download")
api.download(w, r, id)
return
}
if r.Method == http.MethodPost && strings.HasSuffix(p, "/recompute") {
id = strings.TrimSuffix(id, "/recompute")
api.recompute(w, r, id)
return
}
if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") {
id = strings.TrimSuffix(id, "/cancel")
api.cancel(w, r, id)
return
}
}
w.WriteHeader(http.StatusNotFound)
})
}
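// ExportPayload is the request body for POST /api/exports. A minimal example
// (illustrative values; template 1 is assumed to exist):
//
//	{"template_id": 1, "requested_by": 15, "file_format": "csv",
//	 "filters": {"create_time_between": ["2024-01-01 00:00:00", "2024-01-31 23:59:59"]}}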
type ExportPayload struct {
TemplateID uint64 `json:"template_id"`
RequestedBy uint64 `json:"requested_by"`
Permission map[string]interface{} `json:"permission"`
Options map[string]interface{} `json:"options"`
FileFormat string `json:"file_format"`
Filters map[string]interface{} `json:"filters"`
Datasource string `json:"datasource"`
}
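// create validates the payload against the template and field whitelist,
// merges permission boundaries into the filters, gates the query behind an
// EXPLAIN score, records the job, and runs the export asynchronously.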
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
if err != nil {
fail(w, r, http.StatusBadRequest, "failed to read request body")
return
}
var p ExportPayload
if err := json.Unmarshal(b, &p); err != nil {
fail(w, r, http.StatusBadRequest, "invalid JSON payload")
return
}
r = WithPayload(r, p)
rrepo := repo.NewExportRepo()
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
if err != nil {
fail(w, r, http.StatusBadRequest, "invalid template")
return
}
if p.Datasource != "" {
ds = p.Datasource
}
wl := Whitelist()
// ensure filters map initialized
if p.Filters == nil {
p.Filters = map[string]interface{}{}
}
// merge permission scope into filters to enforce boundary
p.Filters = mergePermissionIntoFilters(p.Datasource, main, p.Permission, p.Filters)
// support multiple userId values in the query string, e.g. userId=15,25 → filters.creator_in
{
uidStr := r.URL.Query().Get("userId")
if uidStr != "" {
parts := strings.Split(uidStr, ",")
ids := make([]interface{}, 0, len(parts))
for _, s := range parts {
s = strings.TrimSpace(s)
if s == "" {
continue
}
if n, err := strconv.ParseUint(s, 10, 64); err == nil {
ids = append(ids, n)
}
}
// only fill creator_in when mergePermissionIntoFilters left it unset: a
// creator_in derived from Permission is a security boundary and must not
// be overridden or widened by URL parameters
if len(ids) > 0 {
if _, exists := p.Filters["creator_in"]; !exists {
p.Filters["creator_in"] = ids
}
}
}
}
// support multiple merchantId in query: e.g., merchantId=1,2,3 → filters.merchant_id_in
{
midStr := r.URL.Query().Get("merchantId")
if midStr != "" {
parts := strings.Split(midStr, ",")
ids := make([]interface{}, 0, len(parts))
for _, s := range parts {
s = strings.TrimSpace(s)
if s == "" {
continue
}
if n, err := strconv.ParseUint(s, 10, 64); err == nil {
ids = append(ids, n)
}
}
if len(ids) > 0 {
if _, exists := p.Filters["merchant_id_in"]; !exists {
p.Filters["merchant_id_in"] = ids
}
}
}
}
// debug: log the effective filters after the permission merge and URL overrides
logging.JSON("INFO", map[string]interface{}{
"event": "export_filters_debug",
"filters": p.Filters,
"has_creator_in": hasNonEmptyIDs(p.Filters["creator_in"]),
"has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]),
})
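// marketing order exports are unbounded without a time window, so a two-value
// create_time_between filter is mandatory there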
if ds == "marketing" && (main == "order" || main == "order_info") {
if v, ok := p.Filters["create_time_between"]; ok {
switch t := v.(type) {
case []interface{}:
if len(t) != 2 {
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
return
}
case []string:
if len(t) != 2 {
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
return
}
default:
fail(w, r, http.StatusBadRequest, "create_time_between 格式错误")
return
}
} else {
fail(w, r, http.StatusBadRequest, "缺少时间过滤:必须提供 create_time_between")
return
}
}
var filtered []string
// parse optional type_eq: it drives the type-specific column handling below
tv := 0
if v, ok := p.Filters["type_eq"]; ok {
switch t := v.(type) {
case float64:
tv = int(t)
case int:
tv = t
case string:
if n, err := strconv.Atoi(strings.TrimSpace(t)); err == nil {
tv = n
}
}
}
// Normalize template fields preserving order
normalized := make([]string, 0, len(fs))
for _, tf := range fs {
if ds == "ymt" && strings.HasPrefix(tf, "order_info.") {
tf = strings.Replace(tf, "order_info.", "order.", 1)
}
if ds == "marketing" && tf == "order_voucher.channel_batch_no" {
tf = "order_voucher.channel_activity_id"
}
normalized = append(normalized, tf)
}
// drop key-batch fields that do not exist in the YMT datasource
if ds == "ymt" {
tmp := make([]string, 0, len(normalized))
for _, tf := range normalized {
if tf == "order.key_batch_id" || tf == "order.key_batch_name" {
continue
}
tmp = append(tmp, tf)
}
normalized = tmp
}
// whitelist validation & soft removal of disallowed fields
bad := []string{}
filtered = make([]string, 0, len(normalized))
for _, tf := range normalized {
if !wl[tf] {
bad = append(bad, tf)
continue
}
filtered = append(filtered, tf)
}
if len(bad) > 0 {
logging.JSON("ERROR", map[string]interface{}{"event": "fields_not_whitelisted", "removed": bad})
}
// exact-duplicate dedup: remove fields that appear more than once (including duplicates within the main table)
{
seen := make(map[string]bool)
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
if seen[tf] {
removed = append(removed, tf)
continue
}
seen[tf] = true
deduped = append(deduped, tf)
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_exact", "removed": removed, "reason": "移除完全重复的字段"})
}
filtered = deduped
}
// main/secondary table dedup: when a secondary table repeats a main-table field name, keep the main table's field
if ds == "ymt" && (main == "order" || main == "order_info") {
mainTableFields := make(map[string]bool)
// first collect all field names on the main table
for _, tf := range filtered {
parts := strings.Split(tf, ".")
if len(parts) == 2 && parts[0] == "order" {
mainTableFields[parts[1]] = true
}
}
if len(mainTableFields) > 0 {
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
parts := strings.Split(tf, ".")
if len(parts) == 2 {
if parts[0] == "order" {
// main-table field: keep
deduped = append(deduped, tf)
} else {
// secondary-table field: check whether it duplicates a main-table field name
if mainTableFields[parts[1]] {
// duplicate field name: drop the secondary-table field
removed = append(removed, tf)
continue
}
// unique field name: keep
deduped = append(deduped, tf)
}
} else {
// unexpected format: keep as-is
deduped = append(deduped, tf)
}
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated", "removed": removed, "reason": "主表和副表存在相同字段,以主表为主"})
}
filtered = deduped
}
}
// YMT instant-discount (type_eq == 3): keep order_voucher.grant_time and drop order_cash.receive_time so the "receive time" column is not left blank
if ds == "ymt" && tv == 3 {
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
if tf == "order_cash.receive_time" {
removed = append(removed, tf)
continue
}
deduped = append(deduped, tf)
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_receive_time", "removed": removed, "reason": "立减金保留 order_voucher.grant_time"})
}
filtered = deduped
}
// YMT customer-name dedup: if both order.merchant_name and merchant.name are selected, keep only merchant.name
if ds == "ymt" {
hasOrderMerchant := false
hasMerchantName := false
for _, tf := range filtered {
if tf == "order.merchant_name" {
hasOrderMerchant = true
}
if tf == "merchant.name" {
hasMerchantName = true
}
}
if hasOrderMerchant && hasMerchantName {
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
if tf == "order.merchant_name" {
removed = append(removed, tf)
continue
}
deduped = append(deduped, tf)
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_customer_name", "removed": removed, "reason": "易码通客户名称仅保留 merchant.name"})
}
filtered = deduped
}
}
labels := FieldLabels()
// label (Chinese column-header) dedup: when fields from different tables share a label, prefer the main-table field
{
labelIdx := map[string]int{}
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
label := labels[tf]
if label == "" {
label = tf
}
parts := strings.Split(tf, ".")
isMain := len(parts) == 2 && parts[0] == main
if idx, ok := labelIdx[label]; ok {
prev := deduped[idx]
prevParts := strings.Split(prev, ".")
prevMain := len(prevParts) == 2 && prevParts[0] == main
switch {
case prevMain:
removed = append(removed, tf)
continue
case isMain:
removed = append(removed, prev)
deduped[idx] = tf
continue
default:
removed = append(removed, tf)
continue
}
}
labelIdx[label] = len(deduped)
deduped = append(deduped, tf)
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_by_label", "removed": removed, "reason": "同名列优先保留主表字段"})
}
filtered = deduped
}
// field-count check: log when whitelisting/dedup changed the column count relative to the template
if len(filtered) != len(fs) {
logging.JSON("ERROR", map[string]interface{}{"event": "field_count_mismatch", "template_count": len(fs), "final_count": len(filtered)})
}
// relaxed: creator_in is optional; any other boundaries supplied in the permission scope are merged in as equality filters
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters}
q, args, err := rrepo.Build(req, wl)
if err != nil {
r = WithSQL(r, q)
fail(w, r, http.StatusBadRequest, err.Error())
return
}
r = WithSQL(r, q)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args))
dataDB := a.selectDataDB(ds)
score, sugg, err := rrepo.Explain(dataDB, q, args)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
return
}
sugg = append(sugg, exporter.IndexSuggestions(req)...)
if score < constants.ExportThresholds.PassScoreThreshold {
fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score))
return
}
estimate := rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters)
hdrs := make([]string, len(filtered))
for i, tf := range filtered {
if v, ok := labels[tf]; ok {
hdrs[i] = v
} else {
hdrs[i] = tf
}
}
// header dedup: if duplicate header labels remain, prefix the non-main-table column with its table label
{
cnt := map[string]int{}
for _, h := range hdrs {
cnt[h]++
}
for i := range hdrs {
if cnt[hdrs[i]] > 1 {
parts := strings.Split(filtered[i], ".")
if len(parts) == 2 && parts[0] != main {
hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
}
}
}
}
// owner from query userId if provided
owner := uint64(0)
if uidStr := r.URL.Query().Get("userId"); uidStr != "" {
first := strings.TrimSpace(strings.Split(uidStr, ",")[0])
if n, err := strconv.ParseUint(first, 10, 64); err == nil {
owner = n
}
}
id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
if err != nil {
fail(w, r, http.StatusInternalServerError, err.Error())
return
}
go a.runJob(uint64(id), dataDB, q, args, filtered, hdrs, p.FileFormat)
ok(w, r, map[string]interface{}{"id": id})
}
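// runJob executes an export job end to end: it re-reads the template and
// filters, optionally splits large date ranges into ChunkDays-sized chunks,
// streams rows into CSV/XLSX parts rolled at MaxRowsPerFile, zips the parts,
// and keeps export_jobs progress and status up to date.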
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
defer func() {
if r := recover(); r != nil {
repo.NewExportRepo().MarkFailed(a.meta, id)
logging.JSON("ERROR", map[string]interface{}{"event": "export_panic", "job_id": id, "error": utils.ToString(r)})
}
}()
// load datasource once for transform decisions
var jobDS string
var jobMain string
rrepo := repo.NewExportRepo()
{
tplID, _, _ := rrepo.GetJobFilters(a.meta, id)
if tplID > 0 {
ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
jobDS = ds
if mt != "" {
jobMain = mt
} else {
jobMain = "order"
}
}
}
rrepo.StartJob(a.meta, id)
if fileFormat == "csv" {
newBaseWriter := func() (exporter.RowWriter, error) {
return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
}
files := []string{}
{
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
}
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
}
}
if len(chunks) > 0 {
var total int64
// if row_estimate is 0, re-estimate it at the start of the chunked export
var currentEst int64
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = row.Scan(&currentEst)
if currentEst == 0 {
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
if estChunk > 0 {
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
}
}
skipChunk := false
if tplDS == "marketing" && main == "order" {
for _, f := range fs {
if strings.HasPrefix(f, "order_voucher.") {
skipChunk = true
break
}
}
if !skipChunk {
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
skipChunk = true
}
}
}
if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
cur := rrepo.NewCursor(tplDS, main)
batch := constants.ChooseBatchSize(0, constants.FileFormatCSV)
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := exporter.BuildSQL(req, wl)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_chunk_build_error", "job_id": id, "range": rg, "error": err.Error()})
continue
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
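// report progress as the global cumulative row count so the number does not regress across chunks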
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if e != nil {
rrepo.MarkFailed(a.meta, id)
return
}
total += cnt
rrepo.UpdateProgress(a.meta, id, total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
}
if len(files) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files, total)
}
rrepo.MarkCompleted(a.meta, id, total)
return
}
}
}
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
{
var est int64
{
var filtersJSON []byte
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&filtersJSON)
var fl map[string]interface{}
json.Unmarshal(filtersJSON, &fl)
est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est)
}
batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat))
files2 := []string{}
cur := rrepo.NewCursor(jobDS, jobMain)
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files2 = append(files2, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if err != nil {
rrepo.MarkFailed(a.meta, id)
return
}
if len(files2) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files2, count)
}
rrepo.MarkCompleted(a.meta, id, count)
return
}
}
if fileFormat == "xlsx" {
files := []string{}
{
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
}
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
}
}
if len(chunks) > 0 {
var total int64
// if row_estimate is 0, re-estimate it at the start of the chunked export
var currentEst int64
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = row.Scan(&currentEst)
if currentEst == 0 {
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
if estChunk > 0 {
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
}
}
skipChunk := false
if tplDS == "marketing" && main == "order" {
for _, f := range fs {
if strings.HasPrefix(f, "order_voucher.") {
skipChunk = true
break
}
}
if !skipChunk {
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
skipChunk = true
}
}
}
if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
cur := rrepo.NewCursor(tplDS, main)
batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX)
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := rrepo.Build(req, wl)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_chunk_build_error", "job_id": id, "range": rg, "error": err.Error()})
continue
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) {
xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
if e == nil {
_ = xw.WriteHeader(cols)
}
return xw, e
}
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
// the progress callback reports the global cumulative row count to avoid the number regressing across chunks
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if e != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error()})
log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
rrepo.MarkFailed(a.meta, id)
return
}
total += cnt
rrepo.UpdateProgress(a.meta, id, total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
}
if len(files) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files, total)
}
rrepo.MarkCompleted(a.meta, id, total)
return
}
}
}
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
var est2 int64
{
var filtersJSON []byte
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&filtersJSON)
var fl map[string]interface{}
json.Unmarshal(filtersJSON, &fl)
est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est2)
}
x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
rrepo.MarkFailed(a.meta, id)
return
}
_ = x.WriteHeader(cols)
rrepo.UpdateProgress(a.meta, id, 0)
rows, err := db.Query(q, args...)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_query_error", "job_id": id, "error": err.Error()})
rrepo.MarkFailed(a.meta, id)
return
}
defer rows.Close()
out := make([]interface{}, len(cols))
dest := make([]interface{}, len(cols))
for i := range out {
dest[i] = &out[i]
}
var count int64
var tick int64
for rows.Next() {
if err := rows.Scan(dest...); err != nil {
rrepo.MarkFailed(a.meta, id)
return
}
vals := make([]string, len(cols))
for i := range out {
if b, ok := out[i].([]byte); ok {
vals[i] = string(b)
} else if out[i] == nil {
vals[i] = ""
} else {
vals[i] = utils.ToString(out[i])
}
}
vals = transformRow(jobDS, fields, vals)
x.WriteRow(vals)
count++
tick++
if tick%200 == 0 {
rrepo.UpdateProgress(a.meta, id, count)
}
}
p, size, err := x.Close()
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_close_error", "job_id": id, "error": err.Error()})
rrepo.MarkFailed(a.meta, id)
return
}
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
rrepo.MarkCompleted(a.meta, id, count)
return
}
// unsupported file format: mark the job failed
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id=?", string(constants.JobStatusFailed), time.Now(), time.Now(), id)
}
// recompute final rows for a job and correct export_jobs.total_rows
func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) {
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
fail(w, r, http.StatusBadRequest, "invalid id")
return
}
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
if err := row.Scan(&tplID, &filtersJSON); err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
var ds string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&ds, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := exporter.BuildSQL(req, wl)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
return
}
dataDB := a.selectDataDB(ds)
rrepo := repo.NewExportRepo()
final := rrepo.Count(dataDB, q, args)
rrepo.MarkCompleted(a.meta, id, final)
ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
}
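// selectDataDB returns the data connection matching the template's datasource.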
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
if ds == "ymt" {
return a.ymt
}
return a.marketing
}
// moved to repo layer: repo.ZipAndRecord
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
d, err := rrepo.GetJob(a.meta, id)
if err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
flist, _ := rrepo.ListJobFiles(a.meta, id)
files := []map[string]interface{}{}
for _, f := range flist {
files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64})
}
evalStatus := "通过"
if d.ExplainScore.Int64 < 60 {
evalStatus = "禁止"
}
desc := evalDesc(d.ExplainScore.Int64, d.TotalRows.Int64, d.ExplainJSON)
ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc})
}
// evalDesc renders the EXPLAIN rows stored as nullable-wrapped JSON into a
// human-readable summary; it is shared by get and list. score and rows feed
// the one-line fallback summary used when no EXPLAIN detail is available.
func evalDesc(score, rows int64, explainRaw sql.NullString) string {
desc := fmt.Sprintf("评分:%d,估算行数:%d,%s", score, rows, map[bool]string{true: "允许执行", false: "禁止执行"}[score >= 60])
if !explainRaw.Valid || explainRaw.String == "" {
return desc
}
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(explainRaw.String), &arr); err != nil {
return desc
}
segs := []string{}
for _, row := range arr {
getStr := func(field string) string {
if v, ok := row[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return ""
}
if s, ok := mm["String"].(string); ok {
return s
}
}
}
return ""
}
getInt := func(field string) int64 {
if v, ok := row[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Int64"].(float64); ok {
return int64(f)
}
}
}
return 0
}
getFloat := func(field string) float64 {
if v, ok := row[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Float64"].(float64); ok {
return f
}
}
}
return 0
}
tbl := getStr("Table")
typ := getStr("Type")
if typ == "" {
typ = getStr("SelectType")
}
key := getStr("Key")
rowsN := getInt("Rows")
filt := getFloat("Filtered")
extra := getStr("Extra")
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
continue
}
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
if extra != "" {
s += ", 额外:" + extra
}
segs = append(segs, s)
}
if len(segs) > 0 {
desc = strings.Join(segs, ";")
}
return desc
}
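// getSQL rebuilds the job's query from its stored template and filters and
// returns both the parameterized SQL and a rendered form with arguments inlined.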
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
jid, err := strconv.ParseUint(id, 10, 64)
if err != nil {
fail(w, r, http.StatusBadRequest, "invalid id")
return
}
tplID, filters, err := rrepo.GetJobFilters(a.meta, jid)
if err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
if err != nil {
fail(w, r, http.StatusBadRequest, "template not found")
return
}
var fl map[string]interface{}
json.Unmarshal(filters, &fl)
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := rrepo.Build(req, wl)
if err != nil {
failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
return
}
ok(w, r, map[string]interface{}{"sql": q, "final_sql": renderSQL(q, args)})
}
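// download serves the newest file recorded for the job; if none is recorded it
// falls back to scanning the local storage directory for export_job_<id>_* files.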
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
uri, err := rrepo.GetLatestFileURI(a.meta, id)
if err != nil {
// fallback: try to serve local storage file by job id
// search for files named export_job_<id>_*.zip/xlsx/csv
dir := "storage"
entries, e := os.ReadDir(dir)
if e == nil {
best := ""
var bestInfo os.FileInfo
for _, ent := range entries {
name := ent.Name()
if strings.HasPrefix(name, "export_job_"+id+"_") && (strings.HasSuffix(name, ".zip") || strings.HasSuffix(name, ".xlsx") || strings.HasSuffix(name, ".csv")) {
info, _ := os.Stat(filepath.Join(dir, name))
if info != nil {
if best == "" || info.ModTime().After(bestInfo.ModTime()) {
best = name
bestInfo = info
}
}
}
}
if best != "" {
http.ServeFile(w, r, filepath.Join(dir, best))
return
}
}
fail(w, r, http.StatusNotFound, "not found")
return
}
http.ServeFile(w, r, uri)
}
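// transformRow post-processes one result row: order.key is decrypted (YMT,
// SM4) or base-decoded (marketing), and activity.channels JSON is rendered as
// a comma-separated list of channel names (or "无" when unpaid or empty).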
func transformRow(ds string, fields []string, vals []string) []string {
payStatusIdx := -1
for i := range fields {
if fields[i] == "order.pay_status" {
payStatusIdx = i
break
}
}
isPaid := func() bool {
if payStatusIdx < 0 || payStatusIdx >= len(vals) {
return true
}
return constants.IsPaidStatus(ds, vals[payStatusIdx])
}()
for i := range fields {
if i >= len(vals) {
break
}
f := fields[i]
if f == "order.key" {
if ds == "ymt" {
key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
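// NOTE: built-in fallback for environments where the key is not configured;
// prefer setting YMT_KEY_DECRYPT_KEY_B64 instead of relying on this default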
if key == "" {
key = "z9DoIVLuDYEN/qsgweRA4A=="
}
if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
vals[i] = dec
}
} else {
vals[i] = decodeOrderKey(vals[i])
}
}
if f == "activity.channels" {
if vals[i] == "" || vals[i] == "0" {
vals[i] = "无"
continue
}
if !isPaid {
vals[i] = "无"
continue
}
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(vals[i]), &arr); err != nil {
vals[i] = "无"
continue
}
names := make([]string, 0, len(arr))
for _, item := range arr {
if v, ok := item["pay_name"].(string); ok && strings.TrimSpace(v) != "" {
names = append(names, v)
continue
}
if v, ok := item["name"].(string); ok && strings.TrimSpace(v) != "" {
names = append(names, v)
}
}
if len(names) == 0 {
vals[i] = "无"
} else {
vals[i] = strings.Join(names, ",")
}
}
}
return vals
}
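// decodeOrderKey reverses the numeric key encoding: the decimal string is
// interpreted base-58 over an alphabet of letters without I/O/i/o plus digits,
// then left-padded with 'A' to 16 characters. A trailing "_1" marker is
// stripped first. For example, decodeOrderKey("59") == "AAAAAAAAAAAAAABB",
// since 59 = 1*58 + 1 yields "BB" before padding.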
func decodeOrderKey(s string) string {
if s == "" {
return s
}
if len(s) > 2 && s[len(s)-2:] == "_1" {
s = s[:len(s)-2]
}
var n big.Int
if _, ok := n.SetString(s, 10); !ok {
return s
}
base := []rune("ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz0123456789")
baseCount := big.NewInt(int64(len(base)))
zero := big.NewInt(0)
var out []rune
for n.Cmp(zero) > 0 {
var mod big.Int
mod.Mod(&n, baseCount)
out = append(out, base[mod.Int64()])
n.Div(&n, baseCount)
}
for len(out) < 16 {
out = append(out, base[0])
}
for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
out[i], out[j] = out[j], out[i]
}
return string(out)
}
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
w.Write([]byte("ok"))
}
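// renderSQL inlines args into the '?' placeholders for logging and diagnostics
// only; the rendered string is never sent to the database.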
func renderSQL(q string, args []interface{}) string {
formatArg := func(a interface{}) string {
switch t := a.(type) {
case nil:
return "NULL"
case []byte:
s := string(t)
s = strings.ReplaceAll(s, "'", "''")
return "'" + s + "'"
case string:
s := strings.ReplaceAll(t, "'", "''")
return "'" + s + "'"
case int:
return strconv.Itoa(t)
case int64:
return strconv.FormatInt(t, 10)
case float64:
return strconv.FormatFloat(t, 'f', -1, 64)
case time.Time:
return "'" + t.Format("2006-01-02 15:04:05") + "'"
default:
return fmt.Sprintf("%v", t)
}
}
var sb strings.Builder
var ai int
for i := 0; i < len(q); i++ {
c := q[i]
if c == '?' && ai < len(args) {
sb.WriteString(formatArg(args[ai]))
ai++
} else {
sb.WriteByte(c)
}
}
return sb.String()
}
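// list returns a page of export jobs (optionally filtered by template_id and
// userId) together with their EXPLAIN evaluation summaries.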
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
page := 1
size := 15
if p := q.Get("page"); p != "" {
if n, err := strconv.Atoi(p); err == nil && n > 0 {
page = n
}
}
if s := q.Get("page_size"); s != "" {
if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
size = n
}
}
tplIDStr := q.Get("template_id")
var tplID uint64
if tplIDStr != "" {
if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
tplID = n
}
}
offset := (page - 1) * size
rrepo := repo.NewExportRepo()
var totalCount int64
uidStr := q.Get("userId")
totalCount = rrepo.CountJobs(a.meta, tplID, uidStr)
itemsRaw, err := rrepo.ListJobs(a.meta, tplID, uidStr, size, offset)
if err != nil {
failCat(w, r, http.StatusInternalServerError, err.Error(), "list_jobs_error")
return
}
items := []map[string]interface{}{}
for _, it := range itemsRaw {
id, tid, req := it.ID, it.TemplateID, it.RequestedBy
status, fmtstr := it.Status, it.FileFormat
estimate, total := it.RowEstimate, it.TotalRows
createdAt, updatedAt := it.CreatedAt, it.UpdatedAt
score, explainRaw := it.ExplainScore, it.ExplainJSON
evalStatus := "通过"
if score.Int64 < 60 {
evalStatus = "禁止"
}
desc := evalDesc(score.Int64, estimate.Int64, explainRaw)
m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
items = append(items, m)
}
ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}
// mergePermissionIntoFilters injects the permission scope into filters in a canonical way (ds and main are currently unused)
func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} {
if filters == nil {
filters = map[string]interface{}{}
}
// if creator_in already present, keep it
if hasNonEmptyIDs(filters["creator_in"]) {
return filters
}
// try known keys
candidates := []string{"creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
ids := []interface{}{}
for _, k := range candidates {
if perm == nil {
break
}
if v, ok := perm[k]; ok {
ids = normalizeIDs(v)
if len(ids) > 0 {
break
}
}
}
// also check alternative keys on the incoming filters and normalize them into creator_in
if len(ids) == 0 {
alt := []string{"creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
for _, k := range alt {
if v, ok := filters[k]; ok {
ids = normalizeIDs(v)
if len(ids) > 0 {
break
}
}
}
}
if len(ids) > 0 {
filters["creator_in"] = ids
}
// map alternative permission boundaries to supported filters
// reseller/merchant -> reseller_id_eq
if v, ok := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); ok {
filters["reseller_id_eq"] = v
}
// plan/activity -> plan_id_eq
if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok {
filters["plan_id_eq"] = v
}
// account
if v, ok := pickFirst(perm, filters, []string{"account", "account_no"}); ok {
filters["account_eq"] = v
}
// out_trade_no
if v, ok := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); ok {
filters["out_trade_no_eq"] = v
}
return filters
}
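// normalizeIDs flattens the accepted ID encodings (slices, comma-separated
// strings, scalars) into a []interface{}, dropping empty entries.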
func normalizeIDs(v interface{}) []interface{} {
out := []interface{}{}
switch t := v.(type) {
case []interface{}:
for _, x := range t {
if s := utils.ToString(x); s != "" {
out = append(out, s)
}
}
case []string:
for _, s := range t {
s2 := strings.TrimSpace(s)
if s2 != "" {
out = append(out, s2)
}
}
case []int:
for _, n := range t {
out = append(out, n)
}
case []int64:
for _, n := range t {
out = append(out, n)
}
case string:
// support comma-separated string
parts := strings.Split(t, ",")
for _, s := range parts {
s2 := strings.TrimSpace(s)
if s2 != "" {
out = append(out, s2)
}
}
default:
if s := utils.ToString(t); s != "" {
out = append(out, s)
}
}
return out
}
func hasNonEmptyIDs(v interface{}) bool {
arr := normalizeIDs(v)
return len(arr) > 0
}
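// pickFirst returns the first non-empty value found under any of keys,
// checking perm before filters.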
func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) {
for _, k := range keys {
if perm != nil {
if v, ok := perm[k]; ok {
arr := normalizeIDs(v)
if len(arr) > 0 {
return arr[0], true
}
if s := utils.ToString(v); s != "" {
return s, true
}
}
}
if v, ok := filters[k]; ok {
arr := normalizeIDs(v)
if len(arr) > 0 {
return arr[0], true
}
if s := utils.ToString(v); s != "" {
return s, true
}
}
}
return nil, false
}