timeSliceQueryPush

This commit is contained in:
ziming 2025-06-11 17:31:18 +08:00
parent 3aa1ad9520
commit d7a1f7ce86
3 changed files with 30 additions and 8 deletions

View File

@ -71,13 +71,12 @@ func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error {
end := time.Now()
logFields := map[string]interface{}{
"任务处理时间": currentStartTimeStr + "到" + currentEndTimeStr,
"总处理组数": n,
"总处理条数": num,
"总通知条数": notifyNum,
"执行任务开始时间": startStr,
"执行任务结束时间": end.Format(time.DateTime),
"任务处理开始时间": currentStartTimeStr,
"任务处理结束时间": currentEndTimeStr,
"总处理耗时": end.Sub(start).String(),
}
log.Warnf("%s到%s,第%d个任务,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)

View File

@ -5,10 +5,14 @@ import (
"fmt"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
const TimeSliceHours = 2
const (
TimeSliceHours = 2
maxGlobalGoroutines = 1000
)
type Callback func(ctx context.Context, req *Task) error
@ -29,8 +33,8 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
if req.GoNum == 0 {
return 0, fmt.Errorf("协程数量不能为0")
}
if req.GoNum > 100 {
return 0, fmt.Errorf("协程数量不能大于100")
if req.GoNum > maxGlobalGoroutines {
return 0, fmt.Errorf("协程数量不能大于%d", maxGlobalGoroutines)
}
totalHours := req.EndTime.Sub(req.StartTime).Hours()
@ -58,8 +62,9 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
// 设置最大并发任务数为 5
eg := new(errgroup.Group)
eg.SetLimit(req.Manager.GoNum)
var mu sync.Mutex
errs := make([]error, 0) // 用于存储所有错误
errs := make([]error, 0, req.TaskCount)
// 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间
for i := 0; i < req.TaskCount; i++ {
@ -74,9 +79,21 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
eg.Go(func() error {
select {
case <-ctx.Done():
mu.Lock()
errs = append(errs, fmt.Errorf("任务 %d 被上下文取消", taskID))
mu.Unlock()
return ctx.Err()
default:
// 继续执行
}
defer func() {
if err := recover(); err != nil {
errs = append(errs, fmt.Errorf("panic: %v", err))
mu.Lock()
errs = append(errs, fmt.Errorf("任务 %d panic: %v", taskID, err))
mu.Unlock()
}
}()
@ -88,7 +105,9 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
}
if err := m.callback(ctx, taskReq); err != nil {
errs = append(errs, err)
mu.Lock()
errs = append(errs, fmt.Errorf("任务 %d 执行失败: %v", taskID, err))
mu.Unlock()
}
return nil

View File

@ -30,6 +30,10 @@ func NewRdbConsumer(
manager.Add(cf)
}
if cf1 := voucherService.GetWechatTimeSliceQueryConfig(); cf1 != nil {
manager.Add(cf1)
}
if cf2 := voucherService.GetWechatRetryConfig(); cf2 != nil {
manager.Add(cf2)
}