diff --git a/biz/config/config.go b/biz/config/config.go index ea6635b..3c320a5 100644 --- a/biz/config/config.go +++ b/biz/config/config.go @@ -120,3 +120,16 @@ func LoadConfig(path string) *Config { DefaultConfig = &c return &c } + +func GetJob(conf *Config, sys, job string) (Job, string, error) { + s, err := conf.GetSystem(sys) + if err != nil { + return Job{}, "", err + } + + j, err := s.GetJob(job) + if err != nil { + return Job{}, "", err + } + return j, s.Db, nil +} diff --git a/biz/db/db.go b/biz/db/db.go index fcaa2f5..1e4ff6a 100644 --- a/biz/db/db.go +++ b/biz/db/db.go @@ -2,10 +2,14 @@ package db import ( "context" + "database/sql" "excel_export/biz/export" + "fmt" "gorm.io/driver/mysql" "gorm.io/gorm" "runtime/trace" + "strconv" + "time" ) var _ export.DataFetcher = new(Db) @@ -35,25 +39,76 @@ func (d *Db) Fetch(s string) (*export.Data, error) { return nil, err } defer rows.Close() - + //titles := make([]string, 0, 10) titles, err := rows.Columns() if err != nil { return nil, err } - var dd []map[string]interface{} - d.db.ScanRows(rows, &dd) - data := make([]interface{}, len(dd)) - for i, m := range dd { - row := make([]interface{}, 0, len(m)) - for _, title := range titles { - row = append(row, m[title]) - } - data[i] = interface{}(row) - } + data := getData(rows, d.db, titles) + + //vv := transform(titles, dd) + //fmt.Println(vv) + //f, err := os.Create("./ff.csv") + // + //w := csv.NewWriter(f) + //w.Write(titles) + //w.WriteAll(vv) + ////w.Flush() + //f.Close() return &export.Data{ Title: titles, Data: data, }, nil } + +func getData(rows *sql.Rows, db *gorm.DB, titles []string) [][]string { + result := make([][]string, 0, 10) + for rows.Next() { + var row map[string]interface{} + db.ScanRows(rows, &row) + result = append(result, transformRow(titles, row)) + } + return result +} + +func transform(titles []string, data []map[string]interface{}) [][]string { + result := make([][]string, len(data)) + for i, m := range data { + result[i] = transformRow(titles, m) + } + return result +} + +func transformRow(titles []string, data map[string]interface{}) []string { + row := make([]string, 0, len(data)) + for _, title := range titles { + col := data[title] + switch v := col.(type) { + case string: + row = append(row, v) + case time.Time: + row = append(row, v.Format("2006-01-02 15:04:05")) + case int, int8, int16, int32, int64: + row = append(row, fmt.Sprintf("%d", v)) + case float64: + // When formatting floats, do not use fmt.Sprintf("%v", n), this will cause numbers below 1e-4 to be printed in + // scientific notation. Scientific notation is not a valid way to store numbers in XML. + // Also not not use fmt.Sprintf("%f", n), this will cause numbers to be stored as X.XXXXXX. Which means that + // numbers will lose precision and numbers with fewer significant digits such as 0 will be stored as 0.000000 + // which causes tests to fail. + row = append(row, strconv.FormatFloat(v, 'f', -1, 64)) + case float32: + row = append(row, strconv.FormatFloat(float64(v), 'f', -1, 32)) + case []byte: + row = append(row, string(v)) + case nil: + row = append(row, "") + default: + row = append(row, fmt.Sprintf("%v", v)) + } + } + return row + +} diff --git a/biz/db/db_test.go b/biz/db/db_test.go index b029ff6..cd07e5e 100644 --- a/biz/db/db_test.go +++ b/biz/db/db_test.go @@ -20,7 +20,7 @@ func TestDb_Fetch(t *testing.T) { ret, err := db.Fetch(sql) assert.Nil(t, err) fmt.Printf("%v \n", ret) - item := ret.Data[0].([]interface{}) + item := ret.Data[0] for _, i := range item { fmt.Printf("%v", i) } diff --git a/biz/export/csv_exporter.go b/biz/export/csv_exporter.go new file mode 100644 index 0000000..2104515 --- /dev/null +++ b/biz/export/csv_exporter.go @@ -0,0 +1,61 @@ +package export + +import "fmt" + +type CSVExporter struct { + mFetcher DataFetcher + file FileAdapter + count int + last interface{} +} + +func NewCVSExporter(fetcher DataFetcher, file FileAdapter) DataExporter { + return &CSVExporter{ + mFetcher: fetcher, + file: file, + } +} + +func (ee *CSVExporter) Fetcher(fetcher DataFetcher) { + ee.mFetcher = fetcher +} + +func (ee *CSVExporter) File(file FileAdapter) { + ee.file = file +} + +func (ee *CSVExporter) Export(sql, pk string) error { + data, err := ee.mFetcher.Fetch(sql) + if err != nil { + return fmt.Errorf("数据获取错误:%w", err) + } + ee.count = len(data.Data) + //fmt.Printf("Excel Exporter.Excel, got %v data\n", len(data)) + //ee.file.Open() + ee.file.WriteTitle(data.Title) + var last interface{} + for _, val := range data.Data { + last = val + ee.file.Write(last) + } + + if row, ok := last.([]interface{}); ok { + ee.last = row[ee.getPkIndex(data.Title, pk)] + } + + //ee.file.Close() + return nil +} + +func (ee *CSVExporter) Last() (int, interface{}) { + return ee.count, ee.last +} + +func (ee *CSVExporter) getPkIndex(titles []string, pk string) int { + for i, title := range titles { + if title == pk { + return i + } + } + return -1 +} diff --git a/biz/export/csv_exporter_test.go b/biz/export/csv_exporter_test.go new file mode 100644 index 0000000..da38678 --- /dev/null +++ b/biz/export/csv_exporter_test.go @@ -0,0 +1,26 @@ +package export + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestCSVExporter_Export(t *testing.T) { + data := NewMysqlDataFetcher("aaa") + pwd, _ := os.Getwd() + file := NewCSV(pwd+"/aa-{begin}.csv", 5, map[string]string{"begin": "202301"}) + e := NewCVSExporter(data, file) + file.Open() + err := e.Export("aa", "字段1") + file.Close() + assert.Nil(t, err) + + assert.FileExists(t, pwd+"/aa-202301_0.csv") + assert.FileExists(t, pwd+"/aa-202301_1.csv") + assert.NoFileExists(t, pwd+"/aa-202301_2.csv") + + _ = os.Remove(pwd + "/aa-202301_0.csv") + _ = os.Remove(pwd + "/aa-202301_1.csv") + +} diff --git a/biz/export/csv_test.go b/biz/export/csv_test.go new file mode 100644 index 0000000..9bf882e --- /dev/null +++ b/biz/export/csv_test.go @@ -0,0 +1,31 @@ +package export + +import ( + "github.com/stretchr/testify/assert" + "os" + "testing" +) + +func TestCsv_Write(t *testing.T) { + pwd, _ := os.Getwd() + e := NewCSV(pwd+"/aa-{begin}.csv", 5, map[string]string{"begin": "202301"}) + + e.Open() + + e.WriteTitle([]string{"姓名", "年龄"}) + data := make([]interface{}, 2) + data[0] = "张三" + for i := 0; i < 9; i++ { + data[1] = 10 + i + e.Write(data) + } + e.Close() + + assert.FileExists(t, pwd+"/aa-202301_0.csv") + assert.FileExists(t, pwd+"/aa-202301_1.csv") + assert.NoFileExists(t, pwd+"/aa-202301_2.csv") + + _ = os.Remove(pwd + "/aa-202301_0.csv") + _ = os.Remove(pwd + "/aa-202301_1.csv") + +} diff --git a/biz/export/cvs.go b/biz/export/cvs.go new file mode 100644 index 0000000..5e45509 --- /dev/null +++ b/biz/export/cvs.go @@ -0,0 +1,127 @@ +package export + +import ( + "bufio" + "encoding/csv" + "errors" + "fmt" + "io" + "os" + "reflect" +) + +type CSV struct { + fc *os.File + csv *csv.Writer + f *File + isNew bool + titles []string +} + +func NewCSV(fileName string, limit int, param map[string]string) *CSV { + return &CSV{ + f: NewFile(fileName, limit, param), + } +} + +func (e *CSV) slice() { + if e.f.slice() { + e.reset() + } +} + +func (e *CSV) SetParam(param map[string]string) { + e.f.param = param +} + +func (e *CSV) reset() { + e.save() + e.f.NextFile() + e.Open() + e.WriteTitle(nil) + e.slice() +} + +func (e *CSV) Open() error { + var err error + if e.f.IsFileExist() { + e.fc, err = os.OpenFile(e.f.FileName(), os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return err + } + e.f.SetRow(e.getLineCount(e.fc)) + } else { + e.isNew = true + e.fc, err = os.Create(e.f.FileName()) + } + + if err == nil { + e.csv = csv.NewWriter(e.fc) + } + + return nil +} + +func (e *CSV) getLineCount(file io.Reader) (line int) { + reader := bufio.NewReader(file) + line = 0 + for { + _, isPrefix, err := reader.ReadLine() + if err != nil { + break + } + if !isPrefix { + line++ + } + } + return line +} + +func (e *CSV) save() error { + e.csv.Flush() + e.fc.Close() + return nil +} + +func (e *CSV) WriteTitle(titles []string) error { + + if titles != nil { + e.titles = titles + } + + if e.titles != nil && e.isNew { + e.Write(e.titles) + e.isNew = false + } + return nil +} + +func (e *CSV) Write(data interface{}) error { + if e.f.slice() { + e.reset() + } + + v := reflect.ValueOf(data) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + if v.Kind() != reflect.Slice { + return errors.New("数据无效,不是切片类型") + } + + switch val := data.(type) { + case []string: + return e.csv.Write(val) + case []interface{}: + strs := make([]string, len(val)) + for i, v := range val { + strs[i] = fmt.Sprintf("%v", v) + } + return e.csv.Write(strs) + } + return nil +} + +func (e *CSV) Close() error { + return e.save() +} diff --git a/biz/export/excel_exporter.go b/biz/export/excel_exporter.go index 4ce9fae..d4924c0 100644 --- a/biz/export/excel_exporter.go +++ b/biz/export/excel_exporter.go @@ -1,6 +1,9 @@ package export -import "fmt" +import ( + "fmt" + "log" +) type ExcelExporter struct { mFetcher DataFetcher @@ -33,15 +36,13 @@ func (ee *ExcelExporter) Export(sql, pk string) error { //fmt.Printf("Excel Exporter.Excel, got %v data\n", len(data)) //ee.file.Open() ee.file.WriteTitle(data.Title) - var last interface{} + var last []string for _, val := range data.Data { last = val ee.file.Write(last) } - if row, ok := last.([]interface{}); ok { - ee.last = row[getPkIndex(data.Title, pk)] - } + ee.last = last[getPkIndex(data.Title, pk)] //ee.file.Close() return nil @@ -52,6 +53,10 @@ func (ee *ExcelExporter) Last() (int, interface{}) { } func getPkIndex(titles []string, pk string) int { + if pk == "" { + log.Println("pk is not empty") + return -1 + } for i, title := range titles { if title == pk { return i diff --git a/biz/export/excel_exporter_test.go b/biz/export/excel_exporter_test.go index 7e57b28..2c464b7 100644 --- a/biz/export/excel_exporter_test.go +++ b/biz/export/excel_exporter_test.go @@ -11,7 +11,9 @@ func TestExcelExporter_Export(t *testing.T) { pwd, _ := os.Getwd() file := NewExcel(pwd+"/aa-{begin}.xlsx", 5, map[string]string{"begin": "202301"}) e := NewExcelExporter(data, file) - err := e.Export("aa") + file.Open() + err := e.Export("aa", "字段1") + file.Close() assert.Nil(t, err) assert.FileExists(t, pwd+"/aa-202301_0.xlsx") diff --git a/biz/export/export.go b/biz/export/export.go index 1e4896a..ee971d7 100644 --- a/biz/export/export.go +++ b/biz/export/export.go @@ -10,7 +10,7 @@ type ( Data struct { Title []string - Data []interface{} + Data [][]string } DataFetcher interface { Fetch(sql string) (*Data, error) diff --git a/biz/export/mysql_data_fetcher.go b/biz/export/mysql_data_fetcher.go index 40ea077..d8e48f3 100644 --- a/biz/export/mysql_data_fetcher.go +++ b/biz/export/mysql_data_fetcher.go @@ -2,6 +2,7 @@ package export import ( "math/rand" + "strconv" ) type MysqlDataFetcher struct { @@ -9,9 +10,9 @@ type MysqlDataFetcher struct { } func (mf *MysqlDataFetcher) Fetch(sql string) (*Data, error) { - rows := make([]interface{}, 0, 6) + rows := make([][]string, 0, 6) // 插入6个随机数组成的切片,模拟查询要返回的数据集 - rows = append(rows, rand.Perm(5), rand.Perm(5), rand.Perm(5), rand.Perm(5), rand.Perm(5), rand.Perm(5)) + rows = append(rows, row(), row(), row(), row(), row(), row()) return &Data{ Title: []string{"字段1", "字段2", "字段3", "字段4", "字段5"}, Data: rows, @@ -23,3 +24,12 @@ func NewMysqlDataFetcher(configStr string) DataFetcher { Config: configStr, } } + +func row() []string { + strs := make([]string, 5) + nums := rand.Perm(5) + for i, num := range nums { + strs[i] = strconv.Itoa(num) + } + return strs +} diff --git a/cmd/cmd/export.go b/cmd/cmd/excel.go similarity index 56% rename from cmd/cmd/export.go rename to cmd/cmd/excel.go index 4c7f389..e0d2a56 100644 --- a/cmd/cmd/export.go +++ b/cmd/cmd/excel.go @@ -9,8 +9,18 @@ import ( "time" ) -func Export(conf *config.Config, sysName, jobName string, begin, end time.Time, batch int) error { - job, dbStr, err := GetJob(conf, sysName, jobName) +type Excel struct { + conf *config.Config +} + +func NewExcel(conf *config.Config) *Excel { + return &Excel{ + conf: conf, + } +} + +func (e *Excel) Export(sysName, jobName string, begin, end time.Time, batch int) error { + job, dbStr, err := config.GetJob(e.conf, sysName, jobName) if err != nil { return err } @@ -20,27 +30,14 @@ func Export(conf *config.Config, sysName, jobName string, begin, end time.Time, return err } - return JobHandler(job, d, map[string]interface{}{ + return e.JobHandler(job, d, map[string]interface{}{ "begin": begin, "end": end, "last": 0, }, batch) } -func GetJob(conf *config.Config, sys, job string) (config.Job, string, error) { - s, err := conf.GetSystem(sys) - if err != nil { - return config.Job{}, "", err - } - - j, err := s.GetJob(job) - if err != nil { - return config.Job{}, "", err - } - return j, s.Db, nil -} - -func JobHandler(job config.Job, d export.DataFetcher, params map[string]interface{}, batch int) error { +func (e *Excel) JobHandler(job config.Job, d export.DataFetcher, params map[string]interface{}, batch int) error { f := export.NewExcel(job.File, job.Size, map[string]string{ "begin": params["begin"].(time.Time).Format("20060102"), @@ -49,16 +46,17 @@ func JobHandler(job config.Job, d export.DataFetcher, params map[string]interfac for i, task := range job.Tasks { fmt.Printf("执行导出任务:%d\n", i+1) - if err := TaskExport(d, task, params, f, batch); err != nil { + if err := e.TaskExport(d, task, params, f, batch); err != nil { return err } } return nil } -func TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, f export.FileAdapter, batch int) error { +func (e *Excel) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, f export.FileAdapter, batch int) error { //todo 最多分1000个批次进行处理 f.Open() + defer f.Close() for i := 0; i < 1000; i++ { sql := t.GetSql(params) @@ -75,6 +73,5 @@ func TaskExport(d export.DataFetcher, t config.Task, params map[string]interface params["last"] = last time.Sleep(time.Microsecond * 30) } - f.Close() return nil } diff --git a/cmd/cmd/root.go b/cmd/cmd/root.go index 9bee009..4698946 100644 --- a/cmd/cmd/root.go +++ b/cmd/cmd/root.go @@ -86,7 +86,8 @@ func rootRun(cmd *cobra.Command, args []string) { CmdError(cmd, "无效的参数:%s", err.Error()) } b := time.Now() - Export(config.DefaultConfig, sName, jName, begin, end, batch) + ee := NewExcel(config.DefaultConfig) + ee.Export(sName, jName, begin, end, batch) e := time.Now() fmt.Println("耗时:" + e.Sub(b).String())