refactor(export): 移除导出SQL中的枚举转换并增强错误标记详细信息

- 导出接口取消白名单过滤,字段导出与前端选择保持一致
- 构建SQL时移除所有CASE WHEN枚举字段转换,改由业务层处理
- 增加实际使用字段列表返回,解决字段列数不匹配问题
- ExportRepo.MarkFailed 增加失败原因和上下文参数,丰富失败日志
- 导出任务多个失败分支均调用MarkFailed并传递详细错误上下文
- 日志中增加导出失败详细信息,包含错误、数据源、字段、SQL及参数
- 删除无用的枚举映射代码,简化SQL构建逻辑
- 保障创建时间区间过滤的格式和参数合法性检查
- 增加导出SQL构建时被白名单过滤字段的日志输出,方便调试
This commit is contained in:
zhouyonggao 2025-12-18 12:35:46 +08:00
parent d89f6fffad
commit f6911a0cc6
3 changed files with 143 additions and 123 deletions

View File

@ -96,7 +96,6 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
if p.Datasource != "" {
ds = p.Datasource
}
wl := Whitelist()
// ensure filters map initialized
if p.Filters == nil {
p.Filters = map[string]interface{}{}
@ -256,12 +255,16 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
}
// relax: creator_in 非必填,若权限中提供其他边界将被合并为等值过滤
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters}
q, args, err := rrepo.Build(req, wl)
q, args, usedFields, err := rrepo.BuildWithFields(req, nil) // 取消白名单过滤,前端选择多少字段就导出多少
if err != nil {
r = WithSQL(r, q)
fail(w, r, http.StatusBadRequest, err.Error())
return
}
// 使用实际使用的字段列表(解决白名单过滤后列数不匹配问题)
if len(usedFields) > 0 {
filtered = usedFields
}
r = WithSQL(r, q)
logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args))
@ -351,7 +354,11 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
"format": fmt,
})
log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
repo.NewExportRepo().MarkFailed(a.meta, id)
repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
"error": utils.ToString(r),
"fields": fields,
"format": fmt,
})
}
}()
// load datasource once for transform decisions
@ -459,7 +466,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
"args": cargs,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq)
rrepo.MarkFailed(a.meta, id)
rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{
"error": e.Error(),
"datasource": jobDS,
"sql": cq,
"args": cargs,
})
return
}
total += cnt
@ -513,7 +525,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
"sql": q,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields))
rrepo.MarkFailed(a.meta, id)
rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{
"error": err.Error(),
"datasource": jobDS,
"fields": fields,
"sql": q,
})
return
}
if len(files2) >= 1 {
@ -605,9 +622,14 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
}
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
if e != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error()})
logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
rrepo.MarkFailed(a.meta, id)
rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
"error": e.Error(),
"datasource": jobDS,
"sql": cq,
"args": cargs,
})
return
}
total += cnt
@ -641,14 +663,32 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
if err != nil {
logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", string(constants.JobStatusFailed), time.Now(), id)
rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
"error": err.Error(),
"stage": "xlsx_direct",
})
return
}
_ = x.WriteHeader(cols)
rrepo.UpdateProgress(a.meta, id, 0)
rows, err := db.Query(q, args...)
if err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", string(constants.JobStatusFailed), time.Now(), id)
logging.JSON("ERROR", map[string]interface{}{
"event": "export_query_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
"datasource": jobDS,
"sql": q,
"args": args,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err)
rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{
"error": err.Error(),
"datasource": jobDS,
"sql": q,
"args": args,
})
return
}
defer rows.Close()
@ -661,7 +701,18 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
var tick int64
for rows.Next() {
if err := rows.Scan(dest...); err != nil {
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", string(constants.JobStatusFailed), time.Now(), id)
logging.JSON("ERROR", map[string]interface{}{
"event": "export_scan_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
"count": count,
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count)
rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{
"error": err.Error(),
"count": count,
})
return
}
vals := make([]string, len(cols))
@ -689,7 +740,15 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
rrepo.MarkCompleted(a.meta, id, count)
return
}
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", string(constants.JobStatusFailed), time.Now(), time.Now(), id)
logging.JSON("ERROR", map[string]interface{}{
"event": "export_format_unsupported",
"job_id": id,
"format": fmt,
})
log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fmt)
rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
"format": fmt,
})
}
// recompute final rows for a job and correct export_jobs.total_rows

View File

@ -4,7 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"server/internal/constants"
"log"
"server/internal/schema"
"server/internal/utils"
"strings"
@ -34,8 +34,14 @@ type BuildRequest struct {
}
func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{}, error) {
sql, args, _, err := BuildSQLWithFields(req, whitelist)
return sql, args, err
}
// BuildSQLWithFields 构建SQL并返回实际使用的字段列表
func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, []interface{}, []string, error) {
if req.MainTable != "order" && req.MainTable != "order_info" {
return "", nil, errors.New("unsupported main table")
return "", nil, nil, errors.New("unsupported main table")
}
sch := schema.Get(req.Datasource, req.MainTable)
if req.Datasource == "marketing" && req.MainTable == "order" {
@ -43,20 +49,22 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
switch t := v.(type) {
case []interface{}:
if len(t) != 2 {
return "", nil, errors.New("create_time_between 需要两个时间值")
return "", nil, nil, errors.New("create_time_between 需要两个时间值")
}
case []string:
if len(t) != 2 {
return "", nil, errors.New("create_time_between 需要两个时间值")
return "", nil, nil, errors.New("create_time_between 需要两个时间值")
}
default:
return "", nil, errors.New("create_time_between 格式错误")
return "", nil, nil, errors.New("create_time_between 格式错误")
}
} else {
return "", nil, errors.New("缺少时间过滤:必须提供 create_time_between")
return "", nil, nil, errors.New("缺少时间过滤:必须提供 create_time_between")
}
}
cols := []string{}
usedFields := []string{} // 记录实际使用的字段
skippedFields := []string{} // 记录被过滤的字段
need := map[string]bool{}
for _, tf := range req.Fields {
// normalize YMT physical names saved previously to logical names
@ -64,11 +72,12 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
tf = strings.Replace(tf, "order_info.", "order.", 1)
}
if whitelist != nil && !whitelist[tf] {
skippedFields = append(skippedFields, tf)
continue
}
parts := strings.Split(tf, ".")
if len(parts) != 2 {
return "", nil, errors.New("invalid field format")
return "", nil, nil, errors.New("invalid field format")
}
t, f := parts[0], parts[1]
if req.Datasource == "marketing" && t == "order_voucher" && f == "channel_batch_no" {
@ -80,137 +89,64 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
need[t] = true
mt := sch.TableName(t)
mf, _ := sch.MapField(t, f)
if req.Datasource == "marketing" && t == "order" && req.MainTable == "order" {
if f == "status" {
cols = append(cols, constants.BuildCaseWhen("order", "status", constants.MarketingOrderStatus, "order.status"))
continue
}
if f == "type" {
cols = append(cols, constants.BuildCaseWhen("order", "type", constants.MarketingOrderType, "order.type"))
continue
}
if f == "pay_type" {
cols = append(cols, constants.BuildCaseWhen("order", "pay_type", constants.MarketingPayType, "order.pay_type"))
continue
}
if f == "pay_status" {
cols = append(cols, constants.BuildCaseWhen("order", "pay_status", constants.MarketingPayStatus, "order.pay_status"))
continue
}
// card_code: export raw value (no masking) per business requirement
}
if req.Datasource == "ymt" && t == "order" {
if f == "type" {
cols = append(cols, constants.BuildCaseWhen(mt, "type", constants.YMTOrderType, "order.type"))
continue
}
if f == "recharge_suc_time" {
// 仅在充值成功状态下展示充值成功时间,其余状态展示为空
cols = append(cols, "CASE WHEN `"+mt+"`.status = 3 THEN `"+mt+"`.recharge_suc_time ELSE NULL END AS `order.recharge_suc_time`")
continue
}
if f == "status" {
cols = append(cols, constants.BuildCaseWhen(mt, "status", constants.YMTOrderStatus, "order.status"))
continue
}
if f == "pay_status" {
cols = append(cols, constants.BuildCaseWhen(mt, "pay_status", constants.YMTPayStatus, "order.pay_status"))
continue
}
if f == "is_retry" {
cols = append(cols, constants.BuildCaseWhen(mt, "is_retry", constants.YMTIsRetry, "order.is_retry"))
continue
}
if f == "supplier_name" {
// 特殊处理supplier_name需要JOIN supplier表
if req.Datasource == "ymt" && t == "order" && f == "supplier_name" {
need["supplier"] = true
cols = append(cols, "`supplier`.name AS `order.supplier_name`")
usedFields = append(usedFields, tf)
continue
}
if f == "is_inner" {
cols = append(cols, constants.BuildCaseWhen(mt, "is_inner", constants.YMTIsInner, "order.is_inner"))
continue
// 移除所有CASE WHEN枚举转换直接查询原始字段
// 以下字段不再做CASE WHEN在业务层转换
if req.Datasource == "ymt" && t == "activity" && f == "settlement_type" {
// 移除枚举转换
}
if t == "merchant" && f == "third_party" {
// 移除枚举转换
}
if req.Datasource == "ymt" && t == "activity" {
if f == "settlement_type" {
cols = append(cols, constants.BuildCaseWhen(mt, "settlement_type", constants.YMTSettlementType, "activity.settlement_type"))
continue
}
}
if t == "merchant" {
if f == "third_party" {
cols = append(cols, constants.BuildCaseWhen(mt, "third_party", constants.ThirdPartyType, "merchant.third_party"))
continue
}
}
// Generic mapping for order.is_retry across datasources
if t == "order" && f == "is_retry" {
cols = append(cols, constants.BuildCaseWhen(mt, "is_retry", constants.YMTIsRetry, "order.is_retry"))
continue
// 移除枚举转换
}
// Generic mapping for order.is_inner across datasources
if t == "order" && f == "is_inner" {
cols = append(cols, constants.BuildCaseWhen(mt, "is_inner", constants.YMTIsInner, "order.is_inner"))
continue
// 移除枚举转换
}
if req.Datasource == "ymt" && t == "order_digit" {
if f == "order_type" {
cols = append(cols, constants.BuildCaseWhen(mt, "order_type", constants.OrderDigitOrderType, "order_digit.order_type"))
continue
}
if f == "sms_channel" {
// 短信渠道枚举1=官方2=专票
cols = append(cols, constants.BuildCaseWhen(mt, "sms_channel", constants.OrderDigitSmsChannel, "order_digit.sms_channel"))
continue
if req.Datasource == "ymt" && t == "order_digit" && f == "order_type" {
// 移除枚举转换
}
if t == "order_digit" && f == "sms_channel" {
// 移除枚举转换
}
if t == "order_cash" && f == "receive_status" {
// 营销与易码通枚举不同,按数据源分别映射
if req.Datasource == "ymt" {
cols = append(cols, "CASE `order_cash`.receive_status WHEN 1 THEN '待领取' WHEN 2 THEN '领取中' WHEN 3 THEN '领取成功' WHEN 4 THEN '领取失败' ELSE '' END AS `order_cash.receive_status`")
} else {
cols = append(cols, "CASE `order_cash`.receive_status WHEN 0 THEN '待领取' WHEN 1 THEN '领取中' WHEN 2 THEN '领取成功' WHEN 3 THEN '领取失败' ELSE '' END AS `order_cash.receive_status`")
}
continue
}
// YMT 的 order_cash 表无 is_confirm 字段,输出占位常量
if req.Datasource == "ymt" && t == "order_cash" && f == "is_confirm" {
cols = append(cols, "0 AS `order_cash.is_confirm`")
continue
// 移除枚举转换
}
if t == "order_cash" && f == "channel" {
cols = append(cols, "CASE `order_cash`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' WHEN 3 THEN '云闪付' ELSE '' END AS `order_cash.channel`")
continue
// 移除枚举转换
}
if t == "order_voucher" && f == "channel" {
cols = append(cols, "CASE `order_voucher`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' WHEN 3 THEN '云闪付' ELSE '' END AS `order_voucher.channel`")
continue
}
if req.Datasource == "ymt" && t == "order_voucher" && f == "status" {
cols = append(cols, "CASE `order_voucher`.status WHEN 1 THEN '待发放' WHEN 2 THEN '发放中' WHEN 3 THEN '发放失败' WHEN 4 THEN '待核销' WHEN 5 THEN '已核销' WHEN 6 THEN '已过期' WHEN 7 THEN '已退款' ELSE '' END AS `order_voucher.status`")
continue
// 移除枚举转换
}
if t == "order_voucher" && f == "status" {
cols = append(cols, "CASE `order_voucher`.status WHEN 1 THEN '可用' WHEN 2 THEN '已实扣' WHEN 3 THEN '已过期' WHEN 4 THEN '已退款' WHEN 5 THEN '领取失败' WHEN 6 THEN '发放中' WHEN 7 THEN '部分退款' WHEN 8 THEN '已退回' WHEN 9 THEN '发放失败' ELSE '' END AS `order_voucher.status`")
continue
// 移除枚举转换
}
if t == "order_voucher" && f == "receive_mode" {
cols = append(cols, "CASE `order_voucher`.receive_mode WHEN 1 THEN '渠道授权用户id' WHEN 2 THEN '手机号或邮箱' ELSE '' END AS `order_voucher.receive_mode`")
continue
// 移除枚举转换
}
if t == "order_voucher" && f == "out_biz_no" {
cols = append(cols, "'' AS `order_voucher.out_biz_no`")
usedFields = append(usedFields, tf)
continue
}
// Fallback for YMT tables that are not joined in schema: voucher, voucher_batch, merchant_key_send
if req.Datasource == "ymt" && (t == "voucher" || t == "voucher_batch" || t == "merchant_key_send") {
cols = append(cols, "'' AS `"+t+"."+f+"`")
usedFields = append(usedFields, tf)
continue
}
cols = append(cols, "`"+mt+"`."+escape(mf)+" AS `"+t+"."+f+"`")
usedFields = append(usedFields, tf)
}
if len(cols) == 0 {
return "", nil, errors.New("no fields")
return "", nil, nil, errors.New("no fields")
}
sb := strings.Builder{}
baseCols := strings.Join(cols, ",")
@ -369,7 +305,7 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
b, _ := json.Marshal(v)
json.Unmarshal(b, &arr)
if len(arr) != 2 {
return "", nil, errors.New("create_time_between requires 2 values")
return "", nil, nil, errors.New("create_time_between requires 2 values")
}
if tbl, col, ok := sch.FilterColumn("create_time_between"); ok {
where = append(where, fmt.Sprintf("`%s`.%s BETWEEN ? AND ?", sch.TableName(tbl), escape(col)))
@ -535,7 +471,12 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
sb.WriteString(" WHERE ")
sb.WriteString(strings.Join(where, " AND "))
}
return sb.String(), args, nil
// 记录被白名单过滤的字段(方便调试)
if len(skippedFields) > 0 {
log.Printf("[BUILD_SQL] skipped_fields (not in whitelist): %v, used_fields_count: %d, skipped_count: %d",
skippedFields, len(usedFields), len(skippedFields))
}
return sb.String(), args, usedFields, nil
}
func escape(s string) string {

View File

@ -27,6 +27,11 @@ func (r *ExportQueryRepo) Build(req exporter.BuildRequest, whitelist map[string]
return exporter.BuildSQL(req, whitelist)
}
// BuildWithFields 构建SQL查询并返回实际使用的字段列表
func (r *ExportQueryRepo) BuildWithFields(req exporter.BuildRequest, whitelist map[string]bool) (string, []interface{}, []string, error) {
return exporter.BuildSQLWithFields(req, whitelist)
}
// Explain 执行EXPLAIN分析
func (r *ExportQueryRepo) Explain(db *sql.DB, query string, args []interface{}) (int, []string, error) {
return exporter.EvaluateExplain(db, query, args)
@ -245,14 +250,29 @@ func (r *ExportQueryRepo) UpdateProgress(metaDB *sql.DB, jobID uint64, totalRows
}
// MarkFailed 标记任务失败
func (r *ExportQueryRepo) MarkFailed(metaDB *sql.DB, jobID uint64) {
func (r *ExportQueryRepo) MarkFailed(metaDB *sql.DB, jobID uint64, reason string, context map[string]interface{}) {
now := time.Now()
// 记录失败原因和上下文符合MarkFailed调用日志记录规范
logContext := map[string]interface{}{
"event": "mark_failed",
"job_id": jobID,
"reason": reason,
"time": now,
}
if context != nil {
for k, v := range context {
logContext[k] = v
}
}
logging.JSON("ERROR", logContext)
_, err := metaDB.Exec(
"UPDATE export_jobs SET status=?, finished_at=? WHERE id=?",
string(constants.JobStatusFailed), now, jobID,
)
if err != nil {
logging.DBError("mark_failed", jobID, err)
logging.DBError("mark_failed_update", jobID, err)
}
}