This commit is contained in:
ziming 2025-06-11 14:27:10 +08:00
parent 87222148ec
commit ac00969597
3 changed files with 194 additions and 11 deletions

View File

@ -54,6 +54,7 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
// 设置最大并发任务数为 5
eg := new(errgroup.Group)
eg.SetLimit(req.manager.GoNum)
errs := make([]error, 0) // 用于存储所有错误
// 为每个任务分配开始和结束时间
@ -61,9 +62,11 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour)
currentEnd := currentStart.Add(2 * time.Hour)
if currentEnd.After(req.manager.EndTime) {
currentEnd = req.manager.EndTime
}
taskID := i + 1
eg.Go(func() error {
@ -73,17 +76,15 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
}
}()
taskID := i + 1
taskReq := &Task{
CurrentStartTime: currentStart,
CurrentEndTime: currentEnd,
TaskID: taskID,
ProductNo: req.manager.ProductNo,
Process: req,
}
if err := m.callback(ctx, taskReq); err != nil {
errs = append(errs, err)
return err
}
return nil
@ -91,7 +92,9 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
}
// 等待所有任务完成
_ = eg.Wait()
if err := eg.Wait(); err != nil {
return fmt.Errorf("任务执行失败: %v", err)
}
var result error

View File

@ -2,9 +2,12 @@ package timeslice
import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"math/rand"
"sync"
"testing"
"time"
)
@ -44,12 +47,12 @@ func TestNewManager(t *testing.T) {
var results []string
callback := func(ctx context.Context, req *Task) error {
// 模拟任务执行,休眠随机时间
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
// 生成任务执行结果
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
results = append(results, result)
//return nil
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
}
startTime := time.Now()
@ -59,7 +62,7 @@ func TestNewManager(t *testing.T) {
taskCount, err := srv.Run(context.Background(), &Manager{
StartTime: start,
EndTime: end,
ProductNo: "123456",
ProductNo: "no123456",
GoNum: 2,
})
@ -70,9 +73,161 @@ func TestNewManager(t *testing.T) {
}
if err != nil {
t.Error(err)
multiErr, ok := err.(*multierror.Error)
if ok {
for i, err := range multiErr.Errors {
fmt.Printf("错误 %d: %v\n", i+1, err)
}
} else {
// 不是多错误,单独处理
t.Error("单个错误", err)
}
}
endTime := time.Now()
fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s", startStr, endTime.Sub(startTime).String(), endTime.String())
fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s\n", startStr, endTime.Sub(startTime).String(), endTime.String())
}
func TestBatchCallBackFunc(t *testing.T) {
// 初始化开始时间和结束时间
start, err := time.Parse(time.DateTime, "2025-01-01 01:00:00")
if err != nil {
t.Fatalf("%v", err)
return
}
end, err := time.Parse(time.DateTime, "2025-01-02 02:00:01")
if err != nil {
t.Fatalf("%v", err)
return
}
// 确保开始时间小于结束时间
if start.After(end) {
t.Fatalf("start_time不能大于end_time")
return
}
var wg sync.WaitGroup
// 按照每天的时间单位分片
duration := 24 * time.Hour // 每个时间段为一天
// 循环处理时间段
for current := start; current.Before(end); current = current.Add(duration) {
// 计算当前时间段的结束时间
next := current.Add(duration)
// 如果下一个结束时间超过了实际的结束时间,则调整为实际结束时间
if next.After(end) {
next = end
}
t.Logf("处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime))
wg.Add(1)
// 启动goroutine处理每个时间段
go func(startTime, endTime time.Time) {
defer func() {
wg.Done()
if err2 := recover(); err2 != nil {
t.Error("panic", err2)
}
}()
if err3 := CallbackFunc(startTime, endTime); err3 != nil {
t.Errorf("任务执行失败:%v\n", err3)
}
}(current, next)
}
// 等待所有的 goroutine 完成
wg.Wait()
}
func CallbackFunc(start, end time.Time) error {
nowTime := time.Now()
managerStartStr := start.Format(time.DateTime)
managerEndStr := end.Format(time.DateTime)
req := &Manager{
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
}
taskCount, err := NewManager(callbackFunc).Run(context.Background(), req)
if err != nil {
var multiErr *multierror.Error
if errors.As(err, &multiErr) {
for i, err := range multiErr.Errors {
fmt.Printf("%s到%s,错误 %d: %v\n", managerStartStr, managerEndStr, i+1, err)
}
} else {
// 不是多错误,单独处理
fmt.Printf("单个错误%v", err)
}
}
fmt.Printf("%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount)
endTime := time.Now()
fmt.Printf("%s到%s,处理耗时:%s,开始处理时间:%s,结束时间%s\n", managerStartStr, managerEndStr, endTime.Sub(nowTime).String(), nowTime.Format(time.DateTime), endTime.Format(time.DateTime))
return nil
}
func callbackFunc(_ context.Context, req *Task) error {
managerStartTimeStr := req.Process.manager.StartTime.Format(time.DateTime)
managerEndTimeStr := req.Process.manager.EndTime.Format(time.DateTime)
currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime)
currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime)
start := time.Now()
startStr := start.Format(time.DateTime)
n := 0
num := 0
notifyNum := 0
for i := 0; i < 3; i++ {
groupStartTime := time.Now()
// 模拟任务执行,休眠随机时间
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
n += 1
num = num + (i+1)*100
logFields := map[string]interface{}{
"处理条数": num,
"通知条数": notifyNum,
"耗时": time.Now().Sub(groupStartTime).String(),
"任务处理开始时间": currentStartTimeStr,
"任务处理结束时间": currentEndTimeStr,
}
fmt.Printf("%s到%s,第%d个任务第%d组,处理完毕, %+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, i+1, logFields)
}
end := time.Now()
logFields := map[string]interface{}{
"总处理组数": n,
"总处理条数": num,
"总通知条数": notifyNum,
"执行任务开始时间": startStr,
"执行任务结束时间": end.Format(time.DateTime),
"总处理耗时": end.Sub(start).String(),
"任务处理开始时间": currentStartTimeStr,
"任务处理结束时间": currentEndTimeStr,
}
// 生成任务执行结果
result := fmt.Sprintf("%s到%s第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields)
fmt.Printf(result)
//return fmt.Errorf(result)
return nil
}

View File

@ -1,6 +1,7 @@
package timeslice
import (
"encoding/json"
"time"
)
@ -11,14 +12,38 @@ type Manager struct {
GoNum int
}
func (t *Manager) String() (string, error) {
b, err := json.Marshal(t)
if err != nil {
return "", err
}
return string(b), nil
}
type Process struct {
manager *Manager
taskCount int
}
func (t *Process) String() (string, error) {
b, err := json.Marshal(t)
if err != nil {
return "", err
}
return string(b), nil
}
type Task struct {
Process *Process
CurrentStartTime time.Time
CurrentEndTime time.Time
TaskID int
ProductNo string
}
func (t *Task) String() (string, error) {
b, err := json.Marshal(t)
if err != nil {
return "", err
}
return string(b), nil
}