This commit is contained in:
李子铭 2025-03-07 17:57:03 +08:00
parent e8e894b8ec
commit 09bf8b90fc
5 changed files with 37 additions and 32 deletions

View File

@ -206,6 +206,10 @@ func (v *Cmb) bizContent(_ context.Context, orderWechat *bo.OrderWechatBo) (stri
func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error { func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error {
if !order.Channel.IsWeChat() {
return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText())
}
orderWechat, err := v.orderWechat(ctx, order, orderOutRequestNo) orderWechat, err := v.orderWechat(ctx, order, orderOutRequestNo)
if err != nil { if err != nil {
return err return err

View File

@ -24,21 +24,6 @@ func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error {
return nil return nil
} }
func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error {
eventMap := v.bc.RocketMQ.EventMap["notify"]
sendOption := []mq.SendOption{
mq.WithSendShardingKeysOption(fmt.Sprintf("%s_%s", orderNo, outRequestNo)),
mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
}
if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil {
return fmt.Errorf("notify消费队列投递失败[%v]", err)
}
return nil
}
func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error { err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error {
@ -54,6 +39,7 @@ func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err erro
if err != nil { if err != nil {
return err return err
} }
return v.PushNotifyMQ(ctx, orderNo, outRequestNo) return v.PushNotifyMQ(ctx, orderNo, outRequestNo)
} }
@ -64,6 +50,21 @@ func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err erro
return return
} }
func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error {
eventMap := v.bc.RocketMQ.EventMap["notify"]
sendOption := []mq.SendOption{
mq.WithSendShardingKeysOption(fmt.Sprintf("%s_%s", orderNo, outRequestNo)),
mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
}
if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil {
return fmt.Errorf("notify消费队列投递失败[%v]", err)
}
return nil
}
func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) (err error) { func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) (err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error { err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error {

View File

@ -18,6 +18,7 @@ type CmbStatus string
const ( const (
CmbStatusSuccess CmbStatus = "0" CmbStatusSuccess CmbStatus = "0"
CmbStatusUse CmbStatus = "1" CmbStatusUse CmbStatus = "1"
CmbStatusExpired CmbStatus = "2" // 券过期-待确认是否通知
) )
func (s CmbStatus) GetValue() string { func (s CmbStatus) GetValue() string {

View File

@ -37,7 +37,7 @@ func (s OrderWechatStatus) IsExpired() bool {
} }
func (s OrderWechatStatus) CanNotify() bool { func (s OrderWechatStatus) CanNotify() bool {
return s.IsSuccess() || s.IsUse() return s.IsSuccess() || s.IsUse() || s.IsExpired()
} }
var OrderWechatStatusMap = map[OrderWechatStatus]string{ var OrderWechatStatusMap = map[OrderWechatStatus]string{
@ -51,6 +51,7 @@ var OrderWechatStatusMap = map[OrderWechatStatus]string{
var OrderStatusMapCmbStatus = map[OrderWechatStatus]CmbStatus{ var OrderStatusMapCmbStatus = map[OrderWechatStatus]CmbStatus{
OrderWechatStatusSuccess: CmbStatusSuccess, OrderWechatStatusSuccess: CmbStatusSuccess,
OrderWechatStatusUse: CmbStatusUse, OrderWechatStatusUse: CmbStatusUse,
OrderWechatStatusExpired: CmbStatusExpired,
} }
func (s OrderWechatStatus) GetCmbStatusText() (CmbStatus, error) { func (s OrderWechatStatus) GetCmbStatusText() (CmbStatus, error) {

View File

@ -26,34 +26,36 @@ func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *
} }
if req.PlainText.Status.IsUsed() { if req.PlainText.Status.IsUsed() {
return j.wechatVoucherUsed(ctx, orderWechat) err = j.wechatVoucherUsed(ctx, orderWechat)
} else if req.PlainText.Status.IsExpired() { } else if req.PlainText.Status.IsExpired() {
return j.wechatVoucherExpired(ctx, orderWechat) err = j.wechatVoucherExpired(ctx, orderWechat)
} else { } else {
return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText()) err = fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText())
}
})
}
func (j *VoucherBiz) wechatVoucherUsed(ctx context.Context, orderWechat *bo.OrderWechatBo) error {
if orderWechat.Status.IsUse() {
return nil
} }
order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo)
if err != nil { if err != nil {
return err return err
} }
if err = j.OrderWechatRepo.Used(ctx, orderWechat.ID); err != nil { return j.PushNotifyMQ(ctx, orderWechat.OrderNo, orderWechat.OutRequestNo)
return err })
} }
if err = j.OrderRepo.Used(ctx, order.ID); err != nil {
return err
}
func (v *VoucherBiz) wechatVoucherUsed(ctx context.Context, orderWechat *bo.OrderWechatBo) error {
if orderWechat.Status.IsUse() {
return nil return nil
}
order, err := v.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo)
if err != nil {
return err
}
if err = v.OrderWechatRepo.Used(ctx, orderWechat.ID); err != nil {
return err
}
return v.OrderRepo.Used(ctx, order.ID)
} }
func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, orderWechat *bo.OrderWechatBo) error { func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, orderWechat *bo.OrderWechatBo) error {
@ -70,9 +72,5 @@ func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, orderWechat *bo.O
return err return err
} }
if err = j.OrderRepo.Expired(ctx, order.ID); err != nil { return j.OrderRepo.Expired(ctx, order.ID)
return err
}
return nil
} }