refactor(exporter): 重构SQL构建逻辑以支持多数据源

将SQL构建逻辑重构为基于schema接口的实现,支持不同数据源的字段映射和表连接
修复重复转换行数据的问题
公开Whitelist和FieldLabels函数以供外部调用
This commit is contained in:
zhouyonggao 2025-11-27 14:21:29 +08:00
parent 4467bc536d
commit 2ac2d61551
8 changed files with 344 additions and 279 deletions

Binary file not shown.

View File

@ -18,22 +18,22 @@ import (
)
type ExportsAPI struct {
meta *sql.DB
marketing *sql.DB
meta *sql.DB
marketing *sql.DB
}
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
api := &ExportsAPI{meta: meta, marketing: marketing}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
if r.Method == http.MethodPost && p == "" {
api.create(w, r)
return
}
if r.Method == http.MethodGet && p == "" {
api.list(w, r)
return
}
api := &ExportsAPI{meta: meta, marketing: marketing}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
if r.Method == http.MethodPost && p == "" {
api.create(w, r)
return
}
if r.Method == http.MethodGet && p == "" {
api.list(w, r)
return
}
if strings.HasPrefix(p, "/") {
id := strings.TrimPrefix(p, "/")
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
@ -56,27 +56,27 @@ func ExportsHandler(meta, marketing *sql.DB) http.Handler {
return
}
}
w.WriteHeader(http.StatusNotFound)
})
w.WriteHeader(http.StatusNotFound)
})
}
func (a *ExportsAPI) ensureOwnerColumn() {
// Try to add owner_id column if not exists; ignore errors
_, _ = a.meta.Exec("ALTER TABLE export_jobs ADD COLUMN owner_id BIGINT UNSIGNED NOT NULL DEFAULT 0")
// Try to add owner_id column if not exists; ignore errors
_, _ = a.meta.Exec("ALTER TABLE export_jobs ADD COLUMN owner_id BIGINT UNSIGNED NOT NULL DEFAULT 0")
}
type ExportPayload struct {
TemplateID uint64 `json:"template_id"`
RequestedBy uint64 `json:"requested_by"`
Permission map[string]interface{} `json:"permission"`
Options map[string]interface{} `json:"options"`
FileFormat string `json:"file_format"`
Filters map[string]interface{} `json:"filters"`
Datasource string `json:"datasource"`
TemplateID uint64 `json:"template_id"`
RequestedBy uint64 `json:"requested_by"`
Permission map[string]interface{} `json:"permission"`
Options map[string]interface{} `json:"options"`
FileFormat string `json:"file_format"`
Filters map[string]interface{} `json:"filters"`
Datasource string `json:"datasource"`
}
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
a.ensureOwnerColumn()
a.ensureOwnerColumn()
b, _ := io.ReadAll(r.Body)
var p ExportPayload
json.Unmarshal(b, &p)
@ -96,8 +96,8 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
}
var fs []string
json.Unmarshal(fields, &fs)
wl := whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: p.Filters}
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: p.Filters}
q, args, err := exporter.BuildSQL(req, wl)
if err != nil {
r = WithSQL(r, q)
@ -138,7 +138,7 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
}
}
}()
labels := fieldLabels()
labels := FieldLabels()
hdrs := make([]string, len(fs))
for i, tf := range fs {
if v, ok := labels[tf]; ok {
@ -147,13 +147,15 @@ func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
hdrs[i] = tf
}
}
// owner from query userId if provided
owner := uint64(0)
if uidStr := r.URL.Query().Get("userId"); uidStr != "" {
if n, err := strconv.ParseUint(uidStr, 10, 64); err == nil { owner = n }
}
ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
ejArgs := []interface{}{p.TemplateID, "queued", p.RequestedBy, owner, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(expRows), score, estimate, p.FileFormat, time.Now(), time.Now()}
// owner from query userId if provided
owner := uint64(0)
if uidStr := r.URL.Query().Get("userId"); uidStr != "" {
if n, err := strconv.ParseUint(uidStr, 10, 64); err == nil {
owner = n
}
}
ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
ejArgs := []interface{}{p.TemplateID, "queued", p.RequestedBy, owner, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(expRows), score, estimate, p.FileFormat, time.Now(), time.Now()}
log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), ejSQL, ejArgs)
res, err := a.meta.Exec(ejSQL, ejArgs...)
if err != nil {
@ -182,16 +184,16 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := whitelist()
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
@ -212,7 +214,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
var tick int64
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := exporter.BuildSQL(req, wl)
if err != nil {
continue
@ -244,9 +246,6 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
}
}
vals = transformRow(fs, vals)
vals = transformRow(fields, vals)
vals = transformRow(fields, vals)
vals = transformRow(fields, vals)
w.WriteRow(vals)
count++
partCount++
@ -516,16 +515,16 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
var filtersJSON []byte
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
_ = row.Scan(&tplID, &filtersJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var tplDS string
var main string
var fieldsJSON []byte
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
var fs []string
var fl map[string]interface{}
json.Unmarshal(fieldsJSON, &fs)
json.Unmarshal(filtersJSON, &fl)
wl := whitelist()
wl := Whitelist()
var chunks [][2]string
if v, ok := fl["create_time_between"]; ok {
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
@ -546,7 +545,7 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
var tick int64
for _, rg := range chunks {
fl["create_time_between"] = []string{rg[0], rg[1]}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
cq, cargs, err := exporter.BuildSQL(req, wl)
if err != nil {
continue
@ -578,10 +577,6 @@ func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{},
}
}
vals = transformRow(fs, vals)
vals = transformRow(fs, vals)
vals = transformRow(fs, vals)
vals = transformRow(fs, vals)
vals = transformRow(fields, vals)
x.WriteRow(vals)
count++
partCount++
@ -792,11 +787,11 @@ func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
fail(w, r, http.StatusNotFound, "not found")
return
}
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
var ds string
var main string
var fields []byte
if err := tr.Scan(&ds, &main, &fields); err != nil {
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
var ds string
var main string
var fields []byte
if err := tr.Scan(&ds, &main, &fields); err != nil {
fail(w, r, http.StatusBadRequest, "template not found")
return
}
@ -804,8 +799,8 @@ func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
var fl map[string]interface{}
json.Unmarshal(fields, &fs)
json.Unmarshal(filters, &fl)
wl := whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
wl := Whitelist()
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
q, args, err := exporter.BuildSQL(req, wl)
if err != nil {
fail(w, r, http.StatusBadRequest, err.Error())
@ -931,10 +926,10 @@ func toString(v interface{}) string {
}
}
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
a.ensureOwnerColumn()
q := r.URL.Query()
page := 1
size := 15
a.ensureOwnerColumn()
q := r.URL.Query()
page := 1
size := 15
if p := q.Get("page"); p != "" {
if n, err := strconv.Atoi(p); err == nil && n > 0 {
page = n
@ -953,40 +948,40 @@ func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
}
}
offset := (page - 1) * size
var totalCount int64
uidStr := q.Get("userId")
if tplID > 0 {
if uidStr != "" {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ? AND owner_id = ?", tplID, uidStr)
_ = row.Scan(&totalCount)
} else {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ?", tplID)
_ = row.Scan(&totalCount)
}
} else {
if uidStr != "" {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE owner_id = ?", uidStr)
_ = row.Scan(&totalCount)
} else {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs")
_ = row.Scan(&totalCount)
}
}
var totalCount int64
uidStr := q.Get("userId")
if tplID > 0 {
if uidStr != "" {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ? AND owner_id = ?", tplID, uidStr)
_ = row.Scan(&totalCount)
} else {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ?", tplID)
_ = row.Scan(&totalCount)
}
} else {
if uidStr != "" {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE owner_id = ?", uidStr)
_ = row.Scan(&totalCount)
} else {
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs")
_ = row.Scan(&totalCount)
}
}
var rows *sql.Rows
var err error
if tplID > 0 {
if uidStr != "" {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? AND owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, uidStr, size, offset)
} else {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, size, offset)
}
} else {
if uidStr != "" {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", uidStr, size, offset)
} else {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs ORDER BY id DESC LIMIT ? OFFSET ?", size, offset)
}
}
if tplID > 0 {
if uidStr != "" {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? AND owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, uidStr, size, offset)
} else {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, size, offset)
}
} else {
if uidStr != "" {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", uidStr, size, offset)
} else {
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs ORDER BY id DESC LIMIT ? OFFSET ?", size, offset)
}
}
if err != nil {
fail(w, r, http.StatusInternalServerError, err.Error())
return

View File

@ -238,7 +238,7 @@ func fromJSON(b []byte) interface{} {
return v
}
func whitelist() map[string]bool {
func Whitelist() map[string]bool {
m := map[string]bool{
"order.order_number": true,
"order.key": true,
@ -460,7 +460,7 @@ func whitelist() map[string]bool {
return m
}
func fieldLabels() map[string]string {
func FieldLabels() map[string]string {
return map[string]string{
"order.order_number": "订单编号",
"order.key": "KEY",
@ -678,5 +678,5 @@ func fieldLabels() map[string]string {
"activity.goods_pay_button_text": "商品支付按钮文本",
"activity.is_open_db_transaction": "是否开启事务",
"activity.bank_tag": "银行标识",
}
}
}

View File

@ -1,10 +1,12 @@
package exporter
import (
"encoding/json"
"errors"
"strconv"
"strings"
"encoding/json"
"errors"
"fmt"
"marketing-system-data-tool/server/internal/schema"
"strconv"
"strings"
)
type BuildRequest struct {
@ -18,69 +20,66 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if req.MainTable != "order" && req.MainTable != "order_info" {
return "", nil, errors.New("unsupported main table")
}
cols := []string{}
need := map[string]bool{}
for _, tf := range req.Fields {
if !whitelist[tf] {
return "", nil, errors.New("field not allowed")
}
parts := strings.Split(tf, ".")
if len(parts) != 2 {
return "", nil, errors.New("invalid field format")
}
t, f := parts[0], parts[1]
need[t] = true
if t == "order" {
if req.MainTable == "order_info" {
switch f {
case "order_number": f = "order_no"
case "key": f = "key_code"
case "creator": f = "user_id"
case "out_trade_no": f = "out_order_no"
case "plan_id": f = "activity_id"
case "reseller_id": f = "merchant_id"
case "product_id": f = "goods_id"
case "pay_amount": f = "pay_price"
case "key_batch_id": f = "key_batch_name"
}
cols = append(cols, "`order_info`."+escape(f))
sch := schema.Get(req.Datasource, req.MainTable)
cols := []string{}
need := map[string]bool{}
for _, tf := range req.Fields {
if !whitelist[tf] {
return "", nil, errors.New("field not allowed")
}
parts := strings.Split(tf, ".")
if len(parts) != 2 {
return "", nil, errors.New("invalid field format")
}
t, f := parts[0], parts[1]
need[t] = true
mt := sch.TableName(t)
mf, _ := sch.MapField(t, f)
if t == "order" && req.MainTable == "order" {
if f == "status" {
cols = append(cols, "CASE `order`.type WHEN 1 THEN '直充卡密' WHEN 2 THEN '立减金' WHEN 3 THEN '红包' ELSE '' END AS type")
cols = append(cols, "CASE `order`.status WHEN 0 THEN '待充值' WHEN 1 THEN '充值中' WHEN 2 THEN '已完成' WHEN 3 THEN '充值失败' WHEN 4 THEN '已取消' WHEN 5 THEN '已过期' WHEN 6 THEN '待支付' END AS status")
continue
}
if f == "status" {
cols = append(cols, "CASE `order`.type " +
"WHEN 1 THEN (CASE `order`.status WHEN 0 THEN '待充值' WHEN 1 THEN '充值中' WHEN 2 THEN '已完成' WHEN 3 THEN '充值失败' WHEN 4 THEN '已取消' WHEN 5 THEN '已过期' WHEN 6 THEN '待支付' END) " +
"WHEN 2 THEN (CASE `order`.status WHEN 0 THEN '待领取' WHEN 1 THEN '待领取' WHEN 2 THEN '已领取' WHEN 3 THEN '领取失败' WHEN 4 THEN '已取消' WHEN 5 THEN '已过期' WHEN 6 THEN '待支付' END) " +
"WHEN 3 THEN (CASE `order`.status WHEN 0 THEN '待领取' WHEN 1 THEN '待领取' WHEN 2 THEN '已核销' WHEN 3 THEN '领取失败' WHEN 4 THEN '已取消' WHEN 5 THEN '已过期' WHEN 6 THEN '' END) " +
"ELSE (CASE `order`.status WHEN 0 THEN '待充值' WHEN 1 THEN '充值中' WHEN 2 THEN '已完成' WHEN 3 THEN '充值失败' WHEN 4 THEN '已取消' WHEN 5 THEN '已过期' WHEN 6 THEN '待支付' END) END AS status")
} else if f == "type" {
if f == "type" {
cols = append(cols, "CASE `order`.type WHEN 1 THEN '直充卡密' WHEN 2 THEN '立减金' WHEN 3 THEN '红包' ELSE '' END AS type")
} else if f == "type" {
cols = append(cols, "CASE `order`.type WHEN 1 THEN '直充卡密' WHEN 2 THEN '立减金' WHEN 3 THEN '红包' ELSE '' END AS type")
} else if f == "pay_type" {
cols = append(cols, "CASE `order`.pay_type WHEN 1 THEN '支付宝' WHEN 5 THEN '微信' ELSE '' END AS pay_type")
} else if f == "pay_status" {
cols = append(cols, "CASE `order`.pay_status WHEN 1 THEN '待支付' WHEN 2 THEN '已支付' WHEN 3 THEN '已退款' ELSE '' END AS pay_status")
} else {
cols = append(cols, "`order`."+escape(f))
}
} else {
if t == "order_cash" && f == "receive_status" {
cols = append(cols, "CASE `order_cash`.receive_status WHEN 0 THEN '待领取' WHEN 1 THEN '领取中' WHEN 2 THEN '领取成功' WHEN 3 THEN '领取失败' ELSE '' END AS receive_status")
} else if t == "order_cash" && f == "channel" {
cols = append(cols, "CASE `order_cash`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' WHEN 3 THEN '云闪付' ELSE '' END AS channel")
} else if t == "order_voucher" && f == "channel" {
cols = append(cols, "CASE `order_voucher`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' ELSE '' END AS channel")
} else if t == "order_voucher" && f == "status" {
cols = append(cols, "CASE `order_voucher`.status WHEN 1 THEN '可用' WHEN 2 THEN '已实扣' WHEN 3 THEN '已过期' WHEN 4 THEN '已退款' WHEN 5 THEN '领取失败' WHEN 6 THEN '发放中' WHEN 7 THEN '部分退款' WHEN 8 THEN '已退回' WHEN 9 THEN '发放失败' ELSE '' END AS status")
} else if t == "order_voucher" && f == "receive_mode" {
cols = append(cols, "CASE `order_voucher`.receive_mode WHEN 1 THEN '渠道授权用户id' WHEN 2 THEN '手机号或邮箱' ELSE '' END AS receive_mode")
} else if t == "order_voucher" && f == "out_biz_no" {
cols = append(cols, "'' AS out_biz_no")
} else {
cols = append(cols, "`"+t+"`."+escape(f))
continue
}
if f == "pay_type" {
cols = append(cols, "CASE `order`.pay_type WHEN 1 THEN '支付宝' WHEN 5 THEN '微信' ELSE '' END AS pay_type")
continue
}
if f == "pay_status" {
cols = append(cols, "CASE `order`.pay_status WHEN 1 THEN '待支付' WHEN 2 THEN '已支付' WHEN 3 THEN '已退款' ELSE '' END AS pay_status")
continue
}
}
}
if t == "order_cash" && f == "receive_status" {
cols = append(cols, "CASE `order_cash`.receive_status WHEN 0 THEN '待领取' WHEN 1 THEN '领取中' WHEN 2 THEN '领取成功' WHEN 3 THEN '领取失败' ELSE '' END AS receive_status")
continue
}
if t == "order_cash" && f == "channel" {
cols = append(cols, "CASE `order_cash`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' WHEN 3 THEN '云闪付' ELSE '' END AS channel")
continue
}
if t == "order_voucher" && f == "channel" {
cols = append(cols, "CASE `order_voucher`.channel WHEN 1 THEN '支付宝' WHEN 2 THEN '微信' ELSE '' END AS channel")
continue
}
if t == "order_voucher" && f == "status" {
cols = append(cols, "CASE `order_voucher`.status WHEN 1 THEN '可用' WHEN 2 THEN '已实扣' WHEN 3 THEN '已过期' WHEN 4 THEN '已退款' WHEN 5 THEN '领取失败' WHEN 6 THEN '发放中' WHEN 7 THEN '部分退款' WHEN 8 THEN '已退回' WHEN 9 THEN '发放失败' ELSE '' END AS status")
continue
}
if t == "order_voucher" && f == "receive_mode" {
cols = append(cols, "CASE `order_voucher`.receive_mode WHEN 1 THEN '渠道授权用户id' WHEN 2 THEN '手机号或邮箱' ELSE '' END AS receive_mode")
continue
}
if t == "order_voucher" && f == "out_biz_no" {
cols = append(cols, "'' AS out_biz_no")
continue
}
cols = append(cols, "`"+mt+"`."+escape(mf))
}
if len(cols) == 0 {
return "", nil, errors.New("no fields")
}
@ -88,67 +87,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
sb.WriteString("SELECT ")
sb.WriteString(strings.Join(cols, ","))
sb.WriteString(" FROM `" + req.MainTable + "`")
// JOINs based on need
// order_detail
if need["order_detail"] && req.MainTable == "order" {
sb.WriteString(" LEFT JOIN `order_detail` ON `order_detail`.order_number = `order`.order_number")
}
// order_cash
if need["order_cash"] {
if req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `order_cash` ON `order_cash`.order_no = `order_info`.order_no")
} else {
sb.WriteString(" LEFT JOIN `order_cash` ON `order_cash`.order_number = `order`.order_number")
}
}
// order_voucher
if need["order_voucher"] {
if req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `order_voucher` ON `order_voucher`.order_no = `order_info`.order_no")
} else {
sb.WriteString(" LEFT JOIN `order_voucher` ON `order_voucher`.order_number = `order`.order_number")
}
}
// order_digit (ymt only)
if need["order_digit"] && req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `order_digit` ON `order_digit`.order_no = `order_info`.order_no")
}
// goods_voucher_batch (ymt)
if need["goods_voucher_batch"] && req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `goods_voucher_batch` ON `goods_voucher_batch`.channel_batch_no = `order_voucher`.channel_batch_no")
}
// goods_voucher_subject_config (ymt)
if need["goods_voucher_subject_config"] && req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `goods_voucher_subject_config` ON `goods_voucher_subject_config`.id = `goods_voucher_batch`.voucher_subject_id")
}
// merchant (ymt)
if need["merchant"] && req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `merchant` ON `merchant`.id = `order_info`.merchant_id")
}
// activity (ymt)
if need["activity"] && req.MainTable == "order_info" {
sb.WriteString(" LEFT JOIN `activity` ON `activity`.id = `order_info`.activity_id")
}
// plan
if req.MainTable == "order" {
if need["plan"] || need["key_batch"] {
sb.WriteString(" LEFT JOIN `plan` ON `plan`.id = `order`.plan_id")
}
if need["key_batch"] {
sb.WriteString(" LEFT JOIN `key_batch` ON `key_batch`.plan_id = `plan`.id")
}
if need["code_batch"] {
sb.WriteString(" LEFT JOIN `code_batch` ON `code_batch`.key_batch_id = `key_batch`.id")
}
if need["voucher"] {
sb.WriteString(" LEFT JOIN `voucher` ON `voucher`.channel_activity_id = `order_voucher`.channel_activity_id")
}
if need["voucher_batch"] {
sb.WriteString(" LEFT JOIN `voucher_batch` ON `voucher_batch`.voucher_id = `voucher`.id")
}
if need["merchant_key_send"] {
sb.WriteString(" LEFT JOIN `merchant_key_send` ON `order`." + escape("key") + " = `merchant_key_send`.key")
}
for _, j := range sch.BuildJoins(need, req.MainTable) {
sb.WriteString(j)
}
args := []interface{}{}
@ -185,12 +125,10 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if len(ids) == 0 {
return "", nil, errors.New("creator_in required")
}
ph := strings.Repeat("?,", len(ids))
ph = strings.TrimSuffix(ph, ",")
if req.MainTable == "order_info" {
where = append(where, "`order_info`.user_id IN ("+ph+")")
} else {
where = append(where, "`order`.creator IN ("+ph+")")
ph := strings.Repeat("?,", len(ids))
ph = strings.TrimSuffix(ph, ",")
if tbl, col, ok := sch.FilterColumn("creator_in"); ok {
where = append(where, fmt.Sprintf("`%s`.%s IN (%s)", sch.TableName(tbl), escape(col), ph))
}
args = append(args, ids...)
}
@ -201,10 +139,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if len(arr) != 2 {
return "", nil, errors.New("create_time_between requires 2 values")
}
if req.MainTable == "order_info" {
where = append(where, "`order_info`.create_time BETWEEN ? AND ?")
} else {
where = append(where, "`order`.create_time BETWEEN ? AND ?")
if tbl, col, ok := sch.FilterColumn("create_time_between"); ok {
where = append(where, fmt.Sprintf("`%s`.%s BETWEEN ? AND ?", sch.TableName(tbl), escape(col)))
}
args = append(args, arr[0], arr[1])
}
@ -226,10 +162,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
}
}
if tv == 1 || tv == 2 || tv == 3 {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.type = ?")
} else {
where = append(where, "`order`.type = ?")
if tbl, col, ok := sch.FilterColumn("type_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, tv)
}
@ -237,10 +171,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["out_trade_no_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.out_order_no = ?")
} else {
where = append(where, "`order`.out_trade_no = ?")
if tbl, col, ok := sch.FilterColumn("out_trade_no_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -248,10 +180,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["account_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.account = ?")
} else {
where = append(where, "`order`.account = ?")
if tbl, col, ok := sch.FilterColumn("account_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -259,10 +189,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["plan_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.activity_id = ?")
} else {
where = append(where, "`order`.plan_id = ?")
if tbl, col, ok := sch.FilterColumn("plan_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -270,10 +198,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["key_batch_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.key_batch_name = ?")
} else {
where = append(where, "`order`.key_batch_id = ?")
if tbl, col, ok := sch.FilterColumn("key_batch_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -281,10 +207,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["product_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.goods_id = ?")
} else {
where = append(where, "`order`.product_id = ?")
if tbl, col, ok := sch.FilterColumn("product_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -292,10 +216,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["reseller_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.merchant_id = ?")
} else {
where = append(where, "`order`.reseller_id = ?")
if tbl, col, ok := sch.FilterColumn("reseller_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -303,10 +225,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["code_batch_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_info`.supplier_product_id = ?")
} else {
where = append(where, "`order`.code_batch_id = ?")
if tbl, col, ok := sch.FilterColumn("code_batch_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -314,10 +234,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["order_cash_cash_activity_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_cash`.activity_id = ?")
} else {
where = append(where, "`order_cash`.cash_activity_id = ?")
if tbl, col, ok := sch.FilterColumn("order_cash_cash_activity_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -325,10 +243,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["order_voucher_channel_activity_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order_info" {
where = append(where, "`order_voucher`.channel_batch_no = ?")
} else {
where = append(where, "`order_voucher`.channel_activity_id = ?")
if tbl, col, ok := sch.FilterColumn("order_voucher_channel_activity_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}
@ -336,8 +252,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["voucher_batch_channel_activity_id_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order" { // only marketing schema has voucher_batch
where = append(where, "`voucher_batch`.channel_activity_id = ?")
if tbl, col, ok := sch.FilterColumn("voucher_batch_channel_activity_id_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
args = append(args, s)
}
}
@ -345,8 +261,8 @@ func BuildSQL(req BuildRequest, whitelist map[string]bool) (string, []interface{
if v, ok := req.Filters["merchant_out_biz_no_eq"]; ok {
s := toString(v)
if s != "" {
if req.MainTable == "order" { // marketing only
where = append(where, "`merchant_key_send`.out_biz_no = ?")
if tbl, col, ok := sch.FilterColumn("merchant_out_biz_no_eq"); ok {
where = append(where, fmt.Sprintf("`%s`.%s = ?", sch.TableName(tbl), escape(col)))
}
args = append(args, s)
}

View File

@ -0,0 +1,60 @@
package schema
type marketingSchema struct{}
func (marketingSchema) TableName(t string) string { return t }
func (marketingSchema) MapField(t, f string) (string, bool) { return f, true }
func (marketingSchema) BuildJoins(need map[string]bool, main string) []string {
out := []string{}
if need["order_detail"] {
out = append(out, " LEFT JOIN `order_detail` ON `order_detail`.order_number = `order`.order_number")
}
if need["order_cash"] {
out = append(out, " LEFT JOIN `order_cash` ON `order_cash`.order_number = `order`.order_number")
}
if need["order_voucher"] {
out = append(out, " LEFT JOIN `order_voucher` ON `order_voucher`.order_number = `order`.order_number")
}
if need["plan"] || need["key_batch"] {
out = append(out, " LEFT JOIN `plan` ON `plan`.id = `order`.plan_id")
}
if need["key_batch"] {
out = append(out, " LEFT JOIN `key_batch` ON `key_batch`.plan_id = `plan`.id")
}
if need["code_batch"] {
out = append(out, " LEFT JOIN `code_batch` ON `code_batch`.key_batch_id = `key_batch`.id")
}
if need["voucher"] {
out = append(out, " LEFT JOIN `voucher` ON `voucher`.channel_activity_id = `order_voucher`.channel_activity_id")
}
if need["voucher_batch"] {
out = append(out, " LEFT JOIN `voucher_batch` ON `voucher_batch`.voucher_id = `voucher`.id")
}
if need["merchant_key_send"] {
out = append(out, " LEFT JOIN `merchant_key_send` ON `order`.`key` = `merchant_key_send`.key")
}
return out
}
func (marketingSchema) FilterColumn(key string) (string, string, bool) {
switch key {
case "creator_in": return "order", "creator", true
case "create_time_between": return "order", "create_time", true
case "type_eq": return "order", "type", true
case "out_trade_no_eq": return "order", "out_trade_no", true
case "account_eq": return "order", "account", true
case "plan_id_eq": return "order", "plan_id", true
case "key_batch_id_eq": return "order", "key_batch_id", true
case "product_id_eq": return "order", "product_id", true
case "reseller_id_eq": return "order", "reseller_id", true
case "code_batch_id_eq": return "order", "code_batch_id", true
case "order_cash_cash_activity_id_eq": return "order_cash", "cash_activity_id", true
case "order_voucher_channel_activity_id_eq": return "order_voucher", "channel_activity_id", true
case "voucher_batch_channel_activity_id_eq": return "voucher_batch", "channel_activity_id", true
case "merchant_out_biz_no_eq": return "merchant_key_send", "out_biz_no", true
default:
return "", "", false
}
}

View File

@ -0,0 +1,15 @@
package schema
type Schema interface {
TableName(string) string
MapField(string, string) (string, bool)
BuildJoins(map[string]bool, string) []string
FilterColumn(string) (string, string, bool)
}
func Get(datasource string, main string) Schema {
if datasource == "ymt" || main == "order_info" {
return ymtSchema{}
}
return marketingSchema{}
}

View File

@ -0,0 +1,74 @@
package schema
type ymtSchema struct{}
func (ymtSchema) TableName(t string) string {
if t == "order" {
return "order_info"
}
return t
}
func (s ymtSchema) MapField(t, f string) (string, bool) {
if t == "order" {
switch f {
case "order_number": return "order_no", true
case "key": return "key_code", true
case "creator": return "user_id", true
case "out_trade_no": return "out_order_no", true
case "plan_id": return "activity_id", true
case "reseller_id": return "merchant_id", true
case "product_id": return "goods_id", true
case "pay_amount": return "pay_price", true
case "key_batch_id": return "key_batch_name", true
default:
return f, true
}
}
return f, true
}
func (s ymtSchema) BuildJoins(need map[string]bool, main string) []string {
out := []string{}
if need["order_cash"] {
out = append(out, " LEFT JOIN `order_cash` ON `order_cash`.order_no = `order_info`.order_no")
}
if need["order_voucher"] {
out = append(out, " LEFT JOIN `order_voucher` ON `order_voucher`.order_no = `order_info`.order_no")
}
if need["order_digit"] {
out = append(out, " LEFT JOIN `order_digit` ON `order_digit`.order_no = `order_info`.order_no")
}
if need["goods_voucher_batch"] {
out = append(out, " LEFT JOIN `goods_voucher_batch` ON `goods_voucher_batch`.channel_batch_no = `order_voucher`.channel_batch_no")
}
if need["goods_voucher_subject_config"] {
out = append(out, " LEFT JOIN `goods_voucher_subject_config` ON `goods_voucher_subject_config`.id = `goods_voucher_batch`.voucher_subject_id")
}
if need["merchant"] {
out = append(out, " LEFT JOIN `merchant` ON `merchant`.id = `order_info`.merchant_id")
}
if need["activity"] {
out = append(out, " LEFT JOIN `activity` ON `activity`.id = `order_info`.activity_id")
}
return out
}
func (s ymtSchema) FilterColumn(key string) (string, string, bool) {
switch key {
case "creator_in": return "order", "user_id", true
case "create_time_between": return "order", "create_time", true
case "type_eq": return "order", "type", true
case "out_trade_no_eq": return "order", "out_order_no", true
case "account_eq": return "order", "account", true
case "plan_id_eq": return "order", "activity_id", true
case "key_batch_id_eq": return "order", "key_batch_name", true
case "product_id_eq": return "order", "goods_id", true
case "reseller_id_eq": return "order", "merchant_id", true
case "code_batch_id_eq": return "order", "supplier_product_id", true
case "order_cash_cash_activity_id_eq": return "order_cash", "activity_id", true
case "order_voucher_channel_activity_id_eq": return "order_voucher", "channel_batch_no", true
default:
return "", "", false
}
}

View File

@ -68,3 +68,8 @@ trace_id=a89d208e57f34d306f9315a6fcce39f7 sql=INSERT INTO export_templates (name
{"bytes":1272,"duration_ms":170,"kind":"access","level":"INFO","method":"","path":"","query":"","remote":"","status":200,"trace_id":"","ts":"2025-11-27T11:31:14+08:00"}
{"bytes":870,"duration_ms":99,"kind":"access","level":"INFO","method":"","path":"","query":"","remote":"","status":200,"trace_id":"","ts":"2025-11-27T11:31:19+08:00"}
{"bytes":1614,"duration_ms":11,"kind":"access","level":"INFO","method":"","path":"","query":"","remote":"","status":200,"trace_id":"","ts":"2025-11-27T11:31:19+08:00"}
connecting YMT MySQL: 47.97.27.195:3306 db merketing user root
connecting Marketing MySQL: 192.168.6.92:3306 db market user root
server listening on :8077
{"bytes":1371,"duration_ms":56,"kind":"access","level":"INFO","method":"","path":"","query":"","remote":"","status":200,"trace_id":"","ts":"2025-11-27T14:19:10+08:00"}
{"bytes":870,"duration_ms":131,"kind":"access","level":"INFO","method":"","path":"","query":"","remote":"","status":200,"trace_id":"","ts":"2025-11-27T14:20:15+08:00"}