108 lines
2.3 KiB
Go
108 lines
2.3 KiB
Go
package timeslice
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"github.com/hashicorp/go-multierror"
|
||
"golang.org/x/sync/errgroup"
|
||
"time"
|
||
)
|
||
|
||
type ManagerSrv struct {
|
||
callback func(ctx context.Context, req *Task) error
|
||
}
|
||
|
||
func NewManager(callback func(ctx context.Context, req *Task) error) *ManagerSrv {
|
||
return &ManagerSrv{callback: callback}
|
||
}
|
||
|
||
func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
|
||
|
||
if req.StartTime.After(req.EndTime) {
|
||
return 0, fmt.Errorf("start_time不能大于end_time")
|
||
}
|
||
|
||
totalHours := req.EndTime.Sub(req.StartTime).Hours()
|
||
taskCount := int(totalHours / 2)
|
||
|
||
// 如果剩余时间不足2小时,增加任务数
|
||
if totalHours-float64(taskCount)*float64(2) > 0 {
|
||
taskCount++
|
||
}
|
||
|
||
processReq := &Process{
|
||
manager: req,
|
||
taskCount: taskCount,
|
||
}
|
||
|
||
return taskCount, m.process(ctx, processReq)
|
||
}
|
||
|
||
func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
|
||
|
||
if req.taskCount == 0 {
|
||
return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围")
|
||
}
|
||
|
||
if req.manager.GoNum == 0 {
|
||
return fmt.Errorf("协程数量不能为0")
|
||
}
|
||
if req.manager.GoNum > 100 {
|
||
return fmt.Errorf("协程数量不能大于100")
|
||
}
|
||
|
||
// 设置最大并发任务数为 5
|
||
eg := new(errgroup.Group)
|
||
eg.SetLimit(req.manager.GoNum)
|
||
|
||
errs := make([]error, 0) // 用于存储所有错误
|
||
|
||
// 为每个任务分配开始和结束时间
|
||
for i := 0; i < req.taskCount; i++ {
|
||
|
||
currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour)
|
||
currentEnd := currentStart.Add(2 * time.Hour)
|
||
|
||
if currentEnd.After(req.manager.EndTime) {
|
||
currentEnd = req.manager.EndTime
|
||
}
|
||
taskID := i + 1
|
||
|
||
eg.Go(func() error {
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
errs = append(errs, fmt.Errorf("panic: %v", err))
|
||
}
|
||
}()
|
||
|
||
taskReq := &Task{
|
||
CurrentStartTime: currentStart,
|
||
CurrentEndTime: currentEnd,
|
||
TaskID: taskID,
|
||
Process: req,
|
||
}
|
||
|
||
if err := m.callback(ctx, taskReq); err != nil {
|
||
errs = append(errs, err)
|
||
}
|
||
|
||
return nil
|
||
})
|
||
}
|
||
|
||
// 等待所有任务完成
|
||
if err := eg.Wait(); err != nil {
|
||
return fmt.Errorf("任务执行失败: %v", err)
|
||
}
|
||
|
||
var result error
|
||
|
||
// 收集错误
|
||
for _, err2 := range errs {
|
||
result = multierror.Append(result, err2)
|
||
}
|
||
|
||
return result
|
||
}
|