1132 lines
38 KiB
Go
1132 lines
38 KiB
Go
package api
|
||
|
||
import (
|
||
"archive/zip"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"marketing-system-data-tool/server/internal/exporter"
|
||
"marketing-system-data-tool/server/internal/ymtcrypto"
|
||
"math/big"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
)
|
||
|
||
type ExportsAPI struct {
|
||
meta *sql.DB
|
||
marketing *sql.DB
|
||
}
|
||
|
||
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
|
||
api := &ExportsAPI{meta: meta, marketing: marketing}
|
||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
|
||
if r.Method == http.MethodPost && p == "" {
|
||
api.create(w, r)
|
||
return
|
||
}
|
||
if r.Method == http.MethodGet && p == "" {
|
||
api.list(w, r)
|
||
return
|
||
}
|
||
if strings.HasPrefix(p, "/") {
|
||
id := strings.TrimPrefix(p, "/")
|
||
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
|
||
if strings.HasSuffix(p, "/sql") {
|
||
id = strings.TrimSuffix(id, "/sql")
|
||
api.getSQL(w, r, id)
|
||
return
|
||
}
|
||
api.get(w, r, id)
|
||
return
|
||
}
|
||
if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") {
|
||
id = strings.TrimSuffix(id, "/download")
|
||
api.download(w, r, id)
|
||
return
|
||
}
|
||
if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") {
|
||
id = strings.TrimSuffix(id, "/cancel")
|
||
api.cancel(w, r, id)
|
||
return
|
||
}
|
||
}
|
||
w.WriteHeader(http.StatusNotFound)
|
||
})
|
||
}
|
||
|
||
func (a *ExportsAPI) ensureOwnerColumn() {
|
||
// Try to add owner_id column if not exists; ignore errors
|
||
_, _ = a.meta.Exec("ALTER TABLE export_jobs ADD COLUMN owner_id BIGINT UNSIGNED NOT NULL DEFAULT 0")
|
||
}
|
||
|
||
type ExportPayload struct {
|
||
TemplateID uint64 `json:"template_id"`
|
||
RequestedBy uint64 `json:"requested_by"`
|
||
Permission map[string]interface{} `json:"permission"`
|
||
Options map[string]interface{} `json:"options"`
|
||
FileFormat string `json:"file_format"`
|
||
Filters map[string]interface{} `json:"filters"`
|
||
Datasource string `json:"datasource"`
|
||
}
|
||
|
||
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
|
||
a.ensureOwnerColumn()
|
||
b, _ := io.ReadAll(r.Body)
|
||
var p ExportPayload
|
||
json.Unmarshal(b, &p)
|
||
r = WithPayload(r, p)
|
||
var main string
|
||
var ds string
|
||
var fields []byte
|
||
log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), "SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?", []interface{}{p.TemplateID})
|
||
row := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id= ?", p.TemplateID)
|
||
err := row.Scan(&ds, &main, &fields)
|
||
if err != nil {
|
||
fail(w, r, http.StatusBadRequest, "invalid template")
|
||
return
|
||
}
|
||
if p.Datasource != "" {
|
||
ds = p.Datasource
|
||
}
|
||
var fs []string
|
||
json.Unmarshal(fields, &fs)
|
||
wl := Whitelist()
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: p.Filters}
|
||
q, args, err := exporter.BuildSQL(req, wl)
|
||
if err != nil {
|
||
r = WithSQL(r, q)
|
||
fail(w, r, http.StatusBadRequest, err.Error())
|
||
return
|
||
}
|
||
r = WithSQL(r, q)
|
||
dataDB := a.selectDataDB(ds)
|
||
score, sugg, err := exporter.EvaluateExplain(dataDB, q, args)
|
||
if err != nil {
|
||
fail(w, r, http.StatusBadRequest, err.Error())
|
||
return
|
||
}
|
||
sugg = append(sugg, exporter.IndexSuggestions(req)...)
|
||
const passThreshold = 60
|
||
if score < passThreshold {
|
||
fail(w, r, http.StatusBadRequest, fmt.Sprintf("EXPLAIN 未通过:评分=%d,请优化索引或缩小查询范围", score))
|
||
return
|
||
}
|
||
var estimate int64
|
||
func() {
|
||
idx := strings.Index(q, " FROM ")
|
||
if idx > 0 {
|
||
cq := "SELECT COUNT(1)" + q[idx:]
|
||
row := dataDB.QueryRow(cq, args...)
|
||
var cnt int64
|
||
if err := row.Scan(&cnt); err == nil {
|
||
estimate = cnt
|
||
return
|
||
}
|
||
}
|
||
estimate = 0
|
||
}()
|
||
labels := FieldLabels()
|
||
hdrs := make([]string, len(fs))
|
||
for i, tf := range fs {
|
||
if v, ok := labels[tf]; ok {
|
||
hdrs[i] = v
|
||
} else {
|
||
hdrs[i] = tf
|
||
}
|
||
}
|
||
// owner from query userId if provided
|
||
owner := uint64(0)
|
||
if uidStr := r.URL.Query().Get("userId"); uidStr != "" {
|
||
if n, err := strconv.ParseUint(uidStr, 10, 64); err == nil {
|
||
owner = n
|
||
}
|
||
}
|
||
ejSQL := "INSERT INTO export_jobs (template_id, status, requested_by, owner_id, permission_scope_json, filters_json, options_json, explain_json, explain_score, row_estimate, file_format, created_at, updated_at) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)"
|
||
ejArgs := []interface{}{p.TemplateID, "queued", p.RequestedBy, owner, toJSON(p.Permission), toJSON(p.Filters), toJSON(p.Options), toJSON(map[string]interface{}{"sql": q, "suggestions": sugg}), score, estimate, p.FileFormat, time.Now(), time.Now()}
|
||
log.Printf("trace_id=%s sql=%s args=%v", TraceIDFrom(r), ejSQL, ejArgs)
|
||
res, err := a.meta.Exec(ejSQL, ejArgs...)
|
||
if err != nil {
|
||
fail(w, r, http.StatusInternalServerError, err.Error())
|
||
return
|
||
}
|
||
id, _ := res.LastInsertId()
|
||
go a.runJob(uint64(id), dataDB, q, args, fs, hdrs, p.FileFormat)
|
||
ok(w, r, map[string]interface{}{"id": id})
|
||
}
|
||
|
||
func (a *ExportsAPI) runJob(id uint64, db *sql.DB, q string, args []interface{}, fields []string, cols []string, fmt string) {
|
||
// load datasource once for transform decisions
|
||
var jobTplID uint64
|
||
var jobDS string
|
||
{
|
||
row := a.meta.QueryRow("SELECT template_id FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&jobTplID)
|
||
if jobTplID > 0 {
|
||
tr := a.meta.QueryRow("SELECT datasource FROM export_templates WHERE id=?", jobTplID)
|
||
_ = tr.Scan(&jobDS)
|
||
}
|
||
}
|
||
log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, started_at=? WHERE id= ?", []interface{}{"running", time.Now(), id})
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, started_at=?, updated_at=? WHERE id= ?", "running", time.Now(), time.Now(), id)
|
||
if fmt == "csv" {
|
||
w, err := exporter.NewCSVWriter("storage", "export")
|
||
if err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
w.WriteHeader(cols)
|
||
const maxRowsPerFile = 300000
|
||
files := []string{}
|
||
{
|
||
var tplID uint64
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&tplID, &filtersJSON)
|
||
var tplDS string
|
||
var main string
|
||
var fieldsJSON []byte
|
||
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
|
||
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
|
||
var fs []string
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(fieldsJSON, &fs)
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
wl := Whitelist()
|
||
var chunks [][2]string
|
||
if v, ok := fl["create_time_between"]; ok {
|
||
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
|
||
chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
|
||
}
|
||
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
|
||
chunks = splitByDays(arrs[0], arrs[1], 10)
|
||
}
|
||
}
|
||
if len(chunks) > 0 {
|
||
out := make([]interface{}, len(cols))
|
||
dest := make([]interface{}, len(cols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var partCount int64
|
||
var tick int64
|
||
for _, rg := range chunks {
|
||
fl["create_time_between"] = []string{rg[0], rg[1]}
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
|
||
cq, cargs, err := exporter.BuildSQL(req, wl)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
batch := 1000
|
||
for off := 0; ; off += batch {
|
||
sub := "SELECT * FROM (" + cq + ") AS sub LIMIT ? OFFSET ?"
|
||
args2 := append(append([]interface{}{}, cargs...), batch, off)
|
||
rows2, err := db.Query(sub, args2...)
|
||
if err != nil {
|
||
break
|
||
}
|
||
fetched := false
|
||
for rows2.Next() {
|
||
fetched = true
|
||
if err := rows2.Scan(dest...); err != nil {
|
||
rows2.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out[i])
|
||
}
|
||
}
|
||
vals = transformRow(jobDS, fs, vals)
|
||
w.WriteRow(vals)
|
||
count++
|
||
partCount++
|
||
tick++
|
||
if tick%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
if partCount >= maxRowsPerFile {
|
||
path, size, _ := w.Close()
|
||
files = append(files, path)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
|
||
w, err = exporter.NewCSVWriter("storage", "export")
|
||
if err != nil {
|
||
rows2.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
w.WriteHeader(cols)
|
||
partCount = 0
|
||
}
|
||
}
|
||
rows2.Close()
|
||
if !fetched {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
path, size, _ := w.Close()
|
||
if partCount > 0 || len(files) == 0 {
|
||
files = append(files, path)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
|
||
}
|
||
if count == 0 {
|
||
row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
|
||
var c int64
|
||
_ = row.Scan(&c)
|
||
count = c
|
||
}
|
||
if len(files) >= 1 {
|
||
zipPath, zipSize := createZip(id, files)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
|
||
for _, fp := range files { if strings.HasSuffix(strings.ToLower(fp), ".xlsx") { _ = os.Remove(fp) } }
|
||
}
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
|
||
return
|
||
}
|
||
}
|
||
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
|
||
// batched cursor queries, split workbook per 300k rows
|
||
{
|
||
const maxRowsPerFile = 300000
|
||
out := make([]interface{}, len(cols))
|
||
dest := make([]interface{}, len(cols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var partCount int64
|
||
var tick int64
|
||
batch := 1000
|
||
files2 := []string{}
|
||
for off := 0; ; off += batch {
|
||
sub := "SELECT * FROM (" + q + ") AS sub LIMIT ? OFFSET ?"
|
||
args2 := append(append([]interface{}{}, args...), batch, off)
|
||
rows3, err := db.Query(sub, args2...)
|
||
if err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
fetched := false
|
||
for rows3.Next() {
|
||
fetched = true
|
||
if err := rows3.Scan(dest...); err != nil {
|
||
rows3.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out[i])
|
||
}
|
||
}
|
||
vals = transformRow(jobDS, fields, vals)
|
||
w.WriteRow(vals)
|
||
count++
|
||
partCount++
|
||
tick++
|
||
if tick%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
if partCount >= maxRowsPerFile {
|
||
path2, size2, _ := w.Close()
|
||
files2 = append(files2, path2)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path2, partCount, size2, time.Now(), time.Now())
|
||
w, err = exporter.NewCSVWriter("storage", "export")
|
||
if err != nil {
|
||
rows3.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
w.WriteHeader(cols)
|
||
partCount = 0
|
||
}
|
||
}
|
||
rows3.Close()
|
||
if !fetched {
|
||
break
|
||
}
|
||
}
|
||
path, size, _ := w.Close()
|
||
if partCount > 0 || len(files2) == 0 {
|
||
files2 = append(files2, path)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
|
||
}
|
||
if count == 0 {
|
||
row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
|
||
var c int64
|
||
_ = row.Scan(&c)
|
||
count = c
|
||
}
|
||
if len(files2) >= 1 {
|
||
zipPath, zipSize := createZip(id, files2)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
|
||
for _, fp := range files2 { if strings.HasSuffix(strings.ToLower(fp), ".xlsx") { _ = os.Remove(fp) } }
|
||
}
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
|
||
return
|
||
}
|
||
// batched cursor queries, 1000 rows per page, file split at 300k
|
||
{
|
||
const maxRowsPerFile = 300000
|
||
files2 := []string{}
|
||
out := make([]interface{}, len(cols))
|
||
dest := make([]interface{}, len(cols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var partCount int64
|
||
var tick int64
|
||
batch := 1000
|
||
for off := 0; ; off += batch {
|
||
sub := "SELECT * FROM (" + q + ") AS sub LIMIT ? OFFSET ?"
|
||
args2 := append(append([]interface{}{}, args...), batch, off)
|
||
rows3, err := db.Query(sub, args2...)
|
||
if err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
fetched := false
|
||
for rows3.Next() {
|
||
fetched = true
|
||
if err := rows3.Scan(dest...); err != nil {
|
||
rows3.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out[i])
|
||
}
|
||
}
|
||
vals = transformRow(jobDS, fields, vals)
|
||
w.WriteRow(vals)
|
||
count++
|
||
partCount++
|
||
tick++
|
||
if tick%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
if partCount >= maxRowsPerFile {
|
||
path, size, _ := w.Close()
|
||
files2 = append(files2, path)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
|
||
w, err = exporter.NewCSVWriter("storage", "export")
|
||
if err != nil {
|
||
rows3.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
w.WriteHeader(cols)
|
||
partCount = 0
|
||
}
|
||
}
|
||
rows3.Close()
|
||
if !fetched {
|
||
break
|
||
}
|
||
}
|
||
path, size, _ := w.Close()
|
||
if partCount > 0 || len(files2) == 0 {
|
||
files2 = append(files2, path)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, partCount, size, time.Now(), time.Now())
|
||
}
|
||
if count == 0 {
|
||
row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
|
||
var c int64
|
||
_ = row.Scan(&c)
|
||
count = c
|
||
}
|
||
if len(files2) >= 1 {
|
||
zipPath, zipSize := createZip(id, files2)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
|
||
for _, fp := range files2 { if strings.HasSuffix(strings.ToLower(fp), ".xlsx") { _ = os.Remove(fp) } }
|
||
}
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
|
||
return
|
||
}
|
||
rows, err := db.Query(q, args...)
|
||
if err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
out := make([]interface{}, len(cols))
|
||
dest := make([]interface{}, len(cols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var tick int64
|
||
for rows.Next() {
|
||
if err := rows.Scan(dest...); err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out[i])
|
||
}
|
||
}
|
||
w.WriteRow(vals)
|
||
count++
|
||
tick++
|
||
if tick%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
}
|
||
path, size, _ := w.Close()
|
||
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, path, count, size, time.Now(), time.Now()})
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, path, count, size, time.Now(), time.Now())
|
||
log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", []interface{}{"completed", time.Now(), count, time.Now(), id})
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
|
||
return
|
||
}
|
||
if fmt == "xlsx" {
|
||
const maxRowsPerFile = 300000
|
||
files := []string{}
|
||
x, path, err := exporter.NewXLSXWriter("storage", "export", "Sheet1")
|
||
if err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
x.WriteHeader(cols)
|
||
{
|
||
var tplID uint64
|
||
var filtersJSON []byte
|
||
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
|
||
_ = row.Scan(&tplID, &filtersJSON)
|
||
var tplDS string
|
||
var main string
|
||
var fieldsJSON []byte
|
||
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
|
||
_ = tr.Scan(&tplDS, &main, &fieldsJSON)
|
||
var fs []string
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(fieldsJSON, &fs)
|
||
json.Unmarshal(filtersJSON, &fl)
|
||
wl := Whitelist()
|
||
var chunks [][2]string
|
||
if v, ok := fl["create_time_between"]; ok {
|
||
if arr, ok2 := v.([]interface{}); ok2 && len(arr) == 2 {
|
||
chunks = splitByDays(toString(arr[0]), toString(arr[1]), 10)
|
||
}
|
||
if arrs, ok3 := v.([]string); ok3 && len(arrs) == 2 {
|
||
chunks = splitByDays(arrs[0], arrs[1], 10)
|
||
}
|
||
}
|
||
if len(chunks) > 0 {
|
||
out := make([]interface{}, len(cols))
|
||
dest := make([]interface{}, len(cols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var partCount int64
|
||
var tick int64
|
||
for _, rg := range chunks {
|
||
fl["create_time_between"] = []string{rg[0], rg[1]}
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: tplDS, Fields: fs, Filters: fl}
|
||
cq, cargs, err := exporter.BuildSQL(req, wl)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
batch := 1000
|
||
for off := 0; ; off += batch {
|
||
sub := "SELECT * FROM (" + cq + ") AS sub LIMIT ? OFFSET ?"
|
||
args2 := append(append([]interface{}{}, cargs...), batch, off)
|
||
rows2, err := db.Query(sub, args2...)
|
||
if err != nil {
|
||
break
|
||
}
|
||
fetched := false
|
||
for rows2.Next() {
|
||
fetched = true
|
||
if err := rows2.Scan(dest...); err != nil {
|
||
rows2.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out[i])
|
||
}
|
||
}
|
||
vals = transformRow(jobDS, fs, vals)
|
||
x.WriteRow(vals)
|
||
count++
|
||
partCount++
|
||
tick++
|
||
if tick%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
if partCount >= maxRowsPerFile {
|
||
p, size, _ := x.Close(path)
|
||
files = append(files, p)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, partCount, size, time.Now(), time.Now())
|
||
x, path, err = exporter.NewXLSXWriter("storage", "export", "Sheet1")
|
||
if err != nil {
|
||
rows2.Close()
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
x.WriteHeader(cols)
|
||
partCount = 0
|
||
}
|
||
}
|
||
rows2.Close()
|
||
if !fetched {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
if count == 0 {
|
||
rows, err := db.Query(q, args...)
|
||
if err == nil {
|
||
defer rows.Close()
|
||
out2 := make([]interface{}, len(cols))
|
||
dest2 := make([]interface{}, len(cols))
|
||
for i := range out2 {
|
||
dest2[i] = &out2[i]
|
||
}
|
||
var tick2 int64
|
||
for rows.Next() {
|
||
if err := rows.Scan(dest2...); err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out2 {
|
||
if b, ok := out2[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out2[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out2[i])
|
||
}
|
||
}
|
||
vals = transformRow(jobDS, fs, vals)
|
||
x.WriteRow(vals)
|
||
count++
|
||
tick2++
|
||
if tick2%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
p, size, _ := x.Close(path)
|
||
if partCount > 0 || len(files) == 0 {
|
||
files = append(files, p)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, func() int64 {
|
||
if count > 0 {
|
||
return count
|
||
}
|
||
return partCount
|
||
}(), size, time.Now(), time.Now())
|
||
}
|
||
if count == 0 {
|
||
row := db.QueryRow("SELECT COUNT(1) FROM ("+q+") AS sub", args...)
|
||
var c int64
|
||
_ = row.Scan(&c)
|
||
count = c
|
||
}
|
||
if len(files) >= 1 {
|
||
zipPath, zipSize := createZip(id, files)
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
|
||
}
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
|
||
return
|
||
}
|
||
}
|
||
log.Printf("job_id=%d sql=%s args=%v", id, q, args)
|
||
rows, err := db.Query(q, args...)
|
||
if err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id= ?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
out := make([]interface{}, len(cols))
|
||
dest := make([]interface{}, len(cols))
|
||
for i := range out {
|
||
dest[i] = &out[i]
|
||
}
|
||
var count int64
|
||
var tick int64
|
||
for rows.Next() {
|
||
if err := rows.Scan(dest...); err != nil {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
||
return
|
||
}
|
||
vals := make([]string, len(cols))
|
||
for i := range out {
|
||
if b, ok := out[i].([]byte); ok {
|
||
vals[i] = string(b)
|
||
} else if out[i] == nil {
|
||
vals[i] = ""
|
||
} else {
|
||
vals[i] = toString(out[i])
|
||
}
|
||
}
|
||
vals = transformRow(jobDS, fields, vals)
|
||
x.WriteRow(vals)
|
||
count++
|
||
tick++
|
||
if tick%50 == 0 {
|
||
a.meta.Exec("UPDATE export_jobs SET total_rows=?, updated_at=? WHERE id= ?", count, time.Now(), id)
|
||
}
|
||
}
|
||
p, size, _ := x.Close(path)
|
||
log.Printf("job_id=%d sql=%s args=%v", id, "INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", []interface{}{id, p, count, size, time.Now(), time.Now()})
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, p, count, size, time.Now(), time.Now())
|
||
zipPath, zipSize := createZip(id, []string{p})
|
||
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at, updated_at) VALUES (?,?,?,?,?,?)", id, zipPath, count, zipSize, time.Now(), time.Now())
|
||
if strings.HasSuffix(strings.ToLower(p), ".xlsx") { _ = os.Remove(p) }
|
||
log.Printf("job_id=%d sql=%s args=%v", id, "UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", []interface{}{"completed", time.Now(), count, time.Now(), id})
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=?, updated_at=? WHERE id= ?", "completed", time.Now(), count, time.Now(), id)
|
||
return
|
||
}
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, updated_at=? WHERE id= ?", "failed", time.Now(), time.Now(), id)
|
||
}
|
||
|
||
func (a *ExportsAPI) selectDataDB(ds string) *sql.DB {
|
||
if ds == "ymt" {
|
||
return a.meta
|
||
}
|
||
return a.marketing
|
||
}
|
||
|
||
func splitByDays(startStr, endStr string, stepDays int) [][2]string {
|
||
layout := "2006-01-02 15:04:05"
|
||
s, es := strings.TrimSpace(startStr), strings.TrimSpace(endStr)
|
||
st, err1 := time.Parse(layout, s)
|
||
en, err2 := time.Parse(layout, es)
|
||
if err1 != nil || err2 != nil || !en.After(st) || stepDays <= 0 {
|
||
return [][2]string{{s, es}}
|
||
}
|
||
var out [][2]string
|
||
cur := st
|
||
step := time.Duration(stepDays) * 24 * time.Hour
|
||
for cur.Before(en) {
|
||
nxt := cur.Add(step)
|
||
if nxt.After(en) {
|
||
nxt = en
|
||
}
|
||
out = append(out, [2]string{cur.Format(layout), nxt.Format(layout)})
|
||
cur = nxt
|
||
}
|
||
return out
|
||
}
|
||
|
||
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
|
||
row := a.meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at FROM export_jobs WHERE id=?", id)
|
||
var m = map[string]interface{}{}
|
||
var jid uint64
|
||
var templateID uint64
|
||
var status string
|
||
var requestedBy uint64
|
||
var totalRows sql.NullInt64
|
||
var fileFormat string
|
||
var startedAt, finishedAt sql.NullTime
|
||
var createdAt, updatedAt time.Time
|
||
err := row.Scan(&jid, &templateID, &status, &requestedBy, &totalRows, &fileFormat, &startedAt, &finishedAt, &createdAt, &updatedAt)
|
||
if err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
m["id"] = jid
|
||
m["template_id"] = templateID
|
||
m["status"] = status
|
||
m["requested_by"] = requestedBy
|
||
m["file_format"] = fileFormat
|
||
m["total_rows"] = totalRows.Int64
|
||
m["started_at"] = startedAt.Time
|
||
m["finished_at"] = finishedAt.Time
|
||
m["created_at"] = createdAt
|
||
m["updated_at"] = updatedAt
|
||
rows, _ := a.meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", id)
|
||
files := []map[string]interface{}{}
|
||
for rows.Next() {
|
||
var uri, sheet sql.NullString
|
||
var rc, sz sql.NullInt64
|
||
rows.Scan(&uri, &sheet, &rc, &sz)
|
||
files = append(files, map[string]interface{}{"storage_uri": uri.String, "sheet_name": sheet.String, "row_count": rc.Int64, "size_bytes": sz.Int64})
|
||
}
|
||
rows.Close()
|
||
m["files"] = files
|
||
ok(w, r, m)
|
||
}
|
||
|
||
func (a *ExportsAPI) getSQL(w http.ResponseWriter, r *http.Request, id string) {
|
||
// load job filters and template fields
|
||
row := a.meta.QueryRow("SELECT template_id, filters_json FROM export_jobs WHERE id=?", id)
|
||
var tplID uint64
|
||
var filters []byte
|
||
if err := row.Scan(&tplID, &filters); err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
tr := a.meta.QueryRow("SELECT datasource, main_table, fields_json FROM export_templates WHERE id=?", tplID)
|
||
var ds string
|
||
var main string
|
||
var fields []byte
|
||
if err := tr.Scan(&ds, &main, &fields); err != nil {
|
||
fail(w, r, http.StatusBadRequest, "template not found")
|
||
return
|
||
}
|
||
var fs []string
|
||
var fl map[string]interface{}
|
||
json.Unmarshal(fields, &fs)
|
||
json.Unmarshal(filters, &fl)
|
||
wl := Whitelist()
|
||
req := exporter.BuildRequest{MainTable: main, Datasource: ds, Fields: fs, Filters: fl}
|
||
q, args, err := exporter.BuildSQL(req, wl)
|
||
if err != nil {
|
||
failCat(w, r, http.StatusBadRequest, err.Error(), "sql_build_error")
|
||
return
|
||
}
|
||
formatArg := func(a interface{}) string {
|
||
switch t := a.(type) {
|
||
case nil:
|
||
return "NULL"
|
||
case []byte:
|
||
s := string(t)
|
||
s = strings.ReplaceAll(s, "'", "''")
|
||
return "'" + s + "'"
|
||
case string:
|
||
s := strings.ReplaceAll(t, "'", "''")
|
||
return "'" + s + "'"
|
||
case int:
|
||
return strconv.Itoa(t)
|
||
case int64:
|
||
return strconv.FormatInt(t, 10)
|
||
case float64:
|
||
return strconv.FormatFloat(t, 'f', -1, 64)
|
||
case time.Time:
|
||
return "'" + t.Format("2006-01-02 15:04:05") + "'"
|
||
default:
|
||
return fmt.Sprintf("%v", t)
|
||
}
|
||
}
|
||
var sb strings.Builder
|
||
var ai int
|
||
for i := 0; i < len(q); i++ {
|
||
c := q[i]
|
||
if c == '?' && ai < len(args) {
|
||
sb.WriteString(formatArg(args[ai]))
|
||
ai++
|
||
} else {
|
||
sb.WriteByte(c)
|
||
}
|
||
}
|
||
ok(w, r, map[string]interface{}{"sql": q, "final_sql": sb.String()})
|
||
}
|
||
|
||
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
|
||
row := a.meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", id)
|
||
var uri string
|
||
err := row.Scan(&uri)
|
||
if err != nil {
|
||
fail(w, r, http.StatusNotFound, "not found")
|
||
return
|
||
}
|
||
http.ServeFile(w, r, uri)
|
||
}
|
||
|
||
func transformRow(ds string, fields []string, vals []string) []string {
|
||
for i := range fields {
|
||
if i >= len(vals) {
|
||
break
|
||
}
|
||
f := fields[i]
|
||
if f == "order.key" {
|
||
if ds == "ymt" {
|
||
key := os.Getenv("YMT_KEY_DECRYPT_KEY_B64")
|
||
if key == "" { key = "z9DoIVLuDYEN/qsgweRA4A==" }
|
||
if dec, err := ymtcrypto.SM4Decrypt(vals[i], key); err == nil && dec != "" {
|
||
vals[i] = dec
|
||
}
|
||
} else {
|
||
vals[i] = decodeOrderKey(vals[i])
|
||
}
|
||
}
|
||
}
|
||
return vals
|
||
}
|
||
|
||
func decodeOrderKey(s string) string {
|
||
if s == "" {
|
||
return s
|
||
}
|
||
if len(s) > 2 && s[len(s)-2:] == "_1" {
|
||
s = s[:len(s)-2]
|
||
}
|
||
var n big.Int
|
||
if _, ok := n.SetString(s, 10); !ok {
|
||
return s
|
||
}
|
||
base := []rune{'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'j', 'k', 'l', 'm', 'n', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}
|
||
baseCount := big.NewInt(int64(len(base)))
|
||
zero := big.NewInt(0)
|
||
var out []rune
|
||
for n.Cmp(zero) > 0 {
|
||
var mod big.Int
|
||
mod.Mod(&n, baseCount)
|
||
out = append(out, base[mod.Int64()])
|
||
n.Div(&n, baseCount)
|
||
}
|
||
for len(out) < 16 {
|
||
out = append(out, base[0])
|
||
}
|
||
for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
|
||
out[i], out[j] = out[j], out[i]
|
||
}
|
||
return string(out)
|
||
}
|
||
|
||
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
|
||
a.meta.Exec("UPDATE export_jobs SET status=?, updated_at=? WHERE id=? AND status IN ('queued','running')", "canceled", time.Now(), id)
|
||
w.Write([]byte("ok"))
|
||
}
|
||
|
||
func toString(v interface{}) string {
|
||
switch t := v.(type) {
|
||
case []byte:
|
||
return string(t)
|
||
case string:
|
||
return t
|
||
case int64:
|
||
return strconv.FormatInt(t, 10)
|
||
case int:
|
||
return strconv.Itoa(t)
|
||
case float64:
|
||
return strconv.FormatFloat(t, 'f', -1, 64)
|
||
case bool:
|
||
if t {
|
||
return "1"
|
||
}
|
||
return "0"
|
||
case time.Time:
|
||
return t.Format("2006-01-02 15:04:05")
|
||
default:
|
||
return fmt.Sprintf("%v", t)
|
||
}
|
||
}
|
||
func (a *ExportsAPI) list(w http.ResponseWriter, r *http.Request) {
|
||
a.ensureOwnerColumn()
|
||
q := r.URL.Query()
|
||
page := 1
|
||
size := 15
|
||
if p := q.Get("page"); p != "" {
|
||
if n, err := strconv.Atoi(p); err == nil && n > 0 {
|
||
page = n
|
||
}
|
||
}
|
||
if s := q.Get("page_size"); s != "" {
|
||
if n, err := strconv.Atoi(s); err == nil && n > 0 && n <= 100 {
|
||
size = n
|
||
}
|
||
}
|
||
tplIDStr := q.Get("template_id")
|
||
var tplID uint64
|
||
if tplIDStr != "" {
|
||
if n, err := strconv.ParseUint(tplIDStr, 10, 64); err == nil {
|
||
tplID = n
|
||
}
|
||
}
|
||
offset := (page - 1) * size
|
||
var totalCount int64
|
||
uidStr := q.Get("userId")
|
||
if tplID > 0 {
|
||
if uidStr != "" {
|
||
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ? AND owner_id = ?", tplID, uidStr)
|
||
_ = row.Scan(&totalCount)
|
||
} else {
|
||
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE template_id = ?", tplID)
|
||
_ = row.Scan(&totalCount)
|
||
}
|
||
} else {
|
||
if uidStr != "" {
|
||
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs WHERE owner_id = ?", uidStr)
|
||
_ = row.Scan(&totalCount)
|
||
} else {
|
||
row := a.meta.QueryRow("SELECT COUNT(1) FROM export_jobs")
|
||
_ = row.Scan(&totalCount)
|
||
}
|
||
}
|
||
var rows *sql.Rows
|
||
var err error
|
||
if tplID > 0 {
|
||
if uidStr != "" {
|
||
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? AND owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, uidStr, size, offset)
|
||
} else {
|
||
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE template_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", tplID, size, offset)
|
||
}
|
||
} else {
|
||
if uidStr != "" {
|
||
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs WHERE owner_id = ? ORDER BY id DESC LIMIT ? OFFSET ?", uidStr, size, offset)
|
||
} else {
|
||
rows, err = a.meta.Query("SELECT id, template_id, status, requested_by, row_estimate, total_rows, file_format, created_at, updated_at, explain_score, explain_json FROM export_jobs ORDER BY id DESC LIMIT ? OFFSET ?", size, offset)
|
||
}
|
||
}
|
||
if err != nil {
|
||
failCat(w, r, http.StatusInternalServerError, err.Error(), "explain_error")
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
items := []map[string]interface{}{}
|
||
for rows.Next() {
|
||
var id, tid, req uint64
|
||
var status, fmtstr string
|
||
var estimate, total sql.NullInt64
|
||
var createdAt, updatedAt sql.NullTime
|
||
var score sql.NullInt64
|
||
var explainRaw sql.NullString
|
||
if err := rows.Scan(&id, &tid, &status, &req, &estimate, &total, &fmtstr, &createdAt, &updatedAt, &score, &explainRaw); err != nil {
|
||
continue
|
||
}
|
||
evalStatus := "通过"
|
||
if score.Int64 < 60 {
|
||
evalStatus = "禁止"
|
||
}
|
||
desc := fmt.Sprintf("评分:%d,估算行数:%d;%s", score.Int64, estimate.Int64, map[bool]string{true: "允许执行", false: "禁止执行"}[score.Int64 >= 60])
|
||
if explainRaw.Valid && explainRaw.String != "" {
|
||
var arr []map[string]interface{}
|
||
if err := json.Unmarshal([]byte(explainRaw.String), &arr); err == nil {
|
||
segs := []string{}
|
||
for _, r := range arr {
|
||
getStr := func(field string) string {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return ""
|
||
}
|
||
if s, ok := mm["String"].(string); ok {
|
||
return s
|
||
}
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
getInt := func(field string) int64 {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return 0
|
||
}
|
||
if f, ok := mm["Int64"].(float64); ok {
|
||
return int64(f)
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
getFloat := func(field string) float64 {
|
||
if v, ok := r[field]; ok {
|
||
if mm, ok := v.(map[string]interface{}); ok {
|
||
if b, ok := mm["Valid"].(bool); ok && !b {
|
||
return 0
|
||
}
|
||
if f, ok := mm["Float64"].(float64); ok {
|
||
return f
|
||
}
|
||
}
|
||
}
|
||
return 0
|
||
}
|
||
tbl := getStr("Table")
|
||
typ := getStr("Type")
|
||
if typ == "" {
|
||
typ = getStr("SelectType")
|
||
}
|
||
key := getStr("Key")
|
||
rowsN := getInt("Rows")
|
||
filt := getFloat("Filtered")
|
||
extra := getStr("Extra")
|
||
if tbl == "" && typ == "" && rowsN == 0 && extra == "" {
|
||
continue
|
||
}
|
||
s := fmt.Sprintf("表:%s, 访问类型:%s, 预估行数:%d, 索引:%s, 过滤比例:%.1f%%", tbl, typ, rowsN, key, filt)
|
||
if extra != "" {
|
||
s += ", 额外:" + extra
|
||
}
|
||
segs = append(segs, s)
|
||
}
|
||
if len(segs) > 0 {
|
||
desc = strings.Join(segs, ";")
|
||
}
|
||
}
|
||
}
|
||
m := map[string]interface{}{"id": id, "template_id": tid, "status": status, "requested_by": req, "row_estimate": estimate.Int64, "total_rows": total.Int64, "file_format": fmtstr, "created_at": createdAt.Time, "updated_at": updatedAt.Time, "eval_status": evalStatus, "eval_desc": desc}
|
||
items = append(items, m)
|
||
}
|
||
ok(w, r, map[string]interface{}{"items": items, "total": totalCount, "page": page, "page_size": size})
|
||
}
|
||
|
||
func createZip(jobID uint64, files []string) (string, int64) {
|
||
baseDir := "storage/export"
|
||
_ = os.MkdirAll(baseDir, 0755)
|
||
zipPath := filepath.Join(baseDir, fmt.Sprintf("job_%d_%d.zip", jobID, time.Now().Unix()))
|
||
zf, err := os.Create(zipPath)
|
||
if err != nil {
|
||
return zipPath, 0
|
||
}
|
||
defer zf.Close()
|
||
zw := zip.NewWriter(zf)
|
||
for _, p := range files {
|
||
f, err := os.Open(p)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
fi, _ := f.Stat()
|
||
w, err := zw.Create(filepath.Base(p))
|
||
if err != nil {
|
||
f.Close()
|
||
continue
|
||
}
|
||
_, _ = io.Copy(w, f)
|
||
_ = fi
|
||
f.Close()
|
||
}
|
||
_ = zw.Close()
|
||
st, err := os.Stat(zipPath)
|
||
if err != nil {
|
||
return zipPath, 0
|
||
}
|
||
return zipPath, st.Size()
|
||
}
|