order retry
This commit is contained in:
parent
9c62904de5
commit
3120c34de5
|
|
@ -29,6 +29,12 @@ service Cmb {
|
|||
};
|
||||
}
|
||||
|
||||
rpc OrderRetry (OrderRetryRequest) returns (Empty) {
|
||||
option (google.api.http) = {
|
||||
post: "/voucher/cmb/v1/orderRetry",
|
||||
body: "*"
|
||||
};
|
||||
}
|
||||
|
||||
rpc OrderMock (CmbOrderRequest) returns (CmbRequest) {
|
||||
option (google.api.http) = {
|
||||
|
|
@ -64,6 +70,10 @@ service Cmb {
|
|||
}
|
||||
}
|
||||
|
||||
message OrderRetryRequest {
|
||||
repeated string transactionIds = 1 [json_name = "transactionIds"];
|
||||
}
|
||||
|
||||
message CmbRequest {
|
||||
// 请求公共参数
|
||||
// 合作方唯一ID,32位定长
|
||||
|
|
|
|||
|
|
@ -31,15 +31,14 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or
|
|||
|
||||
if order != nil {
|
||||
|
||||
if order.Status.IsFail() {
|
||||
if order.Status.IsFail() || order.Status.IsIng() {
|
||||
|
||||
if err4 := v.orderRetry(ctx, order); err4 != nil {
|
||||
return orderNo, err4
|
||||
}
|
||||
}
|
||||
|
||||
orderNo = order.OrderNo
|
||||
return orderNo, err
|
||||
return order.OrderNo, err
|
||||
}
|
||||
|
||||
product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
|
||||
|
|
@ -52,9 +51,7 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or
|
|||
return orderNo, err3
|
||||
}
|
||||
|
||||
orderNo = order.OrderNo
|
||||
|
||||
return orderNo, nil
|
||||
return order.OrderNo, nil
|
||||
}
|
||||
|
||||
func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) {
|
||||
|
|
|
|||
|
|
@ -11,6 +11,43 @@ import (
|
|||
"voucher/internal/pkg/lock"
|
||||
)
|
||||
|
||||
func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error {
|
||||
|
||||
if len(outBizNos) > 0 {
|
||||
|
||||
for _, outBizNo := range outBizNos {
|
||||
|
||||
order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, outBizNo)
|
||||
if err != nil {
|
||||
return fmt.Errorf(fmt.Sprintf("获取订单%s异常:%v", outBizNo, err))
|
||||
}
|
||||
|
||||
if !order.Status.IsIng() {
|
||||
return fmt.Errorf(fmt.Sprintf("订单%s状态异常:%s", order.OrderNo, order.Status))
|
||||
}
|
||||
|
||||
if err4 := v.orderRetry(ctx, order); err4 != nil {
|
||||
return err4
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return v.OrderRepo.FindIngInBatches(ctx, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||||
|
||||
for _, order := range rows {
|
||||
|
||||
if err4 := v.orderRetry(ctx, order); err4 != nil {
|
||||
return err4
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) {
|
||||
|
||||
order, err := v.create(ctx, req, product)
|
||||
|
|
@ -32,7 +69,11 @@ func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, produc
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return order, v.success(ctx, order, voucherNo)
|
||||
if err = v.success(ctx, order, voucherNo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func (v *VoucherBiz) orderRetry(ctx context.Context, order *bo.OrderBo) error {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import (
|
|||
)
|
||||
|
||||
type OrderRepo interface {
|
||||
FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)
|
||||
GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error)
|
||||
|
|
|
|||
|
|
@ -31,6 +31,26 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB {
|
|||
return p.db.DB(ctx).Model(model.Order{})
|
||||
}
|
||||
|
||||
func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
||||
|
||||
var results = make([]*model.Order, 0)
|
||||
|
||||
result := p.DB(ctx).
|
||||
Where("status = ?", vo.OrderStatusIng.GetValue()).
|
||||
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
|
||||
// tx.RowsAffected 提供当前批处理中记录的计数(the count of records in the current batch)
|
||||
// 'batch' 变量表示当前批号(the current batch number)
|
||||
// 返回 error 将阻止更多的批处理
|
||||
return fun(ctx, p.ToBos(results))
|
||||
})
|
||||
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
||||
|
||||
var results = make([]*model.Order, 0)
|
||||
|
|
@ -199,8 +219,8 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string
|
|||
|
||||
tx := p.DB(ctx).
|
||||
Where(model.Order{
|
||||
ID: id,
|
||||
Status: vo.OrderStatusIng.GetValue(),
|
||||
ID: id,
|
||||
//Status: vo.OrderStatusIng.GetValue(),
|
||||
}).
|
||||
Updates(model.Order{
|
||||
Status: vo.OrderStatusSuccess.GetValue(),
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"github.com/go-kratos/kratos/v2/transport"
|
||||
"github.com/gogap/errors"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"voucher/internal/conf"
|
||||
"voucher/internal/service"
|
||||
|
|
@ -18,6 +19,9 @@ var _ transport.Server = (*WechatNotifyConsumer)(nil)
|
|||
type WechatNotifyConsumer struct {
|
||||
conf *conf.Bootstrap
|
||||
voucherService *service.VoucherService
|
||||
|
||||
activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出
|
||||
shutdownFlag atomic.Bool // 关闭标记
|
||||
}
|
||||
|
||||
func NewWechatNotifyConsumer(
|
||||
|
|
@ -128,6 +132,24 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer
|
|||
|
||||
// 业务逻辑处理
|
||||
func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) {
|
||||
// 收到消息
|
||||
if w.shutdownFlag.Load() {
|
||||
fmt.Println("wechat consumer 正在退出中,延期处理")
|
||||
// 卡住,不再继续消费,等待退出
|
||||
time.Sleep(24 * time.Hour)
|
||||
return
|
||||
}
|
||||
|
||||
// 标记活跃状态
|
||||
w.activeCnt.Add(1)
|
||||
defer func() {
|
||||
w.activeCnt.Add(-1)
|
||||
if v := recover(); v != nil {
|
||||
log.Errorf("wechat consumer 处理消息panic, ,%+v", v)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
|
||||
|
||||
ctx := context.Background()
|
||||
|
|
@ -138,6 +160,41 @@ func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntr
|
|||
|
||||
// Stop 停止消息消费
|
||||
func (w *WechatNotifyConsumer) Stop(_ context.Context) error {
|
||||
fmt.Println("关闭 wechat consumer 中...")
|
||||
|
||||
if !w.conf.WechatNotifyMQ.IsOpenConsumer {
|
||||
fmt.Println("wechat consumer 关闭完成!")
|
||||
return nil
|
||||
}
|
||||
|
||||
fmt.Println("wechat consumer 关闭中...")
|
||||
|
||||
w.shutdownFlag.Store(true)
|
||||
|
||||
//shutdown之间,保证正在处理的消费先提交
|
||||
_ = w.blockWaitFinish()
|
||||
|
||||
fmt.Println("wechat consumer 关闭完成")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// blockWaitFinish 阻塞等待业务完成
|
||||
func (c *WechatNotifyConsumer) blockWaitFinish() error {
|
||||
// 每1s检查下业务是否都处理完成
|
||||
|
||||
for {
|
||||
cnt := c.activeCnt.Load()
|
||||
if cnt == 0 {
|
||||
fmt.Println("wechat consumer 无业务处理,正常退")
|
||||
break
|
||||
} else {
|
||||
fmt.Printf("wechat consumer 等待消费者退出,%d 个正在运行\n", cnt)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
//防止极端情况下commit未完成
|
||||
// nolint
|
||||
time.Sleep(1 * time.Second)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,6 +43,11 @@ func NewCmbService(
|
|||
}
|
||||
}
|
||||
|
||||
func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) {
|
||||
|
||||
return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds())
|
||||
}
|
||||
|
||||
func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*v1.CmbReply, error) {
|
||||
|
||||
req := &bo.CmbResponseBo{
|
||||
|
|
|
|||
|
|
@ -60,6 +60,24 @@ paths:
|
|||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/api.v1.CmbRequest'
|
||||
/voucher/cmb/v1/orderRetry:
|
||||
post:
|
||||
tags:
|
||||
- Cmb
|
||||
operationId: Cmb_OrderRetry
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/api.v1.OrderRetryRequest'
|
||||
required: true
|
||||
responses:
|
||||
"200":
|
||||
description: OK
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/api.v1.Empty'
|
||||
/voucher/cmb/v1/product/query:
|
||||
post:
|
||||
tags:
|
||||
|
|
@ -263,5 +281,12 @@ components:
|
|||
properties:
|
||||
encryptBody:
|
||||
type: string
|
||||
api.v1.OrderRetryRequest:
|
||||
type: object
|
||||
properties:
|
||||
transactionIds:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
tags:
|
||||
- name: Cmb
|
||||
|
|
|
|||
Loading…
Reference in New Issue