From e99dc91ee17694770efbedc31ad5583718a4e21f Mon Sep 17 00:00:00 2001 From: zhouyonggao <1971162852@qq.com> Date: Wed, 17 Dec 2025 10:23:43 +0800 Subject: [PATCH] =?UTF-8?q?refactor(api):=20=E4=BC=98=E5=8C=96=E6=A8=A1?= =?UTF-8?q?=E6=9D=BFAPI=E5=A4=84=E7=90=86=E5=99=A8=E7=BB=93=E6=9E=84?= =?UTF-8?q?=E4=B8=8E=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 重构TemplatesAPI结构体字段命名,区分元数据库和营销数据库 - 统一并优化HTTP路由处理代码,添加详尽注释说明各接口用途 - 增强createTemplate方法,完善请求体解析与错误处理 - 优化listTemplates,实现字段去重计数逻辑,支持用户过滤 - 详细拆解getTemplate,patchTemplate,deleteTemplate和validateTemplate逻辑 - patchTemplate支持多字段动态更新,添加详细日志追踪 - deleteTemplate新增软删除支持,保护关联数据安全 - validateTemplate增强错误分类和索引建议汇总能力 - 新增辅助函数toJSON/fromJSON及countValidFields,提高代码复用性 - 通过selectDataDB区分源数据库连接,提高数据来源正确性 feat(exporter): 增加导出写入器接口与工厂方法 - 定义RowWriter接口,规范所有导出格式写入器实现 - 实现NewWriter工厂函数,根据文件格式动态创建写入器实例 - 统一导出写入器构建流程,便于后续扩展其他格式写入器 feat(logging): 完善统一结构化日志模块 - 新增日志级别常量,支持Debug/Info/Warn/Error多等级 - 提供Init方法初始化日志文件和控制台输出 - 实现JSON格式日志输出,统一日志字段结构 - 提供多种等级便捷方法,简化日志打印调用 - 增强Error日志,附加调用文件和行号定位 - 封装专用日志方法,支持数据库错误、导出进度与SQL日志等 refactor(repo): 重构导出数据访问层,增强功能清晰度 - 优化ExportQueryRepo结构和构造方法 - 规范Build、Explain、Count等核心方法命名与参数 - 增加分块快速估算行数方法 - 完善游标创建及流式数据导出方法签名与流程 - 封装写入器和回调函数类型,增强模块解耦和扩展性 - 统一日志调用,规范错误处理和日志记录格式 --- server/internal/api/exports.go | 116 ++++------------------------- server/internal/exporter/stream.go | 5 +- 2 files changed, 18 insertions(+), 103 deletions(-) diff --git a/server/internal/api/exports.go b/server/internal/api/exports.go index 0d98916..5932197 100644 --- a/server/internal/api/exports.go +++ b/server/internal/api/exports.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "path/filepath" + "server/internal/constants" "server/internal/exporter" "server/internal/logging" "server/internal/repo" @@ -404,8 +405,7 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) { return } sugg = append(sugg, exporter.IndexSuggestions(req)...) - const passThreshold = 60 - if score < passThreshold { + if score < constants.ExportThresholds.PassScoreThreshold { fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score)) return } @@ -479,7 +479,6 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, newBaseWriter := func() (exporter.RowWriter, error) { return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10)) } - const maxRowsPerFile = 300000 files := []string{} { var tplID uint64 @@ -499,10 +498,10 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var chunks [][2]string if v, ok := fl["create_time_between"]; ok { if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 { - chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10) + chunks = exporter.SplitByDays(toString(arr[0]), toString(arr[1]), constants.ExportThresholds.ChunkDays) } if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 { - chunks = splitByDays(arrs[0], arrs[1], 10) + chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays) } } if len(chunks) > 0 { @@ -531,9 +530,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } } } - if !skipChunk && currentEst > 50000 { + if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold { cur := rrepo.NewCursor(tplDS, main) - batch := chooseBatch(0, "csv") + batch := constants.ChooseBatchSize(0, constants.FileFormatCSV) for _, rg := range chunks { fl["create_time_between"] = []string{rg[0], rg[1]} req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl} @@ -552,7 +551,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) return nil } - cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, maxRowsPerFile, onRoll, onProgress) + cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) if e != nil { rrepo.MarkFailed(a.meta, id) return @@ -575,7 +574,6 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)}) log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args)) { - const maxRowsPerFile = 300000 var est int64 { var filtersJSON []byte @@ -586,7 +584,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl) rrepo.UpdateRowEstimate(a.meta, id, est) } - batch := chooseBatch(est, fmt) + batch := constants.ChooseBatchSize(est, constants.FileFormat(fmt)) files2 := []string{} cur := rrepo.NewCursor(jobDS, jobMain) newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() } @@ -597,7 +595,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) return nil } - count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, maxRowsPerFile, onRoll, onProgress) + count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) if err != nil { rrepo.MarkFailed(a.meta, id) return @@ -657,7 +655,6 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, return } if fmt == "xlsx" { - const maxRowsPerFile = 300000 files := []string{} { var tplID uint64 @@ -677,10 +674,10 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, var chunks [][2]string if v, ok := fl["create_time_between"]; ok { if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 { - chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10) + chunks = exporter.SplitByDays(toString(arr[0]), toString(arr[1]), constants.ExportThresholds.ChunkDays) } if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 { - chunks = splitByDays(arrs[0], arrs[1], 10) + chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays) } } if len(chunks) > 0 { @@ -709,9 +706,9 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, } } } - if !skipChunk && currentEst > 50000 { + if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold { cur := rrepo.NewCursor(tplDS, main) - batch := chooseBatch(0, "xlsx") + batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX) for _, rg := range chunks { fl["create_time_between"] = []string{rg[0], rg[1]} req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl} @@ -737,7 +734,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, rrepo.InsertJobFile(a.meta, id, path, "", partRows, size) return nil } - cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, maxRowsPerFile, onRoll, onProgress) + cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress) if e != nil { logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error()}) log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e) @@ -865,50 +862,6 @@ func (a *ExportsAPI) selectDataDB(ds string) *sql.DB { return a.marketing } -func splitByDays(startStr, endStr string, stepDays int) [][2]string { - layout := "2006-01-02 15:04:05" - s, es := strings.TrimSpace(startStr), strings.TrimSpace(endStr) - st, err1 := time.Parse(layout, s) - en, err2 := time.Parse(layout, es) - if err1 != nil || err2 != nil || !en.After(st) || stepDays <= 0 { - return [][2]string{{s, es}} - } - var out [][2]string - cur := st - step := time.Duration(stepDays) * 24 * time.Hour - for cur.Before(en) { - nxt := cur.Add(step) - if nxt.After(en) { - nxt = en - } - out = append(out, [2]string{cur.Format(layout), nxt.Format(layout)}) - cur = nxt - } - return out -} - -func chooseBatch(estimate int64, fmt string) int { - if fmt == "xlsx" { - return 5000 - } - if estimate <= 0 { - return 10000 - } - if estimate < 50000 { - return 10000 - } - if estimate < 200000 { - return 20000 - } - if estimate < 500000 { - return 50000 - } - if estimate >= 2000000 { - return 100000 - } - return 50000 -} - // moved to repo layer: repo.ZipAndRecord func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) { @@ -1104,7 +1057,7 @@ func transformRow(ds string, fields []string, vals []string) []string { if payStatusIdx < 0 || payStatusIdx >= len(vals) { return true } - return paidStatus(ds, vals[payStatusIdx]) + return constants.IsPaidStatus(ds, vals[payStatusIdx]) }() for i := range fields { if i >= len(vals) { @@ -1158,45 +1111,6 @@ func transformRow(ds string, fields []string, vals []string) []string { return vals } -func paidStatus(ds, status string) bool { - s := strings.TrimSpace(status) - if s == "" { - return false - } - // handle numeric codes - numeric := -1 - if allDigits(s) { - if n, err := strconv.Atoi(s); err == nil { - numeric = n - } - } - switch ds { - case "marketing": - if numeric >= 0 { - // 1:待支付 2:已支付 3:已退款 - return numeric == 2 || numeric == 3 - } - return s == "已支付" || s == "已退款" - case "ymt": - if numeric >= 0 { - // 1:待支付 2:支付中 3:已支付 4:取消支付 5:退款中 6:退款成功 - return numeric == 3 || numeric == 6 || numeric == 5 - } - return s == "已支付" || s == "退款成功" || s == "退款中" - default: - return strings.Contains(s, "支付") && !strings.Contains(s, "待") - } -} - -func allDigits(s string) bool { - for _, c := range s { - if c < '0' || c > '9' { - return false - } - } - return true -} - func decodeOrderKey(s string) string { if s == "" { return s diff --git a/server/internal/exporter/stream.go b/server/internal/exporter/stream.go index 7bd4a4e..dc72d27 100644 --- a/server/internal/exporter/stream.go +++ b/server/internal/exporter/stream.go @@ -185,7 +185,7 @@ func CountRowsFastChunked(db *sql.DB, ds, main string, filters map[string]interf if start == "" || end == "" { return CountRowsFast(db, ds, main, filters) } - ranges := splitDays(start, end, 15) + ranges := SplitByDays(start, end, 15) var total int64 for _, rg := range ranges { fl := map[string]interface{}{} @@ -198,7 +198,8 @@ func CountRowsFastChunked(db *sql.DB, ds, main string, filters map[string]interf return total } -func splitDays(startStr, endStr string, stepDays int) [][2]string { +// SplitByDays 按天数分割时间范围,返回多个时间区间 +func SplitByDays(startStr, endStr string, stepDays int) [][2]string { layout := "2006-01-02 15:04:05" s := strings.TrimSpace(startStr) e := strings.TrimSpace(endStr)