Add coroutine management with graceful shutdown and concurrency control

Author: renzhiyuan, 2025-12-24 17:54:18 +08:00
Parent: cb60c605e3
Commit: 2e6e665984
11 changed files with 374 additions and 24 deletions

Changed file (name not shown in this view):

@@ -1,4 +1,4 @@
-package l_export_async
+package attachment
 import (
 	"bytes"

coroutine/README.md (new file, 50 lines added)
# Coroutine launch manager

Its main purposes at the moment are:

1. Simpler call sites: no need to write your own defer/recover and logging in every goroutine
2. Graceful shutdown
3. Centralized management, which makes later extensions easier
4. Fixed-size queues: define a maximum count; launching beyond it blocks until a slot is free

TODO:

* [ ] Monitor coroutine status, count, and duration

## Usage
1. Hand lifecycle management over to kratos

```go
// 1. Open main.go
// 2. Inject logHelper *log.Helper into the newApp function
// 3. Register the coroutine server with kratos as shown below, so kratos manages its lifecycle
coroutineServer := coroutine.NewServer(logHelper)
serverOption := kratos.Server(
	// existing servers omitted...
	coroutineServer,
)
```
2. Basic usage

```go
doDesc := "a variable captured by the closure; it can be used directly"
coroutine.Run("my task name", func() {
	// simulate some work
	fmt.Printf("start: %s\n", doDesc)
	time.Sleep(1 * time.Second)
	fmt.Printf("done: %s\n", doDesc)
})
// Run starts the goroutine itself; do not add the go keyword here.
// Wrong: go coroutine.Run("my task name", func() {})
```
3. Fixed-size queue usage

```go
maxTaskCnt := 10 // maximum number of parallel tasks; beyond this the caller blocks until a running task finishes
f := coroutine.NewFixed(maxTaskCnt)
doDesc := "a variable captured by the closure; it can be used directly"
f.Run("my task name", func() {
	// simulate some work
	fmt.Printf("start: %s\n", doDesc)
	time.Sleep(1 * time.Second)
	fmt.Printf("done: %s\n", doDesc)
})
// Run starts the goroutine itself; do not add the go keyword here.
// Wrong: go f.Run("my task name", func() {})
```

coroutine/fixed.go (new file, 23 lines added)
package coroutine

// Fixed limits how many tasks may run concurrently, using a buffered channel
// as a counting semaphore.
type Fixed struct {
	queues chan struct{}
}

// NewFixed creates a Fixed that allows at most count tasks to run at once.
func NewFixed(count int) *Fixed {
	return &Fixed{
		queues: make(chan struct{}, count),
	}
}

// Run executes fn in a new coroutine.
// It blocks the caller until a queue slot is free.
// name: coroutine name
// fn: function to execute
func (f *Fixed) Run(name string, fn func()) {
	f.queues <- struct{}{} // acquire a slot; blocks while the queue is full
	runAfter(name, fn, func() {
		<-f.queues // release the slot once the task has finished
	})
}

// Sends on the channel are always >= receives, so it never needs to be
// closed; the GC reclaims it once it is unreferenced.
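
For orientation, here is a minimal end-to-end sketch of how `Fixed` combines with the graceful-shutdown `Server` (not part of this commit; the task names and durations are invented, and the import path is the module path used elsewhere in this change). Because `Fixed.Run` delegates to `runAfter`, its tasks land in the same active-task set that `Server.Stop` polls, so bounded tasks are also waited for during shutdown.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
)

func main() {
	// At most 2 tasks run at once; additional Run calls block the caller.
	f := coroutine.NewFixed(2)
	for i := 0; i < 5; i++ {
		name := fmt.Sprintf("job-%d", i) // illustrative task name
		f.Run(name, func() {
			time.Sleep(200 * time.Millisecond) // simulated work
			fmt.Printf("%s done\n", name)
		})
	}
	// Fixed.Run delegates to runAfter, so these tasks are tracked in the same
	// active-task set; Stop polls until all of them finish (or it times out).
	srv := coroutine.NewServer(nil, coroutine.WithServerTimeout(5*time.Second))
	_ = srv.Stop(context.Background())
}
```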

coroutine/fixed_test.go (new file, 64 lines added)
package coroutine

import (
	"fmt"
	"testing"
	"time"
)

func TestFixed_Run(t *testing.T) {
	tests := []struct {
		name            string
		count           int
		taskCnt         int
		expectedMinMill time.Duration
		expectedMaxMill time.Duration
	}{
		{
			name:            "blocks for ~100ms when over the limit",
			count:           1,
			taskCnt:         2,
			expectedMinMill: 100 * time.Millisecond,
		},
		{
			name:            "blocks for ~200ms when over the limit",
			count:           1,
			taskCnt:         3,
			expectedMinMill: 200 * time.Millisecond,
		},
		{
			name:            "no blocking when the task count equals the limit",
			count:           3,
			taskCnt:         3,
			expectedMaxMill: 10 * time.Millisecond,
		},
		{
			name:            "no blocking when the task count is below the limit",
			count:           4,
			taskCnt:         3,
			expectedMaxMill: 10 * time.Millisecond,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			f := NewFixed(tt.count)
			start := time.Now()
			for i := 0; i < tt.taskCnt; i++ {
				f.Run("test", func() {
					time.Sleep(100 * time.Millisecond)
				})
			}
			end := time.Now()
			milliseconds := end.Sub(start).Milliseconds()
			if tt.expectedMinMill > 0 && milliseconds < tt.expectedMinMill.Milliseconds() {
				t.Errorf("expected to run for at least %d ms, took %d ms", tt.expectedMinMill.Milliseconds(), milliseconds)
			}
			if tt.expectedMaxMill > 0 && milliseconds > tt.expectedMaxMill.Milliseconds() {
				t.Errorf("expected to run for at most %d ms, took %d ms", tt.expectedMaxMill.Milliseconds(), milliseconds)
			}
			fmt.Println(milliseconds)
		})
	}
}

coroutine/server.go (new file, 73 lines added)
package coroutine

import (
	"context"
	"fmt"
	"time"

	"gitea.cdlsxd.cn/self-tools/l-export-async/util"
)

// Server manages coroutines.
// It is handed to kratos as a server: it implements Start and Stop so that
// kratos controls its lifecycle.
type Server struct {
	// maximum time to wait for running coroutines during shutdown
	timeout time.Duration
}

type ServerOption func(s *Server)

func WithServerTimeout(timeout time.Duration) ServerOption {
	return func(s *Server) {
		s.timeout = timeout
	}
}

// NewServer creates a coroutine manager.
// log: logger used for coroutine panic reporting
// opts: optional settings, e.g. WithServerTimeout for the maximum shutdown wait
func NewServer(log util.Logger, opts ...ServerOption) *Server {
	globalLogger = log
	s := &Server{
		timeout: 24 * time.Hour, // wait at most 24 hours by default
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func (s *Server) Start(ctx context.Context) error {
	return nil
}

func (s *Server) Stop(ctx context.Context) error {
	myCtx, cancel := context.WithTimeout(context.Background(), s.timeout)
	defer cancel()
	// check every second whether all tasks have finished
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	for {
		tasks := getTasks()
		if len(tasks) == 0 {
			fmt.Println("all coroutine tasks exited gracefully")
			return nil
		}
		processStatusMsg := ""
		for _, t := range tasks {
			if processStatusMsg != "" {
				processStatusMsg += ", "
			}
			processStatusMsg += t.name
		}
		fmt.Printf("waiting for coroutine tasks to exit, %d still active: %s\n", len(tasks), processStatusMsg)
		select {
		case <-myCtx.Done():
			// timed out, give up waiting
			err := fmt.Errorf("timed out waiting for coroutine tasks, exiting anyway")
			return err
		case <-ticker.C:
		}
	}
}
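
`Start` and `Stop` are exactly the pair that `kratos.Server(...)` expects of a server. As a sanity check, a compile-time assertion against go-kratos v2's `transport.Server` interface could be added; this is an assumption about the kratos version in use and is not part of this commit:

```go
package coroutine

import "github.com/go-kratos/kratos/v2/transport"

// Compile-time check that *Server implements kratos's transport.Server
// (Start(context.Context) error and Stop(context.Context) error).
var _ transport.Server = (*Server)(nil)
```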

coroutine/task.go (new file, 87 lines added)
package coroutine

import (
	"fmt"
	"sync"

	"gitea.cdlsxd.cn/self-tools/l-export-async/util"
)

var activeTaskMu sync.Mutex
var activeTask = make(map[*task]struct{})
var globalLogger util.Logger

type task struct {
	name string
	fn   func()
}

// addTask registers a task as active.
func addTask(t *task) {
	activeTaskMu.Lock()
	defer activeTaskMu.Unlock()
	activeTask[t] = struct{}{}
}

// removeTask removes a task from the active set.
func removeTask(t *task) {
	activeTaskMu.Lock()
	defer activeTaskMu.Unlock()
	delete(activeTask, t)
}

// getTaskSize returns the number of active tasks.
// nolint
func getTaskSize() int {
	activeTaskMu.Lock()
	defer activeTaskMu.Unlock()
	return len(activeTask)
}

// getTasks returns a snapshot of the currently active tasks.
func getTasks() []*task {
	activeTaskMu.Lock()
	defer activeTaskMu.Unlock()
	tasks := make([]*task, 0, len(activeTask))
	for t := range activeTask {
		tasks = append(tasks, t)
	}
	return tasks
}

// Run executes fn in a new coroutine.
// name: coroutine name
// fn: function to execute
func Run(name string, fn func()) {
	runAfter(name, fn, func() {})
}

// runAfter executes fn in a new coroutine and calls afterFn once fn has finished.
// name: coroutine name
// fn: function to execute
// afterFn: cleanup hook, run even if fn panics
func runAfter(name string, fn func(), afterFn func()) {
	t := &task{name: name, fn: fn}
	addTask(t)
	go func() {
		// deregister the task when the goroutine exits
		defer removeTask(t)
		// record panics instead of crashing the process
		defer func() {
			if r := recover(); r != nil {
				if globalLogger != nil {
					globalLogger.Errorf("coroutine %s panic: %+v", t.name, r)
				} else {
					fmt.Printf("coroutine %s panic: %+v\n", t.name, r)
				}
			}
		}()
		// cleanup hook
		defer afterFn()
		// run the task function
		t.fn()
	}()
}
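
A quick sketch of the recovery behaviour implemented by `runAfter` above: a panicking task is recovered and reported through `globalLogger` (or plain `fmt` output when no logger has been registered via `NewServer`), and the process keeps running. The task names are illustrative; the import path is the module path used elsewhere in this commit.

```go
package main

import (
	"context"
	"fmt"

	"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
)

func main() {
	coroutine.Run("panic-demo", func() {
		panic("boom") // recovered and logged inside runAfter; the process survives
	})
	coroutine.Run("normal", func() {
		fmt.Println("still running")
	})
	// Wait until both tasks have been deregistered before exiting.
	_ = coroutine.NewServer(nil).Stop(context.Background())
}
```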

coroutine/task_test.go (new file, 41 lines added)
package coroutine

import (
	"context"
	"fmt"
	"testing"
	"time"
)

func TestRun(t *testing.T) {
	// timeout case: tasks take up to 4s but the server only waits 2s
	fmt.Println("----- timeout case -----")
	for i := 0; i < 5; i++ {
		ii := i
		name := fmt.Sprintf("test task %d", i)
		Run(name, func() {
			fmt.Printf("%s started\n", name)
			w := time.Second * time.Duration(ii)
			time.Sleep(w)
			fmt.Printf("%s finished\n", name)
		})
	}
	err := NewServer(nil, WithServerTimeout(2*time.Second)).Stop(context.Background())
	fmt.Println(err)

	// no-timeout case: the default 24h timeout is plenty for the tasks to finish
	fmt.Println("\n----- no-timeout case -----")
	for i := 0; i < 5; i++ {
		ii := i
		name := fmt.Sprintf("test task %d", ii)
		Run(name, func() {
			fmt.Printf("%s started\n", name)
			w := time.Second * time.Duration(ii)
			time.Sleep(w)
			fmt.Printf("%s finished\n", name)
		})
	}
	err = NewServer(nil).Stop(context.Background())
	fmt.Println(err)
}

Changed file (name not shown in this view):

@@ -6,11 +6,16 @@ import (
 	"encoding/base64"
 	"encoding/csv"
 	"encoding/json"
-	"finance/internal/pkg/helper/attachment"
+	"errors"
 	"fmt"
 	"strconv"
 	"strings"
+	"gitea.cdlsxd.cn/self-tools/l-export-async/attachment"
+	"gitea.cdlsxd.cn/self-tools/l-export-async/coroutine"
+	"github.com/google/uuid"
+	"github.com/redis/go-redis/v9"
+	"golang.org/x/sync/errgroup"
 	"io"
 	"os"
 	"path/filepath"
@@ -18,10 +23,6 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
-	attachmentsdk "codeup.aliyun.com/5f9118049cffa29cfdd3be1c/attachment-sdk"
-	"github.com/google/uuid"
-	"golang.org/x/sync/errgroup"
 )
 var exportAsyncPool = &sync.Pool{
@@ -50,7 +51,7 @@ var (
 	DefaultMaxRowPerFile   = 100000 // maximum rows per xlsx file -> WithMaxRowPerFile
 	DefaultCsvToExcelBatch = 1000   // buffer size for csv-to-excel batch writes; 1000 flushes row by row -> WithCustomBufferSize
 	DefaultWorkNum         = 1      // number of concurrent goroutines, must be greater than 1; default runtime.NumCPU()*2 -> WithCustomWorkNum
-	ProcessLimit           = 1      // global cap on parallel export tasks
+	ProcessLimit           = 1000   // global cap on parallel export tasks
 	DefaultUploader        = &Uploader{
 		FieldFormName: "file",
 		System:        "crmApi",
@@ -141,8 +142,7 @@ func (e *ExportAsync) Run(ctx context.Context) (string, error) {
 	if err != nil {
 		return "", fmt.Errorf("failed to create task: %v", err)
 	}
-	go func() {
+	coroutine.Run(fmt.Sprintf("export task: %s", e.fileName), func() {
 		// run the export task
 		subCtx, cancel := context.WithCancel(context.Background())
 		defer func() {
@@ -162,7 +162,8 @@
 		}
 		e.logTool.Infof("async export task %s finished, exported %d rows in total, download url: %s", e.task.Id, e.task.RowCount, source)
-	}()
+	})
 	return e.task.Id, nil
 }
@@ -318,7 +319,7 @@ func (e *ExportAsync) upload(file string) (string, error) {
 	if err != nil {
 		return "", err
 	}
-	return attachmentsdk.GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil
+	return attachment.GeneratePreviewPrivateUrl(e.uploader.Host, "", resp.Url, "", strings.TrimSuffix(e.fileName, ".zip"), time.Now().Unix()+300), nil
 }
 func (e *ExportAsync) folderToZip(excelsDir, zipFilePath string) error {
@@ -534,10 +535,6 @@ func (e *ExportAsync) processTimeShard(ctx context.Context, tempDir string, work
 			// fewer rows than limit were returned, so this time range has been fully fetched
 			currentTime = queryEndTime
 		} else {
-			// if the row count equals limit there may be more data
-			// business logic can decide here whether to advance the time window
-			// e.g. if rows are sorted by time, the last row's timestamp could start the next query
-			// for simplicity, keep moving by a fixed time slice
 			currentTime = queryEndTime
 		}
@@ -639,12 +636,10 @@ func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string
 	if err != nil {
 		return fmt.Errorf("failed to get %s count: %w", limitType, err)
 	}
 	if count >= limit {
 		return fmt.Errorf("%s: %s count has reached the limit of %d, please retry later", e.fileName, limitType, limit)
 	}
-	if _err := e.taskSaveTool.Set(ctx, key, strconv.Itoa(count+1), 0).Err(); _err != nil {
+	if _err := e.taskSaveTool.Set(ctx, key, strconv.Itoa(count+1), 10*time.Minute).Err(); _err != nil {
 		e.taskSaveTool.Del(ctx, key)
 		return fmt.Errorf("failed to update task count: %w", err)
 	}
@@ -653,9 +648,13 @@ func (e *ExportAsync) CheckAndIncrementTaskCount(ctx context.Context, key string
 func (e *ExportAsync) getAndParseTaskCount(ctx context.Context, key string) (int, error) {
 	res := e.taskSaveTool.Get(ctx, key)
-	if res.Val() == "" {
+	if errors.Is(res.Err(), redis.Nil) {
 		return 0, nil
 	}
+	if res.Err() != nil {
+		return 0, fmt.Errorf("failed to get task count: %w", res.Err())
+	}
 	count, err := strconv.Atoi(res.Val())
 	if err != nil {
 		return 0, fmt.Errorf("failed to parse task count: %w", err)
@@ -685,7 +684,6 @@ func (e *ExportAsync) updateTask(ctx context.Context) (err error) {
 }
 func (e *ExportAsync) processAdd(ctx context.Context, addNum int32) {
 	atomic.AddInt32(&e.task.Process, addNum)
-	e.logTool.Infof("async export task %s, current progress: %d", e.task.Id, e.task.Process)
 	_ = e.updateTask(ctx)
@@ -763,7 +761,7 @@ func (e *ExportAsync) release() {
 	e.batchSize = DefaultBatch
 	e.maxRowPerFile = DefaultMaxRowPerFile
 	e.csvToExcelBatch = DefaultCsvToExcelBatch
-	e.task = nil
+	e.task = &Task{}
 	e.workerNum = DefaultWorkNum
 	e.uploader = DefaultUploader
 	e.logTool = NewLogPrint(nil)

Changed file (name not shown in this view):

@@ -3,12 +3,11 @@ package l_export_async
 import (
 	"encoding/csv"
 	"fmt"
+	"github.com/xuri/excelize/v2"
 	"io"
 	"math"
 	"os"
 	"regexp"
-	"github.com/xuri/excelize/v2"
 )
 type (

Changed file (name not shown in this view):

@@ -16,7 +16,7 @@ func (r *redisTaskStore) Set(ctx context.Context, key string, value interface{},
 }
 func (r *redisTaskStore) Del(ctx context.Context, keys ...string) TaskErr {
+	// In practice this is never actually executed, because it is unclear whether deletion is really needed; add the call yourself if required.
 	return r.client.Del(ctx, keys...)
 }

util/logger.go (new file, 15 lines added)
package util

type Logger interface {
	// Debugf logs a formatted debugging message.
	Debugf(format string, args ...interface{})
	// Infof logs a formatted informational message.
	Infof(format string, args ...interface{})
	// Warnf logs a formatted warning message.
	Warnf(format string, args ...interface{})
	// Errorf logs a formatted error message.
	Errorf(format string, args ...interface{})
}
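
The README's first step passes a kratos `*log.Helper` straight into `coroutine.NewServer`. As far as I know, go-kratos v2's `log.Helper` exposes `Debugf`/`Infof`/`Warnf`/`Errorf` with exactly these signatures, so it satisfies this interface without an adapter. A compile-time assertion (again an assumption about the kratos version, not part of this commit) would be:

```go
package util

import "github.com/go-kratos/kratos/v2/log"

// Compile-time check that kratos's *log.Helper satisfies util.Logger,
// so it can be passed directly to coroutine.NewServer.
var _ Logger = (*log.Helper)(nil)
```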