531 lines
15 KiB
Go
531 lines
15 KiB
Go
// Package api 提供HTTP API处理器
|
||
package api
|
||
|
||
import (
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"net/http"
|
||
"server/internal/exporter"
|
||
"server/internal/schema"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
// ==================== 模板API处理器 ====================
|
||
|
||
// TemplatesAPI 模板管理API
|
||
type TemplatesAPI struct {
|
||
metaDB *sql.DB // 元数据库(存储模板和任务)
|
||
marketingDB *sql.DB // 营销系统数据库
|
||
}
|
||
|
||
// TemplatesHandler 创建模板API处理器
|
||
func TemplatesHandler(metaDB, marketingDB *sql.DB) http.Handler {
|
||
api := &TemplatesAPI{metaDB: metaDB, marketingDB: marketingDB}
|
||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
path := strings.TrimPrefix(r.URL.Path, "/api/templates")
|
||
|
||
// POST /api/templates - 创建模板
|
||
if r.Method == http.MethodPost && path == "" {
|
||
api.createTemplate(w, r)
|
||
return
|
||
}
|
||
|
||
// GET /api/templates - 获取模板列表
|
||
if r.Method == http.MethodGet && path == "" {
|
||
api.listTemplates(w, r)
|
||
return
|
||
}
|
||
|
||
// 带ID的路径处理
|
||
if strings.HasPrefix(path, "/") {
|
||
templateID := strings.TrimPrefix(path, "/")
|
||
|
||
// GET /api/templates/:id - 获取单个模板
|
||
if r.Method == http.MethodGet {
|
||
api.getTemplate(w, r, templateID)
|
||
return
|
||
}
|
||
|
||
// PATCH /api/templates/:id - 更新模板
|
||
if r.Method == http.MethodPatch {
|
||
api.patchTemplate(w, r, templateID)
|
||
return
|
||
}
|
||
|
||
// DELETE /api/templates/:id - 删除模板
|
||
if r.Method == http.MethodDelete {
|
||
api.deleteTemplate(w, r, templateID)
|
||
return
|
||
}
|
||
|
||
// POST /api/templates/:id/validate - 验证模板
|
||
if r.Method == http.MethodPost && strings.HasSuffix(path, "/validate") {
|
||
templateID = strings.TrimSuffix(templateID, "/validate")
|
||
api.validateTemplate(w, r, templateID)
|
||
return
|
||
}
|
||
}
|
||
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
})
|
||
}
|
||
|
||
// ==================== 请求/响应结构 ====================
|
||
|
||
// TemplatePayload 模板创建/更新请求体
|
||
type TemplatePayload struct {
|
||
Name string `json:"name"`
|
||
Datasource string `json:"datasource"`
|
||
MainTable string `json:"main_table"`
|
||
Fields []string `json:"fields"`
|
||
Filters map[string]interface{} `json:"filters"`
|
||
FileFormat string `json:"file_format"`
|
||
OwnerID uint64 `json:"owner_id"`
|
||
Visibility string `json:"visibility"`
|
||
}
|
||
|
||
// ==================== API方法 ====================
|
||
|
||
// createTemplate 创建新模板
|
||
func (api *TemplatesAPI) createTemplate(w http.ResponseWriter, r *http.Request) {
|
||
// 读取并解析请求体
|
||
body, err := io.ReadAll(r.Body)
|
||
if err != nil {
|
||
log.Printf("trace_id=%s error reading request body: %v", TraceIDFrom(r), err)
|
||
fail(w, r, http.StatusBadRequest, "invalid request body")
|
||
return
|
||
}
|
||
|
||
var payload TemplatePayload
|
||
if err := json.Unmarshal(body, &payload); err != nil {
|
||
log.Printf("trace_id=%s error parsing JSON: %v", TraceIDFrom(r), err)
|
||
fail(w, r, http.StatusBadRequest, "invalid JSON format")
|
||
return
|
||
}
|
||
|
||
r = WithPayload(r, payload)
|
||
|
||
// 从 URL 参数获取用户ID
|
||
if userIDStr := r.URL.Query().Get("current_user_id"); userIDStr != "" {
|
||
var userID uint64
|
||
if _, scanErr := fmt.Sscan(userIDStr, &userID); scanErr == nil && userID > 0 {
|
||
payload.OwnerID = userID
|
||
}
|
||
}
|
||
|
||
// 插入数据库
|
||
now := time.Now()
|
||
insertSQL := `INSERT INTO export_templates
|
||
(name, datasource, main_table, fields_json, filters_json, file_format,
|
||
visibility, owner_id, enabled, stats_enabled, last_validated_at, created_at, updated_at)
|
||
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)`
|
||
|
||
args := []interface{}{
|
||
payload.Name,
|
||
payload.Datasource,
|
||
payload.MainTable,
|
||
toJSON(payload.Fields),
|
||
toJSON(payload.Filters),
|
||
payload.FileFormat,
|
||
payload.Visibility,
|
||
payload.OwnerID,
|
||
1, // enabled
|
||
0, // stats_enabled
|
||
now, // last_validated_at
|
||
now, // created_at
|
||
now, // updated_at
|
||
}
|
||
|
||
log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), insertSQL, args)
|
||
|
||
if _, err := api.metaDB.Exec(insertSQL, args...); err != nil {
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
|
||
writeJSON(w, r, http.StatusCreated, 0, "ok", nil)
|
||
}
|
||
|
||
// listTemplates 获取模板列表
|
||
func (api *TemplatesAPI) listTemplates(w http.ResponseWriter, r *http.Request) {
|
||
userIDStr := r.URL.Query().Get("current_user_id")
|
||
|
||
// 构建查询SQL
|
||
querySQL := `SELECT id, name, datasource, main_table, file_format, visibility,
|
||
owner_id, enabled, last_validated_at, created_at, updated_at, fields_json,
|
||
(SELECT COUNT(1) FROM export_jobs ej WHERE ej.template_id = export_templates.id) AS exec_count
|
||
FROM export_templates`
|
||
|
||
var args []interface{}
|
||
var conditions []string
|
||
|
||
if userIDStr != "" {
|
||
conditions = append(conditions, "owner_id IN (0, ?)")
|
||
args = append(args, userIDStr)
|
||
}
|
||
conditions = append(conditions, "enabled = 1")
|
||
|
||
if len(conditions) > 0 {
|
||
querySQL += " WHERE " + strings.Join(conditions, " AND ")
|
||
}
|
||
querySQL += " ORDER BY datasource ASC, id DESC LIMIT 200"
|
||
|
||
rows, err := api.metaDB.Query(querySQL, args...)
|
||
if err != nil {
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
|
||
whitelist := Whitelist()
|
||
templates := []map[string]interface{}{}
|
||
|
||
for rows.Next() {
|
||
var (
|
||
id uint64
|
||
name string
|
||
datasource string
|
||
mainTable string
|
||
fileFormat string
|
||
visibility string
|
||
ownerID uint64
|
||
enabled int
|
||
lastValidatedAt sql.NullTime
|
||
createdAt time.Time
|
||
updatedAt time.Time
|
||
fieldsRaw []byte
|
||
execCount int64
|
||
)
|
||
|
||
if err := rows.Scan(&id, &name, &datasource, &mainTable, &fileFormat, &visibility,
|
||
&ownerID, &enabled, &lastValidatedAt, &createdAt, &updatedAt, &fieldsRaw, &execCount); err != nil {
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
|
||
// 解析字段并计算有效字段数
|
||
var fields []string
|
||
_ = json.Unmarshal(fieldsRaw, &fields)
|
||
fieldCount := countValidFields(datasource, mainTable, fields, whitelist)
|
||
|
||
templates = append(templates, map[string]interface{}{
|
||
"id": id,
|
||
"name": name,
|
||
"datasource": datasource,
|
||
"main_table": mainTable,
|
||
"file_format": fileFormat,
|
||
"visibility": visibility,
|
||
"owner_id": ownerID,
|
||
"enabled": enabled == 1,
|
||
"last_validated_at": lastValidatedAt.Time,
|
||
"created_at": createdAt,
|
||
"updated_at": updatedAt,
|
||
"field_count": fieldCount,
|
||
"exec_count": execCount,
|
||
})
|
||
}
|
||
|
||
ok(w, r, templates)
|
||
}
|
||
|
||
// countValidFields 计算有效字段数(不去重,不过滤白名单)
|
||
func countValidFields(datasource, mainTable string, fields []string, whitelist map[string]bool) int64 {
|
||
return int64(len(fields))
|
||
}
|
||
|
||
// getTemplate 获取单个模板详情
|
||
func (api *TemplatesAPI) getTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
|
||
querySQL := `SELECT id, name, datasource, main_table, fields_json, filters_json,
|
||
file_format, visibility, owner_id, enabled, explain_score,
|
||
last_validated_at, created_at, updated_at
|
||
FROM export_templates WHERE id=?`
|
||
|
||
row := api.metaDB.QueryRow(querySQL, templateID)
|
||
|
||
var (
|
||
id uint64
|
||
name string
|
||
datasource string
|
||
mainTable string
|
||
fileFormat string
|
||
visibility string
|
||
ownerID uint64
|
||
enabled int
|
||
explainScore sql.NullInt64
|
||
lastValidatedAt sql.NullTime
|
||
createdAt time.Time
|
||
updatedAt time.Time
|
||
fieldsJSON []byte
|
||
filtersJSON []byte
|
||
)
|
||
|
||
err := row.Scan(&id, &name, &datasource, &mainTable, &fieldsJSON, &filtersJSON,
|
||
&fileFormat, &visibility, &ownerID, &enabled, &explainScore,
|
||
&lastValidatedAt, &createdAt, &updatedAt)
|
||
if err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
|
||
result := map[string]interface{}{
|
||
"id": id,
|
||
"name": name,
|
||
"datasource": datasource,
|
||
"main_table": mainTable,
|
||
"file_format": fileFormat,
|
||
"visibility": visibility,
|
||
"owner_id": ownerID,
|
||
"enabled": enabled == 1,
|
||
"explain_score": explainScore.Int64,
|
||
"last_validated_at": lastValidatedAt.Time,
|
||
"created_at": createdAt,
|
||
"updated_at": updatedAt,
|
||
"fields": fromJSON(fieldsJSON),
|
||
"filters": fromJSON(filtersJSON),
|
||
}
|
||
|
||
ok(w, r, result)
|
||
}
|
||
|
||
// patchTemplate 更新模板
|
||
func (api *TemplatesAPI) patchTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
|
||
traceID := TraceIDFrom(r)
|
||
|
||
// 读取请求体
|
||
body, err := io.ReadAll(r.Body)
|
||
if err != nil {
|
||
log.Printf("trace_id=%s error reading request body: %v", traceID, err)
|
||
fail(w, r, http.StatusBadRequest, "invalid request body")
|
||
return
|
||
}
|
||
|
||
log.Printf("trace_id=%s patchTemplate request body: %s", traceID, string(body))
|
||
|
||
// 解析JSON
|
||
var payload map[string]interface{}
|
||
if err := json.Unmarshal(body, &payload); err != nil {
|
||
log.Printf("trace_id=%s error unmarshaling request body: %v", traceID, err)
|
||
fail(w, r, http.StatusBadRequest, "invalid JSON format")
|
||
return
|
||
}
|
||
|
||
log.Printf("trace_id=%s patchTemplate parsed payload: %v", traceID, payload)
|
||
log.Printf("trace_id=%s patchTemplate template ID: %s", traceID, templateID)
|
||
|
||
// 构建UPDATE语句
|
||
var setClauses []string
|
||
var args []interface{}
|
||
|
||
for key, value := range payload {
|
||
log.Printf("trace_id=%s patchTemplate processing field: %s, value: %v, type: %T", traceID, key, value, value)
|
||
|
||
switch key {
|
||
case "name", "visibility", "file_format", "main_table":
|
||
if strVal, isStr := value.(string); isStr {
|
||
setClauses = append(setClauses, key+"=?")
|
||
args = append(args, strVal)
|
||
log.Printf("trace_id=%s patchTemplate added string field: %s, value: %s", traceID, key, strVal)
|
||
} else {
|
||
log.Printf("trace_id=%s patchTemplate invalid string field: %s, value: %v, type: %T", traceID, key, value, value)
|
||
}
|
||
|
||
case "fields":
|
||
setClauses = append(setClauses, "fields_json=?")
|
||
jsonBytes := toJSON(value)
|
||
args = append(args, jsonBytes)
|
||
log.Printf("trace_id=%s patchTemplate added fields_json: %s", traceID, string(jsonBytes))
|
||
|
||
case "filters":
|
||
setClauses = append(setClauses, "filters_json=?")
|
||
jsonBytes := toJSON(value)
|
||
args = append(args, jsonBytes)
|
||
log.Printf("trace_id=%s patchTemplate added filters_json: %s", traceID, string(jsonBytes))
|
||
|
||
case "enabled":
|
||
setClauses = append(setClauses, "enabled=?")
|
||
if boolVal, isBool := value.(bool); isBool {
|
||
if boolVal {
|
||
args = append(args, 1)
|
||
} else {
|
||
args = append(args, 0)
|
||
}
|
||
log.Printf("trace_id=%s patchTemplate added enabled: %t", traceID, boolVal)
|
||
} else {
|
||
log.Printf("trace_id=%s patchTemplate invalid bool field: %s, value: %v, type: %T", traceID, key, value, value)
|
||
}
|
||
}
|
||
}
|
||
|
||
if len(setClauses) == 0 {
|
||
log.Printf("trace_id=%s patchTemplate no fields to update", traceID)
|
||
fail(w, r, http.StatusBadRequest, "no patch")
|
||
return
|
||
}
|
||
|
||
// 添加updated_at
|
||
setClauses = append(setClauses, "updated_at=?")
|
||
now := time.Now()
|
||
args = append(args, now, templateID)
|
||
|
||
updateSQL := "UPDATE export_templates SET " + strings.Join(setClauses, ",") + " WHERE id= ?"
|
||
log.Printf("trace_id=%s patchTemplate executing SQL: %s", traceID, updateSQL)
|
||
log.Printf("trace_id=%s patchTemplate SQL args: %v", traceID, args)
|
||
|
||
if _, err := api.metaDB.Exec(updateSQL, args...); err != nil {
|
||
log.Printf("trace_id=%s patchTemplate SQL error: %v", traceID, err)
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
|
||
log.Printf("trace_id=%s patchTemplate update successful", traceID)
|
||
ok(w, r, nil)
|
||
}
|
||
|
||
// deleteTemplate 删除模板
|
||
func (api *TemplatesAPI) deleteTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
|
||
// 检查是否为公共模板(owner_id=0)
|
||
var ownerID uint64
|
||
rowOwner := api.metaDB.QueryRow("SELECT owner_id FROM export_templates WHERE id=?", templateID)
|
||
if err := rowOwner.Scan(&ownerID); err != nil {
|
||
fail(w, r, http.StatusNotFound, "template not found")
|
||
return
|
||
}
|
||
if ownerID == 0 {
|
||
fail(w, r, http.StatusForbidden, "公共模板不允许删除")
|
||
return
|
||
}
|
||
|
||
// 检查是否有关联的导出任务
|
||
var jobCount int64
|
||
row := api.metaDB.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id=?", templateID)
|
||
_ = row.Scan(&jobCount)
|
||
|
||
if jobCount > 0 {
|
||
// 有关联任务,检查是否要求软删除
|
||
softDelete := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("soft")))
|
||
if softDelete == "1" || softDelete == "true" || softDelete == "yes" {
|
||
// 软删除:禁用模板
|
||
_, _ = api.metaDB.Exec("UPDATE export_templates SET enabled=?, updated_at=? WHERE id=?", 0, time.Now(), templateID)
|
||
ok(w, r, nil)
|
||
return
|
||
}
|
||
fail(w, r, http.StatusBadRequest, "template in use")
|
||
return
|
||
}
|
||
|
||
// 无关联任务,硬删除
|
||
if _, err := api.metaDB.Exec("DELETE FROM export_templates WHERE id=?", templateID); err != nil {
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
|
||
ok(w, r, nil)
|
||
}
|
||
|
||
// validateTemplate 验证模板
|
||
func (api *TemplatesAPI) validateTemplate(w http.ResponseWriter, r *http.Request, templateID string) {
|
||
// 获取模板信息
|
||
row := api.metaDB.QueryRow(
|
||
"SELECT datasource, main_table, fields_json, filters_json FROM export_templates WHERE id=?",
|
||
templateID,
|
||
)
|
||
|
||
var (
|
||
datasource string
|
||
mainTable string
|
||
fieldsJSON []byte
|
||
filtersJSON []byte
|
||
)
|
||
|
||
if err := row.Scan(&datasource, &mainTable, &fieldsJSON, &filtersJSON); err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
|
||
// 解析字段和过滤条件
|
||
var fields []string
|
||
var filters map[string]interface{}
|
||
_ = json.Unmarshal(fieldsJSON, &fields)
|
||
_ = json.Unmarshal(filtersJSON, &filters)
|
||
|
||
// 构建SQL
|
||
whitelist := Whitelist()
|
||
request := exporter.BuildRequest{
|
||
MainTable: mainTable,
|
||
Datasource: datasource,
|
||
Fields: fields,
|
||
Filters: filters,
|
||
}
|
||
|
||
query, args, err := exporter.BuildSQL(request, whitelist)
|
||
if err != nil {
|
||
failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
|
||
return
|
||
}
|
||
|
||
// 执行EXPLAIN分析
|
||
dataDB := api.selectDataDB(datasource)
|
||
score, suggestions, err := exporter.EvaluateExplain(dataDB, query, args)
|
||
if err != nil {
|
||
failCat(w, r, http.StatusBadRequest, err.Error(), "explain_error")
|
||
return
|
||
}
|
||
|
||
// 添加索引建议
|
||
indexSuggestions := exporter.IndexSuggestions(request)
|
||
suggestions = append(suggestions, indexSuggestions...)
|
||
|
||
// 更新模板的验证结果
|
||
explainResult := map[string]interface{}{
|
||
"sql": query,
|
||
"suggestions": suggestions,
|
||
}
|
||
now := time.Now()
|
||
_, _ = api.metaDB.Exec(
|
||
"UPDATE export_templates SET explain_json=?, explain_score=?, last_validated_at=?, updated_at=? WHERE id=?",
|
||
toJSON(explainResult), score, now, now, templateID,
|
||
)
|
||
|
||
ok(w, r, map[string]interface{}{
|
||
"score": score,
|
||
"suggestions": suggestions,
|
||
})
|
||
}
|
||
|
||
// selectDataDB 根据数据源选择对应的数据库连接
|
||
func (api *TemplatesAPI) selectDataDB(datasource string) *sql.DB {
|
||
if datasource == "ymt" {
|
||
return api.metaDB // YMT数据在meta库
|
||
}
|
||
return api.marketingDB
|
||
}
|
||
|
||
// ==================== 辅助函数 ====================
|
||
|
||
// toJSON 将对象转换为JSON字节
|
||
func toJSON(v interface{}) []byte {
|
||
b, _ := json.Marshal(v)
|
||
return b
|
||
}
|
||
|
||
// fromJSON 将JSON字节解析为对象
|
||
func fromJSON(b []byte) interface{} {
|
||
var v interface{}
|
||
_ = json.Unmarshal(b, &v)
|
||
return v
|
||
}
|
||
|
||
// Whitelist 获取字段白名单
|
||
func Whitelist() map[string]bool {
|
||
return schema.AllWhitelist()
|
||
}
|
||
|
||
// FieldLabels 获取字段标签映射
|
||
func FieldLabels() map[string]string {
|
||
return schema.AllLabels()
|
||
}
|