diff --git a/cmd/cmd/csv.go b/cmd/cmd/csv.go index c802cb0..c002486 100644 --- a/cmd/cmd/csv.go +++ b/cmd/cmd/csv.go @@ -6,9 +6,7 @@ import ( "excel_export/biz/db" "excel_export/biz/export" "fmt" - "github.com/xuri/excelize/v2" "io" - "log" "os" "strconv" "sync" @@ -48,14 +46,15 @@ func (e *Csv) JobHandler(job config.Job, d export.DataFetcher, params map[string for i, task := range job.Tasks { fmt.Printf("执行导出任务:%d\n", i+1) - if err := e.TaskExport(d, task, params, batch); err != nil { + params["task"] = i + if err := e.TaskExport(d, task, params, batch, job.GetFileName(params)); err != nil { return err } } return nil } -func (e *Csv) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, batch int) error { +func (e *Csv) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, batch int, fileName string) error { var i int var wg sync.WaitGroup for i = 0; i < 1000; i++ { @@ -89,7 +88,7 @@ func (e *Csv) TaskExport(d export.DataFetcher, t config.Task, params map[string] wg.Wait() fmt.Println("tempDir", e.dirTemp) //todo 合并csv文件,并删除 临时目录 - err := e.mergeCsvToExcel(e.dirTemp, i) + err := e.mergeCsvToExcel(e.dirTemp, i, fileName) fmt.Println(err) //重置临时路径 @@ -158,65 +157,10 @@ func (e *Csv) mergeCsv(path string, max int) error { return nil } -func (e *Csv) mergeCsvToExcel(path string, max int) error { - begin := time.Now() - f := excelize.NewFile() - defer func() { - log.Printf("mergeCsvToExcel:耗时 %s\n", time.Now().Sub(begin).String()) - if err := f.Close(); err != nil { - log.Println(err) - } - }() - - sheet, err := f.NewStreamWriter("Sheet1") - if err != nil { - log.Println(err) - return err - } - - var index int - for i := 0; i <= max; i++ { - filename := fmt.Sprintf("%s/data_%d_0.csv", path, i) - csvOpen, err := os.Open(filename) - - if err != nil { - return fmt.Errorf("打开读取文件%s失败:%w", filename, err) - } - csvReader := csv.NewReader(csvOpen) - - frist := true - for { - record, err := csvReader.Read() - if err == io.EOF { - break - } else if err != nil { - return fmt.Errorf("读取文件%s错误:%w", filename, err) - } - - //不是第一个文件时,跳过第一条数据 - if frist && i != 0 { - frist = false - continue - } - - index++ - - cell, _ := excelize.CoordinatesToCellName(1, index) - - rec := make([]interface{}, len(record)) - for i2, s := range record { - rec[i2] = s - } - - sheet.SetRow(cell, rec) - - } - csvOpen.Close() - } - - if err := sheet.Flush(); err != nil { - return err - } - - return f.SaveAs("ss.xlsx") +func (e *Csv) mergeCsvToExcel(path string, max int, out string) error { + m := NewMerge( + Reader{Path: path, Index: max}, + Writer{File: out, Limit: 500000}, + ) + return m.Merge() } diff --git a/cmd/cmd/merge.go b/cmd/cmd/merge.go index d3817fd..36bb17c 100644 --- a/cmd/cmd/merge.go +++ b/cmd/cmd/merge.go @@ -1,31 +1,97 @@ package cmd import ( + "encoding/csv" + "fmt" "github.com/xuri/excelize/v2" + "io" + "log" + "os" "regexp" "strconv" + "time" ) -type Merge struct { - fileName string - limit int - file *excelize.File - sw *excelize.StreamWriter +type ( + Reader struct { + Path string + Index int + } + Writer struct { + File string + Limit int + } + Merge struct { + reader Reader + writer Writer - titles []interface{} - fileIndex int - total int - rowIndex int -} + file *excelize.File + sw *excelize.StreamWriter -func NewMerge(filename string, limit int) *Merge { + titles []interface{} + fileIndex int + total int + rowIndex int + } +) + +func NewMerge(r Reader, w Writer) *Merge { m := &Merge{ - fileName: filename, - limit: limit, + reader: r, + writer: w, } m.open() return m } + +func (m *Merge) Merge() error { + begin := time.Now() + defer func() { + log.Printf("mergeCsvToExcel:耗时 %s\n", time.Now().Sub(begin).String()) + if err := m.Save(); err != nil { + log.Println(err) + } + }() + + for i := 0; i <= m.reader.Index; i++ { + filename := fmt.Sprintf("%s/data_%d_0.csv", m.reader.Path, i) + csvOpen, err := os.Open(filename) + + if err != nil { + return fmt.Errorf("打开读取文件%s失败:%w", filename, err) + } + csvReader := csv.NewReader(csvOpen) + + frist := true + for { + record, err := csvReader.Read() + if err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("读取文件%s错误:%w", filename, err) + } + + row := transform(record) + + //不是第一个文件时,跳过第一条数据 + if frist { + frist = false + + if i == 0 { + m.WriteTitle(row) + } + continue + } + + m.Write(row) + + } + csvOpen.Close() + } + + return nil +} + func (m *Merge) WriteTitle(titles []interface{}) error { if titles != nil { m.titles = titles @@ -65,7 +131,7 @@ func (m *Merge) reset() (err error) { func (m *Merge) count() { m.total++ m.rowIndex++ - if m.rowIndex > m.limit { + if m.rowIndex > m.writer.Limit { m.reset() } } @@ -87,13 +153,14 @@ func (m *Merge) Save() error { return err } - return m.file.SaveAs(m.getFileName()) + return m.file.SaveAs(m.writer.GetFileName(m.fileIndex)) } -func (m *Merge) getFileName() string { +//GetFileName 获取文件名 +func (w *Writer) GetFileName(fileIndex int) string { ex := regexp.MustCompile("(\\..*)") - name := ex.ReplaceAllFunc([]byte(m.fileName), func(b []byte) []byte { - i := []byte("_" + strconv.Itoa(m.fileIndex)) + name := ex.ReplaceAllFunc([]byte(w.File), func(b []byte) []byte { + i := []byte("_" + strconv.Itoa(fileIndex)) ret := make([]byte, len(b)+len(i)) copy(ret, i) copy(ret[len(i):], b) @@ -101,3 +168,11 @@ func (m *Merge) getFileName() string { }) return string(name) } + +func transform(record []string) []interface{} { + result := make([]interface{}, len(record)) + for i2, s := range record { + result[i2] = s + } + return result +} diff --git a/cmd/cmd/merge_test.go b/cmd/cmd/merge_test.go index 87af7e6..2cb4798 100644 --- a/cmd/cmd/merge_test.go +++ b/cmd/cmd/merge_test.go @@ -1,9 +1,11 @@ package cmd -import "testing" +import ( + "testing" +) func TestMerge_Write(t *testing.T) { - m := NewMerge("xx.xlsx", 2) + m := NewMerge(Reader{}, Writer{"xx.xlsx", 2}) m.WriteTitle([]interface{}{"姓名", "年龄"}) m.Write([]interface{}{"张三", 12}) @@ -13,3 +15,7 @@ func TestMerge_Write(t *testing.T) { m.Save() } + +func TestMerge_Save(t *testing.T) { + // m := NewMerge(os.TempDir()+"/3299772411",500000) +}