l-export-async/export_async.go

465 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package l_export_async
import (
"archive/zip"
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
var exportAsyncPool = &sync.Pool{
New: func() interface{} {
return &ExportAsync{
extension: ".xlsx",
sheetName: "Sheet1",
batchSize: 10000,
maxRowPerFile: 10000,
csvToExcelBatch: 1000,
uploader: &Uploader{
FieldFormName: "file",
System: "crmApi",
Business: "download",
},
task: &Task{},
workerNum: 1, //runtime.NumCPU() * 2,
logTool: NewLogPrint(nil),
}
},
}
// ExportAsync 异步导出任务配置->默认配置往上看
type ExportAsync struct {
//// 导出文件名(不含扩展名)
fileName string
// 文件扩展名(如 .xlsx,默认.xlsx->WithCustomExtension
extension string
//xlsx注脚,默认Sheet1->WithCustomSheetName
sheetName string //sheet名称
//每一批次导出数量数据库每页行数默认10000行->WithCustomBatchSize
batchSize int
//每个Xlsx的行数默认10000行->WithMaxRowPerFile
maxRowPerFile int
//csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
csvToExcelBatch int
//!!!导出数总量,这个参数非常重要,可以通过WithProcess设置
//1.计算进度使用
//2.当workerNum>1时因为线程的乱序虽然可以通过通道解决但是就没有线程存在的意义了会导致查询很多空值出来
//比如如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
dataCount int
//日志输出->WithLogPrint
logTool LogTool
// Excel 表头
header []string
// 上传配置->WithCustomUploader
uploader *Uploader
// 数据提供函数
dataProvider DataProviderFn
// 任务状态存储(如 Redis;
taskSaveTool TaskSaveTool
// 并发协程数务必大于1,默认runtime.NumCPU() * 2->WithCustomWorkNum
workerNum int
//任务状态
task *Task
}
func NewExportAsync(
fileName string,
header []string,
domain string,
dataProvider DataProviderFn,
TaskSaveTool TaskSaveTool,
args ...ExportOption,
) *ExportAsync {
exporter := exportAsyncPool.Get().(*ExportAsync)
exporter.fileName = fileName
exporter.header = header
exporter.dataProvider = dataProvider
exporter.taskSaveTool = TaskSaveTool
exporter.uploader.Host = domain
exporter.task.Name = fileName
for _, arg := range args {
arg(exporter)
}
return exporter
}
func (e *ExportAsync) Run(ctx context.Context) (string, error) {
//新建任务
tempDir, err := e.createTask(ctx)
if err != nil {
return "", fmt.Errorf("创建任务失败: %v", err)
}
go func() {
// 执行导出任务
subCtx, cancel := context.WithCancel(context.Background())
defer cancel()
source, err := e.export(subCtx, tempDir)
if err != nil {
e.logTool.Errorf("导出错误:\n任务%s,错误原因:%s", e.task.Id, err.Error())
e.task.Err = err.Error()
_ = e.updateTask(subCtx)
}
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
os.RemoveAll(tempDir)
}()
return e.task.Id, nil
}
func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string, err error) {
e.processAdd(ctx, INIT.int())
e.logTool.Infof("异步导出任务:%s,开始导出到csv", e.task.Id)
csvFiles, err := e.exportToCsv(ctx, tempDir)
e.task.Process = CSV.int() + INIT.int()
e.processAdd(ctx, 0)
// 合并csv文件
e.logTool.Infof("任务:%s,开始合并到xlsx", e.task.Id)
excelsDir, err := e.mergeCSVsToExcelFiles(csvFiles, tempDir)
if err != nil {
return
}
e.processAdd(ctx, XLSX.int())
// 打包
e.logTool.Infof("异步导出任务:%s,开始打包xlsx", e.task.Id)
source = e.zipFile(tempDir)
if err = e.folderToZip(excelsDir, source); err != nil {
return
}
if len(e.uploader.Host) > 0 {
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
source, err = e.upload(source)
if err != nil {
return
}
}
e.task.Source = source
e.processAdd(ctx, ATT.int())
return
}
func (e *ExportAsync) exportToCsv(ctx context.Context, tempDir string) (csvFiles []string, err error) {
var (
perPageProcess int32
pageLimit int = -1
csvFilesMap sync.Map
)
// 计算每页进度
if e.dataCount > 0 {
pageLimit = (e.dataCount + e.batchSize - 1) / e.batchSize
perPageProcess = CSV.int() / int32(pageLimit) //6
}
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(e.workerNum) // 限制并发数
// 使用原子计数器生成页面编号
var pageNum int64 = 1
stopProcessing := false
for i := 0; i < e.workerNum; i++ {
g.Go(func() error {
for {
if stopProcessing {
return nil
}
// 原子获取下一页
page := int(atomic.AddInt64(&pageNum, 1) - 1)
if pageLimit != -1 && page > pageLimit {
return nil
}
// 获取数据
rows, err := e.dataProvider(ctx, page, e.batchSize)
if err != nil {
e.logTool.Errorf("异步导出任务:%s,第%d页查询失败%s", e.task.Id, page, err.Error())
return fmt.Errorf("第%d页查询失败: %w", page, err)
}
// 检查是否是最后一页
if len(rows) == 0 || len(rows) < e.batchSize {
stopProcessing = true
}
// 如果没有数据,结束处理
if len(rows) == 0 {
return nil
}
// 生成文件名
fileName := filepath.Join(tempDir, fmt.Sprintf("/csv/%d.csv", page))
// 原子增加行数
atomic.AddInt64(&e.task.RowCount, int64(len(rows)))
// 保存数据到临时文件
if err := e.savePageToCSV(rows, fileName); err != nil {
e.logTool.Errorf("任务:%s,导出到csv错误%s", e.task.Id, err.Error())
return fmt.Errorf("保存第%d页失败: %w", page, err)
}
// 存储文件名
csvFilesMap.Store(page, fileName)
// 更新进度
e.processAdd(ctx, perPageProcess)
}
})
}
// 等待所有goroutine完成
if err := g.Wait(); err != nil {
return nil, err
}
// 将csv文件名称进行排序
csvFiles = getSortedValues(&csvFilesMap)
return csvFiles, nil
}
func (e *ExportAsync) upload(file string) (string, error) {
resp, err := Upload(e.uploader.Host, file, e.uploader.System, e.uploader.Business, e.uploader.FieldFormName)
if err != nil {
return "", err
}
return GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil
}
func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error {
// 创建文件
zipFile, err := os.Create(zipFilePath)
if err != nil {
return err
}
defer zipFile.Close()
// 创建zip writer
archive := zip.NewWriter(zipFile)
defer archive.Close()
// 遍历文件夹
err = filepath.Walk(excelsDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 忽略文件夹自身
if info.IsDir() {
return nil
}
// 打开文件
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// 创建zip文件条目
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
// 更改工作目录到zip路径
header.Name = filepath.ToSlash(path[len(excelsDir):])
// 创建zip文件条目
writer, err := archive.CreateHeader(header)
if err != nil {
return err
}
// 将文件内容写入zip文件条目
_, err = io.Copy(writer, file)
return err
})
if err != nil {
return err
}
return nil
}
func (e *ExportAsync) zipFile(tempDir string) string {
return e.dirZip(tempDir) + e.fileName + ".zip"
}
// mergeCSVsToExcelFiles 将多个CSV文件合并为多个Excel文件流式处理
func (e *ExportAsync) mergeCSVsToExcelFiles(csvFiles []string, tempDir string) (outputDir string, err error) {
outputDir = e.dirXlsx(tempDir)
m := NewMerge(
Reader{Files: csvFiles, Index: len(csvFiles) - 1},
Writer{File: outputDir + e.fileName + e.extension, Limit: e.maxRowPerFile, BufferSize: e.csvToExcelBatch},
e.logTool,
)
if err = m.Merge(); err != nil {
return
}
return
}
func (e *ExportAsync) release() {
// 清空敏感或动态数据
e.fileName = ""
e.header = nil
e.dataProvider = nil
e.taskSaveTool = nil
exportAsyncPool.Put(e)
}
func getSortedValues(sm *sync.Map) []string {
// 1. 预分配切片(假设已知大致数量)
items := make([]struct {
key int
value string
}, 0, 16) // 初始容量可调整
// 2. 收集数据(单次遍历)
sm.Range(func(key, value interface{}) bool {
items = append(items, struct {
key int
value string
}{key.(int), value.(string)})
return true
})
// 3. 排序
sort.Slice(items, func(i, j int) bool {
return items[i].key < items[j].key
})
// 4. 提取值
sortedValues := make([]string, len(items))
for i, item := range items {
sortedValues[i] = item.value
}
return sortedValues
}
func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error) {
uid, err := uuid.NewUUID()
if err != nil {
err = fmt.Errorf("UUid创建失败: %w", err)
return
}
e.task.Id = uid.String()
tempDir, err = e.createDefaultDir(ctx)
if err != nil {
err = fmt.Errorf("初始化默认文件夹失败: %w", err)
return
}
err = e.updateTask(ctx)
return tempDir, nil
}
func (e *ExportAsync) updateTask(ctx context.Context) (err error) {
taskByte, err := json.Marshal(e.task)
if err != nil {
return
}
err = e.taskSaveTool.Set(ctx, e.task.Id, string(taskByte), 0).Err()
if err != nil {
err = fmt.Errorf("更新任务失败: %w", err)
return
}
return
}
func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
atomic.AddInt32(&e.task.Process, addNum)
e.logTool.Infof("异步导出任务:%s,当前进度:%d", e.task.Id, e.task.Process)
_ = e.updateTask(ctx)
return
}
func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
// 创建临时目录
tempDir, err := os.MkdirTemp("", e.task.Id)
if err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
//csv
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建csv目录失败: %v", err)
}
//xlsx
if err = os.Mkdir(e.dirXlsx(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建xlsx目录失败: %v", err)
}
//zip
if err = os.Mkdir(e.dirZip(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建zip目录失败: %v", err)
}
return tempDir, nil
}
func (e *ExportAsync) dirCsv(tempDir string) string {
return tempDir + "/csv/"
}
func (e *ExportAsync) dirXlsx(tempDir string) string {
return tempDir + "/xlsx/"
}
func (e *ExportAsync) dirZip(tempDir string) string {
return tempDir + "/zip/"
}
// savePageToCSV 将单页数据保存为CSV文件
func (e *ExportAsync) savePageToCSV(data [][]interface{}, filename string) error {
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("创建CSV文件失败: %v", err)
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
// 写入表头(如果尚未写入)
if err := writer.Write(e.header); err != nil {
return fmt.Errorf("写入CSV表头失败: %v", err)
}
// 写入数据行
for _, row := range data {
csvRow := make([]string, 0, len(e.header))
for _, val := range row {
csvRow = append(csvRow, fmt.Sprintf("%v", val))
}
if err := writer.Write(csvRow); err != nil {
return fmt.Errorf("写入CSV行失败: %v", err)
}
}
return nil
}