wechat retry

This commit is contained in:
ziming 2025-06-05 18:27:22 +08:00
parent 0df46d71ba
commit 69c4171a20
10 changed files with 208 additions and 32 deletions

View File

@ -91,6 +91,12 @@ rdsMQ:
numWorkers: 1 #协程数量不配置默认为10 numWorkers: 1 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间 waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false isOpen: true #是否启动消费 true/false
orderRetry: #发放结算
name: "orderRetry"
retryNum: 1 #重试次数
numWorkers: 1 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false
#配置日志 #配置日志
logs: logs:

View File

@ -8,6 +8,7 @@ import (
type OrderRepo interface { type OrderRepo interface {
FinByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)

View File

@ -0,0 +1,62 @@
package biz
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"time"
"voucher/internal/biz/bo"
)
func (v *VoucherBiz) PushWechatRetry(ctx context.Context, productNo string) error {
product, err := v.ProductRepo.GetByProductNo(ctx, productNo)
if err != nil {
return err
}
queue := v.bc.RdsMQ.GetWechatRetry()
if queue == nil {
return fmt.Errorf("队列不存在")
}
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, product.BatchNo).Result()
if err != nil {
return fmt.Errorf("添加到队列失败:%v", err)
}
return nil
}
func (v *VoucherBiz) WechatRetry(ctx context.Context, batchNo string) error {
if batchNo == "" {
return fmt.Errorf("batchNo is empty")
}
log.Infof("失败订单重试开始,batchNo:%s", batchNo)
num := 0
return v.OrderRepo.FinFailByStockIdInBatches(ctx, batchNo, func(ctx context.Context, rows []*bo.OrderBo) error {
if len(rows) == 0 {
log.Infof("微信查询券订单状态,batchNo[%s],已处理[%d]单,无订单,结束执行", batchNo, num)
return nil
}
for _, order := range rows {
num += 1
if err := v.orderRetry(ctx, order); err != nil {
log.Errorf("失败订单重试发生错误,batchNo:%s,orderNo:%s,appId:%s,openId:%s,err:%v",
batchNo, order.OrderNo, order.AppID, order.Account, err)
}
}
time.Sleep(1 * time.Second)
return nil
})
}

View File

@ -841,6 +841,7 @@ type RdsMQ struct {
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"` WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"`
WechatRetry *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
} }
func (x *RdsMQ) Reset() { func (x *RdsMQ) Reset() {
@ -882,6 +883,13 @@ func (x *RdsMQ) GetWechatQuery() *RdsMQ_Queue {
return nil return nil
} }
func (x *RdsMQ) GetWechatRetry() *RdsMQ_Queue {
if x != nil {
return x.WechatRetry
}
return nil
}
type Logs struct { type Logs struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -1544,28 +1552,32 @@ var file_conf_conf_proto_rawDesc = []byte{
0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01,
0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e,
0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xef, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae,
0x01, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x02, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68,
0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e,
0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52,
0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68,
0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61,
0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76,
0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64,
0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61,
0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65,
0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02,
0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08,
0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08,
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57,
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75,
0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74,
0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f,
0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22,
0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e,
0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e,
0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76,
0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b,
0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -1620,19 +1632,20 @@ var file_conf_conf_proto_depIdxs = []int32{
15, // 13: voucher.config.RocketMQ.eventMap:type_name -> voucher.config.RocketMQ.EventMapEntry 15, // 13: voucher.config.RocketMQ.eventMap:type_name -> voucher.config.RocketMQ.EventMapEntry
17, // 14: voucher.config.Cron.commandMap:type_name -> voucher.config.Cron.CommandMapEntry 17, // 14: voucher.config.Cron.commandMap:type_name -> voucher.config.Cron.CommandMapEntry
18, // 15: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue 18, // 15: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue
19, // 16: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration 18, // 16: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue
19, // 17: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration 19, // 17: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration
19, // 18: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration 19, // 18: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration
19, // 19: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration 19, // 19: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration
19, // 20: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration 19, // 20: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration
4, // 21: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap 19, // 21: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration
16, // 22: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap 4, // 22: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap
19, // 23: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration 16, // 23: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap
24, // [24:24] is the sub-list for method output_type 19, // 24: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration
24, // [24:24] is the sub-list for method input_type 25, // [25:25] is the sub-list for method output_type
24, // [24:24] is the sub-list for extension type_name 25, // [25:25] is the sub-list for method input_type
24, // [24:24] is the sub-list for extension extendee 25, // [25:25] is the sub-list for extension type_name
0, // [0:24] is the sub-list for field type_name 25, // [25:25] is the sub-list for extension extendee
0, // [0:25] is the sub-list for field type_name
} }
func init() { file_conf_conf_proto_init() } func init() { file_conf_conf_proto_init() }

View File

@ -128,6 +128,7 @@ message RdsMQ {
google.protobuf.Duration waitTime = 5; google.protobuf.Duration waitTime = 5;
} }
Queue wechatQuery = 1; Queue wechatQuery = 1;
Queue wechatRetry = 2;
} }
message Logs { message Logs {

View File

@ -48,6 +48,25 @@ func (p *OrderRepoImpl) FinByStockIdInBatches(ctx context.Context, batchNo strin
return nil return nil
} }
func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
var results = make([]*model.Order, 0)
result := p.DB(ctx).
Where("batch_no = ?", batchNo).
Where("status = ?", vo.OrderStatusFail.GetValue()).
Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat").
FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results))
})
if result.Error != nil {
return result.Error
}
return nil
}
func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
var results = make([]*model.Order, 0) var results = make([]*model.Order, 0)

View File

@ -40,6 +40,7 @@ func NewHTTPServer(
srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder) srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder)
srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag) srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag)
srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatQuery) srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatQuery)
srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatRetry)
v1.RegisterCmbHTTPServer(srv, cmb) v1.RegisterCmbHTTPServer(srv, cmb)

View File

@ -30,6 +30,10 @@ func NewRdbConsumer(
manager.Add(cf) manager.Add(cf)
} }
if cf2 := voucherService.GetWechatConfig(); cf2 != nil {
manager.Add(cf2)
}
return &RdbConsumer{hLog: hLog, conf: conf, manager: manager} return &RdbConsumer{hLog: hLog, conf: conf, manager: manager}
} }

View File

@ -129,3 +129,20 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
"data": productNo, "data": productNo,
}) })
} }
func (this *CmbService) PushWechatRetry(ctx http.Context) error {
productNo := ctx.Vars().Get("product_no")
if productNo == "" {
return fmt.Errorf("product_no is empty")
}
err := this.VoucherBiz.PushWechatRetry(ctx, productNo)
if err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": productNo,
})
}

View File

@ -0,0 +1,52 @@
package service
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"time"
"voucher/internal/pkg/rdsmq"
)
func (s *VoucherService) GetWechatConfig() *rdsmq.ConsumeConfig {
queue := s.bc.RdsMQ.GetWechatRetry()
if queue == nil {
return nil
}
if !queue.GetIsOpen() {
log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name))
return nil
}
return &rdsmq.ConsumeConfig{
Rdb: s.rdb.Rdb,
QueueName: queue.Name,
NumWorkers: queue.NumWorkers,
WaitTime: queue.GetWaitTime().AsDuration(),
RetryNum: queue.RetryNum,
Fn: s.HandleWechat,
Logger: s.logHelper,
}
}
func (s *VoucherService) HandleWechat(ctx context.Context, batchNo string) error {
if batchNo == "" {
s.logHelper.Errorf("RdsMQ keySend error: batchNo is empty")
return nil
}
start := time.Now()
fmt.Printf("失败订单重试处理开始:%s,batchNo:%s", start.String(), batchNo)
if err := s.VoucherBiz.WechatRetry(ctx, batchNo); err != nil {
s.logHelper.Error(err)
}
log.Warnf("失败订单重试处理耗时:%s,batchNo:%s", time.Now().Sub(start).String(), batchNo)
fmt.Printf("失败订单重试处理耗时:%s,batchNo:%s", time.Now().Sub(start).String(), batchNo)
return nil
}