From ac009695973cc23eb52205e0811ae4c3f8f1f647 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 11 Jun 2025 14:27:10 +0800 Subject: [PATCH] query --- internal/biz/timeslice/manager.go | 11 +- internal/biz/timeslice/manager_test.go | 167 ++++++++++++++++++++++++- internal/biz/timeslice/model.go | 27 +++- 3 files changed, 194 insertions(+), 11 deletions(-) diff --git a/internal/biz/timeslice/manager.go b/internal/biz/timeslice/manager.go index c908e44..aa7e0c6 100644 --- a/internal/biz/timeslice/manager.go +++ b/internal/biz/timeslice/manager.go @@ -54,6 +54,7 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { // 设置最大并发任务数为 5 eg := new(errgroup.Group) eg.SetLimit(req.manager.GoNum) + errs := make([]error, 0) // 用于存储所有错误 // 为每个任务分配开始和结束时间 @@ -61,9 +62,11 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour) currentEnd := currentStart.Add(2 * time.Hour) + if currentEnd.After(req.manager.EndTime) { currentEnd = req.manager.EndTime } + taskID := i + 1 eg.Go(func() error { @@ -73,17 +76,15 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { } }() - taskID := i + 1 taskReq := &Task{ CurrentStartTime: currentStart, CurrentEndTime: currentEnd, TaskID: taskID, - ProductNo: req.manager.ProductNo, + Process: req, } if err := m.callback(ctx, taskReq); err != nil { errs = append(errs, err) - return err } return nil @@ -91,7 +92,9 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { } // 等待所有任务完成 - _ = eg.Wait() + if err := eg.Wait(); err != nil { + return fmt.Errorf("任务执行失败: %v", err) + } var result error diff --git a/internal/biz/timeslice/manager_test.go b/internal/biz/timeslice/manager_test.go index 0b73271..2864dc9 100644 --- a/internal/biz/timeslice/manager_test.go +++ b/internal/biz/timeslice/manager_test.go @@ -2,9 +2,12 @@ package timeslice import ( "context" + "errors" "fmt" + "github.com/hashicorp/go-multierror" "golang.org/x/sync/errgroup" "math/rand" + "sync" "testing" "time" ) @@ -44,12 +47,12 @@ func TestNewManager(t *testing.T) { var results []string callback := func(ctx context.Context, req *Task) error { // 模拟任务执行,休眠随机时间 - time.Sleep(time.Duration(rand.Intn(5)) * time.Second) + time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 生成任务执行结果 - result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) + result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) results = append(results, result) //return nil - return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) + return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) } startTime := time.Now() @@ -59,7 +62,7 @@ func TestNewManager(t *testing.T) { taskCount, err := srv.Run(context.Background(), &Manager{ StartTime: start, EndTime: end, - ProductNo: "123456", + ProductNo: "no123456", GoNum: 2, }) @@ -70,9 +73,161 @@ func TestNewManager(t *testing.T) { } if err != nil { - t.Error(err) + multiErr, ok := err.(*multierror.Error) + if ok { + for i, err := range multiErr.Errors { + fmt.Printf("错误 %d: %v\n", i+1, err) + } + } else { + // 不是多错误,单独处理 + t.Error("单个错误", err) + } } endTime := time.Now() - fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s", startStr, endTime.Sub(startTime).String(), endTime.String()) + fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s\n", startStr, endTime.Sub(startTime).String(), endTime.String()) +} + +func TestBatchCallBackFunc(t *testing.T) { + // 初始化开始时间和结束时间 + start, err := time.Parse(time.DateTime, "2025-01-01 01:00:00") + if err != nil { + t.Fatalf("%v", err) + return + } + end, err := time.Parse(time.DateTime, "2025-01-02 02:00:01") + if err != nil { + t.Fatalf("%v", err) + return + } + + // 确保开始时间小于结束时间 + if start.After(end) { + t.Fatalf("start_time不能大于end_time") + return + } + + var wg sync.WaitGroup + // 按照每天的时间单位分片 + duration := 24 * time.Hour // 每个时间段为一天 + + // 循环处理时间段 + for current := start; current.Before(end); current = current.Add(duration) { + // 计算当前时间段的结束时间 + next := current.Add(duration) + + // 如果下一个结束时间超过了实际的结束时间,则调整为实际结束时间 + if next.After(end) { + next = end + } + + t.Logf("处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime)) + + wg.Add(1) + // 启动goroutine处理每个时间段 + go func(startTime, endTime time.Time) { + defer func() { + wg.Done() + if err2 := recover(); err2 != nil { + t.Error("panic", err2) + } + }() + if err3 := CallbackFunc(startTime, endTime); err3 != nil { + t.Errorf("任务执行失败:%v\n", err3) + } + }(current, next) + } + + // 等待所有的 goroutine 完成 + wg.Wait() +} + +func CallbackFunc(start, end time.Time) error { + + nowTime := time.Now() + + managerStartStr := start.Format(time.DateTime) + managerEndStr := end.Format(time.DateTime) + + req := &Manager{ + StartTime: start, + EndTime: end, + ProductNo: "no123456", + GoNum: 2, + } + + taskCount, err := NewManager(callbackFunc).Run(context.Background(), req) + if err != nil { + var multiErr *multierror.Error + if errors.As(err, &multiErr) { + for i, err := range multiErr.Errors { + fmt.Printf("%s到%s,错误 %d: %v\n", managerStartStr, managerEndStr, i+1, err) + } + } else { + // 不是多错误,单独处理 + fmt.Printf("单个错误%v", err) + } + } + + fmt.Printf("%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount) + + endTime := time.Now() + fmt.Printf("%s到%s,处理耗时:%s,开始处理时间:%s,结束时间%s\n", managerStartStr, managerEndStr, endTime.Sub(nowTime).String(), nowTime.Format(time.DateTime), endTime.Format(time.DateTime)) + + return nil +} + +func callbackFunc(_ context.Context, req *Task) error { + + managerStartTimeStr := req.Process.manager.StartTime.Format(time.DateTime) + managerEndTimeStr := req.Process.manager.EndTime.Format(time.DateTime) + + currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime) + currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime) + + start := time.Now() + startStr := start.Format(time.DateTime) + + n := 0 + num := 0 + notifyNum := 0 + + for i := 0; i < 3; i++ { + + groupStartTime := time.Now() + // 模拟任务执行,休眠随机时间 + time.Sleep(time.Duration(rand.Intn(3)) * time.Second) + + n += 1 + num = num + (i+1)*100 + + logFields := map[string]interface{}{ + "处理条数": num, + "通知条数": notifyNum, + "耗时": time.Now().Sub(groupStartTime).String(), + "任务处理开始时间": currentStartTimeStr, + "任务处理结束时间": currentEndTimeStr, + } + fmt.Printf("%s到%s,第%d个任务,第%d组,处理完毕, %+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, i+1, logFields) + } + + end := time.Now() + logFields := map[string]interface{}{ + "总处理组数": n, + "总处理条数": num, + "总通知条数": notifyNum, + "执行任务开始时间": startStr, + "执行任务结束时间": end.Format(time.DateTime), + "总处理耗时": end.Sub(start).String(), + "任务处理开始时间": currentStartTimeStr, + "任务处理结束时间": currentEndTimeStr, + } + + // 生成任务执行结果 + result := fmt.Sprintf("%s到%s,第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields) + + fmt.Printf(result) + + //return fmt.Errorf(result) + return nil } diff --git a/internal/biz/timeslice/model.go b/internal/biz/timeslice/model.go index ca4b4a5..8e87526 100644 --- a/internal/biz/timeslice/model.go +++ b/internal/biz/timeslice/model.go @@ -1,6 +1,7 @@ package timeslice import ( + "encoding/json" "time" ) @@ -11,14 +12,38 @@ type Manager struct { GoNum int } +func (t *Manager) String() (string, error) { + b, err := json.Marshal(t) + if err != nil { + return "", err + } + return string(b), nil +} + type Process struct { manager *Manager taskCount int } +func (t *Process) String() (string, error) { + b, err := json.Marshal(t) + if err != nil { + return "", err + } + return string(b), nil +} + type Task struct { + Process *Process CurrentStartTime time.Time CurrentEndTime time.Time TaskID int - ProductNo string +} + +func (t *Task) String() (string, error) { + b, err := json.Marshal(t) + if err != nil { + return "", err + } + return string(b), nil }