Merge branch 'pro' into kx

# Conflicts:
#	internal/biz/wechat_notify.go
This commit is contained in:
ziming 2025-06-09 15:13:46 +08:00
commit 5cb45a3aaf
7 changed files with 97 additions and 21 deletions

View File

@ -89,13 +89,13 @@ rdsMQ:
wechatQuery: #发放结算 wechatQuery: #发放结算
name: "wechatQuery" name: "wechatQuery"
retryNum: 1 #重试次数 retryNum: 1 #重试次数
numWorkers: 1 #协程数量不配置默认为10 numWorkers: 2 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间 waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false isOpen: true #是否启动消费 true/false
orderRetry: #发放结算 orderRetry: #发放结算
name: "orderRetry" name: "orderRetry"
retryNum: 1 #重试次数 retryNum: 1 #重试次数
numWorkers: 1 #协程数量不配置默认为10 numWorkers: 2 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间 waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false isOpen: true #是否启动消费 true/false

View File

@ -23,6 +23,7 @@ type OrderBo struct {
Channel vo.Channel Channel vo.Channel
Attach string Attach string
Remark string Remark string
TransactionId string
ReceiveSuccessTime *time.Time ReceiveSuccessTime *time.Time
LastUseTime *time.Time LastUseTime *time.Time
CreateTime *time.Time CreateTime *time.Time

View File

@ -15,12 +15,14 @@ type OrderRepo interface {
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)
GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error)
GetByVoucherNo(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) GetByVoucherNo(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error)
GetByTransactionId(ctx context.Context, stockCreatorMchId, stockID, transactionId string) (*bo.OrderBo, error)
Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error)
GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error) GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error)
Ing(ctx context.Context, id uint64) error Ing(ctx context.Context, id uint64) error
Success(ctx context.Context, id uint64, voucherNo string) error Success(ctx context.Context, id uint64, voucherNo string) error
Fail(ctx context.Context, id uint64, remark string) error Fail(ctx context.Context, id uint64, remark string) error
Used(ctx context.Context, id uint64) error Used(ctx context.Context, id uint64) error
NotifyUsed(ctx context.Context, id uint64, transactionId string) error
Available(ctx context.Context, id uint64) error Available(ctx context.Context, id uint64) error
Expired(ctx context.Context, id uint64) error Expired(ctx context.Context, id uint64) error
} }

View File

@ -19,7 +19,7 @@ func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *
return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
order, err := v.OrderRepo.GetByVoucherNo(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID) order, err := v.getOrder(ctx, req)
if err != nil { if err != nil {
return err return err
} }
@ -30,11 +30,12 @@ func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *
} else if req.PlainText.Status.IsUsed() { } else if req.PlainText.Status.IsUsed() {
return v.used(ctx, order, req) return v.notifyUsed(ctx, order, req)
} else if req.PlainText.Status.IsExpired() { } else if req.PlainText.Status.IsExpired() {
return v.expired(ctx, order) return v.expired(ctx, order)
} }
return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText()) return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText())
@ -84,26 +85,46 @@ func (v *VoucherBiz) createUseLog(ctx context.Context, order *bo.OrderBo, req *b
return nil return nil
} }
func (v *VoucherBiz) used(ctx context.Context, order *bo.OrderBo, req *bo.WechatVoucherNotifyBo) error { func (this *VoucherBiz) getOrder(ctx context.Context, req *bo.WechatVoucherNotifyBo) (*bo.OrderBo, error) {
//var usedTime time.Time order, err := this.OrderRepo.GetByVoucherNo(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID)
//if req.PlainText.ConsumeInformation.ConsumeTime != "" {
// usedTime, _ = time.Parse(time.RFC3339, req.PlainText.ConsumeInformation.ConsumeTime)
//} else {
// usedTime = time.Now()
//}
//if order.Status.IsUse() && usedTime == *order.LastUseTime { if err != nil {
// log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo)
// return nil if !errors.Is(err, gorm.ErrRecordNotFound) {
//} return nil, err
}
order, err = this.OrderRepo.GetByTransactionId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.ConsumeInformation.TransactionID)
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("微信回调消费,订单不存在,StockCreatorMchid:%s,StockID:%s,CouponID:%s",
req.PlainText.StockCreatorMchid,
req.PlainText.StockID,
req.PlainText.CouponID,
)
}
return nil, err
}
return order, nil
}
return order, nil
}
func (v *VoucherBiz) notifyUsed(ctx context.Context, order *bo.OrderBo, req *bo.WechatVoucherNotifyBo) error {
if order.Status.IsUse() { if order.Status.IsUse() {
log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo) log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo)
return nil return nil
} }
if err := v.OrderRepo.Used(ctx, order.ID); err != nil { if err := v.OrderRepo.NotifyUsed(ctx, order.ID, req.PlainText.ConsumeInformation.TransactionID); err != nil {
return err return err
} }

View File

@ -79,10 +79,24 @@ func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo) error {
} }
if status.IsUse() { if status.IsUse() {
return v.used(ctx, order) return v.queryUsed(ctx, order)
} else if status.IsExpired() { } else if status.IsExpired() {
return v.expired(ctx, order) return v.expired(ctx, order)
} }
return nil return nil
} }
func (v *VoucherBiz) queryUsed(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsUse() {
log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", order.OrderNo)
return nil
}
if err := v.OrderRepo.Used(ctx, order.ID); err != nil {
return err
}
return v.notify(ctx, order)
}

View File

@ -30,6 +30,7 @@ type Order struct {
Attach string `gorm:"column:attach;not null;comment:attach" json:"attach"` Attach string `gorm:"column:attach;not null;comment:attach" json:"attach"`
ReceiveSuccessTime *time.Time `gorm:"column:receive_success_time" json:"receive_success_time"` ReceiveSuccessTime *time.Time `gorm:"column:receive_success_time" json:"receive_success_time"`
LastUseTime *time.Time `gorm:"column:last_use_time" json:"last_use_time"` LastUseTime *time.Time `gorm:"column:last_use_time" json:"last_use_time"`
TransactionId string `gorm:"column:transaction_id;not null" json:"transaction_id"`
CreateTime *time.Time `gorm:"column:create_time" json:"create_time"` CreateTime *time.Time `gorm:"column:create_time" json:"create_time"`
UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"`
} }

View File

@ -46,7 +46,7 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We
var results = make([]*model.Order, 0) var results = make([]*model.Order, 0)
result := tx.FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { result := tx.FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results)) return fun(ctx, p.ToBos(results))
}) })
@ -66,7 +66,7 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s
Where("batch_no = ?", batchNo). Where("batch_no = ?", batchNo).
Where("status = ?", vo.OrderStatusFail.GetValue()). Where("status = ?", vo.OrderStatusFail.GetValue()).
//Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat"). //Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat").
FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results)) return fun(ctx, p.ToBos(results))
}) })
@ -83,8 +83,8 @@ func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx conte
result := p.DB(ctx). result := p.DB(ctx).
Where("status = ?", vo.OrderStatusIng.GetValue()). Where("status = ?", vo.OrderStatusIng.GetValue()).
Limit(100). Limit(20).
FindInBatches(&results, 20, func(tx *gorm.DB, batch int) error { FindInBatches(&results, 10, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results)) return fun(ctx, p.ToBos(results))
}) })
@ -229,6 +229,22 @@ func (p *OrderRepoImpl) GetByVoucherNo(ctx context.Context, merchantNo, batchNo,
return p.ToBo(info), nil return p.ToBo(info), nil
} }
func (this *OrderRepoImpl) GetByTransactionId(ctx context.Context, stockCreatorMchId, stockID, transactionId string) (*bo.OrderBo, error) {
row := &model.Order{}
tx := this.DB(ctx).Where(model.Order{MerchantNo: stockCreatorMchId, BatchNo: stockID, TransactionId: transactionId}).First(&row)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return this.ToBo(row), nil
}
func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error { func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error {
now := time.Now() now := time.Now()
@ -338,6 +354,27 @@ func (p *OrderRepoImpl) Used(ctx context.Context, id uint64) error {
return nil return nil
} }
func (p *OrderRepoImpl) NotifyUsed(ctx context.Context, id uint64, transactionId string) error {
now := time.Now()
tx := p.DB(ctx).
Where(model.Order{
ID: id,
}).
Updates(model.Order{
Status: vo.OrderStatusUse.GetValue(),
TransactionId: transactionId,
LastUseTime: &now,
UpdateTime: &now,
})
if tx.Error != nil {
return fmt.Errorf("update db fail %w", tx.Error)
}
return nil
}
func (p *OrderRepoImpl) Expired(ctx context.Context, id uint64) error { func (p *OrderRepoImpl) Expired(ctx context.Context, id uint64) error {
now := time.Now() now := time.Now()