fix(exports): 优化导出逻辑以支持多数据源和参数类型记录

- 修改 runJob 函数,重命名参数 fmt 为 fileFormat,消除对标准库 fmt 包的遮蔽(新增的 fmt.Sprintf 调用依赖此改动)并提升可读性
- 在 runJob 中添加对查询参数类型的记录,便于调试和排查问题
- 更新 create 函数中的过滤条件逻辑,确保适用于所有数据源
- 引入 toIntID 函数,安全地将多种类型转换为 int64,优化 SQL 构建过程中的参数处理
- 记录导出查询的首行数据和零行结果,增强日志信息以便于后续分析
This commit is contained in:
zhouyonggao 2025-12-19 02:12:44 +08:00
parent 0eb65dde05
commit fca0e70115
2 changed files with 144 additions and 35 deletions

View File

@ -118,9 +118,9 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
} }
} }
if len(ids) > 0 { if len(ids) > 0 {
// 如果传递了 plan_id_eq 或 reseller_id_eq不设置 creator_in // 如果传递了 plan_id_eq 或 reseller_id_eq不设置 creator_in(适用于所有数据源)
skipCreator := false skipCreator := false
if ds == "marketing" && (main == "order" || main == "order_info") { if main == "order" || main == "order_info" {
if v, ok := p.Filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 { if v, ok := p.Filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 {
skipCreator = true skipCreator = true
} }
@ -361,7 +361,7 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
ok(w, r, map[string]interface{}{"id": id}) ok(w, r, map[string]interface{}{"id": id})
} }
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) { func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
logging.JSON("ERROR", map[string]interface{}{ logging.JSON("ERROR", map[string]interface{}{
@ -369,13 +369,13 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
"job_id": id, "job_id": id,
"error": utils.ToString(r), "error": utils.ToString(r),
"fields": fields, "fields": fields,
"format": fmt, "format": fileFormat,
}) })
log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields) log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{ repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
"error": utils.ToString(r), "error": utils.ToString(r),
"fields": fields, "fields": fields,
"format": fmt, "format": fileFormat,
}) })
} }
}() }()
@ -397,7 +397,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
} }
// 检查预估行数如果超过阈值且格式是xlsx强制改为csv // 检查预估行数如果超过阈值且格式是xlsx强制改为csv
if fmt == "xlsx" { if fileFormat == "xlsx" {
var rowEstimate int64 var rowEstimate int64
estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id) estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
_ = estRow.Scan(&rowEstimate) _ = estRow.Scan(&rowEstimate)
@ -409,12 +409,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
"threshold": constants.ExportThresholds.XlsxMaxRows, "threshold": constants.ExportThresholds.XlsxMaxRows,
"reason": "row_estimate exceeds xlsx max rows, forcing csv format", "reason": "row_estimate exceeds xlsx max rows, forcing csv format",
}) })
fmt = "csv" fileFormat = "csv"
} }
} }
rrepo.StartJob(a.meta, id) rrepo.StartJob(a.meta, id)
if fmt == "csv" { if fileFormat == "csv" {
newBaseWriter := func() (exporter.RowWriter, error) { newBaseWriter := func() (exporter.RowWriter, error) {
return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
} }
@ -538,7 +538,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
rrepo.UpdateRowEstimate(a.meta, id, est) rrepo.UpdateRowEstimate(a.meta, id, est)
} }
batch := constants.ChooseBatchSize(est, constants.FileFormat(fmt)) batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat))
files2 := []string{} files2 := []string{}
cur := rrepo.NewCursor(jobDS, jobMain) cur := rrepo.NewCursor(jobDS, jobMain)
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
@ -576,7 +576,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
return return
} }
} }
if fmt == "xlsx" { if fileFormat == "xlsx" {
files := []string{} files := []string{}
{ {
var tplID uint64 var tplID uint64
@ -707,6 +707,22 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
} }
_ = x.WriteHeader(cols) _ = x.WriteHeader(cols)
rrepo.UpdateProgress(a.meta, id, 0) rrepo.UpdateProgress(a.meta, id, 0)
// 记录查询执行前的参数类型信息
argTypes := make([]string, len(args))
for i, arg := range args {
argTypes[i] = fmt.Sprintf("%T", arg)
}
logging.JSON("INFO", map[string]interface{}{
"event": "export_query_before_execute",
"job_id": id,
"stage": "xlsx_direct",
"datasource": jobDS,
"sql": q,
"args": args,
"arg_types": argTypes,
"final_sql": renderSQL(q, args),
})
log.Printf("[EXPORT_DEBUG] job_id=%d stage=xlsx_direct before_query sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args))
rows, err := db.Query(q, args...) rows, err := db.Query(q, args...)
if err != nil { if err != nil {
logging.JSON("ERROR", map[string]interface{}{ logging.JSON("ERROR", map[string]interface{}{
@ -728,13 +744,40 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
return return
} }
defer rows.Close() defer rows.Close()
out := make([]interface{}, len(cols)) // 动态获取实际列数
dest := make([]interface{}, len(cols)) actualCols, err := rows.Columns()
if err != nil {
logging.JSON("ERROR", map[string]interface{}{
"event": "export_columns_error",
"job_id": id,
"stage": "xlsx_direct",
"error": err.Error(),
})
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err)
rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{
"error": err.Error(),
})
return
}
if len(actualCols) != len(cols) {
logging.JSON("WARN", map[string]interface{}{
"event": "export_column_count_mismatch",
"job_id": id,
"stage": "xlsx_direct",
"expected_cols": len(cols),
"actual_cols": len(actualCols),
})
log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct column_mismatch expected=%d actual=%d", id, len(cols), len(actualCols))
}
out := make([]interface{}, len(actualCols))
dest := make([]interface{}, len(actualCols))
for i := range out { for i := range out {
dest[i] = &out[i] dest[i] = &out[i]
} }
var count int64 var count int64
var tick int64 var tick int64
var firstRow []string
firstRowCaptured := false
for rows.Next() { for rows.Next() {
if err := rows.Scan(dest...); err != nil { if err := rows.Scan(dest...); err != nil {
logging.JSON("ERROR", map[string]interface{}{ logging.JSON("ERROR", map[string]interface{}{
@ -761,6 +804,12 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
vals[i] = utils.ToString(out[i]) vals[i] = utils.ToString(out[i])
} }
} }
// 仅记录第一行原始数据到日志中,方便排查是否有查询结果
if !firstRowCaptured {
firstRow = make([]string, len(vals))
copy(firstRow, vals)
firstRowCaptured = true
}
vals = transformRow(jobDS, fields, vals) vals = transformRow(jobDS, fields, vals)
x.WriteRow(vals) x.WriteRow(vals)
count++ count++
@ -769,6 +818,35 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
rrepo.UpdateProgress(a.meta, id, count) rrepo.UpdateProgress(a.meta, id, count)
} }
} }
// 如果查询到了数据,记录一条包含首行数据的日志,便于确认导出前 SQL 是否返回结果
if count > 0 && firstRowCaptured {
logging.JSON("INFO", map[string]interface{}{
"event": "export_first_row_sample",
"job_id": id,
"datasource": jobDS,
"total_rows": count,
"first_row": firstRow,
"sql": q,
"args": args,
"final_sql": renderSQL(q, args),
"fields_order": fields,
})
} else if count == 0 {
// 如果查询返回0行记录详细信息以便排查
logging.JSON("WARN", map[string]interface{}{
"event": "export_zero_rows",
"job_id": id,
"datasource": jobDS,
"stage": "xlsx_direct",
"sql": q,
"args": args,
"arg_types": argTypes,
"final_sql": renderSQL(q, args),
"expected_cols": len(cols),
"actual_cols": len(actualCols),
})
log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct zero_rows sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args))
}
p, size, _ := x.Close() p, size, _ := x.Close()
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()}) log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now()) a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
@ -779,11 +857,11 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
logging.JSON("ERROR", map[string]interface{}{ logging.JSON("ERROR", map[string]interface{}{
"event": "export_format_unsupported", "event": "export_format_unsupported",
"job_id": id, "job_id": id,
"format": fmt, "format": fileFormat,
}) })
log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fmt) log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat)
rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{ rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
"format": fmt, "format": fileFormat,
}) })
} }
@ -1560,8 +1638,8 @@ func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, fi
if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok { if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok {
filters["plan_id_eq"] = v filters["plan_id_eq"] = v
} }
// 如果传递了 plan_id_eq 或 reseller_id_eq 且不为空,则删除已有的 creator_in 并跳过设置 // 如果传递了 plan_id_eq 或 reseller_id_eq 且不为空,则删除已有的 creator_in 并跳过设置(适用于所有数据源)
if ds == "marketing" && (main == "order" || main == "order_info") { if main == "order" || main == "order_info" {
hasPlanOrReseller := false hasPlanOrReseller := false
if v, ok := filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 { if v, ok := filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 {
hasPlanOrReseller = true hasPlanOrReseller = true

View File

@ -7,6 +7,7 @@ import (
"log" "log"
"server/internal/schema" "server/internal/schema"
"server/internal/utils" "server/internal/utils"
"strconv"
"strings" "strings"
) )
@ -26,6 +27,33 @@ func isZeroID(v interface{}) bool {
return false return false
} }
// toIntID safely converts various ID representations to int64 so that SQL
// placeholder arguments are always bound with a single numeric type.
//
// Accepted inputs: every built-in signed/unsigned integer type, float32/
// float64 (JSON-decoded numbers arrive as float64; truncated toward zero),
// and decimal strings (surrounding whitespace trimmed). Anything else —
// unparsable strings, nil, unknown types, or uint64 values that would not
// fit in int64 — collapses to 0; callers are expected to filter zero IDs
// (see isZeroID).
func toIntID(v interface{}) int64 {
	switch t := v.(type) {
	case int:
		return int64(t)
	case int8:
		return int64(t)
	case int16:
		return int64(t)
	case int32:
		return int64(t)
	case int64:
		return t
	case uint:
		return int64(t)
	case uint8:
		return int64(t)
	case uint16:
		return int64(t)
	case uint32:
		return int64(t)
	case uint64:
		// Clamp overflowing values to 0 instead of wrapping negative.
		if t > 1<<63-1 {
			return 0
		}
		return int64(t)
	case float32:
		return int64(t)
	case float64:
		return int64(t)
	case string:
		s := strings.TrimSpace(t)
		if s == "" {
			return 0
		}
		if id, err := strconv.ParseInt(s, 10, 64); err == nil {
			return id
		}
		return 0
	default:
		return 0
	}
}
type BuildRequest struct { type BuildRequest struct {
MainTable string MainTable string
Datasource string Datasource string
@ -212,19 +240,19 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, []
case []interface{}: case []interface{}:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
creatorArgs = append(creatorArgs, x) creatorArgs = append(creatorArgs, toIntID(x))
} }
} }
case []int: case []int:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
creatorArgs = append(creatorArgs, x) creatorArgs = append(creatorArgs, int64(x))
} }
} }
case []string: case []string:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
creatorArgs = append(creatorArgs, x) creatorArgs = append(creatorArgs, toIntID(x))
} }
} }
} }
@ -240,19 +268,19 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, []
case []interface{}: case []interface{}:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
merchantArgs = append(merchantArgs, x) merchantArgs = append(merchantArgs, toIntID(x))
} }
} }
case []int: case []int:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
merchantArgs = append(merchantArgs, x) merchantArgs = append(merchantArgs, int64(x))
} }
} }
case []string: case []string:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
merchantArgs = append(merchantArgs, x) merchantArgs = append(merchantArgs, toIntID(x))
} }
} }
} }
@ -370,12 +398,11 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, []
} }
} }
if v, ok := req.Filters["plan_id_eq"]; ok { if v, ok := req.Filters["plan_id_eq"]; ok {
s := utils.ToString(v) if !isZeroID(v) {
if s != "" && s != "0" {
if tbl, col, ok := sch.FilterColumn("plan_id_eq"); ok { if tbl, col, ok := sch.FilterColumn("plan_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col))) where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
args = append(args, toIntID(v))
} }
args = append(args, s)
} }
} }
if v, ok := req.Filters["key_batch_id_eq"]; ok { if v, ok := req.Filters["key_batch_id_eq"]; ok {
@ -405,12 +432,11 @@ func BuildSQLWithFields(req BuildRequest, whitelist map[string]bool) (string, []
} }
// If merchant_id_in is present, it handles the merchant_id logic (via OR condition), // If merchant_id_in is present, it handles the merchant_id logic (via OR condition),
if _, hasIn := req.Filters["merchant_id_in"]; !hasIn { if _, hasIn := req.Filters["merchant_id_in"]; !hasIn {
s := utils.ToString(v) if !isZeroID(v) {
if s != "" {
if tbl, col, ok := sch.FilterColumn("reseller_id_eq"); ok { if tbl, col, ok := sch.FilterColumn("reseller_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col))) where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
args = append(args, toIntID(v))
} }
args = append(args, s)
} }
} }
} }
@ -546,19 +572,19 @@ func BuildCountSQL(req BuildRequest, whitelist map[string]bool) (string, []inter
case []interface{}: case []interface{}:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
creatorArgs = append(creatorArgs, x) creatorArgs = append(creatorArgs, toIntID(x))
} }
} }
case []int: case []int:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
creatorArgs = append(creatorArgs, x) creatorArgs = append(creatorArgs, int64(x))
} }
} }
case []string: case []string:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
creatorArgs = append(creatorArgs, x) creatorArgs = append(creatorArgs, toIntID(x))
} }
} }
} }
@ -574,19 +600,19 @@ func BuildCountSQL(req BuildRequest, whitelist map[string]bool) (string, []inter
case []interface{}: case []interface{}:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
merchantArgs = append(merchantArgs, x) merchantArgs = append(merchantArgs, toIntID(x))
} }
} }
case []int: case []int:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
merchantArgs = append(merchantArgs, x) merchantArgs = append(merchantArgs, int64(x))
} }
} }
case []string: case []string:
for _, x := range t { for _, x := range t {
if !isZeroID(x) { if !isZeroID(x) {
merchantArgs = append(merchantArgs, x) merchantArgs = append(merchantArgs, toIntID(x))
} }
} }
} }
@ -644,6 +670,11 @@ func BuildCountSQL(req BuildRequest, whitelist map[string]bool) (string, []inter
where = append(where, fmt.Sprintf("`%s`.%s BETWEEN ? AND ?", sch.TableName(tbl), escape(col))) where = append(where, fmt.Sprintf("`%s`.%s BETWEEN ? AND ?", sch.TableName(tbl), escape(col)))
args = append(args, arr[0], arr[1]) args = append(args, arr[0], arr[1])
} }
case "plan_id_eq", "reseller_id_eq":
if !isZeroID(v) {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
args = append(args, toIntID(v))
}
default: default:
s := utils.ToString(v) s := utils.ToString(v)
if s != "" { if s != "" {