diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go index ab6f10b..237e564 100644 --- a/server/internal/api/exports.go +++ b/server/internal/api/exports.go @@ -118,9 +118,9 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) { } } if len(ids) > 0 { - // 如果传递了 plan_id_eq 或 reseller_id_eq,不设置 creator_in + // 如果传递了 plan_id_eq 或 reseller_id_eq,不设置 creator_in(适用于所有数据源) skipCreator := false - if ds == "marketing" && (main == "order" || main == "order_info") { + if main == "order" || main == "order_info" { if v, ok := p.Filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 { skipCreator = true } @@ -361,7 +361,7 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) { ok(w, r, map[string]interface{}{"id": id}) } -func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) { +func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) { defer func() { if r := recover(); r != nil { logging.JSON("ERROR", map[string]interface{}{ @@ -369,13 +369,13 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "job_id": id, "error": utils.ToString(r), "fields": fields, - "format": fmt, + "format": fileFormat, }) log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields) repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{ "error": utils.ToString(r), "fields": fields, - "format": fmt, + "format": fileFormat, }) } }() @@ -397,7 +397,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } // 检查预估行数,如果超过阈值且格式是xlsx,强制改为csv - if fmt == "xlsx" { + if fileFormat == "xlsx" { var rowEstimate int64 estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) _ = estRow.Scan(&rowEstimate) @@ -409,12 +409,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, "threshold": constants.ExportThresholds.XlsxMaxRows, "reason": "row_estimate exceeds xlsx max rows, forcing csv format", }) - fmt = "csv" + fileFormat = "csv" } } rrepo.StartJob(a.meta, id) - if fmt == "csv" { + if fileFormat == "csv" { newBaseWriter := func() (exporter.RowWriter, error) { return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) } @@ -538,7 +538,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) rrepo.UpdateRowEstimate(a.meta, id, est) } - batch := constants.ChooseBatchSize(est, constants.FileFormat(fmt)) + batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat)) files2 := []string{} cur := rrepo.NewCursor(jobDS, jobMain) newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } @@ -576,7 +576,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, return } } - if fmt == "xlsx" { + if fileFormat == "xlsx" { files := []string{} { var tplID uint64 @@ -707,6 +707,22 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } _ = x.WriteHeader(cols) rrepo.UpdateProgress(a.meta, id, 0) + // 记录查询执行前的参数类型信息 + argTypes := make([]string, len(args)) + for i, arg := range args { + argTypes[i] = fmt.Sprintf("%T", arg) + } + logging.JSON("INFO", map[string]interface{}{ + "event": "export_query_before_execute", + "job_id": id, + "stage": "xlsx_direct", + "datasource": jobDS, + "sql": q, + "args": args, + "arg_types": argTypes, + "final_sql": renderSQL(q, args), + }) + log.Printf("[EXPORT_DEBUG] job_id=%d stage=xlsx_direct before_query sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args)) rows, err := db.Query(q, args...) if err != nil { logging.JSON("ERROR", map[string]interface{}{ @@ -728,13 +744,40 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, return } defer rows.Close() - out := make([]interface{}, len(cols)) - dest := make([]interface{}, len(cols)) + // 动态获取实际列数 + actualCols, err := rows.Columns() + if err != nil { + logging.JSON("ERROR", map[string]interface{}{ + "event": "export_columns_error", + "job_id": id, + "stage": "xlsx_direct", + "error": err.Error(), + }) + log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err) + rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{ + "error": err.Error(), + }) + return + } + if len(actualCols) != len(cols) { + logging.JSON("WARN", map[string]interface{}{ + "event": "export_column_count_mismatch", + "job_id": id, + "stage": "xlsx_direct", + "expected_cols": len(cols), + "actual_cols": len(actualCols), + }) + log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct column_mismatch expected=%d actual=%d", id, len(cols), len(actualCols)) + } + out := make([]interface{}, len(actualCols)) + dest := make([]interface{}, len(actualCols)) for i := range out { dest[i] = &out[i] } var count int64 var tick int64 + var firstRow []string + firstRowCaptured := false for rows.Next() { if err := rows.Scan(dest...); err != nil { logging.JSON("ERROR", map[string]interface{}{ @@ -761,6 +804,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, vals[i] = utils.ToString(out[i]) } } + // 仅记录第一行原始数据到日志中,方便排查是否有查询结果 + if !firstRowCaptured { + firstRow = make([]string, len(vals)) + copy(firstRow, vals) + firstRowCaptured = true + } vals = transformRow(jobDS, fields, vals) x.WriteRow(vals) count++ @@ -769,6 +818,35 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.UpdateProgress(a.meta, id, count) } } + // 如果查询到了数据,记录一条包含首行数据的日志,便于确认导出前 SQL 是否返回结果 + if count > 0 && firstRowCaptured { + logging.JSON("INFO", map[string]interface{}{ + "event": "export_first_row_sample", + "job_id": id, + "datasource": jobDS, + "total_rows": count, + "first_row": firstRow, + "sql": q, + "args": args, + "final_sql": renderSQL(q, args), + "fields_order": fields, + }) + } else if count == 0 { + // 如果查询返回0行,记录详细信息以便排查 + logging.JSON("WARN", map[string]interface{}{ + "event": "export_zero_rows", + "job_id": id, + "datasource": jobDS, + "stage": "xlsx_direct", + "sql": q, + "args": args, + "arg_types": argTypes, + "final_sql": renderSQL(q, args), + "expected_cols": len(cols), + "actual_cols": len(actualCols), + }) + log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct zero_rows sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args)) + } p, size, _ := x.Close() log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()}) a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now()) @@ -779,11 +857,11 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, logging.JSON("ERROR", map[string]interface{}{ "event": "export_format_unsupported", "job_id": id, - "format": fmt, + "format": fileFormat, }) - log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fmt) + log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat) rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{ - "format": fmt, + "format": fileFormat, }) } @@ -1560,8 +1638,8 @@ func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, fi if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok { filters["plan_id_eq"] = v } - // 如果传递了 plan_id_eq 或 reseller_id_eq 且不为空,则删除已有的 creator_in 并跳过设置 - if ds == "marketing" && (main == "order" || main == "order_info") { + // 如果传递了 plan_id_eq 或 reseller_id_eq 且不为空,则删除已有的 creator_in 并跳过设置(适用于所有数据源) + if main == "order" || main == "order_info" { hasPlanOrReseller := false if v, ok := filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 { hasPlanOrReseller = true diff --git a/server/internal/exporter/sqlbuilder.go b/server/internal/exporter/sqlbuilder.go index 5ae5a56..041d0aa 100644 --- a/server/internal/exporter/sqlbuilder.go +++ b/server/internal/exporter/sqlbuilder.go @@ -7,6 +7,7 @@ import ( "log" "server/internal/schema" "server/internal/utils" + "strconv" "strings" ) @@ -26,6 +27,33 @@ func isZeroID(v interface{}) bool { return false } +// toIntID safely converts various types to int64 for ID parameters +func toIntID(v interface{}) int64 { + switch t := v.(type) { + case int: + return int64(t) + case int32: + return int64(t) + case int64: + return t + case float32: + return int64(t) + case float64: + return int64(t) + case string: + s := strings.TrimSpace(t) + if s == "" || s == "0" { + return 0 + } + if id, err := strconv.ParseInt(s, 10, 64); err == nil { + return id + } + return 0 + default: + return 0 + } +} + type BuildRequest struct { MainTable string Datasource string @@ -212,19 +240,19 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, [] case []interface{}: for _, x := range t { if !isZeroID(x) { - creatorArgs = append(creatorArgs, x) + creatorArgs = append(creatorArgs, toIntID(x)) } } case []int: for _, x := range t { if !isZeroID(x) { - creatorArgs = append(creatorArgs, x) + creatorArgs = append(creatorArgs, int64(x)) } } case []string: for _, x := range t { if !isZeroID(x) { - creatorArgs = append(creatorArgs, x) + creatorArgs = append(creatorArgs, toIntID(x)) } } } @@ -240,19 +268,19 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, [] case []interface{}: for _, x := range t { if !isZeroID(x) { - merchantArgs = append(merchantArgs, x) + merchantArgs = append(merchantArgs, toIntID(x)) } } case []int: for _, x := range t { if !isZeroID(x) { - merchantArgs = append(merchantArgs, x) + merchantArgs = append(merchantArgs, int64(x)) } } case []string: for _, x := range t { if !isZeroID(x) { - merchantArgs = append(merchantArgs, x) + merchantArgs = append(merchantArgs, toIntID(x)) } } } @@ -370,12 +398,11 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, [] } } if v, ok := req.Filters["plan_id_eq"]; ok { - s := utils.ToString(v) - if s != "" && s != "0" { + if !isZeroID(v) { if tbl, col, ok := sch.FilterColumn("plan_id_eq"); ok { where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col))) + args = append(args, toIntID(v)) } - args = append(args, s) } } if v, ok := req.Filters["key_batch_id_eq"]; ok { @@ -405,12 +432,11 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, [] } // If merchant_id_in is present, it handles the merchant_id logic (via OR condition), if _, hasIn := req.Filters["merchant_id_in"]; !hasIn { - s := utils.ToString(v) - if s != "" { + if !isZeroID(v) { if tbl, col, ok := sch.FilterColumn("reseller_id_eq"); ok { where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col))) + args = append(args, toIntID(v)) } - args = append(args, s) } } } @@ -546,19 +572,19 @@ func BuildCountSQL(req BuildRequest, whitelist map[string]bool) (string, []inter case []interface{}: for _, x := range t { if !isZeroID(x) { - creatorArgs = append(creatorArgs, x) + creatorArgs = append(creatorArgs, toIntID(x)) } } case []int: for _, x := range t { if !isZeroID(x) { - creatorArgs = append(creatorArgs, x) + creatorArgs = append(creatorArgs, int64(x)) } } case []string: for _, x := range t { if !isZeroID(x) { - creatorArgs = append(creatorArgs, x) + creatorArgs = append(creatorArgs, toIntID(x)) } } } @@ -574,19 +600,19 @@ func BuildCountSQL(req BuildRequest, whitelist map[string]bool) (string, []inter case []interface{}: for _, x := range t { if !isZeroID(x) { - merchantArgs = append(merchantArgs, x) + merchantArgs = append(merchantArgs, toIntID(x)) } } case []int: for _, x := range t { if !isZeroID(x) { - merchantArgs = append(merchantArgs, x) + merchantArgs = append(merchantArgs, int64(x)) } } case []string: for _, x := range t { if !isZeroID(x) { - merchantArgs = append(merchantArgs, x) + merchantArgs = append(merchantArgs, toIntID(x)) } } } @@ -644,6 +670,11 @@ func BuildCountSQL(req BuildRequest, whitelist map[string]bool) (string, []inter where = append(where, fmt.Sprintf("`%s`.%s BETWEEN ? AND ?", sch.TableName(tbl), escape(col))) args = append(args, arr[0], arr[1]) } + case "plan_id_eq", "reseller_id_eq": + if !isZeroID(v) { + where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col))) + args = append(args, toIntID(v)) + } default: s := utils.ToString(v) if s != "" {