diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index db31775..428af5e 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -82,6 +82,14 @@ func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) { return } -func (v *Cmb) NotifyConsume(ctx context.Context, orderNo string) (err error) { - return +func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error { + + orderWechat, err := v.OrderWechatRepo.GetByOutRequestNo(ctx, orderOutRequestNo) + if err != nil { + return err + } + + fmt.Printf("orderWechat:%+v", orderWechat) + + return nil } diff --git a/internal/biz/cmb/voucher.go b/internal/biz/cmb/voucher.go index c430724..0ee5fef 100644 --- a/internal/biz/cmb/voucher.go +++ b/internal/biz/cmb/voucher.go @@ -11,13 +11,6 @@ import ( func (v *Cmb) Order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { - orderNo, err = v.order(ctx, req) - - return -} - -func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { - order, err := v.OrderRepo.GetByOutBizNo(ctx, req.OutBizNo) if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { @@ -61,7 +54,6 @@ func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo stri } func (v *Cmb) Query(ctx context.Context, productNo string) (reps *bo.OrderCreateRepBo, err error) { - return } diff --git a/internal/biz/consume.go b/internal/biz/consume.go index b7dd089..5123f4b 100644 --- a/internal/biz/consume.go +++ b/internal/biz/consume.go @@ -93,17 +93,30 @@ func (v *VoucherBiz) QueryConsume(ctx context.Context, orderNo string) (err erro return v.Cmb.QueryConsume(ctx, order) } - return nil + return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) }) return } -func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo string) (err error) { +func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) (err error) { err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error { - return nil + order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo) + if err != nil { + return err + } + + if order.Status.IsSuccess() { + return fmt.Errorf("订单状态错误,不能通知:%s", order.Status.GetText()) + } + + if order.Type.IsCmb() { + return v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo) + } + + return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) }) return diff --git a/internal/biz/repo/order_wechat.go b/internal/biz/repo/order_wechat.go index 321922b..69be1f7 100644 --- a/internal/biz/repo/order_wechat.go +++ b/internal/biz/repo/order_wechat.go @@ -9,4 +9,5 @@ type OrderWechatRepo interface { Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) Success(ctx context.Context, id uint64, couponId string) error Fail(ctx context.Context, id uint64, remark string) error + GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) } diff --git a/internal/data/repoimpl/order_wechat.go b/internal/data/repoimpl/order_wechat.go index c16ca55..4e6e7ba 100644 --- a/internal/data/repoimpl/order_wechat.go +++ b/internal/data/repoimpl/order_wechat.go @@ -49,6 +49,22 @@ func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) return p.ToBo(info), nil } +func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) { + info := &model.OrderWechat{} + + tx := p.DB(ctx).Where(model.OrderWechat{OutRequestNo: outRequestNo}).Find(&info) + + if tx.Error != nil { + return nil, tx.Error + } + + if tx.RowsAffected == 0 { + return nil, gorm.ErrRecordNotFound + } + + return p.ToBo(info), nil +} + func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error { now := time.Now() diff --git a/internal/service/consume.go b/internal/service/consume.go index dbdec69..09c459c 100644 --- a/internal/service/consume.go +++ b/internal/service/consume.go @@ -4,6 +4,7 @@ import ( "context" "errors" "github.com/go-kratos/kratos/v2/log" + "strings" "voucher/internal/pkg/mq" ) @@ -93,14 +94,16 @@ func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig { func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { - orderNo := msg.GetShardingKey() - if orderNo == "" { - log.Error("orderNotify 消费异常,获取 orderNo 失败") + shardingKey := msg.GetShardingKey() + if shardingKey == "" { + log.Error("orderNotify 消费异常,获取 shardingKey 失败") return errors.New("orderNotify 消费异常,获取 orderNo 失败") } - if err := j.VoucherBiz.NotifyConsume(ctx, orderNo); err != nil { - log.Errorf("notify 消费异常,orderNo:%s,error: %s", orderNo, err.Error()) + rep := strings.Split(shardingKey, "_") + + if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil { + log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error()) } return nil