fix:调整导出数据拆分模式,按照时间进行分片

This commit is contained in:
Mr.Li 2023-05-05 22:05:00 +08:00
parent 39a0adb091
commit bcc66bad0a
7 changed files with 51 additions and 27 deletions

View File

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"gorm.io/driver/mysql" "gorm.io/driver/mysql"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger"
"runtime/trace" "runtime/trace"
"strconv" "strconv"
"time" "time"
@ -23,13 +22,13 @@ func NewDb(str string) (*Db, error) {
db, err := gorm.Open( db, err := gorm.Open(
mysql.Open(str+""), mysql.Open(str+""),
&gorm.Config{ &gorm.Config{
Logger: logger.Discard, //Logger: logger.Discard,
}, },
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
//db = db.Debug() db = db.Debug()
return &Db{ return &Db{
db: db, db: db,
}, nil }, nil

View File

@ -11,7 +11,6 @@ type CsvExporter struct {
mFetcher DataFetcher mFetcher DataFetcher
file FileAdapter file FileAdapter
count int count int
last interface{}
wg *sync.WaitGroup wg *sync.WaitGroup
} }
@ -50,8 +49,8 @@ func (ee *CsvExporter) Export(sql, pk string) error {
if ee.count > 0 { if ee.count > 0 {
//异步导出数据到csv文件中 //异步导出数据到csv文件中
go ee.exportToCsv(data) go ee.exportToCsv(data)
last := data.Data[ee.count-1] } else {
ee.last = last[ee.getPkIndex(data.Title, pk)] ee.wg.Done()
} }
return nil return nil
} }
@ -73,8 +72,8 @@ func (ee *CsvExporter) exportToCsv(data *Data) {
} }
} }
func (ee *CsvExporter) Last() (int, interface{}) { func (ee *CsvExporter) Count() int {
return ee.count, ee.last return ee.count
} }
func (ee *CsvExporter) getPkIndex(titles []string, pk string) int { func (ee *CsvExporter) getPkIndex(titles []string, pk string) int {

View File

@ -5,7 +5,7 @@ type (
Fetcher(fetcher DataFetcher) Fetcher(fetcher DataFetcher)
File(file FileAdapter) File(file FileAdapter)
Export(sql, pk string) error Export(sql, pk string) error
Last() (int, interface{}) Count() int
} }
Data struct { Data struct {

View File

@ -37,7 +37,6 @@ func (e *Csv) Export(sysName, jobName string, begin, end time.Time, batch int) e
return e.JobHandler(job, d, map[string]interface{}{ return e.JobHandler(job, d, map[string]interface{}{
"begin": begin, "begin": begin,
"end": end, "end": end,
"last": 0,
}, batch) }, batch)
} }
@ -56,42 +55,55 @@ func (e *Csv) JobHandler(job config.Job, d export.DataFetcher, params map[string
func (e *Csv) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, batch int, fileName string) error { func (e *Csv) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, batch int, fileName string) error {
var i int var i int
var wg sync.WaitGroup var wg sync.WaitGroup
for i = 0; i < 1000; i++ { var total int
beginTime := params["begin"].(time.Time)
lastTime := params["end"].(time.Time)
over := false
for i = 0; i < 10000; i++ {
endTime := beginTime.Add(2 * time.Hour)
//结束时间大于最后时间
if endTime.After(lastTime) {
endTime = lastTime
over = true
}
f, err := e.getCsvFile(i) f, err := e.getCsvFile(i)
if err != nil { if err != nil {
return err return err
} }
params["begin"] = beginTime
params["end"] = endTime
sql := t.GetSql(params) sql := t.GetSql(params)
e := export.NewCsvExporter(d, f) e := export.NewCsvExporter(d, f)
e.(*export.CsvExporter).WaitGroup(&wg) e.(*export.CsvExporter).WaitGroup(&wg)
wg.Add(1) wg.Add(1)
e.Export(sql+" limit "+strconv.Itoa(batch), t.PK) e.Export(sql, t.PK)
count, last := e.Last() count := e.Count()
fmt.Printf("已导出 %d 条数据\n", batch*i+count) fmt.Printf("已导出 %d 条数据\n", batch*i+count)
if count == 0 { total = total + count
return nil
}
if count < batch { if over {
if count == 0 {
i = i - 1
}
break break
} }
beginTime = endTime
params["last"] = last
time.Sleep(time.Microsecond * 30) time.Sleep(time.Microsecond * 30)
} }
wg.Wait() wg.Wait()
//fmt.Println("tempDir", e.dirTemp) //fmt.Println("tempDir", e.dirTemp)
//todo 合并csv文件并删除 临时目录 if total > 0 { //查询到数据
if err := e.mergeCsvToExcel(e.dirTemp, i, fileName); err != nil { //合并csv文件并删除 临时目录
log.Printf("合并csv文件异常%s", err.Error()) if err := e.mergeCsvToExcel(e.dirTemp, i, fileName); err != nil {
return err log.Printf("合并csv文件异常%s", err.Error())
return err
}
} }
//重置临时路径 //重置临时路径
e.dirTemp = "" e.dirTemp = ""
@ -126,5 +138,6 @@ func (e *Csv) mergeCsvToExcel(path string, max int, out string) error {
return err return err
} }
return m.Clear() return nil
//return m.Clear()
} }

View File

@ -44,6 +44,8 @@ func exportAllJobRun(cmd *cobra.Command, args []string) {
CmdError(cmd, "无效的参数:%s", err.Error()) CmdError(cmd, "无效的参数:%s", err.Error())
} }
allBegin := time.Now()
for _, job := range sys.Jobs { for _, job := range sys.Jobs {
log.Printf("执行【%s】【%s】导出\n", sName, job.Name) log.Printf("执行【%s】【%s】导出\n", sName, job.Name)
b := time.Now() b := time.Now()
@ -53,4 +55,6 @@ func exportAllJobRun(cmd *cobra.Command, args []string) {
} }
log.Println("导出耗时:" + time.Now().Sub(b).String()) log.Println("导出耗时:" + time.Now().Sub(b).String())
} }
log.Println("总耗时:" + time.Now().Sub(allBegin).String())
} }

View File

@ -35,6 +35,7 @@ func exportAllRun(cmd *cobra.Command, args []string) {
if err != nil { if err != nil {
CmdError(cmd, "无效的参数:%s", err.Error()) CmdError(cmd, "无效的参数:%s", err.Error())
} }
allBegin := time.Now()
for _, sys := range c.Systems { for _, sys := range c.Systems {
for _, job := range sys.Jobs { for _, job := range sys.Jobs {
log.Printf("执行【%s】【%s】导出\n", sys.Name, job.Name) log.Printf("执行【%s】【%s】导出\n", sys.Name, job.Name)
@ -46,5 +47,6 @@ func exportAllRun(cmd *cobra.Command, args []string) {
log.Println("导出耗时:" + time.Now().Sub(b).String()) log.Println("导出耗时:" + time.Now().Sub(b).String())
} }
} }
log.Println("总耗时:" + time.Now().Sub(allBegin).String())
} }

View File

@ -1,6 +1,8 @@
package cmd package cmd
import ( import (
"github.com/stretchr/testify/assert"
"os"
"testing" "testing"
) )
@ -17,5 +19,10 @@ func TestMerge_Write(t *testing.T) {
} }
func TestMerge_Save(t *testing.T) { func TestMerge_Save(t *testing.T) {
// m := NewMerge(os.TempDir()+"/3299772411",500000) m := NewMerge(
Reader{Path: os.TempDir() + "/3299772411", Index: 102},
Writer{File: "sss.xlsx", Limit: 1000000},
)
err := m.Merge()
assert.NoError(t, err)
} }