This commit is contained in:
李子铭 2025-03-04 17:46:42 +08:00
parent 36e6589c49
commit 0ce8934364
6 changed files with 51 additions and 18 deletions

View File

@ -82,6 +82,14 @@ func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) {
return return
} }
func (v *Cmb) NotifyConsume(ctx context.Context, orderNo string) (err error) { func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error {
return
orderWechat, err := v.OrderWechatRepo.GetByOutRequestNo(ctx, orderOutRequestNo)
if err != nil {
return err
}
fmt.Printf("orderWechat:%+v", orderWechat)
return nil
} }

View File

@ -11,13 +11,6 @@ import (
func (v *Cmb) Order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { func (v *Cmb) Order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) {
orderNo, err = v.order(ctx, req)
return
}
func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) {
order, err := v.OrderRepo.GetByOutBizNo(ctx, req.OutBizNo) order, err := v.OrderRepo.GetByOutBizNo(ctx, req.OutBizNo)
if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
@ -61,7 +54,6 @@ func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo stri
} }
func (v *Cmb) Query(ctx context.Context, productNo string) (reps *bo.OrderCreateRepBo, err error) { func (v *Cmb) Query(ctx context.Context, productNo string) (reps *bo.OrderCreateRepBo, err error) {
return return
} }

View File

@ -93,17 +93,30 @@ func (v *VoucherBiz) QueryConsume(ctx context.Context, orderNo string) (err erro
return v.Cmb.QueryConsume(ctx, order) return v.Cmb.QueryConsume(ctx, order)
} }
return nil return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
}) })
return return
} }
func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo string) (err error) { func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) (err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error { err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error {
return nil order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo)
if err != nil {
return err
}
if order.Status.IsSuccess() {
return fmt.Errorf("订单状态错误,不能通知:%s", order.Status.GetText())
}
if order.Type.IsCmb() {
return v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo)
}
return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
}) })
return return

View File

@ -9,4 +9,5 @@ type OrderWechatRepo interface {
Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error)
Success(ctx context.Context, id uint64, couponId string) error Success(ctx context.Context, id uint64, couponId string) error
Fail(ctx context.Context, id uint64, remark string) error Fail(ctx context.Context, id uint64, remark string) error
GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error)
} }

View File

@ -49,6 +49,22 @@ func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo)
return p.ToBo(info), nil return p.ToBo(info), nil
} }
func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) {
info := &model.OrderWechat{}
tx := p.DB(ctx).Where(model.OrderWechat{OutRequestNo: outRequestNo}).Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error { func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error {
now := time.Now() now := time.Now()

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"strings"
"voucher/internal/pkg/mq" "voucher/internal/pkg/mq"
) )
@ -93,14 +94,16 @@ func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig {
func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
orderNo := msg.GetShardingKey() shardingKey := msg.GetShardingKey()
if orderNo == "" { if shardingKey == "" {
log.Error("orderNotify 消费异常,获取 orderNo 失败") log.Error("orderNotify 消费异常,获取 shardingKey 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败") return errors.New("orderNotify 消费异常,获取 orderNo 失败")
} }
if err := j.VoucherBiz.NotifyConsume(ctx, orderNo); err != nil { rep := strings.Split(shardingKey, "_")
log.Errorf("notify 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil {
log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
} }
return nil return nil