package biz

import (
	"context"
	"fmt"
	"strconv"

	"go.opentelemetry.io/otel/trace"

	errPb "voucher/api/err"
	"voucher/internal/biz/bo"
	"voucher/internal/biz/vo"
	"voucher/internal/pkg/lock"
	"voucher/internal/pkg/mq"
)

// PushNotifyRetryDelayMQ publishes a delayed message on the topic configured
// for the "notify_retry" event so that the callback notification identified by
// notifyId is retried later.
//
// The decimal form of notifyId is used as the sharding key, the trace ID of
// the span carried by ctx is attached to the message, and level is passed
// through unchanged as the delay option. The message payload is always the
// empty JSON object "{}".
//
// A send failure is returned wrapped, so callers can still inspect the
// underlying error with errors.Is / errors.As.
func (v *VoucherBiz) PushNotifyRetryDelayMQ(ctx context.Context, level int, notifyId uint64) error {
	shardingKey := strconv.FormatUint(notifyId, 10)
	event := v.bc.RocketMQ.EventMap["notify_retry"]

	opts := []mq.SendOption{
		mq.WithSendShardingKeysOption(shardingKey),
		mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
		mq.WithSendDelayLevelOption(level),
	}

	if err := v.MqSendMixRepo.SendSync(ctx, event.Topic, []byte("{}"), opts...); err != nil {
		// %w renders the same text as err.Error() but keeps the error chain.
		return fmt.Errorf("回调通知延迟队列,投递消息出错:err=%w", err)
	}
	return nil
}

// NotifyRetryConsume handles one retry message for the callback notification
// identified by orderNotifyId.
//
// While holding a lock keyed by the notification ID it loads the notification
// and the order it belongs to and, for CMB orders, delegates the retry to
// v.Cmb.NotifyRetryConsume. Orders of any other type are left untouched.
//
// If the resulting error is classified by errPb.IsNeedRetryNotify as needing
// another attempt, the next delay is obtained from v.level and a new delayed
// message is published via PushNotifyRetryDelayMQ. Every other outcome,
// including success, is returned as is.
func (v *VoucherBiz) NotifyRetryConsume(ctx context.Context, orderNotifyId uint64) error {
	var orderNotify *bo.OrderNotifyBo
	cache := vo.NotifyRetryConsume.BuildCacheUint64([]uint64{orderNotifyId})

	err := lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error {
		// Errors are kept local to the callback and handed back through its
		// return value. Writing them to a variable that is also the target of
		// the surrounding assignment would let Lock's own result overwrite
		// them.
		var err error
		if orderNotify, err = v.OrderNotifyRepo.GetByID(ctx, orderNotifyId); err != nil {
			return err
		}

		order, err := v.OrderRepo.GetByOrderNo(ctx, orderNotify.OrderNo)
		if err != nil {
			return err
		}
		if !order.Type.IsCmb() {
			return nil
		}

		// Propagate the CMB result: it is the error that decides below
		// whether another retry has to be scheduled.
		orderNotify, err = v.Cmb.NotifyRetryConsume(ctx, order, orderNotify)
		return err
	})
	if !errPb.IsNeedRetryNotify(err) {
		return err
	}

	// No notification record is available when the failure happened before it
	// was loaded (or when the CMB handler returned none), so there is nothing
	// to reschedule; report the error instead of dereferencing nil.
	if orderNotify == nil {
		return err
	}

	delay, levelErr := v.level(ctx, orderNotify)
	if levelErr != nil {
		return levelErr
	}
	return v.PushNotifyRetryDelayMQ(ctx, delay, orderNotify.ID)
}

// level maps the number of notification records that exist for the order and
// event of orderNotify to the delay value used for the next retry.
//
// Failed status-callback calls must be retried; the retry intervals are
// 1, 2, 12, 60 and 360 minutes. The returned values (60, 120, 720, 3600,
// 21600) are those intervals expressed in seconds.
// NOTE(review): the value is handed to mq.WithSendDelayLevelOption as is;
// confirm that this option expects seconds rather than a broker delay-level
// index.
//
// Any count outside 1..5 (including 0) yields an error, which stops further
// retries.
func (v *VoucherBiz) level(ctx context.Context, orderNotify *bo.OrderNotifyBo) (int, error) {
	count, err := v.OrderNotifyRepo.GetCountByOrderNoAndEvent(ctx, orderNotify.OrderNo, orderNotify.Event)
	if err != nil {
		return 0, err
	}

	switch count {
	case 1:
		return 60, nil
	case 2:
		return 120, nil
	case 3:
		return 720, nil
	case 4:
		return 3600, nil
	case 5:
		return 21600, nil
	default:
		return 0, fmt.Errorf("回调通知失败次数超过5次,不再重试")
	}
}