From ce81f416934f348d706fc9501df37097327f6e43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Tue, 4 Mar 2025 15:55:05 +0800 Subject: [PATCH] cmb --- internal/biz/cmb/cmb.go | 10 +++++++--- internal/biz/cmb/consume.go | 34 ++++++++++++++++++++++++++++++++++ internal/biz/cmb/voucher.go | 19 ++++++++----------- 3 files changed, 49 insertions(+), 14 deletions(-) diff --git a/internal/biz/cmb/cmb.go b/internal/biz/cmb/cmb.go index ef1738b..16e067b 100644 --- a/internal/biz/cmb/cmb.go +++ b/internal/biz/cmb/cmb.go @@ -4,31 +4,35 @@ import ( "voucher/internal/biz/mixrepos" "voucher/internal/biz/repo" "voucher/internal/biz/wechatrepo" + "voucher/internal/conf" "voucher/internal/data" ) type Cmb struct { + bc *conf.Bootstrap rdb *data.Rdb OrderRepo repo.OrderRepo ProductRepo repo.ProductRepo - ThirdMQSend mixrepos.MQSendMixRepo + MqSendMixRepo mixrepos.MQSendMixRepo WechatCpnRepo wechatrepo.WechatCpnRepo GenerateMixRepo mixrepos.GenerateMixRepo } func NewCmb( + bc *conf.Bootstrap, rdb *data.Rdb, orderRepo repo.OrderRepo, ProductRepo repo.ProductRepo, - thirdMQSend mixrepos.MQSendMixRepo, + MqSendMixRepo mixrepos.MQSendMixRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, GenerateMixRepo mixrepos.GenerateMixRepo, ) *Cmb { return &Cmb{ + bc: bc, rdb: rdb, OrderRepo: orderRepo, ProductRepo: ProductRepo, - ThirdMQSend: thirdMQSend, + MqSendMixRepo: MqSendMixRepo, WechatCpnRepo: WechatCpnRepo, GenerateMixRepo: GenerateMixRepo, } diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index 287b694..a9b956e 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -2,13 +2,47 @@ package cmb import ( "context" + "fmt" + "go.opentelemetry.io/otel/trace" "voucher/internal/biz/bo" + "voucher/internal/pkg/mq" ) +func (v *Cmb) PushOrderMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["order"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) + } + + return nil +} + func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (err error) { return } +func (v *Cmb) PushQueryDelayMQ(ctx context.Context, orderNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["query"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(orderNo), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + mq.WithSendDelayLevelOption(300), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("查询入队投递失败[%v]", err) + } + + return nil +} + func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) { return } diff --git a/internal/biz/cmb/voucher.go b/internal/biz/cmb/voucher.go index 43aba03..a5b997a 100644 --- a/internal/biz/cmb/voucher.go +++ b/internal/biz/cmb/voucher.go @@ -49,16 +49,13 @@ func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo stri } o := &bo.OrderBo{ - OrderNo: orderNo, - - OutBizNo: req.OutBizNo, - ProductNo: req.ProductNo, - Account: req.Account, - - AppID: product.AppID, - MerchantNo: product.MerchantNo, - Channel: product.Channel, - + OrderNo: orderNo, + OutBizNo: req.OutBizNo, + ProductNo: req.ProductNo, + Account: req.Account, + AppID: product.AppID, + MerchantNo: product.MerchantNo, + Channel: product.Channel, AccountType: vo.OrderAccountTypeOpenId, Type: vo.OrderTypeCmb, Status: vo.OrderStatusWait, @@ -69,7 +66,7 @@ func (v *Cmb) order(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo stri return } - //v.ThirdMQSend + err = v.PushOrderMQ(ctx, orderNo) return }