From cb60c605e3ff386c980ce1996e53b4ca212e6561 Mon Sep 17 00:00:00 2001
From: renzhiyuan <465386466@qq.com>
Date: Tue, 23 Dec 2025 20:57:44 +0800
Subject: [PATCH] Optimize the async export feature: support multiple
 pagination strategies
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 README.md            |  32 +--
 entity.go            |  61 +++++-
 export_async.go      | 474 +++++++++++++++++++++++++++++++++++--------
 export_async_test.go |  51 ++---
 merge.go             |   3 +-
 page_strategy.go     | 178 ++++++++++++++++
 6 files changed, 670 insertions(+), 129 deletions(-)
 create mode 100644 page_strategy.go

diff --git a/README.md b/README.md
index a6515e6..506b143 100644
--- a/README.md
+++ b/README.md
@@ -7,28 +7,30 @@ $ go get gitea.cdlsxd.cn/self-tools/l-export-async
 ## Usage
 ```go
 task, err := export_async.NewExportAsync(
-fmt.Sprintf("%s%s", "供应商结算交易流水", time.Now().Format("20060102150405")),
-supplierOrderTransRecordsFields(),
-s.c.Rpc.GetAttachmentDomain(),
-func(ctx context.Context, pageNum, limit int) ([][]interface{}, error) {
+fmt.Sprintf("%s%s", "分销商账户-交易明细", time.Now().Format("20060102150405")),
+s.balanceLogsFiled(),
+s.cfg.Rpc.GetAttachmentDomain(),
+export_async.NewRedisTaskStore(s.d.Rdb),
+export_async.WithCustomBatchSize(10000),
+export_async.WithCursor(func(ctx context.Context, cursor interface{}, limit int) ([][]interface{}, interface{}, error) {
 req.Page = &api.PageReq{
-Page: int32(pageNum),
+Page: 1, // cursor pagination always reads "page 1" after the cursor
 PageSize: int32(limit),
 }
-list, err := s.GetSupplierOrderTransRecordList(ctx, req)
-if err != nil {
-return nil, err
+req.Id = cursor.(uint64) // the cursor's concrete type must match idMax and the Id field
+resp, err := s.BalanceLogs(ctx, req)
+if err != nil {
+return nil, nil, err
+}
+if len(resp.Data) == 0 {
+return nil, nil, nil
 }
-if list == nil {
-return nil, nil
-}
-return supplierOrderTransRecordsToCollect(list.List), nil
-},
-export_async.NewRedisTaskStore(s.d.Rdb),
+last := resp.Data[len(resp.Data)-1].Id
+return s.balanceLogsToCollect(resp.Data), last, nil
+}, idMax),
 export_async.WithLogPrint(s.log),
-export_async.WithProcess(count),
 export_async.WithMaxRowPerFile(1000000),
 ).Run(ctx)
+if err != nil {
+	return nil, myerr.ErrorParamError("导出失败,请稍后重试:" + err.Error())
 }
 return &api.TaskReply{Task: task}, nil
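The README now demonstrates cursor pagination only. For comparison, the offset-based wiring that the old README showed still works through the new `WithOffset` option; a minimal sketch reusing the previous example's app-side helpers (`supplierOrderTransRecordsFields`, `GetSupplierOrderTransRecordList`, `supplierOrderTransRecordsToCollect` are not part of this library):

```go
task, err := export_async.NewExportAsync(
	fmt.Sprintf("%s%s", "供应商结算交易流水", time.Now().Format("20060102150405")),
	supplierOrderTransRecordsFields(),
	s.c.Rpc.GetAttachmentDomain(),
	export_async.NewRedisTaskStore(s.d.Rdb),
	export_async.WithCustomBatchSize(10000),
	export_async.WithOffset(func(ctx context.Context, pageNum, limit int) ([][]interface{}, error) {
		// classic LIMIT/OFFSET paging: the library passes pageNum starting at 1
		req.Page = &api.PageReq{Page: int32(pageNum), PageSize: int32(limit)}
		list, err := s.GetSupplierOrderTransRecordList(ctx, req)
		if err != nil {
			return nil, err
		}
		if list == nil {
			return nil, nil
		}
		return supplierOrderTransRecordsToCollect(list.List), nil
	}),
).Run(ctx)
```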
diff --git a/entity.go b/entity.go
index acfc9a5..fc154fe 100644
--- a/entity.go
+++ b/entity.go
@@ -1,8 +1,16 @@
 package l_export_async

-import "context"
+import (
+	"context"
+	"time"
+)

 // DataProviderFn defines the data-provider function type
+// Three query shapes are currently supported:
+// 1. offset export:     ... LIMIT ? OFFSET ?
+// 2. cursor export:     WHERE id > ? ORDER BY id LIMIT ?
+// 3. time-range export: WHERE time_column BETWEEN ? AND ? ORDER BY time_column LIMIT ?
+// Note: with the strategy options below, OffsetDataFetcher supersedes this alias.
+
 type DataProviderFn func(ctx context.Context, pageNum, limit int) ([][]interface{}, error)

 // Uploader exists mainly so attachment.Upload can be called
@@ -33,6 +41,13 @@ func WithCustomBatchSize(batchSize int) ExportOption {
 	}
 }

+// WithTotalProcess sets the expected total row count; it is only used to
+// compute export progress
+func WithTotalProcess(totalCount int) ExportOption {
+	return func(b *ExportAsync) {
+		b.dataCount = totalCount
+	}
+}
+
 // WithCustomBufferSize buffer size for batched CSV-to-Excel writes; set 0 to write row by row
 func WithCustomBufferSize(bufferSize int) ExportOption {
 	return func(b *ExportAsync) {
@@ -89,6 +104,48 @@ func WithMaxRowPerFile(maxRowPerFile int) ExportOption {
 	}
 }

+// NOTE: apply WithCustomBatchSize before any of the three strategy options;
+// each strategy captures e.batchSize at the moment its option runs.
+
+// WithOffset classic offset pagination
+func WithOffset(fetcher OffsetDataFetcher) ExportOption {
+	return func(e *ExportAsync) {
+		if e.batchSize == 0 {
+			e.batchSize = DefaultBatch
+		}
+		e.setPageStrategy(&OffsetStrategy{
+			fetcher: fetcher,
+			limit:   e.batchSize,
+		})
+	}
+}
+
+// WithCursor cursor pagination; initialCursor should be a concrete value
+// (e.g. uint64(0)), not nil, because the export loop reads a nil state as
+// "no more work"
+func WithCursor(fetcher CursorDataFetcher, initialCursor interface{}) ExportOption {
+	return func(e *ExportAsync) {
+		if e.batchSize == 0 {
+			e.batchSize = DefaultBatch
+		}
+		e.setPageStrategy(&CursorStrategy{
+			fetcher:       fetcher,
+			limit:         e.batchSize,
+			initialCursor: initialCursor,
+		})
+	}
+}
+
+// WithTimeRange time-range pagination: each fetch covers one timeRange window,
+// starting from startTime
+func WithTimeRange(fetcher TimeRangeDataFetcher, startTime time.Time, timeRange time.Duration) ExportOption {
+	return func(e *ExportAsync) {
+		if e.batchSize == 0 {
+			e.batchSize = DefaultBatch
+		}
+		e.setPageStrategy(&TimeRangeStrategy{
+			fetcher:   fetcher,
+			limit:     e.batchSize,
+			startTime: startTime,
+			timeRange: timeRange,
+		})
+	}
+}
+
 type Task struct {
 	Id   string `json:"id"`
 	Name string `json:"name"`
@@ -107,6 +164,8 @@ const (
 	ATT ProcessScore = 100000
 )

+const CacheKey = "export_async_task"
+
 func (p ProcessScore) int() int32 {
 	return int32(p)
 }
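`WithTimeRange` is the only option that needs two extra arguments (the window start and the window width). A minimal sketch of wiring it up; `s.TransRecordsByTimeWindow` and `toCollect` are hypothetical app-side helpers, and the window passed to the query must agree with the `timeRange` argument:

```go
task, err := export_async.NewExportAsync(
	fileName,
	header,
	domain,
	export_async.NewRedisTaskStore(rdb),
	export_async.WithTimeRange(
		func(ctx context.Context, startTime time.Time, limit int) ([][]interface{}, error) {
			// hypothetical query: rows with create_time in [startTime, startTime+1h)
			rows, err := s.TransRecordsByTimeWindow(ctx, startTime, startTime.Add(time.Hour), limit)
			if err != nil {
				return nil, err
			}
			return toCollect(rows), nil
		},
		time.Date(2025, 1, 1, 0, 0, 0, 0, time.Local), // export start
		time.Hour, // each fetch covers a one-hour window
	),
).Run(ctx)
```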
diff --git a/export_async.go b/export_async.go
index c970b63..04fcd14 100644
--- a/export_async.go
+++ b/export_async.go
@@ -3,18 +3,23 @@ package l_export_async
 import (
 	"archive/zip"
 	"context"
+	"encoding/base64"
 	"encoding/csv"
 	"encoding/json"
 	"fmt"
 	"io"
 	"os"
 	"path/filepath"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
 	"time"

+	attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
+	"finance/internal/pkg/helper/attachment"
 	"github.com/google/uuid"
 	"golang.org/x/sync/errgroup"
 )
@@ -22,26 +27,43 @@ import (
 var exportAsyncPool = &sync.Pool{
 	New: func() interface{} {
 		return &ExportAsync{
-			extension:       ".xlsx",
-			sheetName:       "Sheet1",
-			batchSize:       10000,
-			maxRowPerFile:   10000,
-			csvToExcelBatch: 1000,
+			extension:       DefaultExtension,
+			sheetName:       DefaultSheetName,
+			batchSize:       DefaultBatch,
+			maxRowPerFile:   DefaultMaxRowPerFile,
+			csvToExcelBatch: DefaultCsvToExcelBatch,
 			uploader: &Uploader{
 				FieldFormName: "file",
 				System:        "crmApi",
 				Business:      "download",
 			},
 			task:      &Task{},
-			workerNum: 1, //runtime.NumCPU() * 2,
+			workerNum: DefaultWorkNum, // e.g. runtime.NumCPU() * 2
 			logTool:   NewLogPrint(nil),
 		}
 	},
 }

+// Global defaults
+var (
+	DefaultBatch           = 10000  // rows fetched per batch (DB page size) -> WithCustomBatchSize
+	DefaultMaxRowPerFile   = 100000 // rows per xlsx file -> WithMaxRowPerFile
+	DefaultCsvToExcelBatch = 1000   // CSV-to-Excel write buffer; 0 writes row by row -> WithCustomBufferSize
+	DefaultWorkNum         = 1      // worker goroutines, at least 1 -> WithCustomWorkNum
+	ProcessLimit           = 1      // global cap on concurrently running export tasks
+	DefaultUploader        = &Uploader{
+		FieldFormName: "file",
+		System:        "crmApi",
+		Business:      "download",
+	}
+	DefaultExtension = ".xlsx"
+	DefaultSheetName = "Sheet1"
+	//SameTaskProcessLimit = 1 // per-task concurrency cap; must not exceed ProcessLimit
+)
+
 // ExportAsync async export task configuration (defaults above)
 type ExportAsync struct {
-	//// export file name (without extension)
+	// export file name (without extension); also used as the task name
 	fileName string

 	// file extension (e.g. .xlsx), default .xlsx -> WithCustomExtension
@@ -73,9 +95,6 @@ type ExportAsync struct {
 	// upload settings -> WithCustomUploader
 	uploader *Uploader

-	// data provider function
-	dataProvider DataProviderFn
-
 	// task state store (e.g. Redis)
 	taskSaveTool TaskSaveTool

 	// task state
 	task *Task
+
+	// pagination strategy (replaces the old dataProvider)
+	pageStrategy PageStrategy
+
+	// strategy type, used for dispatch
+	pageStrategyType PageStrategyType
 }

 func NewExportAsync(
 	fileName string,
 	header []string,
 	domain string,
-	dataProvider DataProviderFn,
 	TaskSaveTool TaskSaveTool,
 	args ...ExportOption,
 ) *ExportAsync {
 	exporter := exportAsyncPool.Get().(*ExportAsync)
 	exporter.fileName = fileName
 	exporter.header = header
-	exporter.dataProvider = dataProvider
 	exporter.taskSaveTool = TaskSaveTool
 	exporter.uploader.Host = domain
 	exporter.task.Name = fileName
 	for _, arg := range args {
 		arg(exporter)
 	}
 	return exporter
 }

 func (e *ExportAsync) Run(ctx context.Context) (string, error) {
+	// a pagination strategy is mandatory
+	if e.pageStrategy == nil {
+		return "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy")
+	}
+
 	tempDir, err := e.createTask(ctx)
 	if err != nil {
 		return "", fmt.Errorf("创建任务失败: %v", err)
 	}
@@ -118,7 +145,15 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
 	go func() {
 		// run the export
 		subCtx, cancel := context.WithCancel(context.Background())
-		defer cancel()
+		defer func() {
+			// release the global task slot; use subCtx, because the caller's
+			// ctx may already be cancelled when the export finishes (with
+			// ProcessLimit == 1, deleting the key equals decrementing it)
+			e.taskSaveTool.Del(subCtx, e.globalCacheKey())
+			if _err := recover(); _err != nil {
+				e.logTool.Errorf("导出panic:\n任务:%s,错误原因:%s", e.task.Id, _err)
+			}
+			e.release()
+			os.RemoveAll(tempDir)
+			cancel()
+		}()
 		source, err := e.export(subCtx, tempDir)
 		if err != nil {
 			e.logTool.Errorf("导出错误:\n任务:%s,错误原因:%s", e.task.Id, err.Error())
 			_ = e.updateTask(subCtx)
+			return // don't log success below; the deferred cleanup still runs
 		}
 		e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source)
-		os.RemoveAll(tempDir)
+
 	}()
 	return e.task.Id, nil
 }
+
+// setPageStrategy wires in the chosen strategy and caches its type for dispatch
+func (e *ExportAsync) setPageStrategy(strategy PageStrategy) {
+	e.pageStrategy = strategy
+	e.pageStrategyType = strategy.Type()
+}
+
 func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string, err error) {
 	e.processAdd(ctx, INIT.int())
@@ -166,66 +208,91 @@ func (e *ExportAsync) export(ctx context.Context, tempDir string) (source string, err error) {
 }

 func (e *ExportAsync) exportToCsv(ctx context.Context, tempDir string) (csvFiles []string, err error) {
+	// dispatch on the configured strategy
+	switch e.pageStrategyType {
+	case PageStrategyOffset:
+		return e.exportToCsvWithOffset(ctx, tempDir)
+	case PageStrategyCursor:
+		return e.exportToCsvWithCursor(ctx, tempDir)
+	case PageStrategyTime:
+		return e.exportToCsvWithTimeRange(ctx, tempDir)
+	default:
+		return nil, fmt.Errorf("unsupported page strategy: %s", e.pageStrategyType)
+	}
+}
+
+func (e *ExportAsync) exportToCsvWithStrategy(ctx context.Context, tempDir string) (csvFiles []string, err error) {
 	var (
 		perPageProcess int32
-		pageLimit      int = -1
 		csvFilesMap    sync.Map
+		pageNum        int64
 	)

-	// compute per-page progress
+	// per-page progress share (only meaningful when WithTotalProcess supplied a total)
 	if e.dataCount > 0 {
-		pageLimit = (e.dataCount + e.batchSize - 1) / e.batchSize
-		perPageProcess = CSV.int() / int32(pageLimit) //6
+		totalPages := (e.dataCount + e.batchSize - 1) / e.batchSize
+		if totalPages > 0 {
+			perPageProcess = CSV.int() / int32(totalPages)
+		}
 	}

+	// Work circulates as a single state token: every NextPage call yields
+	// exactly one nextState, so at most one state is in flight at any moment.
+	taskChan := make(chan interface{}, e.workerNum)
+	taskChan <- e.pageStrategy.InitialState()
+
 	g, ctx := errgroup.WithContext(ctx)
-	g.SetLimit(e.workerNum) // cap concurrency
-
-	// page numbers come from an atomic counter
-	var pageNum int64 = 1
-	stopProcessing := false
-
+	g.SetLimit(e.workerNum)
 	for i := 0; i < e.workerNum; i++ {
 		g.Go(func() error {
 			for {
-				if stopProcessing {
-					return nil
-				}
-
-				// atomically claim the next page
-				page := int(atomic.AddInt64(&pageNum, 1) - 1)
-				if pageLimit != -1 && page > pageLimit {
-					return nil
-				}
-				// fetch the data
-				rows, err := e.dataProvider(ctx, page, e.batchSize)
-				if err != nil {
-					e.logTool.Errorf("异步导出任务:%s,第%d页查询失败:%s", e.task.Id, page, err.Error())
-					return fmt.Errorf("第%d页查询失败: %w", page, err)
-				}
-
-				// was that the last page?
-				if len(rows) == 0 || len(rows) < e.batchSize {
-					stopProcessing = true
-				}
-
-				// no data at all: stop
-				if len(rows) == 0 {
-					return nil
-				}
-
-				// build the file name
-				fileName := filepath.Join(tempDir, fmt.Sprintf("/csv/%d.csv", page))
-				// bump the row count atomically
-				atomic.AddInt64(&e.task.RowCount, int64(len(rows)))
-
-				// flush the page to a temp file
-				if err := e.savePageToCSV(rows, fileName); err != nil {
-					e.logTool.Errorf("任务:%s,导出到csv错误:%s", e.task.Id, err.Error())
-					return fmt.Errorf("保存第%d页失败: %w", page, err)
-				}
-				// record the file name
-				csvFilesMap.Store(page, fileName)
-				// bump progress
-				e.processAdd(ctx, perPageProcess)
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case state, ok := <-taskChan:
+					// a closed channel (or a nil token) means no more work;
+					// cursor strategies must therefore use a concrete initial
+					// value such as uint64(0) rather than nil
+					if !ok || state == nil {
+						return nil
+					}
+					for state != nil {
+						data, nextState, err := e.pageStrategy.NextPage(ctx, state)
+						if err != nil {
+							e.logTool.Errorf("异步导出任务:%s,获取数据失败:%s", e.task.Id, err.Error())
+							return fmt.Errorf("获取数据失败: %w", err)
+						}
+						// empty page: the sequence is exhausted; close the
+						// channel so the idle workers are released as well
+						if len(data) == 0 {
+							close(taskChan)
+							return nil
+						}
+						currentPage := atomic.AddInt64(&pageNum, 1)
+						fileName := filepath.Join(tempDir, "csv", fmt.Sprintf("%d.csv", currentPage))
+						atomic.AddInt64(&e.task.RowCount, int64(len(data)))
+						if err := e.savePageToCSV(data, fileName); err != nil {
+							e.logTool.Errorf("任务:%s,保存CSV失败:%s", e.task.Id, err.Error())
+							return fmt.Errorf("保存数据失败: %w", err)
+						}
+						csvFilesMap.Store(int(currentPage), fileName)
+						e.processAdd(ctx, perPageProcess)
+
+						if !e.pageStrategy.HasMore(nextState, data) {
+							close(taskChan) // done: wake the idle workers
+							return nil
+						}
+						select {
+						case taskChan <- nextState:
+							state = nil // handed off; go back to waiting
+						default:
+							state = nextState // channel full: keep processing inline
+						}
+					}
+				}
 			}
 		})
 	}

@@ -234,18 +301,24 @@ func (e *ExportAsync) exportToCsv(ctx context.Context, tempDir string) (csvFiles []string, err error) {
 	// wait for all workers
 	if err := g.Wait(); err != nil {
 		return nil, err
 	}
-	// sort the csv file names
-	csvFiles = getSortedValues(&csvFilesMap)
-	return csvFiles, nil
+
+	return getSortedValues(&csvFilesMap), nil
 }
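The worker pool above passes a single pagination-state token through `taskChan` instead of sharing a page counter: cursor and time strategies have exactly one legal "next" state, so whoever holds the token fetches a page, then either hands the next token to an idle worker or keeps processing it. A stripped-down, self-contained sketch of that hand-off pattern (illustrative only, not library code):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 4
	tokens := make(chan int, workers) // the state-token channel
	tokens <- 0                       // seed with the initial state

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for state := range tokens {
				for {
					next := state + 1 // "fetch a page", derive the next state
					if next > 10 {    // no more pages: release everyone
						close(tokens)
						return
					}
					fmt.Printf("worker %d processed state %d\n", id, state)
					select {
					case tokens <- next: // hand the token to an idle worker
					default:
						// defensive fallback: keep the token if the hand-off
						// ever fails, and continue processing inline
						state = next
						continue
					}
					break
				}
			}
		}(w)
	}
	wg.Wait()
}
```

Because exactly one token circulates, `close` is only ever reached by the current token holder, so the shutdown signal cannot race with another close or send.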

 func (e *ExportAsync) upload(file string) (string, error) {
-	resp, err := Upload(e.uploader.Host, file, e.uploader.System, e.uploader.Business, e.uploader.FieldFormName)
+	resp, err := attachment.Upload(e.uploader.Host, file, e.uploader.System, e.uploader.Business, e.uploader.FieldFormName)
 	if err != nil {
 		return "", err
 	}
-	return GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil
+	// signed preview URL, valid for 300 seconds
+	return attachmentsdk.GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil
 }

 func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error {
@@ -304,6 +377,182 @@ func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error {
 	return nil
 }

+// Strategy-specific entry points (offset and cursor share the generic path)
+func (e *ExportAsync) exportToCsvWithOffset(ctx context.Context, tempDir string) (csvFiles []string, err error) {
+	return e.exportToCsvWithStrategy(ctx, tempDir)
+}
+
+func (e *ExportAsync) exportToCsvWithCursor(ctx context.Context, tempDir string) (csvFiles []string, err error) {
+	return e.exportToCsvWithStrategy(ctx, tempDir)
+}
+
+func (e *ExportAsync) exportToCsvWithTimeRange(ctx context.Context, tempDir string) (csvFiles []string, err error) {
+	if strategy, ok := e.pageStrategy.(*TimeRangeStrategy); ok {
+		// with a known end time the range can be sharded and exported in parallel
+		if !strategy.endTime.IsZero() {
+			return e.exportToCsvWithTimeRangeParallel(ctx, tempDir, strategy)
+		}
+	}
+	return e.exportToCsvWithStrategy(ctx, tempDir)
+}
+
+// exportToCsvWithTimeRangeParallel shards [startTime, endTime) across the workers
+func (e *ExportAsync) exportToCsvWithTimeRangeParallel(ctx context.Context, tempDir string, strategy *TimeRangeStrategy) (csvFiles []string, err error) {
+	var csvFilesMap sync.Map
+
+	startTime := strategy.startTime
+	endTime := strategy.endTime
+	if endTime.IsZero() {
+		endTime = time.Now() // default: up to now
+	}
+
+	totalDuration := endTime.Sub(startTime)
+	if totalDuration <= 0 {
+		return nil, fmt.Errorf("invalid time range: start=%v, end=%v", startTime, endTime)
+	}
+
+	// one shard per worker
+	shardDuration := totalDuration / time.Duration(e.workerNum)
+	if shardDuration == 0 {
+		shardDuration = totalDuration // range too small to shard
+		e.workerNum = 1
+	}
+
+	// progress share per shard
+	var perShardProcess int32
+	if e.dataCount > 0 {
+		perShardProcess = CSV.int() / int32(e.workerNum)
+	}
+
+	e.logTool.Infof("异步导出任务:%s,时间范围分片并行导出,分片数:%d,总时长:%v,分片时长:%v",
+		e.task.Id, e.workerNum, totalDuration, shardDuration)
+
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(e.workerNum)
+
+	// global file index so the merged output keeps a stable order
+	var fileIndex int64
+
+	for i := 0; i < e.workerNum; i++ {
+		workerID := i
+		g.Go(func() error {
+			defer e.processAdd(ctx, perShardProcess)
+			// this worker's time window
+			workerStartTime := startTime.Add(time.Duration(workerID) * shardDuration)
+			workerEndTime := workerStartTime.Add(shardDuration)
+
+			// the last worker picks up the remainder
+			if workerID == e.workerNum-1 {
+				workerEndTime = endTime
+			}
+
+			e.logTool.Infof("异步导出任务:%s,Worker %d 处理时间范围:%v 到 %v",
+				e.task.Id, workerID, workerStartTime, workerEndTime)
+
+			return e.processTimeShard(ctx, tempDir, workerID, workerStartTime, workerEndTime,
+				strategy, &csvFilesMap, &fileIndex)
+		})
+	}
+
+	// wait for every shard
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+
+	return getSortedValues(&csvFilesMap), nil
+}
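The shard arithmetic above divides [startTime, endTime) evenly and gives the remainder to the last worker. A standalone illustration of the same computation, with made-up values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Date(2025, 12, 1, 0, 0, 0, 0, time.UTC)
	end := start.Add(25 * time.Hour) // deliberately not divisible by 4
	workers := 4

	shard := end.Sub(start) / time.Duration(workers) // 6h15m per worker
	for w := 0; w < workers; w++ {
		ws := start.Add(time.Duration(w) * shard)
		we := ws.Add(shard)
		if w == workers-1 {
			we = end // the last worker absorbs the remainder
		}
		fmt.Printf("worker %d: [%s, %s)\n", w, ws.Format("02 15:04"), we.Format("02 15:04"))
	}
}
```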
+
+// processTimeShard walks one worker's [startTime, endTime) window
+func (e *ExportAsync) processTimeShard(ctx context.Context, tempDir string, workerID int,
+	startTime, endTime time.Time, strategy *TimeRangeStrategy,
+	csvFilesMap *sync.Map, fileIndex *int64) error {
+
+	currentTime := startTime
+	shardFileIndex := int64(0)
+
+	for {
+		// honour cancellation
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
+		// stop once the window is consumed (>= so an exact boundary terminates too)
+		if !endTime.IsZero() && !currentTime.Before(endTime) {
+			break
+		}
+
+		// clamp this query's window to the shard end
+		queryEndTime := currentTime.Add(strategy.timeRange)
+		if !endTime.IsZero() && queryEndTime.After(endTime) {
+			queryEndTime = endTime
+		}
+
+		data, err := strategy.fetcher(ctx, currentTime, strategy.limit)
+		if err != nil {
+			e.logTool.Errorf("异步导出任务:%s,Worker %d 获取数据失败(时间:%v):%s",
+				e.task.Id, workerID, currentTime, err.Error())
+			return fmt.Errorf("worker %d 获取数据失败: %w", workerID, err)
+		}
+
+		// empty window: move on to the next one
+		if len(data) == 0 {
+			currentTime = queryEndTime
+			continue
+		}
+
+		currentFileIndex := atomic.AddInt64(fileIndex, 1)
+		fileName := filepath.Join(tempDir, "csv", fmt.Sprintf("worker%d_%d.csv", workerID, shardFileIndex))
+		shardFileIndex++
+
+		atomic.AddInt64(&e.task.RowCount, int64(len(data)))
+
+		if err := e.savePageToCSV(data, fileName); err != nil {
+			e.logTool.Errorf("异步导出任务:%s,Worker %d 保存CSV失败:%s",
+				e.task.Id, workerID, err.Error())
+			return fmt.Errorf("worker %d 保存数据失败: %w", workerID, err)
+		}
+
+		// keyed by the global index so the merged output stays ordered
+		csvFilesMap.Store(int(currentFileIndex), fileName)
+
+		e.logTool.Infof("异步导出任务:%s,Worker %d 已处理 %d 条数据,时间:%v,文件:%s",
+			e.task.Id, workerID, len(data), currentTime, fileName)
+
+		// advance by a fixed window regardless of len(data).
+		// NOTE: if one window holds more than strategy.limit rows, the
+		// remainder is skipped; pick timeRange small enough (or limit large
+		// enough) that a single window never exceeds one batch
+		currentTime = queryEndTime
+	}
+
+	e.logTool.Infof("异步导出任务:%s,Worker %d 完成,处理了 %d 个文件",
+		e.task.Id, workerID, shardFileIndex)
+
+	return nil
+}
+
 func (e *ExportAsync) zipFile(tempDir string) string {
 	return e.dirZip(tempDir) + e.fileName + ".zip"
 }
@@ -323,15 +572,6 @@ func (e *ExportAsync) mergeCSVsToExcelFiles(csvFiles []string, tempDir string) (
 	return
 }

-func (e *ExportAsync) release() {
-	// wipe sensitive / per-task data
-	e.fileName = ""
-	e.header = nil
-	e.dataProvider = nil
-	e.taskSaveTool = nil
-	exportAsyncPool.Put(e)
-}
-
 func getSortedValues(sm *sync.Map) []string {
 	// 1. pre-allocate the slice (approximate size assumed known)
 	items := make([]struct {
@@ -363,22 +603,74 @@ func getSortedValues(sm *sync.Map) []string {
 }

 func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error) {
-	uid, err := uuid.NewUUID()
+	// refuse to start if the system-wide task limit has been reached
+	uid, err := e.getTaskId(ctx)
 	if err != nil {
-		err = fmt.Errorf("UUid创建失败: %w", err)
-		return
+		return "", fmt.Errorf("初始化任务失败: %w", err)
 	}
 	e.task.Id = uid.String()
 	tempDir, err = e.createDefaultDir(ctx)
 	if err != nil {
-		err = fmt.Errorf("初始化默认文件夹失败: %w", err)
-		return
+		return "", fmt.Errorf("初始化默认文件夹失败: %w", err)
 	}
-	err = e.updateTask(ctx)
-	return tempDir, nil
+	// don't swallow the updateTask error
+	return tempDir, e.updateTask(ctx)
 }

+func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error) {
+	//// per-task concurrency check, currently disabled
+	//if err = e.checkTaskLimit(ctx, e.taskCacheKey(), SameTaskProcessLimit, "任务"); err != nil {
+	//	return
+	//}
+
+	// enforce the global concurrent-task cap
+	if err = e.CheckAndIncrementTaskCount(ctx, e.globalCacheKey(), ProcessLimit, "全局任务"); err != nil {
+		return
+	}
+
+	return uuid.NewUUID()
+}
+
+// CheckAndIncrementTaskCount is a read-then-write check; it is not atomic, so
+// two tasks racing past the limit is possible (an atomic INCR would close the gap)
+func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error {
+	count, err := e.getAndParseTaskCount(ctx, key)
+	if err != nil {
+		return fmt.Errorf("获取%s数量失败: %w", limitType, err)
+	}
+
+	if count >= limit {
+		return fmt.Errorf("%s %s数量已达上限(%d),请稍后重试", e.fileName, limitType, limit)
+	}
+
+	if _err := e.taskSaveTool.Set(ctx, key, strconv.Itoa(count+1), 0).Err(); _err != nil {
+		return fmt.Errorf("更新任务数量失败: %w", _err)
+	}
+	return nil
+}
+
+func (e *ExportAsync) getAndParseTaskCount(ctx context.Context, key string) (int, error) {
+	res := e.taskSaveTool.Get(ctx, key)
+	if res.Val() == "" {
+		return 0, nil
+	}
+	count, err := strconv.Atoi(res.Val())
+	if err != nil {
+		return 0, fmt.Errorf("解析任务数量失败: %w", err)
+	}
+	return count, nil
+}
+
+func (e *ExportAsync) taskCacheKey() string {
+	return fmt.Sprintf("%s:%s", CacheKey, base64.StdEncoding.EncodeToString([]byte(e.fileName)))
+}
+
+func (e *ExportAsync) globalCacheKey() string {
+	return fmt.Sprintf("%s:%s", CacheKey, "global")
+}
+
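The Get/Set pair above leaves a small window between the read and the write. If the underlying store is Redis (as with `NewRedisTaskStore`), the same cap can be enforced atomically with INCR. A hedged sketch against the go-redis v9 API, assuming direct access to a `*redis.Client` rather than the library's `TaskSaveTool` wrapper:

```go
package exportlimit

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

// incrementWithCap atomically reserves one task slot and rolls the
// reservation back if the cap is exceeded. Sketch only: it bypasses the
// TaskSaveTool abstraction and talks to go-redis directly.
func incrementWithCap(ctx context.Context, rdb *redis.Client, key string, limit int) error {
	n, err := rdb.Incr(ctx, key).Result()
	if err != nil {
		return fmt.Errorf("更新任务数量失败: %w", err)
	}
	if n > int64(limit) {
		rdb.Decr(ctx, key) // give the slot back
		return fmt.Errorf("任务数量已达上限(%d),请稍后重试", limit)
	}
	return nil
}
```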
 func (e *ExportAsync) updateTask(ctx context.Context) (err error) {
 	taskByte, err := json.Marshal(e.task)
 	if err != nil {
@@ -462,3 +754,19 @@ func (e *ExportAsync) savePageToCSV(data [][]interface{}, filename string) error
 	return nil
 }
+
+func (e *ExportAsync) release() {
+	// wipe per-task state and restore defaults before returning to the pool
+	e.fileName = ""
+	e.header = nil
+	e.taskSaveTool = nil
+	e.pageStrategy = nil
+	e.pageStrategyType = ""
+	e.dataCount = 0
+	e.batchSize = DefaultBatch
+	e.maxRowPerFile = DefaultMaxRowPerFile
+	e.csvToExcelBatch = DefaultCsvToExcelBatch
+	e.task = &Task{} // NewExportAsync writes task.Name, so this must not be nil
+	e.workerNum = DefaultWorkNum
+	uploaderCopy := *DefaultUploader // copy: the Host field is mutated per task
+	e.uploader = &uploaderCopy
+	e.logTool = NewLogPrint(nil)
+	e.sheetName = DefaultSheetName
+	e.extension = DefaultExtension
+	exportAsyncPool.Put(e)
+}
diff --git a/export_async_test.go b/export_async_test.go
index 3d0d746..971bb32 100644
--- a/export_async_test.go
+++ b/export_async_test.go
@@ -2,13 +2,18 @@ package l_export_async

 import (
 	"context"
+	"finance/internal/data"
+	"finance/internal/initialize"
+	"finance/internal/pkg"
+	"finance/internal/pkg/helper/attachment"
+	log2 "finance/internal/pkg/log"
 	"fmt"
 	"os"
 	"strings"
 	"testing"
 	"time"

-	"github.com/redis/go-redis/v9"
+	attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
 )

 func Test_Merge(t *testing.T) {
@@ -28,8 +33,8 @@ func Test_Merge(t *testing.T) {
 }

 func Test_Tsk(t *testing.T) {
-	taskId := "c1f316c5-defa-11f0-bcc5-00155d5ef0f9"
-	t.Log(NewTask(NewRedisTaskStore(dat())).GetTaskInfo(context.Background(), taskId))
+	taskId := "38089ced-dfde-11f0-a860-00155d5ef0f9"
+	t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId))
 }

 func Test_Upload(t *testing.T) {
@@ -43,14 +48,20 @@ func Test_Upload(t *testing.T) {
 	sys := "crmApi"
 	business := "download"
 	fieldFormName := "file"
-	resp, err := Upload(host, file, sys, business, fieldFormName)
+	resp, err := attachment.Upload(host, file, sys, business, fieldFormName)
 	if err != nil {
 		t.Log(err)
+		return // resp is nil on error; don't dereference it below
 	}
-	url := GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
+	url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300)
 	t.Log(url, err)
 }

+var (
+	Name    = "test"
+	Version string
+	id, _   = os.Hostname()
+)
+
 func listFiles(dirPath string) ([]string, error) {
 	entries, err := os.ReadDir(dirPath)
 	if err != nil {
@@ -66,29 +77,11 @@ func listFiles(dirPath string) ([]string, error) {
 	return files, nil
 }

-var (
-	Name    = "test"
-	Version string
-	id, _   = os.Hostname()
-)
+func dat() *data.Data {
+	bootstrap := initialize.LoadConfigWithTest()
+	businessLogger := log2.NewBusinessLogger(bootstrap.Logs, id, Name, Version)
+	helper := pkg.NewLogHelper(businessLogger, bootstrap)

-func dat() *redis.Client {
-
-	return buildRdb()
-}
-
-func buildRdb() *redis.Client {
-
-	rdb := redis.NewClient(&redis.Options{
-		Addr:            Redis.Addr,
-		Password:        Redis.Password,
-		ReadTimeout:     Redis.ReadTimeout.AsDuration(),
-		WriteTimeout:    Redis.WriteTimeout.AsDuration(),
-		PoolSize:        int(Redis.PoolSize),
-		MinIdleConns:    int(Redis.MinIdleConns),
-		ConnMaxIdleTime: Redis.ConnMaxIdleTime.AsDuration(),
-		DB:              int(GetRedis().GetDb()),
-	})
-	// no connection is made here; it is established lazily on first use
-	return rdb
-}
+	dataData, _, _ := data.NewData(bootstrap, helper)
+	return dataData
+}
diff --git a/merge.go b/merge.go
index 13c0ca9..967383c 100644
--- a/merge.go
+++ b/merge.go
@@ -3,11 +3,12 @@ package l_export_async
 import (
 	"encoding/csv"
 	"fmt"
-	"github.com/xuri/excelize/v2"
 	"io"
 	"math"
 	"os"
 	"regexp"
+
+	"github.com/xuri/excelize/v2"
 )

 type (
diff --git a/page_strategy.go b/page_strategy.go
new file mode 100644
index 0000000..8beb075
--- /dev/null
+++ b/page_strategy.go
@@ -0,0 +1,178 @@
+package l_export_async
+
+import (
+	"context"
+	"time"
+)
+
+// PageStrategyType identifies a pagination strategy
+type PageStrategyType string
+
+const (
+	PageStrategyOffset PageStrategyType = "offset"
+	PageStrategyCursor PageStrategyType = "cursor"
+	PageStrategyTime   PageStrategyType = "time"
+)
+
+// PageStrategy is the pagination strategy interface
+type PageStrategy interface {
+	// Type returns the strategy identifier
+	Type() PageStrategyType
+
+	// InitialState returns the starting pagination state
+	InitialState() interface{}
+
+	// NextPage fetches one page and returns the data plus the next state
+	NextPage(ctx context.Context, state interface{}) ([][]interface{}, interface{}, error)
+
+	// HasMore reports whether another page should be fetched
+	HasMore(state interface{}, lastData [][]interface{}) bool
+
+	// GetLimit returns the batch size
+	GetLimit() int
+}
+
+// Data fetcher function types
+type (
+	// OffsetDataFetcher classic offset pagination
+	OffsetDataFetcher func(ctx context.Context, pageNum, limit int) ([][]interface{}, error)
+
+	// CursorDataFetcher cursor pagination
+	CursorDataFetcher func(ctx context.Context, cursor interface{}, limit int) ([][]interface{}, interface{}, error)
+
+	// TimeRangeDataFetcher time-range pagination
+	TimeRangeDataFetcher func(ctx context.Context, startTime time.Time, limit int) ([][]interface{}, error)
+)
+
+// OffsetStrategy classic offset pagination
+type OffsetStrategy struct {
+	fetcher OffsetDataFetcher
+	limit   int
+	total   int // optional: a known row total lets HasMore stop without an extra query
+}
+
+func (s *OffsetStrategy) Type() PageStrategyType {
+	return PageStrategyOffset
+}
+
+func (s *OffsetStrategy) InitialState() interface{} {
+	return 1 // first page number
+}
+
+func (s *OffsetStrategy) NextPage(ctx context.Context, state interface{}) ([][]interface{}, interface{}, error) {
+	pageNum := state.(int)
+	data, err := s.fetcher(ctx, pageNum, s.limit)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// advance to the next page number
+	return data, pageNum + 1, nil
+}
+
+func (s *OffsetStrategy) HasMore(state interface{}, lastData [][]interface{}) bool {
+	if s.total > 0 {
+		currentPage := state.(int)
+		totalPages := (s.total + s.limit - 1) / s.limit
+		return currentPage <= totalPages
+	}
+
+	// without a known total, judge by the last page: a non-empty page means
+	// "maybe more", at the cost of one extra (empty) query at the very end
+	return len(lastData) > 0
+}
+
+func (s *OffsetStrategy) GetLimit() int {
+	return s.limit
+}
+
+// CursorStrategy cursor pagination
+type CursorStrategy struct {
+	fetcher       CursorDataFetcher
+	limit         int
+	initialCursor interface{}
+}
+
+func (s *CursorStrategy) Type() PageStrategyType {
+	return PageStrategyCursor
+}
+
+func (s *CursorStrategy) InitialState() interface{} {
+	// should be a concrete value such as uint64(0); the export loop treats a
+	// nil state as end-of-work
+	return s.initialCursor
+}
+
+func (s *CursorStrategy) NextPage(ctx context.Context, state interface{}) ([][]interface{}, interface{}, error) {
+	return s.fetcher(ctx, state, s.limit)
+}
+
+func (s *CursorStrategy) HasMore(state interface{}, lastData [][]interface{}) bool {
+	// an empty page means the last page has been reached
+	return len(lastData) > 0
+}
+
+func (s *CursorStrategy) GetLimit() int {
+	return s.limit
+}
+
+// TimeRangeStrategy time-range pagination
+type TimeRangeStrategy struct {
+	fetcher       TimeRangeDataFetcher
+	limit         int
+	startTime     time.Time
+	endTime       time.Time     // optional end of the range
+	timeRange     time.Duration // window covered by each query
+	lastQueryTime *time.Time    // time of the most recent query
+}
+
+func (s *TimeRangeStrategy) Type() PageStrategyType {
+	return PageStrategyTime
+}
+
+func (s *TimeRangeStrategy) InitialState() interface{} {
+	return s.startTime
+}
+
+func (s *TimeRangeStrategy) NextPage(ctx context.Context, state interface{}) ([][]interface{}, interface{}, error) {
+	currentTime := state.(time.Time)
+
+	// past the configured end time: nothing left to fetch
+	if !s.endTime.IsZero() && currentTime.After(s.endTime) {
+		return nil, currentTime, nil
+	}
+
+	data, err := s.fetcher(ctx, currentTime, s.limit)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// advance by one fixed window whether or not this window held data;
+	// rows are assumed ordered by the time column (adjust here for custom logic)
+	nextTime := currentTime.Add(s.timeRange)
+
+	s.lastQueryTime = &currentTime
+	return data, nextTime, nil
+}
+
+func (s *TimeRangeStrategy) HasMore(state interface{}, lastData [][]interface{}) bool {
+	currentTime := state.(time.Time)
+
+	// with an end time configured, keep going until it is reached
+	if !s.endTime.IsZero() {
+		return currentTime.Before(s.endTime)
+	}
+
+	// NOTE: without an end time the export stops at the first empty window,
+	// so a gap in the data ends the export early
+	return len(lastData) > 0
+}
+
+func (s *TimeRangeStrategy) GetLimit() int {
+	return s.limit
+}
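The PageStrategy interface is small enough to fake in unit tests. A minimal in-memory implementation that pages over a slice, useful for exercising the export loop without a database (hypothetical helper, not part of this patch):

```go
package l_export_async

import "context"

// sliceStrategy pages over an in-memory [][]interface{}; a test double for PageStrategy.
type sliceStrategy struct {
	rows  [][]interface{}
	limit int
}

func (s *sliceStrategy) Type() PageStrategyType    { return PageStrategyOffset }
func (s *sliceStrategy) InitialState() interface{} { return 0 } // offset into rows
func (s *sliceStrategy) GetLimit() int             { return s.limit }

func (s *sliceStrategy) NextPage(_ context.Context, state interface{}) ([][]interface{}, interface{}, error) {
	off := state.(int)
	if off >= len(s.rows) {
		return nil, off, nil // empty page ends the export
	}
	end := off + s.limit
	if end > len(s.rows) {
		end = len(s.rows)
	}
	return s.rows[off:end], end, nil
}

func (s *sliceStrategy) HasMore(state interface{}, lastData [][]interface{}) bool {
	return state.(int) < len(s.rows)
}
```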