From d7a1f7ce863e6d15565973c88f4faa5e85b56390 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 11 Jun 2025 17:31:18 +0800 Subject: [PATCH] timeSliceQueryPush --- internal/biz/timeslicequery/execute.go | 3 +-- internal/pkg/timeslice/manager.go | 31 +++++++++++++++++++++----- internal/server/rds_consume.go | 4 ++++ 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/internal/biz/timeslicequery/execute.go b/internal/biz/timeslicequery/execute.go index c143c6b..d0928ba 100644 --- a/internal/biz/timeslicequery/execute.go +++ b/internal/biz/timeslicequery/execute.go @@ -71,13 +71,12 @@ func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error { end := time.Now() logFields := map[string]interface{}{ + "任务处理时间": currentStartTimeStr + "到" + currentEndTimeStr, "总处理组数": n, "总处理条数": num, "总通知条数": notifyNum, "执行任务开始时间": startStr, "执行任务结束时间": end.Format(time.DateTime), - "任务处理开始时间": currentStartTimeStr, - "任务处理结束时间": currentEndTimeStr, "总处理耗时": end.Sub(start).String(), } log.Warnf("%s到%s,第%d个任务,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields) diff --git a/internal/pkg/timeslice/manager.go b/internal/pkg/timeslice/manager.go index 2d221ac..4e3e8a5 100644 --- a/internal/pkg/timeslice/manager.go +++ b/internal/pkg/timeslice/manager.go @@ -5,10 +5,14 @@ import ( "fmt" "github.com/hashicorp/go-multierror" "golang.org/x/sync/errgroup" + "sync" "time" ) -const TimeSliceHours = 2 +const ( + TimeSliceHours = 2 + maxGlobalGoroutines = 1000 +) type Callback func(ctx context.Context, req *Task) error @@ -29,8 +33,8 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) { if req.GoNum == 0 { return 0, fmt.Errorf("协程数量不能为0") } - if req.GoNum > 100 { - return 0, fmt.Errorf("协程数量不能大于100") + if req.GoNum > maxGlobalGoroutines { + return 0, fmt.Errorf("协程数量不能大于%d", maxGlobalGoroutines) } totalHours := req.EndTime.Sub(req.StartTime).Hours() @@ -58,8 +62,9 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { // 设置最大并发任务数为 5 eg := new(errgroup.Group) eg.SetLimit(req.Manager.GoNum) + var mu sync.Mutex - errs := make([]error, 0) // 用于存储所有错误 + errs := make([]error, 0, req.TaskCount) // 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间 for i := 0; i < req.TaskCount; i++ { @@ -74,9 +79,21 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { eg.Go(func() error { + select { + case <-ctx.Done(): + mu.Lock() + errs = append(errs, fmt.Errorf("任务 %d 被上下文取消", taskID)) + mu.Unlock() + return ctx.Err() + default: + // 继续执行 + } + defer func() { if err := recover(); err != nil { - errs = append(errs, fmt.Errorf("panic: %v", err)) + mu.Lock() + errs = append(errs, fmt.Errorf("任务 %d panic: %v", taskID, err)) + mu.Unlock() } }() @@ -88,7 +105,9 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { } if err := m.callback(ctx, taskReq); err != nil { - errs = append(errs, err) + mu.Lock() + errs = append(errs, fmt.Errorf("任务 %d 执行失败: %v", taskID, err)) + mu.Unlock() } return nil diff --git a/internal/server/rds_consume.go b/internal/server/rds_consume.go index 895f7d3..0a34025 100644 --- a/internal/server/rds_consume.go +++ b/internal/server/rds_consume.go @@ -30,6 +30,10 @@ func NewRdbConsumer( manager.Add(cf) } + if cf1 := voucherService.GetWechatTimeSliceQueryConfig(); cf1 != nil { + manager.Add(cf1) + } + if cf2 := voucherService.GetWechatRetryConfig(); cf2 != nil { manager.Add(cf2) }