This commit is contained in:
李子铭 2025-03-04 09:48:51 +08:00
parent 67215ca534
commit 0e55f5c468
9 changed files with 245 additions and 16 deletions

View File

@ -1,17 +1,20 @@
package bo package bo
import "time" import (
"time"
"voucher/internal/biz/vo"
)
// OrderWechatBo 领域实体Bo结构字段和模型字段保持一致 // OrderWechatBo 领域实体Bo结构字段和模型字段保持一致
type OrderWechatBo struct { type OrderWechatBo struct {
ID int32 ID uint64
OrderNo string OrderNo string
OutRequestNo string OutRequestNo string
AppID string AppID string
StockCreatorMchid string StockCreatorMchid string
OpenID string OpenID string
StockID string StockID string
Status bool Status vo.OrderWechatStatus
CouponID string CouponID string
Remark string Remark string
CreateTime *time.Time CreateTime *time.Time

View File

@ -17,3 +17,17 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (re
return return
} }
func (v *VoucherBiz) CmbQuery(ctx context.Context, req *bo.OrderCreateReqBo) (reps *bo.OrderCreateRepBo, err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("cmb_order_%s", req.OutBizNo), func(ctx context.Context) error {
return nil
})
return
}
func (v *VoucherBiz) CmbNotify(ctx context.Context, order *bo.OrderBo) (err error) {
return
}

View File

@ -7,6 +7,8 @@ const (
OrderStatusIng OrderStatusIng
OrderStatusSuccess OrderStatusSuccess
OrderStatusFail OrderStatusFail
OrderStatusUse
OrderStatusExpired
) )
var OrderStatusMap = map[OrderStatus]string{ var OrderStatusMap = map[OrderStatus]string{
@ -14,6 +16,8 @@ var OrderStatusMap = map[OrderStatus]string{
OrderStatusIng: "发放中", OrderStatusIng: "发放中",
OrderStatusSuccess: "发放成功", OrderStatusSuccess: "发放成功",
OrderStatusFail: "发放失败", OrderStatusFail: "发放失败",
OrderStatusUse: "已使用",
OrderStatusExpired: "已过期",
} }
func (s OrderStatus) GetText() string { func (s OrderStatus) GetText() string {

View File

@ -0,0 +1,48 @@
package vo
type OrderWechatStatus uint8
const (
OrderWechatStatusWait OrderWechatStatus = iota + 1
OrderWechatStatusIng
OrderWechatStatusSuccess
OrderWechatStatusFail
OrderWechatStatusUse
OrderWechatStatusExpired
)
var OrderWechatStatusMap = map[OrderWechatStatus]string{
OrderWechatStatusWait: "待发放",
OrderWechatStatusIng: "发放中",
OrderWechatStatusSuccess: "发放成功",
OrderWechatStatusFail: "发放失败",
OrderWechatStatusUse: "已使用",
OrderWechatStatusExpired: "已过期",
}
func (s OrderWechatStatus) GetText() string {
if t, ok := OrderWechatStatusMap[s]; ok {
return t
}
return ""
}
func (s OrderWechatStatus) GetValue() uint8 {
return uint8(s)
}
func (s OrderWechatStatus) IsWait() bool {
return s == OrderWechatStatusWait
}
func (s OrderWechatStatus) IsIng() bool {
return s == OrderWechatStatusIng
}
func (s OrderWechatStatus) IsSuccess() bool {
return s == OrderWechatStatusSuccess
}
func (s OrderWechatStatus) IsFail() bool {
return s == OrderWechatStatusFail
}

View File

@ -1,9 +1,13 @@
package biz package biz
import ( import (
"context"
"fmt"
"time"
"voucher/internal/biz/repo" "voucher/internal/biz/repo"
"voucher/internal/biz/thirdrepo" "voucher/internal/biz/thirdrepo"
"voucher/internal/data" "voucher/internal/data"
"voucher/internal/pkg/lock"
) )
type VoucherBiz struct { type VoucherBiz struct {
@ -15,3 +19,33 @@ type VoucherBiz struct {
func NewVoucherBiz(rdb *data.Rdb, orderRepo repo.OrderRepo, thirdMQSend thirdrepo.ThirdMQSend) *VoucherBiz { func NewVoucherBiz(rdb *data.Rdb, orderRepo repo.OrderRepo, thirdMQSend thirdrepo.ThirdMQSend) *VoucherBiz {
return &VoucherBiz{rdb: rdb, OrderRepo: orderRepo, ThirdMQSend: thirdMQSend} return &VoucherBiz{rdb: rdb, OrderRepo: orderRepo, ThirdMQSend: thirdMQSend}
} }
func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error {
return nil
})
return
}
func (v *VoucherBiz) QueryConsume(ctx context.Context, orderNo string) (err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("query_consume_%s", orderNo), func(ctx context.Context) error {
return nil
})
return
}
func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo string) (err error) {
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("notify_consume_%s", orderNo), func(ctx context.Context) error {
return nil
})
return
}

View File

@ -12,7 +12,7 @@ const TableNameOrderWechat = "order_wechat"
// OrderWechat mapped from table <order_wechat> // OrderWechat mapped from table <order_wechat>
type OrderWechat struct { type OrderWechat struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"`
OrderNo string `gorm:"column:order_no;not null;comment:订单号" json:"order_no"` // 订单号 OrderNo string `gorm:"column:order_no;not null;comment:订单号" json:"order_no"` // 订单号
OutRequestNo string `gorm:"column:out_request_no;not null;comment:请求单号" json:"out_request_no"` // 请求单号 OutRequestNo string `gorm:"column:out_request_no;not null;comment:请求单号" json:"out_request_no"` // 请求单号
AppID string `gorm:"column:app_id;not null;comment:微信应用id" json:"app_id"` // 微信应用id AppID string `gorm:"column:app_id;not null;comment:微信应用id" json:"app_id"` // 微信应用id

View File

@ -26,17 +26,29 @@ func NewConsumer(
manager := mq.NewConsumerManager(hLog) manager := mq.NewConsumerManager(hLog)
//cf := &mq.ConsumerConnConfig{ cf := &mq.ConsumerConnConfig{
// NameServers: conf.RocketMQ.Addr, NameServers: conf.RocketMQ.Addr,
// AccessKey: conf.RocketMQ.AccessKey, AccessKey: conf.RocketMQ.AccessKey,
// SecretKey: conf.RocketMQ.SecretKey, SecretKey: conf.RocketMQ.SecretKey,
//} }
//if c := keyService.GetConfig(); c != nil { if c := voucherService.GetOrderCreateConfig(); c != nil {
// if err := manager.Subscribe(context.Background(), cf, c, keyService.Handle); err != nil { if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderCreateConsumer); err != nil {
// panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
// } }
//} }
if c := voucherService.GetOrderQueryConfig(); c != nil {
if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderQueryConsumer); err != nil {
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
}
}
if c := voucherService.GetOrderNotifyConfig(); c != nil {
if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderNotifyConsumer); err != nil {
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
}
}
return &Consumer{manager: manager, hLog: hLog, conf: conf} return &Consumer{manager: manager, hLog: hLog, conf: conf}
} }

View File

@ -1 +1,107 @@
package service package service
import (
"context"
"errors"
"github.com/go-kratos/kratos/v2/log"
"voucher/internal/pkg/mq"
)
func (j *VoucherService) GetOrderCreateConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["order"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("order MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) OrderCreateConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
orderNo := msg.GetShardingKey()
if orderNo == "" {
log.Error("orderCreate 消费异常,获取 orderNo 失败")
return errors.New("orderCreate 消费异常,获取 orderNo 失败")
}
if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil {
log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
}
return nil
}
func (j *VoucherService) GetOrderQueryConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["query"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("query MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) OrderQueryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
orderNo := msg.GetShardingKey()
if orderNo == "" {
log.Error("orderQuery 消费异常,获取 orderNo 失败")
return errors.New("orderQuery 消费异常,获取 orderNo 失败")
}
if err := j.VoucherBiz.QueryConsume(ctx, orderNo); err != nil {
log.Errorf("query 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
}
return nil
}
func (j *VoucherService) GetOrderNotifyConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notify"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("notify MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) OrderNotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
orderNo := msg.GetShardingKey()
if orderNo == "" {
log.Error("orderNotify 消费异常,获取 orderNo 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
}
if err := j.VoucherBiz.NotifyConsume(ctx, orderNo); err != nil {
log.Errorf("notify 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
}
return nil
}

View File

@ -2,12 +2,20 @@ package service
import ( import (
"voucher/internal/biz" "voucher/internal/biz"
"voucher/internal/conf"
) )
type VoucherService struct { type VoucherService struct {
bc *conf.Bootstrap
VoucherBiz *biz.VoucherBiz VoucherBiz *biz.VoucherBiz
} }
func NewVoucherService(VoucherBiz *biz.VoucherBiz) *VoucherService { func NewVoucherService(
return &VoucherService{VoucherBiz: VoucherBiz} bc *conf.Bootstrap,
VoucherBiz *biz.VoucherBiz,
) *VoucherService {
return &VoucherService{
bc: bc,
VoucherBiz: VoucherBiz,
}
} }