From 343df76e718e2d7ee87caabe45e3cd4f9f5f32d5 Mon Sep 17 00:00:00 2001 From: ziming Date: Thu, 12 Jun 2025 09:24:21 +0800 Subject: [PATCH] timeSliceQueryPush --- internal/biz/do/rds_mq.go | 9 +++++ internal/biz/timeslicequery/mq.go | 36 +++++++++++------ internal/data/repoimpl/order.go | 3 ++ internal/pkg/script/script.go | 33 ++++++---------- internal/pkg/script/script_test.go | 45 +++++++++++++++++++-- internal/pkg/timeslice/manager.go | 15 ++++--- internal/pkg/timeslice/manager_test.go | 54 +++++++++++++------------- internal/pkg/timeslice/model.go | 18 ++++++++- internal/service/script.go | 2 +- 9 files changed, 142 insertions(+), 73 deletions(-) diff --git a/internal/biz/do/rds_mq.go b/internal/biz/do/rds_mq.go index b89b5a7..6efb705 100644 --- a/internal/biz/do/rds_mq.go +++ b/internal/biz/do/rds_mq.go @@ -4,4 +4,13 @@ type WechatQuery struct { ProductNo string `json:"product_no"` StartTime string `json:"start_time"` EndTime string `json:"end_time"` + OrderNo string `json:"order_no"` +} + +type RdsWechatQuery struct { + ProductNo string `json:"product_no"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` + GoNum int `json:"go_num"` // 并发数 + TimeSliceHours int64 `json:"time_slice_hours"` // 时间片"小时" } diff --git a/internal/biz/timeslicequery/mq.go b/internal/biz/timeslicequery/mq.go index 6e0c171..3fb2fcc 100644 --- a/internal/biz/timeslicequery/mq.go +++ b/internal/biz/timeslicequery/mq.go @@ -11,7 +11,7 @@ import ( "voucher/internal/pkg/timeslice" ) -func (v *Query) Push(ctx http.Context, req *do.WechatQuery) (string, error) { +func (v *Query) Push(ctx http.Context, req *do.RdsWechatQuery) (string, error) { if req.StartTime == "" || req.EndTime == "" { return "", fmt.Errorf("时间参数不能为空") @@ -60,9 +60,9 @@ func (v *Query) Push(ctx http.Context, req *do.WechatQuery) (string, error) { return strMsg, nil } -func (v *Query) getManager(_ context.Context, msg string) (*timeslice.Manager, error) { +func (v *Query) getManager(msg string) (*timeslice.Manager, error) { - var req *do.WechatQuery + var req *do.RdsWechatQuery if err := json.Unmarshal([]byte(msg), &req); err != nil { return nil, err @@ -81,29 +81,41 @@ func (v *Query) getManager(_ context.Context, msg string) (*timeslice.Manager, e return nil, err } - return ×lice.Manager{ + m := ×lice.Manager{ StartTime: start, EndTime: end, ProductNo: req.ProductNo, - GoNum: 2, // 协程数量 - TimeSliceHours: 2, // 时间间隔 - }, nil + GoNum: timeslice.DefaultGoNum, // 协程数量 + TimeSliceHours: timeslice.DefaultTimeSliceHours, // 时间间隔 + } + + if req.GoNum > 0 { + m.GoNum = req.GoNum + } + + if req.TimeSliceHours > 0 { + m.TimeSliceHours = req.TimeSliceHours + } + + return m, nil } func (v *Query) Consumer(ctx context.Context, msg string) error { defer v.Remove(v.uid(msg)) - req, err := v.getManager(ctx, msg) + req, err := v.getManager(msg) if err != nil { return err } + reqStr := req.String() + executeStart := time.Now() executeStartStr := executeStart.Format(time.DateTime) - log.Warnf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg) - fmt.Printf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg) + log.Warnf("微信券查询处理开始:%s,manager:%s", executeStartStr, reqStr) + fmt.Printf("微信券查询处理开始:%s,manager:%s", executeStartStr, reqStr) if err = v.execute(ctx, req); err != nil { log.Errorf("微信券查询处理失败:%s,msg:%s,err:%v", executeStartStr, msg, err) @@ -111,8 +123,8 @@ func (v *Query) Consumer(ctx context.Context, msg string) error { } executeEnd := time.Now() - log.Warnf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg) - fmt.Printf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg) + log.Warnf("微信券查询处理耗时:%s,结束时间%s,manager:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr) + fmt.Printf("微信券查询处理耗时:%s,结束时间%s,manager:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr) return nil } diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index f4dfea0..713dd57 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -44,6 +44,9 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We if req.EndTime != "" { tx = tx.Where("receive_success_time <= ?", req.EndTime) } + if req.OrderNo != "" { + tx = tx.Where("order_no = ?", req.OrderNo) + } var results = make([]*model.Order, 0) diff --git a/internal/pkg/script/script.go b/internal/pkg/script/script.go index 240d57c..b6258b6 100644 --- a/internal/pkg/script/script.go +++ b/internal/pkg/script/script.go @@ -2,7 +2,6 @@ package script import ( "bytes" - "encoding/json" "fmt" "io" "net/http" @@ -15,14 +14,20 @@ const ( PRO_URL = "https://voucher.86698.cn/voucher/cmb/timeSliceQueryPush" ) -func script(startTime, endTime time.Time, duration time.Duration) error { +const ( + SINGLE_URL = "http://127.0.0.1:15000/voucher/pushWechatQuery" + DEV_SINGLE_URL = "http://open.cszfan.com/voucher/cmb/pushWechatQuery" + PRO_SINGLE_URL = "https://voucher.86698.cn/voucher/cmb/pushWechatQuery" +) + +func script(startTime, endTime time.Time, duration time.Duration, body []byte, URL string) error { // 每指定间隔时间发送一次请求 for t := startTime; t.Before(endTime); t = t.Add(duration) { end := t.Add(duration) // 计算每次请求的结束时间 // 发送请求 - if err := sendRequest(t, end); err != nil { + if err := sendRequest(t, end, body, URL); err != nil { fmt.Printf("Error sending request: %v\n", err) } @@ -33,23 +38,9 @@ func script(startTime, endTime time.Time, duration time.Duration) error { return nil } -func sendRequest(startTime, endTime time.Time) error { +func sendRequest(startTime, endTime time.Time, body []byte, URL string) error { - // 创建请求体 - requestBody := map[string]string{ - "product_no": "", - "start_time": startTime.Format(time.DateTime), - "end_time": endTime.Format(time.DateTime), - } - - // 将请求体转换为 JSON 格式 - jsonData, err := json.Marshal(requestBody) - if err != nil { - return fmt.Errorf("failed to marshal JSON: %v", err) - } - - // 发送 POST 请求 - resp, err := http.Post(URL, "application/json", bytes.NewBuffer(jsonData)) + resp, err := http.Post(URL, "application/json", bytes.NewBuffer(body)) if err != nil { return fmt.Errorf("failed to send POST request: %v", err) } @@ -62,9 +53,7 @@ func sendRequest(startTime, endTime time.Time) error { if resp.StatusCode == http.StatusOK { fmt.Printf("Request sent successfully,body:%s", string(bodyBytes)) - } else { - return fmt.Errorf("failed with status code: %d", resp.StatusCode) } - return nil + return fmt.Errorf("failed with status code: %d", resp.StatusCode) } diff --git a/internal/pkg/script/script_test.go b/internal/pkg/script/script_test.go index e2fc243..1b35f8d 100644 --- a/internal/pkg/script/script_test.go +++ b/internal/pkg/script/script_test.go @@ -1,6 +1,7 @@ package script import ( + "encoding/json" "testing" "time" ) @@ -21,7 +22,24 @@ func Test_script(t *testing.T) { duration := 5 * time.Hour - if err = script(startTime, endTime, duration); err != nil { + // 创建请求体 + requestBody := map[string]any{ + "go_num": 2, // 并发数量 + "time_slice_hours": 2, // 时间间隔 + "order_no": "", + "product_no": "", + "start_time": startTime.Format(time.DateTime), + "end_time": endTime.Format(time.DateTime), + } + + // 将请求体转换为 JSON 格式 + bodyBytes, err := json.Marshal(requestBody) + if err != nil { + t.Error(err) + return + } + + if err = script(startTime, endTime, duration, bodyBytes, URL); err != nil { t.Error(err) } } @@ -34,11 +52,30 @@ func Test_script2(t *testing.T) { return } - endTime := time.Now() + endTime, err := time.Parse(time.DateTime, "2025-05-01 10:00:00") + if err != nil { + t.Error(err) + return + } - duration := 1 * time.Hour + duration := 50 * time.Hour - if err = script(startTime, endTime, duration); err != nil { + // 创建请求体 + requestBody := map[string]any{ + "order_no": "", + "product_no": "", + "start_time": startTime.Format(time.DateTime), + "end_time": endTime.Format(time.DateTime), + } + + // 将请求体转换为 JSON 格式 + bodyBytes, err := json.Marshal(requestBody) + if err != nil { + t.Error(err) + return + } + + if err = script(startTime, endTime, duration, bodyBytes, SINGLE_URL); err != nil { t.Error(err) } } diff --git a/internal/pkg/timeslice/manager.go b/internal/pkg/timeslice/manager.go index 4e3e8a5..6ca40b0 100644 --- a/internal/pkg/timeslice/manager.go +++ b/internal/pkg/timeslice/manager.go @@ -10,8 +10,9 @@ import ( ) const ( - TimeSliceHours = 2 - maxGlobalGoroutines = 1000 + DefaultGoNum = 2 + DefaultTimeSliceHours = 2 + maxGlobalGoroutines = 1000 ) type Callback func(ctx context.Context, req *Task) error @@ -37,11 +38,13 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) { return 0, fmt.Errorf("协程数量不能大于%d", maxGlobalGoroutines) } + timeSliceHours := float64(req.TimeSliceHours) + totalHours := req.EndTime.Sub(req.StartTime).Hours() - taskCount := int(totalHours / TimeSliceHours) + taskCount := int(totalHours / timeSliceHours) // 如果剩余时间不足 TimeSliceHours 小时,增加任务数 - if totalHours-float64(taskCount)*float64(TimeSliceHours) > 0 { + if totalHours-float64(taskCount)*timeSliceHours > 0 { taskCount++ } @@ -69,8 +72,8 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error { // 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间 for i := 0; i < req.TaskCount; i++ { - currentStart := req.Manager.StartTime.Add(time.Duration(i) * TimeSliceHours * time.Hour) - currentEnd := currentStart.Add(TimeSliceHours * time.Hour) + currentStart := req.Manager.StartTime.Add(time.Duration(i) * time.Duration(req.Manager.TimeSliceHours) * time.Hour) + currentEnd := currentStart.Add(time.Duration(req.Manager.TimeSliceHours) * time.Hour) if currentEnd.After(req.Manager.EndTime) { currentEnd = req.Manager.EndTime diff --git a/internal/pkg/timeslice/manager_test.go b/internal/pkg/timeslice/manager_test.go index 2cda4ae..7600515 100644 --- a/internal/pkg/timeslice/manager_test.go +++ b/internal/pkg/timeslice/manager_test.go @@ -60,10 +60,11 @@ func TestNewManager(t *testing.T) { srv := NewManager(callback) taskCount, err := srv.Run(context.Background(), &Manager{ - StartTime: start, - EndTime: end, - ProductNo: "no123456", - GoNum: 2, + StartTime: start, + EndTime: end, + ProductNo: "no123456", + GoNum: 2, + TimeSliceHours: 2, }) // 输出结果 @@ -95,7 +96,7 @@ func TestBatchCallBackFunc(t *testing.T) { t.Fatalf("%v", err) return } - end, err := time.Parse(time.DateTime, "2025-01-02 02:00:01") + end, err := time.Parse(time.DateTime, "2025-01-01 05:00:01") if err != nil { t.Fatalf("%v", err) return @@ -121,7 +122,7 @@ func TestBatchCallBackFunc(t *testing.T) { next = end } - t.Logf("处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime)) + t.Logf("\n处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime)) wg.Add(1) // 启动goroutine处理每个时间段 @@ -150,10 +151,11 @@ func CallbackFunc(start, end time.Time) error { managerEndStr := end.Format(time.DateTime) req := &Manager{ - StartTime: start, - EndTime: end, - ProductNo: "no123456", - GoNum: 2, + StartTime: start, + EndTime: end, + ProductNo: "no123456", + GoNum: 2, + TimeSliceHours: 1, } taskCount, err := NewManager(callbackFunc).Run(context.Background(), req) @@ -188,8 +190,8 @@ func callbackFunc(_ context.Context, req *Task) error { start := time.Now() startStr := start.Format(time.DateTime) - n := 0 - num := 0 + groupNum := 0 + allNum := 0 notifyNum := 0 for i := 0; i < 3; i++ { @@ -198,34 +200,32 @@ func callbackFunc(_ context.Context, req *Task) error { // 模拟任务执行,休眠随机时间 time.Sleep(time.Duration(rand.Intn(3)) * time.Second) - n += 1 - num = num + (i+1)*100 + groupNum += 1 + allNum = allNum + (i+1)*100 logFields := map[string]interface{}{ - "处理条数": num, - "通知条数": notifyNum, - "耗时": time.Now().Sub(groupStartTime).String(), - "任务处理开始时间": currentStartTimeStr, - "任务处理结束时间": currentEndTimeStr, + "duration": time.Now().Sub(groupStartTime).String(), + "searchTime": currentStartTimeStr + "到" + currentEndTimeStr, } fmt.Printf("%s到%s,第%d个任务,第%d组,处理完毕, %+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, i+1, logFields) } end := time.Now() logFields := map[string]interface{}{ - "总处理组数": n, - "总处理条数": num, - "总通知条数": notifyNum, - "执行任务开始时间": startStr, - "执行任务结束时间": end.Format(time.DateTime), - "总处理耗时": end.Sub(start).String(), - "任务处理开始时间": currentStartTimeStr, - "任务处理结束时间": currentEndTimeStr, + "groupNum": groupNum, + "allNum": allNum, + "startTime": startStr, + "endTime": end.Format(time.DateTime), + "searchTime": currentStartTimeStr + "到" + currentEndTimeStr, + "notifyNum": notifyNum, + "duration": end.Sub(start).String(), } // 生成任务执行结果 result := fmt.Sprintf("%s到%s,第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields) + fmt.Printf(result) + //return fmt.Errorf(result) return nil } diff --git a/internal/pkg/timeslice/model.go b/internal/pkg/timeslice/model.go index 35dfb12..30c8173 100644 --- a/internal/pkg/timeslice/model.go +++ b/internal/pkg/timeslice/model.go @@ -1,6 +1,7 @@ package timeslice import ( + "encoding/json" "time" ) @@ -9,7 +10,12 @@ type Manager struct { EndTime time.Time // 结束时间 ProductNo string // 产品编号 GoNum int // 并发数 - TimeSliceHours int // 时间片"小时" + TimeSliceHours int64 // 时间片"小时" +} + +func (m *Manager) String() string { + b, _ := json.Marshal(m) + return string(b) } type Process struct { @@ -17,9 +23,19 @@ type Process struct { TaskCount int // 任务数 } +func (m *Process) String() string { + b, _ := json.Marshal(m) + return string(b) +} + type Task struct { Process *Process CurrentStartTime time.Time // 时间片开始时间 CurrentEndTime time.Time // 时间片结束时间 TaskID int // 任务ID } + +func (m *Task) String() string { + b, _ := json.Marshal(m) + return string(b) +} diff --git a/internal/service/script.go b/internal/service/script.go index 0a48089..5f05ccc 100644 --- a/internal/service/script.go +++ b/internal/service/script.go @@ -77,7 +77,7 @@ func (this *CmbService) TimeSliceQueryPush(ctx http.Context) error { return err } - var req *do.WechatQuery + var req *do.RdsWechatQuery if err = json.Unmarshal(bodyBytes, &req); err != nil { return err }