From 4350b11ec61d54362948a2bd7fcf4714dd8ec34b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Tue, 11 Mar 2025 13:57:23 +0800 Subject: [PATCH] cmb --- api/err/err.proto | 6 ++ configs/config.yaml | 6 ++ configs/config_test.yaml | 6 ++ internal/biz/bo/order_notify_bo.go | 3 + internal/biz/cmb.go | 6 +- internal/biz/cmb/notify.go | 78 +++++++++++++++++ internal/biz/cmb/notify_consume.go | 104 ++++++----------------- internal/biz/cmb/notify_retry_consume.go | 50 +++++++++++ internal/biz/cmb/order_consume.go | 4 +- internal/biz/notify_consume.go | 32 +++++-- internal/biz/notify_retry_consume.go | 97 +++++++++++++++++++++ internal/biz/repo/order_notify.go | 3 + internal/biz/vo/cache.go | 27 +++--- internal/biz/vo/order_notify_event.go | 53 ++++++++++++ internal/biz/vo/order_status.go | 28 ++++++ internal/biz/vo/order_wechant_status.go | 15 ---- internal/biz/voucher.go | 3 + internal/data/mixrepoimpl/cmb.go | 1 + internal/data/model/order_notify.gen.go | 3 + internal/data/repoimpl/order_notify.go | 30 +++++++ internal/pkg/helper/int.go | 37 ++++++++ internal/pkg/helper/int_test.go | 17 ++++ internal/server/consume.go | 6 ++ internal/service/consume.go | 42 ++++++++- 24 files changed, 537 insertions(+), 120 deletions(-) create mode 100644 internal/biz/cmb/notify.go create mode 100644 internal/biz/cmb/notify_retry_consume.go create mode 100644 internal/biz/notify_retry_consume.go create mode 100644 internal/biz/vo/order_notify_event.go create mode 100644 internal/pkg/helper/int_test.go diff --git a/api/err/err.proto b/api/err/err.proto index 2282475..81d60ab 100644 --- a/api/err/err.proto +++ b/api/err/err.proto @@ -22,4 +22,10 @@ enum Err { // 通用错误 COMMON = 4 [(errors.code) = 555]; +} + +enum NotifyConsumeErr{ + option (errors.default_code) = 1; + // 需要重试通知错误 + NeedRetryNotify = 0 [(errors.code) = 500]; } \ No newline at end of file diff --git a/configs/config.yaml b/configs/config.yaml index e5b57d8..ceadbdc 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -41,6 +41,12 @@ rocketMQ: isOpenConsumer: false #是否启动消费 true/false PerCoroutineCnt: 2 #协程数量,不配置默认为20 RetryCnt: 3 #重试次数,不配置默认38 + notifyRetry: # 重试延迟队列 + topic: voucher_order_notifyRetry + group: voucher_order_notifyRetry_group + isOpenConsumer: false #是否启动消费 true/false + PerCoroutineCnt: 2 #协程数量,不配置默认为20 + RetryCnt: 3 #重试次数,不配置默认38 wechatNotifyMQ: accessKeyId: "LTAI5tPyV7FynQNTfEvbEBuX" diff --git a/configs/config_test.yaml b/configs/config_test.yaml index cacc94f..3dbf33f 100644 --- a/configs/config_test.yaml +++ b/configs/config_test.yaml @@ -41,6 +41,12 @@ rocketMQ: isOpenConsumer: false #是否启动消费 true/false PerCoroutineCnt: 2 #协程数量,不配置默认为20 RetryCnt: 3 #重试次数,不配置默认38 + notifyRetry: # 重试延迟队列 + topic: voucher_order_notifyRetry + group: voucher_order_notifyRetry_group + isOpenConsumer: false #是否启动消费 true/false + PerCoroutineCnt: 2 #协程数量,不配置默认为20 + RetryCnt: 3 #重试次数,不配置默认38 wechatNotifyMQ: accessKeyId: "LTAI5tPyV7FynQNTfEvbEBuX" diff --git a/internal/biz/bo/order_notify_bo.go b/internal/biz/bo/order_notify_bo.go index fcbf134..56e6a0f 100644 --- a/internal/biz/bo/order_notify_bo.go +++ b/internal/biz/bo/order_notify_bo.go @@ -12,6 +12,9 @@ type OrderNotifyBo struct { OutRequestNo string Status vo.OrderNotifyStatus Request string + Event vo.OrderNotifyEvent + Channel vo.Channel + Type vo.OrderType Responses string Remark string NotifyUrl string diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index 23c167d..b40809e 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -54,18 +54,18 @@ func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (reps *v1.Cmb err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - orderWechat, err := v.OrderWechatRepo.GetLastByOrderNo(ctx, orderNo) + order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo) if err != nil { return err } - status, err := orderWechat.Status.GetCmbStatusText() + status, err := order.Status.GetCmbStatusText() if err != nil { return err } reps = &v1.CmbQueryReply{ - Ticket: orderWechat.OrderNo, + Ticket: order.OrderNo, Status: status.GetValue(), TransDate: time.Now().Format("20060102150405"), OrgNo: v.bc.Cmb.OrgNo, diff --git a/internal/biz/cmb/notify.go b/internal/biz/cmb/notify.go new file mode 100644 index 0000000..0bd5a5c --- /dev/null +++ b/internal/biz/cmb/notify.go @@ -0,0 +1,78 @@ +package cmb + +import ( + "context" + "encoding/json" + "time" + err2 "voucher/api/err" + v1 "voucher/api/v1" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" +) + +func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (string, error) { + + cmbStatus, err := orderNotify.Event.GetCmbStatusText() + if err != nil { + return "", err + } + + req := &v1.CmbNotifyRequest{ + Ticket: orderNotify.OrderNo, + Status: cmbStatus.GetValue(), + TransDate: time.Now().Format("20060102150405"), + OrgNo: v.bc.Cmb.OrgNo, + Ext: "", + } + + bizJsonBytes, err := json.Marshal(req) + if err != nil { + return "", err + } + + return string(bizJsonBytes), nil +} + +func (v *Cmb) notifyCreate(ctx context.Context, req *bo.OrderNotifyBo) (*v1.CmbRequest, *bo.OrderNotifyBo, error) { + + bizContent, err := v.bizContent(ctx, req) + if err != nil { + return nil, nil, err + } + + request, err := v.CmbMixRepo.GetRequest(ctx, &bo.CmbRequestBo{ + FuncName: vo.CmbNotifyFuncName, + BizContent: bizContent, + }) + if err != nil { + return nil, nil, err + } + + requestBytes, err := json.Marshal(request) + if err != nil { + return nil, nil, err + } + + req.Request = string(requestBytes) + + orderNotify, err := v.OrderNotifyRepo.Create(ctx, req) + if err != nil { + return nil, nil, err + } + + return request, orderNotify, err +} + +func (v *Cmb) notifySuccess(ctx context.Context, notifyId uint64, bizStr string) error { + + return v.OrderNotifyRepo.Success(ctx, notifyId, bizStr) +} + +func (v *Cmb) notifyFail(ctx context.Context, notifyId uint64, errMsg string) error { + + if err := v.OrderNotifyRepo.Fail(ctx, notifyId, errMsg); err != nil { + return err + } + + return err2.ErrorNeedRetryNotify(errMsg) +} diff --git a/internal/biz/cmb/notify_consume.go b/internal/biz/cmb/notify_consume.go index a916f5b..52d7ee0 100644 --- a/internal/biz/cmb/notify_consume.go +++ b/internal/biz/cmb/notify_consume.go @@ -3,109 +3,53 @@ package cmb import ( "context" "encoding/json" - "fmt" "github.com/go-kratos/kratos/v2/log" - "time" v1 "voucher/api/v1" "voucher/internal/biz/bo" "voucher/internal/biz/vo" ) -func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) error { +func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderNotifyBo, error) { - if !order.Channel.IsWeChat() { - return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText()) - } - - orderWechat, err := v.orderWechat(ctx, order, orderOutRequestNo) + event, err := order.Status.GetOrderNotifyEvent() if err != nil { - return err + return nil, err } - bizContent, err := v.bizContent(ctx, orderWechat) - if err != nil { - return err - } - - request, err := v.CmbMixRepo.GetRequest(ctx, &bo.CmbRequestBo{ - FuncName: vo.CmbNotifyFuncName, - BizContent: bizContent, - }) - if err != nil { - return err - } - - requestBytes, err := json.Marshal(request) - if err != nil { - return err - } - - orderNotify, err := v.OrderNotifyRepo.Create(ctx, &bo.OrderNotifyBo{ - OrderNo: orderWechat.OrderNo, - OutRequestNo: orderWechat.OutRequestNo, - Request: string(requestBytes), + req := &bo.OrderNotifyBo{ + OrderNo: order.OrderNo, + OutRequestNo: orderOutRequestNo, NotifyUrl: order.NotifyUrl, - }) - if err != nil { - return err + Channel: order.Channel, + Event: event, + Type: order.Type, + Request: "", } - x, err := v.CmbMixRepo.Request(ctx, request, v.bc.Cmb.NotifyUrl) + request, orderNotify, err := v.notifyCreate(ctx, req) if err != nil { - return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error()) + return nil, err + } + + x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl) + if err != nil { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) } bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x) if err != nil { log.Errorf("NotifyConsume CmbMixRepo.VerifyResponse error:%s", err.Error()) - return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error()) + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) } - var s *v1.CmbNotifyReply - if err = json.Unmarshal([]byte(bizStr), &s); err != nil { - return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, err.Error()) + var reply *v1.CmbNotifyReply + if err = json.Unmarshal([]byte(bizStr), &reply); err != nil { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) } - if s.RespCode != vo.CmbResponseStatusSuccess.GetValue() { - return v.OrderNotifyRepo.Fail(ctx, orderNotify.ID, s.RespMsg) + if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg) } - return v.OrderNotifyRepo.Success(ctx, orderNotify.ID, bizStr) -} - -func (v *Cmb) orderWechat(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderWechatBo, error) { - - orderWechat, err := v.OrderWechatRepo.GetByOutRequestNo(ctx, orderOutRequestNo) - if err != nil { - return nil, fmt.Errorf("根据订单号%s获取微信订单失败:%s", orderWechat.OrderNo, err.Error()) - } - - if !orderWechat.Status.CanNotify() { - return nil, fmt.Errorf("微信订单状态错误,不能通知:%s", order.Status.GetText()) - } - - return orderWechat, err -} - -func (v *Cmb) bizContent(_ context.Context, orderWechat *bo.OrderWechatBo) (string, error) { - - status, err := orderWechat.Status.GetCmbStatusText() - if err != nil { - return "", err - } - - req := &v1.CmbNotifyRequest{ - Ticket: orderWechat.OrderNo, - Status: status.GetValue(), - TransDate: time.Now().Format("20060102150405"), - OrgNo: v.bc.Cmb.OrgNo, - Ext: "", - } - - bizJsonBytes, err := json.Marshal(req) - if err != nil { - return "", err - } - - return string(bizJsonBytes), nil + return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr) } diff --git a/internal/biz/cmb/notify_retry_consume.go b/internal/biz/cmb/notify_retry_consume.go new file mode 100644 index 0000000..270a0a5 --- /dev/null +++ b/internal/biz/cmb/notify_retry_consume.go @@ -0,0 +1,50 @@ +package cmb + +import ( + "context" + "encoding/json" + "github.com/go-kratos/kratos/v2/log" + v1 "voucher/api/v1" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" +) + +func (v *Cmb) NotifyRetryConsume(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) { + + req := &bo.OrderNotifyBo{ + OrderNo: orderNotify.OrderNo, + OutRequestNo: orderNotify.OutRequestNo, + NotifyUrl: order.NotifyUrl, + Channel: order.Channel, + Event: orderNotify.Event, + Type: order.Type, + Request: "", + } + + request, orderNotify, err := v.notifyCreate(ctx, req) + if err != nil { + return nil, err + } + + x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl) + if err != nil { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) + } + + bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x) + if err != nil { + log.Errorf("NotifyRetryConsume CmbMixRepo.VerifyResponse error:%s", err.Error()) + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) + } + + var reply *v1.CmbNotifyReply + if err = json.Unmarshal([]byte(bizStr), &reply); err != nil { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) + } + + if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg) + } + + return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr) +} diff --git a/internal/biz/cmb/order_consume.go b/internal/biz/cmb/order_consume.go index c4d06fc..f104d7b 100644 --- a/internal/biz/cmb/order_consume.go +++ b/internal/biz/cmb/order_consume.go @@ -50,7 +50,7 @@ func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { - c := vo.WechatNotifyRegisterTagCacheKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() @@ -64,7 +64,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID return fmt.Errorf(errMsg) } - cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildRegisterCache(v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID) + cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { // 二次获取,判定处理,以免获取锁后又执行了一次 diff --git a/internal/biz/notify_consume.go b/internal/biz/notify_consume.go index 1aa370c..d7a6157 100644 --- a/internal/biz/notify_consume.go +++ b/internal/biz/notify_consume.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "go.opentelemetry.io/otel/trace" + errPb "voucher/api/err" + "voucher/internal/biz/bo" "voucher/internal/biz/vo" "voucher/internal/pkg/lock" "voucher/internal/pkg/mq" @@ -24,14 +26,18 @@ func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo str return nil } -func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) (err error) { +func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) error { - c := vo.NotifyConsume.BuildCache([]string{orderNo}) + var ( + err error + orderNotify *bo.OrderNotifyBo + cache = vo.NotifyConsume.BuildCache([]string{orderNo}) + ) - err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { + err = lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error { - order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo) - if err != nil { + order, err2 := v.OrderRepo.GetByOrderNo(ctx, orderNo) + if err2 != nil { return err } @@ -39,12 +45,24 @@ func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequest return fmt.Errorf("订单状态错误,不能通知:%s", order.Status.GetText()) } + if !order.Channel.IsWeChat() { + return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText()) + } + if order.Type.IsCmb() { - return v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo) + if orderNotify, err2 = v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo); err2 != nil { + return err + } } return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) }) - return + if !errPb.IsNeedRetryNotify(err) { + return err + } + + // 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟 + // 第一次通知失败重试入队 + return v.PushNotifyRetryDelayMQ(ctx, 60, orderNotify.ID) } diff --git a/internal/biz/notify_retry_consume.go b/internal/biz/notify_retry_consume.go new file mode 100644 index 0000000..864b5b7 --- /dev/null +++ b/internal/biz/notify_retry_consume.go @@ -0,0 +1,97 @@ +package biz + +import ( + "context" + "fmt" + "go.opentelemetry.io/otel/trace" + "strconv" + errPb "voucher/api/err" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" + "voucher/internal/pkg/mq" +) + +func (v *VoucherBiz) PushNotifyRetryDelayMQ(ctx context.Context, level int, orderNotifyId uint64) error { + + str := strconv.FormatUint(orderNotifyId, 10) + + eventMap := v.bc.RocketMQ.EventMap["notifyRetry"] + sendOption := []mq.SendOption{ + mq.WithSendShardingKeysOption(str), + mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), + mq.WithSendDelayLevelOption(level), + } + + if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { + return fmt.Errorf("回调通知延迟队列,投递消息出错:err=%s", err.Error()) + } + + return nil +} + +func (v *VoucherBiz) NotifyRetryConsume(ctx context.Context, orderNotifyId uint64) error { + + var ( + err error + orderNotify *bo.OrderNotifyBo + cache = vo.NotifyRetryConsume.BuildCacheUint64([]uint64{orderNotifyId}) + ) + + err = lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error { + + orderNotify, err = v.OrderNotifyRepo.GetByID(ctx, orderNotifyId) + if err != nil { + return err + } + + order, err2 := v.OrderRepo.GetByOrderNo(ctx, orderNotify.OrderNo) + if err2 != nil { + return err2 + } + + if order.Type.IsCmb() { + orderNotify, err2 = v.Cmb.NotifyRetryConsume(ctx, order, orderNotify) + if err2 != nil { + return err2 + } + } + + return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) + }) + + if !errPb.IsNeedRetryNotify(err) { + return err + } + + level, err2 := v.level(ctx, orderNotify) + if err2 != nil { + return err2 + } + + return v.PushNotifyRetryDelayMQ(ctx, level, orderNotify.ID) +} + +func (v *VoucherBiz) level(ctx context.Context, orderNotify *bo.OrderNotifyBo) (int, error) { + // 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟 + + count, err := v.OrderNotifyRepo.GetCountByOrderNoAndEvent(ctx, orderNotify.OrderNo, orderNotify.Event) + if err != nil { + return 0, err + } + + switch count { + case 1: + return 60, nil + case 2: + return 120, nil + case 3: + return 720, nil + case 4: + return 3600, nil + case 5: + return 21600, nil + } + + return 0, fmt.Errorf("回调通知失败次数超过5次,不再重试") +} diff --git a/internal/biz/repo/order_notify.go b/internal/biz/repo/order_notify.go index 592fffc..0e097fa 100644 --- a/internal/biz/repo/order_notify.go +++ b/internal/biz/repo/order_notify.go @@ -3,9 +3,12 @@ package repo import ( "context" "voucher/internal/biz/bo" + "voucher/internal/biz/vo" ) type OrderNotifyRepo interface { + GetByID(ctx context.Context, id uint64) (*bo.OrderNotifyBo, error) + GetCountByOrderNoAndEvent(ctx context.Context, orderNo string, event vo.OrderNotifyEvent) (int64, error) Create(ctx context.Context, req *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) Success(ctx context.Context, id uint64, responses string) error Fail(ctx context.Context, id uint64, remark string) error diff --git a/internal/biz/vo/cache.go b/internal/biz/vo/cache.go index ecb5f52..0a2b62d 100644 --- a/internal/biz/vo/cache.go +++ b/internal/biz/vo/cache.go @@ -1,8 +1,8 @@ package vo import ( - "fmt" "time" + "voucher/internal/pkg/helper" ) type CacheKey string @@ -12,8 +12,9 @@ const ( CmbQueryLockKey CacheKey = "cmb_query" CmbProductQueryLockKey CacheKey = "cmb_product_query" - OrderConsume CacheKey = "order_consume" - NotifyConsume CacheKey = "notify_consume" + OrderConsume CacheKey = "order_consume" + NotifyConsume CacheKey = "notify_consume" + NotifyRetryConsume CacheKey = "notify_retry_consume" WechatNotifyRegisterTagCacheKey CacheKey = "wechat_notify_register_tag" WechatNotifyRegisterTagCacheLockKey CacheKey = "wechat_notify_register_tag_lock" @@ -25,8 +26,9 @@ var CacheKeyMap = map[CacheKey]time.Duration{ CmbOrderLockKey: 30 * time.Second, CmbQueryLockKey: 30 * time.Second, CmbProductQueryLockKey: 30 * time.Second, - OrderConsume: 30 * time.Second, - NotifyConsume: 30 * time.Second, + OrderConsume: 60 * time.Second, + NotifyConsume: 60 * time.Second, + NotifyRetryConsume: 60 * time.Second, WechatNotifyRegisterTagCacheKey: 86400 * time.Second, WechatNotifyRegisterTagCacheLockKey: 30 * time.Second, WechatNotifyConsumeLockKey: 30 * time.Second, @@ -37,13 +39,13 @@ type Cache struct { TTL time.Duration } -func (s CacheKey) BuildCache(ids []string) *Cache { +func (s CacheKey) GetValue() string { + return string(s) +} - k := fmt.Sprintf("%s", s) +func (s CacheKey) BuildCache(strArr []string) *Cache { - for _, id := range ids { - k = fmt.Sprintf("%s_%s", k, id) - } + k := helper.BuildStr(s.GetValue(), strArr) c := &Cache{ Key: k, @@ -59,8 +61,9 @@ func (s CacheKey) BuildCache(ids []string) *Cache { return c } -func (s CacheKey) BuildRegisterCache(tag, stockCreatorMchID, stockID string) *Cache { - k := fmt.Sprintf("%s_%s_%s_%s", s, tag, stockCreatorMchID, stockID) +func (s CacheKey) BuildCacheUint64(ids []uint64) *Cache { + + k := helper.BuildStr(s.GetValue(), ids) c := &Cache{ Key: k, diff --git a/internal/biz/vo/order_notify_event.go b/internal/biz/vo/order_notify_event.go new file mode 100644 index 0000000..e7f6416 --- /dev/null +++ b/internal/biz/vo/order_notify_event.go @@ -0,0 +1,53 @@ +package vo + +import "fmt" + +type OrderNotifyEvent uint8 + +const ( + OrderNotifyEventSendDEd OrderNotifyEvent = iota + 1 + OrderNotifyEventUsed + OrderNotifyEventExpired +) + +var OrderNotifyEventMap = map[OrderNotifyEvent]string{ + OrderNotifyEventSendDEd: "可用", + OrderNotifyEventUsed: "已实扣", + OrderNotifyEventExpired: "已过期", +} + +func (s OrderNotifyEvent) GetText() string { + if t, ok := OrderNotifyEventMap[s]; ok { + return t + } + return "未知通知事件" +} + +func (s OrderNotifyEvent) GetValue() uint8 { + return uint8(s) +} + +func (s OrderNotifyEvent) IsSendDEd() bool { + return s == OrderNotifyEventSendDEd +} + +func (s OrderNotifyEvent) IsUsed() bool { + return s == OrderNotifyEventUsed +} + +func (s OrderNotifyEvent) IsExpired() bool { + return s == OrderNotifyEventExpired +} + +var OrderNotifyEventMapCmbStatus = map[OrderNotifyEvent]CmbStatus{ + OrderNotifyEventSendDEd: CmbStatusSuccess, + OrderNotifyEventUsed: CmbStatusUse, + OrderNotifyEventExpired: CmbStatusExpired, +} + +func (s OrderNotifyEvent) GetCmbStatusText() (CmbStatus, error) { + if t, ok := OrderNotifyEventMapCmbStatus[s]; ok { + return t, nil + } + return "", fmt.Errorf("cmbStatus[%s]未定义", s) +} diff --git a/internal/biz/vo/order_status.go b/internal/biz/vo/order_status.go index 0d69ff2..55ae7ba 100644 --- a/internal/biz/vo/order_status.go +++ b/internal/biz/vo/order_status.go @@ -1,5 +1,7 @@ package vo +import "fmt" + type OrderStatus uint8 const ( @@ -58,3 +60,29 @@ func (s OrderStatus) IsExpired() bool { func (s OrderStatus) CanNotify() bool { return s.IsSuccess() || s.IsUse() || s.IsExpired() } + +var OrderStatusMapOrderNotifyEvent = map[OrderStatus]OrderNotifyEvent{ + OrderStatusSuccess: OrderNotifyEventSendDEd, + OrderStatusUse: OrderNotifyEventUsed, + OrderStatusExpired: OrderNotifyEventExpired, +} + +func (s OrderStatus) GetOrderNotifyEvent() (OrderNotifyEvent, error) { + if t, ok := OrderStatusMapOrderNotifyEvent[s]; ok { + return t, nil + } + return 0, fmt.Errorf("CmbStatus[%s]未定义", s) +} + +var OrderStatusMapCmbStatus = map[OrderStatus]CmbStatus{ + OrderStatusSuccess: CmbStatusSuccess, + OrderStatusUse: CmbStatusUse, + OrderStatusExpired: CmbStatusExpired, +} + +func (s OrderStatus) GetCmbStatusText() (CmbStatus, error) { + if t, ok := OrderStatusMapCmbStatus[s]; ok { + return t, nil + } + return "", fmt.Errorf("cmbStatus[%s]未定义", s) +} diff --git a/internal/biz/vo/order_wechant_status.go b/internal/biz/vo/order_wechant_status.go index dd751be..aceb109 100644 --- a/internal/biz/vo/order_wechant_status.go +++ b/internal/biz/vo/order_wechant_status.go @@ -1,7 +1,5 @@ package vo -import "fmt" - type OrderWechatStatus uint8 const ( @@ -48,19 +46,6 @@ var OrderWechatStatusMap = map[OrderWechatStatus]string{ OrderWechatStatusExpired: "已过期", } -var OrderStatusMapCmbStatus = map[OrderWechatStatus]CmbStatus{ - OrderWechatStatusSuccess: CmbStatusSuccess, - OrderWechatStatusUse: CmbStatusUse, - OrderWechatStatusExpired: CmbStatusExpired, -} - -func (s OrderWechatStatus) GetCmbStatusText() (CmbStatus, error) { - if t, ok := OrderStatusMapCmbStatus[s]; ok { - return t, nil - } - return "", fmt.Errorf("CmbStatus[%s]未定义", s) -} - func (s OrderWechatStatus) GetText() string { if t, ok := OrderWechatStatusMap[s]; ok { return t diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 20f826e..3b85f09 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -16,6 +16,7 @@ type VoucherBiz struct { ProductRepo repo.ProductRepo OrderRepo repo.OrderRepo OrderWechatRepo repo.OrderWechatRepo + OrderNotifyRepo repo.OrderNotifyRepo MqSendMixRepo mixrepos.MQSendMixRepo WechatCpnRepo wechatrepo.WechatCpnRepo } @@ -27,6 +28,7 @@ func NewVoucherBiz( ProductRepo repo.ProductRepo, OrderRepo repo.OrderRepo, OrderWechatRepo repo.OrderWechatRepo, + OrderNotifyRepo repo.OrderNotifyRepo, MqSendMixRepo mixrepos.MQSendMixRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, ) *VoucherBiz { @@ -37,6 +39,7 @@ func NewVoucherBiz( ProductRepo: ProductRepo, OrderRepo: OrderRepo, OrderWechatRepo: OrderWechatRepo, + OrderNotifyRepo: OrderNotifyRepo, MqSendMixRepo: MqSendMixRepo, WechatCpnRepo: WechatCpnRepo, } diff --git a/internal/data/mixrepoimpl/cmb.go b/internal/data/mixrepoimpl/cmb.go index 3ed841f..0692c48 100644 --- a/internal/data/mixrepoimpl/cmb.go +++ b/internal/data/mixrepoimpl/cmb.go @@ -225,6 +225,7 @@ func (s *CmbMixRepoImpl) Request(ctx context.Context, req *v1.CmbRequest, uri st var response *v1.CmbReply if err = json.Unmarshal(bodyBytes, &response); err != nil { + log.Errorf("请求掌上生活返回数据解析报错,bodyBytes=%s,err=%s", string(bodyBytes), err.Error()) return nil, err } diff --git a/internal/data/model/order_notify.gen.go b/internal/data/model/order_notify.gen.go index 4401f6e..0a1c7af 100644 --- a/internal/data/model/order_notify.gen.go +++ b/internal/data/model/order_notify.gen.go @@ -16,6 +16,9 @@ type OrderNotify struct { OrderNo string `gorm:"column:order_no;not null" json:"order_no"` OutRequestNo string `gorm:"column:out_request_no;not null" json:"out_request_no"` Status uint8 `gorm:"column:status;not null;comment:状态" json:"status"` + Event uint8 `gorm:"column:event;not null;comment:event" json:"event"` + Channel uint8 `gorm:"column:channel;not null;comment:channel" json:"channel"` + Type uint8 `gorm:"column:type;not null;comment:1:招行" json:"type"` Request string `gorm:"column:request;not null" json:"request"` Responses string `gorm:"column:responses" json:"responses"` Remark string `gorm:"column:remark" json:"remark"` diff --git a/internal/data/repoimpl/order_notify.go b/internal/data/repoimpl/order_notify.go index 688ff11..0ddb472 100644 --- a/internal/data/repoimpl/order_notify.go +++ b/internal/data/repoimpl/order_notify.go @@ -27,6 +27,33 @@ func (p *OrderNotifyRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.OrderNotify{}) } +func (p *OrderNotifyRepoImpl) GetByID(ctx context.Context, id uint64) (*bo.OrderNotifyBo, error) { + info := &model.OrderNotify{} + + tx := p.DB(ctx).Where(model.OrderNotify{ID: id}).Find(&info) + + if tx.Error != nil { + return nil, tx.Error + } + + if tx.RowsAffected == 0 { + return nil, gorm.ErrRecordNotFound + } + + return p.ToBo(info), nil +} + +func (p *OrderNotifyRepoImpl) GetCountByOrderNoAndEvent(ctx context.Context, orderNo string, event vo.OrderNotifyEvent) (int64, error) { + var total int64 + + tx := p.DB(ctx).Where(model.OrderNotify{OrderNo: orderNo, Event: event.GetValue()}).Count(&total) + if tx.Error != nil { + return 0, tx.Error + } + + return total, nil +} + func (p *OrderNotifyRepoImpl) Create(ctx context.Context, req *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) { now := time.Now() @@ -37,6 +64,9 @@ func (p *OrderNotifyRepoImpl) Create(ctx context.Context, req *bo.OrderNotifyBo) Request: req.Request, Responses: "{}", NotifyUrl: req.NotifyUrl, + Channel: req.Channel.GetValue(), + Event: req.Event.GetValue(), + Type: req.Type.GetValue(), CreateTime: &now, UpdateTime: &now, } diff --git a/internal/pkg/helper/int.go b/internal/pkg/helper/int.go index f365e24..1c4b58d 100644 --- a/internal/pkg/helper/int.go +++ b/internal/pkg/helper/int.go @@ -1,5 +1,10 @@ package helper +import ( + "fmt" + "strings" +) + func ArrDeduplication[T int | int32 | uint32 | uint64 | int64 | int8 | string](arr []T) []T { seen := map[T]bool{} var result []T @@ -11,3 +16,35 @@ func ArrDeduplication[T int | int32 | uint32 | uint64 | int64 | int8 | string](a } return result } + +// BuildStr 函数用于将 uid 和 arr 中的元素拼接成一个字符串 +func BuildStr[T int | int32 | uint32 | uint64 | int64 | int8 | string](uid string, arr []T) string { + // 创建一个 strings.Builder 实例,用于高效地构建字符串 + var sb strings.Builder + + // 写入初始的 uid + sb.WriteString(uid) + + // 遍历 arr 切片中的每个元素 + for i, id := range arr { + // 如果不是第一个元素,先写入下划线分隔符 + if i > 0 { + sb.WriteByte('_') + } else if sb.Len() > 0 { + // 如果 uid 不为空且是第一个元素,也写入下划线分隔符 + sb.WriteByte('_') + } + // 根据元素的类型进行不同的处理 + switch v := any(id).(type) { + case string: + // 如果是字符串类型,直接写入 + sb.WriteString(v) + default: + // 对于其他类型,使用 fmt.Sprint 转换为字符串后写入 + sb.WriteString(fmt.Sprint(v)) + } + } + + // 将 strings.Builder 中的内容转换为字符串并返回 + return sb.String() +} diff --git a/internal/pkg/helper/int_test.go b/internal/pkg/helper/int_test.go new file mode 100644 index 0000000..c81a643 --- /dev/null +++ b/internal/pkg/helper/int_test.go @@ -0,0 +1,17 @@ +package helper + +import ( + "fmt" + "testing" +) + +func TestBuildStr(t *testing.T) { + uid := "example_uid" + arr := []int{1, 2, 3} + result := BuildStr(uid, arr) + fmt.Println(result) + + arrStr := []string{"a", "b", "c"} + resultStr := BuildStr(uid, arrStr) + fmt.Println(resultStr) +} diff --git a/internal/server/consume.go b/internal/server/consume.go index 52c5c75..414c39d 100644 --- a/internal/server/consume.go +++ b/internal/server/consume.go @@ -44,6 +44,12 @@ func NewConsumer( } } + if c := voucherService.GetNotifyRetryConfig(); c != nil { + if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyRetryConsumer); err != nil { + panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) + } + } + return &Consumer{manager: manager, hLog: hLog, conf: conf} } diff --git a/internal/service/consume.go b/internal/service/consume.go index 9ca2e5d..3ade7dc 100644 --- a/internal/service/consume.go +++ b/internal/service/consume.go @@ -4,6 +4,7 @@ import ( "context" "errors" "github.com/go-kratos/kratos/v2/log" + "strconv" "strings" "voucher/internal/pkg/mq" ) @@ -63,7 +64,7 @@ func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMes shardingKey := msg.GetShardingKey() if shardingKey == "" { - log.Error("orderNotify 消费异常,获取 shardingKey 失败") + log.Error("notify 消费异常,获取 shardingKey 失败") return errors.New("orderNotify 消费异常,获取 orderNo 失败") } @@ -75,3 +76,42 @@ func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMes return nil } + +func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig { + elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"] + if !ok { + return nil + } + + if !elm.IsOpenConsumer { + log.Warnf("notify MQ is not open") + return nil + } + + return &mq.ConsumerConfig{ + TopicName: elm.Topic, + GroupName: elm.Group, + PerCoroutineCnt: int(elm.PerCoroutineCnt), + } +} + +func (j *VoucherService) NotifyRetryConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { + + shardingKey := msg.GetShardingKey() + if shardingKey == "" { + log.Error("notify retry 消费异常,获取 shardingKey 失败") + return errors.New("orderNotify 消费异常,获取 orderNo 失败") + } + + orderNotifyId, err := strconv.ParseUint(shardingKey, 10, 64) + if err != nil { + log.Error("notify retry 消费异常,orderNotifyId转换失败,shardingKey=%s", shardingKey) + return err + } + + if err = j.VoucherBiz.NotifyRetryConsume(ctx, orderNotifyId); err != nil { + log.Errorf("notify retry 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error()) + } + + return nil +}