1865 lines
56 KiB
Go
1865 lines
56 KiB
Go
package api
|
||
|
||
import (
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"math/big"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"server/internal/constants"
|
||
"server/internal/exporter"
|
||
"server/internal/logging"
|
||
"server/internal/repo"
|
||
"server/internal/utils"
|
||
"server/internal/ymtcrypto"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
type ExportsAPI struct {
|
||
Meta *sql.DB
|
||
Marketing *sql.DB
|
||
YMT *sql.DB
|
||
}
|
||
|
||
func ExportsHandler(meta, marketing, ymt *sql.DB) http.Handler {
|
||
api := &ExportsAPI{Meta: meta, Marketing: marketing, YMT: ymt}
|
||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
|
||
if r.Method == http.MethodPost && p == "" {
|
||
api.create(w, r)
|
||
return
|
||
}
|
||
if r.Method == http.MethodGet && p == "" {
|
||
api.list(w, r)
|
||
return
|
||
}
|
||
if strings.HasPrefix(p, "/") {
|
||
id := strings.TrimPrefix(p, "/")
|
||
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
|
||
if strings.HasSuffix(p, "/sql") {
|
||
id = strings.TrimSuffix(id, "/sql")
|
||
api.getSQL(w, r, id)
|
||
return
|
||
}
|
||
api.get(w, r, id)
|
||
return
|
||
}
|
||
if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") {
|
||
id = strings.TrimSuffix(id, "/download")
|
||
api.download(w, r, id)
|
||
return
|
||
}
|
||
if r.Method == http.MethodPost && strings.HasSuffix(p, "/recompute") {
|
||
id = strings.TrimSuffix(id, "/recompute")
|
||
api.recompute(w, r, id)
|
||
return
|
||
}
|
||
if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") {
|
||
id = strings.TrimSuffix(id, "/cancel")
|
||
api.cancel(w, r, id)
|
||
return
|
||
}
|
||
}
|
||
w.WriteHeader(http.StatusNotFound)
|
||
})
|
||
}
|
||
|
||
type ExportPayload struct {
|
||
TemplateID uint64 `json:"template_id"`
|
||
RequestedBy uint64 `json:"requested_by"`
|
||
Permission map[string]interface{} `json:"permission"`
|
||
Options map[string]interface{} `json:"options"`
|
||
FileFormat string `json:"file_format"`
|
||
Filters map[string]interface{} `json:"filters"`
|
||
Datasource string `json:"datasource"`
|
||
}
|
||
|
||
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
|
||
b, _ := io.ReadAll(r.Body)
|
||
var p ExportPayload
|
||
json.Unmarshal(b, &p)
|
||
r = WithPayload(r, p)
|
||
var main string
|
||
var ds string
|
||
rrepo := repo.NewExportRepo()
|
||
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, p.TemplateID)
|
||
if err != nil {
|
||
fail(w, r, http.StatusBadRequest, "invalid template")
|
||
return
|
||
}
|
||
if p.Datasource != "" {
|
||
ds = p.Datasource
|
||
}
|
||
// ensure filters map initialized
|
||
if p.Filters == nil {
|
||
p.Filters = map[string]interface{}{}
|
||
}
|
||
// merge permission scope into filters to enforce boundary
|
||
p.Filters = mergePermissionIntoFilters(p.Datasource, main, p.Permission, p.Filters)
|
||
// 注意:不再从 URL 参数 userId 或 current_user_id 自动转换为 creator_in 过滤
|
||
// current_user_id 仅用于记录导出任务的 owner,不用于数据过滤
|
||
// support multiple merchantId in query: e.g., merchantId=1,2,3 → filters.merchant_id_in
|
||
{
|
||
midStr := r.URL.Query().Get("merchantId")
|
||
if midStr != "" {
|
||
parts := strings.Split(midStr, ",")
|
||
ids := make([]interface{}, 0, len(parts))
|
||
for _, s := range parts {
|
||
s = strings.TrimSpace(s)
|
||
if s == "" {
|
||
continue
|
||
}
|
||
if n, err := strconv.ParseUint(s, 10, 64); err == nil {
|
||
ids = append(ids, n)
|
||
}
|
||
}
|
||
if len(ids) > 0 {
|
||
if _, exists := p.Filters["merchant_id_in"]; !exists {
|
||
p.Filters["merchant_id_in"] = ids
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// DEBUG LOGGING
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "export_filters_debug",
|
||
"filters": p.Filters,
|
||
"has_creator_in": hasNonEmptyIDs(p.Filters["creator_in"]),
|
||
"has_merchant_id_in": hasNonEmptyIDs(p.Filters["merchant_id_in"]),
|
||
})
|
||
if ds == "marketing" && (main == "order" || main == "order_info") {
|
||
if v, ok := p.Filters["create_time_between"]; ok {
|
||
switch t := v.(type) {
|
||
case []interface{}:
|
||
if len(t) != 2 {
|
||
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
|
||
return
|
||
}
|
||
case []string:
|
||
if len(t) != 2 {
|
||
fail(w, r, http.StatusBadRequest, "create_time_between 需要两个时间值")
|
||
return
|
||
}
|
||
default:
|
||
fail(w, r, http.StatusBadRequest, "create_time_between 格式错误")
|
||
return
|
||
}
|
||
} else {
|
||
fail(w, r, http.StatusBadRequest, "缺少时间过滤:必须提供 create_time_between")
|
||
return
|
||
}
|
||
}
|
||
filtered := make([]string, 0, len(fs))
|
||
tv := 0
|
||
if v, ok := p.Filters["type_eq"]; ok {
|
||
switch t := v.(type) {
|
||
case float64:
|
||
tv = int(t)
|
||
case int:
|
||
tv = t
|
||
case string:
|
||
s := strings.TrimSpace(t)
|
||
for i := 0; i < len(s); i++ {
|
||
c := s[i]
|
||
if c >= '0' && c <= '9' {
|
||
tv = tv*10 + int(c-'0')
|
||
}
|
||
}
|
||
}
|
||
}
|
||
// Normalize template fields preserving order
|
||
normalized := make([]string, 0, len(fs))
|
||
for _, tf := range fs {
|
||
if ds == "ymt" && strings.HasPrefix(tf, "order_info.") {
|
||
tf = strings.Replace(tf, "order_info.", "order.", 1)
|
||
}
|
||
if ds == "marketing" && tf == "order_voucher.channel_batch_no" {
|
||
tf = "order_voucher.channel_activity_id"
|
||
}
|
||
normalized = append(normalized, tf)
|
||
}
|
||
// 移除 YMT 无效字段(key批次)
|
||
if ds == "ymt" {
|
||
tmp := make([]string, 0, len(normalized))
|
||
for _, tf := range normalized {
|
||
if tf == "order.key_batch_id" || tf == "order.key_batch_name" {
|
||
continue
|
||
}
|
||
tmp = append(tmp, tf)
|
||
}
|
||
normalized = tmp
|
||
}
|
||
// 不再使用白名单过滤,直接使用所有字段
|
||
filtered = normalized
|
||
// 易码通立减金:保留 order_voucher.grant_time,移除红包领取时间列,避免“领取时间”为空
|
||
if ds == "ymt" && tv == 3 {
|
||
deduped := make([]string, 0, len(filtered))
|
||
removed := []string{}
|
||
for _, tf := range filtered {
|
||
if tf == "order_cash.receive_time" {
|
||
removed = append(removed, tf)
|
||
continue
|
||
}
|
||
deduped = append(deduped, tf)
|
||
}
|
||
if len(removed) > 0 {
|
||
logging.JSON("INFO", map[string]interface{}{"event": "fields_deduplicated_receive_time", "removed": removed, "reason": "立减金保留 order_voucher.grant_time"})
|
||
}
|
||
filtered = deduped
|
||
}
|
||
// 营销系统:非直充类型(type!=1)时移除recharge_time、card_code、account字段
|
||
if ds == "marketing" && tv != 1 {
|
||
deduped := make([]string, 0, len(filtered))
|
||
removed := []string{}
|
||
for _, tf := range filtered {
|
||
if tf == "order.recharge_time" || tf == "order.card_code" || tf == "order.account" {
|
||
removed = append(removed, tf)
|
||
continue
|
||
}
|
||
deduped = append(deduped, tf)
|
||
}
|
||
if len(removed) > 0 {
|
||
logging.JSON("INFO", map[string]interface{}{"event": "fields_filtered_non_direct_charge", "removed": removed, "reason": "非直充类型不导出充值时间、卡密和账号"})
|
||
}
|
||
filtered = deduped
|
||
}
|
||
labels := FieldLabels()
|
||
// 字段匹配校验(数量与顺序)
|
||
if len(filtered) != len(fs) {
|
||
logging.JSON("ERROR", map[string]interface{}{"event": "field_count_mismatch", "template_count": len(fs), "final_count": len(filtered)})
|
||
}
|
||
// relax: creator_in 非必填,若权限中提供其他边界将被合并为等值过滤
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: filtered, Filters: p.Filters}
|
||
q, args, usedFields, err := rrepo.BuildWithFields(req, nil) // 取消白名单过滤,前端选择多少字段就导出多少
|
||
if err != nil {
|
||
r = WithSQL(r, q)
|
||
fail(w, r, http.StatusBadRequest, err.Error())
|
||
return
|
||
}
|
||
// 使用实际使用的字段列表(解决白名单过滤后列数不匹配问题)
|
||
if len(usedFields) > 0 {
|
||
filtered = usedFields
|
||
}
|
||
r = WithSQL(r, q)
|
||
logging.JSON("INFO", map[string]interface{}{"event": "export_sql", "datasource": ds, "main_table": main, "file_format": p.FileFormat, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
|
||
log.Printf("export_sql ds=%s main=%s fmt=%s sql=%s args=%v final_sql=%s", ds, main, p.FileFormat, q, args, renderSQL(q, args))
|
||
dataDB := a.selectDataDB(ds)
|
||
score, sugg, err := rrepo.Explain(dataDB, q, args)
|
||
if err != nil {
|
||
fail(w, r, http.StatusBadRequest, err.Error())
|
||
return
|
||
}
|
||
sugg = append(sugg, exporter.IndexSuggestions(req)...)
|
||
if score < constants.ExportThresholds.PassScoreThreshold {
|
||
fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score))
|
||
return
|
||
}
|
||
// 估算行数(优先使用分块统计,失败或结果为 0 时回退到精确 COUNT)
|
||
var estimate int64
|
||
estimate = rrepo.EstimateFastChunked(dataDB, ds, main, p.Filters)
|
||
if estimate <= 0 {
|
||
logging.JSON("WARN", map[string]interface{}{
|
||
"event": "estimate_zero_fallback",
|
||
"datasource": ds,
|
||
"main_table": main,
|
||
"filters": p.Filters,
|
||
"stage": "fast_chunked",
|
||
"estimate": estimate,
|
||
})
|
||
// 使用完整导出 SQL 做一次精确统计,避免分表/索引等原因导致估算为 0
|
||
estimate = exporter.CountRows(dataDB, q, args)
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "estimate_exact_count",
|
||
"datasource": ds,
|
||
"main_table": main,
|
||
"filters": p.Filters,
|
||
"sql": q,
|
||
"args": args,
|
||
"estimate": estimate,
|
||
})
|
||
}
|
||
hdrs := make([]string, len(filtered))
|
||
for i, tf := range filtered {
|
||
if v, ok := labels[tf]; ok {
|
||
hdrs[i] = v
|
||
} else {
|
||
hdrs[i] = tf
|
||
}
|
||
}
|
||
// 列头去重:如果仍有重复的列头(中文标签),对非主表字段添加前缀
|
||
{
|
||
cnt := map[string]int{}
|
||
for _, h := range hdrs {
|
||
cnt[h]++
|
||
}
|
||
for i := range hdrs {
|
||
if cnt[hdrs[i]] > 1 {
|
||
parts := strings.Split(filtered[i], ".")
|
||
if len(parts) == 2 && parts[0] != main {
|
||
hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
|
||
}
|
||
}
|
||
}
|
||
}
|
||
// owner from query current_user_id or userId if provided
|
||
owner := uint64(0)
|
||
ownStr := r.URL.Query().Get("current_user_id")
|
||
if ownStr == "" {
|
||
ownStr = r.URL.Query().Get("userId")
|
||
}
|
||
if ownStr != "" {
|
||
first := strings.TrimSpace(strings.Split(ownStr, ",")[0])
|
||
if n, err := strconv.ParseUint(first, 10, 64); err == nil {
|
||
owner = n
|
||
}
|
||
}
|
||
id, err := rrepo.InsertJob(a.meta, p.TemplateID, p.RequestedBy, owner, p.Permission, p.Filters, p.Options, map[string]interface{}{"sql": q, "suggestions": sugg}, score, estimate, p.FileFormat)
|
||
if err != nil {
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
go a.runJobByID(uint64(id))
|
||
ok(w, r, map[string]interface{}{"id": id})
|
||
}
|
||
|
||
// runJobByID 通过任务ID从数据库读取信息并执行任务
|
||
func (a *ExportsAPI) RunJobByID(jobID uint64) {
|
||
rrepo := repo.NewExportRepo()
|
||
|
||
// 增加重启计数
|
||
rrepo.IncrementRestartCount(a.meta, jobID)
|
||
|
||
// 获取任务详情
|
||
jobDetail, err := rrepo.GetJob(a.meta, strconv.FormatUint(jobID, 10))
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "get_job_failed",
|
||
"job_id": jobID,
|
||
"error": err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
// 获取模板信息和任务过滤条件
|
||
tplID, filtersJSON, err := rrepo.GetJobFilters(a.meta, jobID)
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "get_job_filters_failed",
|
||
"job_id": jobID,
|
||
"error": err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
var filters map[string]interface{}
|
||
json.Unmarshal(filtersJSON, &filters)
|
||
|
||
// 获取模板字段信息
|
||
ds, mainTable, fields, err := rrepo.GetTemplateMeta(a.meta, tplID)
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "get_template_meta_failed",
|
||
"job_id": jobID,
|
||
"error": err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
// 获取数据库连接
|
||
dataDB := a.selectDataDB(ds)
|
||
|
||
// 构建 SQL
|
||
wl := Whitelist()
|
||
req := exporter.BuildRequest{MainTable: mainTable, Datasource: ds, Fields: fields, Filters: filters}
|
||
q, args, usedFields, err := rrepo.BuildWithFields(req, wl)
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "build_sql_failed",
|
||
"job_id": jobID,
|
||
"error": err.Error(),
|
||
})
|
||
return
|
||
}
|
||
|
||
if len(usedFields) > 0 {
|
||
fields = usedFields
|
||
}
|
||
|
||
// 构建列标题
|
||
labels := FieldLabels()
|
||
hdrs := make([]string, len(fields))
|
||
for i, tf := range fields {
|
||
if v, ok := labels[tf]; ok {
|
||
hdrs[i] = v
|
||
} else {
|
||
hdrs[i] = tf
|
||
}
|
||
}
|
||
|
||
// 列头去重
|
||
{
|
||
cnt := map[string]int{}
|
||
for _, h := range hdrs {
|
||
cnt[h]++
|
||
}
|
||
for i := range hdrs {
|
||
if cnt[hdrs[i]] > 1 {
|
||
parts := strings.Split(fields[i], ".")
|
||
if len(parts) == 2 && parts[0] != mainTable {
|
||
hdrs[i] = tableLabel(parts[0]) + "." + hdrs[i]
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
a.runJob(jobID, dataDB, q, args, fields, hdrs, jobDetail.FileFormat)
|
||
}
|
||
|
||
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fileFormat string) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_panic",
|
||
"job_id": id,
|
||
"error": utils.ToString(r),
|
||
"fields": fields,
|
||
"format": fileFormat,
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d reason=panic error=%v fields=%v", id, r, fields)
|
||
repo.NewExportRepo().MarkFailed(a.meta, id, "export_panic", map[string]interface{}{
|
||
"error": utils.ToString(r),
|
||
"fields": fields,
|
||
"format": fileFormat,
|
||
})
|
||
}
|
||
}()
|
||
// load datasource once for transform decisions
|
||
var jobDS string
|
||
var jobMain string
|
||
rrepo := repo.NewExportRepo()
|
||
{
|
||
tplID, _, _ := rrepo.GetJobFilters(a.meta, id)
|
||
if tplID > 0 {
|
||
ds, mt, _, _ := rrepo.GetTemplateMeta(a.meta, tplID)
|
||
jobDS = ds
|
||
if mt != "" {
|
||
jobMain = mt
|
||
} else {
|
||
jobMain = "order"
|
||
}
|
||
}
|
||
}
|
||
|
||
// 检查预估行数,如果超过阈值且格式是xlsx,强制改为csv
|
||
if fileFormat == "xlsx" {
|
||
var rowEstimate int64
|
||
estRow := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
|
||
_ = estRow.Scan(&rowEstimate)
|
||
if rowEstimate > constants.ExportThresholds.XlsxMaxRows {
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "force_csv_format",
|
||
"job_id": id,
|
||
"row_estimate": rowEstimate,
|
||
"threshold": constants.ExportThresholds.XlsxMaxRows,
|
||
"reason": "row_estimate exceeds xlsx max rows, forcing csv format",
|
||
})
|
||
fileFormat = "csv"
|
||
}
|
||
}
|
||
|
||
rrepo.StartJob(a.meta, id)
|
||
if fileFormat == "csv" {
|
||
newBaseWriter := func() (exporter.RowWriter, error) {
|
||
return exporter.NewCSVWriter("storage", "export_job_"+strconv.FormatUint(id, 10))
|
||
}
|
||
files := []string{}
|
||
{
|
||
var tplID uint64
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&tplID, &filtersJSON)
|
||
var tplDS string
|
||
var main string
|
||
var fieldsJSON []byte
|
||
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
|
||
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
|
||
var fs []string
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(fieldsJSON, &fs)
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
wl := Whitelist()
|
||
var chunks [][2]string
|
||
if v, ok := fl["create_time_between"]; ok {
|
||
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
|
||
chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
|
||
}
|
||
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
|
||
chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
|
||
}
|
||
}
|
||
if len(chunks) > 0 {
|
||
var total int64
|
||
// 如果 row_estimate 为 0,在分块导出开始时重新估算
|
||
var currentEst int64
|
||
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(¤tEst)
|
||
if currentEst == 0 {
|
||
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
|
||
if estChunk > 0 {
|
||
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
|
||
}
|
||
}
|
||
skipChunk := false
|
||
if tplDS == "marketing" && main == "order" {
|
||
for _, f := range fs {
|
||
if strings.HasPrefix(f, "order_voucher.") {
|
||
skipChunk = true
|
||
break
|
||
}
|
||
}
|
||
if !skipChunk {
|
||
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
|
||
skipChunk = true
|
||
}
|
||
}
|
||
}
|
||
if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
|
||
cur := rrepo.NewCursor(tplDS, main)
|
||
batch := constants.ChooseBatchSize(0, constants.FileFormatCSV)
|
||
for _, rg := range chunks {
|
||
fl["create_time_between"] = []string{rg[0], rg[1]}
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
|
||
cq, cargs, err := exporter.BuildSQL(req, wl)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
|
||
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
|
||
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
|
||
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
|
||
chunkBase := total
|
||
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
|
||
onRoll := func(path string, size int64, partRows int64) error {
|
||
files = append(files, path)
|
||
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
|
||
return nil
|
||
}
|
||
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
||
if e != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_stream_error",
|
||
"job_id": id,
|
||
"stage": "csv_chunk",
|
||
"error": e.Error(),
|
||
"datasource": jobDS,
|
||
"sql": cq,
|
||
"args": cargs,
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d stage=csv_chunk error=%v sql=%s", id, e, cq)
|
||
rrepo.MarkFailed(a.meta, id, "csv_chunk_stream_error", map[string]interface{}{
|
||
"error": e.Error(),
|
||
"datasource": jobDS,
|
||
"sql": cq,
|
||
"args": cargs,
|
||
})
|
||
return
|
||
}
|
||
total += cnt
|
||
rrepo.UpdateProgress(a.meta, id, total)
|
||
}
|
||
if total == 0 {
|
||
total = rrepo.Count(db, q, args)
|
||
}
|
||
if len(files) >= 1 {
|
||
rrepo.ZipAndRecord(a.meta, id, files, total)
|
||
}
|
||
rrepo.MarkCompleted(a.meta, id, total)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
|
||
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
|
||
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
|
||
{
|
||
var est int64
|
||
{
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&filtersJSON)
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
est = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
|
||
rrepo.UpdateRowEstimate(a.meta, id, est)
|
||
}
|
||
batch := constants.ChooseBatchSize(est, constants.FileFormat(fileFormat))
|
||
files2 := []string{}
|
||
cur := rrepo.NewCursor(jobDS, jobMain)
|
||
newWriter := func() (exporter.RowWriter, error) { return newBaseWriter() }
|
||
transform := func(vals []string) []string { return transformRow(jobDS, fields, vals) }
|
||
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, totalRows); return nil }
|
||
onRoll := func(path string, size int64, partRows int64) error {
|
||
files2 = append(files2, path)
|
||
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
|
||
return nil
|
||
}
|
||
count, files2, err := rrepo.StreamCursor(db, q, args, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_stream_error",
|
||
"job_id": id,
|
||
"stage": "xlsx_direct",
|
||
"error": err.Error(),
|
||
"datasource": jobDS,
|
||
"fields": fields,
|
||
"sql": q,
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct error=%v fields_count=%d", id, err, len(fields))
|
||
rrepo.MarkFailed(a.meta, id, "csv_direct_stream_error", map[string]interface{}{
|
||
"error": err.Error(),
|
||
"datasource": jobDS,
|
||
"fields": fields,
|
||
"sql": q,
|
||
})
|
||
return
|
||
}
|
||
if len(files2) >= 1 {
|
||
rrepo.ZipAndRecord(a.meta, id, files2, count)
|
||
}
|
||
rrepo.MarkCompleted(a.meta, id, count)
|
||
return
|
||
}
|
||
}
|
||
if fileFormat == "xlsx" {
|
||
files := []string{}
|
||
{
|
||
var tplID uint64
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&tplID, &filtersJSON)
|
||
var tplDS string
|
||
var main string
|
||
var fieldsJSON []byte
|
||
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
|
||
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
|
||
var fs []string
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(fieldsJSON, &fs)
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
wl := Whitelist()
|
||
var chunks [][2]string
|
||
if v, ok := fl["create_time_between"]; ok {
|
||
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
|
||
chunks = exporter.SplitByDays(utils.ToString(arr[0]), utils.ToString(arr[1]), constants.ExportThresholds.ChunkDays)
|
||
}
|
||
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
|
||
chunks = exporter.SplitByDays(arrs[0], arrs[1], constants.ExportThresholds.ChunkDays)
|
||
}
|
||
}
|
||
if len(chunks) > 0 {
|
||
var total int64
|
||
// 如果 row_estimate 为 0,在分块导出开始时重新估算
|
||
var currentEst int64
|
||
row := a.meta.QueryRow("SELECT row_estimate FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(¤tEst)
|
||
if currentEst == 0 {
|
||
estChunk := rrepo.EstimateFastChunked(db, tplDS, main, fl)
|
||
if estChunk > 0 {
|
||
rrepo.UpdateRowEstimate(a.meta, id, estChunk)
|
||
}
|
||
}
|
||
skipChunk := false
|
||
if tplDS == "marketing" && main == "order" {
|
||
for _, f := range fs {
|
||
if strings.HasPrefix(f, "order_voucher.") {
|
||
skipChunk = true
|
||
break
|
||
}
|
||
}
|
||
if !skipChunk {
|
||
if _, ok := fl["order_voucher_channel_activity_id_eq"]; ok {
|
||
skipChunk = true
|
||
}
|
||
}
|
||
}
|
||
if !skipChunk && currentEst > constants.ExportThresholds.ChunkThreshold {
|
||
cur := rrepo.NewCursor(tplDS, main)
|
||
batch := constants.ChooseBatchSize(0, constants.FileFormatXLSX)
|
||
for _, rg := range chunks {
|
||
fl["create_time_between"] = []string{rg[0], rg[1]}
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
|
||
cq, cargs, err := rrepo.Build(req, wl)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_chunk", "job_id": id, "range": rg, "sql": cq, "args": cargs, "final_sql": renderSQL(cq, cargs)})
|
||
log.Printf("export_sql_chunk job_id=%d range=%s~%s sql=%s args=%v final_sql=%s", id, rg[0], rg[1], cq, cargs, renderSQL(cq, cargs))
|
||
newWriter := func() (exporter.RowWriter, error) {
|
||
xw, e := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
|
||
if e == nil {
|
||
_ = xw.WriteHeader(cols)
|
||
}
|
||
return xw, e
|
||
}
|
||
transform := func(vals []string) []string { return transformRow(jobDS, fs, vals) }
|
||
// 进度回调按全局累计行数更新,避免跨分片出现数值回退
|
||
chunkBase := total
|
||
onProgress := func(totalRows int64) error { rrepo.UpdateProgress(a.meta, id, chunkBase+totalRows); return nil }
|
||
onRoll := func(path string, size int64, partRows int64) error {
|
||
files = append(files, path)
|
||
rrepo.InsertJobFile(a.meta, id, path, "", partRows, size)
|
||
return nil
|
||
}
|
||
cnt, _, e := rrepo.StreamCursor(db, cq, cargs, cur, batch, cols, newWriter, transform, constants.ExportThresholds.MaxRowsPerFile, onRoll, onProgress)
|
||
if e != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{"event": "export_stream_error", "job_id": id, "stage": "xlsx_chunk", "error": e.Error(), "datasource": jobDS, "sql": cq, "args": cargs})
|
||
log.Printf("export_stream_error job_id=%d stage=xlsx_chunk err=%v", id, e)
|
||
rrepo.MarkFailed(a.meta, id, "xlsx_chunk_stream_error", map[string]interface{}{
|
||
"error": e.Error(),
|
||
"datasource": jobDS,
|
||
"sql": cq,
|
||
"args": cargs,
|
||
})
|
||
return
|
||
}
|
||
total += cnt
|
||
rrepo.UpdateProgress(a.meta, id, total)
|
||
}
|
||
if total == 0 {
|
||
total = rrepo.Count(db, q, args)
|
||
}
|
||
if len(files) >= 1 {
|
||
rrepo.ZipAndRecord(a.meta, id, files, total)
|
||
}
|
||
rrepo.MarkCompleted(a.meta, id, total)
|
||
return
|
||
}
|
||
}
|
||
}
|
||
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
|
||
logging.JSON("INFO", map[string]interface{}{"event": "export_sql_execute", "job_id": id, "sql": q, "args": args, "final_sql": renderSQL(q, args)})
|
||
log.Printf("export_sql_execute job_id=%d final_sql=%s", id, renderSQL(q, args))
|
||
var est2 int64
|
||
{
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT filters_json FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&filtersJSON)
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
est2 = rrepo.EstimateFastChunked(db, jobDS, jobMain, fl)
|
||
rrepo.UpdateRowEstimate(a.meta, id, est2)
|
||
}
|
||
x, err := exporter.NewXLSXWriter("storage", "export_job_"+strconv.FormatUint(id, 10), "Sheet1")
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{"event": "export_writer_error", "job_id": id, "stage": "xlsx_direct", "error": err.Error()})
|
||
log.Printf("export_writer_error job_id=%d stage=xlsx_direct err=%v", id, err)
|
||
rrepo.MarkFailed(a.meta, id, "xlsx_writer_creation_failed", map[string]interface{}{
|
||
"error": err.Error(),
|
||
"stage": "xlsx_direct",
|
||
})
|
||
return
|
||
}
|
||
_ = x.WriteHeader(cols)
|
||
rrepo.UpdateProgress(a.meta, id, 0)
|
||
// 记录查询执行前的参数类型信息
|
||
argTypes := make([]string, len(args))
|
||
for i, arg := range args {
|
||
argTypes[i] = fmt.Sprintf("%T", arg)
|
||
}
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "export_query_before_execute",
|
||
"job_id": id,
|
||
"stage": "xlsx_direct",
|
||
"datasource": jobDS,
|
||
"sql": q,
|
||
"args": args,
|
||
"arg_types": argTypes,
|
||
"final_sql": renderSQL(q, args),
|
||
})
|
||
log.Printf("[EXPORT_DEBUG] job_id=%d stage=xlsx_direct before_query sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args))
|
||
rows, err := db.Query(q, args...)
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_query_error",
|
||
"job_id": id,
|
||
"stage": "xlsx_direct",
|
||
"error": err.Error(),
|
||
"datasource": jobDS,
|
||
"sql": q,
|
||
"args": args,
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_query error=%v", id, err)
|
||
rrepo.MarkFailed(a.meta, id, "xlsx_query_failed", map[string]interface{}{
|
||
"error": err.Error(),
|
||
"datasource": jobDS,
|
||
"sql": q,
|
||
"args": args,
|
||
})
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
// 动态获取实际列数
|
||
actualCols, err := rows.Columns()
|
||
if err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_columns_error",
|
||
"job_id": id,
|
||
"stage": "xlsx_direct",
|
||
"error": err.Error(),
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_columns error=%v", id, err)
|
||
rrepo.MarkFailed(a.meta, id, "xlsx_columns_failed", map[string]interface{}{
|
||
"error": err.Error(),
|
||
})
|
||
return
|
||
}
|
||
if len(actualCols) != len(cols) {
|
||
logging.JSON("WARN", map[string]interface{}{
|
||
"event": "export_column_count_mismatch",
|
||
"job_id": id,
|
||
"stage": "xlsx_direct",
|
||
"expected_cols": len(cols),
|
||
"actual_cols": len(actualCols),
|
||
})
|
||
log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct column_mismatch expected=%d actual=%d", id, len(cols), len(actualCols))
|
||
}
|
||
out := make([]interface{}, len(actualCols))
|
||
dest := make([]interface{}, len(actualCols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var tick int64
|
||
var firstRow []string
|
||
firstRowCaptured := false
|
||
for rows.Next() {
|
||
if err := rows.Scan(dest...); err != nil {
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_scan_error",
|
||
"job_id": id,
|
||
"stage": "xlsx_direct",
|
||
"error": err.Error(),
|
||
"count": count,
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d stage=xlsx_direct_scan error=%v count=%d", id, err, count)
|
||
rrepo.MarkFailed(a.meta, id, "xlsx_scan_failed", map[string]interface{}{
|
||
"error": err.Error(),
|
||
"count": count,
|
||
})
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = utils.ToString(out[i])
|
||
}
|
||
}
|
||
// 仅记录第一行原始数据到日志中,方便排查是否有查询结果
|
||
if !firstRowCaptured {
|
||
firstRow = make([]string, len(vals))
|
||
copy(firstRow, vals)
|
||
firstRowCaptured = true
|
||
}
|
||
vals = transformRow(jobDS, fields, vals)
|
||
x.WriteRow(vals)
|
||
count++
|
||
tick++
|
||
if tick%200 == 0 {
|
||
rrepo.UpdateProgress(a.meta, id, count)
|
||
}
|
||
}
|
||
// 如果查询到了数据,记录一条包含首行数据的日志,便于确认导出前 SQL 是否返回结果
|
||
if count > 0 && firstRowCaptured {
|
||
logging.JSON("INFO", map[string]interface{}{
|
||
"event": "export_first_row_sample",
|
||
"job_id": id,
|
||
"datasource": jobDS,
|
||
"total_rows": count,
|
||
"first_row": firstRow,
|
||
"sql": q,
|
||
"args": args,
|
||
"final_sql": renderSQL(q, args),
|
||
"fields_order": fields,
|
||
})
|
||
} else if count == 0 {
|
||
// 如果查询返回0行,记录详细信息以便排查
|
||
logging.JSON("WARN", map[string]interface{}{
|
||
"event": "export_zero_rows",
|
||
"job_id": id,
|
||
"datasource": jobDS,
|
||
"stage": "xlsx_direct",
|
||
"sql": q,
|
||
"args": args,
|
||
"arg_types": argTypes,
|
||
"final_sql": renderSQL(q, args),
|
||
"expected_cols": len(cols),
|
||
"actual_cols": len(actualCols),
|
||
})
|
||
log.Printf("[EXPORT_WARN] job_id=%d stage=xlsx_direct zero_rows sql=%s args=%v arg_types=%v final_sql=%s", id, q, args, argTypes, renderSQL(q, args))
|
||
}
|
||
p, size, _ := x.Close()
|
||
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
|
||
rrepo.ZipAndRecord(a.meta, id, []string{p}, count)
|
||
rrepo.MarkCompleted(a.meta, id, count)
|
||
return
|
||
}
|
||
logging.JSON("ERROR", map[string]interface{}{
|
||
"event": "export_format_unsupported",
|
||
"job_id": id,
|
||
"format": fileFormat,
|
||
})
|
||
log.Printf("[EXPORT_FAILED] job_id=%d reason=unsupported_format format=%s", id, fileFormat)
|
||
rrepo.MarkFailed(a.meta, id, "unsupported_format", map[string]interface{}{
|
||
"format": fileFormat,
|
||
})
|
||
}
|
||
|
||
// recompute final rows for a job and correct export_jobs.total_rows
|
||
func (a *ExportsAPI) recompute(w http.ResponseWriter, r *http.Request, idStr string) {
|
||
id, _ := strconv.ParseUint(idStr, 10, 64)
|
||
var tplID uint64
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
|
||
if err := row.Scan(&tplID, &filtersJSON); err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
var ds string
|
||
var main string
|
||
var fieldsJSON []byte
|
||
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
|
||
_ = tr.Scan(&ds, &main, &fieldsJSON)
|
||
var fs []string
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(fieldsJSON, &fs)
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
wl := Whitelist()
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
|
||
q, args, err := exporter.BuildSQL(req, wl)
|
||
if err != nil {
|
||
fail(w, r, http.StatusBadRequest, err.Error())
|
||
return
|
||
}
|
||
dataDB := a.selectDataDB(ds)
|
||
final := repo.NewExportRepo().Count(dataDB, q, args)
|
||
repo.NewExportRepo().MarkCompleted(a.meta, id, final)
|
||
ok(w, r, map[string]interface{}{"id": id, "final_rows": final})
|
||
}
|
||
|
||
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
|
||
if ds == "ymt" {
|
||
return a.ymt
|
||
}
|
||
return a.marketing
|
||
}
|
||
|
||
// moved to repo layer: repo.ZipAndRecord
|
||
|
||
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
|
||
rrepo := repo.NewExportRepo()
|
||
d, err := rrepo.GetJob(a.meta, id)
|
||
if err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
flist, _ := rrepo.ListJobFiles(a.meta, id)
|
||
files := []map[string]interface{}{}
|
||
for _, f := range flist {
|
||
files = append(files, map[string]interface{}{"storage_uri": f.URI.String, "sheet_name": f.Sheet.String, "row_count": f.RowCount.Int64, "size_bytes": f.SizeBytes.Int64})
|
||
}
|
||
evalStatus := "通过"
|
||
if d.ExplainScore.Int64 < 60 {
|
||
evalStatus = "禁止"
|
||
}
|
||
desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", d.ExplainScore.Int64, d.TotalRows.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[d.ExplainScore.Int64 >= 60])
|
||
if d.ExplainJSON.Valid && d.ExplainJSON.String != "" {
|
||
var arr []map[string]interface{}
|
||
if err := json.Unmarshal([]byte(d.ExplainJSON.String), &arr); err == nil {
|
||
segs := []string{}
|
||
for _, r := range arr {
|
||
getStr := func(field string) string {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return ""
|
||
}
|
||
if s, ok := mm["String"].(string); ok {
|
||
return s
|
||
}
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
getInt := func(field string) int64 {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return 0
|
||
}
|
||
if f, ok := mm["Int64"].(float64); ok {
|
||
return int64(f)
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
getFloat := func(field string) float64 {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return 0
|
||
}
|
||
if f, ok := mm["Float64"].(float64); ok {
|
||
return f
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
tbl := getStr("Table")
|
||
typ := getStr("Type")
|
||
if typ == "" {
|
||
typ = getStr("SelectType")
|
||
}
|
||
key := getStr("Key")
|
||
rowsN := getInt("Rows")
|
||
filt := getFloat("Filtered")
|
||
extra := getStr("Extra")
|
||
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
|
||
continue
|
||
}
|
||
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
|
||
if extra != "" {
|
||
s += ", 额外:" + extra
|
||
}
|
||
segs = append(segs, s)
|
||
}
|
||
if len(segs) > 0 {
|
||
desc = strings.Join(segs, ";")
|
||
}
|
||
}
|
||
}
|
||
ok(w, r, map[string]interface{}{"id": d.ID, "template_id": d.TemplateID, "status": d.Status, "requested_by": d.RequestedBy, "file_format": d.FileFormat, "total_rows": d.TotalRows.Int64, "started_at": d.StartedAt.Time, "finished_at": d.FinishedAt.Time, "created_at": d.CreatedAt, "updated_at": d.UpdatedAt, "files": files, "eval_status": evalStatus, "eval_desc": desc})
|
||
}
|
||
|
||
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
|
||
rrepo := repo.NewExportRepo()
|
||
var jid uint64
|
||
_, _ = fmt.Sscan(id, &jid)
|
||
tplID, filters, err := rrepo.GetJobFilters(a.meta, jid)
|
||
if err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
ds, main, fs, err := rrepo.GetTemplateMeta(a.meta, tplID)
|
||
if err != nil {
|
||
fail(w, r, http.StatusBadRequest, "template not found")
|
||
return
|
||
}
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(filters, &fl)
|
||
wl := Whitelist()
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
|
||
q, args, err := rrepo.Build(req, wl)
|
||
if err != nil {
|
||
failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
|
||
return
|
||
}
|
||
formatArg := func(a interface{}) string {
|
||
switch t := a.(type) {
|
||
case nil:
|
||
return "NULL"
|
||
case []byte:
|
||
s := string(t)
|
||
s = strings.ReplaceAll(s, "'", "''")
|
||
return "'" + s + "'"
|
||
case string:
|
||
s := strings.ReplaceAll(t, "'", "''")
|
||
return "'" + s + "'"
|
||
case int:
|
||
return strconv.Itoa(t)
|
||
case int64:
|
||
return strconv.FormatInt(t, 10)
|
||
case float64:
|
||
return strconv.FormatFloat(t, 'f', -1, 64)
|
||
case time.Time:
|
||
return "'" + t.Format("2006-01-02 15:04:05") + "'"
|
||
default:
|
||
return fmt.Sprintf("%v", t)
|
||
}
|
||
}
|
||
var sb strings.Builder
|
||
var ai int
|
||
for i := 0; i < len(q); i++ {
|
||
c := q[i]
|
||
if c == '?' && ai < len(args) {
|
||
sb.WriteString(formatArg(args[ai]))
|
||
ai++
|
||
} else {
|
||
sb.WriteByte(c)
|
||
}
|
||
}
|
||
ok(w, r, map[string]interface{}{"sql": q, "final_sql": sb.String()})
|
||
}
|
||
|
||
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
|
||
rrepo := repo.NewExportRepo()
|
||
uri, err := rrepo.GetLatestFileURI(a.meta, id)
|
||
if err != nil {
|
||
// fallback: try to serve local storage file by job id
|
||
// search for files named export_job_<id>_*.zip/xlsx/csv
|
||
dir := "storage"
|
||
entries, e := os.ReadDir(dir)
|
||
if e == nil {
|
||
best := ""
|
||
var bestInfo os.FileInfo
|
||
for _, ent := range entries {
|
||
name := ent.Name()
|
||
if strings.HasPrefix(name, "export_job_"+id+"_") && (strings.HasSuffix(name, ".zip") || strings.HasSuffix(name, ".xlsx") || strings.HasSuffix(name, ".csv")) {
|
||
info, _ := os.Stat(filepath.Join(dir, name))
|
||
if info != nil {
|
||
if best == "" || info.ModTime().After(bestInfo.ModTime()) {
|
||
best = name
|
||
bestInfo = info
|
||
}
|
||
}
|
||
}
|
||
}
|
||
if best != "" {
|
||
http.ServeFile(w, r, filepath.Join(dir, best))
|
||
return
|
||
}
|
||
}
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
http.ServeFile(w, r, uri)
|
||
}
|
||
|
||
func transformRow(ds string, fields []string, vals []string) []string {
|
||
payStatusIdx := -1
|
||
for i := range fields {
|
||
if fields[i] == "order.pay_status" {
|
||
payStatusIdx = i
|
||
break
|
||
}
|
||
}
|
||
isPaid := func() bool {
|
||
if payStatusIdx < 0 || payStatusIdx >= len(vals) {
|
||
return true
|
||
}
|
||
return constants.IsPaidStatus(ds, vals[payStatusIdx])
|
||
}()
|
||
for i := range fields {
|
||
if i >= len(vals) {
|
||
break
|
||
}
|
||
f := fields[i]
|
||
v := vals[i]
|
||
|
||
// ==================== 枚举转换 ====================
|
||
// order.type - 订单类型
|
||
if f == "order.type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if ds == "ymt" {
|
||
if label, ok := constants.YMTOrderType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
} else {
|
||
if label, ok := constants.MarketingOrderType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order.status - 订单状态
|
||
if f == "order.status" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if ds == "ymt" {
|
||
if label, ok := constants.YMTOrderStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
} else {
|
||
if label, ok := constants.MarketingOrderStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order.pay_type - 支付方式
|
||
if f == "order.pay_type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.MarketingPayType[n]; ok {
|
||
vals[i] = label
|
||
} else if n == 0 {
|
||
vals[i] = ""
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order.pay_status - 支付状态
|
||
if f == "order.pay_status" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if ds == "ymt" {
|
||
if label, ok := constants.YMTPayStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
} else {
|
||
if label, ok := constants.MarketingPayStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order.use_coupon - 是否使用优惠券
|
||
if f == "order.use_coupon" {
|
||
switch v {
|
||
case "1":
|
||
vals[i] = "是"
|
||
case "2", "0":
|
||
vals[i] = "否"
|
||
}
|
||
continue
|
||
}
|
||
// order.deliver_status - 投递状态
|
||
if f == "order.deliver_status" {
|
||
switch v {
|
||
case "1":
|
||
vals[i] = "待投递"
|
||
case "2":
|
||
vals[i] = "已投递"
|
||
case "3":
|
||
vals[i] = "投递失败"
|
||
}
|
||
continue
|
||
}
|
||
// order.is_inner - 供应商类型
|
||
if f == "order.is_inner" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.YMTIsInner[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_voucher.channel / voucher.channel - 立减金渠道
|
||
if f == "order_voucher.channel" || f == "voucher.channel" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.OrderVoucherChannel[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_voucher.status - 立减金状态
|
||
if f == "order_voucher.status" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if ds == "ymt" {
|
||
if label, ok := constants.YMTOrderVoucherStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
} else {
|
||
if label, ok := constants.MarketingOrderVoucherStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_voucher.receive_mode / voucher.receive_mode - 领取方式
|
||
if f == "order_voucher.receive_mode" || f == "voucher.receive_mode" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if ds == "ymt" {
|
||
if label, ok := constants.YMTVoucherReceiveMode[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
} else {
|
||
if label, ok := constants.OrderVoucherReceiveMode[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// voucher.is_webview - 打开方式
|
||
if f == "voucher.is_webview" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.VoucherOpenMode[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// goods_voucher_subject_config.type - 主体类型
|
||
if f == "goods_voucher_subject_config.type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.VoucherSubjectType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_cash.channel - 红包渠道
|
||
if f == "order_cash.channel" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.OrderCashChannel[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_cash.receive_status - 红包领取状态
|
||
if f == "order_cash.receive_status" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.OrderCashReceiveStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_cash.status - 红包状态(营销系统)
|
||
if f == "order_cash.status" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.MarketingOrderCashStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// order_digit.order_type - 数字订单类型
|
||
if f == "order_digit.order_type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.OrderDigitOrderType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// activity.settlement_type / plan.settlement_type - 结算方式
|
||
if f == "activity.settlement_type" || f == "plan.settlement_type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if ds == "ymt" {
|
||
if label, ok := constants.YMTSettlementType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
} else {
|
||
if label, ok := constants.MarketingSettlementType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// plan.send_method - 发放方式
|
||
if f == "plan.send_method" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.MarketingSendMethod[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// code_batch.period_type - 周期类型
|
||
if f == "code_batch.period_type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.MarketingPeriodType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// code_batch.recharge_type - 充值类型
|
||
if f == "code_batch.recharge_type" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.MarketingRechargeType[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// key_batch.style - key码样式
|
||
if f == "key_batch.style" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.KeyBatchStyle[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// merchant_key_send.status - key码API发放状态
|
||
if f == "merchant_key_send.status" {
|
||
if n := parseIntVal(v); n >= 0 {
|
||
if label, ok := constants.MerchantKeySendStatus[n]; ok {
|
||
vals[i] = label
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
|
||
// ==================== 特殊字段转换 ====================
|
||
// 解密/转换订单 key
|
||
if f == "order.key" {
|
||
if ds == "ymt" {
|
||
key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
|
||
if key == "" {
|
||
key = "z9DoIVLuDYEN/qsgweRA4A=="
|
||
}
|
||
if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
|
||
vals[i] = dec
|
||
}
|
||
} else {
|
||
vals[i] = decodeOrderKey(vals[i])
|
||
}
|
||
}
|
||
// voucher_batch.provider: 将渠道编码转换为中文名称
|
||
if f == "voucher_batch.provider" {
|
||
switch strings.TrimSpace(vals[i]) {
|
||
// 老编码
|
||
case "lsxd":
|
||
vals[i] = "蓝色兄弟"
|
||
case "fjxw":
|
||
vals[i] = "福建兴旺"
|
||
case "fzxy":
|
||
vals[i] = "福州兴雅"
|
||
case "fzyt":
|
||
vals[i] = "福州悦途"
|
||
// 新编码:微信立减金渠道
|
||
case "voucher_wechat_lsxd":
|
||
vals[i] = "蓝色兄弟"
|
||
case "voucher_wechat_fjxw":
|
||
vals[i] = "福建兴旺"
|
||
case "voucher_wechat_fzxy":
|
||
vals[i] = "福州兴雅"
|
||
case "voucher_wechat_fzyt":
|
||
vals[i] = "福州悦途"
|
||
case "voucher_wechat_zjky":
|
||
vals[i] = "浙江卡赢"
|
||
case "voucher_wechat_zjky2":
|
||
vals[i] = "浙江卡赢2"
|
||
case "voucher_wechat_zjwsxx":
|
||
vals[i] = "浙江喔刷"
|
||
case "voucher_wechat_gzynd":
|
||
vals[i] = "广州亿纳德"
|
||
case "voucher_wechat_fjhrxxfw":
|
||
vals[i] = "福建省宏仁信息服务"
|
||
case "voucher_wechat_fzqmkj":
|
||
vals[i] = "福州启蒙科技有限公司"
|
||
case "voucher_wechat_fzydxx":
|
||
vals[i] = "福州元朵信息科技有限公司"
|
||
case "voucher_wechat_xyhxx":
|
||
vals[i] = "新沂薪伙原信息科技有限公司"
|
||
}
|
||
}
|
||
// activity.channels: 解析 JSON 并转成可读渠道名
|
||
if f == "activity.channels" {
|
||
if vals[i] == "" || vals[i] == "0" {
|
||
vals[i] = "无"
|
||
continue
|
||
}
|
||
if !isPaid {
|
||
vals[i] = "无"
|
||
continue
|
||
}
|
||
var arr []map[string]interface{}
|
||
if err := json.Unmarshal([]byte(vals[i]), &arr); err != nil {
|
||
vals[i] = "无"
|
||
continue
|
||
}
|
||
names := make([]string, 0, len(arr))
|
||
for _, item := range arr {
|
||
if v, ok := item["pay_name"].(string); ok && strings.TrimSpace(v) != "" {
|
||
names = append(names, v)
|
||
continue
|
||
}
|
||
if v, ok := item["name"].(string); ok && strings.TrimSpace(v) != "" {
|
||
names = append(names, v)
|
||
}
|
||
}
|
||
if len(names) == 0 {
|
||
vals[i] = "无"
|
||
} else {
|
||
vals[i] = strings.Join(names, ",")
|
||
}
|
||
}
|
||
}
|
||
return vals
|
||
}
|
||
|
||
func decodeOrderKey(s string) string {
|
||
if s == "" {
|
||
return s
|
||
}
|
||
if len(s) > 2 && s[len(s)-2:] == "_1" {
|
||
s = s[:len(s)-2]
|
||
}
|
||
var n big.Int
|
||
if _, ok := n.SetString(s, 10); !ok {
|
||
return s
|
||
}
|
||
base := []rune{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
|
||
baseCount := big.NewInt(int64(len(base)))
|
||
zero := big.NewInt(0)
|
||
var out []rune
|
||
for n.Cmp(zero) > 0 {
|
||
var mod big.Int
|
||
mod.Mod(&n, baseCount)
|
||
out = append(out, base[mod.Int64()])
|
||
n.Div(&n, baseCount)
|
||
}
|
||
for len(out) < 16 {
|
||
out = append(out, base[0])
|
||
}
|
||
for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
|
||
out[i], out[j] = out[j], out[i]
|
||
}
|
||
return string(out)
|
||
}
|
||
|
||
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", string(constants.JobStatusCanceled), time.Now(), id)
|
||
w.Write([]byte("ok"))
|
||
}
|
||
|
||
func renderSQL(q string, args []interface{}) string {
|
||
formatArg := func(a interface{}) string {
|
||
switch t := a.(type) {
|
||
case nil:
|
||
return "NULL"
|
||
case []byte:
|
||
s := string(t)
|
||
s = strings.ReplaceAll(s, "'", "''")
|
||
return "'" + s + "'"
|
||
case string:
|
||
s := strings.ReplaceAll(t, "'", "''")
|
||
return "'" + s + "'"
|
||
case int:
|
||
return strconv.Itoa(t)
|
||
case int64:
|
||
return strconv.FormatInt(t, 10)
|
||
case float64:
|
||
return strconv.FormatFloat(t, 'f', -1, 64)
|
||
case time.Time:
|
||
return "'" + t.Format("2006-01-02 15:04:05") + "'"
|
||
default:
|
||
return fmt.Sprintf("%v", t)
|
||
}
|
||
}
|
||
var sb strings.Builder
|
||
var ai int
|
||
for i := 0; i < len(q); i++ {
|
||
c := q[i]
|
||
if c == '?' && ai < len(args) {
|
||
sb.WriteString(formatArg(args[ai]))
|
||
ai++
|
||
} else {
|
||
sb.WriteByte(c)
|
||
}
|
||
}
|
||
return sb.String()
|
||
}
|
||
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
|
||
q := r.URL.Query()
|
||
page := 1
|
||
size := 15
|
||
if p := q.Get("page"); p != "" {
|
||
if n, err := strconv.Atoi(p); err == nil && n > 0 {
|
||
page = n
|
||
}
|
||
}
|
||
if s := q.Get("page_size"); s != "" {
|
||
if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
|
||
size = n
|
||
}
|
||
}
|
||
tplIDStr := q.Get("template_id")
|
||
var tplID uint64
|
||
if tplIDStr != "" {
|
||
if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
|
||
tplID = n
|
||
}
|
||
}
|
||
offset := (page - 1) * size
|
||
rrepo := repo.NewExportRepo()
|
||
var totalCount int64
|
||
totalCount = rrepo.CountJobs(a.meta, tplID, "")
|
||
itemsRaw, err := rrepo.ListJobs(a.meta, tplID, "", size, offset)
|
||
if err != nil {
|
||
failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
|
||
return
|
||
}
|
||
items := []map[string]interface{}{}
|
||
for _, it := range itemsRaw {
|
||
id, tid, req := it.ID, it.TemplateID, it.RequestedBy
|
||
status, fmtstr := it.Status, it.FileFormat
|
||
estimate, total := it.RowEstimate, it.TotalRows
|
||
createdAt, updatedAt := it.CreatedAt, it.UpdatedAt
|
||
score, explainRaw := it.ExplainScore, it.ExplainJSON
|
||
evalStatus := "通过"
|
||
if score.Int64 < 60 {
|
||
evalStatus = "禁止"
|
||
}
|
||
desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
|
||
if explainRaw.Valid && explainRaw.String != "" {
|
||
var arr []map[string]interface{}
|
||
if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil {
|
||
segs := []string{}
|
||
for _, r := range arr {
|
||
getStr := func(field string) string {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return ""
|
||
}
|
||
if s, ok := mm["String"].(string); ok {
|
||
return s
|
||
}
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
getInt := func(field string) int64 {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return 0
|
||
}
|
||
if f, ok := mm["Int64"].(float64); ok {
|
||
return int64(f)
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
getFloat := func(field string) float64 {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return 0
|
||
}
|
||
if f, ok := mm["Float64"].(float64); ok {
|
||
return f
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
tbl := getStr("Table")
|
||
typ := getStr("Type")
|
||
if typ == "" {
|
||
typ = getStr("SelectType")
|
||
}
|
||
key := getStr("Key")
|
||
rowsN := getInt("Rows")
|
||
filt := getFloat("Filtered")
|
||
extra := getStr("Extra")
|
||
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
|
||
continue
|
||
}
|
||
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
|
||
if extra != "" {
|
||
s += ", 额外:" + extra
|
||
}
|
||
segs = append(segs, s)
|
||
}
|
||
if len(segs) > 0 {
|
||
desc = strings.Join(segs, ";")
|
||
}
|
||
}
|
||
}
|
||
m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
|
||
items = append(items, m)
|
||
}
|
||
ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
|
||
}
|
||
|
||
// mergePermissionIntoFilters injects permission scope into filters in a canonical way
|
||
func mergePermissionIntoFilters(ds, main string, perm map[string]interface{}, filters map[string]interface{}) map[string]interface{} {
|
||
if filters == nil {
|
||
filters = map[string]interface{}{}
|
||
}
|
||
// 先处理 plan_id_eq 和 reseller_id_eq 的设置
|
||
if v, ok := pickFirst(perm, filters, []string{"reseller_id", "merchant_id"}); ok {
|
||
filters["reseller_id_eq"] = v
|
||
}
|
||
if v, ok := pickFirst(perm, filters, []string{"plan_id", "activity_id"}); ok {
|
||
filters["plan_id_eq"] = v
|
||
}
|
||
// 如果传递了 plan_id_eq 或 reseller_id_eq 且不为空,则删除已有的 creator_in 并跳过设置(适用于所有数据源)
|
||
if main == "order" || main == "order_info" {
|
||
hasPlanOrReseller := false
|
||
if v, ok := filters["plan_id_eq"]; ok && v != nil && v != "" && v != 0 {
|
||
hasPlanOrReseller = true
|
||
}
|
||
if v, ok := filters["reseller_id_eq"]; ok && v != nil && v != "" && v != 0 {
|
||
hasPlanOrReseller = true
|
||
}
|
||
if hasPlanOrReseller {
|
||
// 删除已有的 creator_in
|
||
delete(filters, "creator_in")
|
||
delete(filters, "creator_ids")
|
||
goto skipCreator
|
||
}
|
||
}
|
||
// if creator_in already present, keep it
|
||
if hasNonEmptyIDs(filters["creator_in"]) {
|
||
return filters
|
||
}
|
||
// try known keys (明确排除 current_user_id,它仅用于记录 owner,不用于数据过滤)
|
||
{
|
||
candidates := []string{"creator_in", "creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
|
||
ids := []interface{}{}
|
||
for _, k := range candidates {
|
||
if perm == nil {
|
||
break
|
||
}
|
||
// 明确排除 current_user_id 字段(即使不在 candidates 列表中,也显式检查以确保安全)
|
||
if k == "current_user_id" {
|
||
continue
|
||
}
|
||
if v, ok := perm[k]; ok {
|
||
ids = normalizeIDs(v)
|
||
if len(ids) > 0 {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
// also check filters incoming alternative keys and normalize into creator_in
|
||
// 明确排除 current_user_id 字段
|
||
if len(ids) == 0 {
|
||
alt := []string{"creator_ids", "user_ids", "user_id_in", "user_id", "owner_id"}
|
||
for _, k := range alt {
|
||
// 明确排除 current_user_id 字段
|
||
if k == "current_user_id" {
|
||
continue
|
||
}
|
||
if v, ok := filters[k]; ok {
|
||
ids = normalizeIDs(v)
|
||
if len(ids) > 0 {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
}
|
||
// 额外检查:如果 permission 或 filters 中直接有 current_user_id,明确排除它
|
||
if perm != nil {
|
||
delete(perm, "current_user_id")
|
||
}
|
||
delete(filters, "current_user_id")
|
||
if len(ids) > 0 {
|
||
filters["creator_in"] = ids
|
||
}
|
||
}
|
||
skipCreator:
|
||
// account
|
||
if v, ok := pickFirst(perm, filters, []string{"account", "account_no"}); ok {
|
||
filters["account_eq"] = v
|
||
}
|
||
// out_trade_no
|
||
if v, ok := pickFirst(perm, filters, []string{"out_trade_no", "out_order_no"}); ok {
|
||
filters["out_trade_no_eq"] = v
|
||
}
|
||
return filters
|
||
}
|
||
|
||
func normalizeIDs(v interface{}) []interface{} {
|
||
out := []interface{}{}
|
||
switch t := v.(type) {
|
||
case []interface{}:
|
||
for _, x := range t {
|
||
if s := utils.ToString(x); s != "" {
|
||
out = append(out, s)
|
||
}
|
||
}
|
||
case []string:
|
||
for _, s := range t {
|
||
s2 := strings.TrimSpace(s)
|
||
if s2 != "" {
|
||
out = append(out, s2)
|
||
}
|
||
}
|
||
case []int:
|
||
for _, n := range t {
|
||
out = append(out, n)
|
||
}
|
||
case []int64:
|
||
for _, n := range t {
|
||
out = append(out, n)
|
||
}
|
||
case string:
|
||
// support comma-separated string
|
||
parts := strings.Split(t, ",")
|
||
for _, s := range parts {
|
||
s2 := strings.TrimSpace(s)
|
||
if s2 != "" {
|
||
out = append(out, s2)
|
||
}
|
||
}
|
||
default:
|
||
if s := utils.ToString(t); s != "" {
|
||
out = append(out, s)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func hasNonEmptyIDs(v interface{}) bool {
|
||
arr := normalizeIDs(v)
|
||
return len(arr) > 0
|
||
}
|
||
|
||
func pickFirst(perm map[string]interface{}, filters map[string]interface{}, keys []string) (interface{}, bool) {
|
||
for _, k := range keys {
|
||
if perm != nil {
|
||
if v, ok := perm[k]; ok {
|
||
arr := normalizeIDs(v)
|
||
if len(arr) > 0 {
|
||
return arr[0], true
|
||
}
|
||
if s := utils.ToString(v); s != "" {
|
||
return s, true
|
||
}
|
||
}
|
||
}
|
||
if v, ok := filters[k]; ok {
|
||
arr := normalizeIDs(v)
|
||
if len(arr) > 0 {
|
||
return arr[0], true
|
||
}
|
||
if s := utils.ToString(v); s != "" {
|
||
return s, true
|
||
}
|
||
}
|
||
}
|
||
return nil, false
|
||
}
|
||
|
||
// parseIntVal 尝试将字符串解析为整数,失败返回-1
|
||
func parseIntVal(s string) int {
|
||
if s == "" {
|
||
return -1
|
||
}
|
||
n := 0
|
||
for _, c := range s {
|
||
if c < '0' || c > '9' {
|
||
return -1
|
||
}
|
||
n = n*10 + int(c-'0')
|
||
}
|
||
return n
|
||
}
|