From 2e6e66598406a4244df6d54d8438f218a173bc57 Mon Sep 17 00:00:00 2001 From: renzhiyuan <465386466@qq.com> Date: Wed, 24 Dec 2025 17:54:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8D=8F=E7=A8=8B=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81=E4=BC=98?= =?UTF-8?q?=E9=9B=85=E5=85=B3=E9=97=AD=E5=92=8C=E5=B9=B6=E5=8F=91=E6=8E=A7?= =?UTF-8?q?=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- uploader.go => attachment/uploader.go | 2 +- coroutine/README.md | 50 +++++++++++++++ coroutine/fixed.go | 23 +++++++ coroutine/fixed_test.go | 64 ++++++++++++++++++++ coroutine/server.go | 73 ++++++++++++++++++++++ coroutine/task.go | 87 +++++++++++++++++++++++++++ coroutine/task_test.go | 41 +++++++++++++ export_async.go | 38 ++++++------ merge.go | 3 +- redis.go | 2 +- util/logger.go | 15 +++++ 11 files changed, 374 insertions(+), 24 deletions(-) rename uploader.go => attachment/uploader.go (99%) create mode 100644 coroutine/README.md create mode 100644 coroutine/fixed.go create mode 100644 coroutine/fixed_test.go create mode 100644 coroutine/server.go create mode 100644 coroutine/task.go create mode 100644 coroutine/task_test.go create mode 100644 util/logger.go diff --git a/uploader.go b/attachment/uploader.go similarity index 99% rename from uploader.go rename to attachment/uploader.go index 0c39959..30c775a 100644 --- a/uploader.go +++ b/attachment/uploader.go @@ -1,4 +1,4 @@ -package l_export_async +package attachment import ( "bytes" diff --git a/coroutine/README.md b/coroutine/README.md new file mode 100644 index 0000000..b95e183 --- /dev/null +++ b/coroutine/README.md @@ -0,0 +1,50 @@ +# 协程启动管理器 +目前的主要作用是: + 1. 简化写法,不用自己再defer记录日志 + 2. 优雅关闭 + 3. 统一管理方便后面扩展 + 4. 支持固定队列协程:定义最大数量,超过时会阻塞启动,直接有空闲的时候才会启动 + +todolist: +* [ ]. 控coroutine的状态:数量,时长 + +## 使用步骤 +1. 交给kratos管理生命周期 + ```go + // 1. 打开main.go + // 2. newApp方法中注入logHelper *log.Helper + // 3. 按如下代码提供给kratos管理生命周期 + coroutineServer := coroutine.NewServer(logHelper) + serverOption := kratos.Server( + // 省略之前的一些已有变量... + coroutineServer, + ) + ``` + +2. 使用示例 + ```go + doDesc := "我是闭包中能访问到的变量,可以直接使用" + coroutine.Run("我是任务名称", func() { + // 模拟执行 + fmt.Printf("开始:%s\n", doDesc) + time.Sleep(1 * time.Second) + fmt.Printf("开始:%s\n", doDesc) + }) + // 在Run里面会开启协程,不要自己在这里使用go关键字开启协程 + // 错误示例:go coroutine.Run("我是任务名称", func() {}) + ``` + +3固定队列使用示例 + ```go + maxTaskCnt := 10 // 最大并行任务数量。超过时阻塞启动方,直到有有任务执行完成 + f := NewFixed(maxTaskCnt) + doDesc := "我是闭包中能访问到的变量,可以直接使用" + f.Run("我是任务名称", func() { + // 模拟执行 + fmt.Printf("开始:%s\n", doDesc) + time.Sleep(1 * time.Second) + fmt.Printf("开始:%s\n", doDesc) + }) + // 在Run里面会开启协程,不要自己在这里使用go关键字开启协程 + // 错误示例:go coroutine.Run("我是任务名称", func() {}) + ``` \ No newline at end of file diff --git a/coroutine/fixed.go b/coroutine/fixed.go new file mode 100644 index 0000000..15fb7ee --- /dev/null +++ b/coroutine/fixed.go @@ -0,0 +1,23 @@ +package coroutine + +type Fixed struct { + queues chan struct{} +} + +func NewFixed(count int) *Fixed { + return &Fixed{ + queues: make(chan struct{}, count), + } +} + +// Run 运行指定函数 +// name 协程名称 +// fn 协程执行的函数 +func (f *Fixed) Run(name string, fn func()) { + f.queues <- struct{}{} + runAfter(name, fn, func() { + <-f.queues + }) +} + +// chan 的发送次数 >= 接收次数,可以不用close,由GC回收 diff --git a/coroutine/fixed_test.go b/coroutine/fixed_test.go new file mode 100644 index 0000000..46cb017 --- /dev/null +++ b/coroutine/fixed_test.go @@ -0,0 +1,64 @@ +package coroutine + +import ( + "fmt" + "testing" + "time" +) + +func TestFixed_Run(t *testing.T) { + tests := []struct { + name string + count int + taskCnt int + expectedMinMill time.Duration + expectedMaxMill time.Duration + }{ + { + name: "超过时阻塞 100ms", + count: 1, + taskCnt: 2, + expectedMinMill: 100 * time.Millisecond, + }, + { + name: "超过时阻塞 200ms", + count: 1, + taskCnt: 3, + expectedMinMill: 200 * time.Millisecond, + }, + { + name: "相等限制时不阻塞", + count: 3, + taskCnt: 3, + expectedMaxMill: 10 * time.Millisecond, + }, + { + name: "低于限制时不阻塞", + count: 4, + taskCnt: 3, + expectedMaxMill: 10 * time.Millisecond, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := NewFixed(tt.count) + start := time.Now() + + for i := 0; i < tt.taskCnt; i++ { + f.Run("test", func() { + time.Sleep(100 * time.Millisecond) + }) + } + + end := time.Now() + milliseconds := end.Sub(start).Milliseconds() + if tt.expectedMinMill > 0 && milliseconds < tt.expectedMinMill.Milliseconds() { + t.Errorf("运行时长最少 %d ms, 实际 %d ms", tt.expectedMinMill, milliseconds) + } + if tt.expectedMaxMill > 0 && milliseconds > tt.expectedMaxMill.Milliseconds() { + t.Errorf("运行时长不超过 %d ms, 实际 %d ms", tt.expectedMaxMill, milliseconds) + } + fmt.Println(milliseconds) + }) + } +} diff --git a/coroutine/server.go b/coroutine/server.go new file mode 100644 index 0000000..c07b4b7 --- /dev/null +++ b/coroutine/server.go @@ -0,0 +1,73 @@ +package coroutine + +import ( + "context" + "fmt" + "gitea.cdlsxd.cn/self-tools/l-export-async/util" + "time" +) + +// Server 管理协程 +// 提供给kratos当server使用,主要实现其start和stop,交给kratos管理其生命周期 +type Server struct { + // 关闭时最长等待时长 + timeout time.Duration +} + +type ServerOption func(s *Server) + +func WithServerTimeout(timeout time.Duration) ServerOption { + return func(s *Server) { + s.timeout = timeout + } +} + +// NewServer 创建一个协程管理器 +// log 日志 +// timeout 关闭时最长等待时长 +func NewServer(log util.Logger, opts ...ServerOption) *Server { + globalLogger = log + s := &Server{ + timeout: 24 * time.Hour, //默认最多等待24个小时 + } + for _, opt := range opts { + opt(s) + } + return s +} + +func (s *Server) Start(ctx context.Context) error { + return nil +} + +func (s *Server) Stop(ctx context.Context) error { + myCtx, cancel := context.WithTimeout(context.Background(), s.timeout) + defer cancel() + + // 每1s检查下业务是否都处理完成 + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + tasks := getTasks() + if len(tasks) == 0 { + fmt.Println("协程任务已全部优雅退出") + return nil + } + processStatusMsg := "" + for _, t := range tasks { + if processStatusMsg != "" { + processStatusMsg += "、" + } + processStatusMsg += t.name + } + fmt.Printf("等待协程任务退出,当前活动:%d 个:%s \n", len(tasks), processStatusMsg) + select { + case <-myCtx.Done(): + //超时退出 + err := fmt.Errorf("等待协程任务超时,即将退出") + return err + case <-ticker.C: + } + } +} diff --git a/coroutine/task.go b/coroutine/task.go new file mode 100644 index 0000000..47fe88f --- /dev/null +++ b/coroutine/task.go @@ -0,0 +1,87 @@ +package coroutine + +import ( + "fmt" + "gitea.cdlsxd.cn/self-tools/l-export-async/util" + "sync" +) + +var activeTaskMu sync.Mutex +var activeTask = make(map[*task]struct{}) +var globalLogger util.Logger + +type task struct { + name string + fn func() +} + +// addTask 添加任务 +func addTask(t *task) { + activeTaskMu.Lock() + defer activeTaskMu.Unlock() + activeTask[t] = struct{}{} +} + +// removeTask 删除任务 +func removeTask(t *task) { + activeTaskMu.Lock() + defer activeTaskMu.Unlock() + delete(activeTask, t) +} + +// getTaskSize 获取任务数量 +// nolint +func getTaskSize() int { + activeTaskMu.Lock() + defer activeTaskMu.Unlock() + return len(activeTask) +} + +// getTaskSize 获取任务数量 +func getTasks() []*task { + activeTaskMu.Lock() + defer activeTaskMu.Unlock() + + tasks := make([]*task, 0, len(activeTask)) + for t := range activeTask { + tasks = append(tasks, t) + } + return tasks +} + +// Run 运行指定函数 +// name 协程名称 +// fn 协程执行的函数 +func Run(name string, fn func()) { + runAfter(name, fn, func() {}) +} + +// Run 运行指定函数 +// name 协程名称 +// fn 协程执行的函数 +func runAfter(name string, fn func(), afterFn func()) { + t := &task{name: name, fn: fn} + addTask(t) + + go func() { + // 删除任务 + defer removeTask(t) + + // 记录panic错误 + defer func() { + if r := recover(); r != nil { + if globalLogger != nil { + globalLogger.Errorf("coroutine %s panic:%+v", t.name, r) + } else { + fmt.Printf("coroutine %s panic:%+v", t.name, r) + } + } + }() + + // 完毕后要执行的函数 + defer afterFn() + + // 运行指定函数 + t.fn() + }() +} diff --git a/coroutine/task_test.go b/coroutine/task_test.go new file mode 100644 index 0000000..a1c76fc --- /dev/null +++ b/coroutine/task_test.go @@ -0,0 +1,41 @@ +package coroutine + +import ( + "context" + "fmt" + "testing" + "time" +) + +func TestRun(t *testing.T) { + // 测试未超时情况 + fmt.Println("-----测试超时情况------") + for i := 0; i < 5; i++ { + ii := i + name := fmt.Sprintf("测试任务%d", i) + Run(name, func() { + fmt.Printf("%s开始\n", name) + w := time.Second * time.Duration(ii) + time.Sleep(w) + fmt.Printf("%s完成\n", name) + }) + } + err := NewServer(nil, WithServerTimeout(2*time.Second)).Stop(context.Background()) + fmt.Println(err) + + // 测试超时情况 + fmt.Println("\n-----测试未超时情况------") + for i := 0; i < 5; i++ { + ii := i + name := fmt.Sprintf("测试任务%d", ii) + Run(name, func() { + fmt.Printf("%s开始\n", name) + w := time.Second * time.Duration(ii) + time.Sleep(w) + fmt.Printf("%s完成\n", name) + }) + } + err = NewServer(nil).Stop(context.Background()) + fmt.Println(err) + +} diff --git a/export_async.go b/export_async.go index 04fcd14..6383915 100644 --- a/export_async.go +++ b/export_async.go @@ -6,11 +6,16 @@ import ( "encoding/base64" "encoding/csv" "encoding/json" - "finance/internal/pkg/helper/attachment" + "errors" "fmt" "strconv" "strings" + "gitea.cdlsxd.cn/self-tools/l-export-async/attachment" + "gitea.cdlsxd.cn/self-tools/l-export-async/coroutine" + "github.com/google/uuid" + "github.com/redis/go-redis/v9" + "golang.org/x/sync/errgroup" "io" "os" "path/filepath" @@ -18,10 +23,6 @@ import ( "sync" "sync/atomic" "time" - - attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk" - "github.com/google/uuid" - "golang.org/x/sync/errgroup" ) var exportAsyncPool = &sync.Pool{ @@ -50,7 +51,7 @@ var ( DefaultMaxRowPerFile = 100000 ////每个Xlsx的行数,默认10000行->WithMaxRowPerFile DefaultCsvToExcelBatch = 1000 ////csv转excel的批量写入缓冲区大小,逐行写入设置为1000->WithCustomBufferSize DefaultWorkNum = 1 // 并发协程数(务必大于1),默认runtime.NumCPU() * 2->WithCustomWorkNum - ProcessLimit = 1 //全局并行导出任务上限 + ProcessLimit = 1000 //全局并行导出任务上限 DefaultUploader = &Uploader{ FieldFormName: "file", System: "crmApi", @@ -141,8 +142,7 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) { if err != nil { return "", fmt.Errorf("创建任务失败: %v", err) } - - go func() { + coroutine.Run(fmt.Sprintf("创建任务导出任务:%s", e.fileName), func() { // 执行导出任务 subCtx, cancel := context.WithCancel(context.Background()) defer func() { @@ -162,7 +162,8 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) { } e.logTool.Infof("异步导出任务:%s,导出完成,总计导出%d条数据,下载地址:%s", e.task.Id, e.task.RowCount, source) - }() + }) + return e.task.Id, nil } @@ -318,7 +319,7 @@ func (e *ExportAsync) upload(file string) (string, error) { if err != nil { return "", err } - return attachmentsdk.GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil + return attachment.GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil } func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error { @@ -534,10 +535,6 @@ func (e *ExportAsync) processTimeShard(ctx context.Context, tempDir string, work // 如果本次获取的数据不足limit,说明这个时间段的数据已经取完 currentTime = queryEndTime } else { - // 如果数据量等于limit,可能还有更多数据 - // 这里可以根据业务逻辑决定是否移动时间 - // 例如:如果数据是按时间排序的,可以取最后一条数据的时间作为下一次查询的起始时间 - // 为了简化,我们还是按固定时间片移动 currentTime = queryEndTime } @@ -639,12 +636,10 @@ func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string if err != nil { return fmt.Errorf("获取%s数量失败: %w", limitType, err) } - if count >= limit { return fmt.Errorf("%s %s数量已达上限(%d),请稍后重试", e.fileName, limitType, limit) } - - if _err := e.taskSaveTool.Set(ctx, key, strconv.Itoa(count+1), 0).Err(); _err != nil { + if _err := e.taskSaveTool.Set(ctx, key, strconv.Itoa(count+1), 10*time.Minute).Err(); _err != nil { e.taskSaveTool.Del(ctx, key) return fmt.Errorf("更新任务数量失败: %w", err) } @@ -653,9 +648,13 @@ func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string func (e *ExportAsync) getAndParseTaskCount(ctx context.Context, key string) (int, error) { res := e.taskSaveTool.Get(ctx, key) - if res.Val() == "" { + if errors.Is(res.Err(), redis.Nil) { return 0, nil } + if res.Err() != nil { + return 0, fmt.Errorf("获取任务数量失败: %w", res.Err()) + } + count, err := strconv.Atoi(res.Val()) if err != nil { return 0, fmt.Errorf("解析任务数量失败: %w", err) @@ -685,7 +684,6 @@ func (e *ExportAsync) updateTask(ctx context.Context) (err error) { } func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) { - atomic.AddInt32(&e.task.Process, addNum) e.logTool.Infof("异步导出任务:%s,当前进度:%d", e.task.Id, e.task.Process) _ = e.updateTask(ctx) @@ -763,7 +761,7 @@ func (e *ExportAsync) release() { e.batchSize = DefaultBatch e.maxRowPerFile = DefaultMaxRowPerFile e.csvToExcelBatch = DefaultCsvToExcelBatch - e.task = nil + e.task = &Task{} e.workerNum = DefaultWorkNum e.uploader = DefaultUploader e.logTool = NewLogPrint(nil) diff --git a/merge.go b/merge.go index 967383c..13c0ca9 100644 --- a/merge.go +++ b/merge.go @@ -3,12 +3,11 @@ package l_export_async import ( "encoding/csv" "fmt" + "github.com/xuri/excelize/v2" "io" "math" "os" "regexp" - - "github.com/xuri/excelize/v2" ) type ( diff --git a/redis.go b/redis.go index a65b4aa..25b8093 100644 --- a/redis.go +++ b/redis.go @@ -16,7 +16,7 @@ func (r *redisTaskStore) Set(ctx context.Context, key string, value interface{}, } func (r *redisTaskStore) Del(ctx context.Context, keys ...string) TaskErr { - //实际运行中并没有去执行这个,因为不确定是否真的需要删除,如果需要可以自行写入 + return r.client.Del(ctx, keys...) } diff --git a/util/logger.go b/util/logger.go new file mode 100644 index 0000000..031662e --- /dev/null +++ b/util/logger.go @@ -0,0 +1,15 @@ +package util + +type Logger interface { + // Debugf logs a formatted debugging message. + Debugf(format string, args ...interface{}) + + // Infof logs a formatted informational message. + Infof(format string, args ...interface{}) + + // Warnf logs a formatted warning message. + Warnf(format string, args ...interface{}) + + // Errorf logs a formatted error message. + Errorf(format string, args ...interface{}) +}