接口调整

This commit is contained in:
李子铭 2025-03-12 17:47:31 +08:00
parent e8967efc9d
commit 9e101492b2
36 changed files with 343 additions and 878 deletions

View File

@ -57,6 +57,8 @@ message CmbOrderRequest {
string cmbUidType = 12 [json_name = "cmbUidType", (validate.rules).string = {min_len: 1,max_len: 10}]; string cmbUidType = 12 [json_name = "cmbUidType", (validate.rules).string = {min_len: 1,max_len: 10}];
// 13 // 13
string timestamp = 13 [json_name = "timestamp", (validate.rules).string = {min_len: 1,max_len: 14}]; string timestamp = 13 [json_name = "timestamp", (validate.rules).string = {min_len: 1,max_len: 14}];
//
string attach = 15 [json_name = "attach"];
} }
message CmbOrderReply { message CmbOrderReply {
// 1000 1001 // 1000 1001
@ -132,6 +134,7 @@ message CmbNotifyRequest {
string orgNo = 12 [json_name = "orgNo"]; string orgNo = 12 [json_name = "orgNo"];
// //
string ext = 13 [json_name = "ext"]; string ext = 13 [json_name = "ext"];
string attach = 14 [json_name = "attach"];
} }
message CmbNotifyReply { message CmbNotifyReply {
// 1000 1001 // 1000 1001

View File

@ -29,18 +29,6 @@ rocketMQ:
secretKey: "Z3596KCFA9RAUR6k" secretKey: "Z3596KCFA9RAUR6k"
secretToken: "" secretToken: ""
eventMap: eventMap:
order:
topic: voucher_order_create
group: voucher_order_create_group
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
notify:
topic: voucher_order_notify
group: voucher_order_notify_group
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
notifyRetry: # 重试延迟队列 notifyRetry: # 重试延迟队列
topic: voucher_order_notifyRetry topic: voucher_order_notifyRetry
group: voucher_order_notifyRetry_group group: voucher_order_notifyRetry_group

View File

@ -29,18 +29,6 @@ rocketMQ:
secretKey: "Z3596KCFA9RAUR6k" secretKey: "Z3596KCFA9RAUR6k"
secretToken: "" secretToken: ""
eventMap: eventMap:
order:
topic: voucher_order_create
group: voucher_order_create_group
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
notify:
topic: voucher_order_notify
group: voucher_order_notify_group
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
notifyRetry: # 重试延迟队列 notifyRetry: # 重试延迟队列
topic: voucher_order_notifyRetry topic: voucher_order_notifyRetry
group: voucher_order_notifyRetry_group group: voucher_order_notifyRetry_group

View File

@ -10,6 +10,7 @@ type OrderBo struct {
ID uint64 ID uint64
OrderNo string OrderNo string
OutBizNo string OutBizNo string
VoucherNo string
ProductNo string ProductNo string
BatchNo string BatchNo string
Account string Account string
@ -20,6 +21,8 @@ type OrderBo struct {
MerchantNo string MerchantNo string
NotifyUrl string NotifyUrl string
Channel vo.Channel Channel vo.Channel
Attach string
Remark string
CreateTime *time.Time CreateTime *time.Time
UpdateTime *time.Time UpdateTime *time.Time
} }
@ -31,6 +34,7 @@ type OrderCreateReqBo struct {
AppID string AppID string
Type vo.OrderType Type vo.OrderType
AccountType vo.OrderAccountType AccountType vo.OrderAccountType
Attach string
} }
type OrderCreateRepBo struct { type OrderCreateRepBo struct {

View File

@ -7,17 +7,16 @@ import (
// OrderNotifyBo 领域实体Bo结构字段和模型字段保持一致 // OrderNotifyBo 领域实体Bo结构字段和模型字段保持一致
type OrderNotifyBo struct { type OrderNotifyBo struct {
ID uint64 ID uint64
OrderNo string OrderNo string
OutRequestNo string Status vo.OrderNotifyStatus
Status vo.OrderNotifyStatus Request string
Request string Event vo.OrderNotifyEvent
Event vo.OrderNotifyEvent Channel vo.Channel
Channel vo.Channel Type vo.OrderType
Type vo.OrderType Responses string
Responses string Remark string
Remark string NotifyUrl string
NotifyUrl string CreateTime *time.Time
CreateTime *time.Time UpdateTime *time.Time
UpdateTime *time.Time
} }

View File

@ -1,22 +0,0 @@
package bo
import (
"time"
"voucher/internal/biz/vo"
)
// OrderWechatBo 领域实体Bo结构字段和模型字段保持一致
type OrderWechatBo struct {
ID uint64
OrderNo string
OutRequestNo string
AppID string
StockCreatorMchid string
OpenID string
StockID string
Status vo.OrderWechatStatus
CouponID string
Remark string
CreateTime *time.Time
UpdateTime *time.Time
}

View File

@ -12,9 +12,9 @@ import (
"voucher/internal/pkg/lock" "voucher/internal/pkg/lock"
) )
func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (voucherNo string, err error) {
c := vo.CmbOrderLockKey.BuildCache([]string{req.OutBizNo}) c := vo.CmbOrderLockKey.BuildCache([]string{req.OutBizNo, req.Type.String()})
err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
@ -25,7 +25,14 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or
} }
if order != nil { if order != nil {
orderNo = order.OrderNo
if order.Status.IsFail() {
if err = v.orderRetry(ctx, order); err != nil {
return err
}
}
voucherNo = order.VoucherNo
return nil return nil
} }
@ -38,17 +45,20 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or
return fmt.Errorf("只支持微信") return fmt.Errorf("只支持微信")
} }
if orderNo, err = v.Cmb.Order(ctx, req, product); err != nil { order, err = v.order(ctx, req, product)
if err != nil {
return err return err
} }
return v.PushOrderMQ(ctx, orderNo) voucherNo = order.VoucherNo
return nil
}) })
return return
} }
func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (reps *v1.CmbQueryReply, err error) { func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) {
c := vo.CmbQueryLockKey.BuildCache([]string{orderNo}) c := vo.CmbQueryLockKey.BuildCache([]string{orderNo})
@ -64,7 +74,7 @@ func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (reps *v1.Cmb
return err return err
} }
reps = &v1.CmbQueryReply{ resp = &v1.CmbQueryReply{
Ticket: order.OrderNo, Ticket: order.OrderNo,
Status: status.GetValue(), Status: status.GetValue(),
TransDate: time.Now().Format("20060102150405"), TransDate: time.Now().Format("20060102150405"),

View File

@ -9,43 +9,31 @@ import (
) )
type Cmb struct { type Cmb struct {
bc *conf.Bootstrap bc *conf.Bootstrap
rdb *data.Rdb rdb *data.Rdb
OrderRepo repo.OrderRepo OrderRepo repo.OrderRepo
OrderWechatRepo repo.OrderWechatRepo ProductRepo repo.ProductRepo
ProductRepo repo.ProductRepo OrderNotifyRepo repo.OrderNotifyRepo
OrderNotifyRepo repo.OrderNotifyRepo WechatCpnRepo wechatrepo.WechatCpnRepo
WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo CmbMixRepo mixrepos.CmbMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo
GenerateMixRepo mixrepos.GenerateMixRepo
CmbMixRepo mixrepos.CmbMixRepo
DingMixRepo mixrepos.DingMixRepo
} }
func NewCmb( func NewCmb(
bc *conf.Bootstrap, bc *conf.Bootstrap,
rdb *data.Rdb, rdb *data.Rdb,
orderRepo repo.OrderRepo, orderRepo repo.OrderRepo,
OrderWechatRepo repo.OrderWechatRepo,
ProductRepo repo.ProductRepo, ProductRepo repo.ProductRepo,
OrderNotifyRepo repo.OrderNotifyRepo, OrderNotifyRepo repo.OrderNotifyRepo,
WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo,
WechatCpnRepo wechatrepo.WechatCpnRepo, WechatCpnRepo wechatrepo.WechatCpnRepo,
GenerateMixRepo mixrepos.GenerateMixRepo,
CmbMixRepo mixrepos.CmbMixRepo, CmbMixRepo mixrepos.CmbMixRepo,
DingMixRepo mixrepos.DingMixRepo,
) *Cmb { ) *Cmb {
return &Cmb{ return &Cmb{
bc: bc, bc: bc,
rdb: rdb, rdb: rdb,
OrderRepo: orderRepo, OrderRepo: orderRepo,
OrderWechatRepo: OrderWechatRepo, ProductRepo: ProductRepo,
ProductRepo: ProductRepo, OrderNotifyRepo: OrderNotifyRepo,
OrderNotifyRepo: OrderNotifyRepo, WechatCpnRepo: WechatCpnRepo,
WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo, CmbMixRepo: CmbMixRepo,
WechatCpnRepo: WechatCpnRepo,
GenerateMixRepo: GenerateMixRepo,
CmbMixRepo: CmbMixRepo,
DingMixRepo: DingMixRepo,
} }
} }

View File

@ -3,6 +3,7 @@ package cmb
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/go-kratos/kratos/v2/log"
"time" "time"
err2 "voucher/api/err" err2 "voucher/api/err"
v1 "voucher/api/v1" v1 "voucher/api/v1"
@ -10,7 +11,50 @@ import (
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
) )
func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (string, error) { func (v *Cmb) Notify(ctx context.Context, order *bo.OrderBo) (*bo.OrderNotifyBo, error) {
event, err := order.Status.GetOrderNotifyEvent()
if err != nil {
return nil, err
}
req := &bo.OrderNotifyBo{
OrderNo: order.OrderNo,
NotifyUrl: order.NotifyUrl,
Channel: order.Channel,
Event: event,
Type: order.Type,
}
request, orderNotify, err := v.notifyCreate(ctx, order, req)
if err != nil {
return nil, err
}
x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
if err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x)
if err != nil {
log.Errorf("Notify CmbMixRepo.VerifyResponse error:%s", err.Error())
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
var reply *v1.CmbNotifyReply
if err = json.Unmarshal([]byte(bizStr), &reply); err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg)
}
return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr)
}
func (v *Cmb) bizContent(_ context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (string, error) {
cmbStatus, err := orderNotify.Event.GetCmbStatusText() cmbStatus, err := orderNotify.Event.GetCmbStatusText()
if err != nil { if err != nil {
@ -22,6 +66,7 @@ func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (stri
Status: cmbStatus.GetValue(), Status: cmbStatus.GetValue(),
TransDate: time.Now().Format("20060102150405"), TransDate: time.Now().Format("20060102150405"),
OrgNo: v.bc.Cmb.OrgNo, OrgNo: v.bc.Cmb.OrgNo,
Attach: order.Attach,
Ext: "", Ext: "",
} }
@ -33,9 +78,9 @@ func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (stri
return string(bizJsonBytes), nil return string(bizJsonBytes), nil
} }
func (v *Cmb) notifyCreate(ctx context.Context, req *bo.OrderNotifyBo) (*v1.CmbRequest, *bo.OrderNotifyBo, error) { func (v *Cmb) notifyCreate(ctx context.Context, order *bo.OrderBo, req *bo.OrderNotifyBo) (*v1.CmbRequest, *bo.OrderNotifyBo, error) {
bizContent, err := v.bizContent(ctx, req) bizContent, err := v.bizContent(ctx, order, req)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -1,55 +0,0 @@
package cmb
import (
"context"
"encoding/json"
"github.com/go-kratos/kratos/v2/log"
v1 "voucher/api/v1"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
)
func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderNotifyBo, error) {
event, err := order.Status.GetOrderNotifyEvent()
if err != nil {
return nil, err
}
req := &bo.OrderNotifyBo{
OrderNo: order.OrderNo,
OutRequestNo: orderOutRequestNo,
NotifyUrl: order.NotifyUrl,
Channel: order.Channel,
Event: event,
Type: order.Type,
Request: "",
}
request, orderNotify, err := v.notifyCreate(ctx, req)
if err != nil {
return nil, err
}
x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
if err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x)
if err != nil {
log.Errorf("NotifyConsume CmbMixRepo.VerifyResponse error:%s", err.Error())
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
var reply *v1.CmbNotifyReply
if err = json.Unmarshal([]byte(bizStr), &reply); err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg)
}
return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr)
}

View File

@ -12,16 +12,15 @@ import (
func (v *Cmb) NotifyRetryConsume(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) { func (v *Cmb) NotifyRetryConsume(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) {
req := &bo.OrderNotifyBo{ req := &bo.OrderNotifyBo{
OrderNo: orderNotify.OrderNo, OrderNo: orderNotify.OrderNo,
OutRequestNo: orderNotify.OutRequestNo, NotifyUrl: order.NotifyUrl,
NotifyUrl: order.NotifyUrl, Channel: order.Channel,
Channel: order.Channel, Event: orderNotify.Event,
Event: orderNotify.Event, Type: order.Type,
Type: order.Type, Request: "",
Request: "",
} }
request, orderNotify, err := v.notifyCreate(ctx, req) request, orderNotify, err := v.notifyCreate(ctx, order, req)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -4,5 +4,5 @@ import (
"github.com/google/wire" "github.com/google/wire"
) )
// ProviderSetCmb ProviderSetCmb is biz providers. // ProviderSetCmb is biz providers.
var ProviderSetCmb = wire.NewSet(NewCmb) var ProviderSetCmb = wire.NewSet(NewCmb)

View File

@ -1,37 +0,0 @@
package cmb
import (
"context"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/uid"
)
func (v *Cmb) Order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (orderNo string, err error) {
orderNo, err = v.GenerateMixRepo.GeneratorString(ctx, uid.Order)
if err != nil {
return
}
o := &bo.OrderBo{
OrderNo: orderNo,
OutBizNo: req.OutBizNo,
ProductNo: req.ProductNo,
Account: req.Account,
AppID: req.AppID,
MerchantNo: product.MchId,
Channel: product.Channel,
BatchNo: product.BatchNo,
NotifyUrl: v.bc.Cmb.NotifyUrl,
AccountType: vo.OrderAccountTypeOpenId,
Type: vo.OrderTypeCmb,
Status: vo.OrderStatusWait,
}
if _, err = v.OrderRepo.Create(ctx, o); err != nil {
return
}
return
}

View File

@ -1,68 +0,0 @@
package biz
import (
"context"
"fmt"
"go.opentelemetry.io/otel/trace"
errPb "voucher/api/err"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
"voucher/internal/pkg/mq"
)
func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error {
eventMap := v.bc.RocketMQ.EventMap["notify"]
sendOption := []mq.SendOption{
mq.WithSendShardingKeysOption(fmt.Sprintf("%s_%s", orderNo, outRequestNo)),
mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
}
if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil {
return fmt.Errorf("notify消费队列投递失败[%v]", err)
}
return nil
}
func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) error {
var (
err error
orderNotify *bo.OrderNotifyBo
cache = vo.NotifyConsume.BuildCache([]string{orderNo})
)
err = lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error {
order, err2 := v.OrderRepo.GetByOrderNo(ctx, orderNo)
if err2 != nil {
return err
}
if !order.Status.CanNotify() {
return fmt.Errorf("订单状态错误,不能通知:%s", order.Status.GetText())
}
if !order.Channel.IsWeChat() {
return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText())
}
if order.Type.IsCmb() {
if orderNotify, err2 = v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo); err2 != nil {
return err
}
}
return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
})
if !errPb.IsNeedRetryNotify(err) {
return err
}
// 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟
// 第一次通知失败重试入队
return v.PushNotifyRetryDelayMQ(ctx, 60, orderNotify.ID)
}

View File

@ -1,4 +1,4 @@
package cmb package biz
import ( import (
"context" "context"
@ -12,43 +12,75 @@ import (
"voucher/internal/pkg/uid" "voucher/internal/pkg/uid"
) )
func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo string, err error) { func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) {
if !order.Status.IsWait() { order, err := v.create(ctx, req, product)
return outRequestNo, fmt.Errorf("订单状态错误,%s", order.Status.GetText())
}
if !order.Channel.IsWeChat() {
return outRequestNo, fmt.Errorf("订单渠道错误,%s", order.Channel.GetText())
}
// 注册通知标签 order.MerchantNo 批次创建商户, order.BatchNo 商品批次号
if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil {
return outRequestNo, err
}
if err = v.ing(ctx, order.ID); err != nil {
return
}
orderWechat, err := v.create(ctx, order)
if err != nil { if err != nil {
return return nil, err
} }
couponId, err := v.WechatCpnRepo.Order(ctx, orderWechat) voucherNo := ""
if err != nil { if product.ProductNo == "001" {
return outRequestNo, v.fail(ctx, order, orderWechat, err.Error()) // 压测商品
voucherNo = order.OrderNo
} else {
// 注册通知标签 order.MerchantNo 批次创建商户, order.BatchNo 商品批次号
if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil {
return nil, err
}
// 真实发放
voucherNo, err = v.WechatCpnRepo.Order(ctx, order)
if err != nil {
return nil, v.fail(ctx, order, err.Error())
}
} }
if err = v.success(ctx, order, orderWechat, couponId); err != nil { if err = v.success(ctx, order, voucherNo); err != nil {
return return nil, err
} }
return orderWechat.OutRequestNo, err return order, nil
} }
func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { func (v *VoucherBiz) orderRetry(ctx context.Context, order *bo.OrderBo) error {
voucherNo, err := v.WechatCpnRepo.Order(ctx, order)
if err != nil {
return v.fail(ctx, order, err.Error())
}
order.VoucherNo = voucherNo
return v.success(ctx, order, voucherNo)
}
func (v *VoucherBiz) create(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) {
orderNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.Order)
if err != nil {
return nil, err
}
return v.OrderRepo.Create(ctx, &bo.OrderBo{
OrderNo: orderNo,
OutBizNo: req.OutBizNo,
ProductNo: req.ProductNo,
Account: req.Account,
AppID: req.AppID,
MerchantNo: product.MchId,
Channel: product.Channel,
BatchNo: product.BatchNo,
NotifyUrl: v.bc.Cmb.NotifyUrl,
AccountType: vo.OrderAccountTypeOpenId,
Type: vo.OrderTypeCmb,
Status: vo.OrderStatusIng, // 同步发放,状态至为发放中
Attach: req.Attach,
})
}
func (v *VoucherBiz) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error {
c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID})
@ -60,8 +92,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID
} }
if err != redis.Nil { if err != redis.Nil {
errMsg := fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err) return fmt.Errorf(fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err))
return fmt.Errorf(errMsg)
} }
cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID})
@ -107,7 +138,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID
}) })
} }
func (v *Cmb) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { func (v *VoucherBiz) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) {
return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{
StockID: stockID, StockID: stockID,
StockCreatorMchID: stockCreatorMchID, StockCreatorMchID: stockCreatorMchID,
@ -115,7 +146,7 @@ func (v *Cmb) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMch
}) })
} }
func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { func (v *VoucherBiz) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error {
if err := v.rdb.Rdb.Set(ctx, c.Key, wechatNotifyTag.Tag, c.TTL).Err(); err != nil { if err := v.rdb.Rdb.Set(ctx, c.Key, wechatNotifyTag.Tag, c.TTL).Err(); err != nil {
return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err))
@ -124,59 +155,26 @@ func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.Wec
return nil return nil
} }
func (v *Cmb) create(ctx context.Context, order *bo.OrderBo) (*bo.OrderWechatBo, error) { func (v *VoucherBiz) ing(ctx context.Context, id uint64) error {
outRequestNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.OrderWechat)
if err != nil {
return nil, err
}
req := &bo.OrderWechatBo{
OrderNo: order.OrderNo,
OutRequestNo: outRequestNo,
AppID: order.AppID,
StockCreatorMchid: order.MerchantNo,
OpenID: order.Account,
StockID: order.BatchNo,
Status: vo.OrderWechatStatusWait,
}
orderWechat, err := v.OrderWechatRepo.Create(ctx, req)
if err != nil {
return nil, err
}
return orderWechat, nil
}
func (v *Cmb) ing(ctx context.Context, id uint64) error {
return v.OrderRepo.Ing(ctx, id) return v.OrderRepo.Ing(ctx, id)
} }
func (v *Cmb) success(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, couponId string) error { func (v *VoucherBiz) success(ctx context.Context, order *bo.OrderBo, voucherNo string) error {
if err := v.OrderWechatRepo.Success(ctx, orderWechat.ID, couponId); err != nil { return v.OrderRepo.Success(ctx, order.ID, voucherNo)
return err
}
return v.OrderRepo.Success(ctx, order.ID)
} }
func (v *Cmb) fail(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, errMsg string) error { func (v *VoucherBiz) fail(ctx context.Context, order *bo.OrderBo, remark string) error {
if err := v.OrderWechatRepo.Fail(ctx, orderWechat.ID, errMsg); err != nil { if err := v.OrderRepo.Fail(ctx, order.ID, remark); err != nil {
return err return err
} }
if err := v.OrderRepo.Fail(ctx, order.ID); err != nil { return v.alarm(ctx, order, remark)
return err
}
return v.alarm(ctx, order, errMsg)
} }
func (v *Cmb) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error { func (v *VoucherBiz) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error {
// 1小时 内 指定的批次号 发放 发生错误 预警 // 1小时 内 指定的批次号 发放 发生错误 预警
c := vo.OrderConsumeFailAlarmKey.BuildCache([]string{order.ProductNo}) c := vo.OrderConsumeFailAlarmKey.BuildCache([]string{order.ProductNo})
@ -208,12 +206,11 @@ func (v *Cmb) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error
} }
// 通知 // 通知
text := v.alarmText(ctx, order, errMsg) if err = v.DingMixRepo.SendMarkdownMessage(ctx, v.alarmText(ctx, order, errMsg)); err != nil {
if err = v.DingMixRepo.SendMarkdownMessage(ctx, text); err != nil {
return err return err
} }
if err := v.rdb.Rdb.Set(ctx, c.Key, order.ProductNo, c.TTL).Err(); err != nil { if err = v.rdb.Rdb.Set(ctx, c.Key, order.ProductNo, c.TTL).Err(); err != nil {
return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err))
} }
@ -221,7 +218,7 @@ func (v *Cmb) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error
}) })
} }
func (v *Cmb) alarmText(_ context.Context, order *bo.OrderBo, errMsg string) string { func (v *VoucherBiz) alarmText(_ context.Context, order *bo.OrderBo, errMsg string) string {
remarks := fmt.Sprintf("订单号:%s商品编号:%s,原因:%s", order.OrderNo, order.ProductNo, errMsg) remarks := fmt.Sprintf("订单号:%s商品编号:%s,原因:%s", order.OrderNo, order.ProductNo, errMsg)

View File

@ -1,52 +0,0 @@
package biz
import (
"context"
"fmt"
"go.opentelemetry.io/otel/trace"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
"voucher/internal/pkg/mq"
)
func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error {
eventMap := v.bc.RocketMQ.EventMap["order"]
sendOption := []mq.SendOption{
mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)),
mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
}
if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil {
return fmt.Errorf("收单成功,消费队列投递失败[%v]", err)
}
return nil
}
func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) {
c := vo.OrderConsume.BuildCache([]string{orderNo})
return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo)
if err != nil {
return err
}
if order.Type.IsCmb() {
outRequestNo, err := v.Cmb.OrderConsume(ctx, order)
if err != nil {
return err
}
return v.PushNotifyMQ(ctx, orderNo, outRequestNo)
}
return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
})
}

View File

@ -9,11 +9,12 @@ import (
type OrderRepo interface { type OrderRepo interface {
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)
GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error)
GetByMBV(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error)
Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error)
GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error) GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error)
Ing(ctx context.Context, id uint64) error Ing(ctx context.Context, id uint64) error
Success(ctx context.Context, id uint64) error Success(ctx context.Context, id uint64, voucherNo string) error
Fail(ctx context.Context, id uint64) error Fail(ctx context.Context, id uint64, remark string) error
Used(ctx context.Context, id uint64) error Used(ctx context.Context, id uint64) error
Expired(ctx context.Context, id uint64) error Expired(ctx context.Context, id uint64) error
} }

View File

@ -1,17 +0,0 @@
package repo
import (
"context"
"voucher/internal/biz/bo"
)
type OrderWechatRepo interface {
Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error)
Success(ctx context.Context, id uint64, couponId string) error
Fail(ctx context.Context, id uint64, remark string) error
GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error)
GetLastByOrderNo(ctx context.Context, orderNo string) (*bo.OrderWechatBo, error)
GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error)
Used(ctx context.Context, id uint64) error
Expired(ctx context.Context, id uint64) error
}

View File

@ -12,8 +12,6 @@ const (
CmbQueryLockKey CacheKey = "cmb_query" CmbQueryLockKey CacheKey = "cmb_query"
CmbProductQueryLockKey CacheKey = "cmb_product_query" CmbProductQueryLockKey CacheKey = "cmb_product_query"
OrderConsume CacheKey = "order_consume"
NotifyConsume CacheKey = "notify_consume"
NotifyRetryConsume CacheKey = "notify_retry_consume" NotifyRetryConsume CacheKey = "notify_retry_consume"
OrderConsumeFailAlarmKey CacheKey = "order_consume_fail_alarm" OrderConsumeFailAlarmKey CacheKey = "order_consume_fail_alarm"
@ -29,13 +27,11 @@ var CacheKeyMap = map[CacheKey]time.Duration{
CmbOrderLockKey: 30 * time.Second, CmbOrderLockKey: 30 * time.Second,
CmbQueryLockKey: 30 * time.Second, CmbQueryLockKey: 30 * time.Second,
CmbProductQueryLockKey: 30 * time.Second, CmbProductQueryLockKey: 30 * time.Second,
OrderConsume: 60 * time.Second,
OrderConsumeFailAlarmKey: 3600 * time.Second, // 1小时 OrderConsumeFailAlarmKey: 3600 * time.Second, // 1小时
OrderConsumeFailAlarmLockKey: 60 * time.Second, OrderConsumeFailAlarmLockKey: 60 * time.Second,
NotifyConsume: 60 * time.Second,
NotifyRetryConsume: 60 * time.Second, NotifyRetryConsume: 60 * time.Second,
WechatNotifyRegisterTagCacheKey: 86400 * time.Second, // 1天 WechatNotifyRegisterTagCacheKey: 86400 * time.Second, // 1天
WechatNotifyRegisterTagCacheLockKey: 30 * time.Second, WechatNotifyRegisterTagCacheLockKey: 60 * time.Second,
WechatNotifyConsumeLockKey: 30 * time.Second, WechatNotifyConsumeLockKey: 30 * time.Second,
} }

View File

@ -1,5 +1,9 @@
package vo package vo
import (
"fmt"
)
type OrderType uint8 type OrderType uint8
const ( const (
@ -17,6 +21,10 @@ func (s OrderType) GetText() string {
return "未知类型" return "未知类型"
} }
func (s OrderType) String() string {
return fmt.Sprintf("%d", s)
}
func (s OrderType) GetValue() uint8 { func (s OrderType) GetValue() uint8 {
return uint8(s) return uint8(s)
} }

View File

@ -1,54 +0,0 @@
package vo
type OrderWechatStatus uint8
const (
OrderWechatStatusWait OrderWechatStatus = iota + 1
OrderWechatStatusSuccess
OrderWechatStatusFail
OrderWechatStatusUse
OrderWechatStatusExpired
)
func (s OrderWechatStatus) GetValue() uint8 {
return uint8(s)
}
func (s OrderWechatStatus) IsWait() bool {
return s == OrderWechatStatusWait
}
func (s OrderWechatStatus) IsSuccess() bool {
return s == OrderWechatStatusSuccess
}
func (s OrderWechatStatus) IsFail() bool {
return s == OrderWechatStatusFail
}
func (s OrderWechatStatus) IsUse() bool {
return s == OrderWechatStatusUse
}
func (s OrderWechatStatus) IsExpired() bool {
return s == OrderWechatStatusExpired
}
func (s OrderWechatStatus) CanNotify() bool {
return s.IsSuccess() || s.IsUse() || s.IsExpired()
}
var OrderWechatStatusMap = map[OrderWechatStatus]string{
OrderWechatStatusWait: "待发放",
OrderWechatStatusSuccess: "发放成功",
OrderWechatStatusFail: "发放失败",
OrderWechatStatusUse: "已使用",
OrderWechatStatusExpired: "已过期",
}
func (s OrderWechatStatus) GetText() string {
if t, ok := OrderWechatStatusMap[s]; ok {
return t
}
return "未知状态"
}

View File

@ -10,15 +10,17 @@ import (
) )
type VoucherBiz struct { type VoucherBiz struct {
bc *conf.Bootstrap bc *conf.Bootstrap
rdb *data.Rdb rdb *data.Rdb
Cmb *cmb.Cmb Cmb *cmb.Cmb
ProductRepo repo.ProductRepo ProductRepo repo.ProductRepo
OrderRepo repo.OrderRepo OrderRepo repo.OrderRepo
OrderWechatRepo repo.OrderWechatRepo OrderNotifyRepo repo.OrderNotifyRepo
OrderNotifyRepo repo.OrderNotifyRepo WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo
MqSendMixRepo mixrepos.MQSendMixRepo MqSendMixRepo mixrepos.MQSendMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo GenerateMixRepo mixrepos.GenerateMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo
DingMixRepo mixrepos.DingMixRepo
} }
func NewVoucherBiz( func NewVoucherBiz(
@ -27,20 +29,24 @@ func NewVoucherBiz(
Cmb *cmb.Cmb, Cmb *cmb.Cmb,
ProductRepo repo.ProductRepo, ProductRepo repo.ProductRepo,
OrderRepo repo.OrderRepo, OrderRepo repo.OrderRepo,
OrderWechatRepo repo.OrderWechatRepo,
OrderNotifyRepo repo.OrderNotifyRepo, OrderNotifyRepo repo.OrderNotifyRepo,
WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo,
MqSendMixRepo mixrepos.MQSendMixRepo, MqSendMixRepo mixrepos.MQSendMixRepo,
GenerateMixRepo mixrepos.GenerateMixRepo,
WechatCpnRepo wechatrepo.WechatCpnRepo, WechatCpnRepo wechatrepo.WechatCpnRepo,
DingMixRepo mixrepos.DingMixRepo,
) *VoucherBiz { ) *VoucherBiz {
return &VoucherBiz{ return &VoucherBiz{
bc: bc, bc: bc,
rdb: rdb, rdb: rdb,
Cmb: Cmb, Cmb: Cmb,
ProductRepo: ProductRepo, ProductRepo: ProductRepo,
OrderRepo: OrderRepo, OrderRepo: OrderRepo,
OrderWechatRepo: OrderWechatRepo, OrderNotifyRepo: OrderNotifyRepo,
OrderNotifyRepo: OrderNotifyRepo, WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo,
MqSendMixRepo: MqSendMixRepo, MqSendMixRepo: MqSendMixRepo,
WechatCpnRepo: WechatCpnRepo, GenerateMixRepo: GenerateMixRepo,
WechatCpnRepo: WechatCpnRepo,
DingMixRepo: DingMixRepo,
} }
} }

View File

@ -4,77 +4,75 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
errPb "voucher/api/err"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
"voucher/internal/pkg/lock" "voucher/internal/pkg/lock"
) )
func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *bo.WechatVoucherNotifyBo) error { func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *bo.WechatVoucherNotifyBo) error {
//req.PlainText.StockCreatorMchid = "1676203838"
//req.PlainText.StockID = "20215869"
//req.PlainText.CouponID = "96059179220"
c := vo.WechatNotifyConsumeLockKey.BuildCache([]string{tag, req.PlainText.StockID, req.PlainText.CouponID}) c := vo.WechatNotifyConsumeLockKey.BuildCache([]string{tag, req.PlainText.StockID, req.PlainText.CouponID})
return lock.NewMutex(j.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
//req.PlainText.StockCreatorMchid = "1676203838" order, err := v.OrderRepo.GetByMBV(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID)
//req.PlainText.StockID = "20215869"
//req.PlainText.CouponID = "96059179220"
if req.PlainText.Status.IsSended() {
log.Warnf("券状态可用,忽略不处理,couponId:%s,stockId:%s,status:%s",
req.PlainText.CouponID, req.PlainText.StockID, req.PlainText.Status.GetText())
return nil
}
orderWechat, err := j.OrderWechatRepo.GetByMSCId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID)
if err != nil { if err != nil {
return err return err
} }
order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo)
if err != nil {
return fmt.Errorf("根据订单号%s获取订单信息失败:%s", orderWechat.OrderNo, err.Error())
}
if req.PlainText.Status.IsUsed() { if req.PlainText.Status.IsUsed() {
if err = j.wechatVoucherUsed(ctx, order, orderWechat); err != nil {
if err = v.used(ctx, order); err != nil {
return err return err
} }
} else if req.PlainText.Status.IsExpired() { } else if req.PlainText.Status.IsExpired() {
if err = j.wechatVoucherExpired(ctx, order, orderWechat); err != nil {
if err = v.expired(ctx, order); err != nil {
return err return err
} }
} else { } else {
return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText()) return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText())
} }
return j.PushNotifyMQ(ctx, orderWechat.OrderNo, orderWechat.OutRequestNo) if order.Type.IsCmb() {
if orderNotify, err2 := v.Cmb.Notify(ctx, order); err2 != nil {
if !errPb.IsNeedRetryNotify(err2) {
return err2
}
// 第一次通知失败重试入队
// 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟
return v.PushNotifyRetryDelayMQ(ctx, 60, orderNotify.ID)
}
}
return nil
}) })
} }
func (v *VoucherBiz) wechatVoucherUsed(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo) error { func (v *VoucherBiz) used(ctx context.Context, order *bo.OrderBo) error {
if orderWechat.Status.IsUse() { if order.Status.IsUse() {
log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", orderWechat.OrderNo) log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", order.OrderNo)
return nil return nil
} }
if err := v.OrderWechatRepo.Used(ctx, orderWechat.ID); err != nil {
return err
}
return v.OrderRepo.Used(ctx, order.ID) return v.OrderRepo.Used(ctx, order.ID)
} }
func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo) error { func (j *VoucherBiz) expired(ctx context.Context, order *bo.OrderBo) error {
if orderWechat.Status.IsExpired() { if order.Status.IsExpired() {
log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", orderWechat.OrderNo) log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo)
return nil return nil
} }
if err := j.OrderWechatRepo.Expired(ctx, orderWechat.ID); err != nil {
return err
}
return j.OrderRepo.Expired(ctx, order.ID) return j.OrderRepo.Expired(ctx, order.ID)
} }

View File

@ -8,8 +8,8 @@ import (
) )
type WechatCpnRepo interface { type WechatCpnRepo interface {
Order(ctx context.Context, orderWechat *bo.OrderWechatBo) (couponId string, err error) Order(ctx context.Context, order *bo.OrderBo) (couponId string, err error)
Query(ctx context.Context, orderWechat *bo.OrderWechatBo) (vo.OrderWechatStatus, error) Query(ctx context.Context, order *bo.OrderBo) (vo.OrderStatus, error)
QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error) QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error)
RegisterNotifyTag(ctx context.Context, stockID string) error RegisterNotifyTag(ctx context.Context, stockID string) error
} }

View File

@ -14,6 +14,7 @@ const TableNameOrder = "order"
type Order struct { type Order struct {
ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"`
OrderNo string `gorm:"column:order_no;not null" json:"order_no"` OrderNo string `gorm:"column:order_no;not null" json:"order_no"`
VoucherNo string `gorm:"column:voucher_no;not null" json:"voucher_no"`
OutBizNo string `gorm:"column:out_biz_no;not null;comment:外部交易号" json:"out_biz_no"` // 外部交易号 OutBizNo string `gorm:"column:out_biz_no;not null;comment:外部交易号" json:"out_biz_no"` // 外部交易号
ProductNo string `gorm:"column:product_no;not null;comment:商品编号" json:"product_no"` // 商品编号 ProductNo string `gorm:"column:product_no;not null;comment:商品编号" json:"product_no"` // 商品编号
BatchNo string `gorm:"column:batch_no;not null;comment:立减金批次号" json:"batch_no"` // 立减金批次号 BatchNo string `gorm:"column:batch_no;not null;comment:立减金批次号" json:"batch_no"` // 立减金批次号
@ -25,6 +26,8 @@ type Order struct {
MerchantNo string `gorm:"column:merchant_no;not null;comment:创建批次号的商户号" json:"merchant_no"` // 创建批次号的商户号 MerchantNo string `gorm:"column:merchant_no;not null;comment:创建批次号的商户号" json:"merchant_no"` // 创建批次号的商户号
NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"` NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"`
Channel uint8 `gorm:"column:channel;not null;comment:1:微信 2:支付宝" json:"channel"` // 1:微信 2:支付宝 Channel uint8 `gorm:"column:channel;not null;comment:1:微信 2:支付宝" json:"channel"` // 1:微信 2:支付宝
Remark string `gorm:"column:remark;not null" json:"remark"`
Attach string `gorm:"column:attach;not null" json:"attach"`
CreateTime *time.Time `gorm:"column:create_time" json:"create_time"` CreateTime *time.Time `gorm:"column:create_time" json:"create_time"`
UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"`
} }

View File

@ -12,19 +12,18 @@ const TableNameOrderNotify = "order_notify"
// OrderNotify mapped from table <order_notify> // OrderNotify mapped from table <order_notify>
type OrderNotify struct { type OrderNotify struct {
ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"`
OrderNo string `gorm:"column:order_no;not null" json:"order_no"` OrderNo string `gorm:"column:order_no;not null" json:"order_no"`
OutRequestNo string `gorm:"column:out_request_no;not null" json:"out_request_no"` Status uint8 `gorm:"column:status;not null;comment:状态" json:"status"`
Status uint8 `gorm:"column:status;not null;comment:状态" json:"status"` Event uint8 `gorm:"column:event;not null;comment:event" json:"event"`
Event uint8 `gorm:"column:event;not null;comment:event" json:"event"` Channel uint8 `gorm:"column:channel;not null;comment:channel" json:"channel"`
Channel uint8 `gorm:"column:channel;not null;comment:channel" json:"channel"` Type uint8 `gorm:"column:type;not null;comment:1:招行" json:"type"`
Type uint8 `gorm:"column:type;not null;comment:1:招行" json:"type"` Request string `gorm:"column:request;not null" json:"request"`
Request string `gorm:"column:request;not null" json:"request"` Responses string `gorm:"column:responses" json:"responses"`
Responses string `gorm:"column:responses" json:"responses"` Remark string `gorm:"column:remark" json:"remark"`
Remark string `gorm:"column:remark" json:"remark"` NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"`
NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"` CreateTime *time.Time `gorm:"column:create_time;not null" json:"create_time"`
CreateTime *time.Time `gorm:"column:create_time;not null" json:"create_time"` UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"`
UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"`
} }
// TableName OrderNotify's table name // TableName OrderNotify's table name

View File

@ -1,32 +0,0 @@
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
// Code generated by gorm.io/gen. DO NOT EDIT.
package model
import (
"time"
)
const TableNameOrderWechat = "order_wechat"
// OrderWechat mapped from table <order_wechat>
type OrderWechat struct {
ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"`
OrderNo string `gorm:"column:order_no;not null;comment:订单号" json:"order_no"` // 订单号
OutRequestNo string `gorm:"column:out_request_no;not null;comment:请求单号" json:"out_request_no"` // 请求单号
AppID string `gorm:"column:app_id;not null;comment:微信应用id" json:"app_id"` // 微信应用id
StockCreatorMchid string `gorm:"column:stock_creator_mchid;not null;comment:批次创建方商户号\n" json:"stock_creator_mchid"` // 批次创建方商户号
OpenID string `gorm:"column:open_id;not null;comment:微信openid" json:"open_id"` // 微信openid
StockID string `gorm:"column:stock_id;not null;comment:批次id" json:"stock_id"` // 批次id
Status uint8 `gorm:"column:status;not null;comment:1:发放中 2:发放成功 3:发放失败" json:"status"` // 1:发放中 2:发放成功 3:发放失败
CouponID string `gorm:"column:coupon_id;not null;comment:微信为代金券唯一分配的id" json:"coupon_id"` // 微信为代金券唯一分配的id
Remark string `gorm:"column:remark;not null;comment:备注说明" json:"remark"` // 备注说明
CreateTime *time.Time `gorm:"column:create_time;not null" json:"create_time"`
UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"`
}
// TableName OrderWechat's table name
func (*OrderWechat) TableName() string {
return TableNameOrderWechat
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"gorm.io/gorm" "gorm.io/gorm"
"time" "time"
"unicode/utf8"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/repo" "voucher/internal/biz/repo"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
@ -102,6 +103,22 @@ func (p *OrderRepoImpl) GetByOrderNo(ctx context.Context, orderNo string) (*bo.O
return p.ToBo(info), nil return p.ToBo(info), nil
} }
func (p *OrderRepoImpl) GetByMBV(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) {
info := &model.Order{}
tx := p.DB(ctx).Where(model.Order{MerchantNo: merchantNo, BatchNo: batchNo, VoucherNo: voucherNo}).Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error { func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error {
now := time.Now() now := time.Now()
@ -122,7 +139,7 @@ func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error {
return nil return nil
} }
func (p *OrderRepoImpl) Success(ctx context.Context, id uint64) error { func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string) error {
now := time.Now() now := time.Now()
res := p.db.DB(ctx). res := p.db.DB(ctx).
@ -132,6 +149,7 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64) error {
}). }).
Updates(model.Order{ Updates(model.Order{
Status: vo.OrderStatusSuccess.GetValue(), Status: vo.OrderStatusSuccess.GetValue(),
VoucherNo: voucherNo,
UpdateTime: &now, UpdateTime: &now,
}) })
@ -142,9 +160,16 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64) error {
return nil return nil
} }
func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64) error { func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64, remark string) error {
now := time.Now() now := time.Now()
if utf8.RuneCountInString(remark) > 100 {
runes := []rune(remark)
if len(runes) > 100 {
remark = string(runes[:100])
}
}
res := p.db.DB(ctx). res := p.db.DB(ctx).
Where(model.Order{ Where(model.Order{
ID: id, ID: id,
@ -152,6 +177,7 @@ func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64) error {
}). }).
Updates(model.Order{ Updates(model.Order{
Status: vo.OrderStatusFail.GetValue(), Status: vo.OrderStatusFail.GetValue(),
Remark: remark,
UpdateTime: &now, UpdateTime: &now,
}) })

View File

@ -58,17 +58,16 @@ func (p *OrderNotifyRepoImpl) Create(ctx context.Context, req *bo.OrderNotifyBo)
now := time.Now() now := time.Now()
info := &model.OrderNotify{ info := &model.OrderNotify{
OrderNo: req.OrderNo, OrderNo: req.OrderNo,
OutRequestNo: req.OutRequestNo, Status: vo.OrderNotifyStatusWait.GetValue(),
Status: vo.OrderNotifyStatusWait.GetValue(), Request: req.Request,
Request: req.Request, Responses: "{}",
Responses: "{}", NotifyUrl: req.NotifyUrl,
NotifyUrl: req.NotifyUrl, Channel: req.Channel.GetValue(),
Channel: req.Channel.GetValue(), Event: req.Event.GetValue(),
Event: req.Event.GetValue(), Type: req.Type.GetValue(),
Type: req.Type.GetValue(), CreateTime: &now,
CreateTime: &now, UpdateTime: &now,
UpdateTime: &now,
} }
if err := p.db.DB(ctx).Create(info).Error; err != nil { if err := p.db.DB(ctx).Create(info).Error; err != nil {

View File

@ -1,191 +0,0 @@
package repoimpl
import (
"context"
"gorm.io/gorm"
"time"
"unicode/utf8"
"voucher/internal/biz/bo"
"voucher/internal/biz/repo"
"voucher/internal/biz/vo"
"voucher/internal/data"
"voucher/internal/data/model"
)
// OrderWechatRepoImpl .
type OrderWechatRepoImpl struct {
Base[model.OrderWechat, bo.OrderWechatBo]
db *data.Db
}
// NewOrderWechatRepoImpl .
func NewOrderWechatRepoImpl(db *data.Db) repo.OrderWechatRepo {
return &OrderWechatRepoImpl{db: db}
}
func (p *OrderWechatRepoImpl) DB(ctx context.Context) *gorm.DB {
return p.db.DB(ctx).Model(model.OrderWechat{})
}
func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) {
now := time.Now()
info := &model.OrderWechat{
OrderNo: req.OrderNo,
OutRequestNo: req.OutRequestNo,
AppID: req.AppID,
StockCreatorMchid: req.StockCreatorMchid,
OpenID: req.OpenID,
StockID: req.StockID,
Status: vo.OrderWechatStatusWait.GetValue(),
CreateTime: &now,
UpdateTime: &now,
}
if err := p.db.DB(ctx).Create(info).Error; err != nil {
return nil, err
}
return p.ToBo(info), nil
}
func (p *OrderWechatRepoImpl) GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error) {
info := &model.OrderWechat{}
tx := p.DB(ctx).Where(model.OrderWechat{
StockCreatorMchid: mchId,
StockID: stockId,
CouponID: couponId,
}).Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) {
info := &model.OrderWechat{}
tx := p.DB(ctx).Where(model.OrderWechat{OutRequestNo: outRequestNo}).Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderWechatRepoImpl) GetLastByOrderNo(ctx context.Context, orderNo string) (*bo.OrderWechatBo, error) {
info := &model.OrderWechat{}
tx := p.DB(ctx).Where(model.OrderWechat{OrderNo: orderNo}).Order("id desc").Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.OrderWechat{
ID: id,
Status: vo.OrderStatusWait.GetValue(),
}).
Updates(model.OrderWechat{
Status: vo.OrderWechatStatusSuccess.GetValue(),
CouponID: couponId,
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}
func (p *OrderWechatRepoImpl) Fail(ctx context.Context, id uint64, remark string) error {
now := time.Now()
if utf8.RuneCountInString(remark) > 100 {
runes := []rune(remark)
if len(runes) > 100 {
remark = string(runes[:100])
}
}
res := p.db.DB(ctx).
Where(model.OrderWechat{
ID: id,
Status: vo.OrderStatusWait.GetValue(),
}).
Updates(model.OrderWechat{
Status: vo.OrderWechatStatusFail.GetValue(),
Remark: remark,
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}
func (p *OrderWechatRepoImpl) Used(ctx context.Context, id uint64) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.OrderWechat{
ID: id,
Status: vo.OrderWechatStatusSuccess.GetValue(),
}).
Updates(model.OrderWechat{
Status: vo.OrderWechatStatusUse.GetValue(),
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}
func (p *OrderWechatRepoImpl) Expired(ctx context.Context, id uint64) error {
now := time.Now()
res := p.db.DB(ctx).
Where(model.OrderWechat{
ID: id,
Status: vo.OrderWechatStatusSuccess.GetValue(),
}).
Updates(model.OrderWechat{
Status: vo.OrderWechatStatusExpired.GetValue(),
UpdateTime: &now,
})
if res.Error != nil {
return res.Error
}
return nil
}

View File

@ -7,7 +7,6 @@ import (
// ProviderRepoImplSet is providers. // ProviderRepoImplSet is providers.
var ProviderRepoImplSet = wire.NewSet( var ProviderRepoImplSet = wire.NewSet(
NewOrderRepoImpl, NewOrderRepoImpl,
NewOrderWechatRepoImpl,
NewProductRepoImpl, NewProductRepoImpl,
NewOrderNotifyRepoImpl, NewOrderNotifyRepoImpl,
NewWechatNotifyRegisterTagRepoImpl, NewWechatNotifyRegisterTagRepoImpl,

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/wechatpay-apiv3/wechatpay-go/core" "github.com/wechatpay-apiv3/wechatpay-go/core"
"github.com/wechatpay-apiv3/wechatpay-go/services/cashcoupons" "github.com/wechatpay-apiv3/wechatpay-go/services/cashcoupons"
"io" "io"
@ -34,15 +35,15 @@ func NewCpnRepoImpl(bc *conf.Bootstrap) (wechatrepo.WechatCpnRepo, error) {
return &CpnRepoImpl{bc: bc, Server: server}, nil return &CpnRepoImpl{bc: bc, Server: server}, nil
} }
func (c *CpnRepoImpl) Order(ctx context.Context, orderWechat *bo.OrderWechatBo) (couponId string, err error) { func (c *CpnRepoImpl) Order(ctx context.Context, order *bo.OrderBo) (couponId string, err error) {
req := cashcoupons.SendCouponRequest{ req := cashcoupons.SendCouponRequest{
OutRequestNo: core.String(orderWechat.OutRequestNo), OutRequestNo: core.String(order.OrderNo),
// 微信为发券方商户分配的公众账号ID接口传入的所有appid应该为公众号的appid在mp.weixin.qq.com申请的不能为APP的appid在open.weixin.qq.com申请的 // 微信为发券方商户分配的公众账号ID接口传入的所有appid应该为公众号的appid在mp.weixin.qq.com申请的不能为APP的appid在open.weixin.qq.com申请的
Appid: core.String(orderWechat.AppID), Appid: core.String(order.AppID),
Openid: core.String(orderWechat.OpenID), Openid: core.String(order.Account),
StockId: core.String(orderWechat.StockID), StockId: core.String(order.BatchNo),
StockCreatorMchid: core.String(orderWechat.StockCreatorMchid), StockCreatorMchid: core.String(order.MerchantNo),
} }
client, err := data.GetClient(ctx, c.Server) client, err := data.GetClient(ctx, c.Server)
@ -61,17 +62,19 @@ func (c *CpnRepoImpl) Order(ctx context.Context, orderWechat *bo.OrderWechatBo)
return return
} }
log.Errorf("请求微信返回错误=%s", string(bodyBytes))
if err = json.Unmarshal(bodyBytes, &ErrBody); err != nil { if err = json.Unmarshal(bodyBytes, &ErrBody); err != nil {
return return
} }
err = fmt.Errorf("微信返回错误=%s", ErrBody.Message) err = fmt.Errorf(ErrBody.Message)
return return
} }
if result.Response.StatusCode != CodeSuccess { if result.Response.StatusCode != CodeSuccess {
err = fmt.Errorf("Order微信返回错误StatusCode[%d]Status[%s]", result.Response.StatusCode, result.Response.Status) err = fmt.Errorf("请求错误")
return return
} }
@ -80,12 +83,12 @@ func (c *CpnRepoImpl) Order(ctx context.Context, orderWechat *bo.OrderWechatBo)
return return
} }
func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderWechatBo) (vo.OrderWechatStatus, error) { func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.OrderStatus, error) {
req := cashcoupons.QueryCouponRequest{ req := cashcoupons.QueryCouponRequest{
CouponId: core.String(orderWechat.CouponID), CouponId: core.String(orderWechat.VoucherNo),
Appid: core.String(orderWechat.AppID), Appid: core.String(orderWechat.AppID),
Openid: core.String(orderWechat.OpenID), Openid: core.String(orderWechat.Account),
} }
client, err := data.GetClient(ctx, c.Server) client, err := data.GetClient(ctx, c.Server)
@ -97,21 +100,25 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderWechatBo)
resp, result, err := svc.QueryCoupon(ctx, req) resp, result, err := svc.QueryCoupon(ctx, req)
if err != nil { if err != nil {
bodyBytes, err := io.ReadAll(result.Response.Body) bodyBytes, err := io.ReadAll(result.Response.Body)
if err != nil { if err != nil {
return 0, err return 0, err
} }
log.Errorf("请求微信返回错误=%s", string(bodyBytes))
if err = json.Unmarshal(bodyBytes, &ErrBody); err != nil { if err = json.Unmarshal(bodyBytes, &ErrBody); err != nil {
return 0, err return 0, err
} }
err = fmt.Errorf(ErrBody.Message)
return 0, fmt.Errorf("微信返回错误=%s", ErrBody.Message) return 0, fmt.Errorf("微信返回错误=%s", ErrBody.Message)
} }
if result.Response.StatusCode != CodeSuccess { if result.Response.StatusCode != CodeSuccess {
err = fmt.Errorf("Query微信返回错误StatusCode[%d]Status[%s]", result.Response.StatusCode, result.Response.Status) return 0, fmt.Errorf("请求错误")
return 0, err
} }
return CpnStatus(*resp.Status).GetStatus() return CpnStatus(*resp.Status).GetStatus()

View File

@ -32,18 +32,6 @@ func NewConsumer(
SecretKey: conf.RocketMQ.SecretKey, SecretKey: conf.RocketMQ.SecretKey,
} }
if c := voucherService.GetOrderConfig(); c != nil {
if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderConsumer); err != nil {
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
}
}
if c := voucherService.GetNotifyConfig(); c != nil {
if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil {
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
}
}
if c := voucherService.GetNotifyRetryConfig(); c != nil { if c := voucherService.GetNotifyRetryConfig(); c != nil {
if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyRetryConsumer); err != nil { if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyRetryConsumer); err != nil {
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))

View File

@ -18,7 +18,7 @@ func (s *VoucherService) CmbOrder(ctx http.Context) error {
bizReply *v1.CmbOrderReply bizReply *v1.CmbOrderReply
) )
orderNo, err := s.cmbOrder(ctx) voucherNo, err := s.cmbOrder(ctx)
if err != nil { if err != nil {
log.Errorf("cmbOrder error: %v", err) log.Errorf("cmbOrder error: %v", err)
@ -31,7 +31,7 @@ func (s *VoucherService) CmbOrder(ctx http.Context) error {
bizReply = &v1.CmbOrderReply{ bizReply = &v1.CmbOrderReply{
RespCode: vo.CmbResponseStatusSuccess.GetValue(), RespCode: vo.CmbResponseStatusSuccess.GetValue(),
RespMsg: "成功", RespMsg: "成功",
CodeNo: orderNo, CodeNo: voucherNo,
} }
} }
@ -81,16 +81,17 @@ func (s *VoucherService) cmbOrder(ctx http.Context) (string, error) {
ProductNo: bizContent.ActivityId, ProductNo: bizContent.ActivityId,
Account: bizContent.CmbUid, Account: bizContent.CmbUid,
AppID: bizContent.AppId, AppID: bizContent.AppId,
Attach: bizContent.Attach,
AccountType: vo.OrderAccountTypeOpenId, AccountType: vo.OrderAccountTypeOpenId,
Type: vo.OrderTypeCmb, Type: vo.OrderTypeCmb,
} }
orderNo, err := s.VoucherBiz.CmbOrder(ctx, boReq) voucherNo, err := s.VoucherBiz.CmbOrder(ctx, boReq)
if err != nil { if err != nil {
return "", err return "", err
} }
return orderNo, nil return voucherNo, nil
} }
func (s *VoucherService) CmbQuery(ctx http.Context) error { func (s *VoucherService) CmbQuery(ctx http.Context) error {

View File

@ -5,78 +5,9 @@ import (
"errors" "errors"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"strconv" "strconv"
"strings"
"voucher/internal/pkg/mq" "voucher/internal/pkg/mq"
) )
func (j *VoucherService) GetOrderConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["order"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("order MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) OrderConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
orderNo := msg.GetShardingKey()
if orderNo == "" {
log.Error("order 消费异常,获取 orderNo 失败")
return errors.New("order 消费异常,获取 orderNo 失败")
}
if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil {
log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error())
}
return nil
}
func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notify"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("notify MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
shardingKey := msg.GetShardingKey()
if shardingKey == "" {
log.Error("notify 消费异常,获取 shardingKey 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
}
rep := strings.Split(shardingKey, "_")
if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil {
log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
}
return nil
}
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig { func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"] elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
if !ok { if !ok {

View File

@ -3,16 +3,26 @@ package service
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/go-kratos/kratos/v2/log"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
) )
func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { func (v *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error {
var x *bo.WechatVoucherNotifyBo var req *bo.WechatVoucherNotifyBo
if err := json.Unmarshal([]byte(msg), &x); err != nil { if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err return err
} }
return j.VoucherBiz.WechatNotifyConsumer(ctx, tag, x) if req.PlainText.Status.IsSended() {
log.Warnf("券状态可用,忽略不处理,couponId:%s,stockId:%s,status:%s",
req.PlainText.CouponID,
req.PlainText.StockID,
req.PlainText.Status.GetText(),
)
return nil
}
return v.VoucherBiz.WechatNotifyConsumer(ctx, tag, req)
} }