调整数据模式
This commit is contained in:
parent
311b48815a
commit
8eced162b9
|
|
@ -120,3 +120,16 @@ func LoadConfig(path string) *Config {
|
|||
DefaultConfig = &c
|
||||
return &c
|
||||
}
|
||||
|
||||
func GetJob(conf *Config, sys, job string) (Job, string, error) {
|
||||
s, err := conf.GetSystem(sys)
|
||||
if err != nil {
|
||||
return Job{}, "", err
|
||||
}
|
||||
|
||||
j, err := s.GetJob(job)
|
||||
if err != nil {
|
||||
return Job{}, "", err
|
||||
}
|
||||
return j, s.Db, nil
|
||||
}
|
||||
|
|
|
|||
77
biz/db/db.go
77
biz/db/db.go
|
|
@ -2,10 +2,14 @@ package db
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"excel_export/biz/export"
|
||||
"fmt"
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/gorm"
|
||||
"runtime/trace"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
var _ export.DataFetcher = new(Db)
|
||||
|
|
@ -35,25 +39,76 @@ func (d *Db) Fetch(s string) (*export.Data, error) {
|
|||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
//titles := make([]string, 0, 10)
|
||||
titles, err := rows.Columns()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var dd []map[string]interface{}
|
||||
d.db.ScanRows(rows, &dd)
|
||||
data := make([]interface{}, len(dd))
|
||||
for i, m := range dd {
|
||||
row := make([]interface{}, 0, len(m))
|
||||
for _, title := range titles {
|
||||
row = append(row, m[title])
|
||||
}
|
||||
data[i] = interface{}(row)
|
||||
}
|
||||
data := getData(rows, d.db, titles)
|
||||
|
||||
//vv := transform(titles, dd)
|
||||
//fmt.Println(vv)
|
||||
//f, err := os.Create("./ff.csv")
|
||||
//
|
||||
//w := csv.NewWriter(f)
|
||||
//w.Write(titles)
|
||||
//w.WriteAll(vv)
|
||||
////w.Flush()
|
||||
//f.Close()
|
||||
|
||||
return &export.Data{
|
||||
Title: titles,
|
||||
Data: data,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getData(rows *sql.Rows, db *gorm.DB, titles []string) [][]string {
|
||||
result := make([][]string, 0, 10)
|
||||
for rows.Next() {
|
||||
var row map[string]interface{}
|
||||
db.ScanRows(rows, &row)
|
||||
result = append(result, transformRow(titles, row))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func transform(titles []string, data []map[string]interface{}) [][]string {
|
||||
result := make([][]string, len(data))
|
||||
for i, m := range data {
|
||||
result[i] = transformRow(titles, m)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func transformRow(titles []string, data map[string]interface{}) []string {
|
||||
row := make([]string, 0, len(data))
|
||||
for _, title := range titles {
|
||||
col := data[title]
|
||||
switch v := col.(type) {
|
||||
case string:
|
||||
row = append(row, v)
|
||||
case time.Time:
|
||||
row = append(row, v.Format("2006-01-02 15:04:05"))
|
||||
case int, int8, int16, int32, int64:
|
||||
row = append(row, fmt.Sprintf("%d", v))
|
||||
case float64:
|
||||
// When formatting floats, do not use fmt.Sprintf("%v", n), this will cause numbers below 1e-4 to be printed in
|
||||
// scientific notation. Scientific notation is not a valid way to store numbers in XML.
|
||||
// Also not not use fmt.Sprintf("%f", n), this will cause numbers to be stored as X.XXXXXX. Which means that
|
||||
// numbers will lose precision and numbers with fewer significant digits such as 0 will be stored as 0.000000
|
||||
// which causes tests to fail.
|
||||
row = append(row, strconv.FormatFloat(v, 'f', -1, 64))
|
||||
case float32:
|
||||
row = append(row, strconv.FormatFloat(float64(v), 'f', -1, 32))
|
||||
case []byte:
|
||||
row = append(row, string(v))
|
||||
case nil:
|
||||
row = append(row, "")
|
||||
default:
|
||||
row = append(row, fmt.Sprintf("%v", v))
|
||||
}
|
||||
}
|
||||
return row
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ func TestDb_Fetch(t *testing.T) {
|
|||
ret, err := db.Fetch(sql)
|
||||
assert.Nil(t, err)
|
||||
fmt.Printf("%v \n", ret)
|
||||
item := ret.Data[0].([]interface{})
|
||||
item := ret.Data[0]
|
||||
for _, i := range item {
|
||||
fmt.Printf("%v", i)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,61 @@
|
|||
package export
|
||||
|
||||
import "fmt"
|
||||
|
||||
type CSVExporter struct {
|
||||
mFetcher DataFetcher
|
||||
file FileAdapter
|
||||
count int
|
||||
last interface{}
|
||||
}
|
||||
|
||||
func NewCVSExporter(fetcher DataFetcher, file FileAdapter) DataExporter {
|
||||
return &CSVExporter{
|
||||
mFetcher: fetcher,
|
||||
file: file,
|
||||
}
|
||||
}
|
||||
|
||||
func (ee *CSVExporter) Fetcher(fetcher DataFetcher) {
|
||||
ee.mFetcher = fetcher
|
||||
}
|
||||
|
||||
func (ee *CSVExporter) File(file FileAdapter) {
|
||||
ee.file = file
|
||||
}
|
||||
|
||||
func (ee *CSVExporter) Export(sql, pk string) error {
|
||||
data, err := ee.mFetcher.Fetch(sql)
|
||||
if err != nil {
|
||||
return fmt.Errorf("数据获取错误:%w", err)
|
||||
}
|
||||
ee.count = len(data.Data)
|
||||
//fmt.Printf("Excel Exporter.Excel, got %v data\n", len(data))
|
||||
//ee.file.Open()
|
||||
ee.file.WriteTitle(data.Title)
|
||||
var last interface{}
|
||||
for _, val := range data.Data {
|
||||
last = val
|
||||
ee.file.Write(last)
|
||||
}
|
||||
|
||||
if row, ok := last.([]interface{}); ok {
|
||||
ee.last = row[ee.getPkIndex(data.Title, pk)]
|
||||
}
|
||||
|
||||
//ee.file.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ee *CSVExporter) Last() (int, interface{}) {
|
||||
return ee.count, ee.last
|
||||
}
|
||||
|
||||
func (ee *CSVExporter) getPkIndex(titles []string, pk string) int {
|
||||
for i, title := range titles {
|
||||
if title == pk {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
package export
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCSVExporter_Export(t *testing.T) {
|
||||
data := NewMysqlDataFetcher("aaa")
|
||||
pwd, _ := os.Getwd()
|
||||
file := NewCSV(pwd+"/aa-{begin}.csv", 5, map[string]string{"begin": "202301"})
|
||||
e := NewCVSExporter(data, file)
|
||||
file.Open()
|
||||
err := e.Export("aa", "字段1")
|
||||
file.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.FileExists(t, pwd+"/aa-202301_0.csv")
|
||||
assert.FileExists(t, pwd+"/aa-202301_1.csv")
|
||||
assert.NoFileExists(t, pwd+"/aa-202301_2.csv")
|
||||
|
||||
_ = os.Remove(pwd + "/aa-202301_0.csv")
|
||||
_ = os.Remove(pwd + "/aa-202301_1.csv")
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
package export
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCsv_Write(t *testing.T) {
|
||||
pwd, _ := os.Getwd()
|
||||
e := NewCSV(pwd+"/aa-{begin}.csv", 5, map[string]string{"begin": "202301"})
|
||||
|
||||
e.Open()
|
||||
|
||||
e.WriteTitle([]string{"姓名", "年龄"})
|
||||
data := make([]interface{}, 2)
|
||||
data[0] = "张三"
|
||||
for i := 0; i < 9; i++ {
|
||||
data[1] = 10 + i
|
||||
e.Write(data)
|
||||
}
|
||||
e.Close()
|
||||
|
||||
assert.FileExists(t, pwd+"/aa-202301_0.csv")
|
||||
assert.FileExists(t, pwd+"/aa-202301_1.csv")
|
||||
assert.NoFileExists(t, pwd+"/aa-202301_2.csv")
|
||||
|
||||
_ = os.Remove(pwd + "/aa-202301_0.csv")
|
||||
_ = os.Remove(pwd + "/aa-202301_1.csv")
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,127 @@
|
|||
package export
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/csv"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type CSV struct {
|
||||
fc *os.File
|
||||
csv *csv.Writer
|
||||
f *File
|
||||
isNew bool
|
||||
titles []string
|
||||
}
|
||||
|
||||
func NewCSV(fileName string, limit int, param map[string]string) *CSV {
|
||||
return &CSV{
|
||||
f: NewFile(fileName, limit, param),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *CSV) slice() {
|
||||
if e.f.slice() {
|
||||
e.reset()
|
||||
}
|
||||
}
|
||||
|
||||
func (e *CSV) SetParam(param map[string]string) {
|
||||
e.f.param = param
|
||||
}
|
||||
|
||||
func (e *CSV) reset() {
|
||||
e.save()
|
||||
e.f.NextFile()
|
||||
e.Open()
|
||||
e.WriteTitle(nil)
|
||||
e.slice()
|
||||
}
|
||||
|
||||
func (e *CSV) Open() error {
|
||||
var err error
|
||||
if e.f.IsFileExist() {
|
||||
e.fc, err = os.OpenFile(e.f.FileName(), os.O_APPEND|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.f.SetRow(e.getLineCount(e.fc))
|
||||
} else {
|
||||
e.isNew = true
|
||||
e.fc, err = os.Create(e.f.FileName())
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
e.csv = csv.NewWriter(e.fc)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *CSV) getLineCount(file io.Reader) (line int) {
|
||||
reader := bufio.NewReader(file)
|
||||
line = 0
|
||||
for {
|
||||
_, isPrefix, err := reader.ReadLine()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if !isPrefix {
|
||||
line++
|
||||
}
|
||||
}
|
||||
return line
|
||||
}
|
||||
|
||||
func (e *CSV) save() error {
|
||||
e.csv.Flush()
|
||||
e.fc.Close()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *CSV) WriteTitle(titles []string) error {
|
||||
|
||||
if titles != nil {
|
||||
e.titles = titles
|
||||
}
|
||||
|
||||
if e.titles != nil && e.isNew {
|
||||
e.Write(e.titles)
|
||||
e.isNew = false
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *CSV) Write(data interface{}) error {
|
||||
if e.f.slice() {
|
||||
e.reset()
|
||||
}
|
||||
|
||||
v := reflect.ValueOf(data)
|
||||
if v.Kind() == reflect.Ptr {
|
||||
v = v.Elem()
|
||||
}
|
||||
if v.Kind() != reflect.Slice {
|
||||
return errors.New("数据无效,不是切片类型")
|
||||
}
|
||||
|
||||
switch val := data.(type) {
|
||||
case []string:
|
||||
return e.csv.Write(val)
|
||||
case []interface{}:
|
||||
strs := make([]string, len(val))
|
||||
for i, v := range val {
|
||||
strs[i] = fmt.Sprintf("%v", v)
|
||||
}
|
||||
return e.csv.Write(strs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *CSV) Close() error {
|
||||
return e.save()
|
||||
}
|
||||
|
|
@ -1,6 +1,9 @@
|
|||
package export
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
type ExcelExporter struct {
|
||||
mFetcher DataFetcher
|
||||
|
|
@ -33,15 +36,13 @@ func (ee *ExcelExporter) Export(sql, pk string) error {
|
|||
//fmt.Printf("Excel Exporter.Excel, got %v data\n", len(data))
|
||||
//ee.file.Open()
|
||||
ee.file.WriteTitle(data.Title)
|
||||
var last interface{}
|
||||
var last []string
|
||||
for _, val := range data.Data {
|
||||
last = val
|
||||
ee.file.Write(last)
|
||||
}
|
||||
|
||||
if row, ok := last.([]interface{}); ok {
|
||||
ee.last = row[getPkIndex(data.Title, pk)]
|
||||
}
|
||||
ee.last = last[getPkIndex(data.Title, pk)]
|
||||
|
||||
//ee.file.Close()
|
||||
return nil
|
||||
|
|
@ -52,6 +53,10 @@ func (ee *ExcelExporter) Last() (int, interface{}) {
|
|||
}
|
||||
|
||||
func getPkIndex(titles []string, pk string) int {
|
||||
if pk == "" {
|
||||
log.Println("pk is not empty")
|
||||
return -1
|
||||
}
|
||||
for i, title := range titles {
|
||||
if title == pk {
|
||||
return i
|
||||
|
|
|
|||
|
|
@ -11,7 +11,9 @@ func TestExcelExporter_Export(t *testing.T) {
|
|||
pwd, _ := os.Getwd()
|
||||
file := NewExcel(pwd+"/aa-{begin}.xlsx", 5, map[string]string{"begin": "202301"})
|
||||
e := NewExcelExporter(data, file)
|
||||
err := e.Export("aa")
|
||||
file.Open()
|
||||
err := e.Export("aa", "字段1")
|
||||
file.Close()
|
||||
assert.Nil(t, err)
|
||||
|
||||
assert.FileExists(t, pwd+"/aa-202301_0.xlsx")
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ type (
|
|||
|
||||
Data struct {
|
||||
Title []string
|
||||
Data []interface{}
|
||||
Data [][]string
|
||||
}
|
||||
DataFetcher interface {
|
||||
Fetch(sql string) (*Data, error)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package export
|
|||
|
||||
import (
|
||||
"math/rand"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type MysqlDataFetcher struct {
|
||||
|
|
@ -9,9 +10,9 @@ type MysqlDataFetcher struct {
|
|||
}
|
||||
|
||||
func (mf *MysqlDataFetcher) Fetch(sql string) (*Data, error) {
|
||||
rows := make([]interface{}, 0, 6)
|
||||
rows := make([][]string, 0, 6)
|
||||
// 插入6个随机数组成的切片,模拟查询要返回的数据集
|
||||
rows = append(rows, rand.Perm(5), rand.Perm(5), rand.Perm(5), rand.Perm(5), rand.Perm(5), rand.Perm(5))
|
||||
rows = append(rows, row(), row(), row(), row(), row(), row())
|
||||
return &Data{
|
||||
Title: []string{"字段1", "字段2", "字段3", "字段4", "字段5"},
|
||||
Data: rows,
|
||||
|
|
@ -23,3 +24,12 @@ func NewMysqlDataFetcher(configStr string) DataFetcher {
|
|||
Config: configStr,
|
||||
}
|
||||
}
|
||||
|
||||
func row() []string {
|
||||
strs := make([]string, 5)
|
||||
nums := rand.Perm(5)
|
||||
for i, num := range nums {
|
||||
strs[i] = strconv.Itoa(num)
|
||||
}
|
||||
return strs
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,8 +9,18 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
func Export(conf *config.Config, sysName, jobName string, begin, end time.Time, batch int) error {
|
||||
job, dbStr, err := GetJob(conf, sysName, jobName)
|
||||
type Excel struct {
|
||||
conf *config.Config
|
||||
}
|
||||
|
||||
func NewExcel(conf *config.Config) *Excel {
|
||||
return &Excel{
|
||||
conf: conf,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Excel) Export(sysName, jobName string, begin, end time.Time, batch int) error {
|
||||
job, dbStr, err := config.GetJob(e.conf, sysName, jobName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -20,27 +30,14 @@ func Export(conf *config.Config, sysName, jobName string, begin, end time.Time,
|
|||
return err
|
||||
}
|
||||
|
||||
return JobHandler(job, d, map[string]interface{}{
|
||||
return e.JobHandler(job, d, map[string]interface{}{
|
||||
"begin": begin,
|
||||
"end": end,
|
||||
"last": 0,
|
||||
}, batch)
|
||||
}
|
||||
|
||||
func GetJob(conf *config.Config, sys, job string) (config.Job, string, error) {
|
||||
s, err := conf.GetSystem(sys)
|
||||
if err != nil {
|
||||
return config.Job{}, "", err
|
||||
}
|
||||
|
||||
j, err := s.GetJob(job)
|
||||
if err != nil {
|
||||
return config.Job{}, "", err
|
||||
}
|
||||
return j, s.Db, nil
|
||||
}
|
||||
|
||||
func JobHandler(job config.Job, d export.DataFetcher, params map[string]interface{}, batch int) error {
|
||||
func (e *Excel) JobHandler(job config.Job, d export.DataFetcher, params map[string]interface{}, batch int) error {
|
||||
|
||||
f := export.NewExcel(job.File, job.Size, map[string]string{
|
||||
"begin": params["begin"].(time.Time).Format("20060102"),
|
||||
|
|
@ -49,16 +46,17 @@ func JobHandler(job config.Job, d export.DataFetcher, params map[string]interfac
|
|||
|
||||
for i, task := range job.Tasks {
|
||||
fmt.Printf("执行导出任务:%d\n", i+1)
|
||||
if err := TaskExport(d, task, params, f, batch); err != nil {
|
||||
if err := e.TaskExport(d, task, params, f, batch); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, f export.FileAdapter, batch int) error {
|
||||
func (e *Excel) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, f export.FileAdapter, batch int) error {
|
||||
//todo 最多分1000个批次进行处理
|
||||
f.Open()
|
||||
defer f.Close()
|
||||
for i := 0; i < 1000; i++ {
|
||||
sql := t.GetSql(params)
|
||||
|
||||
|
|
@ -75,6 +73,5 @@ func TaskExport(d export.DataFetcher, t config.Task, params map[string]interface
|
|||
params["last"] = last
|
||||
time.Sleep(time.Microsecond * 30)
|
||||
}
|
||||
f.Close()
|
||||
return nil
|
||||
}
|
||||
|
|
@ -86,7 +86,8 @@ func rootRun(cmd *cobra.Command, args []string) {
|
|||
CmdError(cmd, "无效的参数:%s", err.Error())
|
||||
}
|
||||
b := time.Now()
|
||||
Export(config.DefaultConfig, sName, jName, begin, end, batch)
|
||||
ee := NewExcel(config.DefaultConfig)
|
||||
ee.Export(sName, jName, begin, end, batch)
|
||||
e := time.Now()
|
||||
fmt.Println("耗时:" + e.Sub(b).String())
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue