// Package repo 提供数据访问层 package repo import ( "database/sql" "encoding/json" "server/internal/constants" "server/internal/exporter" "server/internal/logging" "sync" "time" ) // ==================== 导出仓库 ==================== // ExportQueryRepo 导出查询仓库 type ExportQueryRepo struct{} // NewExportRepo 创建导出仓库实例 func NewExportRepo() *ExportQueryRepo { return &ExportQueryRepo{} } // ==================== 实时进度缓存 ==================== // ProgressCache 实时进度缓存 type ProgressCache struct { mu sync.RWMutex progress map[uint64]int64 // jobID -> totalRows } var ( progressCache = &ProgressCache{ progress: make(map[uint64]int64), } ) // SetProgress 设置实时进度 func SetProgress(jobID uint64, totalRows int64) { progressCache.mu.Lock() defer progressCache.mu.Unlock() progressCache.progress[jobID] = totalRows } // GetProgress 获取实时进度 func GetProgress(jobID uint64) int64 { progressCache.mu.RLock() defer progressCache.mu.RUnlock() if rows, ok := progressCache.progress[jobID]; ok { return rows } return 0 } // ClearProgress 清除所有进度缓存(可选,死了的休惑自动清理) func ClearProgress(jobID uint64) { progressCache.mu.Lock() defer progressCache.mu.Unlock() delete(progressCache.progress, jobID) } // ==================== SQL构建 ==================== // Build 构建SQL查询 func (r *ExportQueryRepo) Build(req exporter.BuildRequest, whitelist map[string]bool) (string, []interface{}, error) { return exporter.BuildSQL(req, whitelist) } // BuildWithFields 构建SQL查询并返回实际使用的字段列表 func (r *ExportQueryRepo) BuildWithFields(req exporter.BuildRequest, whitelist map[string]bool) (string, []interface{}, []string, error) { return exporter.BuildSQLWithFields(req, whitelist) } // Explain 执行EXPLAIN分析 func (r *ExportQueryRepo) Explain(db *sql.DB, query string, args []interface{}) (int, []string, error) { return exporter.EvaluateExplain(db, query, args) } // ==================== 行数统计 ==================== // Count 使用基础查询统计行数 func (r *ExportQueryRepo) Count(db *sql.DB, baseQuery string, args []interface{}) int64 { return exporter.CountRows(db, baseQuery, args) } // CountByReq 使用BuildRequest统计行数(COUNT(DISTINCT)) func (r *ExportQueryRepo) CountByReq(db *sql.DB, req exporter.BuildRequest, whitelist map[string]bool) int64 { query, args, err := exporter.BuildCountSQL(req, whitelist) if err != nil { logging.Error("build_count_sql_error", err, nil) return 0 } var count int64 if err := db.QueryRow(query, args...).Scan(&count); err != nil { logging.Error("count_by_req_error", err, map[string]interface{}{ "sql": query, "args": args, }) return 0 } return count } // EstimateFast 快速估算行数 func (r *ExportQueryRepo) EstimateFast(db *sql.DB, datasource, mainTable string, filters map[string]interface{}) int64 { return exporter.CountRowsFast(db, datasource, mainTable, filters) } // EstimateFastChunked 快速估算行数(分块) func (r *ExportQueryRepo) EstimateFastChunked(db *sql.DB, datasource, mainTable string, filters map[string]interface{}) int64 { return exporter.CountRowsFastChunked(db, datasource, mainTable, filters) } // ==================== 游标操作 ==================== // NewCursor 创建游标 func (r *ExportQueryRepo) NewCursor(datasource, mainTable string) *exporter.CursorSQL { return exporter.NewCursorSQL(datasource, mainTable) } // ==================== 回调函数类型 ==================== // RowWriterFactory 写入器工厂函数 type RowWriterFactory func() (exporter.RowWriter, error) // RowTransform 行转换函数 type RowTransform func([]string) []string // RollCallback 文件滚动回调 type RollCallback func(path string, size int64, partRows int64) error // ProgressCallback 进度回调 type ProgressCallback func(totalRows int64) error // ==================== 流式导出 ==================== // StreamCursor 流式导出数据 func (r *ExportQueryRepo) StreamCursor( db *sql.DB, baseQuery string, args []interface{}, cursor *exporter.CursorSQL, batchSize int, columns []string, newWriter RowWriterFactory, transform RowTransform, maxRowsPerFile int64, onRoll RollCallback, onProgress ProgressCallback, ) (int64, []string, error) { return exporter.StreamWithCursor( db, baseQuery, args, cursor, batchSize, columns, func() (exporter.RowWriter, error) { return newWriter() }, func(vals []string) []string { return transform(vals) }, maxRowsPerFile, func(path string, size int64, rows int64) error { return onRoll(path, size, rows) }, func(total int64) error { return onProgress(total) }, ) } // ZipAndRecord 压缩文件并记录 func (r *ExportQueryRepo) ZipAndRecord(metaDB *sql.DB, jobID uint64, files []string, totalRows int64) { if len(files) == 0 { return } zipPath, zipSize := exporter.ZipFiles(jobID, files) now := time.Now() _, err := metaDB.Exec( "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", jobID, zipPath, totalRows, zipSize, now, now, ) if err != nil { logging.DBError("zip_and_record", jobID, err) } } // ==================== 模板元数据 ==================== // GetTemplateMeta 获取模板元数据 func (r *ExportQueryRepo) GetTemplateMeta(metaDB *sql.DB, templateID uint64) (datasource, mainTable string, fields []string, err error) { var fieldsJSON []byte row := metaDB.QueryRow( "SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", templateID, ) if err = row.Scan(&datasource, &mainTable, &fieldsJSON); err != nil { return "", "", nil, err } _ = json.Unmarshal(fieldsJSON, &fields) return datasource, mainTable, fields, nil } // GetJobFilters 获取任务的过滤条件 func (r *ExportQueryRepo) GetJobFilters(metaDB *sql.DB, jobID uint64) (templateID uint64, filtersJSON []byte, err error) { row := metaDB.QueryRow( "SELECT template_id, filters_json FROM export_jobs WHERE id=?", jobID, ) if err = row.Scan(&templateID, &filtersJSON); err != nil { return 0, nil, err } return templateID, filtersJSON, nil } // ==================== 任务管理 ==================== // InsertJob 插入新的导出任务 func (r *ExportQueryRepo) InsertJob( metaDB *sql.DB, templateID, requestedBy, ownerID uint64, permission, filters, options map[string]interface{}, explainResult map[string]interface{}, explainScore int, rowEstimate int64, fileFormat string, ) (uint64, error) { now := time.Now() insertSQL := `INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)` args := []interface{}{ templateID, string(constants.JobStatusQueued), requestedBy, ownerID, toJSON(permission), toJSON(filters), toJSON(options), toJSON(explainResult), explainScore, rowEstimate, fileFormat, now, now, } result, err := metaDB.Exec(insertSQL, args...) if err != nil { return 0, err } id, _ := result.LastInsertId() return uint64(id), nil } // StartJob 标记任务开始执行 func (r *ExportQueryRepo) StartJob(metaDB *sql.DB, jobID uint64) { now := time.Now() _, err := metaDB.Exec( "UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id=?", string(constants.JobStatusRunning), now, now, jobID, ) if err != nil { logging.DBError("start_job", jobID, err) } } // UpdateProgress 更新任务进度 func (r *ExportQueryRepo) UpdateProgress(metaDB *sql.DB, jobID uint64, totalRows int64) { now := time.Now() _, err := metaDB.Exec( `UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=?, status=CASE WHEN status='queued' THEN 'running' ELSE status END WHERE id=?`, totalRows, now, jobID, ) if err != nil { logging.DBError("update_progress", jobID, err) } logging.ExportProgress(jobID, totalRows) } // MarkFailed 标记任务失败 func (r *ExportQueryRepo) MarkFailed(metaDB *sql.DB, jobID uint64, reason string, context map[string]interface{}) { now := time.Now() // 记录失败原因和上下文(符合MarkFailed调用日志记录规范) logContext := map[string]interface{}{ "event": "mark_failed", "job_id": jobID, "reason": reason, "time": now, } if context != nil { for k, v := range context { logContext[k] = v } } logging.JSON("ERROR", logContext) _, err := metaDB.Exec( "UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", string(constants.JobStatusFailed), now, jobID, ) if err != nil { logging.DBError("mark_failed_update", jobID, err) } } // MarkCompleted 标记任务完成 func (r *ExportQueryRepo) MarkCompleted(metaDB *sql.DB, jobID uint64, totalRows int64) { now := time.Now() _, err := metaDB.Exec( `UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, row_estimate=GREATEST(COALESCE(row_estimate,0), ?), updated_at=? WHERE id=?`, string(constants.JobStatusCompleted), now, totalRows, totalRows, now, jobID, ) if err != nil { logging.DBError("mark_completed", jobID, err) } // 导出完成时清除缓存,释放内存 ClearProgress(jobID) } // InsertJobFile 插入任务文件记录 func (r *ExportQueryRepo) InsertJobFile(metaDB *sql.DB, jobID uint64, uri, sheetName string, rowCount, sizeBytes int64) { now := time.Now() _, err := metaDB.Exec( `INSERT INTO export_job_files (job_id, storage_uri, sheet_name, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?,?)`, jobID, uri, sheetName, rowCount, sizeBytes, now, now, ) if err != nil { logging.Error("insert_job_file", err, map[string]interface{}{ "job_id": jobID, "path": uri, }) } } // UpdateRowEstimate 更新行数估算 func (r *ExportQueryRepo) UpdateRowEstimate(metaDB *sql.DB, jobID uint64, estimate int64) { now := time.Now() _, err := metaDB.Exec( "UPDATE export_jobs SET row_estimate=?, updated_at=? WHERE id=?", estimate, now, jobID, ) if err != nil { logging.Error("update_row_estimate", err, map[string]interface{}{ "job_id": jobID, "row_estimate": estimate, }) } } // toJSON 将对象序列化为JSON func toJSON(v interface{}) []byte { b, _ := json.Marshal(v) return b } // ==================== 数据类型 ==================== // JobDetail 任务详情 type JobDetail struct { ID uint64 TemplateID uint64 Status string RequestedBy uint64 TotalRows sql.NullInt64 FileFormat string StartedAt sql.NullTime FinishedAt sql.NullTime CreatedAt time.Time UpdatedAt time.Time ExplainScore sql.NullInt64 ExplainJSON sql.NullString } // JobFile 任务文件 type JobFile struct { URI sql.NullString Sheet sql.NullString RowCount sql.NullInt64 SizeBytes sql.NullInt64 } // JobListItem 任务列表项 type JobListItem struct { ID uint64 TemplateID uint64 Status string RequestedBy uint64 RowEstimate sql.NullInt64 TotalRows sql.NullInt64 FileFormat string CreatedAt sql.NullTime UpdatedAt sql.NullTime ExplainScore sql.NullInt64 ExplainJSON sql.NullString } // ==================== 任务查询 ==================== // GetJob 获取任务详情 func (r *ExportQueryRepo) GetJob(metaDB *sql.DB, jobID string) (JobDetail, error) { querySQL := `SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE id=?` var detail JobDetail err := metaDB.QueryRow(querySQL, jobID).Scan( &detail.ID, &detail.TemplateID, &detail.Status, &detail.RequestedBy, &detail.TotalRows, &detail.FileFormat, &detail.StartedAt, &detail.FinishedAt, &detail.CreatedAt, &detail.UpdatedAt, &detail.ExplainScore, &detail.ExplainJSON, ) return detail, err } // GetRunningJobs 获取所有运行中的任务(用于服务重启恢复) func (r *ExportQueryRepo) GetRunningJobs(metaDB *sql.DB) ([]JobDetail, error) { querySQL := `SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE status=?` rows, err := metaDB.Query(querySQL, string(constants.JobStatusRunning)) if err != nil { return nil, err } defer rows.Close() var jobs []JobDetail for rows.Next() { var detail JobDetail if err := rows.Scan( &detail.ID, &detail.TemplateID, &detail.Status, &detail.RequestedBy, &detail.TotalRows, &detail.FileFormat, &detail.StartedAt, &detail.FinishedAt, &detail.CreatedAt, &detail.UpdatedAt, &detail.ExplainScore, &detail.ExplainJSON, ); err != nil { continue } jobs = append(jobs, detail) } return jobs, nil } // IncrementRestartCount 增加任务重启计数 func (r *ExportQueryRepo) IncrementRestartCount(metaDB *sql.DB, jobID uint64) { now := time.Now() _, err := metaDB.Exec( "UPDATE export_jobs SET restart_count = COALESCE(restart_count,0) + 1, updated_at=? WHERE id=?", now, jobID, ) if err != nil { logging.DBError("increment_restart_count", jobID, err) } } // ResetJobProgress 重置任务进度为0(用于任务恢复) func (r *ExportQueryRepo) ResetJobProgress(metaDB *sql.DB, jobID uint64) { now := time.Now() _, err := metaDB.Exec( "UPDATE export_jobs SET total_rows=0, updated_at=? WHERE id=?", now, jobID, ) if err != nil { logging.DBError("reset_job_progress", jobID, err) } } // ProgressTracker 内存中的进度跟踪器,减少数据库写入 type ProgressTracker struct { jobID uint64 totalRows int64 lastSyncRows int64 lastSyncTime time.Time metaDB *sql.DB syncInterval int64 // 每多少行同步一次(默认10000) timeLimitMS int64 // 最长多久同步一次(毫秒,默认5000) } // NewProgressTracker 创建进度跟踪器 func NewProgressTracker(jobID uint64, metaDB *sql.DB) *ProgressTracker { return &ProgressTracker{ jobID: jobID, totalRows: 0, lastSyncRows: 0, lastSyncTime: time.Now(), metaDB: metaDB, syncInterval: 100000, // 每100000行同步一次 timeLimitMS: 5000, // 最长5秒同步一次 } } // Update 更新进度到缓存(不同步数据库) func (pt *ProgressTracker) Update(totalRows int64) error { pt.totalRows = totalRows // 立即写入缓存,前端可以实时查询 SetProgress(pt.jobID, totalRows) return nil } // Sync 仅执行最终同步(应用于导出完成时) func (pt *ProgressTracker) Sync() error { // 取消同步数据库操作,缓存已在 Update 中实时更新 return nil } // FinalSync 导出完成时的最终同步 func (pt *ProgressTracker) FinalSync() error { if pt.metaDB == nil { return nil } now := time.Now() _, err := pt.metaDB.Exec( `UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id=?`, pt.totalRows, now, pt.jobID, ) if err != nil { logging.DBError("progress_tracker_final_sync", pt.jobID, err) return err } return nil } // ListJobFiles 获取任务文件列表 func (r *ExportQueryRepo) ListJobFiles(metaDB *sql.DB, jobID string) ([]JobFile, error) { rows, err := metaDB.Query( "SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", jobID, ) if err != nil { return nil, err } defer rows.Close() var files []JobFile for rows.Next() { var file JobFile if err := rows.Scan(&file.URI, &file.Sheet, &file.RowCount, &file.SizeBytes); err != nil { continue } files = append(files, file) } return files, nil } // GetLatestFileURI 获取最新文件URI func (r *ExportQueryRepo) GetLatestFileURI(metaDB *sql.DB, jobID string) (string, error) { var uri string err := metaDB.QueryRow( "SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", jobID, ).Scan(&uri) return uri, err } // CountJobs 统计任务数量 func (r *ExportQueryRepo) CountJobs(metaDB *sql.DB, templateID uint64, ownerID string) int64 { var count int64 var err error if templateID > 0 { if ownerID != "" { err = metaDB.QueryRow( "SELECT COUNT(1) FROM export_jobs WHERE template_id=? AND owner_id=?", templateID, ownerID, ).Scan(&count) } else { err = metaDB.QueryRow( "SELECT COUNT(1) FROM export_jobs WHERE template_id=?", templateID, ).Scan(&count) } } else { if ownerID != "" { err = metaDB.QueryRow( "SELECT COUNT(1) FROM export_jobs WHERE owner_id=?", ownerID, ).Scan(&count) } else { err = metaDB.QueryRow("SELECT COUNT(1) FROM export_jobs").Scan(&count) } } if err != nil { logging.Error("count_jobs", err, nil) } return count } // ListJobs 获取任务列表 func (r *ExportQueryRepo) ListJobs(metaDB *sql.DB, templateID uint64, ownerID string, pageSize, offset int) ([]JobListItem, error) { querySQL := `SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs` var args []interface{} var conditions []string if templateID > 0 { conditions = append(conditions, "template_id=?") args = append(args, templateID) } if ownerID != "" { conditions = append(conditions, "owner_id=?") args = append(args, ownerID) } if len(conditions) > 0 { querySQL += " WHERE " + joinStrings(conditions, " AND ") } querySQL += " ORDER BY id DESC LIMIT ? OFFSET ?" args = append(args, pageSize, offset) rows, err := metaDB.Query(querySQL, args...) if err != nil { return nil, err } defer rows.Close() var items []JobListItem for rows.Next() { var item JobListItem if err := rows.Scan( &item.ID, &item.TemplateID, &item.Status, &item.RequestedBy, &item.RowEstimate, &item.TotalRows, &item.FileFormat, &item.CreatedAt, &item.UpdatedAt, &item.ExplainScore, &item.ExplainJSON, ); err == nil { items = append(items, item) } } return items, nil } // joinStrings 连接字符串切片 func joinStrings(strs []string, sep string) string { if len(strs) == 0 { return "" } result := strs[0] for i := 1; i < len(strs); i++ { result += sep + strs[i] } return result }