257 lines
11 KiB
Go
257 lines
11 KiB
Go
package repo
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"server/internal/exporter"
|
|
"server/internal/logging"
|
|
"time"
|
|
)
|
|
|
|
// ExportQueryRepo is a stateless receiver for the export query, streaming and
// job-metadata helpers in this package. It has no fields; each method is
// handed the *sql.DB it should use.
type ExportQueryRepo struct{}

// NewExportRepo returns a pointer to a fresh ExportQueryRepo.
func NewExportRepo() *ExportQueryRepo {
	return new(ExportQueryRepo)
}
|
|
|
|
func (r *ExportQueryRepo) Build(req exporter.BuildRequest, wl map[string]bool) (string, []interface{}, error) {
|
|
return exporter.BuildSQL(req, wl)
|
|
}
|
|
|
|
func (r *ExportQueryRepo) Explain(db *sql.DB, q string, args []interface{}) (int, []string, error) {
|
|
return exporter.EvaluateExplain(db, q, args)
|
|
}
|
|
|
|
func (r *ExportQueryRepo) Count(db *sql.DB, base string, args []interface{}) int64 {
|
|
return exporter.CountRows(db, base, args)
|
|
}
|
|
|
|
// Count by BuildRequest using filters-only joins and COUNT(DISTINCT main pk)
|
|
func (r *ExportQueryRepo) CountByReq(db *sql.DB, req exporter.BuildRequest, wl map[string]bool) int64 {
|
|
q, args, err := exporter.BuildCountSQL(req, wl)
|
|
if err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "build_count_sql_error", "error": err.Error()})
|
|
return 0
|
|
}
|
|
var c int64
|
|
row := db.QueryRow(q, args...)
|
|
if err := row.Scan(&c); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "count_by_req_error", "error": err.Error(), "sql": q, "args": args})
|
|
return 0
|
|
}
|
|
return c
|
|
}
|
|
|
|
func (r *ExportQueryRepo) EstimateFast(db *sql.DB, ds, main string, filters map[string]interface{}) int64 {
|
|
return exporter.CountRowsFast(db, ds, main, filters)
|
|
}
|
|
|
|
func (r *ExportQueryRepo) EstimateFastChunked(db *sql.DB, ds, main string, filters map[string]interface{}) int64 {
|
|
return exporter.CountRowsFastChunked(db, ds, main, filters)
|
|
}
|
|
|
|
func (r *ExportQueryRepo) NewCursor(datasource, main string) *exporter.CursorSQL {
|
|
return exporter.NewCursorSQL(datasource, main)
|
|
}
|
|
|
|
type RowWriterFactory func() (exporter.RowWriter, error)
|
|
type RowTransform func([]string) []string
|
|
type RollCallback func(path string, size int64, partRows int64) error
|
|
type ProgressCallback func(totalRows int64) error
|
|
|
|
func (r *ExportQueryRepo) StreamCursor(db *sql.DB, base string, args []interface{}, cur *exporter.CursorSQL, batch int, cols []string, newWriter RowWriterFactory, transform RowTransform, maxRowsPerFile int64, onRoll RollCallback, onProgress ProgressCallback) (int64, []string, error) {
|
|
return exporter.StreamWithCursor(db, base, args, cur, batch, cols, func() (exporter.RowWriter, error) { return newWriter() }, func(vals []string) []string { return transform(vals) }, maxRowsPerFile, func(p string, sz int64, rows int64) error { return onRoll(p, sz, rows) }, func(total int64) error { return onProgress(total) })
|
|
}
|
|
|
|
func (r *ExportQueryRepo) ZipAndRecord(meta *sql.DB, jobID uint64, files []string, total int64) {
|
|
if len(files) == 0 {
|
|
return
|
|
}
|
|
zipPath, zipSize := exporter.ZipFiles(jobID, files)
|
|
meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", jobID, zipPath, total, zipSize, time.Now(), time.Now())
|
|
}
|
|
|
|
// Metadata and job helpers
|
|
func (r *ExportQueryRepo) GetTemplateMeta(meta *sql.DB, tplID uint64) (string, string, []string, error) {
|
|
var ds string
|
|
var main string
|
|
var fieldsJSON []byte
|
|
row := meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?", tplID)
|
|
if err := row.Scan(&ds, &main, &fieldsJSON); err != nil {
|
|
return "", "", nil, err
|
|
}
|
|
var fs []string
|
|
_ = json.Unmarshal(fieldsJSON, &fs)
|
|
return ds, main, fs, nil
|
|
}
|
|
|
|
func (r *ExportQueryRepo) GetJobFilters(meta *sql.DB, jobID uint64) (uint64, []byte, error) {
|
|
var tplID uint64
|
|
var filtersJSON []byte
|
|
row := meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id= ?", jobID)
|
|
if err := row.Scan(&tplID, &filtersJSON); err != nil {
|
|
return 0, nil, err
|
|
}
|
|
return tplID, filtersJSON, nil
|
|
}
|
|
|
|
func (r *ExportQueryRepo) InsertJob(meta *sql.DB, tplID, requestedBy, owner uint64, permission, filters, options map[string]interface{}, explain map[string]interface{}, explainScore int, rowEstimate int64, fileFormat string) (uint64, error) {
|
|
ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
|
|
ejArgs := []interface{}{tplID, "queued", requestedBy, owner, toJSON(permission), toJSON(filters), toJSON(options), toJSON(explain), explainScore, rowEstimate, fileFormat, time.Now(), time.Now()}
|
|
res, err := meta.Exec(ejSQL, ejArgs...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
id, _ := res.LastInsertId()
|
|
return uint64(id), nil
|
|
}
|
|
|
|
func (r *ExportQueryRepo) StartJob(meta *sql.DB, id uint64) {
|
|
if _, err := meta.Exec("UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id= ?", "running", time.Now(), time.Now(), id); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "start_job", "job_id": id, "error": err.Error()})
|
|
}
|
|
}
|
|
func (r *ExportQueryRepo) UpdateProgress(meta *sql.DB, id uint64, total int64) {
|
|
if _, err := meta.Exec("UPDATE export_jobs SET total_rows=GREATEST(COALESCE(total_rows,0), ?), updated_at=?, status=CASE WHEN status='queued' THEN 'running' ELSE status END WHERE id= ?", total, time.Now(), id); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "update_progress", "job_id": id, "error": err.Error()})
|
|
}
|
|
logging.JSON("INFO", map[string]interface{}{"event": "progress_update", "job_id": id, "total_rows": total})
|
|
}
|
|
func (r *ExportQueryRepo) MarkFailed(meta *sql.DB, id uint64) {
|
|
if _, err := meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "mark_failed", "job_id": id, "error": err.Error()})
|
|
}
|
|
}
|
|
func (r *ExportQueryRepo) MarkCompleted(meta *sql.DB, id uint64, total int64) {
|
|
if _, err := meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, row_estimate=GREATEST(COALESCE(row_estimate,0), ?), updated_at=? WHERE id= ?", "completed", time.Now(), total, total, time.Now(), id); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "mark_completed", "job_id": id, "error": err.Error()})
|
|
}
|
|
}
|
|
func (r *ExportQueryRepo) InsertJobFile(meta *sql.DB, id uint64, uri string, sheetName string, rowCount, size int64) {
|
|
if _, err := meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, sheet_name, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?,?)", id, uri, sheetName, rowCount, size, time.Now(), time.Now()); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "db_insert_error", "action": "insert_job_file", "job_id": id, "error": err.Error(), "path": uri})
|
|
}
|
|
}
|
|
|
|
func (r *ExportQueryRepo) UpdateRowEstimate(meta *sql.DB, id uint64, est int64) {
|
|
if _, err := meta.Exec("UPDATE export_jobs SET row_estimate=?, updated_at=? WHERE id= ?", est, time.Now(), id); err != nil {
|
|
logging.JSON("ERROR", map[string]interface{}{"event": "db_update_error", "action": "update_row_estimate", "job_id": id, "error": err.Error(), "row_estimate": est})
|
|
}
|
|
}
|
|
|
|
// toJSON returns the JSON encoding of v. If v cannot be marshaled the error
// is discarded and nil is returned.
func toJSON(v interface{}) []byte {
	encoded, err := json.Marshal(v)
	if err != nil {
		// json.Marshal yields no bytes on failure; make that explicit.
		return nil
	}
	return encoded
}
|
|
|
|
// JobDetail is the scan target for a single export_jobs row as read by
// GetJob. Fields use the sql.Null* types where GetJob scans a column into a
// nullable destination.
type JobDetail struct {
	ID, TemplateID        uint64
	Status                string
	RequestedBy           uint64
	TotalRows             sql.NullInt64
	FileFormat            string
	StartedAt, FinishedAt sql.NullTime
	CreatedAt, UpdatedAt  time.Time
	ExplainScore          sql.NullInt64
	ExplainJSON           sql.NullString
}
|
|
|
|
// JobFile is the scan target for one export_job_files row as read by
// ListJobFiles. Every field is nullable.
type JobFile struct {
	URI, Sheet          sql.NullString
	RowCount, SizeBytes sql.NullInt64
}
|
|
|
|
// JobListItem is the scan target for one export_jobs row as read by
// ListJobs. Unlike JobDetail, its CreatedAt and UpdatedAt are nullable.
type JobListItem struct {
	ID, TemplateID         uint64
	Status                 string
	RequestedBy            uint64
	RowEstimate, TotalRows sql.NullInt64
	FileFormat             string
	CreatedAt, UpdatedAt   sql.NullTime
	ExplainScore           sql.NullInt64
	ExplainJSON            sql.NullString
}
|
|
|
|
func (r *ExportQueryRepo) GetJob(meta *sql.DB, id string) (JobDetail, error) {
|
|
row := meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE id= ?", id)
|
|
var d JobDetail
|
|
err := row.Scan(&d.ID, &d.TemplateID, &d.Status, &d.RequestedBy, &d.TotalRows, &d.FileFormat, &d.StartedAt, &d.FinishedAt, &d.CreatedAt, &d.UpdatedAt, &d.ExplainScore, &d.ExplainJSON)
|
|
return d, err
|
|
}
|
|
|
|
func (r *ExportQueryRepo) ListJobFiles(meta *sql.DB, jobID string) ([]JobFile, error) {
|
|
rows, err := meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id= ?", jobID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []JobFile{}
|
|
for rows.Next() {
|
|
var f JobFile
|
|
rows.Scan(&f.URI, &f.Sheet, &f.RowCount, &f.SizeBytes)
|
|
out = append(out, f)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (r *ExportQueryRepo) GetLatestFileURI(meta *sql.DB, jobID string) (string, error) {
|
|
row := meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", jobID)
|
|
var uri string
|
|
err := row.Scan(&uri)
|
|
return uri, err
|
|
}
|
|
|
|
func (r *ExportQueryRepo) CountJobs(meta *sql.DB, tplID uint64, owner string) int64 {
|
|
var c int64
|
|
if tplID > 0 {
|
|
if owner != "" {
|
|
_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ? AND owner_id = ?", tplID, owner).Scan(&c)
|
|
} else {
|
|
_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ?", tplID).Scan(&c)
|
|
}
|
|
} else {
|
|
if owner != "" {
|
|
_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE owner_id = ?", owner).Scan(&c)
|
|
} else {
|
|
_ = meta.QueryRow("SELECT COUNT(1) FROM export_jobs").Scan(&c)
|
|
}
|
|
}
|
|
return c
|
|
}
|
|
|
|
func (r *ExportQueryRepo) ListJobs(meta *sql.DB, tplID uint64, owner string, size, offset int) ([]JobListItem, error) {
|
|
var rows *sql.Rows
|
|
var err error
|
|
if tplID > 0 {
|
|
if owner != "" {
|
|
rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? AND owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, owner, size, offset)
|
|
} else {
|
|
rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, size, offset)
|
|
}
|
|
} else {
|
|
if owner != "" {
|
|
rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", owner, size, offset)
|
|
} else {
|
|
rows, err = meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs ORDER BY id DESC LIMIT ? OFFSET ?", size, offset)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []JobListItem{}
|
|
for rows.Next() {
|
|
var it JobListItem
|
|
if err := rows.Scan(&it.ID, &it.TemplateID, &it.Status, &it.RequestedBy, &it.RowEstimate, &it.TotalRows, &it.FileFormat, &it.CreatedAt, &it.UpdatedAt, &it.ExplainScore, &it.ExplainJSON); err == nil {
|
|
items = append(items, it)
|
|
}
|
|
}
|
|
return items, nil
|
|
}
|