From b262190ca17279adb7beb1f9b9c3db960d018db0 Mon Sep 17 00:00:00 2001 From: renzhiyuan <465386466@qq.com> Date: Fri, 26 Dec 2025 09:57:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BC=82=E6=AD=A5=E5=AF=BC?= =?UTF-8?q?=E5=87=BA=EF=BC=8C=E6=9B=B4=E6=96=B0=E4=B8=8A=E4=BC=A0=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E4=B8=8E=E7=BC=93=E5=AD=98=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- attachment/uploader.go | 2 +- entity.go | 11 +++++-- export_async.go | 66 +++++++++++++++++++++--------------------- export_async_test.go | 47 +++++++++++++----------------- go.mod | 2 +- 5 files changed, 64 insertions(+), 64 deletions(-) diff --git a/attachment/uploader.go b/attachment/uploader.go index 30c775a..7db6662 100644 --- a/attachment/uploader.go +++ b/attachment/uploader.go @@ -109,7 +109,7 @@ func Upload(host, filePath, system, business, fieldFormName string) (*UploadResp _ = retry.Retry(func() error { err = requestHttp() return err - }, retry.RetryTimes(5), retry.RetryDuration(time.Second*3)) + }, retry.RetryTimes(5)) if err != nil { return nil, err } diff --git a/entity.go b/entity.go index fc154fe..d5bd078 100644 --- a/entity.go +++ b/entity.go @@ -23,10 +23,11 @@ type Uploader struct { type ExportOption func(*ExportAsync) -// WithCustomUploader 自定义上传配置 -func WithCustomUploader(sys string, business string, fieldFormName string) ExportOption { +// WithCustomOssUploader 自定义上传配置 +func WithCustomOssUploader(host string, sys string, business string, fieldFormName string) ExportOption { return func(b *ExportAsync) { b.uploader = &Uploader{ + Host: host, System: sys, Business: business, FieldFormName: fieldFormName, @@ -146,6 +147,12 @@ func WithTimeRange(fetcher TimeRangeDataFetcher, startTime time.Time, timeRange } } +func WithCustomCacheDir(dir string) ExportOption { + return func(e *ExportAsync) { + e.cacheDir = dir + } +} + type Task struct { Id string `json:"id"` Name string `json:"name"` diff --git a/export_async.go b/export_async.go index ccd776d..7e7c04f 100644 --- a/export_async.go +++ b/export_async.go @@ -29,18 +29,14 @@ var exportAsyncPool = &sync.Pool{ New: func() interface{} { return &ExportAsync{ extension: DefaultExtension, - sheetName: "Sheet1", + sheetName: DefaultSheetName, batchSize: DefaultBatch, maxRowPerFile: DefaultMaxRowPerFile, csvToExcelBatch: DefaultCsvToExcelBatch, - uploader: &Uploader{ - FieldFormName: "file", - System: "crmApi", - Business: "download", - }, - task: &Task{}, - workerNum: DefaultWorkNum, //runtime.NumCPU() * 2, - logTool: NewLogPrint(nil), + task: &Task{}, + workerNum: DefaultWorkNum, //runtime.NumCPU() * 2, + logTool: NewLogPrint(nil), + cacheDir: DefaultCacheDir, } }, } @@ -51,15 +47,11 @@ var ( DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum - ProcessLimit = 1000 //全局并行导出任务上限 - DefaultUploader = &Uploader{ - FieldFormName: "file", - System: "crmApi", - Business: "download", - } - DefaultExtension = ".xlsx" - DefaultSheetName = "Sheet1" - //SameTaskProcessLimit = 1 //单任务并行导出上限,必须小于ProcessLimit + DefaultExtension = ".xlsx" + DefaultSheetName = "Sheet1" + DefaultCacheDir = "/tmp" + ProcessLimit = 0 //全局并行导出任务上限 + ) // ExportAsync 异步导出任务配置->默认配置往上看 @@ -87,7 +79,8 @@ type ExportAsync struct { //2.当workerNum>1时,因为线程的乱序(虽然可以通过通道解决,但是就没有线程存在的意义了),会导致查询很多空值出来 //比如:如果dataProvider走的mysql,那么会执行很多空查询,增加数据库压力 dataCount int - + //导出文件存储位置->默认/tmp ->WithCustomCacheDir + cacheDir string //日志输出->WithLogPrint logTool LogTool // Excel 表头 @@ -115,7 +108,6 @@ type ExportAsync struct { func NewExportAsync( fileName string, header []string, - domain string, TaskSaveTool TaskSaveTool, args ...ExportOption, ) *ExportAsync { @@ -123,7 +115,6 @@ func NewExportAsync( exporter.fileName = fileName exporter.header = header exporter.taskSaveTool = TaskSaveTool - exporter.uploader.Host = domain exporter.task.Name = fileName for _, arg := range args { arg(exporter) @@ -131,16 +122,16 @@ func NewExportAsync( return exporter } -func (e *ExportAsync) Run(ctx context.Context) (string, error) { +func (e *ExportAsync) Run(ctx context.Context) (task_id string, cacheDir string, err error) { //新建任务 if e.pageStrategy == nil { - return "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy") + return "", "", fmt.Errorf("未设置导出方式,导出方式具体参考PageStrategy") } tempDir, err := e.createTask(ctx) if err != nil { - return "", fmt.Errorf("创建任务失败: %v", err) + return "", "", fmt.Errorf("创建任务失败: %v", err) } coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() { // 执行导出任务 @@ -166,8 +157,8 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) { e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source) }) - - return e.task.Id, nil + cacheDir, err = e.getBaseCacheDir(ctx) + return e.task.Id, cacheDir, err } // 添加配置项 @@ -610,7 +601,7 @@ func (e *ExportAsync) createTask(ctx context.Context) (tempDir string, err error return "", fmt.Errorf("初始化任务失败: %w", err) } e.task.Id = uid.String() - tempDir, err = e.createDefaultDir(ctx) + tempDir, err = e.getCacheDir(ctx) if err != nil { return "", fmt.Errorf("初始化默认文件夹失败: %w", err) @@ -635,6 +626,9 @@ func (e *ExportAsync) getTaskId(ctx context.Context) (uid uuid.UUID, err error) } func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string, limit int, limitType string) error { + if limit == 0 { + return nil + } count, err := e.getAndParseTaskCount(ctx, key) if err != nil { return fmt.Errorf("获取%s数量失败: %w", limitType, err) @@ -693,12 +687,9 @@ func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) { return } -func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) { +func (e *ExportAsync) getCacheDir(ctx context.Context) (string, error) { // 创建临时目录 - tempDir, err := os.MkdirTemp("", e.task.Id) - if err != nil { - return "", fmt.Errorf("创建临时目录失败: %v", err) - } + tempDir, err := e.getBaseCacheDir(ctx) //csv if err = os.Mkdir(e.dirCsv(tempDir), 0755); err != nil { return "", fmt.Errorf("创建csv目录失败: %v", err) @@ -714,6 +705,15 @@ func (e *ExportAsync) createDefaultDir(ctx context.Context) (string, error) { return tempDir, nil } +func (e *ExportAsync) getBaseCacheDir(ctx context.Context) (string, error) { + tempDir := filepath.Join(e.cacheDir, e.task.Id) + err := os.MkdirAll(tempDir, 0755) // 使用标准的权限模式 + if err != nil { + return "", fmt.Errorf("创建临时目录失败: %v", err) + } + return tempDir, nil +} + func (e *ExportAsync) dirCsv(tempDir string) string { return tempDir + "/csv/" } @@ -766,7 +766,7 @@ func (e *ExportAsync) release() { e.csvToExcelBatch = DefaultCsvToExcelBatch e.task = &Task{} e.workerNum = DefaultWorkNum - e.uploader = DefaultUploader + e.cacheDir = DefaultCacheDir e.logTool = NewLogPrint(nil) e.sheetName = DefaultSheetName exportAsyncPool.Put(e) diff --git a/export_async_test.go b/export_async_test.go index 971bb32..72c9311 100644 --- a/export_async_test.go +++ b/export_async_test.go @@ -2,18 +2,11 @@ package l_export_async import ( "context" - "finance/internal/data" - "finance/internal/initialize" - "finance/internal/pkg" - "finance/internal/pkg/helper/attachment" - log2 "finance/internal/pkg/log" + "fmt" "os" - "strings" - "testing" - "time" - attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk" + "testing" ) func Test_Merge(t *testing.T) { @@ -37,24 +30,24 @@ func Test_Tsk(t *testing.T) { t.Log(NewTask(NewRedisTaskStore(dat().Rdb)).GetTaskInfo(context.Background(), taskId)) } -func Test_Upload(t *testing.T) { - task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728" - filename := "供应商结算交易流水20251222100502" - - file := "/tmp/" + task_id + "/zip/" + filename + ".zip" - //file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip" - - host := "http://192.168.6.194:8004" - sys := "crmApi" - business := "download" - fieldFormName := "file" - resp, err := attachment.Upload(host, file, sys, business, fieldFormName) - if err != nil { - t.Log(err) - } - url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300) - t.Log(url, err) -} +//func Test_Upload(t *testing.T) { +// task_id := "c0abe2b3-dede-11f0-9178-00155d5ef0f92369485728" +// filename := "供应商结算交易流水20251222100502" +// +// file := "/tmp/" + task_id + "/zip/" + filename + ".zip" +// //file := "/tmp/a03f82a8-deda-11f0-9c0b-00155d5ef0f9796292421/zip/供应商结算交易流水20251222100502.zip" +// +// host := "http://192.168.6.194:8004" +// sys := "crmApi" +// business := "download" +// fieldFormName := "file" +// resp, err := attachment.Upload(host, file, sys, business, fieldFormName) +// if err != nil { +// t.Log(err) +// } +// url := attachmentsdk.GeneratePreviewPrivateUrl(host, "", resp.Url, "", strings.TrimSuffix("供应商结算交易流水20251222100502", ".zip"), time.Now().Unix()+300) +// t.Log(url, err) +//} var ( Name = "test" diff --git a/go.mod b/go.mod index 3083e4d..566f6a9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module gitea.cdlsxd.cn/self-tools/l-export-async go 1.21.10 require ( - github.com/duke-git/lancet/v2 v2.2.8 + github.com/duke-git/lancet/v2 v2.3.8 github.com/google/uuid v1.3.0 github.com/pkg/errors v0.9.1 github.com/redis/go-redis/v9 v9.17.2