package timeslice import ( "context" "fmt" "github.com/hashicorp/go-multierror" "golang.org/x/sync/errgroup" "time" ) type ManagerSrv struct { callback func(ctx context.Context, req *Task) error } func NewManager(callback func(ctx context.Context, req *Task) error) *ManagerSrv { return &ManagerSrv{callback: callback} } func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) { if req.StartTime.After(req.EndTime) { return 0, fmt.Errorf("start_time不能大于end_time") } totalHours := req.EndTime.Sub(req.StartTime).Hours() taskCount := int(totalHours / 2) // 如果剩余时间不足2小时,增加任务数 if totalHours-float64(taskCount)*float64(2) > 0 { taskCount++ } processReq := &Process{ manager: req, taskCount: taskCount, } return taskCount, m.process(ctx, processReq) } func (m *ManagerSrv) process(ctx context.Context, req *Process) error { if req.taskCount == 0 { return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围") } if req.manager.GoNum == 0 { return fmt.Errorf("协程数量不能为0") } if req.manager.GoNum > 100 { return fmt.Errorf("协程数量不能大于100") } // 设置最大并发任务数为 5 eg := new(errgroup.Group) eg.SetLimit(req.manager.GoNum) errs := make([]error, 0) // 用于存储所有错误 // 为每个任务分配开始和结束时间 for i := 0; i < req.taskCount; i++ { currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour) currentEnd := currentStart.Add(2 * time.Hour) if currentEnd.After(req.manager.EndTime) { currentEnd = req.manager.EndTime } taskID := i + 1 eg.Go(func() error { defer func() { if err := recover(); err != nil { errs = append(errs, fmt.Errorf("panic: %v", err)) } }() taskReq := &Task{ CurrentStartTime: currentStart, CurrentEndTime: currentEnd, TaskID: taskID, Process: req, } if err := m.callback(ctx, taskReq); err != nil { errs = append(errs, err) } return nil }) } // 等待所有任务完成 if err := eg.Wait(); err != nil { return fmt.Errorf("任务执行失败: %v", err) } var result error // 收集错误 for _, err2 := range errs { result = multierror.Append(result, err2) } return result }