This commit is contained in:
李子铭 2025-03-11 13:57:23 +08:00
parent 982e1c3877
commit 4350b11ec6
24 changed files with 537 additions and 120 deletions

View File

@ -23,3 +23,9 @@ enum Err {
//
COMMON = 4 [(errors.code) = 555];
}
enum NotifyConsumeErr{
option (errors.default_code) = 1;
//
NeedRetryNotify = 0 [(errors.code) = 500];
}

View File

@ -41,6 +41,12 @@ rocketMQ:
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
notifyRetry: # 重试延迟队列
topic: voucher_order_notifyRetry
group: voucher_order_notifyRetry_group
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
wechatNotifyMQ:
accessKeyId: "LTAI5tPyV7FynQNTfEvbEBuX"

View File

@ -41,6 +41,12 @@ rocketMQ:
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
notifyRetry: # 重试延迟队列
topic: voucher_order_notifyRetry
group: voucher_order_notifyRetry_group
isOpenConsumer: false #是否启动消费 true/false
PerCoroutineCnt: 2 #协程数量不配置默认为20
RetryCnt: 3 #重试次数,不配置默认38
wechatNotifyMQ:
accessKeyId: "LTAI5tPyV7FynQNTfEvbEBuX"

View File

@ -12,6 +12,9 @@ type OrderNotifyBo struct {
OutRequestNo string
Status vo.OrderNotifyStatus
Request string
Event vo.OrderNotifyEvent
Channel vo.Channel
Type vo.OrderType
Responses string
Remark string
NotifyUrl string

View File

@ -54,18 +54,18 @@ func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (reps *v1.Cmb
err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
orderWechat, err := v.OrderWechatRepo.GetLastByOrderNo(ctx, orderNo)
order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo)
if err != nil {
return err
}
status, err := orderWechat.Status.GetCmbStatusText()
status, err := order.Status.GetCmbStatusText()
if err != nil {
return err
}
reps = &v1.CmbQueryReply{
Ticket: orderWechat.OrderNo,
Ticket: order.OrderNo,
Status: status.GetValue(),
TransDate: time.Now().Format("20060102150405"),
OrgNo: v.bc.Cmb.OrgNo,

View File

@ -0,0 +1,78 @@
package cmb
import (
"context"
"encoding/json"
"time"
err2 "voucher/api/err"
v1 "voucher/api/v1"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
)
func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (string, error) {
cmbStatus, err := orderNotify.Event.GetCmbStatusText()
if err != nil {
return "", err
}
req := &v1.CmbNotifyRequest{
Ticket: orderNotify.OrderNo,
Status: cmbStatus.GetValue(),
TransDate: time.Now().Format("20060102150405"),
OrgNo: v.bc.Cmb.OrgNo,
Ext: "",
}
bizJsonBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
return string(bizJsonBytes), nil
}
func (v *Cmb) notifyCreate(ctx context.Context, req *bo.OrderNotifyBo) (*v1.CmbRequest, *bo.OrderNotifyBo, error) {
bizContent, err := v.bizContent(ctx, req)
if err != nil {
return nil, nil, err
}
request, err := v.CmbMixRepo.GetRequest(ctx, &bo.CmbRequestBo{
FuncName: vo.CmbNotifyFuncName,
BizContent: bizContent,
})
if err != nil {
return nil, nil, err
}
requestBytes, err := json.Marshal(request)
if err != nil {
return nil, nil, err
}
req.Request = string(requestBytes)
orderNotify, err := v.OrderNotifyRepo.Create(ctx, req)
if err != nil {
return nil, nil, err
}
return request, orderNotify, err
}
func (v *Cmb) notifySuccess(ctx context.Context, notifyId uint64, bizStr string) error {
return v.OrderNotifyRepo.Success(ctx, notifyId, bizStr)
}
func (v *Cmb) notifyFail(ctx context.Context, notifyId uint64, errMsg string) error {
if err := v.OrderNotifyRepo.Fail(ctx, notifyId, errMsg); err != nil {
return err
}
return err2.ErrorNeedRetryNotify(errMsg)
}

View File

@ -3,109 +3,53 @@ package cmb
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"time"
v1 "voucher/api/v1"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
)
func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error {
func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderNotifyBo, error) {
if !order.Channel.IsWeChat() {
return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText())
}
orderWechat, err := v.orderWechat(ctx, order, orderOutRequestNo)
event, err := order.Status.GetOrderNotifyEvent()
if err != nil {
return err
return nil, err
}
bizContent, err := v.bizContent(ctx, orderWechat)
if err != nil {
return err
}
request, err := v.CmbMixRepo.GetRequest(ctx, &bo.CmbRequestBo{
FuncName: vo.CmbNotifyFuncName,
BizContent: bizContent,
})
if err != nil {
return err
}
requestBytes, err := json.Marshal(request)
if err != nil {
return err
}
orderNotify, err := v.OrderNotifyRepo.Create(ctx, &bo.OrderNotifyBo{
OrderNo: orderWechat.OrderNo,
OutRequestNo: orderWechat.OutRequestNo,
Request: string(requestBytes),
req := &bo.OrderNotifyBo{
OrderNo: order.OrderNo,
OutRequestNo: orderOutRequestNo,
NotifyUrl: order.NotifyUrl,
})
if err != nil {
return err
Channel: order.Channel,
Event: event,
Type: order.Type,
Request: "",
}
x, err := v.CmbMixRepo.Request(ctx, request, v.bc.Cmb.NotifyUrl)
request, orderNotify, err := v.notifyCreate(ctx, req)
if err != nil {
return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error())
return nil, err
}
x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
if err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x)
if err != nil {
log.Errorf("NotifyConsume CmbMixRepo.VerifyResponse error:%s", err.Error())
return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error())
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
var s *v1.CmbNotifyReply
if err = json.Unmarshal([]byte(bizStr), &s); err != nil {
return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error())
var reply *v1.CmbNotifyReply
if err = json.Unmarshal([]byte(bizStr), &reply); err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
if s.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, s.RespMsg)
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg)
}
return v.OrderNotifyRepo.Success(ctx, orderNotify.ID, bizStr)
}
func (v *Cmb) orderWechat(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderWechatBo, error) {
orderWechat, err := v.OrderWechatRepo.GetByOutRequestNo(ctx, orderOutRequestNo)
if err != nil {
return nil, fmt.Errorf("根据订单号%s获取微信订单失败:%s", orderWechat.OrderNo, err.Error())
}
if !orderWechat.Status.CanNotify() {
return nil, fmt.Errorf("微信订单状态错误,不能通知:%s", order.Status.GetText())
}
return orderWechat, err
}
func (v *Cmb) bizContent(_ context.Context, orderWechat *bo.OrderWechatBo) (string, error) {
status, err := orderWechat.Status.GetCmbStatusText()
if err != nil {
return "", err
}
req := &v1.CmbNotifyRequest{
Ticket: orderWechat.OrderNo,
Status: status.GetValue(),
TransDate: time.Now().Format("20060102150405"),
OrgNo: v.bc.Cmb.OrgNo,
Ext: "",
}
bizJsonBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
return string(bizJsonBytes), nil
return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr)
}

View File

@ -0,0 +1,50 @@
package cmb
import (
"context"
"encoding/json"
"github.com/go-kratos/kratos/v2/log"
v1 "voucher/api/v1"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
)
func (v *Cmb) NotifyRetryConsume(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) {
req := &bo.OrderNotifyBo{
OrderNo: orderNotify.OrderNo,
OutRequestNo: orderNotify.OutRequestNo,
NotifyUrl: order.NotifyUrl,
Channel: order.Channel,
Event: orderNotify.Event,
Type: order.Type,
Request: "",
}
request, orderNotify, err := v.notifyCreate(ctx, req)
if err != nil {
return nil, err
}
x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
if err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x)
if err != nil {
log.Errorf("NotifyRetryConsume CmbMixRepo.VerifyResponse error:%s", err.Error())
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
var reply *v1.CmbNotifyReply
if err = json.Unmarshal([]byte(bizStr), &reply); err != nil {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error())
}
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg)
}
return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr)
}

View File

@ -50,7 +50,7 @@ func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo
func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error {
c := vo.WechatNotifyRegisterTagCacheKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID)
c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID})
_, err := v.rdb.Rdb.Get(ctx, c.Key).Result()
@ -64,7 +64,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID
return fmt.Errorf(errMsg)
}
cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID)
cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID})
return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error {
// 二次获取,判定处理,以免获取锁后又执行了一次

View File

@ -4,6 +4,8 @@ import (
"context"
"fmt"
"go.opentelemetry.io/otel/trace"
errPb "voucher/api/err"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
"voucher/internal/pkg/mq"
@ -24,14 +26,18 @@ func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo str
return nil
}
func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) (err error) {
func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) error {
c := vo.NotifyConsume.BuildCache([]string{orderNo})
var (
err error
orderNotify *bo.OrderNotifyBo
cache = vo.NotifyConsume.BuildCache([]string{orderNo})
)
err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
err = lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error {
order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo)
if err != nil {
order, err2 := v.OrderRepo.GetByOrderNo(ctx, orderNo)
if err2 != nil {
return err
}
@ -39,12 +45,24 @@ func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequest
return fmt.Errorf("订单状态错误,不能通知:%s", order.Status.GetText())
}
if !order.Channel.IsWeChat() {
return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText())
}
if order.Type.IsCmb() {
return v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo)
if orderNotify, err2 = v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo); err2 != nil {
return err
}
}
return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
})
return
if !errPb.IsNeedRetryNotify(err) {
return err
}
// 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟
// 第一次通知失败重试入队
return v.PushNotifyRetryDelayMQ(ctx, 60, orderNotify.ID)
}

View File

@ -0,0 +1,97 @@
package biz
import (
"context"
"fmt"
"go.opentelemetry.io/otel/trace"
"strconv"
errPb "voucher/api/err"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
"voucher/internal/pkg/mq"
)
func (v *VoucherBiz) PushNotifyRetryDelayMQ(ctx context.Context, level int, orderNotifyId uint64) error {
str := strconv.FormatUint(orderNotifyId, 10)
eventMap := v.bc.RocketMQ.EventMap["notifyRetry"]
sendOption := []mq.SendOption{
mq.WithSendShardingKeysOption(str),
mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()),
mq.WithSendDelayLevelOption(level),
}
if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil {
return fmt.Errorf("回调通知延迟队列,投递消息出错err=%s", err.Error())
}
return nil
}
func (v *VoucherBiz) NotifyRetryConsume(ctx context.Context, orderNotifyId uint64) error {
var (
err error
orderNotify *bo.OrderNotifyBo
cache = vo.NotifyRetryConsume.BuildCacheUint64([]uint64{orderNotifyId})
)
err = lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error {
orderNotify, err = v.OrderNotifyRepo.GetByID(ctx, orderNotifyId)
if err != nil {
return err
}
order, err2 := v.OrderRepo.GetByOrderNo(ctx, orderNotify.OrderNo)
if err2 != nil {
return err2
}
if order.Type.IsCmb() {
orderNotify, err2 = v.Cmb.NotifyRetryConsume(ctx, order, orderNotify)
if err2 != nil {
return err2
}
}
return fmt.Errorf("订单类型错误:%s", order.Type.GetText())
})
if !errPb.IsNeedRetryNotify(err) {
return err
}
level, err2 := v.level(ctx, orderNotify)
if err2 != nil {
return err2
}
return v.PushNotifyRetryDelayMQ(ctx, level, orderNotify.ID)
}
func (v *VoucherBiz) level(ctx context.Context, orderNotify *bo.OrderNotifyBo) (int, error) {
// 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟
count, err := v.OrderNotifyRepo.GetCountByOrderNoAndEvent(ctx, orderNotify.OrderNo, orderNotify.Event)
if err != nil {
return 0, err
}
switch count {
case 1:
return 60, nil
case 2:
return 120, nil
case 3:
return 720, nil
case 4:
return 3600, nil
case 5:
return 21600, nil
}
return 0, fmt.Errorf("回调通知失败次数超过5次不再重试")
}

View File

@ -3,9 +3,12 @@ package repo
import (
"context"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
)
type OrderNotifyRepo interface {
GetByID(ctx context.Context, id uint64) (*bo.OrderNotifyBo, error)
GetCountByOrderNoAndEvent(ctx context.Context, orderNo string, event vo.OrderNotifyEvent) (int64, error)
Create(ctx context.Context, req *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error)
Success(ctx context.Context, id uint64, responses string) error
Fail(ctx context.Context, id uint64, remark string) error

View File

@ -1,8 +1,8 @@
package vo
import (
"fmt"
"time"
"voucher/internal/pkg/helper"
)
type CacheKey string
@ -14,6 +14,7 @@ const (
OrderConsume CacheKey = "order_consume"
NotifyConsume CacheKey = "notify_consume"
NotifyRetryConsume CacheKey = "notify_retry_consume"
WechatNotifyRegisterTagCacheKey CacheKey = "wechat_notify_register_tag"
WechatNotifyRegisterTagCacheLockKey CacheKey = "wechat_notify_register_tag_lock"
@ -25,8 +26,9 @@ var CacheKeyMap = map[CacheKey]time.Duration{
CmbOrderLockKey: 30 * time.Second,
CmbQueryLockKey: 30 * time.Second,
CmbProductQueryLockKey: 30 * time.Second,
OrderConsume: 30 * time.Second,
NotifyConsume: 30 * time.Second,
OrderConsume: 60 * time.Second,
NotifyConsume: 60 * time.Second,
NotifyRetryConsume: 60 * time.Second,
WechatNotifyRegisterTagCacheKey: 86400 * time.Second,
WechatNotifyRegisterTagCacheLockKey: 30 * time.Second,
WechatNotifyConsumeLockKey: 30 * time.Second,
@ -37,14 +39,14 @@ type Cache struct {
TTL time.Duration
}
func (s CacheKey) BuildCache(ids []string) *Cache {
k := fmt.Sprintf("%s", s)
for _, id := range ids {
k = fmt.Sprintf("%s_%s", k, id)
func (s CacheKey) GetValue() string {
return string(s)
}
func (s CacheKey) BuildCache(strArr []string) *Cache {
k := helper.BuildStr(s.GetValue(), strArr)
c := &Cache{
Key: k,
}
@ -59,8 +61,9 @@ func (s CacheKey) BuildCache(ids []string) *Cache {
return c
}
func (s CacheKey) BuildRegisterCache(tag, stockCreatorMchID, stockID string) *Cache {
k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID)
func (s CacheKey) BuildCacheUint64(ids []uint64) *Cache {
k := helper.BuildStr(s.GetValue(), ids)
c := &Cache{
Key: k,

View File

@ -0,0 +1,53 @@
package vo
import "fmt"
type OrderNotifyEvent uint8
const (
OrderNotifyEventSendDEd OrderNotifyEvent = iota + 1
OrderNotifyEventUsed
OrderNotifyEventExpired
)
var OrderNotifyEventMap = map[OrderNotifyEvent]string{
OrderNotifyEventSendDEd: "可用",
OrderNotifyEventUsed: "已实扣",
OrderNotifyEventExpired: "已过期",
}
func (s OrderNotifyEvent) GetText() string {
if t, ok := OrderNotifyEventMap[s]; ok {
return t
}
return "未知通知事件"
}
func (s OrderNotifyEvent) GetValue() uint8 {
return uint8(s)
}
func (s OrderNotifyEvent) IsSendDEd() bool {
return s == OrderNotifyEventSendDEd
}
func (s OrderNotifyEvent) IsUsed() bool {
return s == OrderNotifyEventUsed
}
func (s OrderNotifyEvent) IsExpired() bool {
return s == OrderNotifyEventExpired
}
var OrderNotifyEventMapCmbStatus = map[OrderNotifyEvent]CmbStatus{
OrderNotifyEventSendDEd: CmbStatusSuccess,
OrderNotifyEventUsed: CmbStatusUse,
OrderNotifyEventExpired: CmbStatusExpired,
}
func (s OrderNotifyEvent) GetCmbStatusText() (CmbStatus, error) {
if t, ok := OrderNotifyEventMapCmbStatus[s]; ok {
return t, nil
}
return "", fmt.Errorf("cmbStatus[%s]未定义", s)
}

View File

@ -1,5 +1,7 @@
package vo
import "fmt"
type OrderStatus uint8
const (
@ -58,3 +60,29 @@ func (s OrderStatus) IsExpired() bool {
func (s OrderStatus) CanNotify() bool {
return s.IsSuccess() || s.IsUse() || s.IsExpired()
}
var OrderStatusMapOrderNotifyEvent = map[OrderStatus]OrderNotifyEvent{
OrderStatusSuccess: OrderNotifyEventSendDEd,
OrderStatusUse: OrderNotifyEventUsed,
OrderStatusExpired: OrderNotifyEventExpired,
}
func (s OrderStatus) GetOrderNotifyEvent() (OrderNotifyEvent, error) {
if t, ok := OrderStatusMapOrderNotifyEvent[s]; ok {
return t, nil
}
return 0, fmt.Errorf("CmbStatus[%s]未定义", s)
}
var OrderStatusMapCmbStatus = map[OrderStatus]CmbStatus{
OrderStatusSuccess: CmbStatusSuccess,
OrderStatusUse: CmbStatusUse,
OrderStatusExpired: CmbStatusExpired,
}
func (s OrderStatus) GetCmbStatusText() (CmbStatus, error) {
if t, ok := OrderStatusMapCmbStatus[s]; ok {
return t, nil
}
return "", fmt.Errorf("cmbStatus[%s]未定义", s)
}

View File

@ -1,7 +1,5 @@
package vo
import "fmt"
type OrderWechatStatus uint8
const (
@ -48,19 +46,6 @@ var OrderWechatStatusMap = map[OrderWechatStatus]string{
OrderWechatStatusExpired: "已过期",
}
var OrderStatusMapCmbStatus = map[OrderWechatStatus]CmbStatus{
OrderWechatStatusSuccess: CmbStatusSuccess,
OrderWechatStatusUse: CmbStatusUse,
OrderWechatStatusExpired: CmbStatusExpired,
}
func (s OrderWechatStatus) GetCmbStatusText() (CmbStatus, error) {
if t, ok := OrderStatusMapCmbStatus[s]; ok {
return t, nil
}
return "", fmt.Errorf("CmbStatus[%s]未定义", s)
}
func (s OrderWechatStatus) GetText() string {
if t, ok := OrderWechatStatusMap[s]; ok {
return t

View File

@ -16,6 +16,7 @@ type VoucherBiz struct {
ProductRepo repo.ProductRepo
OrderRepo repo.OrderRepo
OrderWechatRepo repo.OrderWechatRepo
OrderNotifyRepo repo.OrderNotifyRepo
MqSendMixRepo mixrepos.MQSendMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo
}
@ -27,6 +28,7 @@ func NewVoucherBiz(
ProductRepo repo.ProductRepo,
OrderRepo repo.OrderRepo,
OrderWechatRepo repo.OrderWechatRepo,
OrderNotifyRepo repo.OrderNotifyRepo,
MqSendMixRepo mixrepos.MQSendMixRepo,
WechatCpnRepo wechatrepo.WechatCpnRepo,
) *VoucherBiz {
@ -37,6 +39,7 @@ func NewVoucherBiz(
ProductRepo: ProductRepo,
OrderRepo: OrderRepo,
OrderWechatRepo: OrderWechatRepo,
OrderNotifyRepo: OrderNotifyRepo,
MqSendMixRepo: MqSendMixRepo,
WechatCpnRepo: WechatCpnRepo,
}

View File

@ -225,6 +225,7 @@ func (s *CmbMixRepoImpl) Request(ctx context.Context, req *v1.CmbRequest, uri st
var response *v1.CmbReply
if err = json.Unmarshal(bodyBytes, &response); err != nil {
log.Errorf("请求掌上生活返回数据解析报错,bodyBytes=%s,err=%s", string(bodyBytes), err.Error())
return nil, err
}

View File

@ -16,6 +16,9 @@ type OrderNotify struct {
OrderNo string `gorm:"column:order_no;not null" json:"order_no"`
OutRequestNo string `gorm:"column:out_request_no;not null" json:"out_request_no"`
Status uint8 `gorm:"column:status;not null;comment:状态" json:"status"`
Event uint8 `gorm:"column:event;not null;comment:event" json:"event"`
Channel uint8 `gorm:"column:channel;not null;comment:channel" json:"channel"`
Type uint8 `gorm:"column:type;not null;comment:1:招行" json:"type"`
Request string `gorm:"column:request;not null" json:"request"`
Responses string `gorm:"column:responses" json:"responses"`
Remark string `gorm:"column:remark" json:"remark"`

View File

@ -27,6 +27,33 @@ func (p *OrderNotifyRepoImpl) DB(ctx context.Context) *gorm.DB {
return p.db.DB(ctx).Model(model.OrderNotify{})
}
func (p *OrderNotifyRepoImpl) GetByID(ctx context.Context, id uint64) (*bo.OrderNotifyBo, error) {
info := &model.OrderNotify{}
tx := p.DB(ctx).Where(model.OrderNotify{ID: id}).Find(&info)
if tx.Error != nil {
return nil, tx.Error
}
if tx.RowsAffected == 0 {
return nil, gorm.ErrRecordNotFound
}
return p.ToBo(info), nil
}
func (p *OrderNotifyRepoImpl) GetCountByOrderNoAndEvent(ctx context.Context, orderNo string, event vo.OrderNotifyEvent) (int64, error) {
var total int64
tx := p.DB(ctx).Where(model.OrderNotify{OrderNo: orderNo, Event: event.GetValue()}).Count(&total)
if tx.Error != nil {
return 0, tx.Error
}
return total, nil
}
func (p *OrderNotifyRepoImpl) Create(ctx context.Context, req *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) {
now := time.Now()
@ -37,6 +64,9 @@ func (p *OrderNotifyRepoImpl) Create(ctx context.Context, req *bo.OrderNotifyBo)
Request: req.Request,
Responses: "{}",
NotifyUrl: req.NotifyUrl,
Channel: req.Channel.GetValue(),
Event: req.Event.GetValue(),
Type: req.Type.GetValue(),
CreateTime: &now,
UpdateTime: &now,
}

View File

@ -1,5 +1,10 @@
package helper
import (
"fmt"
"strings"
)
func ArrDeduplication[T int | int32 | uint32 | uint64 | int64 | int8 | string](arr []T) []T {
seen := map[T]bool{}
var result []T
@ -11,3 +16,35 @@ func ArrDeduplication[T int | int32 | uint32 | uint64 | int64 | int8 | string](a
}
return result
}
// BuildStr 函数用于将 uid 和 arr 中的元素拼接成一个字符串
func BuildStr[T int | int32 | uint32 | uint64 | int64 | int8 | string](uid string, arr []T) string {
// 创建一个 strings.Builder 实例,用于高效地构建字符串
var sb strings.Builder
// 写入初始的 uid
sb.WriteString(uid)
// 遍历 arr 切片中的每个元素
for i, id := range arr {
// 如果不是第一个元素,先写入下划线分隔符
if i > 0 {
sb.WriteByte('_')
} else if sb.Len() > 0 {
// 如果 uid 不为空且是第一个元素,也写入下划线分隔符
sb.WriteByte('_')
}
// 根据元素的类型进行不同的处理
switch v := any(id).(type) {
case string:
// 如果是字符串类型,直接写入
sb.WriteString(v)
default:
// 对于其他类型,使用 fmt.Sprint 转换为字符串后写入
sb.WriteString(fmt.Sprint(v))
}
}
// 将 strings.Builder 中的内容转换为字符串并返回
return sb.String()
}

View File

@ -0,0 +1,17 @@
package helper
import (
"fmt"
"testing"
)
func TestBuildStr(t *testing.T) {
uid := "example_uid"
arr := []int{1, 2, 3}
result := BuildStr(uid, arr)
fmt.Println(result)
arrStr := []string{"a", "b", "c"}
resultStr := BuildStr(uid, arrStr)
fmt.Println(resultStr)
}

View File

@ -44,6 +44,12 @@ func NewConsumer(
}
}
if c := voucherService.GetNotifyRetryConfig(); c != nil {
if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyRetryConsumer); err != nil {
panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err))
}
}
return &Consumer{manager: manager, hLog: hLog, conf: conf}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"github.com/go-kratos/kratos/v2/log"
"strconv"
"strings"
"voucher/internal/pkg/mq"
)
@ -63,7 +64,7 @@ func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMes
shardingKey := msg.GetShardingKey()
if shardingKey == "" {
log.Error("orderNotify 消费异常,获取 shardingKey 失败")
log.Error("notify 消费异常,获取 shardingKey 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
}
@ -75,3 +76,42 @@ func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMes
return nil
}
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
if !ok {
return nil
}
if !elm.IsOpenConsumer {
log.Warnf("notify MQ is not open")
return nil
}
return &mq.ConsumerConfig{
TopicName: elm.Topic,
GroupName: elm.Group,
PerCoroutineCnt: int(elm.PerCoroutineCnt),
}
}
func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error {
shardingKey := msg.GetShardingKey()
if shardingKey == "" {
log.Error("notify retry 消费异常,获取 shardingKey 失败")
return errors.New("orderNotify 消费异常,获取 orderNo 失败")
}
orderNotifyId, err := strconv.ParseUint(shardingKey, 10, 64)
if err != nil {
log.Error("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s", shardingKey)
return err
}
if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyId); err != nil {
log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error())
}
return nil
}