调整数据写入方式

This commit is contained in:
Mr.Li 2023-04-20 15:07:46 +08:00
parent 84b81fb98c
commit 79ec730424
5 changed files with 56 additions and 36 deletions

View File

@ -1,57 +1,68 @@
package export package export
import "fmt" import (
"fmt"
"sync"
)
type CSVExporter struct { type CsvExporter struct {
mFetcher DataFetcher mFetcher DataFetcher
file FileAdapter file FileAdapter
count int count int
last interface{} last interface{}
wg *sync.WaitGroup
} }
func NewCVSExporter(fetcher DataFetcher, file FileAdapter) DataExporter { func NewCsvExporter(fetcher DataFetcher, file FileAdapter) DataExporter {
return &CSVExporter{ return &CsvExporter{
mFetcher: fetcher, mFetcher: fetcher,
file: file, file: file,
} }
} }
func (ee *CSVExporter) Fetcher(fetcher DataFetcher) { func (ee *CsvExporter) Fetcher(fetcher DataFetcher) {
ee.mFetcher = fetcher ee.mFetcher = fetcher
} }
func (ee *CSVExporter) File(file FileAdapter) { func (ee *CsvExporter) File(file FileAdapter) {
ee.file = file ee.file = file
} }
func (ee *CSVExporter) Export(sql, pk string) error { func (ee *CsvExporter) WaitGroup(wg *sync.WaitGroup) {
ee.wg = wg
}
func (ee *CsvExporter) Export(sql, pk string) error {
data, err := ee.mFetcher.Fetch(sql) data, err := ee.mFetcher.Fetch(sql)
if err != nil { if err != nil {
return fmt.Errorf("数据获取错误:%w", err) return fmt.Errorf("数据获取错误:%w", err)
} }
ee.count = len(data.Data) ee.count = len(data.Data)
//fmt.Printf("Excel Exporter.Excel, got %v data\n", len(data)) //fmt.Printf("Excel Exporter.Excel, got %v data\n", len(data))
//ee.file.Open() go func() {
ee.file.Open()
ee.file.WriteTitle(data.Title) ee.file.WriteTitle(data.Title)
var last interface{}
for _, val := range data.Data { for _, val := range data.Data {
last = val row := val
ee.file.Write(last) ee.file.Write(row)
} }
ee.file.Close()
ee.wg.Done()
}()
if row, ok := last.([]interface{}); ok { last := data.Data[ee.count-1]
ee.last = row[ee.getPkIndex(data.Title, pk)] ee.last = last[ee.getPkIndex(data.Title, pk)]
}
//ee.file.Close()
return nil return nil
} }
func (ee *CSVExporter) Last() (int, interface{}) { func (ee *CsvExporter) Last() (int, interface{}) {
return ee.count, ee.last return ee.count, ee.last
} }
func (ee *CSVExporter) getPkIndex(titles []string, pk string) int { func (ee *CsvExporter) getPkIndex(titles []string, pk string) int {
for i, title := range titles { for i, title := range titles {
if title == pk { if title == pk {
return i return i

View File

@ -10,7 +10,7 @@ import (
"reflect" "reflect"
) )
type CSV struct { type Csv struct {
fc *os.File fc *os.File
csv *csv.Writer csv *csv.Writer
f *File f *File
@ -18,23 +18,23 @@ type CSV struct {
titles []string titles []string
} }
func NewCSV(fileName string, limit int, param map[string]string) *CSV { func NewCsv(fileName string, param map[string]string) *Csv {
return &CSV{ return &Csv{
f: NewFile(fileName, limit, param), f: NewFile(fileName, 100000000, param),
} }
} }
func (e *CSV) slice() { func (e *Csv) slice() {
if e.f.slice() { if e.f.slice() {
e.reset() e.reset()
} }
} }
func (e *CSV) SetParam(param map[string]string) { func (e *Csv) SetParam(param map[string]string) {
e.f.param = param e.f.param = param
} }
func (e *CSV) reset() { func (e *Csv) reset() {
e.save() e.save()
e.f.NextFile() e.f.NextFile()
e.Open() e.Open()
@ -42,7 +42,7 @@ func (e *CSV) reset() {
e.slice() e.slice()
} }
func (e *CSV) Open() error { func (e *Csv) Open() error {
var err error var err error
if e.f.IsFileExist() { if e.f.IsFileExist() {
e.fc, err = os.OpenFile(e.f.FileName(), os.O_APPEND|os.O_WRONLY, 0644) e.fc, err = os.OpenFile(e.f.FileName(), os.O_APPEND|os.O_WRONLY, 0644)
@ -62,7 +62,7 @@ func (e *CSV) Open() error {
return nil return nil
} }
func (e *CSV) getLineCount(file io.Reader) (line int) { func (e *Csv) getLineCount(file io.Reader) (line int) {
reader := bufio.NewReader(file) reader := bufio.NewReader(file)
line = 0 line = 0
for { for {
@ -77,13 +77,13 @@ func (e *CSV) getLineCount(file io.Reader) (line int) {
return line return line
} }
func (e *CSV) save() error { func (e *Csv) save() error {
e.csv.Flush() e.csv.Flush()
e.fc.Close() e.fc.Close()
return nil return nil
} }
func (e *CSV) WriteTitle(titles []string) error { func (e *Csv) WriteTitle(titles []string) error {
if titles != nil { if titles != nil {
e.titles = titles e.titles = titles
@ -96,7 +96,7 @@ func (e *CSV) WriteTitle(titles []string) error {
return nil return nil
} }
func (e *CSV) Write(data interface{}) error { func (e *Csv) Write(data interface{}) error {
if e.f.slice() { if e.f.slice() {
e.reset() e.reset()
} }
@ -122,6 +122,6 @@ func (e *CSV) Write(data interface{}) error {
return nil return nil
} }
func (e *CSV) Close() error { func (e *Csv) Close() error {
return e.save() return e.save()
} }

View File

@ -55,6 +55,7 @@ func (e *Excel) JobHandler(job config.Job, d export.DataFetcher, params map[stri
func (e *Excel) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, f export.FileAdapter, batch int) error { func (e *Excel) TaskExport(d export.DataFetcher, t config.Task, params map[string]interface{}, f export.FileAdapter, batch int) error {
//todo 最多分1000个批次进行处理 //todo 最多分1000个批次进行处理
f.Open() f.Open()
defer f.Close() defer f.Close()
for i := 0; i < 1000; i++ { for i := 0; i < 1000; i++ {

View File

@ -86,7 +86,7 @@ func rootRun(cmd *cobra.Command, args []string) {
CmdError(cmd, "无效的参数:%s", err.Error()) CmdError(cmd, "无效的参数:%s", err.Error())
} }
b := time.Now() b := time.Now()
ee := NewExcel(config.DefaultConfig) ee := NewCsv(config.DefaultConfig)
ee.Export(sName, jName, begin, end, batch) ee.Export(sName, jName, begin, end, batch)
e := time.Now() e := time.Now()
fmt.Println("耗时:" + e.Sub(b).String()) fmt.Println("耗时:" + e.Sub(b).String())

12
go.mod
View File

@ -31,9 +31,12 @@ require (
github.com/kr/text v0.2.0 // indirect github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect github.com/magiconair/properties v1.8.7 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/pelletier/go-toml/v2 v2.0.6 // indirect github.com/pelletier/go-toml/v2 v2.0.6 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/richardlehane/mscfb v1.0.4 // indirect
github.com/richardlehane/msoleps v1.0.3 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect github.com/rogpeppe/fastuuid v1.2.0 // indirect
github.com/rogpeppe/go-internal v1.6.1 // indirect github.com/rogpeppe/go-internal v1.6.1 // indirect
github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa // indirect github.com/shabbyrobe/xmlwriter v0.0.0-20200208144257-9fca06d00ffa // indirect
@ -41,8 +44,13 @@ require (
github.com/spf13/cast v1.5.0 // indirect github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.4.2 // indirect github.com/subosito/gotenv v1.4.2 // indirect
golang.org/x/sys v0.3.0 // indirect github.com/xuri/efp v0.0.0-20220603152613-6918739fd470 // indirect
golang.org/x/text v0.5.0 // indirect github.com/xuri/excelize/v2 v2.7.1 // indirect
github.com/xuri/nfp v0.0.0-20220409054826-5e722a1d9e22 // indirect
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
) )