cmb
This commit is contained in:
parent
7808865b32
commit
36e6589c49
|
|
@ -35,22 +35,22 @@ func wireApp(bootstrap *conf.Bootstrap, logger log.Logger, accessLogger *log2.Ac
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
orderRepo := repoimpl.NewOrderRepoImpl()
|
orderRepo := repoimpl.NewOrderRepoImpl()
|
||||||
|
orderWechatRepo := repoimpl.NewOrderWechatRepoImpl()
|
||||||
productRepo := repoimpl.NewProductRepoImpl()
|
productRepo := repoimpl.NewProductRepoImpl()
|
||||||
|
wechatCpnRepo, err := wechatrepoimpl.NewCpnRepoImpl(bootstrap)
|
||||||
|
if err != nil {
|
||||||
|
cleanup()
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
generateMixRepo := mixrepoimpl.NewGenerateMixRepoImpl(rdb)
|
||||||
|
cmbCmb := cmb.NewCmb(rdb, orderRepo, orderWechatRepo, productRepo, wechatCpnRepo, generateMixRepo)
|
||||||
rocketMQ, cleanup2, err := data.NewRocketMQ(bootstrap)
|
rocketMQ, cleanup2, err := data.NewRocketMQ(bootstrap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cleanup()
|
cleanup()
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
mqSendMixRepo := mixrepoimpl.NewMQSendMixRepoImpl(rocketMQ)
|
mqSendMixRepo := mixrepoimpl.NewMQSendMixRepoImpl(rocketMQ)
|
||||||
wechatCpnRepo, err := wechatrepoimpl.NewCpnRepoImpl(bootstrap)
|
voucherBiz := biz.NewVoucherBiz(bootstrap, rdb, cmbCmb, orderRepo, mqSendMixRepo)
|
||||||
if err != nil {
|
|
||||||
cleanup2()
|
|
||||||
cleanup()
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
generateMixRepo := mixrepoimpl.NewGenerateMixRepoImpl(rdb)
|
|
||||||
cmbCmb := cmb.NewCmb(bootstrap, rdb, orderRepo, productRepo, mqSendMixRepo, wechatCpnRepo, generateMixRepo)
|
|
||||||
voucherBiz := biz.NewVoucherBiz(rdb, cmbCmb, orderRepo)
|
|
||||||
voucherService := service.NewVoucherService(bootstrap, voucherBiz)
|
voucherService := service.NewVoucherService(bootstrap, voucherBiz)
|
||||||
httpServer := server.NewHTTPServer(bootstrap, helper, accessLogger, voucherService)
|
httpServer := server.NewHTTPServer(bootstrap, helper, accessLogger, voucherService)
|
||||||
consumer := server.NewConsumer(helper, bootstrap, voucherService)
|
consumer := server.NewConsumer(helper, bootstrap, voucherService)
|
||||||
|
|
|
||||||
|
|
@ -35,12 +35,6 @@ rocketMQ:
|
||||||
isOpenConsumer: false #是否启动消费 true/false
|
isOpenConsumer: false #是否启动消费 true/false
|
||||||
PerCoroutineCnt: 5 #协程数量,不配置默认为20
|
PerCoroutineCnt: 5 #协程数量,不配置默认为20
|
||||||
RetryCnt: 3 #重试次数,不配置默认38
|
RetryCnt: 3 #重试次数,不配置默认38
|
||||||
query:
|
|
||||||
topic: voucher_order_query
|
|
||||||
group: voucher_order_query_group
|
|
||||||
isOpenConsumer: false #是否启动消费 true/false
|
|
||||||
PerCoroutineCnt: 5 #协程数量,不配置默认为20
|
|
||||||
RetryCnt: 3 #重试次数,不配置默认38
|
|
||||||
notify:
|
notify:
|
||||||
topic: voucher_order_notify
|
topic: voucher_order_notify
|
||||||
group: voucher_order_notify_group
|
group: voucher_order_notify_group
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import (
|
||||||
type Cmb struct {
|
type Cmb struct {
|
||||||
rdb *data.Rdb
|
rdb *data.Rdb
|
||||||
OrderRepo repo.OrderRepo
|
OrderRepo repo.OrderRepo
|
||||||
|
OrderWechatRepo repo.OrderWechatRepo
|
||||||
ProductRepo repo.ProductRepo
|
ProductRepo repo.ProductRepo
|
||||||
WechatCpnRepo wechatrepo.WechatCpnRepo
|
WechatCpnRepo wechatrepo.WechatCpnRepo
|
||||||
GenerateMixRepo mixrepos.GenerateMixRepo
|
GenerateMixRepo mixrepos.GenerateMixRepo
|
||||||
|
|
@ -18,6 +19,7 @@ type Cmb struct {
|
||||||
func NewCmb(
|
func NewCmb(
|
||||||
rdb *data.Rdb,
|
rdb *data.Rdb,
|
||||||
orderRepo repo.OrderRepo,
|
orderRepo repo.OrderRepo,
|
||||||
|
OrderWechatRepo repo.OrderWechatRepo,
|
||||||
ProductRepo repo.ProductRepo,
|
ProductRepo repo.ProductRepo,
|
||||||
WechatCpnRepo wechatrepo.WechatCpnRepo,
|
WechatCpnRepo wechatrepo.WechatCpnRepo,
|
||||||
GenerateMixRepo mixrepos.GenerateMixRepo,
|
GenerateMixRepo mixrepos.GenerateMixRepo,
|
||||||
|
|
@ -25,6 +27,7 @@ func NewCmb(
|
||||||
return &Cmb{
|
return &Cmb{
|
||||||
rdb: rdb,
|
rdb: rdb,
|
||||||
OrderRepo: orderRepo,
|
OrderRepo: orderRepo,
|
||||||
|
OrderWechatRepo: OrderWechatRepo,
|
||||||
ProductRepo: ProductRepo,
|
ProductRepo: ProductRepo,
|
||||||
WechatCpnRepo: WechatCpnRepo,
|
WechatCpnRepo: WechatCpnRepo,
|
||||||
GenerateMixRepo: GenerateMixRepo,
|
GenerateMixRepo: GenerateMixRepo,
|
||||||
|
|
|
||||||
|
|
@ -2,11 +2,80 @@ package cmb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
|
"voucher/internal/biz/vo"
|
||||||
|
"voucher/internal/pkg/uid"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (err error) {
|
func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo string, err error) {
|
||||||
return
|
|
||||||
|
if order.Status.IsWait() {
|
||||||
|
return "", fmt.Errorf("订单状态错误,%s", order.Status.GetText())
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = v.ing(ctx, order.ID); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
orderWechat, err := v.create(ctx, order)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
couponId, err := v.WechatCpnRepo.Order(ctx, orderWechat)
|
||||||
|
if err != nil {
|
||||||
|
return "", v.fail(ctx, order, orderWechat, err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
err = v.success(ctx, order, orderWechat, couponId)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
return orderWechat.OutRequestNo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Cmb) create(ctx context.Context, order *bo.OrderBo) (*bo.OrderWechatBo, error) {
|
||||||
|
outRequestNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.OrderWechat)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &bo.OrderWechatBo{
|
||||||
|
OrderNo: order.OrderNo,
|
||||||
|
OutRequestNo: outRequestNo,
|
||||||
|
AppID: order.AppID,
|
||||||
|
StockCreatorMchid: order.MerchantNo,
|
||||||
|
OpenID: order.Account,
|
||||||
|
StockID: order.ProductNo,
|
||||||
|
Status: vo.OrderWechatStatusWait,
|
||||||
|
}
|
||||||
|
|
||||||
|
orderWechat, err := v.OrderWechatRepo.Create(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return orderWechat, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Cmb) ing(ctx context.Context, id uint64) error {
|
||||||
|
return v.OrderRepo.Ing(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Cmb) success(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, couponId string) error {
|
||||||
|
if err := v.OrderWechatRepo.Success(ctx, orderWechat.ID, couponId); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return v.OrderRepo.Success(ctx, order.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (v *Cmb) fail(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, remarks string) error {
|
||||||
|
if err := v.OrderWechatRepo.Fail(ctx, orderWechat.ID, remarks); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return v.OrderRepo.Fail(ctx, order.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) {
|
func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) {
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,21 @@ func (v *VoucherBiz) PushQueryDelayMQ(ctx context.Context, orderNo string) error
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error {
|
||||||
|
|
||||||
|
eventMap := v.bc.RocketMQ.EventMap["notify"]
|
||||||
|
sendOption := []mq.SendOption{
|
||||||
|
mq.WithSendShardingKeysOption(fmt.Sprintf("%s_%s", orderNo, outRequestNo)),
|
||||||
|
mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil {
|
||||||
|
return fmt.Errorf("notify,消费队列投递失败[%v]", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) {
|
func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) {
|
||||||
|
|
||||||
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error {
|
err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error {
|
||||||
|
|
@ -50,10 +65,16 @@ func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err erro
|
||||||
}
|
}
|
||||||
|
|
||||||
if order.Type.IsCmb() {
|
if order.Type.IsCmb() {
|
||||||
return v.Cmb.OrderConsume(ctx, order)
|
|
||||||
|
outRequestNo, err := v.Cmb.OrderConsume(ctx, order)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return v.PushNotifyMQ(ctx, orderNo, outRequestNo)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
|
||||||
})
|
})
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type OrderWechatRepo interface {
|
type OrderWechatRepo interface {
|
||||||
// Create 创建 OrderWechat
|
|
||||||
Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error)
|
Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error)
|
||||||
// GetByID 根据 ID 获取 OrderWechat
|
Success(ctx context.Context, id uint64, couponId string) error
|
||||||
GetByID(ctx context.Context, id int32) (*bo.OrderWechatBo, error)
|
Fail(ctx context.Context, id uint64, remark string) error
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ func (s Channel) GetText() string {
|
||||||
if t, ok := OrderChannelMap[s]; ok {
|
if t, ok := OrderChannelMap[s]; ok {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
return ""
|
return "未知商品渠道类型"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s Channel) GetValue() uint8 {
|
func (s Channel) GetValue() uint8 {
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,7 @@ func (s OrderAccountType) GetText() string {
|
||||||
if t, ok := OrderAccountTypeMap[s]; ok {
|
if t, ok := OrderAccountTypeMap[s]; ok {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
return ""
|
return "未知账号类型"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s OrderAccountType) GetValue() uint8 {
|
func (s OrderAccountType) GetValue() uint8 {
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ func (s OrderStatus) GetText() string {
|
||||||
if t, ok := OrderStatusMap[s]; ok {
|
if t, ok := OrderStatusMap[s]; ok {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
return ""
|
return "未知状态"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s OrderStatus) GetValue() uint8 {
|
func (s OrderStatus) GetValue() uint8 {
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ func (s OrderType) GetText() string {
|
||||||
if t, ok := OrderTypeMap[s]; ok {
|
if t, ok := OrderTypeMap[s]; ok {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
return ""
|
return "未知类型"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s OrderType) GetValue() uint8 {
|
func (s OrderType) GetValue() uint8 {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@ type OrderWechatStatus uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
OrderWechatStatusWait OrderWechatStatus = iota + 1
|
OrderWechatStatusWait OrderWechatStatus = iota + 1
|
||||||
OrderWechatStatusIng
|
|
||||||
OrderWechatStatusSuccess
|
OrderWechatStatusSuccess
|
||||||
OrderWechatStatusFail
|
OrderWechatStatusFail
|
||||||
OrderWechatStatusUse
|
OrderWechatStatusUse
|
||||||
|
|
@ -13,7 +12,6 @@ const (
|
||||||
|
|
||||||
var OrderWechatStatusMap = map[OrderWechatStatus]string{
|
var OrderWechatStatusMap = map[OrderWechatStatus]string{
|
||||||
OrderWechatStatusWait: "待发放",
|
OrderWechatStatusWait: "待发放",
|
||||||
OrderWechatStatusIng: "发放中",
|
|
||||||
OrderWechatStatusSuccess: "发放成功",
|
OrderWechatStatusSuccess: "发放成功",
|
||||||
OrderWechatStatusFail: "发放失败",
|
OrderWechatStatusFail: "发放失败",
|
||||||
OrderWechatStatusUse: "已使用",
|
OrderWechatStatusUse: "已使用",
|
||||||
|
|
@ -24,7 +22,7 @@ func (s OrderWechatStatus) GetText() string {
|
||||||
if t, ok := OrderWechatStatusMap[s]; ok {
|
if t, ok := OrderWechatStatusMap[s]; ok {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
return ""
|
return "未知状态"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s OrderWechatStatus) GetValue() uint8 {
|
func (s OrderWechatStatus) GetValue() uint8 {
|
||||||
|
|
@ -35,10 +33,6 @@ func (s OrderWechatStatus) IsWait() bool {
|
||||||
return s == OrderWechatStatusWait
|
return s == OrderWechatStatusWait
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s OrderWechatStatus) IsIng() bool {
|
|
||||||
return s == OrderWechatStatusIng
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s OrderWechatStatus) IsSuccess() bool {
|
func (s OrderWechatStatus) IsSuccess() bool {
|
||||||
return s == OrderWechatStatusSuccess
|
return s == OrderWechatStatusSuccess
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,12 @@ package repoimpl
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
"voucher/internal/biz/repo"
|
"voucher/internal/biz/repo"
|
||||||
|
"voucher/internal/biz/vo"
|
||||||
"voucher/internal/data"
|
"voucher/internal/data"
|
||||||
"voucher/internal/data/model"
|
"voucher/internal/data/model"
|
||||||
)
|
)
|
||||||
|
|
@ -19,14 +23,77 @@ func NewOrderWechatRepoImpl() repo.OrderWechatRepo {
|
||||||
return &OrderWechatRepoImpl{}
|
return &OrderWechatRepoImpl{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) {
|
func (p *OrderWechatRepoImpl) DB(ctx context.Context) *gorm.DB {
|
||||||
// todo 待实现
|
return p.db.DB(ctx).Model(model.OrderWechat{})
|
||||||
return nil, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetByID 根据 ID 获取 OrderWechat
|
func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) {
|
||||||
func (r *OrderWechatRepoImpl) GetByID(ctx context.Context, id int32) (*bo.OrderWechatBo, error) {
|
now := time.Now()
|
||||||
var item model.OrderWechat
|
|
||||||
// todo 待实现
|
info := &model.OrderWechat{
|
||||||
return r.ToBo(&item), nil
|
OrderNo: req.OrderNo,
|
||||||
|
OutRequestNo: req.OutRequestNo,
|
||||||
|
AppID: req.AppID,
|
||||||
|
StockCreatorMchid: req.StockCreatorMchid,
|
||||||
|
OpenID: req.OpenID,
|
||||||
|
StockID: req.StockID,
|
||||||
|
Status: vo.OrderWechatStatusWait.GetValue(),
|
||||||
|
CreateTime: &now,
|
||||||
|
UpdateTime: &now,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := p.db.DB(ctx).Create(info).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.ToBo(info), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
res := p.db.DB(ctx).
|
||||||
|
Where(model.OrderWechat{
|
||||||
|
ID: id,
|
||||||
|
Status: vo.OrderStatusWait.GetValue(),
|
||||||
|
}).
|
||||||
|
Updates(model.OrderWechat{
|
||||||
|
Status: vo.OrderWechatStatusSuccess.GetValue(),
|
||||||
|
CouponID: couponId,
|
||||||
|
UpdateTime: &now,
|
||||||
|
})
|
||||||
|
|
||||||
|
if res.Error != nil {
|
||||||
|
return res.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *OrderWechatRepoImpl) Fail(ctx context.Context, id uint64, remark string) error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
if utf8.RuneCountInString(remark) > 100 {
|
||||||
|
runes := []rune(remark)
|
||||||
|
if len(runes) > 100 {
|
||||||
|
remark = string(runes[:100])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
res := p.db.DB(ctx).
|
||||||
|
Where(model.OrderWechat{
|
||||||
|
ID: id,
|
||||||
|
Status: vo.OrderStatusWait.GetValue(),
|
||||||
|
}).
|
||||||
|
Updates(model.OrderWechat{
|
||||||
|
Status: vo.OrderWechatStatusFail.GetValue(),
|
||||||
|
Remark: remark,
|
||||||
|
UpdateTime: &now,
|
||||||
|
})
|
||||||
|
|
||||||
|
if res.Error != nil {
|
||||||
|
return res.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,5 +7,6 @@ import (
|
||||||
// ProviderRepoImplSet is providers.
|
// ProviderRepoImplSet is providers.
|
||||||
var ProviderRepoImplSet = wire.NewSet(
|
var ProviderRepoImplSet = wire.NewSet(
|
||||||
NewOrderRepoImpl,
|
NewOrderRepoImpl,
|
||||||
|
NewOrderWechatRepoImpl,
|
||||||
NewProductRepoImpl,
|
NewProductRepoImpl,
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -38,12 +38,6 @@ func NewConsumer(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if c := voucherService.GetQueryConfig(); c != nil {
|
|
||||||
if err := manager.Subscribe(context.Background(), cf, c, voucherService.QueryConsumer); err != nil {
|
|
||||||
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if c := voucherService.GetNotifyConfig(); c != nil {
|
if c := voucherService.GetNotifyConfig(); c != nil {
|
||||||
if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil {
|
if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil {
|
||||||
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
|
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue