MarketingSystemDataExportTool/server/internal/repo/export_repo.go

package repo

import (
	"database/sql"
	"encoding/json"
	"server/internal/exporter"
	"server/internal/logging"
	"time"
)
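
// ExportQueryRepo bundles the exporter package's query building, row
// streaming, and packaging helpers with the metadata-database bookkeeping
// for export templates, jobs, and job files (export_templates, export_jobs,
// export_job_files).
//
// A typical lifecycle, as suggested by the methods below: Build/Explain the
// query, InsertJob, StartJob, StreamCursor with UpdateProgress, record output
// with InsertJobFile (or ZipAndRecord), then MarkCompleted or MarkFailed.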
type ExportQueryRepo struct{}
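
// NewExportRepo returns a new ExportQueryRepo.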
func NewExportRepo() *ExportQueryRepo { return &ExportQueryRepo{} }
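
// Build delegates to exporter.BuildSQL to construct the export query and its
// arguments for req, using the column whitelist wl.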
func (r *ExportQueryRepo) Build(req exporter.BuildRequest, wl map[string]bool) (string, []interface{}, error) {
	return exporter.BuildSQL(req, wl)
}
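
// Explain runs exporter.EvaluateExplain on the query and returns the
// resulting explain score and messages.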
func (r *ExportQueryRepo) Explain(db *sql.DB, q string, args []interface{}) (int, []string, error) {
	return exporter.EvaluateExplain(db, q, args)
}
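
// Count delegates to exporter.CountRows to count the rows matched by the
// base query.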
func (r *ExportQueryRepo) Count(db *sql.DB, base string, args []interface{}) int64 {
	return exporter.CountRows(db, base, args)
}

// CountByReq counts rows for req using filters-only joins and
// COUNT(DISTINCT main pk); it returns 0 if the count query cannot be built
// or executed.
func (r *ExportQueryRepo) CountByReq(db *sql.DB, req exporter.BuildRequest, wl map[string]bool) int64 {
	q, args, err := exporter.BuildCountSQL(req, wl)
	if err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "build_count_sql_error", "error": err.Error()})
		return 0
	}
	var c int64
	row := db.QueryRow(q, args...)
	if err := row.Scan(&c); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "count_by_req_error", "error": err.Error(), "sql": q, "args": args})
		return 0
	}
	return c
}
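
// EstimateFast delegates to exporter.CountRowsFast for a quick row-count
// estimate of table main in datasource ds under the given filters.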
func (r *ExportQueryRepo) EstimateFast(db *sql.DB, ds, main string, filters map[string]interface{}) int64 {
	return exporter.CountRowsFast(db, ds, main, filters)
}
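
// EstimateFastChunked is the chunked variant of EstimateFast, delegating to
// exporter.CountRowsFastChunked.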
func (r *ExportQueryRepo) EstimateFastChunked(db *sql.DB, ds, main string, filters map[string]interface{}) int64 {
	return exporter.CountRowsFastChunked(db, ds, main, filters)
}
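
// NewCursor creates an exporter.CursorSQL for cursor-based paging over the
// main table of the given datasource.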
func (r *ExportQueryRepo) NewCursor(datasource, main string) *exporter.CursorSQL {
	return exporter.NewCursorSQL(datasource, main)
}
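
// RowWriterFactory creates the exporter.RowWriter used for an output file.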
type RowWriterFactory func() (exporter.RowWriter, error)
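
// RowTransform maps a row's column values before they are written.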
type RowTransform func([]string) []string
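
// RollCallback is invoked when an output file is finished (rolled over),
// with its path, size in bytes, and the number of rows in that part.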
type RollCallback func(path string, size int64, partRows int64) error
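
// ProgressCallback reports the running total of rows streamed so far.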
type ProgressCallback func(totalRows int64) error
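
// StreamCursor delegates to exporter.StreamWithCursor: it pages through the
// base query in batches using cur, writes transformed rows through writers
// created by newWriter, rolls to a new output file after maxRowsPerFile rows
// (invoking onRoll), and reports progress through onProgress. It returns the
// total row count and file paths reported by exporter.StreamWithCursor.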
func (r *ExportQueryRepo) StreamCursor(db *sql.DB, base string, args []interface{}, cur *exporter.CursorSQL, batch int, cols []string, newWriter RowWriterFactory, transform RowTransform, maxRowsPerFile int64, onRoll RollCallback, onProgress ProgressCallback) (int64, []string, error) {
	return exporter.StreamWithCursor(db, base, args, cur, batch, cols,
		func() (exporter.RowWriter, error) { return newWriter() },
		func(vals []string) []string { return transform(vals) },
		maxRowsPerFile,
		func(p string, sz int64, rows int64) error { return onRoll(p, sz, rows) },
		func(total int64) error { return onProgress(total) })
}
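
// ZipAndRecord zips the produced files via exporter.ZipFiles and records the
// archive in export_job_files. It is a no-op when files is empty.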
func (r *ExportQueryRepo) ZipAndRecord(meta *sql.DB, jobID uint64, files []string, total int64) {
	if len(files) == 0 {
		return
	}
	zipPath, zipSize := exporter.ZipFiles(jobID, files)
	if _, err := meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", jobID, zipPath, total, zipSize, time.Now(), time.Now()); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_insert_error", "action": "zip_and_record", "job_id": jobID, "error": err.Error(), "path": zipPath})
	}
}

// Metadata and job helpers.
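
// GetTemplateMeta returns the datasource, main table, and selected fields of
// the export template with the given id.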
func (r *ExportQueryRepo) GetTemplateMeta(meta *sql.DB, tplID uint64) (string, string, []string, error) {
	var ds string
	var main string
	var fieldsJSON []byte
	row := meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id = ?", tplID)
	if err := row.Scan(&ds, &main, &fieldsJSON); err != nil {
		return "", "", nil, err
	}
	var fs []string
	if len(fieldsJSON) > 0 {
		if err := json.Unmarshal(fieldsJSON, &fs); err != nil {
			return "", "", nil, err
		}
	}
	return ds, main, fs, nil
}
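
// GetJobFilters returns the template id and raw filters JSON of an export job.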
func (r *ExportQueryRepo) GetJobFilters(meta *sql.DB, jobID uint64) (uint64, []byte, error) {
	var tplID uint64
	var filtersJSON []byte
	row := meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id = ?", jobID)
	if err := row.Scan(&tplID, &filtersJSON); err != nil {
		return 0, nil, err
	}
	return tplID, filtersJSON, nil
}
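
// InsertJob creates a queued export_jobs row and returns its id.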
func (r *ExportQueryRepo) InsertJob(meta *sql.DB, tplID, requestedBy, owner uint64, permission, filters, options map[string]interface{}, explain map[string]interface{}, explainScore int, rowEstimate int64, fileFormat string) (uint64, error) {
	ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
	ejArgs := []interface{}{tplID, "queued", requestedBy, owner, toJSON(permission), toJSON(filters), toJSON(options), toJSON(explain), explainScore, rowEstimate, fileFormat, time.Now(), time.Now()}
	res, err := meta.Exec(ejSQL, ejArgs...)
	if err != nil {
		return 0, err
	}
	id, err := res.LastInsertId()
	if err != nil {
		return 0, err
	}
	return uint64(id), nil
}
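
// StartJob marks the job as running and stamps started_at.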
func (r *ExportQueryRepo) StartJob(meta *sql.DB, id uint64) {
	if _, err := meta.Exec("UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id = ?", "running", time.Now(), time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "start_job", "job_id": id, "error": err.Error()})
	}
}
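
// UpdateProgress raises total_rows to at least total and promotes a queued
// job to running.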
func (r *ExportQueryRepo) UpdateProgress(meta *sql.DB, id uint64, total int64) {
	if _, err := meta.Exec("UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=?, status=CASE WHEN status='queued' THEN 'running' ELSE status END WHERE id = ?", total, time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "update_progress", "job_id": id, "error": err.Error()})
	}
	logging.JSON("INFO", map[string]interface{}{"event": "progress_update", "job_id": id, "total_rows": total})
}
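
// MarkFailed marks the job as failed and stamps finished_at.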
func (r *ExportQueryRepo) MarkFailed(meta *sql.DB, id uint64) {
	if _, err := meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id = ?", "failed", time.Now(), time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "mark_failed", "job_id": id, "error": err.Error()})
	}
}
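
// MarkCompleted marks the job as completed, stamps finished_at, and records
// the final row counts.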
func (r *ExportQueryRepo) MarkCompleted(meta *sql.DB, id uint64, total int64) {
	if _, err := meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, row_estimate=GREATEST(COALESCE(row_estimate,0), ?), updated_at=? WHERE id = ?", "completed", time.Now(), total, total, time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "mark_completed", "job_id": id, "error": err.Error()})
	}
}
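
// InsertJobFile records a produced output file for the job.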
func (r *ExportQueryRepo) InsertJobFile(meta *sql.DB, id uint64, uri string, sheetName string, rowCount, size int64) {
	if _, err := meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, sheet_name, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?,?)", id, uri, sheetName, rowCount, size, time.Now(), time.Now()); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_insert_error", "action": "insert_job_file", "job_id": id, "error": err.Error(), "path": uri})
	}
}
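
// UpdateRowEstimate stores a new row estimate for the job.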
func (r *ExportQueryRepo) UpdateRowEstimate(meta *sql.DB, id uint64, est int64) {
	if _, err := meta.Exec("UPDATE export_jobs SET row_estimate=?, updated_at=? WHERE id = ?", est, time.Now(), id); err != nil {
		logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "update_row_estimate", "job_id": id, "error": err.Error(), "row_estimate": est})
	}
}
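
// toJSON marshals v to JSON, returning nil if marshaling fails.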
func toJSON(v interface{}) []byte {
	b, _ := json.Marshal(v)
	return b
}
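
// JobDetail is a single export_jobs row as returned by GetJob.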
type JobDetail struct {
	ID           uint64
	TemplateID   uint64
	Status       string
	RequestedBy  uint64
	TotalRows    sql.NullInt64
	FileFormat   string
	StartedAt    sql.NullTime
	FinishedAt   sql.NullTime
	CreatedAt    time.Time
	UpdatedAt    time.Time
	ExplainScore sql.NullInt64
	ExplainJSON  sql.NullString
}
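
// JobFile is a single export_job_files row as returned by ListJobFiles.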
type JobFile struct {
	URI       sql.NullString
	Sheet     sql.NullString
	RowCount  sql.NullInt64
	SizeBytes sql.NullInt64
}
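
// JobListItem is a single export_jobs row as returned by ListJobs.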
type JobListItem struct {
	ID           uint64
	TemplateID   uint64
	Status       string
	RequestedBy  uint64
	RowEstimate  sql.NullInt64
	TotalRows    sql.NullInt64
	FileFormat   string
	CreatedAt    sql.NullTime
	UpdatedAt    sql.NullTime
	ExplainScore sql.NullInt64
	ExplainJSON  sql.NullString
}
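
// GetJob loads one export job by id.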
func (r *ExportQueryRepo) GetJob(meta *sql.DB, id string) (JobDetail, error) {
	row := meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE id = ?", id)
	var d JobDetail
	err := row.Scan(&d.ID, &d.TemplateID, &d.Status, &d.RequestedBy, &d.TotalRows, &d.FileFormat, &d.StartedAt, &d.FinishedAt, &d.CreatedAt, &d.UpdatedAt, &d.ExplainScore, &d.ExplainJSON)
	return d, err
}
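
// ListJobFiles returns the output files recorded for a job.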
func (r *ExportQueryRepo) ListJobFiles(meta *sql.DB, jobID string) ([]JobFile, error) {
	rows, err := meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id = ?", jobID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	out := []JobFile{}
	for rows.Next() {
		var f JobFile
		if err := rows.Scan(&f.URI, &f.Sheet, &f.RowCount, &f.SizeBytes); err != nil {
			return nil, err
		}
		out = append(out, f)
	}
	return out, rows.Err()
}
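
// GetLatestFileURI returns the storage URI of the most recently recorded
// file for a job.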
func (r *ExportQueryRepo) GetLatestFileURI(meta *sql.DB, jobID string) (string, error) {
	row := meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id = ? ORDER BY id DESC LIMIT 1", jobID)
	var uri string
	err := row.Scan(&uri)
	return uri, err
}
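
// CountJobs counts export jobs, optionally filtered by template id and/or
// owner; query errors are swallowed and 0 is returned.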
func (r *ExportQueryRepo) CountJobs(meta *sql.DB, tplID uint64, owner string) int64 {
	var c int64
	if tplID > 0 {
		if owner != "" {
			_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ? AND owner_id = ?", tplID, owner).Scan(&c)
		} else {
			_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ?", tplID).Scan(&c)
		}
	} else {
		if owner != "" {
			_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE owner_id = ?", owner).Scan(&c)
		} else {
			_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs").Scan(&c)
		}
	}
	return c
}
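
// ListJobs returns a page of export jobs ordered by id descending,
// optionally filtered by template id and/or owner.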
func (r *ExportQueryRepo) ListJobs(meta *sql.DB, tplID uint64, owner string, size, offset int) ([]JobListItem, error) {
	var rows *sql.Rows
	var err error
	if tplID > 0 {
		if owner != "" {
			rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? AND owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, owner, size, offset)
		} else {
			rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, size, offset)
		}
	} else {
		if owner != "" {
			rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", owner, size, offset)
		} else {
			rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs ORDER BY id DESC LIMIT ? OFFSET ?", size, offset)
		}
	}
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	items := []JobListItem{}
	for rows.Next() {
		var it JobListItem
		if err := rows.Scan(&it.ID, &it.TemplateID, &it.Status, &it.RequestedBy, &it.RowEstimate, &it.TotalRows, &it.FileFormat, &it.CreatedAt, &it.UpdatedAt, &it.ExplainScore, &it.ExplainJSON); err != nil {
			return nil, err
		}
		items = append(items, it)
	}
	return items, rows.Err()
}