From 3aa1ad9520f0731a13e081e909abd07cc7dec94e Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 11 Jun 2025 17:03:41 +0800 Subject: [PATCH] timeSliceQueryPush --- READEME.md | 55 +-------- cmd/server/wire.go | 2 + configs/config.yaml | 12 +- configs/config_test.yaml | 7 +- internal/biz/timeslicequery/README.md | 14 +++ internal/biz/timeslicequery/base.go | 84 ++++++++++++++ internal/biz/timeslicequery/execute.go | 86 ++++++++++++++ internal/biz/timeslicequery/mq.go | 118 ++++++++++++++++++++ internal/biz/timeslicequery/provider_set.go | 7 ++ internal/biz/timeslicequery/query.go | 66 +++++++++++ internal/conf/conf.pb.go | 90 ++++++++------- internal/conf/conf.proto | 3 +- internal/data/repoimpl/order.go | 2 +- internal/pkg/timeslice/manager.go | 50 +++++---- internal/pkg/timeslice/manager_test.go | 10 +- internal/pkg/timeslice/model.go | 19 ++-- internal/server/http.go | 1 + internal/service/cmb.go | 105 +++-------------- internal/service/script.go | 118 ++++++++++++++++++++ internal/service/voucher.go | 15 ++- internal/service/wechat_query.go | 37 ++++++ 21 files changed, 670 insertions(+), 231 deletions(-) create mode 100644 internal/biz/timeslicequery/README.md create mode 100644 internal/biz/timeslicequery/base.go create mode 100644 internal/biz/timeslicequery/execute.go create mode 100644 internal/biz/timeslicequery/mq.go create mode 100644 internal/biz/timeslicequery/provider_set.go create mode 100644 internal/biz/timeslicequery/query.go create mode 100644 internal/service/script.go diff --git a/READEME.md b/READEME.md index 96a1103..69e7a99 100755 --- a/READEME.md +++ b/READEME.md @@ -1,59 +1,8 @@ -#

营销系统后台API

+#

招行立减金券系统

-### 参与开发 -[请参阅](https://tvd8jq9lqkp.feishu.cn/wiki/LNWVweZ64iY2UBkkTkZcezy0n5h?from=from_copylink) * * * ### 主要工作 -+ 后台接口API ++ 发券API * * * -### 规则说明 -+ 路由前缀都为 __/admin__ 开始,路由规则全小写+下划线,例如:/admin/v1/demo_1 -* * * -### 构建部署 -+ 采用多阶段构建,以获得最小体积的容器镜像 -````bash -cd /项目根目录 && make deploy folder=./configs_dev marketing=marketing_backend container_name=marketing_backend http_port=8090 -```` -* * * -### docker环境下开发 -+ 一、[下载Docker Desktop安装程序](https://www.docker.com/products/docker-desktop) -+ 二、在项目根目录下执行命令 - ```shell - docker build -f Dockerfile_win -t 镜像名称 . - docker run --privileged -itd --name 容器名称 --restart=always -v ./:/src 镜像名称 - docker ps - docker exec -it 容器名称 sh - make init - make all - ``` - -### windows非docker开发 -1 安装插件(配置goproxy,GOPROXY=https://goproxy.cn,direct) -```shell -go install google.golang.org/protobuf/cmd/protoc-gen-go@latest -go install github.com/go-kratos/kratos/cmd/kratos/v2@latest -go install github.com/go-kratos/kratos/cmd/protoc-gen-go-http/v2@latest -go install github.com/google/gnostic/cmd/protoc-gen-openapi@latest -go install github.com/google/wire/cmd/wire@latest -go install github.com/go-kratos/kratos/cmd/protoc-gen-go-errors/v2@latest -go install gorm.io/gen/tools/gentool@latest -``` -2生成相应rpo - -命令:kratos proto client api/helloworld/v1/demo.proto - -位置:api和internal下面的conf - -3 wire生成依赖 -cd cmd/server -wire - -4 配置编译 - -![img_1.png](img.png) - -5 生成service -kratos proto server api/helloworld/v1/demo.proto -t internal/service - diff --git a/cmd/server/wire.go b/cmd/server/wire.go index f03001e..6c27594 100644 --- a/cmd/server/wire.go +++ b/cmd/server/wire.go @@ -12,6 +12,7 @@ import ( "github.com/robfig/cron" "voucher/internal/biz" "voucher/internal/biz/cmb" + "voucher/internal/biz/timeslicequery" "voucher/internal/conf" "voucher/internal/data" "voucher/internal/data/mixrepoimpl" @@ -33,6 +34,7 @@ func wireApp(*conf.Bootstrap, log.Logger, *log2.AccessLogger) (*kratos.App, func repoimpl.ProviderRepoImplSet, wechatrepoimpl.ProviderWechatReposImplSet, mixrepoimpl.ProviderMixRepoImplSet, + timeslicequery.ProviderSetTimeSliceQuery, log2.NewLogHelper, cron.New, newApp, diff --git a/configs/config.yaml b/configs/config.yaml index decdbea..14cc902 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -85,18 +85,24 @@ cron: command: "0 0 1 * * ?" # 每天凌晨1点执行一次 rdsMQ: - wechatQuery: #发放结算 + wechatQuery: name: "wechatQuery" retryNum: 1 #重试次数 numWorkers: 2 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 + isOpen: false #是否启动消费 true/false + wechatTimeSliceQuery: + name: "wechatTimeSliceQuery" + retryNum: 1 #重试次数 + numWorkers: 3 #协程数量,不配置默认为10 + waitTime: 1s #处理完成后等待时间 isOpen: true #是否启动消费 true/false - orderRetry: #发放结算 + orderRetry: name: "orderRetry" retryNum: 1 #重试次数 numWorkers: 2 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 - isOpen: true #是否启动消费 true/false + isOpen: false #是否启动消费 true/false #配置日志 logs: diff --git a/configs/config_test.yaml b/configs/config_test.yaml index 9718289..7229860 100644 --- a/configs/config_test.yaml +++ b/configs/config_test.yaml @@ -91,7 +91,12 @@ rdsMQ: numWorkers: 1 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 isOpen: true #是否启动消费 true/false - + wechatTimeSliceQuery: + name: "wechatTimeSliceQuery" + retryNum: 1 #重试次数 + numWorkers: 3 #协程数量,不配置默认为10 + waitTime: 1s #处理完成后等待时间 + isOpen: true #是否启动消费 true/false #配置日志 logs: business: business.log #业务日志路径:如果不写日志,则不配置或配置为空 diff --git a/internal/biz/timeslicequery/README.md b/internal/biz/timeslicequery/README.md new file mode 100644 index 0000000..e0d8ef2 --- /dev/null +++ b/internal/biz/timeslicequery/README.md @@ -0,0 +1,14 @@ +#

券状态同步

+* * * +### 主要工作 ++ 券状态查询同步 +* * * +### 规则说明 ++ 按照时间分片处理,按照2小时为一个时间片启用一个协程消费处理 ++ 协程最大可同时运行指定数量,设置为2 +* * * +### 使用方式 ++ 消费处理,按照时间范围上报消费 ++ 每次请求按照时间片分别启用2个协程消费处理,也就是每个请求可以同时启用2个协程消费处理 ++ 请不要无休止的访问,请按照时间片进行访问,并且不要重复的时间片访问,增加系统负载,特殊发券日期量较大,建议缩短时间范围上报,多切分上报处理 +* * * \ No newline at end of file diff --git a/internal/biz/timeslicequery/base.go b/internal/biz/timeslicequery/base.go new file mode 100644 index 0000000..81155bd --- /dev/null +++ b/internal/biz/timeslicequery/base.go @@ -0,0 +1,84 @@ +package timeslicequery + +import ( + "github.com/nacos-group/nacos-sdk-go/util" + "sync" + "voucher/internal/biz/cmb" + "voucher/internal/biz/mixrepos" + "voucher/internal/biz/repo" + "voucher/internal/biz/wechatrepo" + "voucher/internal/conf" + "voucher/internal/data" +) + +type Query struct { + mu sync.RWMutex + queryMap map[string]bool + + bc *conf.Bootstrap + rdb *data.Rdb + cmb *cmb.Cmb + + productRepo repo.ProductRepo + orderRepo repo.OrderRepo + + wechatCpnRepo wechatrepo.WechatCpnRepo + mqSendMixRepo mixrepos.MQSendMixRepo +} + +func NewQuery( + bc *conf.Bootstrap, + rdb *data.Rdb, + cmb *cmb.Cmb, + productRepo repo.ProductRepo, + orderRepo repo.OrderRepo, + wechatCpnRepo wechatrepo.WechatCpnRepo, + mqSendMixRepo mixrepos.MQSendMixRepo) *Query { + return &Query{ + queryMap: make(map[string]bool), + bc: bc, + rdb: rdb, + cmb: cmb, + productRepo: productRepo, + orderRepo: orderRepo, + wechatCpnRepo: wechatCpnRepo, + mqSendMixRepo: mqSendMixRepo} +} + +func (v *Query) uid(no string) string { + return util.Md5("query" + no) +} + +func (this *Query) GetAll() map[string]bool { + return this.queryMap +} + +func (this *Query) Get(uid string) bool { + + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[uid]; ok { + return ok + } + + return false +} + +func (this *Query) Add(uid string) { + + this.mu.Lock() + defer this.mu.Unlock() + + this.queryMap[uid] = true +} + +func (this *Query) Remove(uid string) { + + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[uid]; ok { + delete(this.queryMap, uid) + } +} diff --git a/internal/biz/timeslicequery/execute.go b/internal/biz/timeslicequery/execute.go new file mode 100644 index 0000000..c143c6b --- /dev/null +++ b/internal/biz/timeslicequery/execute.go @@ -0,0 +1,86 @@ +package timeslicequery + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "time" + "voucher/internal/biz/bo" + "voucher/internal/biz/do" + "voucher/internal/pkg/timeslice" +) + +func (v *Query) execute(ctx context.Context, req *timeslice.Manager) error { + + managerStartStr := req.StartTime.Format(time.DateTime) + managerEndStr := req.EndTime.Format(time.DateTime) + + taskCount, err := timeslice.NewManager(v.callbackFunc).Run(ctx, req) + if err != nil { + log.Errorf("%s到%s,发生错误:%v", managerStartStr, managerEndStr, err) + } + + fmt.Printf("%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount) + log.Warnf("%s到%s,总任务数:%d", managerStartStr, managerEndStr, taskCount) + + return nil +} + +func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error { + + startTimeStr := req.Process.Manager.StartTime.Format(time.DateTime) + endTimeStr := req.Process.Manager.EndTime.Format(time.DateTime) + + currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime) + currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime) + + start := time.Now() + startStr := start.Format(time.DateTime) + + x := &do.WechatQuery{ + StartTime: currentStartTimeStr, + EndTime: currentEndTimeStr, + ProductNo: req.Process.Manager.ProductNo, + } + + n := 0 + num := 0 + notifyNum := 0 + + err := v.orderRepo.FinSucByStockIdInBatches(ctx, x, func(ctx context.Context, rows []*bo.OrderBo) error { + + n += 1 + for _, order := range rows { + + num += 1 + if err := v.wechatQuery(ctx, order, ¬ifyNum); err != nil { + logFields := map[string]string{ + "order_no": order.OrderNo, + "coupon_id": order.VoucherNo, + "open_id": order.Account, + "err": err.Error(), + } + log.Errorf("%s到%s,第%d个任务,第%d组,发生错误:+v", startTimeStr, endTimeStr, req.TaskID, n, logFields) + } + + } + + return nil + }) + + end := time.Now() + + logFields := map[string]interface{}{ + "总处理组数": n, + "总处理条数": num, + "总通知条数": notifyNum, + "执行任务开始时间": startStr, + "执行任务结束时间": end.Format(time.DateTime), + "任务处理开始时间": currentStartTimeStr, + "任务处理结束时间": currentEndTimeStr, + "总处理耗时": end.Sub(start).String(), + } + log.Warnf("%s到%s,第%d个任务,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields) + + return err +} diff --git a/internal/biz/timeslicequery/mq.go b/internal/biz/timeslicequery/mq.go new file mode 100644 index 0000000..6e0c171 --- /dev/null +++ b/internal/biz/timeslicequery/mq.go @@ -0,0 +1,118 @@ +package timeslicequery + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/http" + "time" + "voucher/internal/biz/do" + "voucher/internal/pkg/timeslice" +) + +func (v *Query) Push(ctx http.Context, req *do.WechatQuery) (string, error) { + + if req.StartTime == "" || req.EndTime == "" { + return "", fmt.Errorf("时间参数不能为空") + } + + queue := v.bc.RdsMQ.GetWechatTimeSliceQuery() + if queue == nil { + return "", fmt.Errorf("队列不存在") + } + + if queue.Name == "" { + return "", fmt.Errorf("队列不存在") + } + + if queue.IsOpen == false { + return "", fmt.Errorf("队列未开启") + } + + if req.ProductNo != "" { + _, err := v.productRepo.GetByProductNo(ctx, req.ProductNo) + if err != nil { + return "", err + } + } + + b, err := json.Marshal(req) + if err != nil { + return "", err + } + + strMsg := string(b) + + uid := v.uid(strMsg) + if v.Get(uid) { + return "", fmt.Errorf("此台服务队列正在处理中,%s-%s,ip:%s", uid, strMsg, ctx.Header().Get("X-Forwarded-For")) + } + + v.Add(uid) + + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result() + if err != nil { + v.Remove(uid) + return "", fmt.Errorf("添加到队列失败:%v", err) + } + + return strMsg, nil +} + +func (v *Query) getManager(_ context.Context, msg string) (*timeslice.Manager, error) { + + var req *do.WechatQuery + + if err := json.Unmarshal([]byte(msg), &req); err != nil { + return nil, err + } + + if req.StartTime == "" || req.EndTime == "" { + return nil, fmt.Errorf("时间参数不能为空") + } + + start, err := time.Parse(time.DateTime, req.StartTime) + if err != nil { + return nil, err + } + end, err := time.Parse(time.DateTime, req.EndTime) + if err != nil { + return nil, err + } + + return ×lice.Manager{ + StartTime: start, + EndTime: end, + ProductNo: req.ProductNo, + GoNum: 2, // 协程数量 + TimeSliceHours: 2, // 时间间隔 + }, nil +} + +func (v *Query) Consumer(ctx context.Context, msg string) error { + + defer v.Remove(v.uid(msg)) + + req, err := v.getManager(ctx, msg) + if err != nil { + return err + } + + executeStart := time.Now() + executeStartStr := executeStart.Format(time.DateTime) + + log.Warnf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg) + fmt.Printf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg) + + if err = v.execute(ctx, req); err != nil { + log.Errorf("微信券查询处理失败:%s,msg:%s,err:%v", executeStartStr, msg, err) + return fmt.Errorf("微信券查询处理失败:%s,msg:%s,err:%v", executeStartStr, msg, err) + } + + executeEnd := time.Now() + log.Warnf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg) + fmt.Printf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg) + + return nil +} diff --git a/internal/biz/timeslicequery/provider_set.go b/internal/biz/timeslicequery/provider_set.go new file mode 100644 index 0000000..72ffd16 --- /dev/null +++ b/internal/biz/timeslicequery/provider_set.go @@ -0,0 +1,7 @@ +package timeslicequery + +import ( + "github.com/google/wire" +) + +var ProviderSetTimeSliceQuery = wire.NewSet(NewQuery) diff --git a/internal/biz/timeslicequery/query.go b/internal/biz/timeslicequery/query.go new file mode 100644 index 0000000..1b626f1 --- /dev/null +++ b/internal/biz/timeslicequery/query.go @@ -0,0 +1,66 @@ +package timeslicequery + +import ( + "context" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/biz/bo" +) + +func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, notifyNum *int) error { + + status, err := v.wechatCpnRepo.Query(ctx, order) + if err != nil { + return err + } + + if status.IsUse() { + return v.queryUsed(ctx, order, notifyNum) + } else if status.IsExpired() { + return v.queryExpired(ctx, order) + } + + return nil +} + +func (v *Query) queryUsed(ctx context.Context, order *bo.OrderBo, notifyNum *int) error { + + *notifyNum += 1 + + if order.Status.IsUse() { + return v.notify(ctx, order) + } + + if err := v.orderRepo.Used(ctx, order.ID); err != nil { + return err + } + + return v.notify(ctx, order) +} + +func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error { + + if order.Status.IsExpired() { + log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo) + return nil + } + + if err := v.orderRepo.Expired(ctx, order.ID); err != nil { + return err + } + + return nil // 过期不做通知 +} + +func (v *Query) notify(ctx context.Context, order *bo.OrderBo) error { + + order, err := v.orderRepo.GetByID(ctx, order.ID) + if err != nil { + return err + } + + if _, err = v.cmb.Notify(ctx, order); err != nil { + return err + } + + return nil +} diff --git a/internal/conf/conf.pb.go b/internal/conf/conf.pb.go index dc691b8..0dcadd7 100644 --- a/internal/conf/conf.pb.go +++ b/internal/conf/conf.pb.go @@ -840,8 +840,9 @@ type RdsMQ struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"` - WechatRetry *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"` + WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"` + WechatTimeSliceQuery *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatTimeSliceQuery,proto3" json:"wechatTimeSliceQuery,omitempty"` + WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"` } func (x *RdsMQ) Reset() { @@ -883,6 +884,13 @@ func (x *RdsMQ) GetWechatQuery() *RdsMQ_Queue { return nil } +func (x *RdsMQ) GetWechatTimeSliceQuery() *RdsMQ_Queue { + if x != nil { + return x.WechatTimeSliceQuery + } + return nil +} + func (x *RdsMQ) GetWechatRetry() *RdsMQ_Queue { if x != nil { return x.WechatRetry @@ -1552,32 +1560,37 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, + 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xff, 0x02, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, - 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, - 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, - 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, - 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, - 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, - 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, - 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, - 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, - 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57, - 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75, - 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74, - 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, - 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, - 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, - 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, - 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, - 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, - 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x4f, 0x0a, 0x14, 0x77, 0x65, 0x63, 0x68, 0x61, + 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x6c, 0x69, 0x63, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, + 0x75, 0x65, 0x52, 0x14, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x6c, + 0x69, 0x63, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, + 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, + 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, + 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, + 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, + 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, + 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, + 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, + 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, + 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, + 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, + 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, + 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, + 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, + 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1632,20 +1645,21 @@ var file_conf_conf_proto_depIdxs = []int32{ 15, // 13: voucher.config.RocketMQ.eventMap:type_name -> voucher.config.RocketMQ.EventMapEntry 17, // 14: voucher.config.Cron.commandMap:type_name -> voucher.config.Cron.CommandMapEntry 18, // 15: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue - 18, // 16: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue - 19, // 17: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration - 19, // 18: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration - 19, // 19: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration - 19, // 20: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration - 19, // 21: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration - 4, // 22: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap - 16, // 23: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap - 19, // 24: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration - 25, // [25:25] is the sub-list for method output_type - 25, // [25:25] is the sub-list for method input_type - 25, // [25:25] is the sub-list for extension type_name - 25, // [25:25] is the sub-list for extension extendee - 0, // [0:25] is the sub-list for field type_name + 18, // 16: voucher.config.RdsMQ.wechatTimeSliceQuery:type_name -> voucher.config.RdsMQ.Queue + 18, // 17: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue + 19, // 18: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration + 19, // 19: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration + 19, // 20: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration + 19, // 21: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration + 19, // 22: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration + 4, // 23: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap + 16, // 24: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap + 19, // 25: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration + 26, // [26:26] is the sub-list for method output_type + 26, // [26:26] is the sub-list for method input_type + 26, // [26:26] is the sub-list for extension type_name + 26, // [26:26] is the sub-list for extension extendee + 0, // [0:26] is the sub-list for field type_name } func init() { file_conf_conf_proto_init() } diff --git a/internal/conf/conf.proto b/internal/conf/conf.proto index cb15acf..772e7fc 100644 --- a/internal/conf/conf.proto +++ b/internal/conf/conf.proto @@ -128,7 +128,8 @@ message RdsMQ { google.protobuf.Duration waitTime = 5; } Queue wechatQuery = 1; - Queue wechatRetry = 2; + Queue wechatTimeSliceQuery = 2; + Queue wechatRetry = 3; } message Logs { diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 6c803bc..f4dfea0 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -39,7 +39,7 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We tx = tx.Where("product_no = ?", req.ProductNo) } if req.StartTime != "" { - tx = tx.Where("receive_success_time >= ?", req.StartTime) + tx = tx.Where("receive_success_time > ?", req.StartTime) } if req.EndTime != "" { tx = tx.Where("receive_success_time <= ?", req.EndTime) diff --git a/internal/pkg/timeslice/manager.go b/internal/pkg/timeslice/manager.go index aa7e0c6..2d221ac 100644 --- a/internal/pkg/timeslice/manager.go +++ b/internal/pkg/timeslice/manager.go @@ -8,11 +8,15 @@ import ( "time" ) +const TimeSliceHours = 2 + +type Callback func(ctx context.Context, req *Task) error + type ManagerSrv struct { - callback func(ctx context.Context, req *Task) error + callback Callback } -func NewManager(callback func(ctx context.Context, req *Task) error) *ManagerSrv { +func NewManager(callback Callback) *ManagerSrv { return &ManagerSrv{callback: callback} } @@ -22,17 +26,24 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) { return 0, fmt.Errorf("start_time不能大于end_time") } - totalHours := req.EndTime.Sub(req.StartTime).Hours() - taskCount := int(totalHours / 2) + if req.GoNum == 0 { + return 0, fmt.Errorf("协程数量不能为0") + } + if req.GoNum > 100 { + return 0, fmt.Errorf("协程数量不能大于100") + } - // 如果剩余时间不足2小时,增加任务数 - if totalHours-float64(taskCount)*float64(2) > 0 { + totalHours := req.EndTime.Sub(req.StartTime).Hours() + taskCount := int(totalHours / TimeSliceHours) + + // 如果剩余时间不足 TimeSliceHours 小时,增加任务数 + if totalHours-float64(taskCount)*float64(TimeSliceHours) > 0 { taskCount++ } processReq := &Process{ - manager: req, - taskCount: taskCount, + Manager: req, + TaskCount: taskCount, } return taskCount, m.process(ctx, processReq) @@ -40,31 +51,24 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) { func (m *ManagerSrv) process(ctx context.Context, req *Process) error { - if req.taskCount == 0 { + if req.TaskCount == 0 { return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围") } - if req.manager.GoNum == 0 { - return fmt.Errorf("协程数量不能为0") - } - if req.manager.GoNum > 100 { - return fmt.Errorf("协程数量不能大于100") - } - // 设置最大并发任务数为 5 eg := new(errgroup.Group) - eg.SetLimit(req.manager.GoNum) + eg.SetLimit(req.Manager.GoNum) errs := make([]error, 0) // 用于存储所有错误 - // 为每个任务分配开始和结束时间 - for i := 0; i < req.taskCount; i++ { + // 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间 + for i := 0; i < req.TaskCount; i++ { - currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour) - currentEnd := currentStart.Add(2 * time.Hour) + currentStart := req.Manager.StartTime.Add(time.Duration(i) * TimeSliceHours * time.Hour) + currentEnd := currentStart.Add(TimeSliceHours * time.Hour) - if currentEnd.After(req.manager.EndTime) { - currentEnd = req.manager.EndTime + if currentEnd.After(req.Manager.EndTime) { + currentEnd = req.Manager.EndTime } taskID := i + 1 diff --git a/internal/pkg/timeslice/manager_test.go b/internal/pkg/timeslice/manager_test.go index 2864dc9..2cda4ae 100644 --- a/internal/pkg/timeslice/manager_test.go +++ b/internal/pkg/timeslice/manager_test.go @@ -49,10 +49,10 @@ func TestNewManager(t *testing.T) { // 模拟任务执行,休眠随机时间 time.Sleep(time.Duration(rand.Intn(3)) * time.Second) // 生成任务执行结果 - result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) + result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) results = append(results, result) //return nil - return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) + return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime)) } startTime := time.Now() @@ -179,8 +179,8 @@ func CallbackFunc(start, end time.Time) error { func callbackFunc(_ context.Context, req *Task) error { - managerStartTimeStr := req.Process.manager.StartTime.Format(time.DateTime) - managerEndTimeStr := req.Process.manager.EndTime.Format(time.DateTime) + managerStartTimeStr := req.Process.Manager.StartTime.Format(time.DateTime) + managerEndTimeStr := req.Process.Manager.EndTime.Format(time.DateTime) currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime) currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime) @@ -225,9 +225,7 @@ func callbackFunc(_ context.Context, req *Task) error { // 生成任务执行结果 result := fmt.Sprintf("%s到%s,第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields) - fmt.Printf(result) - //return fmt.Errorf(result) return nil } diff --git a/internal/pkg/timeslice/model.go b/internal/pkg/timeslice/model.go index 65ef733..35dfb12 100644 --- a/internal/pkg/timeslice/model.go +++ b/internal/pkg/timeslice/model.go @@ -5,20 +5,21 @@ import ( ) type Manager struct { - StartTime time.Time - EndTime time.Time - ProductNo string - GoNum int + StartTime time.Time // 开始时间 + EndTime time.Time // 结束时间 + ProductNo string // 产品编号 + GoNum int // 并发数 + TimeSliceHours int // 时间片"小时" } type Process struct { - manager *Manager - taskCount int + Manager *Manager + TaskCount int // 任务数 } type Task struct { Process *Process - CurrentStartTime time.Time - CurrentEndTime time.Time - TaskID int + CurrentStartTime time.Time // 时间片开始时间 + CurrentEndTime time.Time // 时间片结束时间 + TaskID int // 任务ID } diff --git a/internal/server/http.go b/internal/server/http.go index 3109eed..5bdbec5 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -40,6 +40,7 @@ func NewHTTPServer( srv.Route("/voucher/").POST("queryOrder/{order_no}", cmb.QueryOrder) srv.Route("/voucher/").POST("registerTag/{product_no}", cmb.RegisterTag) srv.Route("/voucher/").POST("pushWechatQuery", cmb.PushWechatQuery) + srv.Route("/voucher/").POST("timeSliceQueryPush", cmb.TimeSliceQueryPush) srv.Route("/voucher/").POST("pushWechatRetry/{product_no}", cmb.PushWechatRetry) v1.RegisterCmbHTTPServer(srv, cmb) diff --git a/internal/service/cmb.go b/internal/service/cmb.go index 7936dbc..cb5d52f 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -2,19 +2,16 @@ package service import ( "context" - "encoding/json" "fmt" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport/http" "github.com/robfig/cron" - "io" http2 "net/http" - "strconv" v1 "voucher/api/v1" "voucher/internal/biz" "voucher/internal/biz/bo" - "voucher/internal/biz/do" "voucher/internal/biz/mixrepos" + "voucher/internal/biz/timeslicequery" "voucher/internal/biz/vo" "voucher/internal/biz/wechatrepo" "voucher/internal/conf" @@ -23,11 +20,12 @@ import ( var _ v1.CmbHTTPServer = (*CmbService)(nil) type CmbService struct { - bc *conf.Bootstrap - cron *cron.Cron - VoucherBiz *biz.VoucherBiz - CmbMixRepo mixrepos.CmbMixRepo - WechatCpnRepo wechatrepo.WechatCpnRepo + bc *conf.Bootstrap + cron *cron.Cron + VoucherBiz *biz.VoucherBiz + CmbMixRepo mixrepos.CmbMixRepo + WechatCpnRepo wechatrepo.WechatCpnRepo + timeSliceQuery *timeslicequery.Query } func NewCmbService( @@ -36,13 +34,15 @@ func NewCmbService( VoucherBiz *biz.VoucherBiz, CmbMixRepo mixrepos.CmbMixRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, + timeSliceQuery *timeslicequery.Query, ) *CmbService { return &CmbService{ - bc: bc, - cron: cron, - VoucherBiz: VoucherBiz, - CmbMixRepo: CmbMixRepo, - WechatCpnRepo: WechatCpnRepo, + bc: bc, + cron: cron, + VoucherBiz: VoucherBiz, + CmbMixRepo: CmbMixRepo, + WechatCpnRepo: WechatCpnRepo, + timeSliceQuery: timeSliceQuery, } } @@ -63,37 +63,6 @@ func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (* return reply, nil } -func (this *CmbService) NotifyRetry(ctx http.Context) error { - id := ctx.Vars().Get("id") - if id == "" { - return fmt.Errorf("id is empty") - } - - orderNotifyId, err := strconv.ParseUint(id, 10, 64) - if err != nil { - return err - } - - return this.VoucherBiz.PushNotifyRetryDelayMQ(ctx, 1, orderNotifyId) -} - -func (this *CmbService) QueryOrder(ctx http.Context) error { - - orderNo := ctx.Vars().Get("order_no") - if orderNo == "" { - return fmt.Errorf("orderNo is empty") - } - - str, err := this.VoucherBiz.QueryOrder(ctx, orderNo) - if err != nil { - return err - } - - return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": str, - }) -} - func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) { return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds()) @@ -115,49 +84,3 @@ func (this *CmbService) RegisterTag(ctx http.Context) error { "data": productNo, }) } - -func (this *CmbService) PushWechatQuery(ctx http.Context) error { - - bodyBytes, err := io.ReadAll(ctx.Request().Body) - if err != nil { - return err - } - - var req *do.WechatQuery - if err = json.Unmarshal(bodyBytes, &req); err != nil { - return err - } - - if req == nil { - return fmt.Errorf("req is empty") - } - - if req.StartTime == "" || req.EndTime == "" { - return fmt.Errorf("start_time or end_time is empty") - } - - if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil { - return err - } - - return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": req, - }) -} - -func (this *CmbService) PushWechatRetry(ctx http.Context) error { - - productNo := ctx.Vars().Get("product_no") - if productNo == "" { - return fmt.Errorf("product_no is empty") - } - - err := this.VoucherBiz.PushWechatRetry(ctx, productNo) - if err != nil { - return err - } - - return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": productNo, - }) -} diff --git a/internal/service/script.go b/internal/service/script.go new file mode 100644 index 0000000..e1ea10f --- /dev/null +++ b/internal/service/script.go @@ -0,0 +1,118 @@ +package service + +import ( + "encoding/json" + "fmt" + "github.com/go-kratos/kratos/v2/transport/http" + "io" + http2 "net/http" + "strconv" + "voucher/internal/biz/do" +) + +func (this *CmbService) NotifyRetry(ctx http.Context) error { + id := ctx.Vars().Get("id") + if id == "" { + return fmt.Errorf("id is empty") + } + + orderNotifyId, err := strconv.ParseUint(id, 10, 64) + if err != nil { + return err + } + + return this.VoucherBiz.PushNotifyRetryDelayMQ(ctx, 1, orderNotifyId) +} + +func (this *CmbService) QueryOrder(ctx http.Context) error { + + orderNo := ctx.Vars().Get("order_no") + if orderNo == "" { + return fmt.Errorf("orderNo is empty") + } + + str, err := this.VoucherBiz.QueryOrder(ctx, orderNo) + if err != nil { + return err + } + + return ctx.JSON(http2.StatusOK, map[string]interface{}{ + "data": str, + }) +} + +func (this *CmbService) PushWechatQuery(ctx http.Context) error { + + bodyBytes, err := io.ReadAll(ctx.Request().Body) + if err != nil { + return err + } + + var req *do.WechatQuery + if err = json.Unmarshal(bodyBytes, &req); err != nil { + return err + } + + if req == nil { + return fmt.Errorf("req is empty") + } + + if req.StartTime == "" || req.EndTime == "" { + return fmt.Errorf("start_time or end_time is empty") + } + + if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil { + return err + } + + return ctx.JSON(http2.StatusOK, map[string]interface{}{ + "data": req, + }) +} + +func (this *CmbService) TimeSliceQueryPush(ctx http.Context) error { + + bodyBytes, err := io.ReadAll(ctx.Request().Body) + if err != nil { + return err + } + + var req *do.WechatQuery + if err = json.Unmarshal(bodyBytes, &req); err != nil { + return err + } + + if req == nil { + return fmt.Errorf("req is empty") + } + + if req.StartTime == "" || req.EndTime == "" { + return fmt.Errorf("start_time or end_time is empty") + } + + reps, err := this.timeSliceQuery.Push(ctx, req) + if err != nil { + return err + } + + return ctx.JSON(http2.StatusOK, map[string]interface{}{ + "data": reps, + }) +} + +func (this *CmbService) PushWechatRetry(ctx http.Context) error { + + productNo := ctx.Vars().Get("product_no") + if productNo == "" { + return fmt.Errorf("product_no is empty") + } + + err := this.VoucherBiz.PushWechatRetry(ctx, productNo) + if err != nil { + return err + } + + return ctx.JSON(http2.StatusOK, map[string]interface{}{ + "data": productNo, + }) +} diff --git a/internal/service/voucher.go b/internal/service/voucher.go index c663d55..83f1acb 100644 --- a/internal/service/voucher.go +++ b/internal/service/voucher.go @@ -10,6 +10,7 @@ import ( "time" "voucher/internal/biz" "voucher/internal/biz/bo" + "voucher/internal/biz/timeslicequery" "voucher/internal/conf" "voucher/internal/data" "voucher/internal/pkg/mq" @@ -21,6 +22,8 @@ type VoucherService struct { VoucherBiz *biz.VoucherBiz rdb *data.Rdb logHelper *log.Helper + + timeSliceQuery *timeslicequery.Query } func NewVoucherService( @@ -29,13 +32,15 @@ func NewVoucherService( VoucherBiz *biz.VoucherBiz, rdb *data.Rdb, logHelper *log.Helper, + timeSliceQuery *timeslicequery.Query, ) *VoucherService { return &VoucherService{ - bc: bc, - cron: cron, - VoucherBiz: VoucherBiz, - rdb: rdb, - logHelper: logHelper, + bc: bc, + cron: cron, + VoucherBiz: VoucherBiz, + rdb: rdb, + logHelper: logHelper, + timeSliceQuery: timeSliceQuery, } } diff --git a/internal/service/wechat_query.go b/internal/service/wechat_query.go index 390964e..6e44485 100644 --- a/internal/service/wechat_query.go +++ b/internal/service/wechat_query.go @@ -43,3 +43,40 @@ func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) erro return nil } + +func (s *VoucherService) GetWechatTimeSliceQueryConfig() *rdsmq.ConsumeConfig { + + queue := s.bc.RdsMQ.GetWechatTimeSliceQuery() + if queue == nil { + return nil + } + + if !queue.GetIsOpen() { + log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name)) + return nil + } + + return &rdsmq.ConsumeConfig{ + Rdb: s.rdb.Rdb, + QueueName: queue.Name, + NumWorkers: queue.NumWorkers, + WaitTime: queue.GetWaitTime().AsDuration(), + RetryNum: queue.RetryNum, + Fn: s.WechatTimeSliceQueryHandle, + Logger: s.logHelper, + } +} + +func (s *VoucherService) WechatTimeSliceQueryHandle(ctx context.Context, msg string) error { + + if msg == "" { + s.logHelper.Errorf("wechat TimeSlice query error: batchNo is empty") + return nil + } + + if err := s.timeSliceQuery.Consumer(ctx, msg); err != nil { + s.logHelper.Errorf("wechat TimeSlice query msg:%s error: %v", msg, err) + } + + return nil +}