package api

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math/big"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"server/internal/constants"
	"server/internal/exporter"
	"server/internal/logging"
	"server/internal/repo"
	"server/internal/utils"
	"server/internal/ymtcrypto"
)

// ExportsAPI bundles the database handles used by the export endpoints.
type ExportsAPI struct {
	meta      *sql.DB // holds export_jobs / export_templates / export_job_files
	marketing *sql.DB // data source used for every datasource other than "ymt"
	ymt       *sql.DB // data source used when the datasource is "ymt"
}

// ExportsHandler returns the HTTP handler serving everything below /api/exports.
//
// Routes:
//
//	POST /api/exports                 create a job
//	GET  /api/exports                 list jobs
//	GET  /api/exports/{id}            job detail
//	GET  /api/exports/{id}/sql        SQL the job would run
//	GET  /api/exports/{id}/download   latest exported file
//	POST /api/exports/{id}/recompute  recount rows and correct the job total
//	POST /api/exports/{id}/cancel     cancel a job
//
// Anything else is answered with 404.
func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
	api := &ExportsAPI{meta: meta, marketing: marketing, ymt: ymt}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := strings.TrimPrefix(r.URL.Path, "/api/exports")
		if p == "" {
			switch r.Method {
			case http.MethodPost:
				api.create(w, r)
				return
			case http.MethodGet:
				api.list(w, r)
				return
			}
		}
		if strings.HasPrefix(p, "/") {
			id := strings.TrimPrefix(p, "/")
			switch r.Method {
			case http.MethodGet:
				switch {
				case strings.HasSuffix(p, "/download"):
					api.download(w, r, strings.TrimSuffix(id, "/download"))
				case strings.HasSuffix(p, "/sql"):
					api.getSQL(w, r, strings.TrimSuffix(id, "/sql"))
				default:
					api.get(w, r, id)
				}
				return
			case http.MethodPost:
				switch {
				case strings.HasSuffix(p, "/recompute"):
					api.recompute(w, r, strings.TrimSuffix(id, "/recompute"))
					return
				case strings.HasSuffix(p, "/cancel"):
					api.cancel(w, r, strings.TrimSuffix(id, "/cancel"))
					return
				}
			}
		}
		w.WriteHeader(http.StatusNotFound)
	})
}

// ExportPayload is the JSON body accepted by POST /api/exports.
type ExportPayload struct {
	TemplateID  uint64                 `json:"template_id"`
	RequestedBy uint64                 `json:"requested_by"`
	Permission  map[string]interface{} `json:"permission"`
	Options     map[string]interface{} `json:"options"`
	FileFormat  string                 `json:"file_format"`
	Filters     map[string]interface{} `json:"filters"`
	Datasource  string                 `json:"datasource"`
}

// chunkedExportSpec captures what differs between the CSV and XLSX variants of
// the chunked (time-sliced) export path.
type chunkedExportSpec struct {
	stage      string                                                       // stage label written to logs
	failReason string                                                       // reason passed to MarkFailed
	format     constants.FileFormat                                         // used to pick the batch size
	build      func(req exporter.BuildRequest) (string, []interface{}, error) // SQL builder for one chunk
	newWriter  func() (exporter.RowWriter, error)                           // output file factory
}

// exportQueryIDs parses a comma-separated list of unsigned integers, e.g. "15, 25".
// Blank and non-numeric entries are dropped. The elements are uint64 values
// boxed as interface{} so the slice can be stored directly in a filter map.
func exportQueryIDs(raw string) []interface{} {
	if raw == "" {
		return nil
	}
	parts := strings.Split(raw, ",")
	ids := make([]interface{}, 0, len(parts))
	for _, s := range parts {
		s = strings.TrimSpace(s)
		if s == "" {
			continue
		}
		if n, err := strconv.ParseUint(s, 10, 64); err == nil {
			ids = append(ids, n)
		}
	}
	return ids
}

// exportTimeRangeProblem validates filters["create_time_between"] and returns a
// user-facing message describing the problem, or "" when the filter is a
// two-element list.
func exportTimeRangeProblem(filters map[string]interface{}) string {
	v, present := filters["create_time_between"]
	if !present {
		return "缺少时间过滤:必须提供 create_time_between"
	}
	switch t := v.(type) {
	case []interface{}:
		if len(t) != 2 {
			return "create_time_between 需要两个时间值"
		}
	case []string:
		if len(t) != 2 {
			return "create_time_between 需要两个时间值"
		}
	default:
		return "create_time_between 格式错误"
	}
	return ""
}

// exportTypeEq extracts filters["type_eq"] as an int. Numbers are converted
// directly; for strings only the ASCII digits are used (every other character is
// ignored). A missing or otherwise-typed value yields 0.
func exportTypeEq(filters map[string]interface{}) int {
	tv := 0
	switch t := filters["type_eq"].(type) {
	case float64:
		tv = int(t)
	case int:
		tv = t
	case string:
		s := strings.TrimSpace(t)
		for i := 0; i < len(s); i++ {
			if c := s[i]; c >= '0' && c <= '9' {
				tv = tv*10 + int(c-'0')
			}
		}
	}
	return tv
}

// exportDropFields splits fields into those kept and those removed because they
// equal one of drop. Order is preserved; removed is nil when nothing matched.
func exportDropFields(fields []string, drop ...string) (kept, removed []string) {
	kept = make([]string, 0, len(fields))
	for _, f := range fields {
		matched := false
		for _, d := range drop {
			if f == d {
				matched = true
				break
			}
		}
		if matched {
			removed = append(removed, f)
			continue
		}
		kept = append(kept, f)
	}
	return kept, removed
}

// exportNormalizeFields rewrites template field names for the target datasource,
// preserving order:
//   - ymt: "order_info.*" becomes "order.*", and the key-batch columns
//     (order.key_batch_id / order.key_batch_name) are removed;
//   - marketing: order_voucher.channel_batch_no becomes
//     order_voucher.channel_activity_id.
func exportNormalizeFields(ds string, fs []string) []string {
	normalized := make([]string, 0, len(fs))
	for _, tf := range fs {
		if ds == "ymt" && strings.HasPrefix(tf, "order_info.") {
			tf = strings.Replace(tf, "order_info.", "order.", 1)
		}
		if ds == "marketing" && tf == "order_voucher.channel_batch_no" {
			tf = "order_voucher.channel_activity_id"
		}
		normalized = append(normalized, tf)
	}
	if ds == "ymt" {
		normalized, _ = exportDropFields(normalized, "order.key_batch_id", "order.key_batch_name")
	}
	return normalized
}

// create handles POST /api/exports: it resolves the template, merges permission
// and query-string scoping into the filters, builds and EXPLAINs the SQL,
// estimates the row count, records the job and starts it in a goroutine.
// It responds with {"id": <job id>} on success.
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
	body, readErr := io.ReadAll(r.Body)
	var p ExportPayload
	decodeErr := json.Unmarshal(body, &p)
	r = WithPayload(r, p)
	if readErr != nil {
		fail(w, r, http.StatusBadRequest, "invalid request body")
		return
	}
	if decodeErr != nil {
		// Deliberately non-fatal: json.Unmarshal keeps the fields it could decode,
		// and requests with an unusable template_id are rejected just below.
		logging.JSON("WARN", map[string]interface{}{"event": "export_payload_decode_error", "error": decodeErr.Error()})
	}

	rrepo := repo.NewExportRepo()
	ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
	if err != nil {
		fail(w, r, http.StatusBadRequest, "invalid template")
		return
	}
	if p.Datasource != "" {
		ds = p.Datasource
	}
	// ensure filters map initialized
	if p.Filters == nil {
		p.Filters = map[string]interface{}{}
	}
	// Merge the permission scope into the filters to enforce the access boundary.
	// The resolved datasource is used so that a request without an explicit
	// "datasource" is scoped with the template's datasource rather than "".
	p.Filters = mergePermissionIntoFilters(ds, main, p.Permission, p.Filters)

	// Query-string scoping, e.g. userId=15,25 -> filters.creator_in and
	// merchantId=1,2,3 -> filters.merchant_id_in. A value that is already present
	// (for instance one derived from the permission scope) is a boundary and is
	// never overridden; the URL only fills a gap.
	query := r.URL.Query()
	for _, m := range []struct{ param, filter string }{
		{"userId", "creator_in"},
		{"merchantId", "merchant_id_in"},
	} {
		ids := exportQueryIDs(query.Get(m.param))
		if len(ids) == 0 {
			continue
		}
		if _, exists := p.Filters[m.filter]; !exists {
			p.Filters[m.filter] = ids
		}
	}

	// DEBUG LOGGING
	logging.JSON("INFO", map[string]interface{}{
		"event":              "export_filters_debug",
		"filters":            p.Filters,
		"has_creator_in":     hasNonEmptyIDs(p.Filters["creator_in"]),
		"has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]),
	})

	// Marketing order exports must be bounded by a creation-time range.
	if ds == "marketing" && (main == "order" || main == "order_info") {
		if msg := exportTimeRangeProblem(p.Filters); msg != "" {
			fail(w, r, http.StatusBadRequest, msg)
			return
		}
	}

	tv := exportTypeEq(p.Filters)

	// No whitelist filtering any more: every (normalized) template field is exported.
	filtered := exportNormalizeFields(ds, fs)

	// YMT with type 3: keep order_voucher.grant_time and drop the red-packet
	// receive-time column so the export has no empty "receive time" column.
	if ds == "ymt" && tv == 3 {
		var removed []string
		filtered, removed = exportDropFields(filtered, "order_cash.receive_time")
		if len(removed) > 0 {
			logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_receive_time", "removed": removed, "reason": "立减金保留 order_voucher.grant_time"})
		}
	}
	// Marketing, any type other than 1 (direct charge): drop recharge_time,
	// card_code and account.
	if ds == "marketing" && tv != 1 {
		var removed []string
		filtered, removed = exportDropFields(filtered, "order.recharge_time", "order.card_code", "order.account")
		if len(removed) > 0 {
			logging.JSON("INFO", map[string]interface{}{"event": "fields_filtered_non_direct_charge", "removed": removed, "reason": "非直充类型不导出充值时间、卡密和账号"})
		}
	}

	labels := FieldLabels()
	// Field consistency check (count only); a mismatch is logged, not rejected.
	if len(filtered) != len(fs) {
		logging.JSON("ERROR", map[string]interface{}{"event": "field_count_mismatch", "template_count": len(fs), "final_count": len(filtered)})
	}

	// creator_in is optional: other boundaries supplied by the permission scope
	// have already been merged into the filters above.
	req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters}
	// nil whitelist: export exactly the fields that were selected.
	q, args, usedFields, err := rrepo.BuildWithFields(req, nil)
	if err != nil {
		r = WithSQL(r, q)
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	// Use the field list the builder actually used so headers match the columns.
	if len(usedFields) > 0 {
		filtered = usedFields
	}
	r = WithSQL(r, q)
	logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
	log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args))

	dataDB := a.selectDataDB(ds)
	score, sugg, err := rrepo.Explain(dataDB, q, args)
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	sugg = append(sugg, exporter.IndexSuggestions(req)...)
	if score < constants.ExportThresholds.PassScoreThreshold {
		fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score))
		return
	}

	// Row estimate: try the chunked fast estimate first and fall back to an exact
	// COUNT over the full export SQL when it fails or reports 0.
	var estimate int64 = rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters)
	if estimate <= 0 {
		logging.JSON("WARN", map[string]interface{}{
			"event":      "estimate_zero_fallback",
			"datasource": ds,
			"main_table": main,
			"filters":    p.Filters,
			"stage":      "fast_chunked",
			"estimate":   estimate,
		})
		estimate = exporter.CountRows(dataDB, q, args)
		logging.JSON("INFO", map[string]interface{}{
			"event":      "estimate_exact_count",
			"datasource": ds,
			"main_table": main,
			"filters":    p.Filters,
			"sql":        q,
			"args":       args,
			"estimate":   estimate,
		})
	}

	// Column headers: the display label when one exists, the raw field otherwise.
	hdrs := make([]string, len(filtered))
	for i, tf := range filtered {
		if label, found := labels[tf]; found {
			hdrs[i] = label
		} else {
			hdrs[i] = tf
		}
	}
	// Disambiguate duplicate headers by prefixing the table label on fields that
	// do not belong to the main table.
	seen := make(map[string]int, len(hdrs))
	for _, h := range hdrs {
		seen[h]++
	}
	for i := range hdrs {
		if seen[hdrs[i]] <= 1 {
			continue
		}
		parts := strings.Split(filtered[i], ".")
		if len(parts) == 2 && parts[0] != main {
			hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
		}
	}

	// The job owner is the first userId of the query string, when it parses.
	owner := uint64(0)
	if uidStr := query.Get("userId"); uidStr != "" {
		first := strings.TrimSpace(strings.Split(uidStr, ",")[0])
		if n, err := strconv.ParseUint(first, 10, 64); err == nil {
			owner = n
		}
	}

	id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	go a.runJob(uint64(id), dataDB, q, args, filtered, hdrs, p.FileFormat)
	ok(w, r, map[string]interface{}{"id": id})
}

// runJob executes an export job in the background. For "csv" and "xlsx" it first
// tries the chunked path (one query per time slice) and otherwise runs the full
// query directly; any other format marks the job failed. A panic anywhere below
// is recovered and recorded as an "export_panic" failure.
//
// q/args are the full export SQL built by create, fields the selected columns
// and cols the matching header labels.
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, format string) {
	defer func() {
		if rec := recover(); rec != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":  "export_panic",
				"job_id": id,
				"error":  utils.ToString(rec),
				"fields": fields,
				"format": format,
			})
			log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, rec, fields)
			repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
				"error":  utils.ToString(rec),
				"fields": fields,
				"format": format,
			})
		}
	}()

	// Load datasource / main table once for transform decisions. Lookup errors
	// are tolerated: both values then stay empty.
	rrepo := repo.NewExportRepo()
	var jobDS, jobMain string
	if tplID, _, _ := rrepo.GetJobFilters(a.meta, id); tplID > 0 {
		ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
		jobDS = ds
		jobMain = mt
		if jobMain == "" {
			jobMain = "order"
		}
	}
	rrepo.StartJob(a.meta, id)

	baseName := "export_job_" + strconv.FormatUint(id, 10)
	wl := Whitelist()

	switch format {
	case "csv":
		newCSV := func() (exporter.RowWriter, error) {
			return exporter.NewCSVWriter("storage", baseName)
		}
		spec := chunkedExportSpec{
			stage:      "csv_chunk",
			failReason: "csv_chunk_stream_error",
			format:     constants.FileFormatCSV,
			build: func(req exporter.BuildRequest) (string, []interface{}, error) {
				return exporter.BuildSQL(req, wl)
			},
			newWriter: newCSV,
		}
		if a.runChunkedExport(id, db, q, args, cols, jobDS, spec) {
			return
		}
		a.runDirectCSV(id, db, q, args, fields, cols, format, jobDS, jobMain, newCSV)

	case "xlsx":
		spec := chunkedExportSpec{
			stage:      "xlsx_chunk",
			failReason: "xlsx_chunk_stream_error",
			format:     constants.FileFormatXLSX,
			build: func(req exporter.BuildRequest) (string, []interface{}, error) {
				return rrepo.Build(req, wl)
			},
			newWriter: func() (exporter.RowWriter, error) {
				xw, e := exporter.NewXLSXWriter("storage", baseName, "Sheet1")
				if e == nil {
					_ = xw.WriteHeader(cols)
				}
				return xw, e
			},
		}
		if a.runChunkedExport(id, db, q, args, cols, jobDS, spec) {
			return
		}
		a.runDirectXLSX(id, db, q, args, fields, cols, jobDS, jobMain)

	default:
		logging.JSON("ERROR", map[string]interface{}{
			"event":  "export_format_unsupported",
			"job_id": id,
			"format": format,
		})
		log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, format)
		rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
			"format": format,
		})
	}
}

// loadJobPlan reads, straight from the meta database, the stored filters of job
// id together with the datasource, main table and field list of its template.
// Failures are logged and leave the affected results at their zero value.
func (a *ExportsAPI) loadJobPlan(id uint64) (tplDS, main string, fs []string, fl map[string]interface{}) {
	var tplID uint64
	var filtersJSON, fieldsJSON []byte
	if err := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id).Scan(&tplID, &filtersJSON); err != nil {
		logging.JSON("WARN", map[string]interface{}{"event": "export_job_plan_load_error", "job_id": id, "stage": "job", "error": err.Error()})
	}
	if err := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID).Scan(&tplDS, &main, &fieldsJSON); err != nil {
		logging.JSON("WARN", map[string]interface{}{"event": "export_job_plan_load_error", "job_id": id, "stage": "template", "error": err.Error()})
	}
	// Best effort: undecodable JSON leaves fs / fl nil, which disables chunking.
	_ = json.Unmarshal(fieldsJSON, &fs)
	_ = json.Unmarshal(filtersJSON, &fl)
	return tplDS, main, fs, fl
}

// exportTimeChunks splits filters["create_time_between"] into day ranges of
// constants.ExportThresholds.ChunkDays. It returns nil when the filter is absent
// or is not a two-element list.
func exportTimeChunks(fl map[string]interface{}) [][2]string {
	var chunks [][2]string
	switch t := fl["create_time_between"].(type) {
	case []interface{}:
		if len(t) == 2 {
			chunks = exporter.SplitByDays(utils.ToString(t[0]), utils.ToString(t[1]), constants.ExportThresholds.ChunkDays)
		}
	case []string:
		if len(t) == 2 {
			chunks = exporter.SplitByDays(t[0], t[1], constants.ExportThresholds.ChunkDays)
		}
	}
	return chunks
}

// exportSkipChunking reports whether a marketing "order" export must not be
// chunked: that is the case when it selects any order_voucher.* field or filters
// on order_voucher_channel_activity_id_eq.
func exportSkipChunking(tplDS, main string, fs []string, fl map[string]interface{}) bool {
	if tplDS != "marketing" || main != "order" {
		return false
	}
	for _, f := range fs {
		if strings.HasPrefix(f, "order_voucher.") {
			return true
		}
	}
	_, filtered := fl["order_voucher_channel_activity_id_eq"]
	return filtered
}

// runChunkedExport exports the job one time slice at a time. It returns false,
// without having written anything, when chunking does not apply (no usable time
// range, chunking disabled for the query shape, or a stored row estimate at or
// below the chunk threshold); the caller then falls back to the direct path.
// It returns true once the job has been marked completed or failed.
func (a *ExportsAPI) runChunkedExport(id uint64, db *sql.DB, q string, args []interface{}, cols []string, jobDS string, spec chunkedExportSpec) bool {
	rrepo := repo.NewExportRepo()
	tplDS, main, fs, fl := a.loadJobPlan(id)
	chunks := exportTimeChunks(fl)
	if len(chunks) == 0 {
		return false
	}

	// A missing or NULL estimate fails to scan and is deliberately treated as 0.
	var currentEst int64
	_ = a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id).Scan(&currentEst)
	if currentEst == 0 {
		// Re-estimate at the start of a chunked export when nothing was stored.
		// NOTE(review): the fresh estimate is persisted but does not feed the
		// threshold check below, so such jobs always take the direct path —
		// confirm that this is intended.
		if estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl); estChunk > 0 {
			rrepo.UpdateRowEstimate(a.meta, id, estChunk)
		}
	}
	if exportSkipChunking(tplDS, main, fs, fl) || currentEst <= constants.ExportThresholds.ChunkThreshold {
		return false
	}

	cur := rrepo.NewCursor(tplDS, main)
	batch := constants.ChooseBatchSize(0, spec.format)
	files := []string{}
	var total int64
	for _, rg := range chunks {
		fl["create_time_between"] = []string{rg[0], rg[1]}
		req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
		cq, cargs, err := spec.build(req)
		if err != nil {
			// The slice is skipped, as before, but no longer silently.
			logging.JSON("ERROR", map[string]interface{}{"event": "export_chunk_build_error", "job_id": id, "stage": spec.stage, "range": rg, "error": err.Error()})
			continue
		}
		logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
		log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))

		transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
		// Progress is reported as a global running total so the number never
		// moves backwards when a new slice starts.
		chunkBase := total
		onProgress := func(totalRows int64) error {
			rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows)
			return nil
		}
		onRoll := func(path string, size int64, partRows int64) error {
			files = append(files, path)
			rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
			return nil
		}
		cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, spec.newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
		if e != nil {
			logging.JSON("ERROR", map[string]interface{}{
				"event":      "export_stream_error",
				"job_id":     id,
				"stage":      spec.stage,
				"error":      e.Error(),
				"datasource": jobDS,
				"sql":        cq,
				"args":       cargs,
			})
			log.Printf("[EXPORT_FAILED] job_id=%d stage=%s error=%v sql=%s", id, spec.stage, e, cq)
			rrepo.MarkFailed(a.meta, id, spec.failReason, map[string]interface{}{
				"error":      e.Error(),
				"datasource": jobDS,
				"sql":        cq,
				"args":       cargs,
			})
			return true
		}
		total += cnt
		rrepo.UpdateProgress(a.meta, id, total)
	}

	if total == 0 {
		total = rrepo.Count(db, q, args)
	}
	if len(files) >= 1 {
		rrepo.ZipAndRecord(a.meta, id, files, total)
	}
	rrepo.MarkCompleted(a.meta, id, total)
	return true
}

// exportLogDirectSQL logs the full export SQL right before a direct
// (non-chunked) run.
func exportLogDirectSQL(id uint64, q string, args []interface{}) {
	log.Printf("job_id=%d sql=%s args=%v", id, q, args)
	logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
	log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
}

// refreshRowEstimate recomputes the fast row estimate from the job's stored
// filters, persists it on the job and returns it.
func (a *ExportsAPI) refreshRowEstimate(id uint64, db *sql.DB, ds, main string) int64 {
	var filtersJSON []byte
	// Best effort: a failed read or decode estimates against nil filters.
	_ = a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id).Scan(&filtersJSON)
	var fl map[string]interface{}
	_ = json.Unmarshal(filtersJSON, &fl)

	rrepo := repo.NewExportRepo()
	est := rrepo.EstimateFastChunked(db, ds, main, fl)
	rrepo.UpdateRowEstimate(a.meta, id, est)
	return est
}

// runDirectCSV streams the full export query into CSV part files, zips them and
// marks the job completed, or marks it failed when streaming errors out.
func (a *ExportsAPI) runDirectCSV(id uint64, db *sql.DB, q string, args []interface{}, fields, cols []string, format, jobDS, jobMain string, newWriter func() (exporter.RowWriter, error)) {
	rrepo := repo.NewExportRepo()
	exportLogDirectSQL(id, q, args)

	est := a.refreshRowEstimate(id, db, jobDS, jobMain)
	batch := constants.ChooseBatchSize(est, constants.FileFormat(format))
	files := []string{}
	cur := rrepo.NewCursor(jobDS, jobMain)
	transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
	onProgress := func(totalRows int64) error {
		rrepo.UpdateProgress(a.meta, id, totalRows)
		return nil
	}
	onRoll := func(path string, size int64, partRows int64) error {
		files = append(files, path)
		rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
		return nil
	}
	// The file list returned by StreamCursor replaces the one collected by onRoll.
	count, files, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
	if err != nil {
		logging.JSON("ERROR", map[string]interface{}{
			"event":      "export_stream_error",
			"job_id":     id,
			"stage":      "csv_direct",
			"error":      err.Error(),
			"datasource": jobDS,
			"fields":     fields,
			"sql":        q,
		})
		log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_direct error=%v fields_count=%d", id, err, len(fields))
		rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{
			"error":      err.Error(),
			"datasource": jobDS,
			"fields":     fields,
			"sql":        q,
		})
		return
	}
	if len(files) >= 1 {
		rrepo.ZipAndRecord(a.meta, id, files, count)
	}
	rrepo.MarkCompleted(a.meta, id, count)
}

// runDirectXLSX runs the full export query and writes every row into a single
// XLSX file, then records and zips the file and marks the job completed. Writer,
// query, scan and row-iteration failures mark the job failed instead.
func (a *ExportsAPI) runDirectXLSX(id uint64, db *sql.DB, q string, args []interface{}, fields, cols []string, jobDS, jobMain string) {
	rrepo := repo.NewExportRepo()
	exportLogDirectSQL(id, q, args)
	a.refreshRowEstimate(id, db, jobDS, jobMain)

	x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
	if err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
		log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
		rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
			"error": err.Error(),
			"stage": "xlsx_direct",
		})
		return
	}
	_ = x.WriteHeader(cols)
	rrepo.UpdateProgress(a.meta, id, 0)

	rows, err := db.Query(q, args...)
	if err != nil {
		_, _, _ = x.Close() // best effort: release the writer on failure
		logging.JSON("ERROR", map[string]interface{}{
			"event":      "export_query_error",
			"job_id":     id,
			"stage":      "xlsx_direct",
			"error":      err.Error(),
			"datasource": jobDS,
			"sql":        q,
			"args":       args,
		})
		log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err)
		rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{
			"error":      err.Error(),
			"datasource": jobDS,
			"sql":        q,
			"args":       args,
		})
		return
	}
	defer rows.Close()

	// out receives the raw column values; dest holds the pointers Scan writes to.
	out := make([]interface{}, len(cols))
	dest := make([]interface{}, len(cols))
	for i := range out {
		dest[i] = &out[i]
	}
	var count int64
	for rows.Next() {
		if err := rows.Scan(dest...); err != nil {
			_, _, _ = x.Close() // best effort: release the writer on failure
			logging.JSON("ERROR", map[string]interface{}{
				"event":  "export_scan_error",
				"job_id": id,
				"stage":  "xlsx_direct",
				"error":  err.Error(),
				"count":  count,
			})
			log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count)
			rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{
				"error": err.Error(),
				"count": count,
			})
			return
		}
		vals := make([]string, len(cols))
		for i := range out {
			switch v := out[i].(type) {
			case []byte:
				vals[i] = string(v)
			case nil:
				vals[i] = ""
			default:
				vals[i] = utils.ToString(v)
			}
		}
		vals = transformRow(jobDS, fields, vals)
		x.WriteRow(vals)
		count++
		// Persist progress every 200 rows.
		if count%200 == 0 {
			rrepo.UpdateProgress(a.meta, id, count)
		}
	}
	// rows.Next also returns false on a mid-stream error; without this check a
	// truncated export would be recorded as complete.
	if err := rows.Err(); err != nil {
		_, _, _ = x.Close() // best effort: release the writer on failure
		logging.JSON("ERROR", map[string]interface{}{
			"event":  "export_rows_error",
			"job_id": id,
			"stage":  "xlsx_direct",
			"error":  err.Error(),
			"count":  count,
		})
		log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_rows error=%v count=%d", id, err, count)
		rrepo.MarkFailed(a.meta, id, "xlsx_rows_failed", map[string]interface{}{
			"error": err.Error(),
			"count": count,
		})
		return
	}

	p, size, _ := x.Close()
	const insertFile = "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)"
	now := time.Now()
	log.Printf("job_id=%d sql=%s args=%v", id, insertFile, []interface{}{id, p, count, size, now, now})
	if _, err := a.meta.Exec(insertFile, id, p, count, size, now, now); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "export_file_record_error", "job_id": id, "error": err.Error()})
	}
	rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
	rrepo.MarkCompleted(a.meta, id, count)
}

// recompute handles POST /api/exports/{id}/recompute: it rebuilds the job's SQL
// from its template and stored filters, counts the matching rows, stores that
// count through MarkCompleted and responds with {"id", "final_rows"}.
func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) {
	id, err := strconv.ParseUint(idStr, 10, 64)
	if err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	var tplID uint64
	var filtersJSON []byte
	row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
	if err := row.Scan(&tplID, &filtersJSON); err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	var ds string
	var main string
	var fieldsJSON []byte
	tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
	if err := tr.Scan(&ds, &main, &fieldsJSON); err != nil {
		fail(w, r, http.StatusBadRequest, "template not found")
		return
	}
	var fs []string
	var fl map[string]interface{}
	// Best effort: undecodable JSON leaves the value nil and the SQL builder decides.
	_ = json.Unmarshal(fieldsJSON, &fs)
	_ = json.Unmarshal(filtersJSON, &fl)

	req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
	q, args, err := exporter.BuildSQL(req, Whitelist())
	if err != nil {
		fail(w, r, http.StatusBadRequest, err.Error())
		return
	}
	dataDB := a.selectDataDB(ds)
	rrepo := repo.NewExportRepo()
	final := rrepo.Count(dataDB, q, args)
	rrepo.MarkCompleted(a.meta, id, final)
	ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
}

// selectDataDB returns the ymt handle for datasource "ymt" and the marketing
// handle for everything else.
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
	if ds == "ymt" {
		return a.ymt
	}
	return a.marketing
}

// moved to repo layer: repo.ZipAndRecord

// exportExplainField returns the object stored under field in one decoded
// EXPLAIN row, or nil when it is missing, not an object, or carries
// "Valid": false. Indexing the nil result is safe and yields no value.
func exportExplainField(row map[string]interface{}, field string) map[string]interface{} {
	obj, isObj := row[field].(map[string]interface{})
	if !isObj {
		return nil
	}
	if valid, isBool := obj["Valid"].(bool); isBool && !valid {
		return nil
	}
	return obj
}

// exportExplainSegments renders a stored EXPLAIN JSON array as one summary
// string per row. Rows without table, type, row count and extra are skipped;
// undecodable input yields nil.
func exportExplainSegments(raw string) []string {
	var plan []map[string]interface{}
	if err := json.Unmarshal([]byte(raw), &plan); err != nil {
		return nil
	}
	str := func(row map[string]interface{}, field string) string {
		s, _ := exportExplainField(row, field)["String"].(string)
		return s
	}
	segs := []string{}
	for _, row := range plan {
		tbl := str(row, "Table")
		typ := str(row, "Type")
		if typ == "" {
			typ = str(row, "SelectType")
		}
		key := str(row, "Key")
		rowsF, _ := exportExplainField(row, "Rows")["Int64"].(float64)
		rowsN := int64(rowsF)
		filt, _ := exportExplainField(row, "Filtered")["Float64"].(float64)
		extra := str(row, "Extra")
		if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
			continue
		}
		s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
		if extra != "" {
			s += ", 额外:" + extra
		}
		segs = append(segs, s)
	}
	return segs
}

// get handles GET /api/exports/{id}: it returns the job record, its files and a
// human-readable evaluation built from the stored EXPLAIN score and plan.
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
	rrepo := repo.NewExportRepo()
	d, err := rrepo.GetJob(a.meta, id)
	if err != nil {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	// A failed file listing is tolerated: the job is shown without files.
	flist, _ := rrepo.ListJobFiles(a.meta, id)
	files := make([]map[string]interface{}, 0, len(flist))
	for _, f := range flist {
		files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64})
	}

	// NOTE(review): create gates on constants.ExportThresholds.PassScoreThreshold
	// while this display threshold is hard-coded — confirm the two are meant to agree.
	const passScore = 60
	evalStatus, verdict := "通过", "允许执行"
	if d.ExplainScore.Int64 < passScore {
		evalStatus, verdict = "禁止", "禁止执行"
	}
	desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", d.ExplainScore.Int64, d.TotalRows.Int64, verdict)
	// Prefer a per-table breakdown of the plan when one was stored.
	if d.ExplainJSON.Valid && d.ExplainJSON.String != "" {
		if segs := exportExplainSegments(d.ExplainJSON.String); len(segs) > 0 {
			desc = strings.Join(segs, ";")
		}
	}
	ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc})
}
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) { rrepo := repo.NewExportRepo() var jid uint64 _, _ = fmt.Sscan(id, &jid) tplID, filters, err := rrepo.GetJobFilters(a.meta, jid) if err != nil { fail(w, r, http.StatusNotFound, "not found") return } ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID) if err != nil { fail(w, r, http.StatusBadRequest, "template not found") return } var fl map[string]interface{} json.Unmarshal(filters, &fl) wl := Whitelist() req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl} q, args, err := rrepo.Build(req, wl) if err != nil { failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error") return } formatArg := func(a interface{}) string { switch t := a.(type) { case nil: return "NULL" case []byte: s := string(t) s = strings.ReplaceAll(s, "'", "''") return "'" + s + "'" case string: s := strings.ReplaceAll(t, "'", "''") return "'" + s + "'" case int: return strconv.Itoa(t) case int64: return strconv.FormatInt(t, 10) case float64: return strconv.FormatFloat(t, 'f', -1, 64) case time.Time: return "'" + t.Format("2006-01-02 15:04:05") + "'" default: return fmt.Sprintf("%v", t) } } var sb strings.Builder var ai int for i := 0; i < len(q); i++ { c := q[i] if c == '?' 
&& ai < len(args) { sb.WriteString(formatArg(args[ai])) ai++ } else { sb.WriteByte(c) } } ok(w, r, map[string]interface{}{"sql": q, "final_sql": sb.String()}) } func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) { rrepo := repo.NewExportRepo() uri, err := rrepo.GetLatestFileURI(a.meta, id) if err != nil { // fallback: try to serve local storage file by job id // search for files named export_job__*.zip/xlsx/csv dir := "storage" entries, e := os.ReadDir(dir) if e == nil { best := "" var bestInfo os.FileInfo for _, ent := range entries { name := ent.Name() if strings.HasPrefix(name, "export_job_"+id+"_") && (strings.HasSuffix(name, ".zip") || strings.HasSuffix(name, ".xlsx") || strings.HasSuffix(name, ".csv")) { info, _ := os.Stat(filepath.Join(dir, name)) if info != nil { if best == "" || info.ModTime().After(bestInfo.ModTime()) { best = name bestInfo = info } } } } if best != "" { http.ServeFile(w, r, filepath.Join(dir, best)) return } } fail(w, r, http.StatusNotFound, "not found") return } http.ServeFile(w, r, uri) } func transformRow(ds string, fields []string, vals []string) []string { payStatusIdx := -1 for i := range fields { if fields[i] == "order.pay_status" { payStatusIdx = i break } } isPaid := func() bool { if payStatusIdx < 0 || payStatusIdx >= len(vals) { return true } return constants.IsPaidStatus(ds, vals[payStatusIdx]) }() for i := range fields { if i >= len(vals) { break } f := fields[i] v := vals[i] // ==================== 枚举转换 ==================== // order.type - 订单类型 if f == "order.type" { if n := parseIntVal(v); n >= 0 { if ds == "ymt" { if label, ok := constants.YMTOrderType[n]; ok { vals[i] = label } } else { if label, ok := constants.MarketingOrderType[n]; ok { vals[i] = label } } } continue } // order.status - 订单状态 if f == "order.status" { if n := parseIntVal(v); n >= 0 { if ds == "ymt" { if label, ok := constants.YMTOrderStatus[n]; ok { vals[i] = label } } else { if label, ok := 
constants.MarketingOrderStatus[n]; ok { vals[i] = label } } } continue } // order.pay_type - 支付方式 if f == "order.pay_type" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.MarketingPayType[n]; ok { vals[i] = label } else if n == 0 { vals[i] = "" } } continue } // order.pay_status - 支付状态 if f == "order.pay_status" { if n := parseIntVal(v); n >= 0 { if ds == "ymt" { if label, ok := constants.YMTPayStatus[n]; ok { vals[i] = label } } else { if label, ok := constants.MarketingPayStatus[n]; ok { vals[i] = label } } } continue } // order.use_coupon - 是否使用优惠券 if f == "order.use_coupon" { switch v { case "1": vals[i] = "是" case "2", "0": vals[i] = "否" } continue } // order.deliver_status - 投递状态 if f == "order.deliver_status" { switch v { case "1": vals[i] = "待投递" case "2": vals[i] = "已投递" case "3": vals[i] = "投递失败" } continue } // order.is_inner - 供应商类型 if f == "order.is_inner" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.YMTIsInner[n]; ok { vals[i] = label } } continue } // order_voucher.channel / voucher.channel - 立减金渠道 if f == "order_voucher.channel" || f == "voucher.channel" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.OrderVoucherChannel[n]; ok { vals[i] = label } } continue } // order_voucher.status - 立减金状态 if f == "order_voucher.status" { if n := parseIntVal(v); n >= 0 { if ds == "ymt" { if label, ok := constants.YMTOrderVoucherStatus[n]; ok { vals[i] = label } } else { if label, ok := constants.MarketingOrderVoucherStatus[n]; ok { vals[i] = label } } } continue } // order_voucher.receive_mode / voucher.receive_mode - 领取方式 if f == "order_voucher.receive_mode" || f == "voucher.receive_mode" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.OrderVoucherReceiveMode[n]; ok { vals[i] = label } } continue } // order_cash.channel - 红包渠道 if f == "order_cash.channel" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.OrderCashChannel[n]; ok { vals[i] = label } } continue } // order_cash.receive_status - 红包领取状态 
if f == "order_cash.receive_status" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.OrderCashReceiveStatus[n]; ok { vals[i] = label } } continue } // order_digit.order_type - 数字订单类型 if f == "order_digit.order_type" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.OrderDigitOrderType[n]; ok { vals[i] = label } } continue } // activity.settlement_type / plan.settlement_type - 结算方式 if f == "activity.settlement_type" || f == "plan.settlement_type" { if n := parseIntVal(v); n >= 0 { if ds == "ymt" { if label, ok := constants.YMTSettlementType[n]; ok { vals[i] = label } } else { if label, ok := constants.MarketingSettlementType[n]; ok { vals[i] = label } } } continue } // plan.send_method - 发放方式 if f == "plan.send_method" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.MarketingSendMethod[n]; ok { vals[i] = label } } continue } // code_batch.period_type - 周期类型 if f == "code_batch.period_type" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.MarketingPeriodType[n]; ok { vals[i] = label } } continue } // code_batch.recharge_type - 充值类型 if f == "code_batch.recharge_type" { if n := parseIntVal(v); n >= 0 { if label, ok := constants.MarketingRechargeType[n]; ok { vals[i] = label } } continue } // ==================== 特殊字段转换 ==================== // 解密/转换订单 key if f == "order.key" { if ds == "ymt" { key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64") if key == "" { key = "z9DoIVLuDYEN/qsgweRA4A==" } if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" { vals[i] = dec } } else { vals[i] = decodeOrderKey(vals[i]) } } // voucher_batch.provider: 将渠道编码转换为中文名称 if f == "voucher_batch.provider" { switch strings.TrimSpace(vals[i]) { // 老编码 case "lsxd": vals[i] = "蓝色兄弟" case "fjxw": vals[i] = "福建兴旺" case "fzxy": vals[i] = "福州兴雅" case "fzyt": vals[i] = "福州悦途" // 新编码:微信立减金渠道 case "voucher_wechat_lsxd": vals[i] = "蓝色兄弟" case "voucher_wechat_fjxw": vals[i] = "福建兴旺" case "voucher_wechat_fzxy": vals[i] = "福州兴雅" case 
"voucher_wechat_fzyt": vals[i] = "福州悦途" case "voucher_wechat_zjky": vals[i] = "浙江卡赢" case "voucher_wechat_zjky2": vals[i] = "浙江卡赢2" case "voucher_wechat_zjwsxx": vals[i] = "浙江喔刷" case "voucher_wechat_gzynd": vals[i] = "广州亿纳德" case "voucher_wechat_fjhrxxfw": vals[i] = "福建省宏仁信息服务" case "voucher_wechat_fzqmkj": vals[i] = "福州启蒙科技有限公司" case "voucher_wechat_fzydxx": vals[i] = "福州元朵信息科技有限公司" case "voucher_wechat_xyhxx": vals[i] = "新沂薪伙原信息科技有限公司" } } // activity.channels: 解析 JSON 并转成可读渠道名 if f == "activity.channels" { if vals[i] == "" || vals[i] == "0" { vals[i] = "无" continue } if !isPaid { vals[i] = "无" continue } var arr []map[string]interface{} if err := json.Unmarshal([]byte(vals[i]), &arr); err != nil { vals[i] = "无" continue } names := make([]string, 0, len(arr)) for _, item := range arr { if v, ok := item["pay_name"].(string); ok && strings.TrimSpace(v) != "" { names = append(names, v) continue } if v, ok := item["name"].(string); ok && strings.TrimSpace(v) != "" { names = append(names, v) } } if len(names) == 0 { vals[i] = "无" } else { vals[i] = strings.Join(names, ",") } } } return vals } func decodeOrderKey(s string) string { if s == "" { return s } if len(s) > 2 && s[len(s)-2:] == "_1" { s = s[:len(s)-2] } var n big.Int if _, ok := n.SetString(s, 10); !ok { return s } base := []rune{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'} baseCount := big.NewInt(int64(len(base))) zero := big.NewInt(0) var out []rune for n.Cmp(zero) > 0 { var mod big.Int mod.Mod(&n, baseCount) out = append(out, base[mod.Int64()]) n.Div(&n, baseCount) } for len(out) < 16 { out = append(out, base[0]) } for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 { out[i], out[j] = out[j], out[i] } return string(out) } func (a *ExportsAPI) cancel(w 
http.ResponseWriter, r *http.Request, id string) { a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id) w.Write([]byte("ok")) } func renderSQL(q string, args []interface{}) string { formatArg := func(a interface{}) string { switch t := a.(type) { case nil: return "NULL" case []byte: s := string(t) s = strings.ReplaceAll(s, "'", "''") return "'" + s + "'" case string: s := strings.ReplaceAll(t, "'", "''") return "'" + s + "'" case int: return strconv.Itoa(t) case int64: return strconv.FormatInt(t, 10) case float64: return strconv.FormatFloat(t, 'f', -1, 64) case time.Time: return "'" + t.Format("2006-01-02 15:04:05") + "'" default: return fmt.Sprintf("%v", t) } } var sb strings.Builder var ai int for i := 0; i < len(q); i++ { c := q[i] if c == '?' && ai < len(args) { sb.WriteString(formatArg(args[ai])) ai++ } else { sb.WriteByte(c) } } return sb.String() } func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() page := 1 size := 15 if p := q.Get("page"); p != "" { if n, err := strconv.Atoi(p); err == nil && n > 0 { page = n } } if s := q.Get("page_size"); s != "" { if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 { size = n } } tplIDStr := q.Get("template_id") var tplID uint64 if tplIDStr != "" { if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil { tplID = n } } offset := (page - 1) * size rrepo := repo.NewExportRepo() var totalCount int64 uidStr := q.Get("userId") totalCount = rrepo.CountJobs(a.meta, tplID, uidStr) itemsRaw, err := rrepo.ListJobs(a.meta, tplID, uidStr, size, offset) if err != nil { failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error") return } items := []map[string]interface{}{} for _, it := range itemsRaw { id, tid, req := it.ID, it.TemplateID, it.RequestedBy status, fmtstr := it.Status, it.FileFormat estimate, total := it.RowEstimate, it.TotalRows createdAt, 
updatedAt := it.CreatedAt, it.UpdatedAt score, explainRaw := it.ExplainScore, it.ExplainJSON evalStatus := "通过" if score.Int64 < 60 { evalStatus = "禁止" } desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60]) if explainRaw.Valid && explainRaw.String != "" { var arr []map[string]interface{} if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil { segs := []string{} for _, r := range arr { getStr := func(field string) string { if v, ok := r[field]; ok { if mm, ok := v.(map[string]interface{}); ok { if b, ok := mm["Valid"].(bool); ok && !b { return "" } if s, ok := mm["String"].(string); ok { return s } } } return "" } getInt := func(field string) int64 { if v, ok := r[field]; ok { if mm, ok := v.(map[string]interface{}); ok { if b, ok := mm["Valid"].(bool); ok && !b { return 0 } if f, ok := mm["Int64"].(float64); ok { return int64(f) } } } return 0 } getFloat := func(field string) float64 { if v, ok := r[field]; ok { if mm, ok := v.(map[string]interface{}); ok { if b, ok := mm["Valid"].(bool); ok && !b { return 0 } if f, ok := mm["Float64"].(float64); ok { return f } } } return 0 } tbl := getStr("Table") typ := getStr("Type") if typ == "" { typ = getStr("SelectType") } key := getStr("Key") rowsN := getInt("Rows") filt := getFloat("Filtered") extra := getStr("Extra") if tbl == "" && typ == "" && rowsN == 0 && extra == "" { continue } s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt) if extra != "" { s += ", 额外:" + extra } segs = append(segs, s) } if len(segs) > 0 { desc = strings.Join(segs, ";") } } } m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc} items = append(items, m) } ok(w, r, 
map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size}) } // mergePermissionIntoFilters injects permission scope into filters in a canonical way func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} { if filters == nil { filters = map[string]interface{}{} } // if creator_in already present, keep it if hasNonEmptyIDs(filters["creator_in"]) { return filters } // try known keys candidates := []string{"creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"} ids := []interface{}{} for _, k := range candidates { if perm == nil { break } if v, ok := perm[k]; ok { ids = normalizeIDs(v) if len(ids) > 0 { break } } } // also check filters incoming alternative keys and normalize into creator_in if len(ids) == 0 { alt := []string{"creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"} for _, k := range alt { if v, ok := filters[k]; ok { ids = normalizeIDs(v) if len(ids) > 0 { break } } } } if len(ids) > 0 { filters["creator_in"] = ids } // map alternative permission boundaries to supported filters // reseller/merchant -> reseller_id_eq if v, ok := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); ok { filters["reseller_id_eq"] = v } // plan/activity -> plan_id_eq if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok { filters["plan_id_eq"] = v } // account if v, ok := pickFirst(perm, filters, []string{"account", "account_no"}); ok { filters["account_eq"] = v } // out_trade_no if v, ok := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); ok { filters["out_trade_no_eq"] = v } return filters } func normalizeIDs(v interface{}) []interface{} { out := []interface{}{} switch t := v.(type) { case []interface{}: for _, x := range t { if s := utils.ToString(x); s != "" { out = append(out, s) } } case []string: for _, s := range t { s2 := strings.TrimSpace(s) if s2 != "" { out = 
append(out, s2) } } case []int: for _, n := range t { out = append(out, n) } case []int64: for _, n := range t { out = append(out, n) } case string: // support comma-separated string parts := strings.Split(t, ",") for _, s := range parts { s2 := strings.TrimSpace(s) if s2 != "" { out = append(out, s2) } } default: if s := utils.ToString(t); s != "" { out = append(out, s) } } return out } func hasNonEmptyIDs(v interface{}) bool { arr := normalizeIDs(v) return len(arr) > 0 } func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) { for _, k := range keys { if perm != nil { if v, ok := perm[k]; ok { arr := normalizeIDs(v) if len(arr) > 0 { return arr[0], true } if s := utils.ToString(v); s != "" { return s, true } } } if v, ok := filters[k]; ok { arr := normalizeIDs(v) if len(arr) > 0 { return arr[0], true } if s := utils.ToString(v); s != "" { return s, true } } } return nil, false } // parseIntVal 尝试将字符串解析为整数,失败返回-1 func parseIntVal(s string) int { if s == "" { return -1 } n := 0 for _, c := range s { if c < '0' || c > '9' { return -1 } n = n*10 + int(c-'0') } return n }