csv文件导出
This commit is contained in:
parent
79ec730424
commit
e0793c7773
|
|
@ -0,0 +1,220 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"excel_export/biz/config"
|
||||
"excel_export/biz/db"
|
||||
"excel_export/biz/export"
|
||||
"fmt"
|
||||
"github.com/xuri/excelize/v2"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Csv struct {
|
||||
conf *config.Config
|
||||
dirTemp string
|
||||
}
|
||||
|
||||
func NewCsv(conf *config.Config) *Csv {
|
||||
return &Csv{
|
||||
conf: conf,
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Csv) Export(sysName, jobName string, begin, end time.Time, batch int) error {
|
||||
job, dbStr, err := config.GetJob(e.conf, sysName, jobName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
d, err := db.NewDb(dbStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.JobHandler(job, d, map[string]interface{}{
|
||||
"begin": begin,
|
||||
"end": end,
|
||||
"last": 0,
|
||||
}, batch)
|
||||
}
|
||||
|
||||
func (e *Csv) JobHandler(job config.Job, d export.DataFetcher, params map[string]interface{}, batch int) error {
|
||||
|
||||
for i, task := range job.Tasks {
|
||||
fmt.Printf("执行导出任务:%d\n", i+1)
|
||||
if err := e.TaskExport(d, task, params, batch); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Csv) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, batch int) error {
|
||||
var i int
|
||||
var wg sync.WaitGroup
|
||||
for i = 0; i < 1000; i++ {
|
||||
|
||||
f, err := e.getCsvFile(i)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sql := t.GetSql(params)
|
||||
|
||||
e := export.NewCsvExporter(d, f)
|
||||
e.(*export.CsvExporter).WaitGroup(&wg)
|
||||
wg.Add(1)
|
||||
e.Export(sql+" limit "+strconv.Itoa(batch), t.PK)
|
||||
|
||||
count, last := e.Last()
|
||||
fmt.Printf("已导出 %d 条数据\n", batch*i+count)
|
||||
|
||||
if count == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if count < batch {
|
||||
break
|
||||
}
|
||||
|
||||
params["last"] = last
|
||||
time.Sleep(time.Microsecond * 30)
|
||||
}
|
||||
wg.Wait()
|
||||
fmt.Println("tempDir", e.dirTemp)
|
||||
//todo 合并csv文件,并删除 临时目录
|
||||
err := e.mergeCsvToExcel(e.dirTemp, i)
|
||||
fmt.Println(err)
|
||||
|
||||
//重置临时路径
|
||||
e.dirTemp = ""
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Csv) getCsvFile(index int) (*export.Csv, error) {
|
||||
|
||||
if e.dirTemp == "" {
|
||||
path, err := os.MkdirTemp(os.TempDir(), "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
e.dirTemp = path
|
||||
}
|
||||
|
||||
filename := e.dirTemp + "/data_{index}.csv"
|
||||
|
||||
f := export.NewCsv(filename, map[string]string{
|
||||
"index": strconv.Itoa(index),
|
||||
})
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (e *Csv) mergeCsv(path string, max int) error {
|
||||
csvFile, err := os.Create("aa.csv")
|
||||
if err != nil {
|
||||
return fmt.Errorf("打开写入文件失败:%w", err)
|
||||
}
|
||||
defer csvFile.Close()
|
||||
csvWriter := csv.NewWriter(csvFile)
|
||||
|
||||
for i := 0; i <= max; i++ {
|
||||
filename := fmt.Sprintf("%s/data_%d_0.csv", path, i)
|
||||
csvOpen, err := os.Open(filename)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("打开读取文件%s失败:%w", filename, err)
|
||||
}
|
||||
csvReader := csv.NewReader(csvOpen)
|
||||
var index int
|
||||
for {
|
||||
record, err := csvReader.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("读取文件%s错误:%w", filename, err)
|
||||
}
|
||||
|
||||
index++
|
||||
//不是第一个文件时,跳过第一条数据
|
||||
if index == 1 && i != 0 {
|
||||
//continue
|
||||
}
|
||||
fmt.Println(record)
|
||||
csvWriter.Write(record)
|
||||
|
||||
}
|
||||
|
||||
csvOpen.Close()
|
||||
}
|
||||
csvWriter.Flush()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Csv) mergeCsvToExcel(path string, max int) error {
|
||||
f := excelize.NewFile()
|
||||
defer func() {
|
||||
if err := f.Close(); err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}()
|
||||
|
||||
sheet, err := f.NewStreamWriter("Sheet1")
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return err
|
||||
}
|
||||
|
||||
var index int
|
||||
for i := 0; i <= max; i++ {
|
||||
filename := fmt.Sprintf("%s/data_%d_0.csv", path, i)
|
||||
csvOpen, err := os.Open(filename)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("打开读取文件%s失败:%w", filename, err)
|
||||
}
|
||||
csvReader := csv.NewReader(csvOpen)
|
||||
|
||||
frist := true
|
||||
for {
|
||||
record, err := csvReader.Read()
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("读取文件%s错误:%w", filename, err)
|
||||
}
|
||||
|
||||
//不是第一个文件时,跳过第一条数据
|
||||
if frist && i != 0 {
|
||||
frist = false
|
||||
continue
|
||||
}
|
||||
|
||||
index++
|
||||
|
||||
cell, _ := excelize.CoordinatesToCellName(1, index)
|
||||
|
||||
rec := make([]interface{}, len(record))
|
||||
for i2, s := range record {
|
||||
rec[i2] = s
|
||||
}
|
||||
|
||||
sheet.SetRow(cell, rec)
|
||||
|
||||
}
|
||||
csvOpen.Close()
|
||||
}
|
||||
|
||||
if err := sheet.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return f.SaveAs("ss.xlsx")
|
||||
}
|
||||
|
|
@ -0,0 +1,24 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCsv_getCsvFile(t *testing.T) {
|
||||
|
||||
c := &Csv{}
|
||||
f, err := c.getCsvFile(1)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, f)
|
||||
f.Open()
|
||||
f.Close()
|
||||
fmt.Println(c.dirTemp)
|
||||
|
||||
assert.DirExists(t, c.dirTemp)
|
||||
os.RemoveAll(c.dirTemp)
|
||||
assert.NoDirExists(t, c.dirTemp)
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue