From 87222148ec8506f3fa866af9a5a84d776ed2862e Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 19:01:15 +0800 Subject: [PATCH] query --- go.mod | 2 + go.sum | 4 + internal/biz/timeslice/manager.go | 104 +++++++++++++++++++++++++ internal/biz/timeslice/manager_test.go | 78 +++++++++++++++++++ internal/biz/timeslice/model.go | 24 ++++++ internal/biz/voucher.go | 4 +- internal/biz/wechat_query.go | 24 ++++-- internal/biz/wechatrepo/cpn.go | 1 + internal/data/wechatrepoimpl/cpn.go | 28 +++++++ internal/pkg/helper/utils.go | 13 ++++ internal/pkg/helper/utils_test.go | 5 ++ internal/pkg/script/script.go | 66 ++++++++++++++++ internal/pkg/script/script_test.go | 44 +++++++++++ 13 files changed, 387 insertions(+), 10 deletions(-) create mode 100644 internal/biz/timeslice/manager.go create mode 100644 internal/biz/timeslice/manager_test.go create mode 100644 internal/biz/timeslice/model.go create mode 100644 internal/pkg/script/script.go create mode 100644 internal/pkg/script/script_test.go diff --git a/go.mod b/go.mod index cfddba6..34d68d0 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,8 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/mux v1.8.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect diff --git a/go.sum b/go.sum index 80fa89b..8561a05 100644 --- a/go.sum +++ b/go.sum @@ -150,6 +150,10 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0= github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= diff --git a/internal/biz/timeslice/manager.go b/internal/biz/timeslice/manager.go new file mode 100644 index 0000000..c908e44 --- /dev/null +++ b/internal/biz/timeslice/manager.go @@ -0,0 +1,104 @@ +package timeslice + +import ( + "context" + "fmt" + "github.com/hashicorp/go-multierror" + "golang.org/x/sync/errgroup" + "time" +) + +type ManagerSrv struct { + callback func(ctx context.Context, req *Task) error +} + +func NewManager(callback func(ctx context.Context, req *Task) error) *ManagerSrv { + return &ManagerSrv{callback: callback} +} + +func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) { + + if req.StartTime.After(req.EndTime) { + return 0, fmt.Errorf("start_time不能大于end_time") + } + + totalHours := req.EndTime.Sub(req.StartTime).Hours() + taskCount := int(totalHours / 2) + + // 如果剩余时间不足2小时,增加任务数 + if totalHours-float64(taskCount)*float64(2) > 0 { + taskCount++ + } + + processReq := &Process{ + manager: req, + taskCount: taskCount, + } + + return taskCount, m.process(ctx, processReq) +} + +func (m *ManagerSrv) process(ctx context.Context, req *Process) error { + + if req.taskCount == 0 { + return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围") + } + + if req.manager.GoNum == 0 { + return fmt.Errorf("协程数量不能为0") + } + if req.manager.GoNum > 100 { + return fmt.Errorf("协程数量不能大于100") + } + + // 设置最大并发任务数为 5 + eg := new(errgroup.Group) + eg.SetLimit(req.manager.GoNum) + errs := make([]error, 0) // 用于存储所有错误 + + // 为每个任务分配开始和结束时间 + for i := 0; i < req.taskCount; i++ { + + currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour) + currentEnd := currentStart.Add(2 * time.Hour) + if currentEnd.After(req.manager.EndTime) { + currentEnd = req.manager.EndTime + } + + eg.Go(func() error { + + defer func() { + if err := recover(); err != nil { + errs = append(errs, fmt.Errorf("panic: %v", err)) + } + }() + + taskID := i + 1 + taskReq := &Task{ + CurrentStartTime: currentStart, + CurrentEndTime: currentEnd, + TaskID: taskID, + ProductNo: req.manager.ProductNo, + } + + if err := m.callback(ctx, taskReq); err != nil { + errs = append(errs, err) + return err + } + + return nil + }) + } + + // 等待所有任务完成 + _ = eg.Wait() + + var result error + + // 收集错误 + for _, err2 := range errs { + result = multierror.Append(result, err2) + } + + return result +} diff --git a/internal/biz/timeslice/manager_test.go b/internal/biz/timeslice/manager_test.go new file mode 100644 index 0000000..0b73271 --- /dev/null +++ b/internal/biz/timeslice/manager_test.go @@ -0,0 +1,78 @@ +package timeslice + +import ( + "context" + "fmt" + "golang.org/x/sync/errgroup" + "math/rand" + "testing" + "time" +) + +func ProcessTasks() error { + eg := new(errgroup.Group) + eg.SetLimit(5) + + for i := 0; i < 5; i++ { + eg.Go(func() error { + // 任务逻辑... + time.Sleep(100 * time.Millisecond) + + return fmt.Errorf("任务失败") + }) + } + + return eg.Wait() // 仅返回第一个错误 +} + +func TestNewManager(t *testing.T) { + + // 解析起始时间和结束时间 + start, err := time.Parse(time.DateTime, "2023-01-01 00:00:00") + if err != nil { + t.Fatalf("查询失败: %v", err) + return + } + + //end, err := time.Parse(time.DateTime, "2023-01-31 02:00:01") + end, err := time.Parse(time.DateTime, "2023-01-02 02:00:01") + if err != nil { + t.Fatalf("查询失败: %v", err) + return + } + + var results []string + callback := func(ctx context.Context, req *Task) error { + // 模拟任务执行,休眠随机时间 + time.Sleep(time.Duration(rand.Intn(5)) * time.Second) + // 生成任务执行结果 + result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) + results = append(results, result) + //return nil + return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) + } + + startTime := time.Now() + startStr := time.Now().String() + + srv := NewManager(callback) + taskCount, err := srv.Run(context.Background(), &Manager{ + StartTime: start, + EndTime: end, + ProductNo: "123456", + GoNum: 2, + }) + + // 输出结果 + fmt.Printf("总任务数:%d\n", taskCount) + for _, result := range results { + fmt.Printf("%v\n", result) + } + + if err != nil { + t.Error(err) + } + + endTime := time.Now() + fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s", startStr, endTime.Sub(startTime).String(), endTime.String()) +} diff --git a/internal/biz/timeslice/model.go b/internal/biz/timeslice/model.go new file mode 100644 index 0000000..ca4b4a5 --- /dev/null +++ b/internal/biz/timeslice/model.go @@ -0,0 +1,24 @@ +package timeslice + +import ( + "time" +) + +type Manager struct { + StartTime time.Time + EndTime time.Time + ProductNo string + GoNum int +} + +type Process struct { + manager *Manager + taskCount int +} + +type Task struct { + CurrentStartTime time.Time + CurrentEndTime time.Time + TaskID int + ProductNo string +} diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 400a505..7c92587 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -60,11 +60,11 @@ func NewVoucherBiz( } } -func (this *VoucherBiz) Get(stockNo string) bool { +func (this *VoucherBiz) Get(uid string) bool { this.mu.Lock() defer this.mu.Unlock() - if _, ok := this.queryMap[stockNo]; ok { + if _, ok := this.queryMap[uid]; ok { return ok } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 284dfe9..f389912 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -6,16 +6,17 @@ import ( "fmt" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport/http" + "github.com/nacos-group/nacos-sdk-go/util" "time" "voucher/internal/biz/bo" "voucher/internal/biz/do" ) -func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error { +func (v *VoucherBiz) uid(_ context.Context, msg string) string { + return util.Md5(msg) +} - if v.Get("CMB_WECHAT_QUERY") { - return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For")) - } +func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error { if req.ProductNo != "" { _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) @@ -34,11 +35,18 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro return err } - v.Add("CMB_WECHAT_QUERY") + strMsg := string(msg) - _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() + uid := v.uid(ctx, strMsg) + if v.Get(uid) { + return fmt.Errorf("此台服务队列正在处理中,key:%s,ip:%s", uid, ctx.Header().Get("X-Forwarded-For")) + } + + v.Add(uid) + + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result() if err != nil { - v.Remove("CMB_WECHAT_QUERY") + v.Remove(uid) return fmt.Errorf("添加到队列失败:%v", err) } @@ -47,7 +55,7 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { - defer v.Remove("CMB_WECHAT_QUERY") + defer v.Remove(v.uid(ctx, msg)) var req *do.WechatQuery diff --git a/internal/biz/wechatrepo/cpn.go b/internal/biz/wechatrepo/cpn.go index 2813652..20adf29 100644 --- a/internal/biz/wechatrepo/cpn.go +++ b/internal/biz/wechatrepo/cpn.go @@ -10,6 +10,7 @@ import ( type WechatCpnRepo interface { Order(ctx context.Context, order *bo.OrderBo) (couponId string, err error) Query(ctx context.Context, order *bo.OrderBo) (vo.OrderStatus, error) + QueryCoupon(ctx context.Context, orderWechat *bo.OrderBo) (*cashcoupons.Coupon, error) QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error) QueryCallback(ctx context.Context) (*cashcoupons.Callback, error) SetCallback(ctx context.Context, url string) (*cashcoupons.SetCallbackResponse, error) diff --git a/internal/data/wechatrepoimpl/cpn.go b/internal/data/wechatrepoimpl/cpn.go index 0bc9ea8..dd0cbdd 100644 --- a/internal/data/wechatrepoimpl/cpn.go +++ b/internal/data/wechatrepoimpl/cpn.go @@ -126,6 +126,34 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.Or return CpnStatus(*resp.Status).GetStatus() } +func (c *CpnRepoImpl) QueryCoupon(ctx context.Context, orderWechat *bo.OrderBo) (*cashcoupons.Coupon, error) { + + req := cashcoupons.QueryCouponRequest{ + CouponId: core.String(orderWechat.VoucherNo), + Appid: core.String(orderWechat.AppID), + Openid: core.String(orderWechat.Account), + } + + client, err := c.GetClient(ctx) + if err != nil { + return nil, err + } + + svc := cashcoupons.CouponApiService{Client: client} + + resp, result, err := svc.QueryCoupon(ctx, req) + if err != nil { + + if result.Response != nil && result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err + } + + return resp, nil +} + func (c *CpnRepoImpl) QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error) { if stockCreatorMchId == "" || stockId == "" { diff --git a/internal/pkg/helper/utils.go b/internal/pkg/helper/utils.go index 7677cd6..bd9f033 100644 --- a/internal/pkg/helper/utils.go +++ b/internal/pkg/helper/utils.go @@ -1,6 +1,8 @@ package helper import ( + "crypto/md5" + "encoding/hex" "hash/fnv" "math" "os" @@ -18,3 +20,14 @@ func HashMod(hashStr string) int { hashValue := hash.Sum32() return int(math.Mod(float64(hashValue), 32)) } + +func Md5(str string) string { + // 创建一个 MD5 哈希对象 + hash := md5.New() + // 写入待加密的数据 + hash.Write([]byte(str)) + // 获取 MD5 哈希值 + hashBytes := hash.Sum(nil) + // 将 MD5 哈希值转换为16进制字符串 + return hex.EncodeToString(hashBytes) +} diff --git a/internal/pkg/helper/utils_test.go b/internal/pkg/helper/utils_test.go index e596e66..b8d2096 100644 --- a/internal/pkg/helper/utils_test.go +++ b/internal/pkg/helper/utils_test.go @@ -41,3 +41,8 @@ func used(useNum *int) { func queryUsed(useNum *int) { *useNum += 1 } + +func TestMd5(t *testing.T) { + s := Md5(`{"product_no":"","start_time":"2025-04-20 09:00:00","end_time":"2025-05-01 00:00:00"}`) + t.Log(s) +} diff --git a/internal/pkg/script/script.go b/internal/pkg/script/script.go new file mode 100644 index 0000000..cf06e4e --- /dev/null +++ b/internal/pkg/script/script.go @@ -0,0 +1,66 @@ +package script + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +func script(startTime, endTime time.Time, duration time.Duration) error { + + // 每指定间隔时间发送一次请求 + for t := startTime; t.Before(endTime); t = t.Add(duration) { + end := t.Add(duration) // 计算每次请求的结束时间 + + // 发送请求 + if err := sendRequest(t, end); err != nil { + fmt.Printf("Error sending request: %v\n", err) + } + + // 等待一段时间后再发送下一个请求 + time.Sleep(1 * time.Second) // 可以根据需要调整间隔时间 + } + + return nil +} + +func sendRequest(startTime, endTime time.Time) error { + + url := "https://gateway.dev.cdlsxd.cn/voucher/cmb/v1/orderQuery" + + // 创建请求体 + requestBody := map[string]interface{}{ + "product_no": "", + "start_time": startTime.Format(time.DateTime), + "end_time": endTime.Format(time.DateTime), + } + + // 将请求体转换为 JSON 格式 + jsonData, err := json.Marshal(requestBody) + if err != nil { + return fmt.Errorf("failed to marshal JSON: %v", err) + } + + // 发送 POST 请求 + resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + return fmt.Errorf("failed to send POST request: %v", err) + } + defer resp.Body.Close() + + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("读取响应体失败: %w", err) + } + + if resp.StatusCode == http.StatusOK { + fmt.Printf("Request sent successfully,body:%s", string(bodyBytes)) + } else { + return fmt.Errorf("failed with status code: %d", resp.StatusCode) + } + + return nil +} diff --git a/internal/pkg/script/script_test.go b/internal/pkg/script/script_test.go new file mode 100644 index 0000000..a557e56 --- /dev/null +++ b/internal/pkg/script/script_test.go @@ -0,0 +1,44 @@ +package script + +import ( + "testing" + "time" +) + +func Test_script(t *testing.T) { + + startTime, err := time.Parse(time.DateTime, "2025-05-01 00:00:00") + if err != nil { + t.Error(err) + return + } + + endTime, err := time.Parse(time.DateTime, "2025-05-31 00:00:00") + if err != nil { + t.Error(err) + return + } + + duration := 24 * time.Hour + + if err = script(startTime, endTime, duration); err != nil { + t.Error(err) + } +} + +func Test_script2(t *testing.T) { + + startTime, err := time.Parse(time.DateTime, "2025-05-31 00:00:00") + if err != nil { + t.Error(err) + return + } + + endTime := time.Now() + + duration := 1 * time.Hour + + if err = script(startTime, endTime, duration); err != nil { + t.Error(err) + } +}