// Package repo contains database-facing helpers for export jobs: thin
// wrappers around the exporter package plus queries against the
// export_templates, export_jobs and export_job_files tables.
package repo

import (
	"database/sql"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"server/internal/exporter"
	"server/internal/logging"
)

// jobListColumns is the column list shared by every ListJobs query; its order
// must match the Scan call in ListJobs.
const jobListColumns = "id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json"

// ExportQueryRepo groups the export query and job-metadata helpers. It holds
// no state; every method receives the database handle it operates on.
type ExportQueryRepo struct{}

// NewExportRepo returns a ready-to-use ExportQueryRepo.
func NewExportRepo() *ExportQueryRepo {
	return &ExportQueryRepo{}
}

// Build delegates to exporter.BuildSQL and returns its query, arguments and
// error unchanged.
func (r *ExportQueryRepo) Build(req exporter.BuildRequest, wl map[string]bool) (string, []interface{}, error) {
	return exporter.BuildSQL(req, wl)
}

// Explain delegates to exporter.EvaluateExplain and returns its results
// unchanged.
func (r *ExportQueryRepo) Explain(db *sql.DB, q string, args []interface{}) (int, []string, error) {
	return exporter.EvaluateExplain(db, q, args)
}

// Count delegates to exporter.CountRows.
func (r *ExportQueryRepo) Count(db *sql.DB, base string, args []interface{}) int64 {
	return exporter.CountRows(db, base, args)
}

// CountByReq counts rows for a BuildRequest using the statement produced by
// exporter.BuildCountSQL (filters-only joins and COUNT(DISTINCT main pk)).
// Any build or query error is logged and reported to the caller as 0.
func (r *ExportQueryRepo) CountByReq(db *sql.DB, req exporter.BuildRequest, wl map[string]bool) int64 {
	q, args, err := exporter.BuildCountSQL(req, wl)
	if err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "build_count_sql_error", "error": err.Error()})
		return 0
	}
	var c int64
	if err := db.QueryRow(q, args...).Scan(&c); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "count_by_req_error", "error": err.Error(), "sql": q, "args": args})
		return 0
	}
	return c
}

// EstimateFast delegates to exporter.CountRowsFast.
func (r *ExportQueryRepo) EstimateFast(db *sql.DB, ds, main string, filters map[string]interface{}) int64 {
	return exporter.CountRowsFast(db, ds, main, filters)
}

// EstimateFastChunked delegates to exporter.CountRowsFastChunked.
func (r *ExportQueryRepo) EstimateFastChunked(db *sql.DB, ds, main string, filters map[string]interface{}) int64 {
	return exporter.CountRowsFastChunked(db, ds, main, filters)
}

// NewCursor delegates to exporter.NewCursorSQL.
func (r *ExportQueryRepo) NewCursor(datasource, main string) *exporter.CursorSQL {
	return exporter.NewCursorSQL(datasource, main)
}

// RowWriterFactory creates a new exporter.RowWriter.
type RowWriterFactory func() (exporter.RowWriter, error)

// RowTransform maps one row of string values to the row that is written.
type RowTransform func([]string) []string

// RollCallback is invoked with a file path, its size and its row count.
// NOTE(review): presumably called each time an output file is rolled; the
// exact timing is decided by exporter.StreamWithCursor - confirm there.
type RollCallback func(path string, size int64, partRows int64) error

// ProgressCallback is invoked with a running row total.
type ProgressCallback func(totalRows int64) error

// StreamCursor delegates to exporter.StreamWithCursor, adapting the
// repo-level callback types to plain function values.
//
// Every callback is wrapped in a closure that calls it unconditionally, so a
// nil callback panics as soon as the exporter invokes the wrapper.
func (r *ExportQueryRepo) StreamCursor(db *sql.DB, base string, args []interface{}, cur *exporter.CursorSQL, batch int, cols []string, newWriter RowWriterFactory, transform RowTransform, maxRowsPerFile int64, onRoll RollCallback, onProgress ProgressCallback) (int64, []string, error) {
	return exporter.StreamWithCursor(
		db, base, args, cur, batch, cols,
		func() (exporter.RowWriter, error) { return newWriter() },
		func(vals []string) []string { return transform(vals) },
		maxRowsPerFile,
		func(p string, sz int64, rows int64) error { return onRoll(p, sz, rows) },
		func(total int64) error { return onProgress(total) },
	)
}

// ZipAndRecord zips files via exporter.ZipFiles and records the archive in
// export_job_files with the given row total. It does nothing when files is
// empty. A failed insert is logged, not returned.
func (r *ExportQueryRepo) ZipAndRecord(meta *sql.DB, jobID uint64, files []string, total int64) {
	if len(files) == 0 {
		return
	}
	zipPath, zipSize := exporter.ZipFiles(jobID, files)
	now := time.Now()
	if _, err := meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", jobID, zipPath, total, zipSize, now, now); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_insert_error", "action": "zip_and_record", "job_id": jobID, "error": err.Error(), "path": zipPath})
	}
}

// Metadata and job helpers

// GetTemplateMeta loads datasource, main_table and the decoded fields_json
// string list for the template with the given id.
//
// A NULL or empty fields_json yields a nil field list without error; a
// non-empty value that does not decode into []string is returned as an error
// rather than a silently partial list.
func (r *ExportQueryRepo) GetTemplateMeta(meta *sql.DB, tplID uint64) (string, string, []string, error) {
	var (
		ds         string
		main       string
		fieldsJSON []byte
	)
	row := meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?", tplID)
	if err := row.Scan(&ds, &main, &fieldsJSON); err != nil {
		return "", "", nil, err
	}
	if len(fieldsJSON) == 0 {
		return ds, main, nil, nil
	}
	var fs []string
	if err := json.Unmarshal(fieldsJSON, &fs); err != nil {
		return "", "", nil, fmt.Errorf("decoding fields_json of export template %d: %w", tplID, err)
	}
	return ds, main, fs, nil
}

// GetJobFilters returns the template id and the raw filters_json bytes stored
// for the given export job.
func (r *ExportQueryRepo) GetJobFilters(meta *sql.DB, jobID uint64) (uint64, []byte, error) {
	var (
		tplID       uint64
		filtersJSON []byte
	)
	row := meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id= ?", jobID)
	if err := row.Scan(&tplID, &filtersJSON); err != nil {
		return 0, nil, err
	}
	return tplID, filtersJSON, nil
}

// InsertJob inserts a new export_jobs row with status "queued" and returns
// its auto-generated id. The map arguments are stored JSON-encoded.
//
// An error is returned when the insert fails or when the driver cannot report
// the inserted id.
func (r *ExportQueryRepo) InsertJob(meta *sql.DB, tplID, requestedBy, owner uint64, permission, filters, options map[string]interface{}, explain map[string]interface{}, explainScore int, rowEstimate int64, fileFormat string) (uint64, error) {
	const ejSQL = "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
	now := time.Now()
	ejArgs := []interface{}{
		tplID, "queued", requestedBy, owner,
		toJSON(permission), toJSON(filters), toJSON(options), toJSON(explain),
		explainScore, rowEstimate, fileFormat, now, now,
	}
	res, err := meta.Exec(ejSQL, ejArgs...)
	if err != nil {
		return 0, err
	}
	id, err := res.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("reading inserted export job id: %w", err)
	}
	return uint64(id), nil
}

// StartJob marks the job as "running" and stamps started_at and updated_at.
// A failed update is logged, not returned.
func (r *ExportQueryRepo) StartJob(meta *sql.DB, id uint64) {
	now := time.Now()
	if _, err := meta.Exec("UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id= ?", "running", now, now, id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "start_job", "job_id": id, "error": err.Error()})
	}
}

// UpdateProgress raises total_rows to at least total (it never lowers the
// stored value) and moves a still-"queued" job to "running". A failed update
// is logged; the progress_update INFO event is emitted in either case.
func (r *ExportQueryRepo) UpdateProgress(meta *sql.DB, id uint64, total int64) {
	if _, err := meta.Exec("UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=?, status=CASE WHEN status='queued' THEN 'running' ELSE status END WHERE id= ?", total, time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "update_progress", "job_id": id, "error": err.Error()})
	}
	logging.JSON("INFO", map[string]interface{}{"event": "progress_update", "job_id": id, "total_rows": total})
}

// MarkFailed sets the job status to "failed" and stamps finished_at and
// updated_at. A failed update is logged, not returned.
func (r *ExportQueryRepo) MarkFailed(meta *sql.DB, id uint64) {
	now := time.Now()
	// updated_at is refreshed here as well so that every status transition
	// touches it, matching StartJob, UpdateProgress and MarkCompleted.
	if _, err := meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", "failed", now, now, id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "mark_failed", "job_id": id, "error": err.Error()})
	}
}

// MarkCompleted sets the job status to "completed", stores the final
// total_rows, raises row_estimate to at least total, and stamps finished_at
// and updated_at. A failed update is logged, not returned.
func (r *ExportQueryRepo) MarkCompleted(meta *sql.DB, id uint64, total int64) {
	now := time.Now()
	if _, err := meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, row_estimate=GREATEST(COALESCE(row_estimate,0), ?), updated_at=? WHERE id= ?", "completed", now, total, total, now, id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "mark_completed", "job_id": id, "error": err.Error()})
	}
}

// InsertJobFile records one produced file for the job in export_job_files.
// A failed insert is logged, not returned.
func (r *ExportQueryRepo) InsertJobFile(meta *sql.DB, id uint64, uri string, sheetName string, rowCount, size int64) {
	now := time.Now()
	if _, err := meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, sheet_name, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?,?)", id, uri, sheetName, rowCount, size, now, now); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_insert_error", "action": "insert_job_file", "job_id": id, "error": err.Error(), "path": uri})
	}
}

// UpdateRowEstimate overwrites the job's row_estimate. A failed update is
// logged, not returned.
func (r *ExportQueryRepo) UpdateRowEstimate(meta *sql.DB, id uint64, est int64) {
	if _, err := meta.Exec("UPDATE export_jobs SET row_estimate=?, updated_at=? WHERE id= ?", est, time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "update_row_estimate", "job_id": id, "error": err.Error(), "row_estimate": est})
	}
}

// toJSON marshals v to JSON. A marshal failure is logged and nil is returned,
// so the corresponding bind argument is a nil byte slice.
func toJSON(v interface{}) []byte {
	b, err := json.Marshal(v)
	if err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "json_marshal_error", "error": err.Error()})
	}
	return b
}

// JobDetail is one export_jobs row as read by GetJob.
type JobDetail struct {
	ID           uint64
	TemplateID   uint64
	Status       string
	RequestedBy  uint64
	TotalRows    sql.NullInt64
	FileFormat   string
	StartedAt    sql.NullTime
	FinishedAt   sql.NullTime
	CreatedAt    time.Time
	UpdatedAt    time.Time
	ExplainScore sql.NullInt64
	ExplainJSON  sql.NullString
}

// JobFile is one export_job_files row as read by ListJobFiles.
type JobFile struct {
	URI       sql.NullString
	Sheet     sql.NullString
	RowCount  sql.NullInt64
	SizeBytes sql.NullInt64
}

// JobListItem is one export_jobs row as read by ListJobs.
type JobListItem struct {
	ID           uint64
	TemplateID   uint64
	Status       string
	RequestedBy  uint64
	RowEstimate  sql.NullInt64
	TotalRows    sql.NullInt64
	FileFormat   string
	CreatedAt    sql.NullTime
	UpdatedAt    sql.NullTime
	ExplainScore sql.NullInt64
	ExplainJSON  sql.NullString
}

// GetJob loads the export job with the given id. The error from Scan
// (including sql.ErrNoRows) is returned as is.
func (r *ExportQueryRepo) GetJob(meta *sql.DB, id string) (JobDetail, error) {
	row := meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE id= ?", id)
	var d JobDetail
	err := row.Scan(&d.ID, &d.TemplateID, &d.Status, &d.RequestedBy, &d.TotalRows, &d.FileFormat, &d.StartedAt, &d.FinishedAt, &d.CreatedAt, &d.UpdatedAt, &d.ExplainScore, &d.ExplainJSON)
	return d, err
}

// ListJobFiles returns every export_job_files row of the given job. The
// result is an empty, non-nil slice when the job has no files. Scan and
// iteration errors are returned instead of yielding half-filled rows.
func (r *ExportQueryRepo) ListJobFiles(meta *sql.DB, jobID string) ([]JobFile, error) {
	rows, err := meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id= ?", jobID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	out := []JobFile{}
	for rows.Next() {
		var f JobFile
		if err := rows.Scan(&f.URI, &f.Sheet, &f.RowCount, &f.SizeBytes); err != nil {
			return nil, err
		}
		out = append(out, f)
	}
	// Next returns false both at end of data and on failure; Err tells them apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

// GetLatestFileURI returns the storage_uri of the job's export_job_files row
// with the highest id. A job without files yields sql.ErrNoRows; a NULL
// storage_uri surfaces as a Scan error because the target is a plain string.
func (r *ExportQueryRepo) GetLatestFileURI(meta *sql.DB, jobID string) (string, error) {
	var uri string
	err := meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", jobID).Scan(&uri)
	return uri, err
}

// jobFilterClause builds the optional WHERE clause (with a leading space) and
// matching bind arguments shared by CountJobs and ListJobs. A zero tplID or an
// empty owner means "do not filter on that column".
func jobFilterClause(tplID uint64, owner string) (string, []interface{}) {
	var (
		conds []string
		args  []interface{}
	)
	if tplID > 0 {
		conds = append(conds, "template_id = ?")
		args = append(args, tplID)
	}
	if owner != "" {
		conds = append(conds, "owner_id = ?")
		args = append(args, owner)
	}
	if len(conds) == 0 {
		return "", nil
	}
	return " WHERE " + strings.Join(conds, " AND "), args
}

// CountJobs counts export jobs, optionally restricted to a template
// (tplID > 0) and/or an owner (owner != ""). A query error is logged and
// reported to the caller as 0.
func (r *ExportQueryRepo) CountJobs(meta *sql.DB, tplID uint64, owner string) int64 {
	where, args := jobFilterClause(tplID, owner)
	var c int64
	if err := meta.QueryRow("SELECT COUNT(1) FROM export_jobs"+where, args...).Scan(&c); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "count_jobs_error", "error": err.Error()})
		return 0
	}
	return c
}

// ListJobs returns one page of export jobs ordered by id descending,
// optionally restricted to a template (tplID > 0) and/or an owner
// (owner != ""). size and offset are bound to LIMIT and OFFSET.
//
// A row that fails to scan is skipped (best effort) and logged; an error that
// aborts the iteration itself is returned.
func (r *ExportQueryRepo) ListJobs(meta *sql.DB, tplID uint64, owner string, size, offset int) ([]JobListItem, error) {
	where, args := jobFilterClause(tplID, owner)
	args = append(args, size, offset)

	rows, err := meta.Query("SELECT "+jobListColumns+" FROM export_jobs"+where+" ORDER BY id DESC LIMIT ? OFFSET ?", args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	items := []JobListItem{}
	for rows.Next() {
		var it JobListItem
		if err := rows.Scan(&it.ID, &it.TemplateID, &it.Status, &it.RequestedBy, &it.RowEstimate, &it.TotalRows, &it.FileFormat, &it.CreatedAt, &it.UpdatedAt, &it.ExplainScore, &it.ExplainJSON); err != nil {
			logging.JSON("ERROR", map[string]interface{}{"event": "list_jobs_scan_error", "error": err.Error()})
			continue
		}
		items = append(items, it)
	}
	// Next returns false both at end of data and on failure; Err tells them apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}