l-export-async/export_async.go

773 lines
21 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package l_export_async
import (
"archive/zip"
"context"
"encoding/base64"
"encoding/csv"
"encoding/json"
"finance/internal/pkg/helper/attachment"
"fmt"
"strconv"
"strings"
"io"
"os"
"path/filepath"
"sort"
"sync"
"sync/atomic"
"time"
attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
var exportAsyncPool = &sync.Pool{
New: func() interface{} {
return &ExportAsync{
extension: DefaultExtension,
sheetName: "Sheet1",
batchSize: DefaultBatch,
maxRowPerFile: DefaultMaxRowPerFile,
csvToExcelBatch: DefaultCsvToExcelBatch,
uploader: &Uploader{
FieldFormName: "file",
System: "crmApi",
Business: "download",
},
task: &Task{},
workerNum: DefaultWorkNum, //runtime.NumCPU() * 2,
logTool: NewLogPrint(nil),
}
},
}
// 全局配置项
var (
DefaultBatch = 10000 //默认一次性读取数据量
DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数默认10000行->WithMaxRowPerFile
DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
DefaultWorkNum = 1 // 并发协程数务必大于1,默认runtime.NumCPU() * 2->WithCustomWorkNum
ProcessLimit = 1 //全局并行导出任务上限
DefaultUploader = &Uploader{
FieldFormName: "file",
System: "crmApi",
Business: "download",
}
DefaultExtension = ".xlsx"
DefaultSheetName = "Sheet1"
//SameTaskProcessLimit = 1 //单任务并行导出上限,必须小于ProcessLimit
)
// ExportAsync 异步导出任务配置->默认配置往上看
type ExportAsync struct {
// 导出文件名(不含扩展名),同时作为任务名称
fileName string
// 文件扩展名(如 .xlsx,默认.xlsx->WithCustomExtension
extension string
//xlsx注脚,默认Sheet1->WithCustomSheetName
sheetName string //sheet名称
//每一批次导出数量数据库每页行数默认10000行->WithCustomBatchSize
batchSize int
//每个Xlsx的行数默认10000行->WithMaxRowPerFile
maxRowPerFile int
//csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize
csvToExcelBatch int
//!!!导出数总量,这个参数非常重要,可以通过WithProcess设置
//1.计算进度使用
//2.当workerNum>1时因为线程的乱序虽然可以通过通道解决但是就没有线程存在的意义了会导致查询很多空值出来
//比如如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力
dataCount int
//日志输出->WithLogPrint
logTool LogTool
// Excel 表头
header []string
// 上传配置->WithCustomUploader
uploader *Uploader
// 任务状态存储(如 Redis;
taskSaveTool TaskSaveTool
// 并发协程数务必大于1,默认runtime.NumCPU() * 2->WithCustomWorkNum
workerNum int
//任务状态
task *Task
// 分页策略(替换原来的 dataProvider
pageStrategy PageStrategy
// 分页策略类型(用于配置)
pageStrategyType PageStrategyType
}
func NewExportAsync(
fileName string,
header []string,
domain string,
TaskSaveTool TaskSaveTool,
args ...ExportOption,
) *ExportAsync {
exporter := exportAsyncPool.Get().(*ExportAsync)
exporter.fileName = fileName
exporter.header = header
exporter.taskSaveTool = TaskSaveTool
exporter.uploader.Host = domain
exporter.task.Name = fileName
for _, arg := range args {
arg(exporter)
}
return exporter
}
func (e *ExportAsync) Run(ctx context.Context) (string, error) {
//新建任务
if e.pageStrategy == nil {
return "", fmt.Errorf("未设置导出方式导出方式具体参考PageStrategy")
}
tempDir, err := e.createTask(ctx)
if err != nil {
return "", fmt.Errorf("创建任务失败: %v", err)
}
go func() {
// 执行导出任务
subCtx, cancel := context.WithCancel(context.Background())
defer func() {
e.taskSaveTool.Del(ctx, e.globalCacheKey())
if _err := recover(); _err != nil {
e.logTool.Errorf("导出panic\n任务%s,错误原因:%s", e.task.Id, _err)
}
e.release()
os.RemoveAll(tempDir)
cancel()
}()
source, err := e.export(subCtx, tempDir)
if err != nil {
e.logTool.Errorf("导出错误:\n任务%s,错误原因:%s", e.task.Id, err.Error())
e.task.Err = err.Error()
_ = e.updateTask(subCtx)
}
e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
}()
return e.task.Id, nil
}
// 添加配置项
func (e *ExportAsync) setPageStrategy(strategy PageStrategy) {
e.pageStrategy = strategy
e.pageStrategyType = strategy.Type()
}
func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string, err error) {
e.processAdd(ctx, INIT.int())
e.logTool.Infof("异步导出任务:%s,开始导出到csv", e.task.Id)
csvFiles, err := e.exportToCsv(ctx, tempDir)
e.task.Process = CSV.int() + INIT.int()
e.processAdd(ctx, 0)
// 合并csv文件
e.logTool.Infof("任务:%s,开始合并到xlsx", e.task.Id)
excelsDir, err := e.mergeCSVsToExcelFiles(csvFiles, tempDir)
if err != nil {
return
}
e.processAdd(ctx, XLSX.int())
// 打包
e.logTool.Infof("异步导出任务:%s,开始打包xlsx", e.task.Id)
source = e.zipFile(tempDir)
if err = e.folderToZip(excelsDir, source); err != nil {
return
}
if len(e.uploader.Host) > 0 {
e.logTool.Infof("异步导出任务:%s,开始上传", e.task.Id)
source, err = e.upload(source)
if err != nil {
return
}
}
e.task.Source = source
e.processAdd(ctx, ATT.int())
return
}
func (e *ExportAsync) exportToCsv(ctx context.Context, tempDir string) (csvFiles []string, err error) {
// 根据策略类型选择不同的导出方式
switch e.pageStrategyType {
case PageStrategyOffset:
return e.exportToCsvWithOffset(ctx, tempDir)
case PageStrategyCursor:
return e.exportToCsvWithCursor(ctx, tempDir)
case PageStrategyTime:
return e.exportToCsvWithTimeRange(ctx, tempDir)
default:
return nil, fmt.Errorf("unsupported page strategy: %s", e.pageStrategyType)
}
}
func (e *ExportAsync) exportToCsvWithStrategy(ctx context.Context, tempDir string) (csvFiles []string, err error) {
var (
perPageProcess int32
csvFilesMap sync.Map
pageNum int64 = 0
)
// 计算进度
if e.dataCount > 0 {
totalPages := (e.dataCount + e.batchSize - 1) / e.batchSize
if totalPages > 0 {
perPageProcess = CSV.int() / int32(totalPages)
}
}
// 使用通道分发任务
taskChan := make(chan interface{}, e.workerNum)
initialState := e.pageStrategy.InitialState()
taskChan <- initialState
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(e.workerNum)
for i := 0; i < e.workerNum; i++ {
g.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case state := <-taskChan:
if state == nil {
return nil
}
// 获取数据
data, nextState, err := e.pageStrategy.NextPage(ctx, state)
if err != nil {
e.logTool.Errorf("异步导出任务:%s,获取数据失败:%s", e.task.Id, err.Error())
return fmt.Errorf("获取数据失败: %w", err)
}
// 没有数据则结束
if len(data) == 0 {
return nil
}
// 生成文件名
currentPage := atomic.AddInt64(&pageNum, 1)
fileName := filepath.Join(tempDir, fmt.Sprintf("/csv/%d.csv", currentPage))
// 原子增加行数
atomic.AddInt64(&e.task.RowCount, int64(len(data)))
// 保存数据
if err := e.savePageToCSV(data, fileName); err != nil {
e.logTool.Errorf("任务:%s,保存CSV失败%s", e.task.Id, err.Error())
return fmt.Errorf("保存数据失败: %w", err)
}
// 存储文件名
csvFilesMap.Store(int(currentPage), fileName)
// 更新进度
e.processAdd(ctx, perPageProcess)
// 如果还有更多数据,继续处理
if e.pageStrategy.HasMore(nextState, data) {
select {
case taskChan <- nextState:
default:
// 通道满在当前goroutine继续处理
state = nextState
continue
}
} else {
// 发送结束信号
close(taskChan)
return nil
}
}
}
})
}
// 等待所有goroutine完成
if err := g.Wait(); err != nil {
return nil, err
}
// 关闭通道(如果还没关闭)
select {
case <-taskChan:
default:
close(taskChan)
}
return getSortedValues(&csvFilesMap), nil
}
func (e *ExportAsync) upload(file string) (string, error) {
resp, err := attachment.Upload(e.uploader.Host, file, e.uploader.System, e.uploader.Business, e.uploader.FieldFormName)
if err != nil {
return "", err
}
return attachmentsdk.GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil
}
func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error {
// 创建文件
zipFile, err := os.Create(zipFilePath)
if err != nil {
return err
}
defer zipFile.Close()
// 创建zip writer
archive := zip.NewWriter(zipFile)
defer archive.Close()
// 遍历文件夹
err = filepath.Walk(excelsDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// 忽略文件夹自身
if info.IsDir() {
return nil
}
// 打开文件
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
// 创建zip文件条目
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
// 更改工作目录到zip路径
header.Name = filepath.ToSlash(path[len(excelsDir):])
// 创建zip文件条目
writer, err := archive.CreateHeader(header)
if err != nil {
return err
}
// 将文件内容写入zip文件条目
_, err = io.Copy(writer, file)
return err
})
if err != nil {
return err
}
return nil
}
// 为每种策略提供专门的导出方法(如果需要特殊处理)
func (e *ExportAsync) exportToCsvWithOffset(ctx context.Context, tempDir string) (csvFiles []string, err error) {
return e.exportToCsvWithStrategy(ctx, tempDir)
}
func (e *ExportAsync) exportToCsvWithCursor(ctx context.Context, tempDir string) (csvFiles []string, err error) {
return e.exportToCsvWithStrategy(ctx, tempDir)
}
func (e *ExportAsync) exportToCsvWithTimeRange(ctx context.Context, tempDir string) (csvFiles []string, err error) {
if strategy, ok := e.pageStrategy.(*TimeRangeStrategy); ok {
// 如果设置了结束时间,可以进行分片并行
if !strategy.endTime.IsZero() {
return e.exportToCsvWithTimeRangeParallel(ctx, tempDir, strategy)
}
}
return e.exportToCsvWithStrategy(ctx, tempDir)
}
// exportToCsvWithTimeRangeParallel 时间范围并行导出
func (e *ExportAsync) exportToCsvWithTimeRangeParallel(ctx context.Context, tempDir string, strategy *TimeRangeStrategy) (csvFiles []string, err error) {
var csvFilesMap sync.Map
// 计算时间范围
startTime := strategy.startTime
endTime := strategy.endTime
if endTime.IsZero() {
endTime = time.Now() // 默认到当前时间
}
// 计算总时长和分片
totalDuration := endTime.Sub(startTime)
if totalDuration <= 0 {
return nil, fmt.Errorf("invalid time range: start=%v, end=%v", startTime, endTime)
}
// 计算每个分片的时间范围
shardDuration := totalDuration / time.Duration(e.workerNum)
if shardDuration == 0 {
shardDuration = totalDuration // 如果分片太小,就不分片
e.workerNum = 1
}
// 计算进度
var perShardProcess int32
if e.dataCount > 0 {
perShardProcess = CSV.int() / int32(e.workerNum)
} else {
perShardProcess = 0
}
e.logTool.Infof("异步导出任务:%s,时间范围分片并行导出,分片数:%d总时长%v分片时长%v",
e.task.Id, e.workerNum, totalDuration, shardDuration)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(e.workerNum)
// 使用原子计数器生成文件索引
var fileIndex int64 = 1
for i := 0; i < e.workerNum; i++ {
workerID := i
g.Go(func() error {
// 计算该worker的时间范围
defer e.processAdd(ctx, perShardProcess)
workerStartTime := startTime.Add(time.Duration(workerID) * shardDuration)
workerEndTime := workerStartTime.Add(shardDuration)
// 最后一个worker处理剩余的时间
if workerID == e.workerNum-1 {
workerEndTime = endTime
}
e.logTool.Infof("异步导出任务:%s,Worker %d 处理时间范围:%v 到 %v",
e.task.Id, workerID, workerStartTime, workerEndTime)
return e.processTimeShard(ctx, tempDir, workerID, workerStartTime, workerEndTime,
strategy, &csvFilesMap, &fileIndex)
})
}
// 等待所有分片完成
if err := g.Wait(); err != nil {
return nil, err
}
return getSortedValues(&csvFilesMap), nil
}
// processTimeShard 处理单个时间分片
func (e *ExportAsync) processTimeShard(ctx context.Context, tempDir string, workerID int,
startTime, endTime time.Time, strategy *TimeRangeStrategy,
csvFilesMap *sync.Map, fileIndex *int64) error {
currentTime := startTime
shardFileIndex := int64(0)
for {
// 检查上下文是否被取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// 如果当前时间已经超过分片结束时间,则退出
if !endTime.IsZero() && currentTime.After(endTime) {
break
}
// 计算本次查询的结束时间
queryEndTime := currentTime.Add(strategy.timeRange)
if !endTime.IsZero() && queryEndTime.After(endTime) {
queryEndTime = endTime
}
// 获取数据
data, err := strategy.fetcher(ctx, currentTime, strategy.limit)
if err != nil {
e.logTool.Errorf("异步导出任务:%s,Worker %d 获取数据失败(时间:%v%s",
e.task.Id, workerID, currentTime, err.Error())
return fmt.Errorf("worker %d 获取数据失败: %w", workerID, err)
}
// 没有数据则尝试下一个时间片段
if len(data) == 0 {
currentTime = queryEndTime
continue
}
// 生成文件名
currentFileIndex := atomic.AddInt64(fileIndex, 1)
fileName := filepath.Join(tempDir, fmt.Sprintf("/csv/worker%d_%d.csv", workerID, shardFileIndex))
shardFileIndex++
// 原子增加行数
atomic.AddInt64(&e.task.RowCount, int64(len(data)))
// 保存数据
if err := e.savePageToCSV(data, fileName); err != nil {
e.logTool.Errorf("异步导出任务:%s,Worker %d 保存CSV失败%s",
e.task.Id, workerID, err.Error())
return fmt.Errorf("worker %d 保存数据失败: %w", workerID, err)
}
// 存储文件名(使用全局索引保证排序)
csvFilesMap.Store(int(currentFileIndex), fileName)
e.logTool.Infof("异步导出任务:%s,Worker %d 已处理 %d 条数据,时间:%v文件%s",
e.task.Id, workerID, len(data), currentTime, fileName)
// 判断是否继续
if len(data) < strategy.limit {
// 如果本次获取的数据不足limit说明这个时间段的数据已经取完
currentTime = queryEndTime
} else {
// 如果数据量等于limit可能还有更多数据
// 这里可以根据业务逻辑决定是否移动时间
// 例如:如果数据是按时间排序的,可以取最后一条数据的时间作为下一次查询的起始时间
// 为了简化,我们还是按固定时间片移动
currentTime = queryEndTime
}
// 如果已经处理到分片结束时间,退出
if !endTime.IsZero() && currentTime.After(endTime) {
break
}
}
e.logTool.Infof("异步导出任务:%s,Worker %d 完成,处理了 %d 个文件",
e.task.Id, workerID, shardFileIndex)
return nil
}
func (e *ExportAsync) zipFile(tempDir string) string {
return e.dirZip(tempDir) + e.fileName + ".zip"
}
// mergeCSVsToExcelFiles 将多个CSV文件合并为多个Excel文件流式处理
func (e *ExportAsync) mergeCSVsToExcelFiles(csvFiles []string, tempDir string) (outputDir string, err error) {
outputDir = e.dirXlsx(tempDir)
m := NewMerge(
Reader{Files: csvFiles, Index: len(csvFiles) - 1},
Writer{File: outputDir + e.fileName + e.extension, Limit: e.maxRowPerFile, BufferSize: e.csvToExcelBatch},
e.logTool,
)
if err = m.Merge(); err != nil {
return
}
return
}
func getSortedValues(sm *sync.Map) []string {
// 1. 预分配切片(假设已知大致数量)
items := make([]struct {
key int
value string
}, 0, 16) // 初始容量可调整
// 2. 收集数据(单次遍历)
sm.Range(func(key, value interface{}) bool {
items = append(items, struct {
key int
value string
}{key.(int), value.(string)})
return true
})
// 3. 排序
sort.Slice(items, func(i, j int) bool {
return items[i].key < items[j].key
})
// 4. 提取值
sortedValues := make([]string, len(items))
for i, item := range items {
sortedValues[i] = item.value
}
return sortedValues
}
func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error) {
//判断是否到达系统上限
uid, err := e.getTaskId(ctx)
if err != nil {
return "", fmt.Errorf("初始化任务失败: %w", err)
}
e.task.Id = uid.String()
tempDir, err = e.createDefaultDir(ctx)
if err != nil {
return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
}
err = e.updateTask(ctx)
return tempDir, nil
}
func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error) {
//// 检查同任务数量
//if err = e.checkTaskLimit(ctx, e.taskCacheKey(), SameTaskProcessLimit, "任务"); err != nil {
// return
//}
// 检查全局任务数量
if err = e.CheckAndIncrementTaskCount(ctx, e.globalCacheKey(), ProcessLimit, "全局任务"); err != nil {
return
}
return uuid.NewUUID()
}
func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
count, err := e.getAndParseTaskCount(ctx, key)
if err != nil {
return fmt.Errorf("获取%s数量失败: %w", limitType, err)
}
if count >= limit {
return fmt.Errorf("%s %s数量已达上限%d请稍后重试", e.fileName, limitType, limit)
}
if _err := e.taskSaveTool.Set(ctx, key, strconv.Itoa(count+1), 0).Err(); _err != nil {
e.taskSaveTool.Del(ctx, key)
return fmt.Errorf("更新任务数量失败: %w", err)
}
return nil
}
func (e *ExportAsync) getAndParseTaskCount(ctx context.Context, key string) (int, error) {
res := e.taskSaveTool.Get(ctx, key)
if res.Val() == "" {
return 0, nil
}
count, err := strconv.Atoi(res.Val())
if err != nil {
return 0, fmt.Errorf("解析任务数量失败: %w", err)
}
return count, nil
}
func (e *ExportAsync) taskCacheKey() string {
return fmt.Sprintf("%s:%s", CacheKey, base64.StdEncoding.EncodeToString([]byte(e.fileName)))
}
func (e *ExportAsync) globalCacheKey() string {
return fmt.Sprintf("%s%s", CacheKey, "global")
}
func (e *ExportAsync) updateTask(ctx context.Context) (err error) {
taskByte, err := json.Marshal(e.task)
if err != nil {
return
}
err = e.taskSaveTool.Set(ctx, e.task.Id, string(taskByte), 0).Err()
if err != nil {
err = fmt.Errorf("更新任务失败: %w", err)
return
}
return
}
func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
atomic.AddInt32(&e.task.Process, addNum)
e.logTool.Infof("异步导出任务:%s,当前进度:%d", e.task.Id, e.task.Process)
_ = e.updateTask(ctx)
return
}
func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) {
// 创建临时目录
tempDir, err := os.MkdirTemp("", e.task.Id)
if err != nil {
return "", fmt.Errorf("创建临时目录失败: %v", err)
}
//csv
if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建csv目录失败: %v", err)
}
//xlsx
if err = os.Mkdir(e.dirXlsx(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建xlsx目录失败: %v", err)
}
//zip
if err = os.Mkdir(e.dirZip(tempDir), 0755); err != nil {
return "", fmt.Errorf("创建zip目录失败: %v", err)
}
return tempDir, nil
}
func (e *ExportAsync) dirCsv(tempDir string) string {
return tempDir + "/csv/"
}
func (e *ExportAsync) dirXlsx(tempDir string) string {
return tempDir + "/xlsx/"
}
func (e *ExportAsync) dirZip(tempDir string) string {
return tempDir + "/zip/"
}
// savePageToCSV 将单页数据保存为CSV文件
func (e *ExportAsync) savePageToCSV(data [][]interface{}, filename string) error {
file, err := os.Create(filename)
if err != nil {
return fmt.Errorf("创建CSV文件失败: %v", err)
}
defer file.Close()
writer := csv.NewWriter(file)
defer writer.Flush()
// 写入表头(如果尚未写入)
if err := writer.Write(e.header); err != nil {
return fmt.Errorf("写入CSV表头失败: %v", err)
}
// 写入数据行
for _, row := range data {
csvRow := make([]string, 0, len(e.header))
for _, val := range row {
csvRow = append(csvRow, fmt.Sprintf("%v", val))
}
if err := writer.Write(csvRow); err != nil {
return fmt.Errorf("写入CSV行失败: %v", err)
}
}
return nil
}
func (e *ExportAsync) release() {
// 清空敏感或动态数据
e.fileName = ""
e.header = nil
e.taskSaveTool = nil
e.batchSize = DefaultBatch
e.maxRowPerFile = DefaultMaxRowPerFile
e.csvToExcelBatch = DefaultCsvToExcelBatch
e.task = nil
e.workerNum = DefaultWorkNum
e.uploader = DefaultUploader
e.logTool = NewLogPrint(nil)
e.sheetName = DefaultSheetName
exportAsyncPool.Put(e)
}