timeSliceQueryPush

This commit is contained in:
ziming 2025-06-12 09:24:21 +08:00
parent d3be7d733b
commit 343df76e71
9 changed files with 142 additions and 73 deletions

View File

@ -4,4 +4,13 @@ type WechatQuery struct {
ProductNo string `json:"product_no"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
OrderNo string `json:"order_no"`
}
type RdsWechatQuery struct {
ProductNo string `json:"product_no"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
GoNum int `json:"go_num"` // 并发数
TimeSliceHours int64 `json:"time_slice_hours"` // 时间片"小时"
}

View File

@ -11,7 +11,7 @@ import (
"voucher/internal/pkg/timeslice"
)
func (v *Query) Push(ctx http.Context, req *do.WechatQuery) (string, error) {
func (v *Query) Push(ctx http.Context, req *do.RdsWechatQuery) (string, error) {
if req.StartTime == "" || req.EndTime == "" {
return "", fmt.Errorf("时间参数不能为空")
@ -60,9 +60,9 @@ func (v *Query) Push(ctx http.Context, req *do.WechatQuery) (string, error) {
return strMsg, nil
}
func (v *Query) getManager(_ context.Context, msg string) (*timeslice.Manager, error) {
func (v *Query) getManager(msg string) (*timeslice.Manager, error) {
var req *do.WechatQuery
var req *do.RdsWechatQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return nil, err
@ -81,29 +81,41 @@ func (v *Query) getManager(_ context.Context, msg string) (*timeslice.Manager, e
return nil, err
}
return &timeslice.Manager{
m := &timeslice.Manager{
StartTime: start,
EndTime: end,
ProductNo: req.ProductNo,
GoNum: 2, // 协程数量
TimeSliceHours: 2, // 时间间隔
}, nil
GoNum: timeslice.DefaultGoNum, // 协程数量
TimeSliceHours: timeslice.DefaultTimeSliceHours, // 时间间隔
}
if req.GoNum > 0 {
m.GoNum = req.GoNum
}
if req.TimeSliceHours > 0 {
m.TimeSliceHours = req.TimeSliceHours
}
return m, nil
}
func (v *Query) Consumer(ctx context.Context, msg string) error {
defer v.Remove(v.uid(msg))
req, err := v.getManager(ctx, msg)
req, err := v.getManager(msg)
if err != nil {
return err
}
reqStr := req.String()
executeStart := time.Now()
executeStartStr := executeStart.Format(time.DateTime)
log.Warnf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg)
fmt.Printf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg)
log.Warnf("微信券查询处理开始:%s,manager:%s", executeStartStr, reqStr)
fmt.Printf("微信券查询处理开始:%s,manager:%s", executeStartStr, reqStr)
if err = v.execute(ctx, req); err != nil {
log.Errorf("微信券查询处理失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
@ -111,8 +123,8 @@ func (v *Query) Consumer(ctx context.Context, msg string) error {
}
executeEnd := time.Now()
log.Warnf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg)
fmt.Printf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg)
log.Warnf("微信券查询处理耗时:%s,结束时间%s,manager:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
fmt.Printf("微信券查询处理耗时:%s,结束时间%s,manager:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
return nil
}

View File

@ -44,6 +44,9 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We
if req.EndTime != "" {
tx = tx.Where("receive_success_time <= ?", req.EndTime)
}
if req.OrderNo != "" {
tx = tx.Where("order_no = ?", req.OrderNo)
}
var results = make([]*model.Order, 0)

View File

@ -2,7 +2,6 @@ package script
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
@ -15,14 +14,20 @@ const (
PRO_URL = "https://voucher.86698.cn/voucher/cmb/timeSliceQueryPush"
)
func script(startTime, endTime time.Time, duration time.Duration) error {
const (
SINGLE_URL = "http://127.0.0.1:15000/voucher/pushWechatQuery"
DEV_SINGLE_URL = "http://open.cszfan.com/voucher/cmb/pushWechatQuery"
PRO_SINGLE_URL = "https://voucher.86698.cn/voucher/cmb/pushWechatQuery"
)
func script(startTime, endTime time.Time, duration time.Duration, body []byte, URL string) error {
// 每指定间隔时间发送一次请求
for t := startTime; t.Before(endTime); t = t.Add(duration) {
end := t.Add(duration) // 计算每次请求的结束时间
// 发送请求
if err := sendRequest(t, end); err != nil {
if err := sendRequest(t, end, body, URL); err != nil {
fmt.Printf("Error sending request: %v\n", err)
}
@ -33,23 +38,9 @@ func script(startTime, endTime time.Time, duration time.Duration) error {
return nil
}
func sendRequest(startTime, endTime time.Time) error {
func sendRequest(startTime, endTime time.Time, body []byte, URL string) error {
// 创建请求体
requestBody := map[string]string{
"product_no": "",
"start_time": startTime.Format(time.DateTime),
"end_time": endTime.Format(time.DateTime),
}
// 将请求体转换为 JSON 格式
jsonData, err := json.Marshal(requestBody)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %v", err)
}
// 发送 POST 请求
resp, err := http.Post(URL, "application/json", bytes.NewBuffer(jsonData))
resp, err := http.Post(URL, "application/json", bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to send POST request: %v", err)
}
@ -62,9 +53,7 @@ func sendRequest(startTime, endTime time.Time) error {
if resp.StatusCode == http.StatusOK {
fmt.Printf("Request sent successfully,body:%s", string(bodyBytes))
} else {
return fmt.Errorf("failed with status code: %d", resp.StatusCode)
}
return nil
return fmt.Errorf("failed with status code: %d", resp.StatusCode)
}

View File

@ -1,6 +1,7 @@
package script
import (
"encoding/json"
"testing"
"time"
)
@ -21,7 +22,24 @@ func Test_script(t *testing.T) {
duration := 5 * time.Hour
if err = script(startTime, endTime, duration); err != nil {
// 创建请求体
requestBody := map[string]any{
"go_num": 2, // 并发数量
"time_slice_hours": 2, // 时间间隔
"order_no": "",
"product_no": "",
"start_time": startTime.Format(time.DateTime),
"end_time": endTime.Format(time.DateTime),
}
// 将请求体转换为 JSON 格式
bodyBytes, err := json.Marshal(requestBody)
if err != nil {
t.Error(err)
return
}
if err = script(startTime, endTime, duration, bodyBytes, URL); err != nil {
t.Error(err)
}
}
@ -34,11 +52,30 @@ func Test_script2(t *testing.T) {
return
}
endTime := time.Now()
endTime, err := time.Parse(time.DateTime, "2025-05-01 10:00:00")
if err != nil {
t.Error(err)
return
}
duration := 1 * time.Hour
duration := 50 * time.Hour
if err = script(startTime, endTime, duration); err != nil {
// 创建请求体
requestBody := map[string]any{
"order_no": "",
"product_no": "",
"start_time": startTime.Format(time.DateTime),
"end_time": endTime.Format(time.DateTime),
}
// 将请求体转换为 JSON 格式
bodyBytes, err := json.Marshal(requestBody)
if err != nil {
t.Error(err)
return
}
if err = script(startTime, endTime, duration, bodyBytes, SINGLE_URL); err != nil {
t.Error(err)
}
}

View File

@ -10,8 +10,9 @@ import (
)
const (
TimeSliceHours = 2
maxGlobalGoroutines = 1000
DefaultGoNum = 2
DefaultTimeSliceHours = 2
maxGlobalGoroutines = 1000
)
type Callback func(ctx context.Context, req *Task) error
@ -37,11 +38,13 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
return 0, fmt.Errorf("协程数量不能大于%d", maxGlobalGoroutines)
}
timeSliceHours := float64(req.TimeSliceHours)
totalHours := req.EndTime.Sub(req.StartTime).Hours()
taskCount := int(totalHours / TimeSliceHours)
taskCount := int(totalHours / timeSliceHours)
// 如果剩余时间不足 TimeSliceHours 小时,增加任务数
if totalHours-float64(taskCount)*float64(TimeSliceHours) > 0 {
if totalHours-float64(taskCount)*timeSliceHours > 0 {
taskCount++
}
@ -69,8 +72,8 @@ func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
// 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间
for i := 0; i < req.TaskCount; i++ {
currentStart := req.Manager.StartTime.Add(time.Duration(i) * TimeSliceHours * time.Hour)
currentEnd := currentStart.Add(TimeSliceHours * time.Hour)
currentStart := req.Manager.StartTime.Add(time.Duration(i) * time.Duration(req.Manager.TimeSliceHours) * time.Hour)
currentEnd := currentStart.Add(time.Duration(req.Manager.TimeSliceHours) * time.Hour)
if currentEnd.After(req.Manager.EndTime) {
currentEnd = req.Manager.EndTime

View File

@ -60,10 +60,11 @@ func TestNewManager(t *testing.T) {
srv := NewManager(callback)
taskCount, err := srv.Run(context.Background(), &Manager{
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
TimeSliceHours: 2,
})
// 输出结果
@ -95,7 +96,7 @@ func TestBatchCallBackFunc(t *testing.T) {
t.Fatalf("%v", err)
return
}
end, err := time.Parse(time.DateTime, "2025-01-02 02:00:01")
end, err := time.Parse(time.DateTime, "2025-01-01 05:00:01")
if err != nil {
t.Fatalf("%v", err)
return
@ -121,7 +122,7 @@ func TestBatchCallBackFunc(t *testing.T) {
next = end
}
t.Logf("处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime))
t.Logf("\n处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime))
wg.Add(1)
// 启动goroutine处理每个时间段
@ -150,10 +151,11 @@ func CallbackFunc(start, end time.Time) error {
managerEndStr := end.Format(time.DateTime)
req := &Manager{
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
TimeSliceHours: 1,
}
taskCount, err := NewManager(callbackFunc).Run(context.Background(), req)
@ -188,8 +190,8 @@ func callbackFunc(_ context.Context, req *Task) error {
start := time.Now()
startStr := start.Format(time.DateTime)
n := 0
num := 0
groupNum := 0
allNum := 0
notifyNum := 0
for i := 0; i < 3; i++ {
@ -198,34 +200,32 @@ func callbackFunc(_ context.Context, req *Task) error {
// 模拟任务执行,休眠随机时间
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
n += 1
num = num + (i+1)*100
groupNum += 1
allNum = allNum + (i+1)*100
logFields := map[string]interface{}{
"处理条数": num,
"通知条数": notifyNum,
"耗时": time.Now().Sub(groupStartTime).String(),
"任务处理开始时间": currentStartTimeStr,
"任务处理结束时间": currentEndTimeStr,
"duration": time.Now().Sub(groupStartTime).String(),
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
}
fmt.Printf("%s到%s,第%d个任务第%d组,处理完毕, %+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, i+1, logFields)
}
end := time.Now()
logFields := map[string]interface{}{
"总处理组数": n,
"总处理条数": num,
"总通知条数": notifyNum,
"执行任务开始时间": startStr,
"执行任务结束时间": end.Format(time.DateTime),
"总处理耗时": end.Sub(start).String(),
"任务处理开始时间": currentStartTimeStr,
"任务处理结束时间": currentEndTimeStr,
"groupNum": groupNum,
"allNum": allNum,
"startTime": startStr,
"endTime": end.Format(time.DateTime),
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
"notifyNum": notifyNum,
"duration": end.Sub(start).String(),
}
// 生成任务执行结果
result := fmt.Sprintf("%s到%s第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields)
fmt.Printf(result)
//return fmt.Errorf(result)
return nil
}

View File

@ -1,6 +1,7 @@
package timeslice
import (
"encoding/json"
"time"
)
@ -9,7 +10,12 @@ type Manager struct {
EndTime time.Time // 结束时间
ProductNo string // 产品编号
GoNum int // 并发数
TimeSliceHours int // 时间片"小时"
TimeSliceHours int64 // 时间片"小时"
}
func (m *Manager) String() string {
b, _ := json.Marshal(m)
return string(b)
}
type Process struct {
@ -17,9 +23,19 @@ type Process struct {
TaskCount int // 任务数
}
func (m *Process) String() string {
b, _ := json.Marshal(m)
return string(b)
}
type Task struct {
Process *Process
CurrentStartTime time.Time // 时间片开始时间
CurrentEndTime time.Time // 时间片结束时间
TaskID int // 任务ID
}
func (m *Task) String() string {
b, _ := json.Marshal(m)
return string(b)
}

View File

@ -77,7 +77,7 @@ func (this *CmbService) TimeSliceQueryPush(ctx http.Context) error {
return err
}
var req *do.WechatQuery
var req *do.RdsWechatQuery
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}