MarketingSystemDataExportTool/server/internal/repo/export_repo.go

512 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package repo 提供数据访问层
package repo
import (
	"database/sql"
	"encoding/json"
	"strings"
	"time"

	"server/internal/constants"
	"server/internal/exporter"
	"server/internal/logging"
)
// ==================== 导出仓库 ====================
// ExportQueryRepo is the data-access layer for export jobs. It holds no state
// of its own: every method receives the *sql.DB it should operate on.
type ExportQueryRepo struct{}

// NewExportRepo returns a new, ready-to-use ExportQueryRepo.
func NewExportRepo() *ExportQueryRepo {
	repo := new(ExportQueryRepo)
	return repo
}
// ==================== SQL构建 ====================
// Build turns an export BuildRequest into a SQL statement plus its bind
// arguments. whitelist is forwarded unchanged (presumably the set of
// permitted fields — confirm in exporter). All work is done by
// exporter.BuildSQL.
func (r *ExportQueryRepo) Build(req exporter.BuildRequest, whitelist map[string]bool) (string, []interface{}, error) {
	query, args, err := exporter.BuildSQL(req, whitelist)
	return query, args, err
}
// BuildWithFields is like Build but also returns the list of fields the
// generated SQL actually uses. It delegates to exporter.BuildSQLWithFields.
func (r *ExportQueryRepo) BuildWithFields(req exporter.BuildRequest, whitelist map[string]bool) (string, []interface{}, []string, error) {
	query, args, usedFields, err := exporter.BuildSQLWithFields(req, whitelist)
	return query, args, usedFields, err
}
// Explain runs EXPLAIN analysis of query/args against db through
// exporter.EvaluateExplain. It returns that function's integer result
// (presumably the score stored as explain_score — confirm), a slice of
// strings and any error.
func (r *ExportQueryRepo) Explain(db *sql.DB, query string, args []interface{}) (int, []string, error) {
	score, notes, err := exporter.EvaluateExplain(db, query, args)
	return score, notes, err
}
// ==================== 行数统计 ====================
// Count returns the row count for baseQuery/args as computed by
// exporter.CountRows. No error is reported; how failures surface depends on
// exporter.CountRows.
func (r *ExportQueryRepo) Count(db *sql.DB, baseQuery string, args []interface{}) int64 {
	total := exporter.CountRows(db, baseQuery, args)
	return total
}
// CountByReq counts the rows matched by req using the COUNT(DISTINCT ...)
// statement produced by exporter.BuildCountSQL.
//
// It is best-effort: if building or executing the count query fails, the
// error is logged and 0 is returned. Callers therefore cannot distinguish
// "no rows" from "count failed".
func (r *ExportQueryRepo) CountByReq(db *sql.DB, req exporter.BuildRequest, whitelist map[string]bool) int64 {
	countSQL, countArgs, buildErr := exporter.BuildCountSQL(req, whitelist)
	if buildErr != nil {
		logging.Error("build_count_sql_error", buildErr, nil)
		return 0
	}

	var total int64
	scanErr := db.QueryRow(countSQL, countArgs...).Scan(&total)
	if scanErr == nil {
		return total
	}
	logging.Error("count_by_req_error", scanErr, map[string]interface{}{
		"sql":  countSQL,
		"args": countArgs,
	})
	return 0
}
// EstimateFast returns a quick row-count estimate for mainTable in datasource
// under filters, as computed by exporter.CountRowsFast.
func (r *ExportQueryRepo) EstimateFast(db *sql.DB, datasource, mainTable string, filters map[string]interface{}) int64 {
	estimate := exporter.CountRowsFast(db, datasource, mainTable, filters)
	return estimate
}
// EstimateFastChunked is the chunked variant of EstimateFast. It forwards
// the same arguments to exporter.CountRowsFastChunked.
func (r *ExportQueryRepo) EstimateFastChunked(db *sql.DB, datasource, mainTable string, filters map[string]interface{}) int64 {
	estimate := exporter.CountRowsFastChunked(db, datasource, mainTable, filters)
	return estimate
}
// ==================== 游标操作 ====================
// NewCursor creates the cursor SQL helper for mainTable in datasource via
// exporter.NewCursorSQL. The result is the value StreamCursor expects as its
// cursor argument.
func (r *ExportQueryRepo) NewCursor(datasource, mainTable string) *exporter.CursorSQL {
	c := exporter.NewCursorSQL(datasource, mainTable)
	return c
}
// ==================== 回调函数类型 ====================
// Callback types accepted by StreamCursor.
type (
	// RowWriterFactory creates a new exporter.RowWriter (presumably one per
	// output file — confirm in exporter.StreamWithCursor).
	RowWriterFactory func() (exporter.RowWriter, error)

	// RowTransform maps one row's string values to the values to be written.
	RowTransform func([]string) []string

	// RollCallback is invoked when an output file is rolled over, with the
	// file's path, its size (presumably bytes) and the rows it contains.
	RollCallback func(path string, size int64, partRows int64) error

	// ProgressCallback receives the total number of rows exported so far.
	ProgressCallback func(totalRows int64) error
)
// ==================== 流式导出 ====================
// StreamCursor 流式导出数据
func (r *ExportQueryRepo) StreamCursor(
db *sql.DB,
baseQuery string,
args []interface{},
cursor *exporter.CursorSQL,
batchSize int,
columns []string,
newWriter RowWriterFactory,
transform RowTransform,
maxRowsPerFile int64,
onRoll RollCallback,
onProgress ProgressCallback,
) (int64, []string, error) {
return exporter.StreamWithCursor(
db,
baseQuery,
args,
cursor,
batchSize,
columns,
func() (exporter.RowWriter, error) { return newWriter() },
func(vals []string) []string { return transform(vals) },
maxRowsPerFile,
func(path string, size int64, rows int64) error { return onRoll(path, size, rows) },
func(total int64) error { return onProgress(total) },
)
}
// ZipAndRecord 压缩文件并记录
func (r *ExportQueryRepo) ZipAndRecord(metaDB *sql.DB, jobID uint64, files []string, totalRows int64) {
if len(files) == 0 {
return
}
zipPath, zipSize := exporter.ZipFiles(jobID, files)
now := time.Now()
_, err := metaDB.Exec(
"INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)",
jobID, zipPath, totalRows, zipSize, now, now,
)
if err != nil {
logging.DBError("zip_and_record", jobID, err)
}
}
// ==================== 模板元数据 ====================
// GetTemplateMeta loads the datasource, main table and field list of export
// template templateID from export_templates.
//
// A scan failure is returned as err; sql.ErrNoRows means the template does
// not exist. The fields_json column is decoded best-effort. An empty/NULL
// value yields nil fields. Malformed JSON is logged and also yields nil
// fields rather than failing the call.
func (r *ExportQueryRepo) GetTemplateMeta(metaDB *sql.DB, templateID uint64) (datasource, mainTable string, fields []string, err error) {
	var fieldsJSON []byte
	row := metaDB.QueryRow(
		"SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?",
		templateID,
	)
	if err = row.Scan(&datasource, &mainTable, &fieldsJSON); err != nil {
		return "", "", nil, err
	}
	if len(fieldsJSON) == 0 {
		// NULL or empty column: no fields configured, nothing to decode.
		return datasource, mainTable, nil, nil
	}
	if jsonErr := json.Unmarshal(fieldsJSON, &fields); jsonErr != nil {
		// Keep the call successful, but surface the broken template in the
		// logs. Discard any partially decoded slice, which would otherwise
		// contain garbage entries.
		logging.Error("template_fields_json_invalid", jsonErr, map[string]interface{}{
			"template_id": templateID,
		})
		return datasource, mainTable, nil, nil
	}
	return datasource, mainTable, fields, nil
}
// GetJobFilters 获取任务的过滤条件
func (r *ExportQueryRepo) GetJobFilters(metaDB *sql.DB, jobID uint64) (templateID uint64, filtersJSON []byte, err error) {
row := metaDB.QueryRow(
"SELECT template_id, filters_json FROM export_jobs WHERE id=?",
jobID,
)
if err = row.Scan(&templateID, &filtersJSON); err != nil {
return 0, nil, err
}
return templateID, filtersJSON, nil
}
// ==================== 任务管理 ====================
// InsertJob creates a new export_jobs row in the queued status and returns
// its auto-generated ID.
//
// The permission, filters, options and explainResult maps are JSON-encoded
// with toJSON into permission_scope_json, filters_json, options_json and
// explain_json. created_at and updated_at are both set to the current time.
// An error is returned if the insert fails or if the driver cannot report the
// generated ID.
func (r *ExportQueryRepo) InsertJob(
	metaDB *sql.DB,
	templateID, requestedBy, ownerID uint64,
	permission, filters, options map[string]interface{},
	explainResult map[string]interface{},
	explainScore int,
	rowEstimate int64,
	fileFormat string,
) (uint64, error) {
	now := time.Now()
	insertSQL := `INSERT INTO export_jobs
(template_id, status, requested_by, owner_id, permission_scope_json,
filters_json, options_json, explain_json, explain_score, row_estimate,
file_format, created_at, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)`
	args := []interface{}{
		templateID,
		string(constants.JobStatusQueued),
		requestedBy,
		ownerID,
		toJSON(permission),
		toJSON(filters),
		toJSON(options),
		toJSON(explainResult),
		explainScore,
		rowEstimate,
		fileFormat,
		now,
		now,
	}
	result, err := metaDB.Exec(insertSQL, args...)
	if err != nil {
		return 0, err
	}
	id, err := result.LastInsertId()
	if err != nil {
		// Without the generated ID the caller cannot track the job. Report
		// the failure instead of handing back a bogus ID of 0 with a nil error.
		return 0, err
	}
	return uint64(id), nil
}
// StartJob moves job jobID to the running status and stamps started_at and
// updated_at with the current time. A DB failure is logged via
// logging.DBError and not returned.
func (r *ExportQueryRepo) StartJob(metaDB *sql.DB, jobID uint64) {
	ts := time.Now()
	running := string(constants.JobStatusRunning)
	if _, err := metaDB.Exec(
		"UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id=?",
		running, ts, ts, jobID,
	); err != nil {
		logging.DBError("start_job", jobID, err)
	}
}
// UpdateProgress records that at least totalRows rows have been exported for
// job jobID and refreshes updated_at.
//
// total_rows only ever grows: it takes the GREATEST of the stored value and
// totalRows, so repeated or out-of-order reports cannot move it backwards. A
// job still in the queued status is promoted to running. A DB failure is
// logged and not returned. The progress log entry is emitted regardless of
// whether the update succeeded.
func (r *ExportQueryRepo) UpdateProgress(metaDB *sql.DB, jobID uint64, totalRows int64) {
	now := time.Now()
	// Status values are bound from the shared constants instead of being
	// hard-coded in the SQL. This keeps them consistent with the values
	// written by InsertJob and StartJob.
	_, err := metaDB.Exec(
		`UPDATE export_jobs
SET total_rows=GREATEST(COALESCE(total_rows,0), ?),
updated_at=?,
status=CASE WHEN status=? THEN ? ELSE status END
WHERE id=?`,
		totalRows,
		now,
		string(constants.JobStatusQueued),
		string(constants.JobStatusRunning),
		jobID,
	)
	if err != nil {
		logging.DBError("update_progress", jobID, err)
	}
	logging.ExportProgress(jobID, totalRows)
}
// MarkFailed sets job jobID to the failed status and stamps finished_at and
// updated_at with the current time.
//
// Before the update it emits an ERROR-level JSON log entry. The entry holds
// event="mark_failed", job_id, reason and time, merged with the optional
// context map; context keys overwrite the built-in ones on collision. A DB
// failure is logged via logging.DBError and not returned.
func (r *ExportQueryRepo) MarkFailed(metaDB *sql.DB, jobID uint64, reason string, context map[string]interface{}) {
	now := time.Now()
	// Record the failure reason and context, per the MarkFailed logging
	// convention.
	logContext := map[string]interface{}{
		"event":  "mark_failed",
		"job_id": jobID,
		"reason": reason,
		"time":   now,
	}
	// Ranging over a nil map is a no-op, so no nil check is needed.
	for k, v := range context {
		logContext[k] = v
	}
	logging.JSON("ERROR", logContext)
	// Refresh updated_at like every other status transition in this
	// repository, so it reflects when the job failed.
	_, err := metaDB.Exec(
		"UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id=?",
		string(constants.JobStatusFailed), now, now, jobID,
	)
	if err != nil {
		logging.DBError("mark_failed_update", jobID, err)
	}
}
// MarkCompleted 标记任务完成
func (r *ExportQueryRepo) MarkCompleted(metaDB *sql.DB, jobID uint64, totalRows int64) {
now := time.Now()
_, err := metaDB.Exec(
`UPDATE export_jobs
SET status=?, finished_at=?, total_rows=?,
row_estimate=GREATEST(COALESCE(row_estimate,0), ?), updated_at=?
WHERE id=?`,
string(constants.JobStatusCompleted), now, totalRows, totalRows, now, jobID,
)
if err != nil {
logging.DBError("mark_completed", jobID, err)
}
}
// InsertJobFile 插入任务文件记录
func (r *ExportQueryRepo) InsertJobFile(metaDB *sql.DB, jobID uint64, uri, sheetName string, rowCount, sizeBytes int64) {
now := time.Now()
_, err := metaDB.Exec(
`INSERT INTO export_job_files
(job_id, storage_uri, sheet_name, row_count, size_bytes, created_at, updated_at)
VALUES (?,?,?,?,?,?,?)`,
jobID, uri, sheetName, rowCount, sizeBytes, now, now,
)
if err != nil {
logging.Error("insert_job_file", err, map[string]interface{}{
"job_id": jobID,
"path": uri,
})
}
}
// UpdateRowEstimate overwrites row_estimate for job jobID with estimate and
// refreshes updated_at. This is an unconditional overwrite, so the estimate
// may decrease. A failure is logged with job_id and row_estimate and not
// returned.
func (r *ExportQueryRepo) UpdateRowEstimate(metaDB *sql.DB, jobID uint64, estimate int64) {
	ts := time.Now()
	if _, err := metaDB.Exec(
		"UPDATE export_jobs SET row_estimate=?, updated_at=? WHERE id=?",
		estimate, ts, jobID,
	); err != nil {
		logging.Error("update_row_estimate", err, map[string]interface{}{
			"job_id":       jobID,
			"row_estimate": estimate,
		})
	}
}
// toJSON encodes v as JSON for storage in one of the *_json columns.
// Encoding failures (e.g. funcs, channels, NaN floats) are swallowed and
// yield nil. The driver presumably stores nil as SQL NULL — confirm.
func toJSON(v interface{}) []byte {
	encoded, err := json.Marshal(v)
	if err != nil {
		return nil
	}
	return encoded
}
// ==================== 数据类型 ====================
// JobDetail is one export job as loaded by GetJob. Each field holds the
// export_jobs column named in its comment. Nullable columns use the sql.Null*
// wrappers; created_at/updated_at are scanned into plain time.Time values.
type JobDetail struct {
	ID           uint64         // id
	TemplateID   uint64         // template_id
	Status       string         // status
	RequestedBy  uint64         // requested_by
	TotalRows    sql.NullInt64  // total_rows
	FileFormat   string         // file_format
	StartedAt    sql.NullTime   // started_at
	FinishedAt   sql.NullTime   // finished_at
	CreatedAt    time.Time      // created_at
	UpdatedAt    time.Time      // updated_at
	ExplainScore sql.NullInt64  // explain_score
	ExplainJSON  sql.NullString // explain_json
}
// JobFile is one export_job_files row as loaded by ListJobFiles. Every column
// is nullable, so each field uses an sql.Null* wrapper.
type JobFile struct {
	URI       sql.NullString // storage_uri
	Sheet     sql.NullString // sheet_name
	RowCount  sql.NullInt64  // row_count
	SizeBytes sql.NullInt64  // size_bytes
}
// JobListItem is one row of the job listing produced by ListJobs. Each field
// holds the export_jobs column named in its comment. Unlike JobDetail, the
// timestamps here are nullable.
type JobListItem struct {
	ID           uint64         // id
	TemplateID   uint64         // template_id
	Status       string         // status
	RequestedBy  uint64         // requested_by
	RowEstimate  sql.NullInt64  // row_estimate
	TotalRows    sql.NullInt64  // total_rows
	FileFormat   string         // file_format
	CreatedAt    sql.NullTime   // created_at
	UpdatedAt    sql.NullTime   // updated_at
	ExplainScore sql.NullInt64  // explain_score
	ExplainJSON  sql.NullString // explain_json
}
// ==================== 任务查询 ====================
// GetJob loads export job jobID into a JobDetail. On error, including
// sql.ErrNoRows for an unknown ID, the returned JobDetail should be ignored.
//
// NOTE(review): created_at/updated_at are scanned into time.Time, which
// presumably relies on the driver returning time values (e.g. MySQL
// parseTime=true) — confirm against the DSN.
func (r *ExportQueryRepo) GetJob(metaDB *sql.DB, jobID string) (JobDetail, error) {
	const q = `SELECT id, template_id, status, requested_by, total_rows,
file_format, started_at, finished_at, created_at, updated_at,
explain_score, explain_json
FROM export_jobs WHERE id=?`
	var d JobDetail
	dest := []interface{}{
		&d.ID, &d.TemplateID, &d.Status, &d.RequestedBy,
		&d.TotalRows, &d.FileFormat, &d.StartedAt, &d.FinishedAt,
		&d.CreatedAt, &d.UpdatedAt, &d.ExplainScore, &d.ExplainJSON,
	}
	err := metaDB.QueryRow(q, jobID).Scan(dest...)
	return d, err
}
// ListJobFiles returns every export_job_files row recorded for job jobID
// (storage URI, sheet name, row count, size). A job with no files yields a
// nil slice.
//
// Rows that fail to scan are logged and skipped. An error that aborts
// iteration is returned, so a truncated list is never mistaken for a
// complete one.
func (r *ExportQueryRepo) ListJobFiles(metaDB *sql.DB, jobID string) ([]JobFile, error) {
	rows, err := metaDB.Query(
		"SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?",
		jobID,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var files []JobFile
	for rows.Next() {
		var file JobFile
		if err := rows.Scan(&file.URI, &file.Sheet, &file.RowCount, &file.SizeBytes); err != nil {
			// Best-effort: skip the bad row, but leave a trace instead of
			// dropping it silently.
			logging.Error("list_job_files_scan", err, map[string]interface{}{
				"job_id": jobID,
			})
			continue
		}
		files = append(files, file)
	}
	// rows.Next returns false both at end-of-data and on failure; only
	// rows.Err distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return files, nil
}
// GetLatestFileURI returns the storage_uri of the file with the highest id
// recorded for job jobID. sql.ErrNoRows is returned when the job has no
// files. A NULL storage_uri makes Scan fail, since the destination is a
// plain string.
func (r *ExportQueryRepo) GetLatestFileURI(metaDB *sql.DB, jobID string) (string, error) {
	const q = "SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1"
	row := metaDB.QueryRow(q, jobID)
	var latest string
	if err := row.Scan(&latest); err != nil {
		return latest, err
	}
	return latest, nil
}
// CountJobs returns the number of export jobs. The count is optionally
// narrowed to one template (templateID > 0) and/or one owner
// (ownerID != ""), matching the filters applied by ListJobs. A query
// failure is logged and the count is reported as 0.
func (r *ExportQueryRepo) CountJobs(metaDB *sql.DB, templateID uint64, ownerID string) int64 {
	query := "SELECT COUNT(1) FROM export_jobs"
	var (
		where []string
		args  []interface{}
	)
	if templateID > 0 {
		where = append(where, "template_id=?")
		args = append(args, templateID)
	}
	if ownerID != "" {
		where = append(where, "owner_id=?")
		args = append(args, ownerID)
	}
	if len(where) > 0 {
		query += " WHERE " + joinStrings(where, " AND ")
	}

	var count int64
	if err := metaDB.QueryRow(query, args...).Scan(&count); err != nil {
		logging.Error("count_jobs", err, nil)
	}
	return count
}
// ListJobs returns one page of export jobs, newest first (ORDER BY id DESC).
// Results are optionally filtered by template (templateID > 0) and/or owner
// (ownerID != ""). pageSize and offset are bound as LIMIT and OFFSET. With
// no matching jobs the result is a nil slice.
//
// Rows that fail to scan are logged and skipped. An error that aborts
// iteration is returned instead of a silently truncated page.
func (r *ExportQueryRepo) ListJobs(metaDB *sql.DB, templateID uint64, ownerID string, pageSize, offset int) ([]JobListItem, error) {
	querySQL := `SELECT id, template_id, status, requested_by, row_estimate,
total_rows, file_format, created_at, updated_at, explain_score, explain_json
FROM export_jobs`
	var args []interface{}
	var conditions []string
	if templateID > 0 {
		conditions = append(conditions, "template_id=?")
		args = append(args, templateID)
	}
	if ownerID != "" {
		conditions = append(conditions, "owner_id=?")
		args = append(args, ownerID)
	}
	if len(conditions) > 0 {
		querySQL += " WHERE " + joinStrings(conditions, " AND ")
	}
	querySQL += " ORDER BY id DESC LIMIT ? OFFSET ?"
	args = append(args, pageSize, offset)

	rows, err := metaDB.Query(querySQL, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var items []JobListItem
	for rows.Next() {
		var item JobListItem
		if err := rows.Scan(
			&item.ID, &item.TemplateID, &item.Status, &item.RequestedBy,
			&item.RowEstimate, &item.TotalRows, &item.FileFormat,
			&item.CreatedAt, &item.UpdatedAt, &item.ExplainScore, &item.ExplainJSON,
		); err != nil {
			// Best-effort: skip the bad row, but leave a trace instead of
			// dropping it silently.
			logging.Error("list_jobs_scan", err, map[string]interface{}{
				"template_id": templateID,
				"owner_id":    ownerID,
			})
			continue
		}
		items = append(items, item)
	}
	// rows.Next returns false both at end-of-data and on failure; only
	// rows.Err distinguishes the two.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}
// joinStrings concatenates strs with sep between consecutive elements. It
// returns "" for an empty or nil slice and the sole element for a
// one-element slice. It is kept as a package-local name for existing
// callers. strings.Join builds the result in one pre-sized buffer, avoiding
// quadratic repeated += concatenation.
func joinStrings(strs []string, sep string) string {
	return strings.Join(strs, sep)
}