MarketingSystemDataExportTool/server/internal/api/templates.go

545 lines
16 KiB
Go

// Package api 提供HTTP API处理器
package api
import (
"database/sql"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"server/internal/exporter"
"server/internal/schema"
"strings"
"time"
)
// ==================== 模板API处理器 ====================
// TemplatesAPI groups the database handles used by the template management
// HTTP handlers defined in this file.
type TemplatesAPI struct {
metaDB *sql.DB // Metadata database; the handlers here query export_templates and export_jobs through it.
marketingDB *sql.DB // Marketing system database; selectDataDB returns it for every datasource other than "ymt".
}
// TemplatesHandler returns the http.Handler that serves every route under
// /api/templates:
//
//	POST   /api/templates               create a template
//	GET    /api/templates               list templates
//	GET    /api/templates/:id           fetch one template
//	PATCH  /api/templates/:id           update a template
//	DELETE /api/templates/:id           delete a template
//	POST   /api/templates/:id/validate  validate a template
//
// Any other method/path combination is answered with 404 "not found".
//
// The ":id" segment must be non-empty and must not contain a slash. In
// particular, "/:id/validate" is only reachable with POST; other methods on
// that path are rejected instead of being dispatched to the get/patch/delete
// handlers with the literal ID "<id>/validate".
func TemplatesHandler(metaDB, marketingDB *sql.DB) http.Handler {
	api := &TemplatesAPI{metaDB: metaDB, marketingDB: marketingDB}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		path := strings.TrimPrefix(r.URL.Path, "/api/templates")

		// Collection routes: the path is exactly /api/templates.
		if path == "" {
			switch r.Method {
			case http.MethodPost:
				api.createTemplate(w, r)
			case http.MethodGet:
				api.listTemplates(w, r)
			default:
				fail(w, r, http.StatusNotFound, "not found")
			}
			return
		}

		// Everything else must look like "/<id>" or "/<id>/validate".
		if !strings.HasPrefix(path, "/") {
			fail(w, r, http.StatusNotFound, "not found")
			return
		}
		rest := strings.TrimPrefix(path, "/")

		// Validation sub-resource: POST only.
		if strings.HasSuffix(rest, "/validate") {
			templateID := strings.TrimSuffix(rest, "/validate")
			if r.Method == http.MethodPost && templateID != "" && !strings.Contains(templateID, "/") {
				api.validateTemplate(w, r, templateID)
				return
			}
			fail(w, r, http.StatusNotFound, "not found")
			return
		}

		// Item routes: rest is the template ID itself.
		if rest == "" || strings.Contains(rest, "/") {
			fail(w, r, http.StatusNotFound, "not found")
			return
		}
		switch r.Method {
		case http.MethodGet:
			api.getTemplate(w, r, rest)
		case http.MethodPatch:
			api.patchTemplate(w, r, rest)
		case http.MethodDelete:
			api.deleteTemplate(w, r, rest)
		default:
			fail(w, r, http.StatusNotFound, "not found")
		}
	})
}
// ==================== 请求/响应结构 ====================
// TemplatePayload is the JSON request body that createTemplate decodes when a
// new template is created.
type TemplatePayload struct {
Name string `json:"name"`
Datasource string `json:"datasource"`
MainTable string `json:"main_table"`
Fields []string `json:"fields"` // Stored JSON-encoded in the fields_json column.
Filters map[string]interface{} `json:"filters"` // Stored JSON-encoded in the filters_json column.
FileFormat string `json:"file_format"`
OwnerID uint64 `json:"owner_id"` // createTemplate overrides this with a positive "userId" query parameter when one is supplied.
Visibility string `json:"visibility"`
}
// ==================== API方法 ====================
// createTemplate handles POST /api/templates.
//
// It decodes the request body into a TemplatePayload, attaches the decoded
// payload to the request via WithPayload, optionally overrides the owner from
// the "userId" query parameter, and inserts a row into export_templates.
// New templates are stored with enabled=1 and stats_enabled=0, and
// last_validated_at, created_at and updated_at all set to the current time.
//
// Responses: 400 for an unreadable body or invalid JSON, 500 with the driver
// error text when the insert fails, 201 on success.
func (api *TemplatesAPI) createTemplate(w http.ResponseWriter, r *http.Request) {
	const insertSQL = `INSERT INTO export_templates
(name, datasource, main_table, fields_json, filters_json, file_format,
visibility, owner_id, enabled, stats_enabled, last_validated_at, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)`

	// Read and decode the request body.
	raw, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		log.Printf("trace_id=%s error reading request body: %v", TraceIDFrom(r), readErr)
		fail(w, r, http.StatusBadRequest, "invalid request body")
		return
	}

	var payload TemplatePayload
	if parseErr := json.Unmarshal(raw, &payload); parseErr != nil {
		log.Printf("trace_id=%s error parsing JSON: %v", TraceIDFrom(r), parseErr)
		fail(w, r, http.StatusBadRequest, "invalid JSON format")
		return
	}
	// The payload is attached before the owner override below, so the copy
	// stored on the request carries the owner_id exactly as sent by the client.
	r = WithPayload(r, payload)

	// Take the owner ID from the URL query when it parses to a positive number.
	if rawUserID := r.URL.Query().Get("userId"); rawUserID != "" {
		var parsedID uint64
		_, scanErr := fmt.Sscan(rawUserID, &parsedID)
		if scanErr == nil && parsedID > 0 {
			payload.OwnerID = parsedID
		}
	}

	// Insert the new row.
	stamp := time.Now()
	args := []interface{}{
		payload.Name,
		payload.Datasource,
		payload.MainTable,
		toJSON(payload.Fields),
		toJSON(payload.Filters),
		payload.FileFormat,
		payload.Visibility,
		payload.OwnerID,
		1,     // enabled
		0,     // stats_enabled
		stamp, // last_validated_at
		stamp, // created_at
		stamp, // updated_at
	}
	log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), insertSQL, args)

	_, execErr := api.metaDB.Exec(insertSQL, args...)
	if execErr != nil {
		fail(w, r, http.StatusInternalServerError, execErr.Error())
		return
	}
	writeJSON(w, r, http.StatusCreated, 0, "ok", nil)
}
// listTemplates handles GET /api/templates.
//
// It returns at most 200 enabled templates ordered by datasource ascending and
// id descending. When the "userId" query parameter is present, only templates
// whose owner_id is 0 or that user are returned. Each entry carries the
// template's metadata plus two derived values:
//   - field_count: the number of distinct whitelisted fields (see countValidFields)
//   - exec_count:  the number of export_jobs rows referencing the template
//
// Any query, scan, or row-iteration error is answered with 500 and the driver
// error text.
func (api *TemplatesAPI) listTemplates(w http.ResponseWriter, r *http.Request) {
	userIDStr := r.URL.Query().Get("userId")

	// Build the query.
	querySQL := `SELECT id, name, datasource, main_table, file_format, visibility,
owner_id, enabled, last_validated_at, created_at, updated_at, fields_json,
(SELECT COUNT(1) FROM export_jobs ej WHERE ej.template_id = export_templates.id) AS exec_count
FROM export_templates`

	var args []interface{}
	var conditions []string
	if userIDStr != "" {
		conditions = append(conditions, "owner_id IN (0, ?)")
		args = append(args, userIDStr)
	}
	// Disabled templates are never listed, so there is always at least one condition.
	conditions = append(conditions, "enabled = 1")
	querySQL += " WHERE " + strings.Join(conditions, " AND ")
	querySQL += " ORDER BY datasource ASC, id DESC LIMIT 200"

	rows, err := api.metaDB.Query(querySQL, args...)
	if err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	whitelist := Whitelist()
	// Initialised to a non-nil slice so an empty result encodes as [] rather than null.
	templates := []map[string]interface{}{}
	for rows.Next() {
		var (
			id              uint64
			name            string
			datasource      string
			mainTable       string
			fileFormat      string
			visibility      string
			ownerID         uint64
			enabled         int
			lastValidatedAt sql.NullTime
			createdAt       time.Time
			updatedAt       time.Time
			fieldsRaw       []byte
			execCount       int64
		)
		if err := rows.Scan(&id, &name, &datasource, &mainTable, &fileFormat, &visibility,
			&ownerID, &enabled, &lastValidatedAt, &createdAt, &updatedAt, &fieldsRaw, &execCount); err != nil {
			fail(w, r, http.StatusInternalServerError, err.Error())
			return
		}

		// Best effort: a missing or malformed fields_json simply yields a
		// field_count of 0 instead of failing the whole listing.
		var fields []string
		_ = json.Unmarshal(fieldsRaw, &fields)
		fieldCount := countValidFields(datasource, mainTable, fields, whitelist)

		templates = append(templates, map[string]interface{}{
			"id":                id,
			"name":              name,
			"datasource":        datasource,
			"main_table":        mainTable,
			"file_format":       fileFormat,
			"visibility":        visibility,
			"owner_id":          ownerID,
			"enabled":           enabled == 1,
			"last_validated_at": lastValidatedAt.Time,
			"created_at":        createdAt,
			"updated_at":        updatedAt,
			"field_count":       fieldCount,
			"exec_count":        execCount,
		})
	}
	// rows.Next returns false both at end-of-data and on error; without this
	// check a failure mid-iteration would be reported as a (truncated) success.
	if err := rows.Err(); err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	ok(w, r, templates)
}
// countValidFields returns how many distinct entries of fields are present in
// whitelist.
//
// For the "ymt" datasource two extra rules apply:
//   - a leading "order_info." is rewritten to "order." before the whitelist lookup;
//   - "order.merchant_name" is not counted when "merchant.name" has already
//     been counted earlier in the list.
//
// mainTable is accepted for signature compatibility but is not consulted.
func countValidFields(datasource, mainTable string, fields []string, whitelist map[string]bool) int64 {
	isYMT := datasource == "ymt"
	counted := make(map[string]struct{})

	for _, name := range fields {
		// Normalise the YMT table alias.
		if isYMT && strings.HasPrefix(name, "order_info.") {
			name = "order." + strings.TrimPrefix(name, "order_info.")
		}

		if allowed := whitelist[name]; !allowed {
			continue
		}

		// YMT merchant-name de-duplication (order dependent by design of the original).
		if isYMT && name == "order.merchant_name" {
			if _, dup := counted["merchant.name"]; dup {
				continue
			}
		}

		// Inserting into the set is idempotent, so repeated fields count once.
		counted[name] = struct{}{}
	}

	return int64(len(counted))
}
// getTemplate handles GET /api/templates/:id and returns the full template
// record, including the decoded fields and filters.
//
// Responses: 404 "not found" when no row matches templateID, 500 with the
// driver error text for any other lookup/scan failure, 200 otherwise.
// A NULL explain_score is reported as 0 and a NULL last_validated_at as the
// zero time.
func (api *TemplatesAPI) getTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
	querySQL := `SELECT id, name, datasource, main_table, fields_json, filters_json,
file_format, visibility, owner_id, enabled, explain_score,
last_validated_at, created_at, updated_at
FROM export_templates WHERE id=?`
	row := api.metaDB.QueryRow(querySQL, templateID)

	var (
		id              uint64
		name            string
		datasource      string
		mainTable       string
		fileFormat      string
		visibility      string
		ownerID         uint64
		enabled         int
		explainScore    sql.NullInt64
		lastValidatedAt sql.NullTime
		createdAt       time.Time
		updatedAt       time.Time
		fieldsJSON      []byte
		filtersJSON     []byte
	)
	err := row.Scan(&id, &name, &datasource, &mainTable, &fieldsJSON, &filtersJSON,
		&fileFormat, &visibility, &ownerID, &enabled, &explainScore,
		&lastValidatedAt, &createdAt, &updatedAt)
	if err == sql.ErrNoRows {
		fail(w, r, http.StatusNotFound, "not found")
		return
	}
	if err != nil {
		// A connection or scan failure is a server problem, not a missing template.
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}

	result := map[string]interface{}{
		"id":                id,
		"name":              name,
		"datasource":        datasource,
		"main_table":        mainTable,
		"file_format":       fileFormat,
		"visibility":        visibility,
		"owner_id":          ownerID,
		"enabled":           enabled == 1,
		"explain_score":     explainScore.Int64,
		"last_validated_at": lastValidatedAt.Time,
		"created_at":        createdAt,
		"updated_at":        updatedAt,
		"fields":            fromJSON(fieldsJSON),
		"filters":           fromJSON(filtersJSON),
	}
	ok(w, r, result)
}
// patchTemplate handles PATCH /api/templates/:id.
//
// The request body is a JSON object. Recognised keys and their required types:
//   - "name", "visibility", "file_format", "main_table": string
//   - "fields", "filters": any JSON value, stored re-encoded in
//     fields_json / filters_json
//   - "enabled": bool, stored as 1 or 0
//
// Unknown keys and values of the wrong type are logged and skipped. If nothing
// usable remains the request is rejected with 400 "no patch". updated_at is
// always set to the current time on a successful update.
//
// Responses: 400 for an unreadable body, invalid JSON, or an empty patch;
// 500 with the driver error text when the UPDATE fails; 200 otherwise.
func (api *TemplatesAPI) patchTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
	traceID := TraceIDFrom(r)

	// Read the request body.
	body, err := io.ReadAll(r.Body)
	if err != nil {
		log.Printf("trace_id=%s error reading request body: %v", traceID, err)
		fail(w, r, http.StatusBadRequest, "invalid request body")
		return
	}
	log.Printf("trace_id=%s patchTemplate request body: %s", traceID, string(body))

	// Decode the JSON object.
	var payload map[string]interface{}
	if err := json.Unmarshal(body, &payload); err != nil {
		log.Printf("trace_id=%s error unmarshaling request body: %v", traceID, err)
		fail(w, r, http.StatusBadRequest, "invalid JSON format")
		return
	}
	log.Printf("trace_id=%s patchTemplate parsed payload: %v", traceID, payload)
	log.Printf("trace_id=%s patchTemplate template ID: %s", traceID, templateID)

	// Build the SET list. Every clause appended to setClauses must be paired
	// with exactly one appended argument, otherwise the placeholder count and
	// argument count diverge and the UPDATE fails.
	var setClauses []string
	var args []interface{}
	for key, value := range payload {
		log.Printf("trace_id=%s patchTemplate processing field: %s, value: %v, type: %T", traceID, key, value, value)
		switch key {
		case "name", "visibility", "file_format", "main_table":
			strVal, isStr := value.(string)
			if !isStr {
				log.Printf("trace_id=%s patchTemplate invalid string field: %s, value: %v, type: %T", traceID, key, value, value)
				continue
			}
			// key is one of the fixed column names above, so concatenation is safe.
			setClauses = append(setClauses, key+"=?")
			args = append(args, strVal)
			log.Printf("trace_id=%s patchTemplate added string field: %s, value: %s", traceID, key, strVal)
		case "fields":
			jsonBytes := toJSON(value)
			setClauses = append(setClauses, "fields_json=?")
			args = append(args, jsonBytes)
			log.Printf("trace_id=%s patchTemplate added fields_json: %s", traceID, string(jsonBytes))
		case "filters":
			jsonBytes := toJSON(value)
			setClauses = append(setClauses, "filters_json=?")
			args = append(args, jsonBytes)
			log.Printf("trace_id=%s patchTemplate added filters_json: %s", traceID, string(jsonBytes))
		case "enabled":
			boolVal, isBool := value.(bool)
			if !isBool {
				// Skip the clause entirely; adding "enabled=?" without an
				// argument would break the statement.
				log.Printf("trace_id=%s patchTemplate invalid bool field: %s, value: %v, type: %T", traceID, key, value, value)
				continue
			}
			enabledFlag := 0
			if boolVal {
				enabledFlag = 1
			}
			setClauses = append(setClauses, "enabled=?")
			args = append(args, enabledFlag)
			log.Printf("trace_id=%s patchTemplate added enabled: %t", traceID, boolVal)
		}
	}

	if len(setClauses) == 0 {
		log.Printf("trace_id=%s patchTemplate no fields to update", traceID)
		fail(w, r, http.StatusBadRequest, "no patch")
		return
	}

	// Always bump updated_at; templateID binds to the trailing WHERE placeholder.
	setClauses = append(setClauses, "updated_at=?")
	now := time.Now()
	args = append(args, now, templateID)

	updateSQL := "UPDATE export_templates SET " + strings.Join(setClauses, ",") + " WHERE id= ?"
	log.Printf("trace_id=%s patchTemplate executing SQL: %s", traceID, updateSQL)
	log.Printf("trace_id=%s patchTemplate SQL args: %v", traceID, args)
	if _, err := api.metaDB.Exec(updateSQL, args...); err != nil {
		log.Printf("trace_id=%s patchTemplate SQL error: %v", traceID, err)
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	log.Printf("trace_id=%s patchTemplate update successful", traceID)
	ok(w, r, nil)
}
// deleteTemplate handles DELETE /api/templates/:id.
//
// A template that is not referenced by any export_jobs row is deleted
// outright. A template that is referenced is only disabled (enabled=0), and
// only when the "soft" query parameter is "1", "true" or "yes"
// (case-insensitive, surrounding whitespace ignored); otherwise the request is
// rejected with 400 "template in use".
//
// Any database failure, including a failure of the job-count lookup, is
// answered with 500 and the driver error text. The count lookup must not be
// ignored: treating a failed lookup as "no jobs" would hard-delete a template
// that may still be in use.
func (api *TemplatesAPI) deleteTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
	// Count export jobs that reference this template.
	var jobCount int64
	row := api.metaDB.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id=?", templateID)
	if err := row.Scan(&jobCount); err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}

	if jobCount > 0 {
		// Referenced by jobs: only a soft delete is permitted.
		softDelete := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("soft")))
		if softDelete == "1" || softDelete == "true" || softDelete == "yes" {
			_, err := api.metaDB.Exec("UPDATE export_templates SET enabled=?, updated_at=? WHERE id=?", 0, time.Now(), templateID)
			if err != nil {
				fail(w, r, http.StatusInternalServerError, err.Error())
				return
			}
			ok(w, r, nil)
			return
		}
		fail(w, r, http.StatusBadRequest, "template in use")
		return
	}

	// Not referenced: hard delete.
	if _, err := api.metaDB.Exec("DELETE FROM export_templates WHERE id=?", templateID); err != nil {
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}
	ok(w, r, nil)
}
// validateTemplate handles POST /api/templates/:id/validate.
//
// It loads the template, builds its export SQL with exporter.BuildSQL, scores
// the statement with exporter.EvaluateExplain against the database chosen by
// selectDataDB, appends exporter.IndexSuggestions, and stores the generated
// SQL, suggestions and score back on the template row. The response body
// contains "score" and "suggestions".
//
// Responses: 404 "not found" when the template does not exist; 500 with the
// driver error text when the lookup fails for another reason; 400 with
// category "sql_build_error" or "explain_error" when the respective step
// fails; 200 otherwise. Persisting the result is best effort: a failed write
// is logged and does not change the response.
func (api *TemplatesAPI) validateTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
	// Load the template definition.
	row := api.metaDB.QueryRow(
		"SELECT datasource, main_table, fields_json, filters_json FROM export_templates WHERE id=?",
		templateID,
	)
	var (
		datasource  string
		mainTable   string
		fieldsJSON  []byte
		filtersJSON []byte
	)
	if err := row.Scan(&datasource, &mainTable, &fieldsJSON, &filtersJSON); err != nil {
		if err == sql.ErrNoRows {
			fail(w, r, http.StatusNotFound, "not found")
			return
		}
		// A connection or scan failure is a server problem, not a missing template.
		fail(w, r, http.StatusInternalServerError, err.Error())
		return
	}

	// Decode fields and filters. Decoding is deliberately lenient: an empty or
	// malformed column leaves the value nil and BuildSQL decides whether the
	// resulting request is acceptable.
	var fields []string
	var filters map[string]interface{}
	_ = json.Unmarshal(fieldsJSON, &fields)
	_ = json.Unmarshal(filtersJSON, &filters)

	// Build the export SQL.
	whitelist := Whitelist()
	request := exporter.BuildRequest{
		MainTable:  mainTable,
		Datasource: datasource,
		Fields:     fields,
		Filters:    filters,
	}
	query, args, err := exporter.BuildSQL(request, whitelist)
	if err != nil {
		failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
		return
	}

	// Run the EXPLAIN-based evaluation against the datasource's database.
	dataDB := api.selectDataDB(datasource)
	score, suggestions, err := exporter.EvaluateExplain(dataDB, query, args)
	if err != nil {
		failCat(w, r, http.StatusBadRequest, err.Error(), "explain_error")
		return
	}

	// Add index suggestions derived from the request itself.
	indexSuggestions := exporter.IndexSuggestions(request)
	suggestions = append(suggestions, indexSuggestions...)

	// Persist the validation result on the template row (best effort).
	explainResult := map[string]interface{}{
		"sql":         query,
		"suggestions": suggestions,
	}
	now := time.Now()
	if _, err := api.metaDB.Exec(
		"UPDATE export_templates SET explain_json=?, explain_score=?, last_validated_at=?, updated_at=? WHERE id=?",
		toJSON(explainResult), score, now, now, templateID,
	); err != nil {
		log.Printf("trace_id=%s validateTemplate failed to persist explain result: %v", TraceIDFrom(r), err)
	}

	ok(w, r, map[string]interface{}{
		"score":       score,
		"suggestions": suggestions,
	})
}
// selectDataDB picks the database connection to use for a datasource:
// metaDB for "ymt" (the original author notes that YMT data lives in the meta
// database) and marketingDB for everything else.
func (api *TemplatesAPI) selectDataDB(datasource string) *sql.DB {
	switch datasource {
	case "ymt":
		return api.metaDB
	default:
		return api.marketingDB
	}
}
// ==================== 辅助函数 ====================
// toJSON encodes v as JSON. Encoding errors are swallowed: when v cannot be
// marshalled the result is a nil slice.
func toJSON(v interface{}) []byte {
	encoded, err := json.Marshal(v)
	if err != nil {
		return nil
	}
	return encoded
}
// fromJSON decodes b into a generic Go value (map[string]interface{},
// []interface{}, string, float64, bool or nil). Decoding errors are swallowed:
// empty or invalid input yields nil.
func fromJSON(b []byte) interface{} {
	var decoded interface{}
	if err := json.Unmarshal(b, &decoded); err != nil {
		// Deliberately lenient; whatever Unmarshal left in decoded is returned as is.
		return decoded
	}
	return decoded
}
// Whitelist returns the field whitelist supplied by schema.AllWhitelist.
// The handlers in this file pass it to countValidFields and exporter.BuildSQL.
func Whitelist() map[string]bool {
return schema.AllWhitelist()
}
// FieldLabels returns the field label mapping supplied by schema.AllLabels.
func FieldLabels() map[string]string {
return schema.AllLabels()
}