diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index c214b67..5a4e2e1 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -16,7 +16,7 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or return err } - return nil + return v.PushOrderMQ(ctx, orderNo) }) return diff --git a/internal/biz/cmb/cmb.go b/internal/biz/cmb/cmb.go index 16e067b..e99fd78 100644 --- a/internal/biz/cmb/cmb.go +++ b/internal/biz/cmb/cmb.go @@ -4,35 +4,28 @@ import ( "voucher/internal/biz/mixrepos" "voucher/internal/biz/repo" "voucher/internal/biz/wechatrepo" - "voucher/internal/conf" "voucher/internal/data" ) type Cmb struct { - bc *conf.Bootstrap rdb *data.Rdb OrderRepo repo.OrderRepo ProductRepo repo.ProductRepo - MqSendMixRepo mixrepos.MQSendMixRepo WechatCpnRepo wechatrepo.WechatCpnRepo GenerateMixRepo mixrepos.GenerateMixRepo } func NewCmb( - bc *conf.Bootstrap, rdb *data.Rdb, orderRepo repo.OrderRepo, ProductRepo repo.ProductRepo, - MqSendMixRepo mixrepos.MQSendMixRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, GenerateMixRepo mixrepos.GenerateMixRepo, ) *Cmb { return &Cmb{ - bc: bc, rdb: rdb, OrderRepo: orderRepo, ProductRepo: ProductRepo, - MqSendMixRepo: MqSendMixRepo, WechatCpnRepo: WechatCpnRepo, GenerateMixRepo: GenerateMixRepo, } diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index a9b956e..287b694 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -2,47 +2,13 @@ package cmb import ( "context" - "fmt" - "go.opentelemetry.io/otel/trace" "voucher/internal/biz/bo" - "voucher/internal/pkg/mq" ) -func (v *Cmb) PushOrderMQ(ctx context.Context, orderNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["order"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) - } - - return nil -} - func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (err error) { return } -func (v *Cmb) PushQueryDelayMQ(ctx context.Context, orderNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["query"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(orderNo), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - mq.WithSendDelayLevelOption(300), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("查询入队投递失败[%v]", err) - } - - return nil -} - func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) { return } diff --git a/internal/biz/cmb/voucher.go b/internal/biz/cmb/voucher.go index 6191b82..c430724 100644 --- a/internal/biz/cmb/voucher.go +++ b/internal/biz/cmb/voucher.go @@ -19,6 +19,7 @@ func (v *Cmb) Order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo stri func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { order, err := v.OrderRepo.GetByOutBizNo(ctx, req.OutBizNo) + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { return } @@ -56,8 +57,6 @@ func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo stri return } - err = v.PushOrderMQ(ctx, orderNo) - return } diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 94eccf1..1087652 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -1,25 +1,68 @@ package biz import ( + "context" + "fmt" + "go.opentelemetry.io/otel/trace" "voucher/internal/biz/cmb" + "voucher/internal/biz/mixrepos" "voucher/internal/biz/repo" + "voucher/internal/conf" "voucher/internal/data" + "voucher/internal/pkg/mq" ) type VoucherBiz struct { - rdb *data.Rdb - Cmb *cmb.Cmb - OrderRepo repo.OrderRepo + bc *conf.Bootstrap + rdb *data.Rdb + Cmb *cmb.Cmb + OrderRepo repo.OrderRepo + MqSendMixRepo mixrepos.MQSendMixRepo } func NewVoucherBiz( + bc *conf.Bootstrap, rdb *data.Rdb, Cmb *cmb.Cmb, OrderRepo repo.OrderRepo, + MqSendMixRepo mixrepos.MQSendMixRepo, ) *VoucherBiz { return &VoucherBiz{ - rdb: rdb, - Cmb: Cmb, - OrderRepo: OrderRepo, + bc: bc, + rdb: rdb, + Cmb: Cmb, + OrderRepo: OrderRepo, + MqSendMixRepo: MqSendMixRepo, } } + +func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["order"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) + } + + return nil +} + +func (v *VoucherBiz) PushQueryDelayMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["query"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(orderNo), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + mq.WithSendDelayLevelOption(300), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("查询入队投递失败[%v]", err) + } + + return nil +}