From 69c4171a20a71f1fcf7fd9f52f0955aa08a844c4 Mon Sep 17 00:00:00 2001 From: ziming Date: Thu, 5 Jun 2025 18:27:22 +0800 Subject: [PATCH] wechat retry --- configs/config.yaml | 6 +++ internal/biz/repo/order.go | 1 + internal/biz/wechat_retry.go | 62 +++++++++++++++++++++++++ internal/conf/conf.pb.go | 77 +++++++++++++++++++------------- internal/conf/conf.proto | 1 + internal/data/repoimpl/order.go | 19 ++++++++ internal/server/http.go | 1 + internal/server/rds_consume.go | 4 ++ internal/service/cmb.go | 17 +++++++ internal/service/wechat_retry.go | 52 +++++++++++++++++++++ 10 files changed, 208 insertions(+), 32 deletions(-) create mode 100644 internal/biz/wechat_retry.go create mode 100644 internal/service/wechat_retry.go diff --git a/configs/config.yaml b/configs/config.yaml index 9425c90..95b9bef 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -91,6 +91,12 @@ rdsMQ: numWorkers: 1 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 isOpen: true #是否启动消费 true/false + orderRetry: #发放结算 + name: "orderRetry" + retryNum: 1 #重试次数 + numWorkers: 1 #协程数量,不配置默认为10 + waitTime: 1s #处理完成后等待时间 + isOpen: true #是否启动消费 true/false #配置日志 logs: diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index 8c9bab9..2cb954f 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -8,6 +8,7 @@ import ( type OrderRepo interface { FinByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error + FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) diff --git a/internal/biz/wechat_retry.go b/internal/biz/wechat_retry.go new file mode 100644 index 0000000..321205f --- /dev/null +++ b/internal/biz/wechat_retry.go @@ -0,0 +1,62 @@ +package biz + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "time" + "voucher/internal/biz/bo" +) + +func (v *VoucherBiz) PushWechatRetry(ctx context.Context, productNo string) error { + + product, err := v.ProductRepo.GetByProductNo(ctx, productNo) + if err != nil { + return err + } + + queue := v.bc.RdsMQ.GetWechatRetry() + if queue == nil { + return fmt.Errorf("队列不存在") + } + + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, product.BatchNo).Result() + if err != nil { + return fmt.Errorf("添加到队列失败:%v", err) + } + + return nil +} + +func (v *VoucherBiz) WechatRetry(ctx context.Context, batchNo string) error { + + if batchNo == "" { + return fmt.Errorf("batchNo is empty") + } + + log.Infof("失败订单重试开始,batchNo:%s", batchNo) + + num := 0 + return v.OrderRepo.FinFailByStockIdInBatches(ctx, batchNo, func(ctx context.Context, rows []*bo.OrderBo) error { + + if len(rows) == 0 { + log.Infof("微信查询券订单状态,batchNo[%s],已处理[%d]单,无订单,结束执行", batchNo, num) + return nil + } + + for _, order := range rows { + + num += 1 + if err := v.orderRetry(ctx, order); err != nil { + log.Errorf("失败订单重试发生错误,batchNo:%s,orderNo:%s,appId:%s,openId:%s,err:%v", + batchNo, order.OrderNo, order.AppID, order.Account, err) + } + + } + + time.Sleep(1 * time.Second) + + return nil + }) + +} diff --git a/internal/conf/conf.pb.go b/internal/conf/conf.pb.go index dec41c7..dc691b8 100644 --- a/internal/conf/conf.pb.go +++ b/internal/conf/conf.pb.go @@ -841,6 +841,7 @@ type RdsMQ struct { unknownFields protoimpl.UnknownFields WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"` + WechatRetry *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"` } func (x *RdsMQ) Reset() { @@ -882,6 +883,13 @@ func (x *RdsMQ) GetWechatQuery() *RdsMQ_Queue { return nil } +func (x *RdsMQ) GetWechatRetry() *RdsMQ_Queue { + if x != nil { + return x.WechatRetry + } + return nil +} + type Logs struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1544,28 +1552,32 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xef, - 0x01, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, + 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae, + 0x02, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, - 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, - 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, - 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, - 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, - 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, - 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, - 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, - 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, - 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, - 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, - 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, + 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, + 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, + 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, + 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57, + 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75, + 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, + 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, + 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, + 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, + 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, + 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1620,19 +1632,20 @@ var file_conf_conf_proto_depIdxs = []int32{ 15, // 13: voucher.config.RocketMQ.eventMap:type_name -> voucher.config.RocketMQ.EventMapEntry 17, // 14: voucher.config.Cron.commandMap:type_name -> voucher.config.Cron.CommandMapEntry 18, // 15: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue - 19, // 16: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration - 19, // 17: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration - 19, // 18: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration - 19, // 19: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration - 19, // 20: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration - 4, // 21: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap - 16, // 22: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap - 19, // 23: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration - 24, // [24:24] is the sub-list for method output_type - 24, // [24:24] is the sub-list for method input_type - 24, // [24:24] is the sub-list for extension type_name - 24, // [24:24] is the sub-list for extension extendee - 0, // [0:24] is the sub-list for field type_name + 18, // 16: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue + 19, // 17: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration + 19, // 18: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration + 19, // 19: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration + 19, // 20: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration + 19, // 21: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration + 4, // 22: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap + 16, // 23: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap + 19, // 24: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration + 25, // [25:25] is the sub-list for method output_type + 25, // [25:25] is the sub-list for method input_type + 25, // [25:25] is the sub-list for extension type_name + 25, // [25:25] is the sub-list for extension extendee + 0, // [0:25] is the sub-list for field type_name } func init() { file_conf_conf_proto_init() } diff --git a/internal/conf/conf.proto b/internal/conf/conf.proto index b548e46..cb15acf 100644 --- a/internal/conf/conf.proto +++ b/internal/conf/conf.proto @@ -128,6 +128,7 @@ message RdsMQ { google.protobuf.Duration waitTime = 5; } Queue wechatQuery = 1; + Queue wechatRetry = 2; } message Logs { diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 79d388e..9d2d180 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -48,6 +48,25 @@ func (p *OrderRepoImpl) FinByStockIdInBatches(ctx context.Context, batchNo strin return nil } +func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + var results = make([]*model.Order, 0) + + result := p.DB(ctx). + Where("batch_no = ?", batchNo). + Where("status = ?", vo.OrderStatusFail.GetValue()). + Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat"). + FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { + return fun(ctx, p.ToBos(results)) + }) + + if result.Error != nil { + return result.Error + } + + return nil +} + func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { var results = make([]*model.Order, 0) diff --git a/internal/server/http.go b/internal/server/http.go index 636e471..c3e4e80 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -40,6 +40,7 @@ func NewHTTPServer( srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder) srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag) srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatQuery) + srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatRetry) v1.RegisterCmbHTTPServer(srv, cmb) diff --git a/internal/server/rds_consume.go b/internal/server/rds_consume.go index e5fa79c..9eed6ef 100644 --- a/internal/server/rds_consume.go +++ b/internal/server/rds_consume.go @@ -30,6 +30,10 @@ func NewRdbConsumer( manager.Add(cf) } + if cf2 := voucherService.GetWechatConfig(); cf2 != nil { + manager.Add(cf2) + } + return &RdbConsumer{hLog: hLog, conf: conf, manager: manager} } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index 2d2d260..bcbdc12 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -129,3 +129,20 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { "data": productNo, }) } + +func (this *CmbService) PushWechatRetry(ctx http.Context) error { + + productNo := ctx.Vars().Get("product_no") + if productNo == "" { + return fmt.Errorf("product_no is empty") + } + + err := this.VoucherBiz.PushWechatRetry(ctx, productNo) + if err != nil { + return err + } + + return ctx.JSON(http2.StatusOK, map[string]interface{}{ + "data": productNo, + }) +} diff --git a/internal/service/wechat_retry.go b/internal/service/wechat_retry.go new file mode 100644 index 0000000..04a8bfd --- /dev/null +++ b/internal/service/wechat_retry.go @@ -0,0 +1,52 @@ +package service + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "time" + "voucher/internal/pkg/rdsmq" +) + +func (s *VoucherService) GetWechatConfig() *rdsmq.ConsumeConfig { + + queue := s.bc.RdsMQ.GetWechatRetry() + if queue == nil { + return nil + } + + if !queue.GetIsOpen() { + log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name)) + return nil + } + + return &rdsmq.ConsumeConfig{ + Rdb: s.rdb.Rdb, + QueueName: queue.Name, + NumWorkers: queue.NumWorkers, + WaitTime: queue.GetWaitTime().AsDuration(), + RetryNum: queue.RetryNum, + Fn: s.HandleWechat, + Logger: s.logHelper, + } +} + +func (s *VoucherService) HandleWechat(ctx context.Context, batchNo string) error { + + if batchNo == "" { + s.logHelper.Errorf("RdsMQ keySend error: batchNo is empty") + return nil + } + + start := time.Now() + fmt.Printf("失败订单重试处理开始:%s,batchNo:%s", start.String(), batchNo) + + if err := s.VoucherBiz.WechatRetry(ctx, batchNo); err != nil { + s.logHelper.Error(err) + } + + log.Warnf("失败订单重试处理耗时:%s,batchNo:%s", time.Now().Sub(start).String(), batchNo) + fmt.Printf("失败订单重试处理耗时:%s,batchNo:%s", time.Now().Sub(start).String(), batchNo) + + return nil +}