// MarketingSystemDataExportTool/server/internal/api/exports.go.bak

package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"math/big"
"net/http"
"os"
"path/filepath"
"server/internal/constants"
"server/internal/exporter"
"server/internal/logging"
"server/internal/repo"
"server/internal/utils"
"server/internal/ymtcrypto"
"strconv"
"strings"
"time"
)
type ExportsAPI struct {
meta *sql.DB
marketing *sql.DB
ymt *sql.DB
}
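// ExportsHandler routes the export endpoints under /api/exports:
//
//	POST /api/exports                 create an export job
//	GET  /api/exports                 list jobs (paged)
//	GET  /api/exports/{id}            job detail
//	GET  /api/exports/{id}/sql        show the job's rendered SQL
//	GET  /api/exports/{id}/download   download the latest export file
//	POST /api/exports/{id}/recompute  recount and correct total_rows
//	POST /api/exports/{id}/cancel     cancel a queued/running job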
func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
if r.Method == http.MethodPost && p == "" {
api.create(w, r)
return
}
if r.Method == http.MethodGet && p == "" {
api.list(w, r)
return
}
if strings.HasPrefix(p, "/") {
id := strings.TrimPrefix(p, "/")
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
if strings.HasSuffix(p, "/sql") {
id = strings.TrimSuffix(id, "/sql")
api.getSQL(w, r, id)
return
}
api.get(w, r, id)
return
}
if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") {
id = strings.TrimSuffix(id, "/download")
api.download(w, r, id)
return
}
if r.Method == http.MethodPost && strings.HasSuffix(p, "/recompute") {
id = strings.TrimSuffix(id, "/recompute")
api.recompute(w, r, id)
return
}
if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") {
id = strings.TrimSuffix(id, "/cancel")
api.cancel(w, r, id)
return
}
}
w.WriteHeader(http.StatusNotFound)
})
}
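// ExportPayload is the body of POST /api/exports. An illustrative example
// (hypothetical values):
//
//	{"template_id": 12, "requested_by": 1001, "file_format": "csv",
//	 "filters": {"create_time_between": ["2024-01-01 00:00:00", "2024-01-31 23:59:59"]}}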
type ExportPayload struct {
TemplateID uint64 `json:"template_id"`
RequestedBy uint64 `json:"requested_by"`
Permission map[string]interface{} `json:"permission"`
Options map[string]interface{} `json:"options"`
FileFormat string `json:"file_format"`
Filters map[string]interface{} `json:"filters"`
Datasource string `json:"datasource"`
}
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
b, _ := io.ReadAll(r.Body)
var p ExportPayload
if err := json.Unmarshal(b, &p); err != nil {
fail(w, r, http.StatusBadRequest, "invalid JSON payload")
return
}
r = WithPayload(r, p)
rrepo := repo.NewExportRepo()
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
if err != nil {
fail(w, r, http.StatusBadRequest, "invalid template")
return
}
if p.Datasource != "" {
ds = p.Datasource
}
// ensure filters map initialized
if p.Filters == nil {
p.Filters = map[string]interface{}{}
}
// merge permission scope into filters to enforce boundary
p.Filters = mergePermissionIntoFilters(ds, main, p.Permission, p.Filters)
// Note: the userId / current_user_id URL parameters are no longer converted into a creator_in filter;
// current_user_id only records the export job's owner and is not used for data filtering.
// support multiple merchantId in query: e.g., merchantId=1,2,3 → filters.merchant_id_in
{
midStr := r.URL.Query().Get("merchantId")
if midStr != "" {
parts := strings.Split(midStr, ",")
ids := make([]interface{}, 0, len(parts))
for _, s := range parts {
s = strings.TrimSpace(s)
if s == "" {
continue
}
if n, err := strconv.ParseUint(s, 10, 64); err == nil {
ids = append(ids, n)
}
}
if len(ids) > 0 {
if _, exists := p.Filters["merchant_id_in"]; !exists {
p.Filters["merchant_id_in"] = ids
}
}
}
}
// DEBUG LOGGING
logging.JSON("INFO", map[string]interface{}{
"event": "export_filters_debug",
"filters": p.Filters,
"has_creator_in": hasNonEmptyIDs(p.Filters["creator_in"]),
"has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]),
})
if ds == "marketing" && (main == "order" || main == "order_info") {
if v, ok := p.Filters["create_time_between"]; ok {
switch t := v.(type) {
case []interface{}:
if len(t) != 2 {
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
return
}
case []string:
if len(t) != 2 {
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
return
}
default:
fail(w, r, http.StatusBadRequest, "create_time_between 格式错误")
return
}
} else {
fail(w, r, http.StatusBadRequest, "缺少时间过滤:必须提供 create_time_between")
return
}
}
tv := 0
if v, ok := p.Filters["type_eq"]; ok {
switch t := v.(type) {
case float64:
tv = int(t)
case int:
tv = t
case string:
if n, err := strconv.Atoi(strings.TrimSpace(t)); err == nil {
tv = n
}
}
}
// Normalize template fields preserving order
normalized := make([]string, 0, len(fs))
for _, tf := range fs {
if ds == "ymt" && strings.HasPrefix(tf, "order_info.") {
tf = strings.Replace(tf, "order_info.", "order.", 1)
}
if ds == "marketing" && tf == "order_voucher.channel_batch_no" {
tf = "order_voucher.channel_activity_id"
}
normalized = append(normalized, tf)
}
// Remove fields that are invalid for YMT (key batch columns)
if ds == "ymt" {
tmp := make([]string, 0, len(normalized))
for _, tf := range normalized {
if tf == "order.key_batch_id" || tf == "order.key_batch_name" {
continue
}
tmp = append(tmp, tf)
}
normalized = tmp
}
// Whitelist filtering is no longer applied; use all fields as-is.
filtered := normalized
// YMT vouchers: keep order_voucher.grant_time and remove the red-packet receive-time column, so the "receive time" column is not left empty.
if ds == "ymt" && tv == 3 {
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
if tf == "order_cash.receive_time" {
removed = append(removed, tf)
continue
}
deduped = append(deduped, tf)
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_receive_time", "removed": removed, "reason": "立减金保留 order_voucher.grant_time"})
}
filtered = deduped
}
// Marketing system: for non-direct-charge types (type != 1), remove the recharge_time, card_code and account fields.
if ds == "marketing" && tv != 1 {
deduped := make([]string, 0, len(filtered))
removed := []string{}
for _, tf := range filtered {
if tf == "order.recharge_time" || tf == "order.card_code" || tf == "order.account" {
removed = append(removed, tf)
continue
}
deduped = append(deduped, tf)
}
if len(removed) > 0 {
logging.JSON("INFO", map[string]interface{}{"event": "fields_filtered_non_direct_charge", "removed": removed, "reason": "非直充类型不导出充值时间、卡密和账号"})
}
filtered = deduped
}
labels := FieldLabels()
// Field-match validation (count and order)
if len(filtered) != len(fs) {
logging.JSON("ERROR", map[string]interface{}{"event": "field_count_mismatch", "template_count": len(fs), "final_count": len(filtered)})
}
// relax: creator_in is optional; other boundaries supplied in the permission scope are merged in as equality filters
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters}
q, args, usedFields, err := rrepo.BuildWithFields(req, nil) // whitelist disabled: export exactly the fields the frontend selected
if err != nil {
r = WithSQL(r, q)
fail(w, r, http.StatusBadRequest, err.Error())
return
}
// Use the field list actually consumed by the builder (fixes column-count mismatches after whitelist filtering)
if len(usedFields) > 0 {
filtered = usedFields
}
r = WithSQL(r, q)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args))
dataDB := a.selectDataDB(ds)
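// EXPLAIN gate: score the query plan before creating a job; plans scoring
// below constants.ExportThresholds.PassScoreThreshold are rejected outright.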
score, sugg, err := rrepo.Explain(dataDB, q, args)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
return
}
sugg = append(sugg, exporter.IndexSuggestions(req)...)
if score < constants.ExportThresholds.PassScoreThreshold {
fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d请优化索引或缩小查询范围", score))
return
}
// Estimate the row count (prefer the fast chunked estimate; fall back to an exact COUNT when it fails or returns 0)
estimate := rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters)
if estimate <= 0 {
logging.JSON("WARN", map[string]interface{}{
"event": "estimate_zero_fallback",
"datasource": ds,
"main_table": main,
"filters": p.Filters,
"stage": "fast_chunked",
"estimate": estimate,
})
// Run an exact count with the full export SQL, so sharding/index quirks that zero out the estimate are corrected
estimate = exporter.CountRows(dataDB, q, args)
logging.JSON("INFO", map[string]interface{}{
"event": "estimate_exact_count",
"datasource": ds,
"main_table": main,
"filters": p.Filters,
"sql": q,
"args": args,
"estimate": estimate,
})
}
hdrs := make([]string, len(filtered))
for i, tf := range filtered {
if v, ok := labels[tf]; ok {
hdrs[i] = v
} else {
hdrs[i] = tf
}
}
// Deduplicate column headers: if duplicate (Chinese-label) headers remain, prefix non-main-table fields with their table label
{
cnt := map[string]int{}
for _, h := range hdrs {
cnt[h]++
}
for i := range hdrs {
if cnt[hdrs[i]] > 1 {
parts := strings.Split(filtered[i], ".")
if len(parts) == 2 && parts[0] != main {
hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
}
}
}
}
// owner from query current_user_id or userId if provided
owner := uint64(0)
ownStr := r.URL.Query().Get("current_user_id")
if ownStr == "" {
ownStr = r.URL.Query().Get("userId")
}
if ownStr != "" {
first := strings.TrimSpace(strings.Split(ownStr, ",")[0])
if n, err := strconv.ParseUint(first, 10, 64); err == nil {
owner = n
}
}
id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
if err != nil {
fail(w, r, http.StatusInternalServerError, err.Error())
return
}
go a.RunJobByID(uint64(id))
ok(w, r, map[string]interface{}{"id": id})
}
// RunJobByID loads the job's metadata by ID from the database and runs the export.
func (a *ExportsAPI) RunJobByID(jobID uint64) {
rrepo := repo.NewExportRepo()
// bump the restart counter
rrepo.IncrementRestartCount(a.meta, jobID)
// load the job detail
jobDetail, err := rrepo.GetJob(a.meta, strconv.FormatUint(jobID, 10))
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "get_job_failed",
"job_id": jobID,
"error": err.Error(),
})
return
}
// load the template ID and the job's filter conditions
tplID, filtersJSON, err := rrepo.GetJobFilters(a.meta, jobID)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "get_job_filters_failed",
"job_id": jobID,
"error": err.Error(),
})
return
}
var filters map[string]interface{}
json.Unmarshal(filtersJSON, &filters)
// load the template's field metadata
ds, mainTable, fields, err := rrepo.GetTemplateMeta(a.meta, tplID)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "get_template_meta_failed",
"job_id": jobID,
"error": err.Error(),
})
return
}
// pick the data DB for this datasource
dataDB := a.selectDataDB(ds)
// build the SQL
wl := Whitelist()
req := exporter.BuildRequest{MainTable: mainTable, Datasource: ds, Fields: fields, Filters: filters}
q, args, usedFields, err := rrepo.BuildWithFields(req, wl)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "build_sql_failed",
"job_id": jobID,
"error": err.Error(),
})
return
}
if len(usedFields) > 0 {
fields = usedFields
}
// build the column headers
labels := FieldLabels()
hdrs := make([]string, len(fields))
for i, tf := range fields {
if v, ok := labels[tf]; ok {
hdrs[i] = v
} else {
hdrs[i] = tf
}
}
// deduplicate column headers
{
cnt := map[string]int{}
for _, h := range hdrs {
cnt[h]++
}
for i := range hdrs {
if cnt[hdrs[i]] > 1 {
parts := strings.Split(fields[i], ".")
if len(parts) == 2 && parts[0] != mainTable {
hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
}
}
}
}
a.runJob(jobID, dataDB, q, args, fields, hdrs, jobDetail.FileFormat)
}
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
defer func() {
if r := recover(); r != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_panic",
"job_id": id,
"error": utils.ToString(r),
"fields": fields,
"format": fileFormat,
})
log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
"error": utils.ToString(r),
"fields": fields,
"format": fileFormat,
})
}
}()
// load datasource once for transform decisions
var jobDS string
var jobMain string
rrepo := repo.NewExportRepo()
{
tplID, _, _ := rrepo.GetJobFilters(a.meta, id)
if tplID > 0 {
ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
jobDS = ds
if mt != "" {
jobMain = mt
} else {
jobMain = "order"
}
}
}
// Check the estimated row count; if it exceeds the threshold and the format is xlsx, force csv.
if fileFormat == "xlsx" {
var rowEstimate int64
estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = estRow.Scan(&rowEstimate)
if rowEstimate > constants.ExportThresholds.XlsxMaxRows {
logging.JSON("INFO", map[string]interface{}{
"event": "force_csv_format",
"job_id": id,
"row_estimate": rowEstimate,
"threshold": constants.ExportThresholds.XlsxMaxRows,
"reason": "row_estimate exceeds xlsx max rows, forcing csv format",
})
fileFormat = "csv"
}
}
rrepo.StartJob(a.meta, id)
if fileFormat == "csv" {
newBaseWriter := func() (exporter.RowWriter, error) {
return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
}
files := []string{}
{
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
}
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
}
}
if len(chunks) > 0 {
var total int64
// If row_estimate is 0, re-estimate at the start of the chunked export
var currentEst int64
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = row.Scan(&currentEst)
if currentEst == 0 {
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
if estChunk > 0 {
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
}
}
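// Chunked (time-window) export is skipped for marketing "order" exports that
// touch order_voucher (voucher fields in the template or the channel-activity
// filter); those jobs fall through to the direct path below.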
skipChunk := false
if tplDS == "marketing" && main == "order" {
for _, f := range fs {
if strings.HasPrefix(f, "order_voucher.") {
skipChunk = true
break
}
}
if !skipChunk {
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
skipChunk = true
}
}
}
if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
cur := rrepo.NewCursor(tplDS, main)
batch := constants.ChooseBatchSize(0, constants.FileFormatCSV)
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := exporter.BuildSQL(req, wl)
if err != nil {
continue
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if e != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_stream_error",
"job_id": id,
"stage": "csv_chunk",
"error": e.Error(),
"datasource": jobDS,
"sql": cq,
"args": cargs,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq)
rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{
"error": e.Error(),
"datasource": jobDS,
"sql": cq,
"args": cargs,
})
return
}
total += cnt
rrepo.UpdateProgress(a.meta, id, total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
}
if len(files) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files, total)
}
rrepo.MarkCompleted(a.meta, id, total)
return
}
}
}
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
{
var est int64
{
var filtersJSON []byte
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&filtersJSON)
var fl map[string]interface{}
json.Unmarshal(filtersJSON, &fl)
est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est)
}
batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat))
files2 := []string{}
cur := rrepo.NewCursor(jobDS, jobMain)
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files2 = append(files2, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_stream_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
"datasource": jobDS,
"fields": fields,
"sql": q,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields))
rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{
"error": err.Error(),
"datasource": jobDS,
"fields": fields,
"sql": q,
})
return
}
if len(files2) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files2, count)
}
rrepo.MarkCompleted(a.meta, id, count)
return
}
}
if fileFormat == "xlsx" {
files := []string{}
{
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
}
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
}
}
if len(chunks) > 0 {
var total int64
// If row_estimate is 0, re-estimate at the start of the chunked export
var currentEst int64
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = row.Scan(&currentEst)
if currentEst == 0 {
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
if estChunk > 0 {
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
}
}
skipChunk := false
if tplDS == "marketing" && main == "order" {
for _, f := range fs {
if strings.HasPrefix(f, "order_voucher.") {
skipChunk = true
break
}
}
if !skipChunk {
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
skipChunk = true
}
}
}
if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
cur := rrepo.NewCursor(tplDS, main)
batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX)
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := rrepo.Build(req, wl)
if err != nil {
continue
}
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
newWriter := func() (exporter.RowWriter, error) {
xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
if e == nil {
_ = xw.WriteHeader(cols)
}
return xw, e
}
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
// Progress callbacks report the global cumulative row count, so the number never regresses across chunks
chunkBase := total
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
onRoll := func(path string, size int64, partRows int64) error {
files = append(files, path)
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
return nil
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if e != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
"error": e.Error(),
"datasource": jobDS,
"sql": cq,
"args": cargs,
})
return
}
total += cnt
rrepo.UpdateProgress(a.meta, id, total)
}
if total == 0 {
total = rrepo.Count(db, q, args)
}
if len(files) >= 1 {
rrepo.ZipAndRecord(a.meta, id, files, total)
}
rrepo.MarkCompleted(a.meta, id, total)
return
}
}
}
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
var est2 int64
{
var filtersJSON []byte
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&filtersJSON)
var fl map[string]interface{}
json.Unmarshal(filtersJSON, &fl)
est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est2)
}
x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
"error": err.Error(),
"stage": "xlsx_direct",
})
return
}
_ = x.WriteHeader(cols)
rrepo.UpdateProgress(a.meta, id, 0)
// record argument type info before executing the query
argTypes := make([]string, len(args))
for i, arg := range args {
argTypes[i] = fmt.Sprintf("%T", arg)
}
logging.JSON("INFO", map[string]interface{}{
"event": "export_query_before_execute",
"job_id": id,
"stage": "xlsx_direct",
"datasource": jobDS,
"sql": q,
"args": args,
"arg_types": argTypes,
"final_sql": renderSQL(q, args),
})
log.Printf("[EXPORT_DEBUG] job_id=%d stage=xlsx_direct before_query sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args))
rows, err := db.Query(q, args...)
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_query_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
"datasource": jobDS,
"sql": q,
"args": args,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err)
rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{
"error": err.Error(),
"datasource": jobDS,
"sql": q,
"args": args,
})
return
}
defer rows.Close()
// fetch the actual column count dynamically
actualCols, err := rows.Columns()
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_columns_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err)
rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{
"error": err.Error(),
})
return
}
if len(actualCols) != len(cols) {
logging.JSON("WARN", map[string]interface{}{
"event": "export_column_count_mismatch",
"job_id": id,
"stage": "xlsx_direct",
"expected_cols": len(cols),
"actual_cols": len(actualCols),
})
log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct column_mismatch expected=%d actual=%d", id, len(cols), len(actualCols))
}
out := make([]interface{}, len(actualCols))
dest := make([]interface{}, len(actualCols))
for i := range out {
dest[i] = &out[i]
}
var count int64
var tick int64
var firstRow []string
firstRowCaptured := false
for rows.Next() {
if err := rows.Scan(dest...); err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_scan_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
"count": count,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count)
rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{
"error": err.Error(),
"count": count,
})
return
}
vals := make([]string, len(cols))
for i := range out {
if b, ok := out[i].([]byte); ok {
vals[i] = string(b)
} else if out[i] == nil {
vals[i] = ""
} else {
vals[i] = utils.ToString(out[i])
}
}
// Capture only the first raw row for logging, to make it easy to verify the query returned results
if !firstRowCaptured {
firstRow = make([]string, len(vals))
copy(firstRow, vals)
firstRowCaptured = true
}
vals = transformRow(jobDS, fields, vals)
x.WriteRow(vals)
count++
tick++
if tick%200 == 0 {
rrepo.UpdateProgress(a.meta, id, count)
}
}
// If rows were returned, log the first row so we can confirm the export SQL produced results
if count > 0 && firstRowCaptured {
logging.JSON("INFO", map[string]interface{}{
"event": "export_first_row_sample",
"job_id": id,
"datasource": jobDS,
"total_rows": count,
"first_row": firstRow,
"sql": q,
"args": args,
"final_sql": renderSQL(q, args),
"fields_order": fields,
})
} else if count == 0 {
// If the query returned 0 rows, log detailed context for troubleshooting
logging.JSON("WARN", map[string]interface{}{
"event": "export_zero_rows",
"job_id": id,
"datasource": jobDS,
"stage": "xlsx_direct",
"sql": q,
"args": args,
"arg_types": argTypes,
"final_sql": renderSQL(q, args),
"expected_cols": len(cols),
"actual_cols": len(actualCols),
})
log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct zero_rows sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args))
}
p, size, _ := x.Close()
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
rrepo.MarkCompleted(a.meta, id, count)
return
}
logging.JSON("ERROR", map[string]interface{}{
"event": "export_format_unsupported",
"job_id": id,
"format": fileFormat,
})
log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat)
rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
"format": fileFormat,
})
}
// recompute final rows for a job and correct export_jobs.total_rows
func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) {
id, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
fail(w, r, http.StatusBadRequest, "invalid id")
return
}
var tplID uint64
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
if err := row.Scan(&tplID, &filtersJSON); err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
var ds string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&ds, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := exporter.BuildSQL(req, wl)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
return
}
dataDB := a.selectDataDB(ds)
final := repo.NewExportRepo().Count(dataDB, q, args)
repo.NewExportRepo().MarkCompleted(a.meta, id, final)
ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
}
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
if ds == "ymt" {
return a.ymt
}
return a.marketing
}
// moved to repo layer: repo.ZipAndRecord
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
d, err := rrepo.GetJob(a.meta, id)
if err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
flist, _ := rrepo.ListJobFiles(a.meta, id)
files := []map[string]interface{}{}
for _, f := range flist {
files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64})
}
evalStatus := "通过"
if d.ExplainScore.Int64 < 60 {
evalStatus = "禁止"
}
desc := fmt.Sprintf("评分:%d估算行数:%d%s", d.ExplainScore.Int64, d.TotalRows.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[d.ExplainScore.Int64 >= 60])
if d.ExplainJSON.Valid && d.ExplainJSON.String != "" {
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(d.ExplainJSON.String), &arr); err == nil {
segs := []string{}
for _, r := range arr {
getStr := func(field string) string {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return ""
}
if s, ok := mm["String"].(string); ok {
return s
}
}
}
return ""
}
getInt := func(field string) int64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Int64"].(float64); ok {
return int64(f)
}
}
}
return 0
}
getFloat := func(field string) float64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Float64"].(float64); ok {
return f
}
}
}
return 0
}
tbl := getStr("Table")
typ := getStr("Type")
if typ == "" {
typ = getStr("SelectType")
}
key := getStr("Key")
rowsN := getInt("Rows")
filt := getFloat("Filtered")
extra := getStr("Extra")
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
continue
}
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
if extra != "" {
s += ", 额外:" + extra
}
segs = append(segs, s)
}
if len(segs) > 0 {
desc = strings.Join(segs, "")
}
}
}
ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc})
}
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
var jid uint64
_, _ = fmt.Sscan(id, &jid)
tplID, filters, err := rrepo.GetJobFilters(a.meta, jid)
if err != nil {
fail(w, r, http.StatusNotFound, "not found")
return
}
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
if err != nil {
fail(w, r, http.StatusBadRequest, "template not found")
return
}
var fl map[string]interface{}
json.Unmarshal(filters, &fl)
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := rrepo.Build(req, wl)
if err != nil {
failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
return
}
ok(w, r, map[string]interface{}{"sql": q, "final_sql": renderSQL(q, args)})
}
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
rrepo := repo.NewExportRepo()
uri, err := rrepo.GetLatestFileURI(a.meta, id)
if err != nil {
// fallback: try to serve local storage file by job id
// search for files named export_job_<id>_*.zip/xlsx/csv
dir := "storage"
entries, e := os.ReadDir(dir)
if e == nil {
best := ""
var bestInfo os.FileInfo
for _, ent := range entries {
name := ent.Name()
if strings.HasPrefix(name, "export_job_"+id+"_") && (strings.HasSuffix(name, ".zip") || strings.HasSuffix(name, ".xlsx") || strings.HasSuffix(name, ".csv")) {
info, _ := os.Stat(filepath.Join(dir, name))
if info != nil {
if best == "" || info.ModTime().After(bestInfo.ModTime()) {
best = name
bestInfo = info
}
}
}
}
if best != "" {
http.ServeFile(w, r, filepath.Join(dir, best))
return
}
}
fail(w, r, http.StatusNotFound, "not found")
return
}
http.ServeFile(w, r, uri)
}
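// transformRow converts one row of raw DB values into display values, keyed
// by the index-aligned fields slice: enum codes become labels, order keys are
// decrypted/decoded, and JSON channel lists are flattened to readable names.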
func transformRow(ds string, fields []string, vals []string) []string {
payStatusIdx := -1
for i := range fields {
if fields[i] == "order.pay_status" {
payStatusIdx = i
break
}
}
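// When order.pay_status is not among the exported fields, default to "paid"
// so the activity.channels rendering below is not suppressed.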
isPaid := func() bool {
if payStatusIdx < 0 || payStatusIdx >= len(vals) {
return true
}
return constants.IsPaidStatus(ds, vals[payStatusIdx])
}()
for i := range fields {
if i >= len(vals) {
break
}
f := fields[i]
v := vals[i]
// ==================== Enum conversions ====================
// order.type - order type
if f == "order.type" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTOrderType[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingOrderType[n]; ok {
vals[i] = label
}
}
}
continue
}
// order.status - order status
if f == "order.status" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTOrderStatus[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingOrderStatus[n]; ok {
vals[i] = label
}
}
}
continue
}
// order.pay_type - payment method
if f == "order.pay_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingPayType[n]; ok {
vals[i] = label
} else if n == 0 {
vals[i] = ""
}
}
continue
}
// order.pay_status - payment status
if f == "order.pay_status" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTPayStatus[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingPayStatus[n]; ok {
vals[i] = label
}
}
}
continue
}
// order.use_coupon - whether a coupon was used
if f == "order.use_coupon" {
switch v {
case "1":
vals[i] = "是"
case "2", "0":
vals[i] = "否"
}
continue
}
// order.deliver_status - delivery status
if f == "order.deliver_status" {
switch v {
case "1":
vals[i] = "待投递"
case "2":
vals[i] = "已投递"
case "3":
vals[i] = "投递失败"
}
continue
}
// order.is_inner - supplier type
if f == "order.is_inner" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.YMTIsInner[n]; ok {
vals[i] = label
}
}
continue
}
// order_voucher.channel / voucher.channel - voucher (instant-discount) channel
if f == "order_voucher.channel" || f == "voucher.channel" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderVoucherChannel[n]; ok {
vals[i] = label
}
}
continue
}
// order_voucher.status - voucher status
if f == "order_voucher.status" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTOrderVoucherStatus[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingOrderVoucherStatus[n]; ok {
vals[i] = label
}
}
}
continue
}
// order_voucher.receive_mode / voucher.receive_mode - receive mode
if f == "order_voucher.receive_mode" || f == "voucher.receive_mode" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTVoucherReceiveMode[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.OrderVoucherReceiveMode[n]; ok {
vals[i] = label
}
}
}
continue
}
// voucher.is_webview - open mode
if f == "voucher.is_webview" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.VoucherOpenMode[n]; ok {
vals[i] = label
}
}
continue
}
// goods_voucher_subject_config.type - subject type
if f == "goods_voucher_subject_config.type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.VoucherSubjectType[n]; ok {
vals[i] = label
}
}
continue
}
// order_cash.channel - red-packet channel
if f == "order_cash.channel" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderCashChannel[n]; ok {
vals[i] = label
}
}
continue
}
// order_cash.receive_status - red-packet receive status
if f == "order_cash.receive_status" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderCashReceiveStatus[n]; ok {
vals[i] = label
}
}
continue
}
// order_cash.status - red-packet status (marketing system)
if f == "order_cash.status" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingOrderCashStatus[n]; ok {
vals[i] = label
}
}
continue
}
// order_digit.order_type - digital order type
if f == "order_digit.order_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.OrderDigitOrderType[n]; ok {
vals[i] = label
}
}
continue
}
// activity.settlement_type / plan.settlement_type - settlement type
if f == "activity.settlement_type" || f == "plan.settlement_type" {
if n := parseIntVal(v); n >= 0 {
if ds == "ymt" {
if label, ok := constants.YMTSettlementType[n]; ok {
vals[i] = label
}
} else {
if label, ok := constants.MarketingSettlementType[n]; ok {
vals[i] = label
}
}
}
continue
}
// plan.send_method - send method
if f == "plan.send_method" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingSendMethod[n]; ok {
vals[i] = label
}
}
continue
}
// code_batch.period_type - period type
if f == "code_batch.period_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingPeriodType[n]; ok {
vals[i] = label
}
}
continue
}
// code_batch.recharge_type - recharge type
if f == "code_batch.recharge_type" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MarketingRechargeType[n]; ok {
vals[i] = label
}
}
continue
}
// key_batch.style - key code style
if f == "key_batch.style" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.KeyBatchStyle[n]; ok {
vals[i] = label
}
}
continue
}
// merchant_key_send.status - key code API send status
if f == "merchant_key_send.status" {
if n := parseIntVal(v); n >= 0 {
if label, ok := constants.MerchantKeySendStatus[n]; ok {
vals[i] = label
}
}
continue
}
// ==================== Special field conversions ====================
// Decrypt/decode the order key
if f == "order.key" {
if ds == "ymt" {
key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
if key == "" {
key = "z9DoIVLuDYEN/qsgweRA4A=="
}
if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
vals[i] = dec
}
} else {
vals[i] = decodeOrderKey(vals[i])
}
}
// voucher_batch.provider: map channel codes to their Chinese display names
if f == "voucher_batch.provider" {
switch strings.TrimSpace(vals[i]) {
// legacy codes
case "lsxd":
vals[i] = "蓝色兄弟"
case "fjxw":
vals[i] = "福建兴旺"
case "fzxy":
vals[i] = "福州兴雅"
case "fzyt":
vals[i] = "福州悦途"
// new codes: WeChat voucher channels
case "voucher_wechat_lsxd":
vals[i] = "蓝色兄弟"
case "voucher_wechat_fjxw":
vals[i] = "福建兴旺"
case "voucher_wechat_fzxy":
vals[i] = "福州兴雅"
case "voucher_wechat_fzyt":
vals[i] = "福州悦途"
case "voucher_wechat_zjky":
vals[i] = "浙江卡赢"
case "voucher_wechat_zjky2":
vals[i] = "浙江卡赢2"
case "voucher_wechat_zjwsxx":
vals[i] = "浙江喔刷"
case "voucher_wechat_gzynd":
vals[i] = "广州亿纳德"
case "voucher_wechat_fjhrxxfw":
vals[i] = "福建省宏仁信息服务"
case "voucher_wechat_fzqmkj":
vals[i] = "福州启蒙科技有限公司"
case "voucher_wechat_fzydxx":
vals[i] = "福州元朵信息科技有限公司"
case "voucher_wechat_xyhxx":
vals[i] = "新沂薪伙原信息科技有限公司"
}
}
// activity.channels: parse the JSON and convert it to readable channel names
if f == "activity.channels" {
if vals[i] == "" || vals[i] == "0" {
vals[i] = "无"
continue
}
if !isPaid {
vals[i] = "无"
continue
}
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(vals[i]), &arr); err != nil {
vals[i] = "无"
continue
}
names := make([]string, 0, len(arr))
for _, item := range arr {
if v, ok := item["pay_name"].(string); ok && strings.TrimSpace(v) != "" {
names = append(names, v)
continue
}
if v, ok := item["name"].(string); ok && strings.TrimSpace(v) != "" {
names = append(names, v)
}
}
if len(names) == 0 {
vals[i] = "无"
} else {
vals[i] = strings.Join(names, ",")
}
}
}
return vals
}
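// decodeOrderKey re-encodes a decimal order key (an optional trailing "_1" is
// stripped first) into a 58-character alphabet: A-Z and a-z without I/O/i/o,
// plus 0-9. The result is left-padded with 'A' to 16 characters, most
// significant digit first; e.g. decodeOrderKey("59") == "AAAAAAAAAAAAAABB".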
func decodeOrderKey(s string) string {
if s == "" {
return s
}
if len(s) > 2 && s[len(s)-2:] == "_1" {
s = s[:len(s)-2]
}
var n big.Int
if _, ok := n.SetString(s, 10); !ok {
return s
}
base := []rune{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
baseCount := big.NewInt(int64(len(base)))
zero := big.NewInt(0)
var out []rune
for n.Cmp(zero) > 0 {
var mod big.Int
mod.Mod(&n, baseCount)
out = append(out, base[mod.Int64()])
n.Div(&n, baseCount)
}
for len(out) < 16 {
out = append(out, base[0])
}
for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
out[i], out[j] = out[j], out[i]
}
return string(out)
}
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
w.Write([]byte("ok"))
}
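// renderSQL interpolates args into the '?' placeholders of q for logs and
// debug output only; the naive quote doubling here is not a substitute for
// driver-level parameter binding, and the result is never executed.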
func renderSQL(q string, args []interface{}) string {
formatArg := func(a interface{}) string {
switch t := a.(type) {
case nil:
return "NULL"
case []byte:
s := string(t)
s = strings.ReplaceAll(s, "'", "''")
return "'" + s + "'"
case string:
s := strings.ReplaceAll(t, "'", "''")
return "'" + s + "'"
case int:
return strconv.Itoa(t)
case int64:
return strconv.FormatInt(t, 10)
case float64:
return strconv.FormatFloat(t, 'f', -1, 64)
case time.Time:
return "'" + t.Format("2006-01-02 15:04:05") + "'"
default:
return fmt.Sprintf("%v", t)
}
}
var sb strings.Builder
var ai int
for i := 0; i < len(q); i++ {
c := q[i]
if c == '?' && ai < len(args) {
sb.WriteString(formatArg(args[ai]))
ai++
} else {
sb.WriteByte(c)
}
}
return sb.String()
}
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
page := 1
size := 15
if p := q.Get("page"); p != "" {
if n, err := strconv.Atoi(p); err == nil && n > 0 {
page = n
}
}
if s := q.Get("page_size"); s != "" {
if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
size = n
}
}
tplIDStr := q.Get("template_id")
var tplID uint64
if tplIDStr != "" {
if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
tplID = n
}
}
offset := (page - 1) * size
rrepo := repo.NewExportRepo()
totalCount := rrepo.CountJobs(a.meta, tplID, "")
itemsRaw, err := rrepo.ListJobs(a.meta, tplID, "", size, offset)
if err != nil {
failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
return
}
items := []map[string]interface{}{}
for _, it := range itemsRaw {
id, tid, req := it.ID, it.TemplateID, it.RequestedBy
status, fmtstr := it.Status, it.FileFormat
estimate, total := it.RowEstimate, it.TotalRows
createdAt, updatedAt := it.CreatedAt, it.UpdatedAt
score, explainRaw := it.ExplainScore, it.ExplainJSON
evalStatus := "通过"
if score.Int64 < 60 {
evalStatus = "禁止"
}
desc := fmt.Sprintf("评分:%d估算行数:%d%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
if explainRaw.Valid && explainRaw.String != "" {
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil {
segs := []string{}
for _, r := range arr {
getStr := func(field string) string {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return ""
}
if s, ok := mm["String"].(string); ok {
return s
}
}
}
return ""
}
getInt := func(field string) int64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Int64"].(float64); ok {
return int64(f)
}
}
}
return 0
}
getFloat := func(field string) float64 {
if v, ok := r[field]; ok {
if mm, ok := v.(map[string]interface{}); ok {
if b, ok := mm["Valid"].(bool); ok && !b {
return 0
}
if f, ok := mm["Float64"].(float64); ok {
return f
}
}
}
return 0
}
tbl := getStr("Table")
typ := getStr("Type")
if typ == "" {
typ = getStr("SelectType")
}
key := getStr("Key")
rowsN := getInt("Rows")
filt := getFloat("Filtered")
extra := getStr("Extra")
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
continue
}
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
if extra != "" {
s += ", 额外:" + extra
}
segs = append(segs, s)
}
if len(segs) > 0 {
desc = strings.Join(segs, "")
}
}
}
m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
items = append(items, m)
}
ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
}
// mergePermissionIntoFilters injects permission scope into filters in a canonical way
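// For example (illustrative values): perm={"reseller_id": 7} with empty
// filters on main table "order" yields filters={"reseller_id_eq": "7"}, and
// any creator_in boundary is dropped because a reseller scope is present.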
func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} {
if filters == nil {
filters = map[string]interface{}{}
}
// First, derive plan_id_eq and reseller_id_eq from the permission scope / filters
if v, ok := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); ok {
filters["reseller_id_eq"] = v
}
if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok {
filters["plan_id_eq"] = v
}
// If plan_id_eq or reseller_id_eq is provided and non-empty, drop any existing creator_in and skip setting one (applies to all datasources)
if main == "order" || main == "order_info" {
hasPlanOrReseller := false
// Compare via the string form: JSON numbers decode as float64, and an
// interface{} holding float64(0) would not equal the untyped constant 0.
for _, k := range []string{"plan_id_eq", "reseller_id_eq"} {
if v, ok := filters[k]; ok && v != nil {
if s := strings.TrimSpace(utils.ToString(v)); s != "" && s != "0" {
hasPlanOrReseller = true
}
}
}
if hasPlanOrReseller {
// drop any existing creator_in boundary
delete(filters, "creator_in")
delete(filters, "creator_ids")
goto skipCreator
}
}
// if creator_in already present, keep it
if hasNonEmptyIDs(filters["creator_in"]) {
return filters
}
// try known keys (explicitly excluding current_user_id: it only records the owner and is not used for data filtering)
{
candidates := []string{"creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
ids := []interface{}{}
for _, k := range candidates {
if perm == nil {
break
}
// Explicitly skip current_user_id (even though it is not in candidates, check defensively to be safe)
if k == "current_user_id" {
continue
}
if v, ok := perm[k]; ok {
ids = normalizeIDs(v)
if len(ids) > 0 {
break
}
}
}
// also check alternative incoming filter keys and normalize into creator_in
// (again explicitly excluding current_user_id)
if len(ids) == 0 {
alt := []string{"creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
for _, k := range alt {
// explicitly skip current_user_id
if k == "current_user_id" {
continue
}
if v, ok := filters[k]; ok {
ids = normalizeIDs(v)
if len(ids) > 0 {
break
}
}
}
}
// Extra guard: if current_user_id appears directly in permission or filters, strip it
if perm != nil {
delete(perm, "current_user_id")
}
delete(filters, "current_user_id")
if len(ids) > 0 {
filters["creator_in"] = ids
}
}
skipCreator:
// account
if v, ok := pickFirst(perm, filters, []string{"account", "account_no"}); ok {
filters["account_eq"] = v
}
// out_trade_no
if v, ok := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); ok {
filters["out_trade_no_eq"] = v
}
return filters
}
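// normalizeIDs flattens a scalar, slice, or comma-separated string into a
// []interface{} of non-empty trimmed IDs, e.g. "1, 2," → ["1" "2"] and
// []int64{3} → [3].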
func normalizeIDs(v interface{}) []interface{} {
out := []interface{}{}
switch t := v.(type) {
case []interface{}:
for _, x := range t {
if s := utils.ToString(x); s != "" {
out = append(out, s)
}
}
case []string:
for _, s := range t {
s2 := strings.TrimSpace(s)
if s2 != "" {
out = append(out, s2)
}
}
case []int:
for _, n := range t {
out = append(out, n)
}
case []int64:
for _, n := range t {
out = append(out, n)
}
case string:
// support comma-separated string
parts := strings.Split(t, ",")
for _, s := range parts {
s2 := strings.TrimSpace(s)
if s2 != "" {
out = append(out, s2)
}
}
default:
if s := utils.ToString(t); s != "" {
out = append(out, s)
}
}
return out
}
func hasNonEmptyIDs(v interface{}) bool {
arr := normalizeIDs(v)
return len(arr) > 0
}
func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) {
for _, k := range keys {
if perm != nil {
if v, ok := perm[k]; ok {
arr := normalizeIDs(v)
if len(arr) > 0 {
return arr[0], true
}
if s := utils.ToString(v); s != "" {
return s, true
}
}
}
if v, ok := filters[k]; ok {
arr := normalizeIDs(v)
if len(arr) > 0 {
return arr[0], true
}
if s := utils.ToString(v); s != "" {
return s, true
}
}
}
return nil, false
}
// parseIntVal parses a string of ASCII digits as a non-negative integer; it returns -1 for empty input or any non-digit character.
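// For example: parseIntVal("12") == 12, parseIntVal("") == -1, parseIntVal("1.2") == -1.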
func parseIntVal(s string) int {
if s == "" {
return -1
}
n := 0
for _, c := range s {
if c < '0' || c > '9' {
return -1
}
n = n*10 + int(c-'0')
}
return n
}