diff --git a/internal/biz/consume.go b/internal/biz/consume.go index 033fb89..f53244c 100644 --- a/internal/biz/consume.go +++ b/internal/biz/consume.go @@ -3,10 +3,43 @@ package biz import ( "context" "fmt" + "go.opentelemetry.io/otel/trace" "time" "voucher/internal/pkg/lock" + "voucher/internal/pkg/mq" ) +func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["order"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) + } + + return nil +} + +func (v *VoucherBiz) PushQueryDelayMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["query"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(orderNo), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + mq.WithSendDelayLevelOption(300), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("查询入队投递失败[%v]", err) + } + + return nil +} + func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error { diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 1087652..95fe67b 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -1,15 +1,11 @@ package biz import ( - "context" - "fmt" - "go.opentelemetry.io/otel/trace" "voucher/internal/biz/cmb" "voucher/internal/biz/mixrepos" "voucher/internal/biz/repo" "voucher/internal/conf" "voucher/internal/data" - "voucher/internal/pkg/mq" ) type VoucherBiz struct { @@ -35,34 +31,3 @@ func NewVoucherBiz( MqSendMixRepo: MqSendMixRepo, } } - -func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["order"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) - } - - return nil -} - -func (v *VoucherBiz) PushQueryDelayMQ(ctx context.Context, orderNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["query"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(orderNo), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - mq.WithSendDelayLevelOption(300), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("查询入队投递失败[%v]", err) - } - - return nil -} diff --git a/internal/server/consume.go b/internal/server/consume.go index 3bfcc38..9545f1b 100644 --- a/internal/server/consume.go +++ b/internal/server/consume.go @@ -32,20 +32,20 @@ func NewConsumer( SecretKey: conf.RocketMQ.SecretKey, } - if c := voucherService.GetOrderCreateConfig(); c != nil { - if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderCreateConsumer); err != nil { + if c := voucherService.GetOrderConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderConsumer); err != nil { panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) } } - if c := voucherService.GetOrderQueryConfig(); c != nil { - if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderQueryConsumer); err != nil { + if c := voucherService.GetQueryConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.QueryConsumer); err != nil { panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) } } - if c := voucherService.GetOrderNotifyConfig(); c != nil { - if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderNotifyConsumer); err != nil { + if c := voucherService.GetNotifyConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil { panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) } } diff --git a/internal/service/consume.go b/internal/service/consume.go index e1a8f75..dbdec69 100644 --- a/internal/service/consume.go +++ b/internal/service/consume.go @@ -7,7 +7,7 @@ import ( "voucher/internal/pkg/mq" ) -func (j *VoucherService) GetOrderCreateConfig() *mq.ConsumerConfig { +func (j *VoucherService) GetOrderConfig() *mq.ConsumerConfig { elm, ok := j.bc.RocketMQ.EventMap["order"] if !ok { return nil @@ -25,12 +25,12 @@ func (j *VoucherService) GetOrderCreateConfig() *mq.ConsumerConfig { } } -func (j *VoucherService) OrderCreateConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { +func (j *VoucherService) OrderConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { orderNo := msg.GetShardingKey() if orderNo == "" { - log.Error("orderCreate 消费异常,获取 orderNo 失败") - return errors.New("orderCreate 消费异常,获取 orderNo 失败") + log.Error("order 消费异常,获取 orderNo 失败") + return errors.New("order 消费异常,获取 orderNo 失败") } if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil { @@ -40,7 +40,7 @@ func (j *VoucherService) OrderCreateConsumer(ctx context.Context, msg *mq.Consum return nil } -func (j *VoucherService) GetOrderQueryConfig() *mq.ConsumerConfig { +func (j *VoucherService) GetQueryConfig() *mq.ConsumerConfig { elm, ok := j.bc.RocketMQ.EventMap["query"] if !ok { return nil @@ -58,7 +58,7 @@ func (j *VoucherService) GetOrderQueryConfig() *mq.ConsumerConfig { } } -func (j *VoucherService) OrderQueryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { +func (j *VoucherService) QueryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { orderNo := msg.GetShardingKey() if orderNo == "" { @@ -73,7 +73,7 @@ func (j *VoucherService) OrderQueryConsumer(ctx context.Context, msg *mq.Consume return nil } -func (j *VoucherService) GetOrderNotifyConfig() *mq.ConsumerConfig { +func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig { elm, ok := j.bc.RocketMQ.EventMap["notify"] if !ok { return nil @@ -91,7 +91,7 @@ func (j *VoucherService) GetOrderNotifyConfig() *mq.ConsumerConfig { } } -func (j *VoucherService) OrderNotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { +func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { orderNo := msg.GetShardingKey() if orderNo == "" {