MarketingSystemDataExportTool/server/internal/api/exports.go

1707 lines
51 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"net/http"
"os"
"path/filepath"
"server/internal/constants"
"server/internal/exporter"
"server/internal/logging"
"server/internal/repo"
"server/internal/utils"
"server/internal/ymtcrypto"
"strconv"
"strings"
"time"
)
// ExportsAPI bundles the database handles used by the export endpoints.
type ExportsAPI struct {
	// meta is the handle the handlers use for the export_jobs and
	// export_templates bookkeeping queries.
	meta *sql.DB
	// marketing and ymt are the data-source handles; selectDataDB picks one
	// of them from a datasource name.
	marketing, ymt *sql.DB
}
// ExportsHandler builds the http.Handler that serves every route below
// /api/exports on top of the three given database handles.
//
// Dispatch table (path is what remains after stripping "/api/exports"):
//
//	POST ""                -> create
//	GET  ""                -> list
//	GET  /{id}             -> get
//	GET  /{id}/sql         -> getSQL
//	GET  /{id}/download    -> download
//	POST /{id}/recompute   -> recompute
//	POST /{id}/cancel      -> cancel
//
// Every other method/path combination is answered with 404.
func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
	api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}

	route := func(w http.ResponseWriter, r *http.Request) {
		rest := strings.TrimPrefix(r.URL.Path, "/api/exports")

		// Collection-level routes.
		if rest == "" {
			switch r.Method {
			case http.MethodPost:
				api.create(w, r)
				return
			case http.MethodGet:
				api.list(w, r)
				return
			}
		}

		// Item-level routes: everything after the leading slash is the id,
		// optionally followed by an action suffix.
		if strings.HasPrefix(rest, "/") {
			id := strings.TrimPrefix(rest, "/")
			switch r.Method {
			case http.MethodGet:
				switch {
				case strings.HasSuffix(rest, "/download"):
					api.download(w, r, strings.TrimSuffix(id, "/download"))
				case strings.HasSuffix(rest, "/sql"):
					api.getSQL(w, r, strings.TrimSuffix(id, "/sql"))
				default:
					api.get(w, r, id)
				}
				return
			case http.MethodPost:
				switch {
				case strings.HasSuffix(rest, "/recompute"):
					api.recompute(w, r, strings.TrimSuffix(id, "/recompute"))
					return
				case strings.HasSuffix(rest, "/cancel"):
					api.cancel(w, r, strings.TrimSuffix(id, "/cancel"))
					return
				}
			}
		}

		w.WriteHeader(http.StatusNotFound)
	}
	return http.HandlerFunc(route)
}
// ExportPayload is the JSON request body decoded by the create handler.
type ExportPayload struct {
	// TemplateID selects the export template whose metadata is loaded.
	TemplateID uint64 `json:"template_id"`
	// RequestedBy is stored on the job row as the requester.
	RequestedBy uint64 `json:"requested_by"`
	// Permission is merged into Filters before the SQL is built.
	Permission map[string]interface{} `json:"permission"`
	// Options is stored on the job row unchanged.
	Options map[string]interface{} `json:"options"`
	// FileFormat is handed to the job runner, which handles "csv" and "xlsx".
	FileFormat string `json:"file_format"`
	// Filters holds the query filters; a nil map is replaced by an empty one.
	Filters map[string]interface{} `json:"filters"`
	// Datasource, when non-empty, overrides the template's datasource.
	Datasource string `json:"datasource"`
}
// create handles POST /api/exports.
//
// It decodes the ExportPayload, loads the template metadata, merges the
// permission scope and the userId / merchantId query parameters into the
// filters, normalizes the template field list, builds the export SQL, runs the
// EXPLAIN check, estimates the row count, records the job and finally starts
// runJob in a background goroutine. On success the response carries the new
// job id; every validation failure is answered with 400 and an insert failure
// with 500.
//
// Query parameters read here:
//   - userId: comma-separated IDs; becomes filters.creator_in when that key is
//     not already present, and its first item is the job owner when
//     current_user_id is absent.
//   - merchantId: comma-separated IDs; becomes filters.merchant_id_in when
//     that key is not already present.
//   - current_user_id: preferred source of the job owner.
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()

	body, err := io.ReadAll(r.Body)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "invalid request body")
		return
	}
	var p ExportPayload
	if err := json.Unmarshal(body, &p); err != nil {
		// Decoding stays lenient (fields decoded before the error are kept),
		// exactly as before, but the problem is no longer invisible.
		logging.JSON("WARN", map[string]interface{}{
			"event": "export_payload_decode_error",
			"error": err.Error(),
		})
	}
	r = WithPayload(r, p)

	rrepo := repo.NewExportRepo()
	ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "invalid template")
		return
	}
	if p.Datasource != "" {
		ds = p.Datasource
	}
	if p.Filters == nil {
		p.Filters = map[string]interface{}{}
	}

	// Merge the permission scope into the filters to enforce the boundary.
	// The resolved datasource is passed (not the possibly empty payload value)
	// so the merge sees the same datasource as every later step.
	p.Filters = mergePermissionIntoFilters(ds, main, p.Permission, p.Filters)

	// queryIDs parses a comma-separated list of unsigned integers from the
	// named query parameter; blank and malformed items are dropped.
	queryIDs := func(name string) []interface{} {
		raw := query.Get(name)
		if raw == "" {
			return nil
		}
		parts := strings.Split(raw, ",")
		ids := make([]interface{}, 0, len(parts))
		for _, s := range parts {
			if n, perr := strconv.ParseUint(strings.TrimSpace(s), 10, 64); perr == nil {
				ids = append(ids, n)
			}
		}
		return ids
	}
	// e.g. userId=15,25 -> filters.creator_in, merchantId=1,2,3 ->
	// filters.merchant_id_in. A key already present (for instance one set by
	// the permission merge above) is a boundary and is never overridden.
	for _, m := range []struct{ param, filter string }{
		{"userId", "creator_in"},
		{"merchantId", "merchant_id_in"},
	} {
		ids := queryIDs(m.param)
		if len(ids) == 0 {
			continue
		}
		if _, exists := p.Filters[m.filter]; !exists {
			p.Filters[m.filter] = ids
		}
	}

	// Debug logging of the effective filters.
	logging.JSON("INFO", map[string]interface{}{
		"event":              "export_filters_debug",
		"filters":            p.Filters,
		"has_creator_in":     hasNonEmptyIDs(p.Filters["creator_in"]),
		"has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]),
	})

	// Marketing order exports must be bounded by a two-element time range.
	if ds == "marketing" && (main == "order" || main == "order_info") {
		v, present := p.Filters["create_time_between"]
		if !present {
			fail(w, r, http.StatusBadRequest, "缺少时间过滤:必须提供 create_time_between")
			return
		}
		n := -1
		switch t := v.(type) {
		case []interface{}:
			n = len(t)
		case []string:
			n = len(t)
		}
		if n < 0 {
			fail(w, r, http.StatusBadRequest, "create_time_between 格式错误")
			return
		}
		if n != 2 {
			fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
			return
		}
	}

	// type_eq may arrive as a JSON number or as a string; for strings only the
	// decimal digits are used.
	tv := 0
	switch t := p.Filters["type_eq"].(type) {
	case float64:
		tv = int(t)
	case int:
		tv = t
	case string:
		for _, c := range strings.TrimSpace(t) {
			if c >= '0' && c <= '9' {
				tv = tv*10 + int(c-'0')
			}
		}
	}

	// Normalize the template fields, preserving their order. No whitelist is
	// applied any more: all template fields are used.
	fields := make([]string, 0, len(fs))
	for _, tf := range fs {
		if ds == "ymt" && strings.HasPrefix(tf, "order_info.") {
			tf = strings.Replace(tf, "order_info.", "order.", 1)
		}
		if ds == "marketing" && tf == "order_voucher.channel_batch_no" {
			tf = "order_voucher.channel_activity_id"
		}
		// The key-batch fields are invalid for YMT and are removed.
		if ds == "ymt" && (tf == "order.key_batch_id" || tf == "order.key_batch_name") {
			continue
		}
		fields = append(fields, tf)
	}

	// dropFields returns in without the listed fields and logs what was removed.
	dropFields := func(in []string, event, reason string, drop ...string) []string {
		kept := make([]string, 0, len(in))
		removed := []string{}
		for _, tf := range in {
			hit := false
			for _, d := range drop {
				if tf == d {
					hit = true
					break
				}
			}
			if hit {
				removed = append(removed, tf)
				continue
			}
			kept = append(kept, tf)
		}
		if len(removed) > 0 {
			logging.JSON("INFO", map[string]interface{}{"event": event, "removed": removed, "reason": reason})
		}
		return kept
	}
	// YMT with type 3: keep order_voucher.grant_time and drop the cash
	// receive-time column so the "receive time" column is not empty.
	if ds == "ymt" && tv == 3 {
		fields = dropFields(fields, "fields_deduplicated_receive_time", "立减金保留 order_voucher.grant_time",
			"order_cash.receive_time")
	}
	// Marketing, any type other than 1 (direct charge): drop recharge_time,
	// card_code and account.
	if ds == "marketing" && tv != 1 {
		fields = dropFields(fields, "fields_filtered_non_direct_charge", "非直充类型不导出充值时间、卡密和账号",
			"order.recharge_time", "order.card_code", "order.account")
	}

	labels := FieldLabels()
	// Field-count check against the template.
	if len(fields) != len(fs) {
		logging.JSON("ERROR", map[string]interface{}{"event": "field_count_mismatch", "template_count": len(fs), "final_count": len(fields)})
	}

	// creator_in is optional: other boundaries supplied by the permission
	// merge act as filters instead.
	req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fields, Filters: p.Filters}
	q, args, usedFields, err := rrepo.BuildWithFields(req, nil) // nil: no whitelist filtering
	r = WithSQL(r, q)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	// Prefer the field list the builder actually used so headers and columns
	// stay aligned.
	if len(usedFields) > 0 {
		fields = usedFields
	}

	finalSQL := renderSQL(q, args)
	logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": finalSQL})
	log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, finalSQL)

	dataDB := a.selectDataDB(ds)
	score, sugg, err := rrepo.Explain(dataDB, q, args)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	sugg = append(sugg, exporter.IndexSuggestions(req)...)
	if score < constants.ExportThresholds.PassScoreThreshold {
		fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d请优化索引或缩小查询范围", score))
		return
	}

	// Row estimate: try the chunked fast estimate first and fall back to an
	// exact COUNT over the full export SQL when it yields nothing.
	estimate := rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters)
	if estimate <= 0 {
		logging.JSON("WARN", map[string]interface{}{
			"event":      "estimate_zero_fallback",
			"datasource": ds,
			"main_table": main,
			"filters":    p.Filters,
			"stage":      "fast_chunked",
			"estimate":   estimate,
		})
		estimate = exporter.CountRows(dataDB, q, args)
		logging.JSON("INFO", map[string]interface{}{
			"event":      "estimate_exact_count",
			"datasource": ds,
			"main_table": main,
			"filters":    p.Filters,
			"sql":        q,
			"args":       args,
			"estimate":   estimate,
		})
	}

	// Column headers: the label when one exists, otherwise the raw field name.
	hdrs := make([]string, len(fields))
	for i, tf := range fields {
		if v, ok := labels[tf]; ok {
			hdrs[i] = v
		} else {
			hdrs[i] = tf
		}
	}
	// Header de-duplication: a repeated header on a field that does not belong
	// to the main table gets its table label as a prefix.
	{
		seen := map[string]int{}
		for _, h := range hdrs {
			seen[h]++
		}
		for i := range hdrs {
			if seen[hdrs[i]] <= 1 {
				continue
			}
			parts := strings.Split(fields[i], ".")
			if len(parts) == 2 && parts[0] != main {
				hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
			}
		}
	}

	// Job owner: current_user_id if given, otherwise the first userId item.
	owner := uint64(0)
	ownStr := query.Get("current_user_id")
	if ownStr == "" {
		ownStr = query.Get("userId")
	}
	if ownStr != "" {
		first := strings.TrimSpace(strings.Split(ownStr, ",")[0])
		if n, perr := strconv.ParseUint(first, 10, 64); perr == nil {
			owner = n
		}
	}

	id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	go a.runJob(uint64(id), dataDB, q, args, fields, hdrs, p.FileFormat)
	ok(w, r, map[string]interface{}{"id": id})
}
// runJob executes one export job; create starts it in a background goroutine.
//
// Parameters:
//   - id:     export_jobs row id.
//   - db:     data-source handle the export SQL runs against.
//   - q/args: the full export SQL and its arguments.
//   - fields: field names matching the columns of q; used for row transforms.
//   - cols:   header labels written at the top of every output file.
//   - fmt:    requested format, "csv" or "xlsx" (shadows the fmt package here).
//
// Flow: an "xlsx" request is downgraded to "csv" when the stored row estimate
// exceeds constants.ExportThresholds.XlsxMaxRows. For each format the job is
// exported in create_time_between chunks when chunking applies, otherwise in
// one pass over q. Files are recorded and zipped, and the job is marked
// completed or failed. A panic is recovered and recorded as a failure.
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) {
	defer func() {
		if r := recover(); r != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":  "export_panic",
				"job_id": id,
				"error":  utils.ToString(r),
				"fields": fields,
				"format": fmt,
			})
			log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
			repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
				"error":  utils.ToString(r),
				"fields": fields,
				"format": fmt,
			})
		}
	}()

	// Load the datasource and main table once; they drive transform decisions.
	var jobDS string
	var jobMain string
	rrepo := repo.NewExportRepo()
	{
		tplID, _, _ := rrepo.GetJobFilters(a.meta, id)
		if tplID > 0 {
			ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
			jobDS = ds
			if mt != "" {
				jobMain = mt
			} else {
				jobMain = "order"
			}
		}
	}

	// If the estimated row count exceeds the threshold, force xlsx to csv.
	if fmt == "xlsx" {
		var rowEstimate int64
		estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
		_ = estRow.Scan(&rowEstimate) // best effort: a failed lookup leaves the estimate at 0
		if rowEstimate > constants.ExportThresholds.XlsxMaxRows {
			logging.JSON("INFO", map[string]interface{}{
				"event":        "force_csv_format",
				"job_id":       id,
				"row_estimate": rowEstimate,
				"threshold":    constants.ExportThresholds.XlsxMaxRows,
				"reason":       "row_estimate exceeds xlsx max rows, forcing csv format",
			})
			fmt = "csv"
		}
	}

	rrepo.StartJob(a.meta, id)

	if fmt == "csv" {
		newBaseWriter := func() (exporter.RowWriter, error) {
			return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
		}
		files := []string{}
		{
			var tplID uint64
			var filtersJSON []byte
			row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
			_ = row.Scan(&tplID, &filtersJSON)
			var tplDS string
			var main string
			var fieldsJSON []byte
			tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
			_ = tr.Scan(&tplDS, &main, &fieldsJSON)
			var fs []string
			var fl map[string]interface{}
			json.Unmarshal(fieldsJSON, &fs)
			json.Unmarshal(filtersJSON, &fl)
			wl := Whitelist()
			var chunks [][2]string
			if v, ok := fl["create_time_between"]; ok {
				if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
					chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
				}
				if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
					chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
				}
			}
			if len(chunks) > 0 {
				var total int64
				// If row_estimate is 0, re-estimate when chunked export starts.
				// NOTE(review): the refreshed estimate is stored but currentEst
				// is not updated, so the threshold test below still sees 0 —
				// confirm whether that is intended.
				var currentEst int64
				row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
				_ = row.Scan(&currentEst)
				if currentEst == 0 {
					estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
					if estChunk > 0 {
						rrepo.UpdateRowEstimate(a.meta, id, estChunk)
					}
				}
				// Marketing order exports that touch order_voucher are never chunked.
				skipChunk := false
				if tplDS == "marketing" && main == "order" {
					for _, f := range fs {
						if strings.HasPrefix(f, "order_voucher.") {
							skipChunk = true
							break
						}
					}
					if !skipChunk {
						if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
							skipChunk = true
						}
					}
				}
				if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
					// NOTE(review): chunk SQL selects the template fields (fs)
					// while the headers (cols) come from the caller's field
					// list — confirm both always have the same columns.
					cur := rrepo.NewCursor(tplDS, main)
					batch := constants.ChooseBatchSize(0, constants.FileFormatCSV)
					for _, rg := range chunks {
						fl["create_time_between"] = []string{rg[0], rg[1]}
						req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
						cq, cargs, err := exporter.BuildSQL(req, wl)
						if err != nil {
							// The chunk is still skipped, but the skip is now visible in the logs.
							logging.JSON("WARN", map[string]interface{}{"event": "export_sql_chunk_build_error", "job_id": id, "stage": "csv_chunk", "range": rg, "error": err.Error()})
							continue
						}
						logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
						log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
						newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
						transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
						// Progress is reported as a job-wide running total so it
						// never moves backwards between chunks.
						chunkBase := total
						onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
						onRoll := func(path string, size int64, partRows int64) error {
							files = append(files, path)
							rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
							return nil
						}
						cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
						if e != nil {
							logging.JSON("ERROR", map[string]interface{}{
								"event":      "export_stream_error",
								"job_id":     id,
								"stage":      "csv_chunk",
								"error":      e.Error(),
								"datasource": jobDS,
								"sql":        cq,
								"args":       cargs,
							})
							log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq)
							rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{
								"error":      e.Error(),
								"datasource": jobDS,
								"sql":        cq,
								"args":       cargs,
							})
							return
						}
						total += cnt
						rrepo.UpdateProgress(a.meta, id, total)
					}
					if total == 0 {
						total = rrepo.Count(db, q, args)
					}
					if len(files) >= 1 {
						rrepo.ZipAndRecord(a.meta, id, files, total)
					}
					rrepo.MarkCompleted(a.meta, id, total)
					return
				}
			}
		}

		// CSV direct path: stream the full export SQL in one pass.
		log.Printf("job_id=%d sql=%s args=%v", id, q, args)
		logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
		log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
		{
			var est int64
			{
				var filtersJSON []byte
				row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
				_ = row.Scan(&filtersJSON)
				var fl map[string]interface{}
				json.Unmarshal(filtersJSON, &fl)
				est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
				rrepo.UpdateRowEstimate(a.meta, id, est)
			}
			batch := constants.ChooseBatchSize(est, constants.FileFormat(fmt))
			files2 := []string{}
			cur := rrepo.NewCursor(jobDS, jobMain)
			newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
			transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
			onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
			onRoll := func(path string, size int64, partRows int64) error {
				files2 = append(files2, path)
				rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
				return nil
			}
			// files2 is replaced by the list StreamCursor returns.
			count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
			if err != nil {
				// This is the CSV direct path; the stage label used to say xlsx_direct.
				logging.JSON("ERROR", map[string]interface{}{
					"event":      "export_stream_error",
					"job_id":     id,
					"stage":      "csv_direct",
					"error":      err.Error(),
					"datasource": jobDS,
					"fields":     fields,
					"sql":        q,
				})
				log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_direct error=%v fields_count=%d", id, err, len(fields))
				rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{
					"error":      err.Error(),
					"datasource": jobDS,
					"fields":     fields,
					"sql":        q,
				})
				return
			}
			if len(files2) >= 1 {
				rrepo.ZipAndRecord(a.meta, id, files2, count)
			}
			rrepo.MarkCompleted(a.meta, id, count)
			return
		}
	}

	if fmt == "xlsx" {
		files := []string{}
		{
			var tplID uint64
			var filtersJSON []byte
			row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
			_ = row.Scan(&tplID, &filtersJSON)
			var tplDS string
			var main string
			var fieldsJSON []byte
			tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
			_ = tr.Scan(&tplDS, &main, &fieldsJSON)
			var fs []string
			var fl map[string]interface{}
			json.Unmarshal(fieldsJSON, &fs)
			json.Unmarshal(filtersJSON, &fl)
			wl := Whitelist()
			var chunks [][2]string
			if v, ok := fl["create_time_between"]; ok {
				if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
					chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
				}
				if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
					chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
				}
			}
			if len(chunks) > 0 {
				var total int64
				// If row_estimate is 0, re-estimate when chunked export starts.
				// NOTE(review): currentEst is not refreshed after re-estimation;
				// see the identical note in the csv branch.
				var currentEst int64
				row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
				_ = row.Scan(&currentEst)
				if currentEst == 0 {
					estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
					if estChunk > 0 {
						rrepo.UpdateRowEstimate(a.meta, id, estChunk)
					}
				}
				skipChunk := false
				if tplDS == "marketing" && main == "order" {
					for _, f := range fs {
						if strings.HasPrefix(f, "order_voucher.") {
							skipChunk = true
							break
						}
					}
					if !skipChunk {
						if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
							skipChunk = true
						}
					}
				}
				if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
					cur := rrepo.NewCursor(tplDS, main)
					batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX)
					for _, rg := range chunks {
						fl["create_time_between"] = []string{rg[0], rg[1]}
						req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
						cq, cargs, err := rrepo.Build(req, wl)
						if err != nil {
							// The chunk is still skipped, but the skip is now visible in the logs.
							logging.JSON("WARN", map[string]interface{}{"event": "export_sql_chunk_build_error", "job_id": id, "stage": "xlsx_chunk", "range": rg, "error": err.Error()})
							continue
						}
						logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
						log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
						newWriter := func() (exporter.RowWriter, error) {
							xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
							if e == nil {
								_ = xw.WriteHeader(cols)
							}
							return xw, e
						}
						transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
						// Progress is reported as a job-wide running total so it
						// never moves backwards between chunks.
						chunkBase := total
						onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
						onRoll := func(path string, size int64, partRows int64) error {
							files = append(files, path)
							rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
							return nil
						}
						cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
						if e != nil {
							logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
							log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
							rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
								"error":      e.Error(),
								"datasource": jobDS,
								"sql":        cq,
								"args":       cargs,
							})
							return
						}
						total += cnt
						rrepo.UpdateProgress(a.meta, id, total)
					}
					if total == 0 {
						total = rrepo.Count(db, q, args)
					}
					if len(files) >= 1 {
						rrepo.ZipAndRecord(a.meta, id, files, total)
					}
					rrepo.MarkCompleted(a.meta, id, total)
					return
				}
			}
		}

		// XLSX direct path: run the full export SQL and write a single workbook.
		log.Printf("job_id=%d sql=%s args=%v", id, q, args)
		logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
		log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
		var est2 int64
		{
			var filtersJSON []byte
			row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
			_ = row.Scan(&filtersJSON)
			var fl map[string]interface{}
			json.Unmarshal(filtersJSON, &fl)
			est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
			rrepo.UpdateRowEstimate(a.meta, id, est2)
		}
		x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
		if err != nil {
			logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
			log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
			rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
				"error": err.Error(),
				"stage": "xlsx_direct",
			})
			return
		}
		_ = x.WriteHeader(cols)
		rrepo.UpdateProgress(a.meta, id, 0)
		rows, err := db.Query(q, args...)
		if err != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":      "export_query_error",
				"job_id":     id,
				"stage":      "xlsx_direct",
				"error":      err.Error(),
				"datasource": jobDS,
				"sql":        q,
				"args":       args,
			})
			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err)
			rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{
				"error":      err.Error(),
				"datasource": jobDS,
				"sql":        q,
				"args":       args,
			})
			return
		}
		defer rows.Close()
		// Scan every column into an interface{} slot and stringify afterwards.
		out := make([]interface{}, len(cols))
		dest := make([]interface{}, len(cols))
		for i := range out {
			dest[i] = &out[i]
		}
		var count int64
		var tick int64
		for rows.Next() {
			if err := rows.Scan(dest...); err != nil {
				logging.JSON("ERROR", map[string]interface{}{
					"event":  "export_scan_error",
					"job_id": id,
					"stage":  "xlsx_direct",
					"error":  err.Error(),
					"count":  count,
				})
				log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count)
				rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{
					"error": err.Error(),
					"count": count,
				})
				return
			}
			vals := make([]string, len(cols))
			for i := range out {
				if b, ok := out[i].([]byte); ok {
					vals[i] = string(b)
				} else if out[i] == nil {
					vals[i] = ""
				} else {
					vals[i] = utils.ToString(out[i])
				}
			}
			vals = transformRow(jobDS, fields, vals)
			x.WriteRow(vals)
			count++
			tick++
			// Persist progress every 200 rows.
			if tick%200 == 0 {
				rrepo.UpdateProgress(a.meta, id, count)
			}
		}
		// rows.Next also returns false when iteration fails mid-stream; without
		// this check a truncated export would be recorded as completed.
		if err := rows.Err(); err != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":  "export_rows_error",
				"job_id": id,
				"stage":  "xlsx_direct",
				"error":  err.Error(),
				"count":  count,
			})
			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_rows error=%v count=%d", id, err, count)
			rrepo.MarkFailed(a.meta, id, "xlsx_rows_failed", map[string]interface{}{
				"error": err.Error(),
				"count": count,
			})
			return
		}
		p, size, _ := x.Close()
		log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
		a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
		rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
		rrepo.MarkCompleted(a.meta, id, count)
		return
	}

	// Any other format is recorded as a failure.
	logging.JSON("ERROR", map[string]interface{}{
		"event":  "export_format_unsupported",
		"job_id": id,
		"format": fmt,
	})
	log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fmt)
	rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
		"format": fmt,
	})
}
// recompute handles POST /api/exports/{id}/recompute. It rebuilds the job's
// export SQL from the stored filters and the template's fields, counts the
// rows on the data source, stores that count through MarkCompleted and
// returns it as "final_rows".
//
// Responses: 404 when the id is not a number or the job row is missing, 400
// when the template row cannot be read or the SQL cannot be built.
func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) {
	id, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
		// A non-numeric id can never match a job row.
		fail(w, r, http.StatusNotFound, "not found")
		return
	}

	var tplID uint64
	var filtersJSON []byte
	row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
	if err := row.Scan(&tplID, &filtersJSON); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}

	var ds string
	var main string
	var fieldsJSON []byte
	tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
	if err := tr.Scan(&ds, &main, &fieldsJSON); err != nil {
		// Without the template there is nothing meaningful to count.
		fail(w, r, http.StatusBadRequest, "template not found")
		return
	}

	var fs []string
	var fl map[string]interface{}
	// Decode errors leave fs / fl nil; BuildSQL then decides whether the
	// request is still valid.
	_ = json.Unmarshal(fieldsJSON, &fs)
	_ = json.Unmarshal(filtersJSON, &fl)

	req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
	q, args, err := exporter.BuildSQL(req, Whitelist())
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}

	rrepo := repo.NewExportRepo()
	final := rrepo.Count(a.selectDataDB(ds), q, args)
	rrepo.MarkCompleted(a.meta, id, final)
	ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
}
// selectDataDB returns the data-source handle for ds: the ymt handle for
// "ymt" and the marketing handle for every other value.
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
	switch ds {
	case "ymt":
		return a.ymt
	default:
		return a.marketing
	}
}
// moved to repo layer: repo.ZipAndRecord
// get handles GET /api/exports/{id}. It answers with the job row, its file
// list and a human-readable evaluation of the stored EXPLAIN result, or with
// 404 when the job cannot be loaded.
//
// NOTE(review): the pass mark here is the literal 60, while create compares
// against constants.ExportThresholds.PassScoreThreshold — confirm they agree.
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
	rrepo := repo.NewExportRepo()
	d, err := rrepo.GetJob(a.meta, id)
	if err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}

	// A failed file listing simply yields an empty list.
	flist, _ := rrepo.ListJobFiles(a.meta, id)
	files := make([]map[string]interface{}, 0, len(flist))
	for _, f := range flist {
		files = append(files, map[string]interface{}{
			"storage_uri": f.URI.String,
			"sheet_name":  f.Sheet.String,
			"row_count":   f.RowCount.Int64,
			"size_bytes":  f.SizeBytes.Int64,
		})
	}

	evalStatus, verdict := "禁止", "禁止执行"
	if d.ExplainScore.Int64 >= 60 {
		evalStatus, verdict = "通过", "允许执行"
	}
	desc := fmt.Sprintf("评分:%d估算行数:%d%s", d.ExplainScore.Int64, d.TotalRows.Int64, verdict)

	// nullable reads plan[field][key] from a JSON-encoded nullable value (an
	// object with a "Valid" flag next to the payload key). It yields nil when
	// the field is absent, is not an object, or has Valid == false.
	nullable := func(plan map[string]interface{}, field, key string) interface{} {
		obj, isObj := plan[field].(map[string]interface{})
		if !isObj {
			return nil
		}
		if valid, isBool := obj["Valid"].(bool); isBool && !valid {
			return nil
		}
		return obj[key]
	}
	strOf := func(plan map[string]interface{}, field string) string {
		s, _ := nullable(plan, field, "String").(string)
		return s
	}
	intOf := func(plan map[string]interface{}, field string) int64 {
		f, _ := nullable(plan, field, "Int64").(float64)
		return int64(f)
	}
	floatOf := func(plan map[string]interface{}, field string) float64 {
		f, _ := nullable(plan, field, "Float64").(float64)
		return f
	}

	// When EXPLAIN rows were stored, describe them instead of the summary.
	if d.ExplainJSON.Valid && d.ExplainJSON.String != "" {
		var plans []map[string]interface{}
		if json.Unmarshal([]byte(d.ExplainJSON.String), &plans) == nil {
			segs := make([]string, 0, len(plans))
			for _, plan := range plans {
				table := strOf(plan, "Table")
				access := strOf(plan, "Type")
				if access == "" {
					access = strOf(plan, "SelectType")
				}
				index := strOf(plan, "Key")
				estRows := intOf(plan, "Rows")
				ratio := floatOf(plan, "Filtered")
				extra := strOf(plan, "Extra")
				// Rows carrying no usable information are left out.
				if table == "" && access == "" && estRows == 0 && extra == "" {
					continue
				}
				seg := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", table, access, estRows, index, ratio)
				if extra != "" {
					seg += ", 额外:" + extra
				}
				segs = append(segs, seg)
			}
			if len(segs) > 0 {
				desc = strings.Join(segs, "")
			}
		}
	}

	ok(w, r, map[string]interface{}{
		"id":           d.ID,
		"template_id":  d.TemplateID,
		"status":       d.Status,
		"requested_by": d.RequestedBy,
		"file_format":  d.FileFormat,
		"total_rows":   d.TotalRows.Int64,
		"started_at":   d.StartedAt.Time,
		"finished_at":  d.FinishedAt.Time,
		"created_at":   d.CreatedAt,
		"updated_at":   d.UpdatedAt,
		"files":        files,
		"eval_status":  evalStatus,
		"eval_desc":    desc,
	})
}
// getSQL handles GET /api/exports/{id}/sql. It rebuilds the job's SQL from the
// stored filters and the template's fields and returns both the parameterized
// statement ("sql") and a copy with the arguments substituted in ("final_sql").
// The substituted copy is for display: each '?' byte is replaced in order, with
// no awareness of SQL quoting.
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
	rrepo := repo.NewExportRepo()

	var jobID uint64
	_, _ = fmt.Sscan(id, &jobID) // an unparsable id stays 0
	tplID, rawFilters, err := rrepo.GetJobFilters(a.meta, jobID)
	if err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "template not found")
		return
	}

	var fl map[string]interface{}
	json.Unmarshal(rawFilters, &fl)
	q, args, err := rrepo.Build(exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}, Whitelist())
	if err != nil {
		failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
		return
	}

	// quote wraps s in single quotes, doubling any embedded single quote.
	quote := func(s string) string {
		return "'" + strings.ReplaceAll(s, "'", "''") + "'"
	}
	// literal renders one bound argument as SQL literal text.
	literal := func(arg interface{}) string {
		switch v := arg.(type) {
		case nil:
			return "NULL"
		case []byte:
			return quote(string(v))
		case string:
			return quote(v)
		case int:
			return strconv.Itoa(v)
		case int64:
			return strconv.FormatInt(v, 10)
		case float64:
			return strconv.FormatFloat(v, 'f', -1, 64)
		case time.Time:
			return "'" + v.Format("2006-01-02 15:04:05") + "'"
		default:
			return fmt.Sprintf("%v", v)
		}
	}

	// Replace placeholders left to right; once the arguments run out the
	// remaining text, including any further '?', is copied verbatim.
	var rendered strings.Builder
	rest := q
	for _, arg := range args {
		pos := strings.IndexByte(rest, '?')
		if pos < 0 {
			break
		}
		rendered.WriteString(rest[:pos])
		rendered.WriteString(literal(arg))
		rest = rest[pos+1:]
	}
	rendered.WriteString(rest)

	ok(w, r, map[string]interface{}{"sql": q, "final_sql": rendered.String()})
}
// download handles GET /api/exports/{id}/download. It serves the job's latest
// recorded file. When that lookup fails it falls back to the most recently
// modified file in the local "storage" directory named
// export_job_<id>_*.zip/.xlsx/.csv, and answers 404 if none exists.
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
	rrepo := repo.NewExportRepo()
	if uri, err := rrepo.GetLatestFileURI(a.meta, id); err == nil {
		http.ServeFile(w, r, uri)
		return
	}

	// Fallback: scan local storage for files belonging to this job id.
	const dir = "storage"
	entries, readErr := os.ReadDir(dir)
	if readErr != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}

	prefix := "export_job_" + id + "_"
	newest := ""
	var newestMod time.Time
	for _, ent := range entries {
		name := ent.Name()
		if !strings.HasPrefix(name, prefix) {
			continue
		}
		switch filepath.Ext(name) {
		case ".zip", ".xlsx", ".csv":
		default:
			continue
		}
		info, _ := os.Stat(filepath.Join(dir, name))
		if info == nil {
			continue // entry vanished or is unreadable
		}
		if newest == "" || info.ModTime().After(newestMod) {
			newest = name
			newestMod = info.ModTime()
		}
	}
	if newest == "" {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	http.ServeFile(w, r, filepath.Join(dir, newest))
}
// transformRow rewrites the raw column values of one exported row into their
// display form. vals is modified in place and also returned.
//
// fields[i] names the column whose value is vals[i]; field positions at or
// beyond len(vals) are ignored. ds selects the datasource-specific label
// tables: "ymt" uses the YMT maps, any other value uses the marketing maps.
// Enum columns holding a non-negative decimal code are replaced by the label
// found in the matching constants map and are left untouched when the code has
// no label (order.pay_type is the one exception, see below).
func transformRow(ds string, fields []string, vals []string) []string {
// Locate the pay-status column (if any) so activity.channels can be blanked
// for unpaid orders.
payStatusIdx := -1
for i := range fields {
if fields[i] == "order.pay_status" {
payStatusIdx = i
break
}
}
// isPaid is evaluated once, before the loop below replaces the raw pay-status
// code with its label. Rows without a pay-status column are treated as paid.
isPaid := func() bool {
if payStatusIdx < 0 || payStatusIdx >= len(vals) {
return true
}
return constants.IsPaidStatus(ds, vals[payStatusIdx])
}()
for i := range fields {
if i >= len(vals) {
break
}
f := fields[i]
v := vals[i]
// ==================== Enum conversions ====================
// order.type - order type
if f == "order.type" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTOrderType[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingOrderType[n]; ok {
vals[i] = label
}
}
}
continue
}
// order.status - order status
if f == "order.status" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTOrderStatus[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingOrderStatus[n]; ok {
vals[i] = label
}
}
}
continue
}
// order.pay_type - payment method
if f == "order.pay_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingPayType[n]; ok {
vals[i] = label
} else if n == 0 {
// Code 0 without a label is rendered as an empty cell.
vals[i] = ""
}
}
continue
}
// order.pay_status - payment status
if f == "order.pay_status" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTPayStatus[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingPayStatus[n]; ok {
vals[i] = label
}
}
}
continue
}
// order.use_coupon - whether a coupon was used
if f == "order.use_coupon" {
switch v {
case "1":
vals[i] = "是"
case "2", "0":
vals[i] = "否"
}
continue
}
// order.deliver_status - delivery status
if f == "order.deliver_status" {
switch v {
case "1":
vals[i] = "待投递"
case "2":
vals[i] = "已投递"
case "3":
vals[i] = "投递失败"
}
continue
}
// order.is_inner - supplier type
if f == "order.is_inner" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.YMTIsInner[n]; ok {
vals[i] = label
}
}
continue
}
// order_voucher.channel / voucher.channel - voucher channel
if f == "order_voucher.channel" || f == "voucher.channel" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderVoucherChannel[n]; ok {
vals[i] = label
}
}
continue
}
// order_voucher.status - voucher status
if f == "order_voucher.status" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTOrderVoucherStatus[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingOrderVoucherStatus[n]; ok {
vals[i] = label
}
}
}
continue
}
// order_voucher.receive_mode / voucher.receive_mode - receive mode
if f == "order_voucher.receive_mode" || f == "voucher.receive_mode" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTVoucherReceiveMode[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.OrderVoucherReceiveMode[n]; ok {
vals[i] = label
}
}
}
continue
}
// voucher.is_webview - open mode
if f == "voucher.is_webview" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.VoucherOpenMode[n]; ok {
vals[i] = label
}
}
continue
}
// goods_voucher_subject_config.type - subject type
if f == "goods_voucher_subject_config.type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.VoucherSubjectType[n]; ok {
vals[i] = label
}
}
continue
}
// order_cash.channel - red-packet channel
if f == "order_cash.channel" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderCashChannel[n]; ok {
vals[i] = label
}
}
continue
}
// order_cash.receive_status - red-packet receive status
if f == "order_cash.receive_status" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderCashReceiveStatus[n]; ok {
vals[i] = label
}
}
continue
}
// order_cash.status - red-packet status (marketing system)
if f == "order_cash.status" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingOrderCashStatus[n]; ok {
vals[i] = label
}
}
continue
}
// order_digit.order_type - digital order type
if f == "order_digit.order_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderDigitOrderType[n]; ok {
vals[i] = label
}
}
continue
}
// activity.settlement_type / plan.settlement_type - settlement method
if f == "activity.settlement_type" || f == "plan.settlement_type" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTSettlementType[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingSettlementType[n]; ok {
vals[i] = label
}
}
}
continue
}
// plan.send_method - send method
if f == "plan.send_method" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingSendMethod[n]; ok {
vals[i] = label
}
}
continue
}
// code_batch.period_type - period type
if f == "code_batch.period_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingPeriodType[n]; ok {
vals[i] = label
}
}
continue
}
// code_batch.recharge_type - recharge type
if f == "code_batch.recharge_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingRechargeType[n]; ok {
vals[i] = label
}
}
continue
}
// key_batch.style - key-code style
if f == "key_batch.style" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.KeyBatchStyle[n]; ok {
vals[i] = label
}
}
continue
}
// merchant_key_send.status - key-code API send status
if f == "merchant_key_send.status" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MerchantKeySendStatus[n]; ok {
vals[i] = label
}
}
continue
}
// ==================== Special field conversions ====================
// Decrypt / convert the order key.
if f == "order.key" {
if ds == "ymt" {
// NOTE(review): when YMT_KEY_DECRYPT_KEY_B64 is unset, a key compiled
// into the binary is used instead — confirm this fallback is intended.
key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
if key == "" {
key = "z9DoIVLuDYEN/qsgweRA4A=="
}
// On decrypt error or empty plaintext the stored value is kept as is.
if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
vals[i] = dec
}
} else {
vals[i] = decodeOrderKey(vals[i])
}
}
// voucher_batch.provider: map the provider code to its Chinese display name.
// Unknown codes are left unchanged.
if f == "voucher_batch.provider" {
switch strings.TrimSpace(vals[i]) {
// Legacy codes
case "lsxd":
vals[i] = "蓝色兄弟"
case "fjxw":
vals[i] = "福建兴旺"
case "fzxy":
vals[i] = "福州兴雅"
case "fzyt":
vals[i] = "福州悦途"
// New codes: WeChat voucher channels
case "voucher_wechat_lsxd":
vals[i] = "蓝色兄弟"
case "voucher_wechat_fjxw":
vals[i] = "福建兴旺"
case "voucher_wechat_fzxy":
vals[i] = "福州兴雅"
case "voucher_wechat_fzyt":
vals[i] = "福州悦途"
case "voucher_wechat_zjky":
vals[i] = "浙江卡赢"
case "voucher_wechat_zjky2":
vals[i] = "浙江卡赢2"
case "voucher_wechat_zjwsxx":
vals[i] = "浙江喔刷"
case "voucher_wechat_gzynd":
vals[i] = "广州亿纳德"
case "voucher_wechat_fjhrxxfw":
vals[i] = "福建省宏仁信息服务"
case "voucher_wechat_fzqmkj":
vals[i] = "福州启蒙科技有限公司"
case "voucher_wechat_fzydxx":
vals[i] = "福州元朵信息科技有限公司"
case "voucher_wechat_xyhxx":
vals[i] = "新沂薪伙原信息科技有限公司"
}
}
// activity.channels: parse the JSON array and render readable channel names.
// Empty/"0" values, unpaid rows, invalid JSON and arrays without any usable
// name are all rendered as "无".
if f == "activity.channels" {
if vals[i] == "" || vals[i] == "0" {
vals[i] = "无"
continue
}
if !isPaid {
vals[i] = "无"
continue
}
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(vals[i]), &arr); err != nil {
vals[i] = "无"
continue
}
names := make([]string, 0, len(arr))
for _, item := range arr {
// Prefer "pay_name"; fall back to "name" when it is missing or blank.
if v, ok := item["pay_name"].(string); ok && strings.TrimSpace(v) != "" {
names = append(names, v)
continue
}
if v, ok := item["name"].(string); ok && strings.TrimSpace(v) != "" {
names = append(names, v)
}
}
if len(names) == 0 {
vals[i] = "无"
} else {
vals[i] = strings.Join(names, ",")
}
}
}
return vals
}
// decodeOrderKey renders a decimal order key in a 58-symbol alphabet
// (letters without I/O/i/o, followed by the digits), left-padded with 'A'
// to at least 16 characters.
//
// An empty string is returned as is. A trailing "_1" is stripped first when
// the string is longer than two bytes. If what remains is not a base-10
// integer, that (possibly stripped) string is returned unchanged. Values that
// are zero or negative yield sixteen 'A's.
func decodeOrderKey(s string) string {
	const (
		alphabet = "ABCDEFGHJKLMNPQRSTUVWXYZabcdefghjklmnpqrstuvwxyz0123456789"
		minWidth = 16
	)
	if s == "" {
		return s
	}
	if size := len(s); size > 2 && s[size-2:] == "_1" {
		s = s[:size-2]
	}
	value, parsed := new(big.Int).SetString(s, 10)
	if !parsed {
		return s
	}
	radix := big.NewInt(int64(len(alphabet)))
	remainder := new(big.Int)
	// Symbols are produced least-significant first and flipped at the end.
	symbols := make([]byte, 0, minWidth)
	for value.Sign() > 0 {
		value.DivMod(value, radix, remainder)
		symbols = append(symbols, alphabet[remainder.Int64()])
	}
	for len(symbols) < minWidth {
		symbols = append(symbols, alphabet[0])
	}
	for lo := 0; lo < len(symbols)/2; lo++ {
		hi := len(symbols) - 1 - lo
		symbols[lo], symbols[hi] = symbols[hi], symbols[lo]
	}
	return string(symbols)
}
// cancel marks the export job identified by id as canceled, provided it is
// still in the 'queued' or 'running' state, and replies with the plain-text
// body "ok".
//
// A job that does not exist or is already in another state matches no row and
// still yields "ok". A database error is logged and reported to the client as
// HTTP 500 instead of being silently discarded.
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
	_, err := a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
	if err != nil {
		log.Printf("cancel export job %q: %v", id, err)
		http.Error(w, "cancel failed", http.StatusInternalServerError)
		return
	}
	w.Write([]byte("ok"))
}
// renderSQL returns q with each `?` placeholder replaced, in order, by a
// SQL-literal rendering of the corresponding element of args.
//
// Rendering rules: nil becomes NULL; strings and []byte are single-quoted with
// embedded single quotes doubled; int, int64 and float64 are printed in
// decimal; time.Time is quoted as "2006-01-02 15:04:05"; anything else uses
// fmt's %v. Placeholders beyond len(args) are left as `?`.
//
// A `?` that appears inside a quoted section of q ('...', "..." or `...`) is
// copied verbatim and does not consume an argument. Inside '...' and "..." a
// backslash escapes the following byte.
//
// Escaping is minimal (only single quotes are doubled), so the result is
// suitable for display and should not be executed.
func renderSQL(q string, args []interface{}) string {
	formatArg := func(a interface{}) string {
		switch t := a.(type) {
		case nil:
			return "NULL"
		case []byte:
			return "'" + strings.ReplaceAll(string(t), "'", "''") + "'"
		case string:
			return "'" + strings.ReplaceAll(t, "'", "''") + "'"
		case int:
			return strconv.Itoa(t)
		case int64:
			return strconv.FormatInt(t, 10)
		case float64:
			return strconv.FormatFloat(t, 'f', -1, 64)
		case time.Time:
			return "'" + t.Format("2006-01-02 15:04:05") + "'"
		default:
			return fmt.Sprintf("%v", t)
		}
	}
	var sb strings.Builder
	sb.Grow(len(q))
	next := 0      // index of the next unused argument
	var quote byte // opening quote byte while inside a quoted section, else 0
	for i := 0; i < len(q); i++ {
		c := q[i]
		switch {
		case quote != 0:
			sb.WriteByte(c)
			if c == '\\' && quote != '`' && i+1 < len(q) {
				// Copy the escaped byte so an escaped quote does not end the section.
				i++
				sb.WriteByte(q[i])
			} else if c == quote {
				// A doubled quote ('') closes and immediately reopens, which is harmless.
				quote = 0
			}
		case c == '\'' || c == '"' || c == '`':
			quote = c
			sb.WriteByte(c)
		case c == '?' && next < len(args):
			sb.WriteString(formatArg(args[next]))
			next++
		default:
			sb.WriteByte(c)
		}
	}
	return sb.String()
}
// list serves one page of export jobs together with a human-readable
// evaluation summary for each job.
//
// Query parameters:
//   - page: 1-based page number; defaults to 1 when missing or not positive.
//   - page_size: rows per page; defaults to 15 when missing or outside 1..100.
//   - template_id: passed to the repository as the template filter; 0 when
//     missing or unparsable (presumably meaning "no filter" — confirm in repo).
//
// Each item carries eval_status ("通过" when the explain score is at least 60,
// otherwise "禁止") and eval_desc. eval_desc is built from the stored explain
// JSON when that decodes to at least one non-empty row, and otherwise falls
// back to a score/estimate summary.
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
	params := r.URL.Query()
	page, size := 1, 15
	if raw := params.Get("page"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n > 0 {
			page = n
		}
	}
	if raw := params.Get("page_size"); raw != "" {
		if n, err := strconv.Atoi(raw); err == nil && n > 0 && n <= 100 {
			size = n
		}
	}
	var tplID uint64
	if raw := params.Get("template_id"); raw != "" {
		if n, err := strconv.ParseUint(raw, 10, 64); err == nil {
			tplID = n
		}
	}

	jobs := repo.NewExportRepo()
	var totalCount int64 = jobs.CountJobs(a.meta, tplID, "")
	itemsRaw, err := jobs.ListJobs(a.meta, tplID, "", size, (page-1)*size)
	if err != nil {
		failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
		return
	}

	// member reads row[field][name], where row[field] is expected to be an
	// object of the form {"<name>": ..., "Valid": bool} (presumably a
	// JSON-encoded sql.Null* value). It yields nil when the field is absent,
	// is not an object, or is explicitly flagged Valid=false.
	member := func(row map[string]interface{}, field, name string) interface{} {
		obj, isObj := row[field].(map[string]interface{})
		if !isObj {
			return nil
		}
		if valid, isBool := obj["Valid"].(bool); isBool && !valid {
			return nil
		}
		return obj[name]
	}
	text := func(row map[string]interface{}, field string) string {
		s, _ := member(row, field, "String").(string)
		return s
	}

	items := make([]map[string]interface{}, 0, len(itemsRaw))
	for _, it := range itemsRaw {
		evalStatus, verdict := "通过", "允许执行"
		if it.ExplainScore.Int64 < 60 {
			evalStatus, verdict = "禁止", "禁止执行"
		}
		desc := fmt.Sprintf("评分:%d估算行数:%d%s", it.ExplainScore.Int64, it.RowEstimate.Int64, verdict)

		if it.ExplainJSON.Valid && it.ExplainJSON.String != "" {
			var plan []map[string]interface{}
			if err := json.Unmarshal([]byte(it.ExplainJSON.String), &plan); err == nil {
				segs := []string{}
				for _, row := range plan {
					tbl := text(row, "Table")
					typ := text(row, "Type")
					if typ == "" {
						typ = text(row, "SelectType")
					}
					key := text(row, "Key")
					rowsF, _ := member(row, "Rows", "Int64").(float64)
					rowsN := int64(rowsF)
					filt, _ := member(row, "Filtered", "Float64").(float64)
					extra := text(row, "Extra")
					// Skip rows that carry no usable information.
					if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
						continue
					}
					seg := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
					if extra != "" {
						seg += ", 额外:" + extra
					}
					segs = append(segs, seg)
				}
				if len(segs) > 0 {
					desc = strings.Join(segs, "")
				}
			}
		}

		items = append(items, map[string]interface{}{
			"id":           it.ID,
			"template_id":  it.TemplateID,
			"status":       it.Status,
			"requested_by": it.RequestedBy,
			"row_estimate": it.RowEstimate.Int64,
			"total_rows":   it.TotalRows.Int64,
			"file_format":  it.FileFormat,
			"created_at":   it.CreatedAt.Time,
			"updated_at":   it.UpdatedAt.Time,
			"eval_status":  evalStatus,
			"eval_desc":    desc,
		})
	}
	ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}
// mergePermissionIntoFilters folds the caller's permission scope (perm) into
// filters under canonical keys and returns the resulting map. A nil filters
// map is replaced by a fresh one; otherwise filters is modified in place.
//
// Rules, in order:
//   - If filters already has a non-empty "creator_in", it is returned untouched.
//   - "reseller_id_eq" and "plan_id_eq" are set from the first usable value
//     among their aliases, as resolved by pickFirst.
//   - "creator_in" is then derived from perm, falling back to alternative keys
//     in filters. This step is skipped for marketing order / order_info
//     exports that already carry a non-empty plan_id_eq or reseller_id_eq.
//   - "account_eq" and "out_trade_no_eq" are set from their aliases.
func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} {
	if filters == nil {
		filters = make(map[string]interface{})
	}
	// An explicit creator_in wins over everything else.
	if hasNonEmptyIDs(filters["creator_in"]) {
		return filters
	}

	// Settle plan_id_eq and reseller_id_eq first; the creator decision below
	// depends on them.
	if val, found := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); found {
		filters["reseller_id_eq"] = val
	}
	if val, found := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); found {
		filters["plan_id_eq"] = val
	}

	isSet := func(key string) bool {
		val, present := filters[key]
		return present && val != nil && val != "" && val != 0
	}
	// When a plan or reseller is given and non-empty, do not filter by creator.
	scopedOrder := ds == "marketing" && (main == "order" || main == "order_info") &&
		(isSet("plan_id_eq") || isSet("reseller_id_eq"))

	if !scopedOrder {
		// firstIDs returns the first non-empty normalized ID list found under keys.
		firstIDs := func(src map[string]interface{}, keys ...string) []interface{} {
			for _, key := range keys {
				raw, present := src[key]
				if !present {
					continue
				}
				if ids := normalizeIDs(raw); len(ids) > 0 {
					return ids
				}
			}
			return nil
		}
		ids := firstIDs(perm, "creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id")
		if len(ids) == 0 {
			// Also accept alternative keys already present in filters.
			ids = firstIDs(filters, "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id")
		}
		if len(ids) > 0 {
			filters["creator_in"] = ids
		}
	}

	if val, found := pickFirst(perm, filters, []string{"account", "account_no"}); found {
		filters["account_eq"] = val
	}
	if val, found := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); found {
		filters["out_trade_no_eq"] = val
	}
	return filters
}
// normalizeIDs flattens v into a list of ID values, dropping empty entries.
// The result is never nil.
//
// Accepted shapes:
//   - []interface{}: each element is stringified with utils.ToString.
//   - []string: each element is whitespace-trimmed.
//   - []int / []int64: elements are kept as numbers.
//   - string: split on commas, each part whitespace-trimmed.
//   - anything else: stringified as a single ID.
func normalizeIDs(v interface{}) []interface{} {
	ids := make([]interface{}, 0)
	keepTrimmed := func(raw string) {
		if trimmed := strings.TrimSpace(raw); trimmed != "" {
			ids = append(ids, trimmed)
		}
	}
	switch val := v.(type) {
	case []interface{}:
		for i := range val {
			if text := utils.ToString(val[i]); text != "" {
				ids = append(ids, text)
			}
		}
	case []string:
		for _, item := range val {
			keepTrimmed(item)
		}
	case []int:
		for _, num := range val {
			ids = append(ids, num)
		}
	case []int64:
		for _, num := range val {
			ids = append(ids, num)
		}
	case string:
		// Comma-separated list in a single string.
		for _, part := range strings.Split(val, ",") {
			keepTrimmed(part)
		}
	default:
		if text := utils.ToString(val); text != "" {
			ids = append(ids, text)
		}
	}
	return ids
}
// hasNonEmptyIDs reports whether v normalizes (via normalizeIDs) to at least
// one ID.
func hasNonEmptyIDs(v interface{}) bool {
	return len(normalizeIDs(v)) > 0
}
// pickFirst returns the first usable value found under keys. For each key in
// order, perm is consulted before filters. A value is usable when it
// normalizes to at least one ID (the first ID is returned) or, failing that,
// when its string form is non-empty. The boolean is false when nothing usable
// is found.
func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) {
	// resolve looks up key in src; indexing a nil map simply reports "absent".
	resolve := func(src map[string]interface{}, key string) (interface{}, bool) {
		raw, present := src[key]
		if !present {
			return nil, false
		}
		if ids := normalizeIDs(raw); len(ids) > 0 {
			return ids[0], true
		}
		if text := utils.ToString(raw); text != "" {
			return text, true
		}
		return nil, false
	}
	for _, key := range keys {
		if val, found := resolve(perm, key); found {
			return val, true
		}
		if val, found := resolve(filters, key); found {
			return val, true
		}
	}
	return nil, false
}
// parseIntVal parses s as a non-negative decimal integer consisting solely of
// ASCII digits. It returns -1 when s is empty, contains any other character
// (including a sign or whitespace), or does not fit in an int.
func parseIntVal(s string) int {
	if s == "" {
		return -1
	}
	const maxInt = int(^uint(0) >> 1)
	n := 0
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c < '0' || c > '9' {
			return -1
		}
		digit := int(c - '0')
		// Reject before n*10+digit would wrap around.
		if n > (maxInt-digit)/10 {
			return -1
		}
		n = n*10 + digit
	}
	return n
}