package l_export_async import ( "archive/zip" "context" "encoding/csv" "encoding/json" "fmt" "io" "os" "path/filepath" "sort" "strings" "sync" "sync/atomic" "time" "github.com/google/uuid" "golang.org/x/sync/errgroup" ) var exportAsyncPool = &sync.Pool{ New: func() interface{} { return &ExportAsync{ extension: ".xlsx", sheetName: "Sheet1", batchSize: 10000, maxRowPerFile: 10000, csvToExcelBatch: 1000, uploader: &Uploader{ FieldFormName: "file", System: "crmApi", Business: "download", }, task: &Task{}, workerNum: 1, //runtime.NumCPU() * 2, logTool: NewLogPrint(nil), } }, } // ExportAsync 异步导出任务配置->默认配置往上看 type ExportAsync struct { //// 导出文件名(不含扩展名) fileName string // 文件扩展名(如 .xlsx),默认.xlsx->WithCustomExtension extension string //xlsx注脚,默认Sheet1->WithCustomSheetName sheetName string //sheet名称 //每一批次导出数量,数据库每页行数,默认10000行->WithCustomBatchSize batchSize int //每个Xlsx的行数,默认10000行->WithMaxRowPerFile maxRowPerFile int //csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize csvToExcelBatch int //!!!导出数总量,这个参数非常重要,可以通过WithProcess设置 //1.计算进度使用 //2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来 //比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力 dataCount int //日志输出->WithLogPrint logTool LogTool // Excel 表头 header []string // 上传配置->WithCustomUploader uploader *Uploader // 数据提供函数 dataProvider DataProviderFn // 任务状态存储(如 Redis); taskSaveTool TaskSaveTool // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum workerNum int //任务状态 task *Task } func NewExportAsync( fileName string, header []string, domain string, dataProvider DataProviderFn, TaskSaveTool TaskSaveTool, args ...ExportOption, ) *ExportAsync { exporter := exportAsyncPool.Get().(*ExportAsync) exporter.fileName = fileName exporter.header = header exporter.dataProvider = dataProvider exporter.taskSaveTool = TaskSaveTool exporter.uploader.Host = domain exporter.task.Name = fileName for _, arg := range args { arg(exporter) } return exporter } func (e *ExportAsync) Run(ctx context.Context) (string, error) { //新建任务 tempDir, err := e.createTask(ctx) if err != nil { return "", fmt.Errorf("创建任务失败: %v", err) } go func() { // 执行导出任务 subCtx, cancel := context.WithCancel(context.Background()) defer cancel() source, err := e.export(subCtx, tempDir) if err != nil { e.logTool.Errorf("导出错误:\n任务:%s,错误原因:%s", e.task.Id, err.Error()) e.task.Err = err.Error() _ = e.updateTask(subCtx) } e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source) os.RemoveAll(tempDir) }() return e.task.Id, nil } func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string, err error) { e.processAdd(ctx, INIT.int()) e.logTool.Infof("异步导出任务:%s,开始导出到csv", e.task.Id) csvFiles, err := e.exportToCsv(ctx, tempDir) e.task.Process = CSV.int() + INIT.int() e.processAdd(ctx, 0) // 合并csv文件 e.logTool.Infof("任务:%s,开始合并到xlsx", e.task.Id) excelsDir, err := e.mergeCSVsToExcelFiles(csvFiles, tempDir) if err != nil { return } e.processAdd(ctx, XLSX.int()) // 打包 e.logTool.Infof("异步导出任务:%s,开始打包xlsx", e.task.Id) source = e.zipFile(tempDir) if err = e.folderToZip(excelsDir, source); err != nil { return } if len(e.uploader.Host) > 0 { e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id) source, err = e.upload(source) if err != nil { return } } e.task.Source = source e.processAdd(ctx, ATT.int()) return } func (e *ExportAsync) exportToCsv(ctx context.Context, tempDir string) (csvFiles []string, err error) { var ( perPageProcess int32 pageLimit int = -1 csvFilesMap sync.Map ) // 计算每页进度 if e.dataCount > 0 { pageLimit = (e.dataCount + e.batchSize - 1) / e.batchSize perPageProcess = CSV.int() / int32(pageLimit) //6 } g, ctx := errgroup.WithContext(ctx) g.SetLimit(e.workerNum) // 限制并发数 // 使用原子计数器生成页面编号 var pageNum int64 = 1 stopProcessing := false for i := 0; i < e.workerNum; i++ { g.Go(func() error { for { if stopProcessing { return nil } // 原子获取下一页 page := int(atomic.AddInt64(&pageNum, 1) - 1) if pageLimit != -1 && page > pageLimit { return nil } // 获取数据 rows, err := e.dataProvider(ctx, page, e.batchSize) if err != nil { e.logTool.Errorf("异步导出任务:%s,第%d页查询失败:%s", e.task.Id, page, err.Error()) return fmt.Errorf("第%d页查询失败: %w", page, err) } // 检查是否是最后一页 if len(rows) == 0 || len(rows) < e.batchSize { stopProcessing = true } // 如果没有数据,结束处理 if len(rows) == 0 { return nil } // 生成文件名 fileName := filepath.Join(tempDir, fmt.Sprintf("/csv/%d.csv", page)) // 原子增加行数 atomic.AddInt64(&e.task.RowCount, int64(len(rows))) // 保存数据到临时文件 if err := e.savePageToCSV(rows, fileName); err != nil { e.logTool.Errorf("任务:%s,导出到csv错误:%s", e.task.Id, err.Error()) return fmt.Errorf("保存第%d页失败: %w", page, err) } // 存储文件名 csvFilesMap.Store(page, fileName) // 更新进度 e.processAdd(ctx, perPageProcess) } }) } // 等待所有goroutine完成 if err := g.Wait(); err != nil { return nil, err } // 将csv文件名称进行排序 csvFiles = getSortedValues(&csvFilesMap) return csvFiles, nil } func (e *ExportAsync) upload(file string) (string, error) { resp, err := Upload(e.uploader.Host, file, e.uploader.System, e.uploader.Business, e.uploader.FieldFormName) if err != nil { return "", err } return GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil } func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error { // 创建文件 zipFile, err := os.Create(zipFilePath) if err != nil { return err } defer zipFile.Close() // 创建zip writer archive := zip.NewWriter(zipFile) defer archive.Close() // 遍历文件夹 err = filepath.Walk(excelsDir, func(path string, info os.FileInfo, err error) error { if err != nil { return err } // 忽略文件夹自身 if info.IsDir() { return nil } // 打开文件 file, err := os.Open(path) if err != nil { return err } defer file.Close() // 创建zip文件条目 header, err := zip.FileInfoHeader(info) if err != nil { return err } // 更改工作目录到zip路径 header.Name = filepath.ToSlash(path[len(excelsDir):]) // 创建zip文件条目 writer, err := archive.CreateHeader(header) if err != nil { return err } // 将文件内容写入zip文件条目 _, err = io.Copy(writer, file) return err }) if err != nil { return err } return nil } func (e *ExportAsync) zipFile(tempDir string) string { return e.dirZip(tempDir) + e.fileName + ".zip" } // mergeCSVsToExcelFiles 将多个CSV文件合并为多个Excel文件(流式处理) func (e *ExportAsync) mergeCSVsToExcelFiles(csvFiles []string, tempDir string) (outputDir string, err error) { outputDir = e.dirXlsx(tempDir) m := NewMerge( Reader{Files: csvFiles, Index: len(csvFiles) - 1}, Writer{File: outputDir + e.fileName + e.extension, Limit: e.maxRowPerFile, BufferSize: e.csvToExcelBatch}, e.logTool, ) if err = m.Merge(); err != nil { return } return } func (e *ExportAsync) release() { // 清空敏感或动态数据 e.fileName = "" e.header = nil e.dataProvider = nil e.taskSaveTool = nil exportAsyncPool.Put(e) } func getSortedValues(sm *sync.Map) []string { // 1. 预分配切片(假设已知大致数量) items := make([]struct { key int value string }, 0, 16) // 初始容量可调整 // 2. 收集数据(单次遍历) sm.Range(func(key, value interface{}) bool { items = append(items, struct { key int value string }{key.(int), value.(string)}) return true }) // 3. 排序 sort.Slice(items, func(i, j int) bool { return items[i].key < items[j].key }) // 4. 提取值 sortedValues := make([]string, len(items)) for i, item := range items { sortedValues[i] = item.value } return sortedValues } func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error) { uid, err := uuid.NewUUID() if err != nil { err = fmt.Errorf("UUid创建失败: %w", err) return } e.task.Id = uid.String() tempDir, err = e.createDefaultDir(ctx) if err != nil { err = fmt.Errorf("初始化默认文件夹失败: %w", err) return } err = e.updateTask(ctx) return tempDir, nil } func (e *ExportAsync) updateTask(ctx context.Context) (err error) { taskByte, err := json.Marshal(e.task) if err != nil { return } err = e.taskSaveTool.Set(ctx, e.task.Id, string(taskByte), 0).Err() if err != nil { err = fmt.Errorf("更新任务失败: %w", err) return } return } func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) { atomic.AddInt32(&e.task.Process, addNum) e.logTool.Infof("异步导出任务:%s,当前进度:%d", e.task.Id, e.task.Process) _ = e.updateTask(ctx) return } func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) { // 创建临时目录 tempDir, err := os.MkdirTemp("", e.task.Id) if err != nil { return "", fmt.Errorf("创建临时目录失败: %v", err) } //csv if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil { return "", fmt.Errorf("创建csv目录失败: %v", err) } //xlsx if err = os.Mkdir(e.dirXlsx(tempDir), 0755); err != nil { return "", fmt.Errorf("创建xlsx目录失败: %v", err) } //zip if err = os.Mkdir(e.dirZip(tempDir), 0755); err != nil { return "", fmt.Errorf("创建zip目录失败: %v", err) } return tempDir, nil } func (e *ExportAsync) dirCsv(tempDir string) string { return tempDir + "/csv/" } func (e *ExportAsync) dirXlsx(tempDir string) string { return tempDir + "/xlsx/" } func (e *ExportAsync) dirZip(tempDir string) string { return tempDir + "/zip/" } // savePageToCSV 将单页数据保存为CSV文件 func (e *ExportAsync) savePageToCSV(data [][]interface{}, filename string) error { file, err := os.Create(filename) if err != nil { return fmt.Errorf("创建CSV文件失败: %v", err) } defer file.Close() writer := csv.NewWriter(file) defer writer.Flush() // 写入表头(如果尚未写入) if err := writer.Write(e.header); err != nil { return fmt.Errorf("写入CSV表头失败: %v", err) } // 写入数据行 for _, row := range data { csvRow := make([]string, 0, len(e.header)) for _, val := range row { csvRow = append(csvRow, fmt.Sprintf("%v", val)) } if err := writer.Write(csvRow); err != nil { return fmt.Errorf("写入CSV行失败: %v", err) } } return nil }