package export

import (
	"fmt"
	"log"
	"sync"
	"time"
)

// CsvExporter fetches rows through a DataFetcher and writes them to a
// FileAdapter in a background goroutine. It also remembers the row count
// and the primary-key value of the last row from the most recent Export.
type CsvExporter struct {
	mFetcher DataFetcher
	file     FileAdapter
	count    int             // number of rows returned by the most recent fetch
	last     interface{}     // primary-key value of the last row of the most recent non-empty fetch
	wg       *sync.WaitGroup // optional; Done is called once per background write
}

// NewCsvExporter returns a CsvExporter that reads from fetcher and writes to
// file. No WaitGroup is attached; use WaitGroup to set one.
func NewCsvExporter(fetcher DataFetcher, file FileAdapter) DataExporter {
	return &CsvExporter{
		mFetcher: fetcher,
		file:     file,
	}
}

// Fetcher replaces the data source used by subsequent Export calls.
func (ee *CsvExporter) Fetcher(fetcher DataFetcher) {
	ee.mFetcher = fetcher
}

// File replaces the output target used by subsequent Export calls.
func (ee *CsvExporter) File(file FileAdapter) {
	ee.file = file
}

// WaitGroup sets the WaitGroup on which Done is called when a background
// write finishes. A nil WaitGroup disables the notification.
func (ee *CsvExporter) WaitGroup(wg *sync.WaitGroup) {
	ee.wg = wg
}

// Export runs sql through the fetcher and, when rows are returned, starts a
// goroutine that writes them to the file. The value of column pk in the last
// row is recorded and can be read back with Last.
//
// It returns an error when the fetch fails, or when pk cannot be located in
// the last row; in both cases no background write is started and the
// WaitGroup is not signalled. When the fetch returns no rows, nil is returned,
// no write is started, and the previously recorded last value is left as is.
func (ee *CsvExporter) Export(sql, pk string) error {
	begin := time.Now()
	data, err := ee.mFetcher.Fetch(sql)
	if err != nil {
		return fmt.Errorf("数据获取错误:%w", err)
	}
	log.Printf("数据获取耗时:%s \n", time.Since(begin).String())

	ee.count = len(data.Data)
	if ee.count == 0 {
		return nil
	}

	// Resolve the primary-key value before starting the writer so that an
	// unknown column is reported as an error instead of an index-out-of-range
	// panic with a goroutine already in flight.
	last := data.Data[ee.count-1]
	idx := ee.getPkIndex(data.Title, pk)
	if idx < 0 || idx >= len(last) {
		return fmt.Errorf("primary key column %q not found in result set", pk)
	}
	ee.last = last[idx]

	// Write the rows to the CSV file asynchronously.
	go ee.exportToCsv(data)
	return nil
}

// exportToCsv writes the title row and every data row to the file, logs the
// elapsed time, and then signals the WaitGroup if one is set.
func (ee *CsvExporter) exportToCsv(data *Data) {
	// Deferred so waiters are released even if a write panics; guarded
	// because the constructor leaves wg nil.
	if ee.wg != nil {
		defer ee.wg.Done()
	}

	begin := time.Now()
	ee.writeRows(data)
	log.Printf("csv time:%s\n", time.Since(begin).String())
}

// writeRows opens the file, writes the title and all rows, and closes the
// file on the way out.
// NOTE(review): any values returned by Open/WriteTitle/Write/Close are
// discarded here, as in the original code — confirm against FileAdapter
// whether errors should be surfaced.
func (ee *CsvExporter) writeRows(data *Data) {
	ee.file.Open()
	defer ee.file.Close()

	ee.file.WriteTitle(data.Title)
	for _, row := range data.Data {
		ee.file.Write(row)
	}
}

// Last returns the row count of the most recent fetch and the primary-key
// value recorded from the last row of the most recent non-empty fetch.
func (ee *CsvExporter) Last() (int, interface{}) {
	return ee.count, ee.last
}

// getPkIndex returns the position of pk within titles, or -1 when it is
// absent.
func (ee *CsvExporter) getPkIndex(titles []string, pk string) int {
	for i, title := range titles {
		if title == pk {
			return i
		}
	}
	return -1
}