From 0e55f5c468d2580b0720580c247e1dd2f6b87e1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Tue, 4 Mar 2025 09:48:51 +0800 Subject: [PATCH] cmb --- internal/biz/bo/order_wechat_bo.go | 9 +- internal/biz/cmb.go | 14 ++++ internal/biz/vo/order_status.go | 4 + internal/biz/vo/order_wechant_status.go | 48 +++++++++++ internal/biz/voucher.go | 34 ++++++++ internal/data/model/order_wechat.gen.go | 2 +- internal/server/consume.go | 32 ++++--- internal/service/consume.go | 106 ++++++++++++++++++++++++ internal/service/voucher.go | 12 ++- 9 files changed, 245 insertions(+), 16 deletions(-) create mode 100644 internal/biz/vo/order_wechant_status.go diff --git a/internal/biz/bo/order_wechat_bo.go b/internal/biz/bo/order_wechat_bo.go index 25869ec..db33167 100644 --- a/internal/biz/bo/order_wechat_bo.go +++ b/internal/biz/bo/order_wechat_bo.go @@ -1,17 +1,20 @@ package bo -import "time" +import ( + "time" + "voucher/internal/biz/vo" +) // OrderWechatBo 领域实体Bo结构,字段和模型字段保持一致 type OrderWechatBo struct { - ID int32 + ID uint64 OrderNo string OutRequestNo string AppID string StockCreatorMchid string OpenID string StockID string - Status bool + Status vo.OrderWechatStatus CouponID string Remark string CreateTime *time.Time diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index 38034d3..b7ee9ba 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -17,3 +17,17 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (re return } + +func (v *VoucherBiz) CmbQuery(ctx context.Context, req *bo.OrderCreateReqBo) (reps *bo.OrderCreateRepBo, err error) { + + err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("cmb_order_%s", req.OutBizNo), func(ctx context.Context) error { + + return nil + }) + + return +} + +func (v *VoucherBiz) CmbNotify(ctx context.Context, order *bo.OrderBo) (err error) { + return +} diff --git a/internal/biz/vo/order_status.go b/internal/biz/vo/order_status.go index c351dc4..8f15ff7 100644 --- a/internal/biz/vo/order_status.go +++ b/internal/biz/vo/order_status.go @@ -7,6 +7,8 @@ const ( OrderStatusIng OrderStatusSuccess OrderStatusFail + OrderStatusUse + OrderStatusExpired ) var OrderStatusMap = map[OrderStatus]string{ @@ -14,6 +16,8 @@ var OrderStatusMap = map[OrderStatus]string{ OrderStatusIng: "发放中", OrderStatusSuccess: "发放成功", OrderStatusFail: "发放失败", + OrderStatusUse: "已使用", + OrderStatusExpired: "已过期", } func (s OrderStatus) GetText() string { diff --git a/internal/biz/vo/order_wechant_status.go b/internal/biz/vo/order_wechant_status.go new file mode 100644 index 0000000..7041722 --- /dev/null +++ b/internal/biz/vo/order_wechant_status.go @@ -0,0 +1,48 @@ +package vo + +type OrderWechatStatus uint8 + +const ( + OrderWechatStatusWait OrderWechatStatus = iota + 1 + OrderWechatStatusIng + OrderWechatStatusSuccess + OrderWechatStatusFail + OrderWechatStatusUse + OrderWechatStatusExpired +) + +var OrderWechatStatusMap = map[OrderWechatStatus]string{ + OrderWechatStatusWait: "待发放", + OrderWechatStatusIng: "发放中", + OrderWechatStatusSuccess: "发放成功", + OrderWechatStatusFail: "发放失败", + OrderWechatStatusUse: "已使用", + OrderWechatStatusExpired: "已过期", +} + +func (s OrderWechatStatus) GetText() string { + if t, ok := OrderWechatStatusMap[s]; ok { + return t + } + return "" +} + +func (s OrderWechatStatus) GetValue() uint8 { + return uint8(s) +} + +func (s OrderWechatStatus) IsWait() bool { + return s == OrderWechatStatusWait +} + +func (s OrderWechatStatus) IsIng() bool { + return s == OrderWechatStatusIng +} + +func (s OrderWechatStatus) IsSuccess() bool { + return s == OrderWechatStatusSuccess +} + +func (s OrderWechatStatus) IsFail() bool { + return s == OrderWechatStatusFail +} diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 97fb3d5..04ae355 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -1,9 +1,13 @@ package biz import ( + "context" + "fmt" + "time" "voucher/internal/biz/repo" "voucher/internal/biz/thirdrepo" "voucher/internal/data" + "voucher/internal/pkg/lock" ) type VoucherBiz struct { @@ -15,3 +19,33 @@ type VoucherBiz struct { func NewVoucherBiz(rdb *data.Rdb, orderRepo repo.OrderRepo, thirdMQSend thirdrepo.ThirdMQSend) *VoucherBiz { return &VoucherBiz{rdb: rdb, OrderRepo: orderRepo, ThirdMQSend: thirdMQSend} } + +func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { + + err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error { + + return nil + }) + + return +} + +func (v *VoucherBiz) QueryConsume(ctx context.Context, orderNo string) (err error) { + + err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("query_consume_%s", orderNo), func(ctx context.Context) error { + + return nil + }) + + return +} + +func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo string) (err error) { + + err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error { + + return nil + }) + + return +} diff --git a/internal/data/model/order_wechat.gen.go b/internal/data/model/order_wechat.gen.go index 54edd96..37179d0 100644 --- a/internal/data/model/order_wechat.gen.go +++ b/internal/data/model/order_wechat.gen.go @@ -12,7 +12,7 @@ const TableNameOrderWechat = "order_wechat" // OrderWechat mapped from table type OrderWechat struct { - ID int64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` + ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` OrderNo string `gorm:"column:order_no;not null;comment:订单号" json:"order_no"` // 订单号 OutRequestNo string `gorm:"column:out_request_no;not null;comment:请求单号" json:"out_request_no"` // 请求单号 AppID string `gorm:"column:app_id;not null;comment:微信应用id" json:"app_id"` // 微信应用id diff --git a/internal/server/consume.go b/internal/server/consume.go index 66b967c..3bfcc38 100644 --- a/internal/server/consume.go +++ b/internal/server/consume.go @@ -26,17 +26,29 @@ func NewConsumer( manager := mq.NewConsumerManager(hLog) - //cf := &mq.ConsumerConnConfig{ - // NameServers: conf.RocketMQ.Addr, - // AccessKey: conf.RocketMQ.AccessKey, - // SecretKey: conf.RocketMQ.SecretKey, - //} + cf := &mq.ConsumerConnConfig{ + NameServers: conf.RocketMQ.Addr, + AccessKey: conf.RocketMQ.AccessKey, + SecretKey: conf.RocketMQ.SecretKey, + } - //if c := keyService.GetConfig(); c != nil { - // if err := manager.Subscribe(context.Background(), cf, c, keyService.Handle); err != nil { - // panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) - // } - //} + if c := voucherService.GetOrderCreateConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderCreateConsumer); err != nil { + panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) + } + } + + if c := voucherService.GetOrderQueryConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderQueryConsumer); err != nil { + panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) + } + } + + if c := voucherService.GetOrderNotifyConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderNotifyConsumer); err != nil { + panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) + } + } return &Consumer{manager: manager, hLog: hLog, conf: conf} } diff --git a/internal/service/consume.go b/internal/service/consume.go index 6d43c33..e1a8f75 100644 --- a/internal/service/consume.go +++ b/internal/service/consume.go @@ -1 +1,107 @@ package service + +import ( + "context" + "errors" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/pkg/mq" +) + +func (j *VoucherService) GetOrderCreateConfig() *mq.ConsumerConfig { + elm, ok := j.bc.RocketMQ.EventMap["order"] + if !ok { + return nil + } + + if !elm.IsOpenConsumer { + log.Warnf("order MQ is not open") + return nil + } + + return &mq.ConsumerConfig{ + TopicName: elm.Topic, + GroupName: elm.Group, + PerCoroutineCnt: int(elm.PerCoroutineCnt), + } +} + +func (j *VoucherService) OrderCreateConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { + + orderNo := msg.GetShardingKey() + if orderNo == "" { + log.Error("orderCreate 消费异常,获取 orderNo 失败") + return errors.New("orderCreate 消费异常,获取 orderNo 失败") + } + + if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil { + log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error()) + } + + return nil +} + +func (j *VoucherService) GetOrderQueryConfig() *mq.ConsumerConfig { + elm, ok := j.bc.RocketMQ.EventMap["query"] + if !ok { + return nil + } + + if !elm.IsOpenConsumer { + log.Warnf("query MQ is not open") + return nil + } + + return &mq.ConsumerConfig{ + TopicName: elm.Topic, + GroupName: elm.Group, + PerCoroutineCnt: int(elm.PerCoroutineCnt), + } +} + +func (j *VoucherService) OrderQueryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { + + orderNo := msg.GetShardingKey() + if orderNo == "" { + log.Error("orderQuery 消费异常,获取 orderNo 失败") + return errors.New("orderQuery 消费异常,获取 orderNo 失败") + } + + if err := j.VoucherBiz.QueryConsume(ctx, orderNo); err != nil { + log.Errorf("query 消费异常,orderNo:%s,error: %s", orderNo, err.Error()) + } + + return nil +} + +func (j *VoucherService) GetOrderNotifyConfig() *mq.ConsumerConfig { + elm, ok := j.bc.RocketMQ.EventMap["notify"] + if !ok { + return nil + } + + if !elm.IsOpenConsumer { + log.Warnf("notify MQ is not open") + return nil + } + + return &mq.ConsumerConfig{ + TopicName: elm.Topic, + GroupName: elm.Group, + PerCoroutineCnt: int(elm.PerCoroutineCnt), + } +} + +func (j *VoucherService) OrderNotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { + + orderNo := msg.GetShardingKey() + if orderNo == "" { + log.Error("orderNotify 消费异常,获取 orderNo 失败") + return errors.New("orderNotify 消费异常,获取 orderNo 失败") + } + + if err := j.VoucherBiz.NotifyConsume(ctx, orderNo); err != nil { + log.Errorf("notify 消费异常,orderNo:%s,error: %s", orderNo, err.Error()) + } + + return nil +} diff --git a/internal/service/voucher.go b/internal/service/voucher.go index 628692a..cf0bcc2 100644 --- a/internal/service/voucher.go +++ b/internal/service/voucher.go @@ -2,12 +2,20 @@ package service import ( "voucher/internal/biz" + "voucher/internal/conf" ) type VoucherService struct { + bc *conf.Bootstrap VoucherBiz *biz.VoucherBiz } -func NewVoucherService(VoucherBiz *biz.VoucherBiz) *VoucherService { - return &VoucherService{VoucherBiz: VoucherBiz} +func NewVoucherService( + bc *conf.Bootstrap, + VoucherBiz *biz.VoucherBiz, +) *VoucherService { + return &VoucherService{ + bc: bc, + VoucherBiz: VoucherBiz, + } }