diff --git a/cmd/server/wire_gen.go b/cmd/server/wire_gen.go index e7d535a..d959d06 100644 --- a/cmd/server/wire_gen.go +++ b/cmd/server/wire_gen.go @@ -35,22 +35,22 @@ func wireApp(bootstrap *conf.Bootstrap, logger log.Logger, accessLogger *log2.Ac return nil, nil, err } orderRepo := repoimpl.NewOrderRepoImpl() + orderWechatRepo := repoimpl.NewOrderWechatRepoImpl() productRepo := repoimpl.NewProductRepoImpl() + wechatCpnRepo, err := wechatrepoimpl.NewCpnRepoImpl(bootstrap) + if err != nil { + cleanup() + return nil, nil, err + } + generateMixRepo := mixrepoimpl.NewGenerateMixRepoImpl(rdb) + cmbCmb := cmb.NewCmb(rdb, orderRepo, orderWechatRepo, productRepo, wechatCpnRepo, generateMixRepo) rocketMQ, cleanup2, err := data.NewRocketMQ(bootstrap) if err != nil { cleanup() return nil, nil, err } mqSendMixRepo := mixrepoimpl.NewMQSendMixRepoImpl(rocketMQ) - wechatCpnRepo, err := wechatrepoimpl.NewCpnRepoImpl(bootstrap) - if err != nil { - cleanup2() - cleanup() - return nil, nil, err - } - generateMixRepo := mixrepoimpl.NewGenerateMixRepoImpl(rdb) - cmbCmb := cmb.NewCmb(bootstrap, rdb, orderRepo, productRepo, mqSendMixRepo, wechatCpnRepo, generateMixRepo) - voucherBiz := biz.NewVoucherBiz(rdb, cmbCmb, orderRepo) + voucherBiz := biz.NewVoucherBiz(bootstrap, rdb, cmbCmb, orderRepo, mqSendMixRepo) voucherService := service.NewVoucherService(bootstrap, voucherBiz) httpServer := server.NewHTTPServer(bootstrap, helper, accessLogger, voucherService) consumer := server.NewConsumer(helper, bootstrap, voucherService) diff --git a/configs/config.yaml b/configs/config.yaml index 6812577..3bb139a 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -35,12 +35,6 @@ rocketMQ: isOpenConsumer: false #是否启动消费 true/false PerCoroutineCnt: 5 #协程数量,不配置默认为20 RetryCnt: 3 #重试次数,不配置默认38 - query: - topic: voucher_order_query - group: voucher_order_query_group - isOpenConsumer: false #是否启动消费 true/false - PerCoroutineCnt: 5 #协程数量,不配置默认为20 - RetryCnt: 3 #重试次数,不配置默认38 notify: topic: voucher_order_notify group: voucher_order_notify_group diff --git a/internal/biz/cmb/cmb.go b/internal/biz/cmb/cmb.go index e99fd78..d23712e 100644 --- a/internal/biz/cmb/cmb.go +++ b/internal/biz/cmb/cmb.go @@ -10,6 +10,7 @@ import ( type Cmb struct { rdb *data.Rdb OrderRepo repo.OrderRepo + OrderWechatRepo repo.OrderWechatRepo ProductRepo repo.ProductRepo WechatCpnRepo wechatrepo.WechatCpnRepo GenerateMixRepo mixrepos.GenerateMixRepo @@ -18,6 +19,7 @@ type Cmb struct { func NewCmb( rdb *data.Rdb, orderRepo repo.OrderRepo, + OrderWechatRepo repo.OrderWechatRepo, ProductRepo repo.ProductRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, GenerateMixRepo mixrepos.GenerateMixRepo, @@ -25,6 +27,7 @@ func NewCmb( return &Cmb{ rdb: rdb, OrderRepo: orderRepo, + OrderWechatRepo: OrderWechatRepo, ProductRepo: ProductRepo, WechatCpnRepo: WechatCpnRepo, GenerateMixRepo: GenerateMixRepo, diff --git a/internal/biz/cmb/consume.go b/internal/biz/cmb/consume.go index 287b694..db31775 100644 --- a/internal/biz/cmb/consume.go +++ b/internal/biz/cmb/consume.go @@ -2,11 +2,80 @@ package cmb import ( "context" + "fmt" "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/uid" ) -func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (err error) { - return +func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo string, err error) { + + if order.Status.IsWait() { + return "", fmt.Errorf("订单状态错误,%s", order.Status.GetText()) + } + + if err = v.ing(ctx, order.ID); err != nil { + return + } + + orderWechat, err := v.create(ctx, order) + if err != nil { + return + } + + couponId, err := v.WechatCpnRepo.Order(ctx, orderWechat) + if err != nil { + return "", v.fail(ctx, order, orderWechat, err.Error()) + } + + err = v.success(ctx, order, orderWechat, couponId) + if err != nil { + return + } + + return orderWechat.OutRequestNo, nil +} + +func (v *Cmb) create(ctx context.Context, order *bo.OrderBo) (*bo.OrderWechatBo, error) { + outRequestNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.OrderWechat) + if err != nil { + return nil, err + } + + req := &bo.OrderWechatBo{ + OrderNo: order.OrderNo, + OutRequestNo: outRequestNo, + AppID: order.AppID, + StockCreatorMchid: order.MerchantNo, + OpenID: order.Account, + StockID: order.ProductNo, + Status: vo.OrderWechatStatusWait, + } + + orderWechat, err := v.OrderWechatRepo.Create(ctx, req) + if err != nil { + return nil, err + } + + return orderWechat, nil +} + +func (v *Cmb) ing(ctx context.Context, id uint64) error { + return v.OrderRepo.Ing(ctx, id) +} + +func (v *Cmb) success(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, couponId string) error { + if err := v.OrderWechatRepo.Success(ctx, orderWechat.ID, couponId); err != nil { + return err + } + return v.OrderRepo.Success(ctx, order.ID) +} + +func (v *Cmb) fail(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, remarks string) error { + if err := v.OrderWechatRepo.Fail(ctx, orderWechat.ID, remarks); err != nil { + return err + } + return v.OrderRepo.Fail(ctx, order.ID) } func (v *Cmb) QueryConsume(ctx context.Context, order *bo.OrderBo) (err error) { diff --git a/internal/biz/consume.go b/internal/biz/consume.go index f53244c..b7dd089 100644 --- a/internal/biz/consume.go +++ b/internal/biz/consume.go @@ -40,6 +40,21 @@ func (v *VoucherBiz) PushQueryDelayMQ(ctx context.Context, orderNo string) error return nil } +func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error { + + eventMap := v.bc.RocketMQ.EventMap["notify"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(fmt.Sprintf("%s_%s", orderNo, outRequestNo)), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("notify,消费队列投递失败[%v]", err) + } + + return nil +} + func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { err = lock.NewMutex(v.rdb.Rdb, time.Second*30).Lock(ctx, fmt.Sprintf("order_consume_%s", orderNo), func(ctx context.Context) error { @@ -50,10 +65,16 @@ func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err erro } if order.Type.IsCmb() { - return v.Cmb.OrderConsume(ctx, order) + + outRequestNo, err := v.Cmb.OrderConsume(ctx, order) + if err != nil { + return err + } + return v.PushNotifyMQ(ctx, orderNo, outRequestNo) + } - return nil + return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) }) return diff --git a/internal/biz/repo/order_wechat.go b/internal/biz/repo/order_wechat.go index 3a1332f..321922b 100644 --- a/internal/biz/repo/order_wechat.go +++ b/internal/biz/repo/order_wechat.go @@ -6,8 +6,7 @@ import ( ) type OrderWechatRepo interface { - // Create 创建 OrderWechat Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) - // GetByID 根据 ID 获取 OrderWechat - GetByID(ctx context.Context, id int32) (*bo.OrderWechatBo, error) + Success(ctx context.Context, id uint64, couponId string) error + Fail(ctx context.Context, id uint64, remark string) error } diff --git a/internal/biz/vo/channel.go b/internal/biz/vo/channel.go index 374d9d4..bd1d63a 100644 --- a/internal/biz/vo/channel.go +++ b/internal/biz/vo/channel.go @@ -16,7 +16,7 @@ func (s Channel) GetText() string { if t, ok := OrderChannelMap[s]; ok { return t } - return "" + return "未知商品渠道类型" } func (s Channel) GetValue() uint8 { diff --git a/internal/biz/vo/order_account_type.go b/internal/biz/vo/order_account_type.go index f5ceb92..6e6889e 100644 --- a/internal/biz/vo/order_account_type.go +++ b/internal/biz/vo/order_account_type.go @@ -16,7 +16,7 @@ func (s OrderAccountType) GetText() string { if t, ok := OrderAccountTypeMap[s]; ok { return t } - return "" + return "未知账号类型" } func (s OrderAccountType) GetValue() uint8 { diff --git a/internal/biz/vo/order_status.go b/internal/biz/vo/order_status.go index 8f15ff7..c013385 100644 --- a/internal/biz/vo/order_status.go +++ b/internal/biz/vo/order_status.go @@ -24,7 +24,7 @@ func (s OrderStatus) GetText() string { if t, ok := OrderStatusMap[s]; ok { return t } - return "" + return "未知状态" } func (s OrderStatus) GetValue() uint8 { diff --git a/internal/biz/vo/order_type.go b/internal/biz/vo/order_type.go index a0664f6..9fd8fdb 100644 --- a/internal/biz/vo/order_type.go +++ b/internal/biz/vo/order_type.go @@ -14,7 +14,7 @@ func (s OrderType) GetText() string { if t, ok := OrderTypeMap[s]; ok { return t } - return "" + return "未知类型" } func (s OrderType) GetValue() uint8 { diff --git a/internal/biz/vo/order_wechant_status.go b/internal/biz/vo/order_wechant_status.go index 7041722..24b4bbb 100644 --- a/internal/biz/vo/order_wechant_status.go +++ b/internal/biz/vo/order_wechant_status.go @@ -4,7 +4,6 @@ type OrderWechatStatus uint8 const ( OrderWechatStatusWait OrderWechatStatus = iota + 1 - OrderWechatStatusIng OrderWechatStatusSuccess OrderWechatStatusFail OrderWechatStatusUse @@ -13,7 +12,6 @@ const ( var OrderWechatStatusMap = map[OrderWechatStatus]string{ OrderWechatStatusWait: "待发放", - OrderWechatStatusIng: "发放中", OrderWechatStatusSuccess: "发放成功", OrderWechatStatusFail: "发放失败", OrderWechatStatusUse: "已使用", @@ -24,7 +22,7 @@ func (s OrderWechatStatus) GetText() string { if t, ok := OrderWechatStatusMap[s]; ok { return t } - return "" + return "未知状态" } func (s OrderWechatStatus) GetValue() uint8 { @@ -35,10 +33,6 @@ func (s OrderWechatStatus) IsWait() bool { return s == OrderWechatStatusWait } -func (s OrderWechatStatus) IsIng() bool { - return s == OrderWechatStatusIng -} - func (s OrderWechatStatus) IsSuccess() bool { return s == OrderWechatStatusSuccess } diff --git a/internal/data/repoimpl/order_wechat.go b/internal/data/repoimpl/order_wechat.go index 8673af3..c16ca55 100644 --- a/internal/data/repoimpl/order_wechat.go +++ b/internal/data/repoimpl/order_wechat.go @@ -2,8 +2,12 @@ package repoimpl import ( "context" + "gorm.io/gorm" + "time" + "unicode/utf8" "voucher/internal/biz/bo" "voucher/internal/biz/repo" + "voucher/internal/biz/vo" "voucher/internal/data" "voucher/internal/data/model" ) @@ -19,14 +23,77 @@ func NewOrderWechatRepoImpl() repo.OrderWechatRepo { return &OrderWechatRepoImpl{} } -func (r *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) { - // todo 待实现 - return nil, nil +func (p *OrderWechatRepoImpl) DB(ctx context.Context) *gorm.DB { + return p.db.DB(ctx).Model(model.OrderWechat{}) } -// GetByID 根据 ID 获取 OrderWechat -func (r *OrderWechatRepoImpl) GetByID(ctx context.Context, id int32) (*bo.OrderWechatBo, error) { - var item model.OrderWechat - // todo 待实现 - return r.ToBo(&item), nil +func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) { + now := time.Now() + + info := &model.OrderWechat{ + OrderNo: req.OrderNo, + OutRequestNo: req.OutRequestNo, + AppID: req.AppID, + StockCreatorMchid: req.StockCreatorMchid, + OpenID: req.OpenID, + StockID: req.StockID, + Status: vo.OrderWechatStatusWait.GetValue(), + CreateTime: &now, + UpdateTime: &now, + } + + if err := p.db.DB(ctx).Create(info).Error; err != nil { + return nil, err + } + + return p.ToBo(info), nil +} + +func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error { + now := time.Now() + + res := p.db.DB(ctx). + Where(model.OrderWechat{ + ID: id, + Status: vo.OrderStatusWait.GetValue(), + }). + Updates(model.OrderWechat{ + Status: vo.OrderWechatStatusSuccess.GetValue(), + CouponID: couponId, + UpdateTime: &now, + }) + + if res.Error != nil { + return res.Error + } + + return nil +} + +func (p *OrderWechatRepoImpl) Fail(ctx context.Context, id uint64, remark string) error { + now := time.Now() + + if utf8.RuneCountInString(remark) > 100 { + runes := []rune(remark) + if len(runes) > 100 { + remark = string(runes[:100]) + } + } + + res := p.db.DB(ctx). + Where(model.OrderWechat{ + ID: id, + Status: vo.OrderStatusWait.GetValue(), + }). + Updates(model.OrderWechat{ + Status: vo.OrderWechatStatusFail.GetValue(), + Remark: remark, + UpdateTime: &now, + }) + + if res.Error != nil { + return res.Error + } + + return nil } diff --git a/internal/data/repoimpl/provider_set.go b/internal/data/repoimpl/provider_set.go index 32a7c2b..333465c 100644 --- a/internal/data/repoimpl/provider_set.go +++ b/internal/data/repoimpl/provider_set.go @@ -7,5 +7,6 @@ import ( // ProviderRepoImplSet is providers. var ProviderRepoImplSet = wire.NewSet( NewOrderRepoImpl, + NewOrderWechatRepoImpl, NewProductRepoImpl, ) diff --git a/internal/server/consume.go b/internal/server/consume.go index 9545f1b..52c5c75 100644 --- a/internal/server/consume.go +++ b/internal/server/consume.go @@ -38,12 +38,6 @@ func NewConsumer( } } - if c := voucherService.GetQueryConfig(); c != nil { - if err := manager.Subscribe(context.Background(), cf, c, voucherService.QueryConsumer); err != nil { - panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) - } - } - if c := voucherService.GetNotifyConfig(); c != nil { if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil { panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))