270 lines
9.7 KiB
Go
270 lines
9.7 KiB
Go
package api
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"marketing-system-data-tool/server/internal/exporter"
|
|
)
|
|
|
|
type ExportsAPI struct{
|
|
meta *sql.DB
|
|
marketing *sql.DB
|
|
}
|
|
|
|
func ExportsHandler(meta, marketing *sql.DB) http.Handler {
|
|
api := &ExportsAPI{meta: meta, marketing: marketing}
|
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
|
p := strings.TrimPrefix(r.URL.Path, "/api/exports")
|
|
if r.Method == http.MethodPost && p == "" {
|
|
api.create(w, r)
|
|
return
|
|
}
|
|
if strings.HasPrefix(p, "/") {
|
|
id := strings.TrimPrefix(p, "/")
|
|
if r.Method == http.MethodGet && !strings.HasSuffix(p, "/download") {
|
|
api.get(w, r, id)
|
|
return
|
|
}
|
|
if r.Method == http.MethodGet && strings.HasSuffix(p, "/download") {
|
|
id = strings.TrimSuffix(id, "/download")
|
|
api.download(w, r, id)
|
|
return
|
|
}
|
|
if r.Method == http.MethodPost && strings.HasSuffix(p, "/cancel") {
|
|
id = strings.TrimSuffix(id, "/cancel")
|
|
api.cancel(w, r, id)
|
|
return
|
|
}
|
|
}
|
|
w.WriteHeader(http.StatusNotFound)
|
|
})
|
|
}
|
|
|
|
type ExportPayload struct{
|
|
TemplateID uint64 `json:"template_id"`
|
|
RequestedBy uint64 `json:"requested_by"`
|
|
Permission map[string]interface{} `json:"permission"`
|
|
Options map[string]interface{} `json:"options"`
|
|
FileFormat string `json:"file_format"`
|
|
}
|
|
|
|
func (a *ExportsAPI) create(w http.ResponseWriter, r *http.Request) {
|
|
b, _ := io.ReadAll(r.Body)
|
|
var p ExportPayload
|
|
json.Unmarshal(b, &p)
|
|
var main string
|
|
var fields, filters []byte
|
|
row := a.meta.QueryRow("SELECT main_table, fields_json, filters_json FROM export_templates WHERE id=?", p.TemplateID)
|
|
err := row.Scan(&main, &fields, &filters)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte("invalid template"))
|
|
return
|
|
}
|
|
var fs []string
|
|
var fl map[string]interface{}
|
|
json.Unmarshal(fields, &fs)
|
|
json.Unmarshal(filters, &fl)
|
|
wl := map[string]bool{
|
|
"order.order_number": true,
|
|
"order.creator": true,
|
|
"order.out_trade_no": true,
|
|
"order.type": true,
|
|
"order.status": true,
|
|
"order.contract_price": true,
|
|
"order.num": true,
|
|
"order.total": true,
|
|
"order.pay_amount": true,
|
|
"order.create_time": true,
|
|
"order.update_time": true,
|
|
}
|
|
req := exporter.BuildRequest{MainTable: main, Fields: fs, Filters: fl}
|
|
q, args, err := exporter.BuildSQL(req, wl)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
_, _, _ = exporter.RunExplain(a.marketing, q, args)
|
|
res, err := a.meta.Exec("INSERT INTO export_jobs (template_id, status, requested_by, permission_scope_json, options_json, file_format, created_at) VALUES (?,?,?,?,?,?,?)", p.TemplateID, "queued", p.RequestedBy, toJSON(p.Permission), toJSON(p.Options), p.FileFormat, time.Now())
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusInternalServerError)
|
|
w.Write([]byte(err.Error()))
|
|
return
|
|
}
|
|
id, _ := res.LastInsertId()
|
|
go a.runJob(uint64(id), q, args, fs, p.FileFormat)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Write([]byte("{\"id\":"+strconv.FormatInt(id,10)+"}"))
|
|
}
|
|
|
|
func (a *ExportsAPI) runJob(id uint64, q string, args []interface{}, cols []string, fmt string) {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, started_at=? WHERE id=?", "running", time.Now(), id)
|
|
if fmt == "csv" {
|
|
w, err := exporter.NewCSVWriter("storage", "export")
|
|
if err != nil {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
return
|
|
}
|
|
w.WriteHeader(cols)
|
|
rows, err := a.marketing.Query(q, args...)
|
|
if err != nil {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
out := make([]interface{}, len(cols))
|
|
dest := make([]interface{}, len(cols))
|
|
for i := range out {
|
|
dest[i] = &out[i]
|
|
}
|
|
var count int64
|
|
for rows.Next() {
|
|
if err := rows.Scan(dest...); err != nil {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
return
|
|
}
|
|
vals := make([]string, len(cols))
|
|
for i := range out {
|
|
if b, ok := out[i].([]byte); ok {
|
|
vals[i] = string(b)
|
|
} else if out[i] == nil {
|
|
vals[i] = ""
|
|
} else {
|
|
vals[i] = toString(out[i])
|
|
}
|
|
}
|
|
w.WriteRow(vals)
|
|
count++
|
|
}
|
|
path, size, _ := w.Close()
|
|
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at) VALUES (?,?,?,?,?)", id, path, count, size, time.Now())
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=? WHERE id=?", "completed", time.Now(), count, id)
|
|
return
|
|
}
|
|
if fmt == "xlsx" {
|
|
x, path, err := exporter.NewXLSXWriter("storage", "export", "Sheet1")
|
|
if err != nil {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
return
|
|
}
|
|
x.WriteHeader(cols)
|
|
rows, err := a.marketing.Query(q, args...)
|
|
if err != nil {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
out := make([]interface{}, len(cols))
|
|
dest := make([]interface{}, len(cols))
|
|
for i := range out {
|
|
dest[i] = &out[i]
|
|
}
|
|
var count int64
|
|
for rows.Next() {
|
|
if err := rows.Scan(dest...); err != nil {
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
return
|
|
}
|
|
vals := make([]string, len(cols))
|
|
for i := range out {
|
|
if b, ok := out[i].([]byte); ok {
|
|
vals[i] = string(b)
|
|
} else if out[i] == nil {
|
|
vals[i] = ""
|
|
} else {
|
|
vals[i] = toString(out[i])
|
|
}
|
|
}
|
|
x.WriteRow(vals)
|
|
count++
|
|
}
|
|
p, size, _ := x.Close(path)
|
|
a.meta.Exec("INSERT INTO export_job_files (job_id, storage_uri, row_count, size_bytes, created_at) VALUES (?,?,?,?,?)", id, p, count, size, time.Now())
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=?, total_rows=? WHERE id=?", "completed", time.Now(), count, id)
|
|
return
|
|
}
|
|
a.meta.Exec("UPDATE export_jobs SET status=?, finished_at=? WHERE id=?", "failed", time.Now(), id)
|
|
}
|
|
|
|
func (a *ExportsAPI) get(w http.ResponseWriter, r *http.Request, id string) {
|
|
row := a.meta.QueryRow("SELECT id, template_id, status, requested_by, total_rows, file_format, started_at, finished_at, created_at, updated_at FROM export_jobs WHERE id=?", id)
|
|
var m = map[string]interface{}{}
|
|
var jid uint64
|
|
var templateID uint64
|
|
var status string
|
|
var requestedBy uint64
|
|
var totalRows sql.NullInt64
|
|
var fileFormat string
|
|
var startedAt, finishedAt sql.NullTime
|
|
var createdAt, updatedAt time.Time
|
|
err := row.Scan(&jid, &templateID, &status, &requestedBy, &totalRows, &fileFormat, &startedAt, &finishedAt, &createdAt, &updatedAt)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
w.Write([]byte("not found"))
|
|
return
|
|
}
|
|
m["id"] = jid
|
|
m["template_id"] = templateID
|
|
m["status"] = status
|
|
m["requested_by"] = requestedBy
|
|
m["file_format"] = fileFormat
|
|
m["total_rows"] = totalRows.Int64
|
|
m["started_at"] = startedAt.Time
|
|
m["finished_at"] = finishedAt.Time
|
|
m["created_at"] = createdAt
|
|
m["updated_at"] = updatedAt
|
|
rows, _ := a.meta.Query("SELECT storage_uri, sheet_name, row_count, size_bytes FROM export_job_files WHERE job_id=?", id)
|
|
files := []map[string]interface{}{}
|
|
for rows.Next() {
|
|
var uri, sheet sql.NullString
|
|
var rc, sz sql.NullInt64
|
|
rows.Scan(&uri, &sheet, &rc, &sz)
|
|
files = append(files, map[string]interface{}{"storage_uri": uri.String, "sheet_name": sheet.String, "row_count": rc.Int64, "size_bytes": sz.Int64})
|
|
}
|
|
rows.Close()
|
|
m["files"] = files
|
|
b, _ := json.Marshal(m)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.Write(b)
|
|
}
|
|
|
|
func (a *ExportsAPI) download(w http.ResponseWriter, r *http.Request, id string) {
|
|
row := a.meta.QueryRow("SELECT storage_uri FROM export_job_files WHERE job_id=? ORDER BY id DESC LIMIT 1", id)
|
|
var uri string
|
|
err := row.Scan(&uri)
|
|
if err != nil {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
w.Write([]byte("not found"))
|
|
return
|
|
}
|
|
http.ServeFile(w, r, uri)
|
|
}
|
|
|
|
func (a *ExportsAPI) cancel(w http.ResponseWriter, r *http.Request, id string) {
|
|
a.meta.Exec("UPDATE export_jobs SET status=? WHERE id=? AND status IN ('queued','running')", "canceled", id)
|
|
w.Write([]byte("ok"))
|
|
}
|
|
|
|
func toString(v interface{}) string {
|
|
switch t := v.(type) {
|
|
case []byte:
|
|
return string(t)
|
|
case string:
|
|
return t
|
|
case int64:
|
|
return strconv.FormatInt(t, 10)
|
|
case int:
|
|
return strconv.Itoa(t)
|
|
case float64:
|
|
return strconv.FormatFloat(t, 'f', -1, 64)
|
|
default:
|
|
return ""
|
|
}
|
|
}
|