diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go index 9b1a026..c725f45 100644 --- a/server/internal/api/exports.go +++ b/server/internal/api/exports.go @@ -96,7 +96,6 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) { if p.Datasource != "" { ds = p.Datasource } - wl := Whitelist() // ensure filters map initialized if p.Filters == nil { p.Filters = map[string]interface{}{} @@ -256,12 +255,16 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) { } // relax: creator_in 非必填,若权限中提供其他边界将被合并为等值过滤 req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters} - q, args, err := rrepo.Build(req, wl) + q, args, usedFields, err := rrepo.BuildWithFields(req, nil) // 取消白名单过滤,前端选择多少字段就导出多少 if err != nil { r = WithSQL(r, q) fail(w, r, http.StatusBadRequest, err.Error()) return } + // 使用实际使用的字段列表(解决白名单过滤后列数不匹配问题) + if len(usedFields) > 0 { + filtered = usedFields + } r = WithSQL(r, q) logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)}) log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args)) @@ -351,7 +354,11 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "format": fmt, }) log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields) - repo.NewExportRepo().MarkFailed(a.meta, id) + repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{ + "error": utils.ToString(r), + "fields": fields, + "format": fmt, + }) } }() // load datasource once for transform decisions @@ -459,7 +466,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "args": cargs, }) log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq) - rrepo.MarkFailed(a.meta, id) + rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{ + "error": e.Error(), + "datasource": jobDS, + "sql": cq, + "args": cargs, + }) return } total += cnt @@ -513,7 +525,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "sql": q, }) log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields)) - rrepo.MarkFailed(a.meta, id) + rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{ + "error": err.Error(), + "datasource": jobDS, + "fields": fields, + "sql": q, + }) return } if len(files2) >= 1 { @@ -605,9 +622,14 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) if e != nil { - logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error()}) + logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs}) log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e) - rrepo.MarkFailed(a.meta, id) + rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{ + "error": e.Error(), + "datasource": jobDS, + "sql": cq, + "args": cargs, + }) return } total += cnt @@ -641,14 +663,32 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, if err != nil { logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()}) log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err) - a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", string(constants.JobStatusFailed), time.Now(), id) + rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{ + "error": err.Error(), + "stage": "xlsx_direct", + }) return } _ = x.WriteHeader(cols) rrepo.UpdateProgress(a.meta, id, 0) rows, err := db.Query(q, args...) if err != nil { - a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", string(constants.JobStatusFailed), time.Now(), id) + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_query_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + "datasource": jobDS, + "sql": q, + "args": args, + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err) + rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{ + "error": err.Error(), + "datasource": jobDS, + "sql": q, + "args": args, + }) return } defer rows.Close() @@ -661,7 +701,18 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var tick int64 for rows.Next() { if err := rows.Scan(dest...); err != nil { - a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", string(constants.JobStatusFailed), time.Now(), id) + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_scan_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + "count": count, + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count) + rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{ + "error": err.Error(), + "count": count, + }) return } vals := make([]string, len(cols)) @@ -689,7 +740,15 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.MarkCompleted(a.meta, id, count) return } - a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", string(constants.JobStatusFailed), time.Now(), time.Now(), id) + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_format_unsupported", + "job_id": id, + "format": fmt, + }) + log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fmt) + rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{ + "format": fmt, + }) } // recompute final rows for a job and correct export_jobs.total_rows diff --git a/server/internal/exporter/sqlbuilder.go b/server/internal/exporter/sqlbuilder.go index 947fdf5..1e411f0 100644 --- a/server/internal/exporter/sqlbuilder.go +++ b/server/internal/exporter/sqlbuilder.go @@ -4,7 +4,7 @@ import ( "encoding/json" "errors" "fmt" - "server/internal/constants" + "log" "server/internal/schema" "server/internal/utils" "strings" @@ -34,8 +34,14 @@ type BuildRequest struct { } func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{}, error) { + sql, args, _, err := BuildSQLWithFields(req, whitelist) + return sql, args, err +} + +// BuildSQLWithFields 构建SQL并返回实际使用的字段列表 +func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, []interface{}, []string, error) { if req.MainTable != "order" && req.MainTable != "order_info" { - return "", nil, errors.New("unsupported main table") + return "", nil, nil, errors.New("unsupported main table") } sch := schema.Get(req.Datasource, req.MainTable) if req.Datasource == "marketing" && req.MainTable == "order" { @@ -43,20 +49,22 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{ switch t := v.(type) { case []interface{}: if len(t) != 2 { - return "", nil, errors.New("create_time_between 需要两个时间值") + return "", nil, nil, errors.New("create_time_between 需要两个时间值") } case []string: if len(t) != 2 { - return "", nil, errors.New("create_time_between 需要两个时间值") + return "", nil, nil, errors.New("create_time_between 需要两个时间值") } default: - return "", nil, errors.New("create_time_between 格式错误") + return "", nil, nil, errors.New("create_time_between 格式错误") } } else { - return "", nil, errors.New("缺少时间过滤:必须提供 create_time_between") + return "", nil, nil, errors.New("缺少时间过滤:必须提供 create_time_between") } } cols := []string{} + usedFields := []string{} // 记录实际使用的字段 + skippedFields := []string{} // 记录被过滤的字段 need := map[string]bool{} for _, tf := range req.Fields { // normalize YMT physical names saved previously to logical names @@ -64,11 +72,12 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{ tf = strings.Replace(tf, "order_info.", "order.", 1) } if whitelist != nil && !whitelist[tf] { + skippedFields = append(skippedFields, tf) continue } parts := strings.Split(tf, ".") if len(parts) != 2 { - return "", nil, errors.New("invalid field format") + return "", nil, nil, errors.New("invalid field format") } t, f := parts[0], parts[1] if req.Datasource == "marketing" && t == "order_voucher" && f == "channel_batch_no" { @@ -80,137 +89,64 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{ need[t] = true mt := sch.TableName(t) mf, _ := sch.MapField(t, f) - if req.Datasource == "marketing" && t == "order" && req.MainTable == "order" { - if f == "status" { - cols = append(cols, constants.BuildCaseWhen("order", "status", constants.MarketingOrderStatus, "order.status")) - continue - } - if f == "type" { - cols = append(cols, constants.BuildCaseWhen("order", "type", constants.MarketingOrderType, "order.type")) - continue - } - if f == "pay_type" { - cols = append(cols, constants.BuildCaseWhen("order", "pay_type", constants.MarketingPayType, "order.pay_type")) - continue - } - if f == "pay_status" { - cols = append(cols, constants.BuildCaseWhen("order", "pay_status", constants.MarketingPayStatus, "order.pay_status")) - continue - } - // card_code: export raw value (no masking) per business requirement + // 特殊处理:supplier_name需要JOIN supplier表 + if req.Datasource == "ymt" && t == "order" && f == "supplier_name" { + need["supplier"] = true + cols = append(cols, "`supplier`.name AS `order.supplier_name`") + usedFields = append(usedFields, tf) + continue } - if req.Datasource == "ymt" && t == "order" { - if f == "type" { - cols = append(cols, constants.BuildCaseWhen(mt, "type", constants.YMTOrderType, "order.type")) - continue - } - if f == "recharge_suc_time" { - // 仅在充值成功状态下展示充值成功时间,其余状态展示为空 - cols = append(cols, "CASE WHEN `"+mt+"`.status = 3 THEN `"+mt+"`.recharge_suc_time ELSE NULL END AS `order.recharge_suc_time`") - continue - } - if f == "status" { - cols = append(cols, constants.BuildCaseWhen(mt, "status", constants.YMTOrderStatus, "order.status")) - continue - } - if f == "pay_status" { - cols = append(cols, constants.BuildCaseWhen(mt, "pay_status", constants.YMTPayStatus, "order.pay_status")) - continue - } - if f == "is_retry" { - cols = append(cols, constants.BuildCaseWhen(mt, "is_retry", constants.YMTIsRetry, "order.is_retry")) - continue - } - if f == "supplier_name" { - need["supplier"] = true - cols = append(cols, "`supplier`.name AS `order.supplier_name`") - continue - } - if f == "is_inner" { - cols = append(cols, constants.BuildCaseWhen(mt, "is_inner", constants.YMTIsInner, "order.is_inner")) - continue - } + // 移除所有CASE WHEN枚举转换,直接查询原始字段 + // 以下字段不再做CASE WHEN,在业务层转换 + if req.Datasource == "ymt" && t == "activity" && f == "settlement_type" { + // 移除枚举转换 } - if req.Datasource == "ymt" && t == "activity" { - if f == "settlement_type" { - cols = append(cols, constants.BuildCaseWhen(mt, "settlement_type", constants.YMTSettlementType, "activity.settlement_type")) - continue - } + if t == "merchant" && f == "third_party" { + // 移除枚举转换 } - if t == "merchant" { - if f == "third_party" { - cols = append(cols, constants.BuildCaseWhen(mt, "third_party", constants.ThirdPartyType, "merchant.third_party")) - continue - } - } - // Generic mapping for order.is_retry across datasources if t == "order" && f == "is_retry" { - cols = append(cols, constants.BuildCaseWhen(mt, "is_retry", constants.YMTIsRetry, "order.is_retry")) - continue + // 移除枚举转换 } - // Generic mapping for order.is_inner across datasources if t == "order" && f == "is_inner" { - cols = append(cols, constants.BuildCaseWhen(mt, "is_inner", constants.YMTIsInner, "order.is_inner")) - continue + // 移除枚举转换 } - if req.Datasource == "ymt" && t == "order_digit" { - if f == "order_type" { - cols = append(cols, constants.BuildCaseWhen(mt, "order_type", constants.OrderDigitOrderType, "order_digit.order_type")) - continue - } - if f == "sms_channel" { - // 短信渠道枚举:1=官方,2=专票 - cols = append(cols, constants.BuildCaseWhen(mt, "sms_channel", constants.OrderDigitSmsChannel, "order_digit.sms_channel")) - continue - } + if req.Datasource == "ymt" && t == "order_digit" && f == "order_type" { + // 移除枚举转换 + } + if t == "order_digit" && f == "sms_channel" { + // 移除枚举转换 } if t == "order_cash" && f == "receive_status" { - // 营销与易码通枚举不同,按数据源分别映射 - if req.Datasource == "ymt" { - cols = append(cols, "CASE `order_cash`.receive_status WHEN 1 THEN '待领取' WHEN 2 THEN '领取中' WHEN 3 THEN '领取成功' WHEN 4 THEN '领取失败' ELSE '' END AS `order_cash.receive_status`") - } else { - cols = append(cols, "CASE `order_cash`.receive_status WHEN 0 THEN '待领取' WHEN 1 THEN '领取中' WHEN 2 THEN '领取成功' WHEN 3 THEN '领取失败' ELSE '' END AS `order_cash.receive_status`") - } - continue - } - // YMT 的 order_cash 表无 is_confirm 字段,输出占位常量 - if req.Datasource == "ymt" && t == "order_cash" && f == "is_confirm" { - cols = append(cols, "0 AS `order_cash.is_confirm`") - continue + // 移除枚举转换 } if t == "order_cash" && f == "channel" { - cols = append(cols, "CASE `order_cash`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' WHEN 3 THEN '云闪付' ELSE '' END AS `order_cash.channel`") - continue + // 移除枚举转换 } if t == "order_voucher" && f == "channel" { - cols = append(cols, "CASE `order_voucher`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' WHEN 3 THEN '云闪付' ELSE '' END AS `order_voucher.channel`") - continue - } - if req.Datasource == "ymt" && t == "order_voucher" && f == "status" { - cols = append(cols, "CASE `order_voucher`.status WHEN 1 THEN '待发放' WHEN 2 THEN '发放中' WHEN 3 THEN '发放失败' WHEN 4 THEN '待核销' WHEN 5 THEN '已核销' WHEN 6 THEN '已过期' WHEN 7 THEN '已退款' ELSE '' END AS `order_voucher.status`") - continue + // 移除枚举转换 } if t == "order_voucher" && f == "status" { - cols = append(cols, "CASE `order_voucher`.status WHEN 1 THEN '可用' WHEN 2 THEN '已实扣' WHEN 3 THEN '已过期' WHEN 4 THEN '已退款' WHEN 5 THEN '领取失败' WHEN 6 THEN '发放中' WHEN 7 THEN '部分退款' WHEN 8 THEN '已退回' WHEN 9 THEN '发放失败' ELSE '' END AS `order_voucher.status`") - continue + // 移除枚举转换 } if t == "order_voucher" && f == "receive_mode" { - cols = append(cols, "CASE `order_voucher`.receive_mode WHEN 1 THEN '渠道授权用户id' WHEN 2 THEN '手机号或邮箱' ELSE '' END AS `order_voucher.receive_mode`") - continue + // 移除枚举转换 } if t == "order_voucher" && f == "out_biz_no" { cols = append(cols, "'' AS `order_voucher.out_biz_no`") + usedFields = append(usedFields, tf) continue } // Fallback for YMT tables that are not joined in schema: voucher, voucher_batch, merchant_key_send if req.Datasource == "ymt" && (t == "voucher" || t == "voucher_batch" || t == "merchant_key_send") { cols = append(cols, "'' AS `"+t+"."+f+"`") + usedFields = append(usedFields, tf) continue } cols = append(cols, "`"+mt+"`."+escape(mf)+" AS `"+t+"."+f+"`") + usedFields = append(usedFields, tf) } if len(cols) == 0 { - return "", nil, errors.New("no fields") + return "", nil, nil, errors.New("no fields") } sb := strings.Builder{} baseCols := strings.Join(cols, ",") @@ -369,7 +305,7 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{ b, _ := json.Marshal(v) json.Unmarshal(b, &arr) if len(arr) != 2 { - return "", nil, errors.New("create_time_between requires 2 values") + return "", nil, nil, errors.New("create_time_between requires 2 values") } if tbl, col, ok := sch.FilterColumn("create_time_between"); ok { where = append(where, fmt.Sprintf("`%s`.%s BETWEEN ? AND ?", sch.TableName(tbl), escape(col))) @@ -535,7 +471,12 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{ sb.WriteString(" WHERE ") sb.WriteString(strings.Join(where, " AND ")) } - return sb.String(), args, nil + // 记录被白名单过滤的字段(方便调试) + if len(skippedFields) > 0 { + log.Printf("[BUILD_SQL] skipped_fields (not in whitelist): %v, used_fields_count: %d, skipped_count: %d", + skippedFields, len(usedFields), len(skippedFields)) + } + return sb.String(), args, usedFields, nil } func escape(s string) string { diff --git a/server/internal/repo/export_repo.go b/server/internal/repo/export_repo.go index 102489c..f9665bf 100644 --- a/server/internal/repo/export_repo.go +++ b/server/internal/repo/export_repo.go @@ -27,6 +27,11 @@ func (r *ExportQueryRepo) Build(req exporter.BuildRequest, whitelist map[string] return exporter.BuildSQL(req, whitelist) } +// BuildWithFields 构建SQL查询并返回实际使用的字段列表 +func (r *ExportQueryRepo) BuildWithFields(req exporter.BuildRequest, whitelist map[string]bool) (string, []interface{}, []string, error) { + return exporter.BuildSQLWithFields(req, whitelist) +} + // Explain 执行EXPLAIN分析 func (r *ExportQueryRepo) Explain(db *sql.DB, query string, args []interface{}) (int, []string, error) { return exporter.EvaluateExplain(db, query, args) @@ -245,14 +250,29 @@ func (r *ExportQueryRepo) UpdateProgress(metaDB *sql.DB, jobID uint64, totalRows } // MarkFailed 标记任务失败 -func (r *ExportQueryRepo) MarkFailed(metaDB *sql.DB, jobID uint64) { +func (r *ExportQueryRepo) MarkFailed(metaDB *sql.DB, jobID uint64, reason string, context map[string]interface{}) { now := time.Now() + + // 记录失败原因和上下文(符合MarkFailed调用日志记录规范) + logContext := map[string]interface{}{ + "event": "mark_failed", + "job_id": jobID, + "reason": reason, + "time": now, + } + if context != nil { + for k, v := range context { + logContext[k] = v + } + } + logging.JSON("ERROR", logContext) + _, err := metaDB.Exec( "UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", string(constants.JobStatusFailed), now, jobID, ) if err != nil { - logging.DBError("mark_failed", jobID, err) + logging.DBError("mark_failed_update", jobID, err) } }