diff --git a/api/v1/cmb_cpn.proto b/api/v1/cmb_cpn.proto index b92b7a5..a3cd78e 100644 --- a/api/v1/cmb_cpn.proto +++ b/api/v1/cmb_cpn.proto @@ -29,6 +29,12 @@ service Cmb { }; } + rpc OrderRetry (OrderRetryRequest) returns (Empty) { + option (google.api.http) = { + post: "/voucher/cmb/v1/orderRetry", + body: "*" + }; + } rpc OrderMock (CmbOrderRequest) returns (CmbRequest) { option (google.api.http) = { @@ -64,6 +70,10 @@ service Cmb { } } +message OrderRetryRequest { + repeated string transactionIds = 1 [json_name = "transactionIds"]; +} + message CmbRequest { // 请求公共参数 // 合作方唯一ID,32位定长 diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index c394d85..53acbf3 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -31,15 +31,14 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or if order != nil { - if order.Status.IsFail() { + if order.Status.IsFail() || order.Status.IsIng() { if err4 := v.orderRetry(ctx, order); err4 != nil { return orderNo, err4 } } - orderNo = order.OrderNo - return orderNo, err + return order.OrderNo, err } product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) @@ -52,9 +51,7 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or return orderNo, err3 } - orderNo = order.OrderNo - - return orderNo, nil + return order.OrderNo, nil } func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) { diff --git a/internal/biz/order.go b/internal/biz/order.go index 8cccf2c..174c181 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -11,6 +11,43 @@ import ( "voucher/internal/pkg/lock" ) +func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error { + + if len(outBizNos) > 0 { + + for _, outBizNo := range outBizNos { + + order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, outBizNo) + if err != nil { + return fmt.Errorf(fmt.Sprintf("获取订单%s异常:%v", outBizNo, err)) + } + + if !order.Status.IsIng() { + return fmt.Errorf(fmt.Sprintf("订单%s状态异常:%s", order.OrderNo, order.Status)) + } + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + } + + return v.OrderRepo.FindIngInBatches(ctx, func(ctx context.Context, rows []*bo.OrderBo) error { + + for _, order := range rows { + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + }) + +} + func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) { order, err := v.create(ctx, req, product) @@ -32,7 +69,11 @@ func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, produc return nil, err } - return order, v.success(ctx, order, voucherNo) + if err = v.success(ctx, order, voucherNo); err != nil { + return nil, err + } + + return order, nil } func (v *VoucherBiz) orderRetry(ctx context.Context, order *bo.OrderBo) error { diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index b2c72ea..a1c583a 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -7,6 +7,7 @@ import ( ) type OrderRepo interface { + FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index a8efa66..89c7ef1 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -31,6 +31,26 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.Order{}) } +func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + var results = make([]*model.Order, 0) + + result := p.DB(ctx). + Where("status = ?", vo.OrderStatusIng.GetValue()). + FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { + // tx.RowsAffected 提供当前批处理中记录的计数(the count of records in the current batch) + // 'batch' 变量表示当前批号(the current batch number) + // 返回 error 将阻止更多的批处理 + return fun(ctx, p.ToBos(results)) + }) + + if result.Error != nil { + return result.Error + } + + return nil +} + func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { var results = make([]*model.Order, 0) @@ -199,8 +219,8 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string tx := p.DB(ctx). Where(model.Order{ - ID: id, - Status: vo.OrderStatusIng.GetValue(), + ID: id, + //Status: vo.OrderStatusIng.GetValue(), }). Updates(model.Order{ Status: vo.OrderStatusSuccess.GetValue(), diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index d80ad2c..e9de469 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -8,6 +8,7 @@ import ( "github.com/go-kratos/kratos/v2/transport" "github.com/gogap/errors" "strings" + "sync/atomic" "time" "voucher/internal/conf" "voucher/internal/service" @@ -18,6 +19,9 @@ var _ transport.Server = (*WechatNotifyConsumer)(nil) type WechatNotifyConsumer struct { conf *conf.Bootstrap voucherService *service.VoucherService + + activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出 + shutdownFlag atomic.Bool // 关闭标记 } func NewWechatNotifyConsumer( @@ -128,6 +132,24 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer // 业务逻辑处理 func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) { + // 收到消息 + if w.shutdownFlag.Load() { + fmt.Println("wechat consumer 正在退出中,延期处理") + // 卡住,不再继续消费,等待退出 + time.Sleep(24 * time.Hour) + return + } + + // 标记活跃状态 + w.activeCnt.Add(1) + defer func() { + w.activeCnt.Add(-1) + if v := recover(); v != nil { + log.Errorf("wechat consumer 处理消息panic, ,%+v", v) + return + } + }() + log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) ctx := context.Background() @@ -138,6 +160,41 @@ func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntr // Stop 停止消息消费 func (w *WechatNotifyConsumer) Stop(_ context.Context) error { - fmt.Println("关闭 wechat consumer 中...") + + if !w.conf.WechatNotifyMQ.IsOpenConsumer { + fmt.Println("wechat consumer 关闭完成!") + return nil + } + + fmt.Println("wechat consumer 关闭中...") + + w.shutdownFlag.Store(true) + + //shutdown之间,保证正在处理的消费先提交 + _ = w.blockWaitFinish() + + fmt.Println("wechat consumer 关闭完成") + + return nil +} + +// blockWaitFinish 阻塞等待业务完成 +func (c *WechatNotifyConsumer) blockWaitFinish() error { + // 每1s检查下业务是否都处理完成 + + for { + cnt := c.activeCnt.Load() + if cnt == 0 { + fmt.Println("wechat consumer 无业务处理,正常退") + break + } else { + fmt.Printf("wechat consumer 等待消费者退出,%d 个正在运行\n", cnt) + } + time.Sleep(1 * time.Second) + } + + //防止极端情况下commit未完成 + // nolint + time.Sleep(1 * time.Second) return nil } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index 2a3043f..af6e7dc 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -43,6 +43,11 @@ func NewCmbService( } } +func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) { + + return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds()) +} + func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*v1.CmbReply, error) { req := &bo.CmbResponseBo{ diff --git a/third_party/swagger_ui/openapi.yaml b/third_party/swagger_ui/openapi.yaml index eb8440b..26d9946 100644 --- a/third_party/swagger_ui/openapi.yaml +++ b/third_party/swagger_ui/openapi.yaml @@ -60,6 +60,24 @@ paths: application/json: schema: $ref: '#/components/schemas/api.v1.CmbRequest' + /voucher/cmb/v1/orderRetry: + post: + tags: + - Cmb + operationId: Cmb_OrderRetry + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/api.v1.OrderRetryRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/api.v1.Empty' /voucher/cmb/v1/product/query: post: tags: @@ -263,5 +281,12 @@ components: properties: encryptBody: type: string + api.v1.OrderRetryRequest: + type: object + properties: + transactionIds: + type: array + items: + type: string tags: - name: Cmb