232 lines
6.1 KiB
Go
232 lines
6.1 KiB
Go
package timeslice
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"github.com/hashicorp/go-multierror"
|
||
"golang.org/x/sync/errgroup"
|
||
"math/rand"
|
||
"sync"
|
||
"testing"
|
||
"time"
|
||
)
|
||
|
||
func ProcessTasks() error {
|
||
eg := new(errgroup.Group)
|
||
eg.SetLimit(5)
|
||
|
||
for i := 0; i < 5; i++ {
|
||
eg.Go(func() error {
|
||
// 任务逻辑...
|
||
time.Sleep(100 * time.Millisecond)
|
||
|
||
return fmt.Errorf("任务失败")
|
||
})
|
||
}
|
||
|
||
return eg.Wait() // 仅返回第一个错误
|
||
}
|
||
|
||
func TestNewManager(t *testing.T) {
|
||
|
||
// 解析起始时间和结束时间
|
||
start, err := time.Parse(time.DateTime, "2023-01-01 00:00:00")
|
||
if err != nil {
|
||
t.Fatalf("查询失败: %v", err)
|
||
return
|
||
}
|
||
|
||
//end, err := time.Parse(time.DateTime, "2023-01-31 02:00:01")
|
||
end, err := time.Parse(time.DateTime, "2023-01-02 02:00:01")
|
||
if err != nil {
|
||
t.Fatalf("查询失败: %v", err)
|
||
return
|
||
}
|
||
|
||
var results []string
|
||
callback := func(ctx context.Context, req *Task) error {
|
||
// 模拟任务执行,休眠随机时间
|
||
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
|
||
// 生成任务执行结果
|
||
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
|
||
results = append(results, result)
|
||
//return nil
|
||
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
|
||
}
|
||
|
||
startTime := time.Now()
|
||
startStr := time.Now().String()
|
||
|
||
srv := NewManager(callback)
|
||
taskCount, err := srv.Run(context.Background(), &Manager{
|
||
StartTime: start,
|
||
EndTime: end,
|
||
ProductNo: "no123456",
|
||
GoNum: 2,
|
||
})
|
||
|
||
// 输出结果
|
||
fmt.Printf("总任务数:%d\n", taskCount)
|
||
for _, result := range results {
|
||
fmt.Printf("%v\n", result)
|
||
}
|
||
|
||
if err != nil {
|
||
multiErr, ok := err.(*multierror.Error)
|
||
if ok {
|
||
for i, err := range multiErr.Errors {
|
||
fmt.Printf("错误 %d: %v\n", i+1, err)
|
||
}
|
||
} else {
|
||
// 不是多错误,单独处理
|
||
t.Error("单个错误", err)
|
||
}
|
||
}
|
||
|
||
endTime := time.Now()
|
||
fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s\n", startStr, endTime.Sub(startTime).String(), endTime.String())
|
||
}
|
||
|
||
func TestBatchCallBackFunc(t *testing.T) {
|
||
// 初始化开始时间和结束时间
|
||
start, err := time.Parse(time.DateTime, "2025-01-01 01:00:00")
|
||
if err != nil {
|
||
t.Fatalf("%v", err)
|
||
return
|
||
}
|
||
end, err := time.Parse(time.DateTime, "2025-01-02 02:00:01")
|
||
if err != nil {
|
||
t.Fatalf("%v", err)
|
||
return
|
||
}
|
||
|
||
// 确保开始时间小于结束时间
|
||
if start.After(end) {
|
||
t.Fatalf("start_time不能大于end_time")
|
||
return
|
||
}
|
||
|
||
var wg sync.WaitGroup
|
||
// 按照每天的时间单位分片
|
||
duration := 24 * time.Hour // 每个时间段为一天
|
||
|
||
// 循环处理时间段
|
||
for current := start; current.Before(end); current = current.Add(duration) {
|
||
// 计算当前时间段的结束时间
|
||
next := current.Add(duration)
|
||
|
||
// 如果下一个结束时间超过了实际的结束时间,则调整为实际结束时间
|
||
if next.After(end) {
|
||
next = end
|
||
}
|
||
|
||
t.Logf("处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime))
|
||
|
||
wg.Add(1)
|
||
// 启动goroutine处理每个时间段
|
||
go func(startTime, endTime time.Time) {
|
||
defer func() {
|
||
wg.Done()
|
||
if err2 := recover(); err2 != nil {
|
||
t.Error("panic", err2)
|
||
}
|
||
}()
|
||
if err3 := CallbackFunc(startTime, endTime); err3 != nil {
|
||
t.Errorf("任务执行失败:%v\n", err3)
|
||
}
|
||
}(current, next)
|
||
}
|
||
|
||
// 等待所有的 goroutine 完成
|
||
wg.Wait()
|
||
}
|
||
|
||
func CallbackFunc(start, end time.Time) error {
|
||
|
||
nowTime := time.Now()
|
||
|
||
managerStartStr := start.Format(time.DateTime)
|
||
managerEndStr := end.Format(time.DateTime)
|
||
|
||
req := &Manager{
|
||
StartTime: start,
|
||
EndTime: end,
|
||
ProductNo: "no123456",
|
||
GoNum: 2,
|
||
}
|
||
|
||
taskCount, err := NewManager(callbackFunc).Run(context.Background(), req)
|
||
if err != nil {
|
||
var multiErr *multierror.Error
|
||
if errors.As(err, &multiErr) {
|
||
for i, err := range multiErr.Errors {
|
||
fmt.Printf("%s到%s,错误 %d: %v\n", managerStartStr, managerEndStr, i+1, err)
|
||
}
|
||
} else {
|
||
// 不是多错误,单独处理
|
||
fmt.Printf("单个错误%v", err)
|
||
}
|
||
}
|
||
|
||
fmt.Printf("%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount)
|
||
|
||
endTime := time.Now()
|
||
fmt.Printf("%s到%s,处理耗时:%s,开始处理时间:%s,结束时间%s\n", managerStartStr, managerEndStr, endTime.Sub(nowTime).String(), nowTime.Format(time.DateTime), endTime.Format(time.DateTime))
|
||
|
||
return nil
|
||
}
|
||
|
||
func callbackFunc(_ context.Context, req *Task) error {
|
||
|
||
managerStartTimeStr := req.Process.Manager.StartTime.Format(time.DateTime)
|
||
managerEndTimeStr := req.Process.Manager.EndTime.Format(time.DateTime)
|
||
|
||
currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime)
|
||
currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime)
|
||
|
||
start := time.Now()
|
||
startStr := start.Format(time.DateTime)
|
||
|
||
n := 0
|
||
num := 0
|
||
notifyNum := 0
|
||
|
||
for i := 0; i < 3; i++ {
|
||
|
||
groupStartTime := time.Now()
|
||
// 模拟任务执行,休眠随机时间
|
||
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
|
||
|
||
n += 1
|
||
num = num + (i+1)*100
|
||
|
||
logFields := map[string]interface{}{
|
||
"处理条数": num,
|
||
"通知条数": notifyNum,
|
||
"耗时": time.Now().Sub(groupStartTime).String(),
|
||
"任务处理开始时间": currentStartTimeStr,
|
||
"任务处理结束时间": currentEndTimeStr,
|
||
}
|
||
fmt.Printf("%s到%s,第%d个任务,第%d组,处理完毕, %+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, i+1, logFields)
|
||
}
|
||
|
||
end := time.Now()
|
||
logFields := map[string]interface{}{
|
||
"总处理组数": n,
|
||
"总处理条数": num,
|
||
"总通知条数": notifyNum,
|
||
"执行任务开始时间": startStr,
|
||
"执行任务结束时间": end.Format(time.DateTime),
|
||
"总处理耗时": end.Sub(start).String(),
|
||
"任务处理开始时间": currentStartTimeStr,
|
||
"任务处理结束时间": currentEndTimeStr,
|
||
}
|
||
|
||
// 生成任务执行结果
|
||
result := fmt.Sprintf("%s到%s,第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields)
|
||
fmt.Printf(result)
|
||
|
||
return nil
|
||
}
|