From 41aa1d379eaeb279b5fb17bd1234d4b062d1ab0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Mon, 10 Mar 2025 11:27:26 +0800 Subject: [PATCH] cmb --- .../biz/{consume.go => notify_consume.go} | 43 --------------- internal/biz/order_consume.go | 53 +++++++++++++++++++ 2 files changed, 53 insertions(+), 43 deletions(-) rename internal/biz/{consume.go => notify_consume.go} (54%) create mode 100644 internal/biz/order_consume.go diff --git a/internal/biz/consume.go b/internal/biz/notify_consume.go similarity index 54% rename from internal/biz/consume.go rename to internal/biz/notify_consume.go index 1ca4593..1aa370c 100644 --- a/internal/biz/consume.go +++ b/internal/biz/notify_consume.go @@ -9,49 +9,6 @@ import ( "voucher/internal/pkg/mq" ) -func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["order"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) - } - - return nil -} - -func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { - - c := vo.OrderConsume.BuildCache([]string{orderNo}) - - err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - - order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo) - if err != nil { - return err - } - - if order.Type.IsCmb() { - - outRequestNo, err := v.Cmb.OrderConsume(ctx, order) - if err != nil { - return err - } - - return v.PushNotifyMQ(ctx, orderNo, outRequestNo) - - } - - return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) - }) - - return -} - func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error { eventMap := v.bc.RocketMQ.EventMap["notify"] diff --git a/internal/biz/order_consume.go b/internal/biz/order_consume.go new file mode 100644 index 0000000..a2cc647 --- /dev/null +++ b/internal/biz/order_consume.go @@ -0,0 +1,53 @@ +package biz + +import ( + "context" + "fmt" + "go.opentelemetry.io/otel/trace" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" + "voucher/internal/pkg/mq" +) + +func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["order"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) + } + + return nil +} + +func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { + + c := vo.OrderConsume.BuildCache([]string{orderNo}) + + err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { + + order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo) + if err != nil { + return err + } + + if order.Type.IsCmb() { + + outRequestNo, err := v.Cmb.OrderConsume(ctx, order) + if err != nil { + return err + } + + return v.PushNotifyMQ(ctx, orderNo, outRequestNo) + + } + + return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) + }) + + return +}