From 9e101492b2790aa89607b29482049df665060cf8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Wed, 12 Mar 2025 17:47:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/v1/cmb_cpn.proto | 3 + configs/config.yaml | 12 -- configs/config_test.yaml | 12 -- internal/biz/bo/order_bo.go | 4 + internal/biz/bo/order_notify_bo.go | 25 ++- internal/biz/bo/order_wechat_bo.go | 22 -- internal/biz/cmb.go | 24 ++- internal/biz/cmb/cmb.go | 40 ++-- internal/biz/cmb/notify.go | 51 ++++- internal/biz/cmb/notify_consume.go | 55 ----- internal/biz/cmb/notify_retry_consume.go | 15 +- internal/biz/cmb/provider_set.go | 2 +- internal/biz/cmb/voucher.go | 37 ---- internal/biz/notify_consume.go | 68 ------- .../biz/{cmb/order_consume.go => order.go} | 149 +++++++------- internal/biz/order_consume.go | 52 ----- internal/biz/repo/order.go | 5 +- internal/biz/repo/order_wechat.go | 17 -- internal/biz/vo/cache.go | 6 +- internal/biz/vo/order_type.go | 8 + internal/biz/vo/order_wechant_status.go | 54 ----- internal/biz/voucher.go | 44 ++-- internal/biz/wechat_notify_consume.go | 68 +++---- internal/biz/wechatrepo/cpn.go | 4 +- internal/data/model/order.gen.go | 3 + internal/data/model/order_notify.gen.go | 25 ++- internal/data/model/order_wechat.gen.go | 32 --- internal/data/repoimpl/order.go | 30 ++- internal/data/repoimpl/order_notify.go | 21 +- internal/data/repoimpl/order_wechat.go | 191 ------------------ internal/data/repoimpl/provider_set.go | 1 - internal/data/wechatrepoimpl/cpn.go | 33 +-- internal/server/consume.go | 12 -- internal/service/cmb.go | 9 +- internal/service/consume.go | 69 ------- internal/service/wechat_notify_consume.go | 18 +- 36 files changed, 343 insertions(+), 878 deletions(-) delete mode 100644 internal/biz/bo/order_wechat_bo.go delete mode 100644 internal/biz/cmb/notify_consume.go delete mode 100644 internal/biz/cmb/voucher.go delete mode 100644 internal/biz/notify_consume.go rename internal/biz/{cmb/order_consume.go => order.go} (56%) delete mode 100644 internal/biz/order_consume.go delete mode 100644 internal/biz/repo/order_wechat.go delete mode 100644 internal/biz/vo/order_wechant_status.go delete mode 100644 internal/data/model/order_wechat.gen.go delete mode 100644 internal/data/repoimpl/order_wechat.go diff --git a/api/v1/cmb_cpn.proto b/api/v1/cmb_cpn.proto index a617252..59cf40a 100644 --- a/api/v1/cmb_cpn.proto +++ b/api/v1/cmb_cpn.proto @@ -57,6 +57,8 @@ message CmbOrderRequest { string cmbUidType = 12 [json_name = "cmbUidType", (validate.rules).string = {min_len: 1,max_len: 10}]; // 时间戳,长度为13位,精度为毫秒 string timestamp = 13 [json_name = "timestamp", (validate.rules).string = {min_len: 1,max_len: 14}]; + // 拓展参数 + string attach = 15 [json_name = "attach"]; } message CmbOrderReply { // 接口调用返回码,1000 成功,1001 失败 @@ -132,6 +134,7 @@ message CmbNotifyRequest { string orgNo = 12 [json_name = "orgNo"]; // 扩展字段 string ext = 13 [json_name = "ext"]; + string attach = 14 [json_name = "attach"]; } message CmbNotifyReply { // 接口调用返回码,1000 成功,1001 失败 diff --git a/configs/config.yaml b/configs/config.yaml index af1dd88..bbacf35 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -29,18 +29,6 @@ rocketMQ: secretKey: "Z3596KCFA9RAUR6k" secretToken: "" eventMap: - order: - topic: voucher_order_create - group: voucher_order_create_group - isOpenConsumer: false #是否启动消费 true/false - PerCoroutineCnt: 2 #协程数量,不配置默认为20 - RetryCnt: 3 #重试次数,不配置默认38 - notify: - topic: voucher_order_notify - group: voucher_order_notify_group - isOpenConsumer: false #是否启动消费 true/false - PerCoroutineCnt: 2 #协程数量,不配置默认为20 - RetryCnt: 3 #重试次数,不配置默认38 notifyRetry: # 重试延迟队列 topic: voucher_order_notifyRetry group: voucher_order_notifyRetry_group diff --git a/configs/config_test.yaml b/configs/config_test.yaml index 467020c..7efd0e8 100644 --- a/configs/config_test.yaml +++ b/configs/config_test.yaml @@ -29,18 +29,6 @@ rocketMQ: secretKey: "Z3596KCFA9RAUR6k" secretToken: "" eventMap: - order: - topic: voucher_order_create - group: voucher_order_create_group - isOpenConsumer: false #是否启动消费 true/false - PerCoroutineCnt: 2 #协程数量,不配置默认为20 - RetryCnt: 3 #重试次数,不配置默认38 - notify: - topic: voucher_order_notify - group: voucher_order_notify_group - isOpenConsumer: false #是否启动消费 true/false - PerCoroutineCnt: 2 #协程数量,不配置默认为20 - RetryCnt: 3 #重试次数,不配置默认38 notifyRetry: # 重试延迟队列 topic: voucher_order_notifyRetry group: voucher_order_notifyRetry_group diff --git a/internal/biz/bo/order_bo.go b/internal/biz/bo/order_bo.go index 547c8df..30b0a33 100644 --- a/internal/biz/bo/order_bo.go +++ b/internal/biz/bo/order_bo.go @@ -10,6 +10,7 @@ type OrderBo struct { ID uint64 OrderNo string OutBizNo string + VoucherNo string ProductNo string BatchNo string Account string @@ -20,6 +21,8 @@ type OrderBo struct { MerchantNo string NotifyUrl string Channel vo.Channel + Attach string + Remark string CreateTime *time.Time UpdateTime *time.Time } @@ -31,6 +34,7 @@ type OrderCreateReqBo struct { AppID string Type vo.OrderType AccountType vo.OrderAccountType + Attach string } type OrderCreateRepBo struct { diff --git a/internal/biz/bo/order_notify_bo.go b/internal/biz/bo/order_notify_bo.go index 56e6a0f..0d6180d 100644 --- a/internal/biz/bo/order_notify_bo.go +++ b/internal/biz/bo/order_notify_bo.go @@ -7,17 +7,16 @@ import ( // OrderNotifyBo 领域实体Bo结构,字段和模型字段保持一致 type OrderNotifyBo struct { - ID uint64 - OrderNo string - OutRequestNo string - Status vo.OrderNotifyStatus - Request string - Event vo.OrderNotifyEvent - Channel vo.Channel - Type vo.OrderType - Responses string - Remark string - NotifyUrl string - CreateTime *time.Time - UpdateTime *time.Time + ID uint64 + OrderNo string + Status vo.OrderNotifyStatus + Request string + Event vo.OrderNotifyEvent + Channel vo.Channel + Type vo.OrderType + Responses string + Remark string + NotifyUrl string + CreateTime *time.Time + UpdateTime *time.Time } diff --git a/internal/biz/bo/order_wechat_bo.go b/internal/biz/bo/order_wechat_bo.go deleted file mode 100644 index db33167..0000000 --- a/internal/biz/bo/order_wechat_bo.go +++ /dev/null @@ -1,22 +0,0 @@ -package bo - -import ( - "time" - "voucher/internal/biz/vo" -) - -// OrderWechatBo 领域实体Bo结构,字段和模型字段保持一致 -type OrderWechatBo struct { - ID uint64 - OrderNo string - OutRequestNo string - AppID string - StockCreatorMchid string - OpenID string - StockID string - Status vo.OrderWechatStatus - CouponID string - Remark string - CreateTime *time.Time - UpdateTime *time.Time -} diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index ced0cc1..8f57509 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -12,9 +12,9 @@ import ( "voucher/internal/pkg/lock" ) -func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { +func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (voucherNo string, err error) { - c := vo.CmbOrderLockKey.BuildCache([]string{req.OutBizNo}) + c := vo.CmbOrderLockKey.BuildCache([]string{req.OutBizNo, req.Type.String()}) err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { @@ -25,7 +25,14 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or } if order != nil { - orderNo = order.OrderNo + + if order.Status.IsFail() { + if err = v.orderRetry(ctx, order); err != nil { + return err + } + } + + voucherNo = order.VoucherNo return nil } @@ -38,17 +45,20 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or return fmt.Errorf("只支持微信") } - if orderNo, err = v.Cmb.Order(ctx, req, product); err != nil { + order, err = v.order(ctx, req, product) + if err != nil { return err } - return v.PushOrderMQ(ctx, orderNo) + voucherNo = order.VoucherNo + + return nil }) return } -func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (reps *v1.CmbQueryReply, err error) { +func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) { c := vo.CmbQueryLockKey.BuildCache([]string{orderNo}) @@ -64,7 +74,7 @@ func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (reps *v1.Cmb return err } - reps = &v1.CmbQueryReply{ + resp = &v1.CmbQueryReply{ Ticket: order.OrderNo, Status: status.GetValue(), TransDate: time.Now().Format("20060102150405"), diff --git a/internal/biz/cmb/cmb.go b/internal/biz/cmb/cmb.go index f66c123..e881eae 100644 --- a/internal/biz/cmb/cmb.go +++ b/internal/biz/cmb/cmb.go @@ -9,43 +9,31 @@ import ( ) type Cmb struct { - bc *conf.Bootstrap - rdb *data.Rdb - OrderRepo repo.OrderRepo - OrderWechatRepo repo.OrderWechatRepo - ProductRepo repo.ProductRepo - OrderNotifyRepo repo.OrderNotifyRepo - WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo - WechatCpnRepo wechatrepo.WechatCpnRepo - GenerateMixRepo mixrepos.GenerateMixRepo - CmbMixRepo mixrepos.CmbMixRepo - DingMixRepo mixrepos.DingMixRepo + bc *conf.Bootstrap + rdb *data.Rdb + OrderRepo repo.OrderRepo + ProductRepo repo.ProductRepo + OrderNotifyRepo repo.OrderNotifyRepo + WechatCpnRepo wechatrepo.WechatCpnRepo + CmbMixRepo mixrepos.CmbMixRepo } func NewCmb( bc *conf.Bootstrap, rdb *data.Rdb, orderRepo repo.OrderRepo, - OrderWechatRepo repo.OrderWechatRepo, ProductRepo repo.ProductRepo, OrderNotifyRepo repo.OrderNotifyRepo, - WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, - GenerateMixRepo mixrepos.GenerateMixRepo, CmbMixRepo mixrepos.CmbMixRepo, - DingMixRepo mixrepos.DingMixRepo, ) *Cmb { return &Cmb{ - bc: bc, - rdb: rdb, - OrderRepo: orderRepo, - OrderWechatRepo: OrderWechatRepo, - ProductRepo: ProductRepo, - OrderNotifyRepo: OrderNotifyRepo, - WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo, - WechatCpnRepo: WechatCpnRepo, - GenerateMixRepo: GenerateMixRepo, - CmbMixRepo: CmbMixRepo, - DingMixRepo: DingMixRepo, + bc: bc, + rdb: rdb, + OrderRepo: orderRepo, + ProductRepo: ProductRepo, + OrderNotifyRepo: OrderNotifyRepo, + WechatCpnRepo: WechatCpnRepo, + CmbMixRepo: CmbMixRepo, } } diff --git a/internal/biz/cmb/notify.go b/internal/biz/cmb/notify.go index 0bd5a5c..4d729b3 100644 --- a/internal/biz/cmb/notify.go +++ b/internal/biz/cmb/notify.go @@ -3,6 +3,7 @@ package cmb import ( "context" "encoding/json" + "github.com/go-kratos/kratos/v2/log" "time" err2 "voucher/api/err" v1 "voucher/api/v1" @@ -10,7 +11,50 @@ import ( "voucher/internal/biz/vo" ) -func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (string, error) { +func (v *Cmb) Notify(ctx context.Context, order *bo.OrderBo) (*bo.OrderNotifyBo, error) { + + event, err := order.Status.GetOrderNotifyEvent() + if err != nil { + return nil, err + } + + req := &bo.OrderNotifyBo{ + OrderNo: order.OrderNo, + NotifyUrl: order.NotifyUrl, + Channel: order.Channel, + Event: event, + Type: order.Type, + } + + request, orderNotify, err := v.notifyCreate(ctx, order, req) + if err != nil { + return nil, err + } + + x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl) + if err != nil { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) + } + + bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x) + if err != nil { + log.Errorf("Notify CmbMixRepo.VerifyResponse error:%s", err.Error()) + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) + } + + var reply *v1.CmbNotifyReply + if err = json.Unmarshal([]byte(bizStr), &reply); err != nil { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) + } + + if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() { + return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg) + } + + return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr) +} + +func (v *Cmb) bizContent(_ context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (string, error) { cmbStatus, err := orderNotify.Event.GetCmbStatusText() if err != nil { @@ -22,6 +66,7 @@ func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (stri Status: cmbStatus.GetValue(), TransDate: time.Now().Format("20060102150405"), OrgNo: v.bc.Cmb.OrgNo, + Attach: order.Attach, Ext: "", } @@ -33,9 +78,9 @@ func (v *Cmb) bizContent(_ context.Context, orderNotify *bo.OrderNotifyBo) (stri return string(bizJsonBytes), nil } -func (v *Cmb) notifyCreate(ctx context.Context, req *bo.OrderNotifyBo) (*v1.CmbRequest, *bo.OrderNotifyBo, error) { +func (v *Cmb) notifyCreate(ctx context.Context, order *bo.OrderBo, req *bo.OrderNotifyBo) (*v1.CmbRequest, *bo.OrderNotifyBo, error) { - bizContent, err := v.bizContent(ctx, req) + bizContent, err := v.bizContent(ctx, order, req) if err != nil { return nil, nil, err } diff --git a/internal/biz/cmb/notify_consume.go b/internal/biz/cmb/notify_consume.go deleted file mode 100644 index 52d7ee0..0000000 --- a/internal/biz/cmb/notify_consume.go +++ /dev/null @@ -1,55 +0,0 @@ -package cmb - -import ( - "context" - "encoding/json" - "github.com/go-kratos/kratos/v2/log" - v1 "voucher/api/v1" - "voucher/internal/biz/bo" - "voucher/internal/biz/vo" -) - -func (v *Cmb) NotifyConsume(ctx context.Context, order *bo.OrderBo, orderOutRequestNo string) (*bo.OrderNotifyBo, error) { - - event, err := order.Status.GetOrderNotifyEvent() - if err != nil { - return nil, err - } - - req := &bo.OrderNotifyBo{ - OrderNo: order.OrderNo, - OutRequestNo: orderOutRequestNo, - NotifyUrl: order.NotifyUrl, - Channel: order.Channel, - Event: event, - Type: order.Type, - Request: "", - } - - request, orderNotify, err := v.notifyCreate(ctx, req) - if err != nil { - return nil, err - } - - x, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl) - if err != nil { - return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) - } - - bizStr, err := v.CmbMixRepo.VerifyResponse(ctx, x) - if err != nil { - log.Errorf("NotifyConsume CmbMixRepo.VerifyResponse error:%s", err.Error()) - return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) - } - - var reply *v1.CmbNotifyReply - if err = json.Unmarshal([]byte(bizStr), &reply); err != nil { - return orderNotify, v.notifyFail(ctx, orderNotify.ID, err.Error()) - } - - if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() { - return orderNotify, v.notifyFail(ctx, orderNotify.ID, reply.RespMsg) - } - - return orderNotify, v.notifySuccess(ctx, orderNotify.ID, bizStr) -} diff --git a/internal/biz/cmb/notify_retry_consume.go b/internal/biz/cmb/notify_retry_consume.go index 270a0a5..037a7ca 100644 --- a/internal/biz/cmb/notify_retry_consume.go +++ b/internal/biz/cmb/notify_retry_consume.go @@ -12,16 +12,15 @@ import ( func (v *Cmb) NotifyRetryConsume(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) (*bo.OrderNotifyBo, error) { req := &bo.OrderNotifyBo{ - OrderNo: orderNotify.OrderNo, - OutRequestNo: orderNotify.OutRequestNo, - NotifyUrl: order.NotifyUrl, - Channel: order.Channel, - Event: orderNotify.Event, - Type: order.Type, - Request: "", + OrderNo: orderNotify.OrderNo, + NotifyUrl: order.NotifyUrl, + Channel: order.Channel, + Event: orderNotify.Event, + Type: order.Type, + Request: "", } - request, orderNotify, err := v.notifyCreate(ctx, req) + request, orderNotify, err := v.notifyCreate(ctx, order, req) if err != nil { return nil, err } diff --git a/internal/biz/cmb/provider_set.go b/internal/biz/cmb/provider_set.go index d6b41fe..e22c1d4 100644 --- a/internal/biz/cmb/provider_set.go +++ b/internal/biz/cmb/provider_set.go @@ -4,5 +4,5 @@ import ( "github.com/google/wire" ) -// ProviderSetCmb ProviderSetCmb is biz providers. +// ProviderSetCmb is biz providers. var ProviderSetCmb = wire.NewSet(NewCmb) diff --git a/internal/biz/cmb/voucher.go b/internal/biz/cmb/voucher.go deleted file mode 100644 index b1e223f..0000000 --- a/internal/biz/cmb/voucher.go +++ /dev/null @@ -1,37 +0,0 @@ -package cmb - -import ( - "context" - "voucher/internal/biz/bo" - "voucher/internal/biz/vo" - "voucher/internal/pkg/uid" -) - -func (v *Cmb) Order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (orderNo string, err error) { - - orderNo, err = v.GenerateMixRepo.GeneratorString(ctx, uid.Order) - if err != nil { - return - } - - o := &bo.OrderBo{ - OrderNo: orderNo, - OutBizNo: req.OutBizNo, - ProductNo: req.ProductNo, - Account: req.Account, - AppID: req.AppID, - MerchantNo: product.MchId, - Channel: product.Channel, - BatchNo: product.BatchNo, - NotifyUrl: v.bc.Cmb.NotifyUrl, - AccountType: vo.OrderAccountTypeOpenId, - Type: vo.OrderTypeCmb, - Status: vo.OrderStatusWait, - } - - if _, err = v.OrderRepo.Create(ctx, o); err != nil { - return - } - - return -} diff --git a/internal/biz/notify_consume.go b/internal/biz/notify_consume.go deleted file mode 100644 index d7a6157..0000000 --- a/internal/biz/notify_consume.go +++ /dev/null @@ -1,68 +0,0 @@ -package biz - -import ( - "context" - "fmt" - "go.opentelemetry.io/otel/trace" - errPb "voucher/api/err" - "voucher/internal/biz/bo" - "voucher/internal/biz/vo" - "voucher/internal/pkg/lock" - "voucher/internal/pkg/mq" -) - -func (v *VoucherBiz) PushNotifyMQ(ctx context.Context, orderNo, outRequestNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["notify"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(fmt.Sprintf("%s_%s", orderNo, outRequestNo)), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("notify,消费队列投递失败[%v]", err) - } - - return nil -} - -func (v *VoucherBiz) NotifyConsume(ctx context.Context, orderNo, orderOutRequestNo string) error { - - var ( - err error - orderNotify *bo.OrderNotifyBo - cache = vo.NotifyConsume.BuildCache([]string{orderNo}) - ) - - err = lock.NewMutex(v.rdb.Rdb, cache.TTL).Lock(ctx, cache.Key, func(ctx context.Context) error { - - order, err2 := v.OrderRepo.GetByOrderNo(ctx, orderNo) - if err2 != nil { - return err - } - - if !order.Status.CanNotify() { - return fmt.Errorf("订单状态错误,不能通知:%s", order.Status.GetText()) - } - - if !order.Channel.IsWeChat() { - return fmt.Errorf("暂不支持订单%s渠道%s回调通知处理", order.OrderNo, order.Channel.GetText()) - } - - if order.Type.IsCmb() { - if orderNotify, err2 = v.Cmb.NotifyConsume(ctx, order, orderOutRequestNo); err2 != nil { - return err - } - } - - return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) - }) - - if !errPb.IsNeedRetryNotify(err) { - return err - } - - // 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟 - // 第一次通知失败重试入队 - return v.PushNotifyRetryDelayMQ(ctx, 60, orderNotify.ID) -} diff --git a/internal/biz/cmb/order_consume.go b/internal/biz/order.go similarity index 56% rename from internal/biz/cmb/order_consume.go rename to internal/biz/order.go index 8ddef01..0fbf47c 100644 --- a/internal/biz/cmb/order_consume.go +++ b/internal/biz/order.go @@ -1,4 +1,4 @@ -package cmb +package biz import ( "context" @@ -12,43 +12,75 @@ import ( "voucher/internal/pkg/uid" ) -func (v *Cmb) OrderConsume(ctx context.Context, order *bo.OrderBo) (outRequestNo string, err error) { +func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) { - if !order.Status.IsWait() { - return outRequestNo, fmt.Errorf("订单状态错误,%s", order.Status.GetText()) - } - - if !order.Channel.IsWeChat() { - return outRequestNo, fmt.Errorf("订单渠道错误,%s", order.Channel.GetText()) - } - - // 注册通知标签 order.MerchantNo 批次创建商户, order.BatchNo 商品批次号 - if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil { - return outRequestNo, err - } - - if err = v.ing(ctx, order.ID); err != nil { - return - } - - orderWechat, err := v.create(ctx, order) + order, err := v.create(ctx, req, product) if err != nil { - return + return nil, err } - couponId, err := v.WechatCpnRepo.Order(ctx, orderWechat) - if err != nil { - return outRequestNo, v.fail(ctx, order, orderWechat, err.Error()) + voucherNo := "" + if product.ProductNo == "001" { + // 压测商品 + voucherNo = order.OrderNo + } else { + + // 注册通知标签 order.MerchantNo 批次创建商户, order.BatchNo 商品批次号 + if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil { + return nil, err + } + + // 真实发放 + voucherNo, err = v.WechatCpnRepo.Order(ctx, order) + if err != nil { + return nil, v.fail(ctx, order, err.Error()) + } + } - if err = v.success(ctx, order, orderWechat, couponId); err != nil { - return + if err = v.success(ctx, order, voucherNo); err != nil { + return nil, err } - return orderWechat.OutRequestNo, err + return order, nil } -func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { +func (v *VoucherBiz) orderRetry(ctx context.Context, order *bo.OrderBo) error { + + voucherNo, err := v.WechatCpnRepo.Order(ctx, order) + if err != nil { + return v.fail(ctx, order, err.Error()) + } + + order.VoucherNo = voucherNo + return v.success(ctx, order, voucherNo) +} + +func (v *VoucherBiz) create(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) { + + orderNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.Order) + if err != nil { + return nil, err + } + + return v.OrderRepo.Create(ctx, &bo.OrderBo{ + OrderNo: orderNo, + OutBizNo: req.OutBizNo, + ProductNo: req.ProductNo, + Account: req.Account, + AppID: req.AppID, + MerchantNo: product.MchId, + Channel: product.Channel, + BatchNo: product.BatchNo, + NotifyUrl: v.bc.Cmb.NotifyUrl, + AccountType: vo.OrderAccountTypeOpenId, + Type: vo.OrderTypeCmb, + Status: vo.OrderStatusIng, // 同步发放,状态至为发放中 + Attach: req.Attach, + }) +} + +func (v *VoucherBiz) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) @@ -60,8 +92,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID } if err != redis.Nil { - errMsg := fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err) - return fmt.Errorf(errMsg) + return fmt.Errorf(fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err)) } cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) @@ -107,7 +138,7 @@ func (v *Cmb) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID }) } -func (v *Cmb) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { +func (v *VoucherBiz) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ StockID: stockID, StockCreatorMchID: stockCreatorMchID, @@ -115,7 +146,7 @@ func (v *Cmb) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMch }) } -func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { +func (v *VoucherBiz) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { if err := v.rdb.Rdb.Set(ctx, c.Key, wechatNotifyTag.Tag, c.TTL).Err(); err != nil { return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) @@ -124,59 +155,26 @@ func (v *Cmb) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.Wec return nil } -func (v *Cmb) create(ctx context.Context, order *bo.OrderBo) (*bo.OrderWechatBo, error) { - - outRequestNo, err := v.GenerateMixRepo.GeneratorString(ctx, uid.OrderWechat) - if err != nil { - return nil, err - } - - req := &bo.OrderWechatBo{ - OrderNo: order.OrderNo, - OutRequestNo: outRequestNo, - AppID: order.AppID, - StockCreatorMchid: order.MerchantNo, - OpenID: order.Account, - StockID: order.BatchNo, - Status: vo.OrderWechatStatusWait, - } - - orderWechat, err := v.OrderWechatRepo.Create(ctx, req) - if err != nil { - return nil, err - } - - return orderWechat, nil -} - -func (v *Cmb) ing(ctx context.Context, id uint64) error { +func (v *VoucherBiz) ing(ctx context.Context, id uint64) error { return v.OrderRepo.Ing(ctx, id) } -func (v *Cmb) success(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, couponId string) error { +func (v *VoucherBiz) success(ctx context.Context, order *bo.OrderBo, voucherNo string) error { - if err := v.OrderWechatRepo.Success(ctx, orderWechat.ID, couponId); err != nil { - return err - } - - return v.OrderRepo.Success(ctx, order.ID) + return v.OrderRepo.Success(ctx, order.ID, voucherNo) } -func (v *Cmb) fail(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo, errMsg string) error { +func (v *VoucherBiz) fail(ctx context.Context, order *bo.OrderBo, remark string) error { - if err := v.OrderWechatRepo.Fail(ctx, orderWechat.ID, errMsg); err != nil { + if err := v.OrderRepo.Fail(ctx, order.ID, remark); err != nil { return err } - if err := v.OrderRepo.Fail(ctx, order.ID); err != nil { - return err - } - - return v.alarm(ctx, order, errMsg) + return v.alarm(ctx, order, remark) } -func (v *Cmb) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error { +func (v *VoucherBiz) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error { // 1小时 内 指定的批次号 发放 发生错误 预警 c := vo.OrderConsumeFailAlarmKey.BuildCache([]string{order.ProductNo}) @@ -208,12 +206,11 @@ func (v *Cmb) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error } // 通知 - text := v.alarmText(ctx, order, errMsg) - if err = v.DingMixRepo.SendMarkdownMessage(ctx, text); err != nil { + if err = v.DingMixRepo.SendMarkdownMessage(ctx, v.alarmText(ctx, order, errMsg)); err != nil { return err } - if err := v.rdb.Rdb.Set(ctx, c.Key, order.ProductNo, c.TTL).Err(); err != nil { + if err = v.rdb.Rdb.Set(ctx, c.Key, order.ProductNo, c.TTL).Err(); err != nil { return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) } @@ -221,7 +218,7 @@ func (v *Cmb) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error }) } -func (v *Cmb) alarmText(_ context.Context, order *bo.OrderBo, errMsg string) string { +func (v *VoucherBiz) alarmText(_ context.Context, order *bo.OrderBo, errMsg string) string { remarks := fmt.Sprintf("订单号:%s,商品编号:%s,原因:%s", order.OrderNo, order.ProductNo, errMsg) diff --git a/internal/biz/order_consume.go b/internal/biz/order_consume.go deleted file mode 100644 index edfb15d..0000000 --- a/internal/biz/order_consume.go +++ /dev/null @@ -1,52 +0,0 @@ -package biz - -import ( - "context" - "fmt" - "go.opentelemetry.io/otel/trace" - "voucher/internal/biz/vo" - "voucher/internal/pkg/lock" - "voucher/internal/pkg/mq" -) - -func (v *VoucherBiz) PushOrderMQ(ctx context.Context, orderNo string) error { - - eventMap := v.bc.RocketMQ.EventMap["order"] - sendOption := []mq.SendOption{ - mq.WithSendShardingKeysOption(fmt.Sprintf("%s", orderNo)), - mq.WithOpenTelemetryOption(trace.SpanFromContext(ctx).SpanContext().TraceID().String()), - } - - if err := v.MqSendMixRepo.SendSync(ctx, eventMap.Topic, []byte("{}"), sendOption...); err != nil { - return fmt.Errorf("收单成功,消费队列投递失败[%v]", err) - } - - return nil -} - -func (v *VoucherBiz) OrderConsume(ctx context.Context, orderNo string) (err error) { - - c := vo.OrderConsume.BuildCache([]string{orderNo}) - - return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - - order, err := v.OrderRepo.GetByOrderNo(ctx, orderNo) - if err != nil { - return err - } - - if order.Type.IsCmb() { - - outRequestNo, err := v.Cmb.OrderConsume(ctx, order) - if err != nil { - return err - } - - return v.PushNotifyMQ(ctx, orderNo, outRequestNo) - - } - - return fmt.Errorf("订单类型错误:%s", order.Type.GetText()) - }) - -} diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index 6c7dc69..008c5b4 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -9,11 +9,12 @@ import ( type OrderRepo interface { GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) + GetByMBV(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error) GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error) Ing(ctx context.Context, id uint64) error - Success(ctx context.Context, id uint64) error - Fail(ctx context.Context, id uint64) error + Success(ctx context.Context, id uint64, voucherNo string) error + Fail(ctx context.Context, id uint64, remark string) error Used(ctx context.Context, id uint64) error Expired(ctx context.Context, id uint64) error } diff --git a/internal/biz/repo/order_wechat.go b/internal/biz/repo/order_wechat.go deleted file mode 100644 index 956b89f..0000000 --- a/internal/biz/repo/order_wechat.go +++ /dev/null @@ -1,17 +0,0 @@ -package repo - -import ( - "context" - "voucher/internal/biz/bo" -) - -type OrderWechatRepo interface { - Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) - Success(ctx context.Context, id uint64, couponId string) error - Fail(ctx context.Context, id uint64, remark string) error - GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) - GetLastByOrderNo(ctx context.Context, orderNo string) (*bo.OrderWechatBo, error) - GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error) - Used(ctx context.Context, id uint64) error - Expired(ctx context.Context, id uint64) error -} diff --git a/internal/biz/vo/cache.go b/internal/biz/vo/cache.go index fcdc1ab..4b64793 100644 --- a/internal/biz/vo/cache.go +++ b/internal/biz/vo/cache.go @@ -12,8 +12,6 @@ const ( CmbQueryLockKey CacheKey = "cmb_query" CmbProductQueryLockKey CacheKey = "cmb_product_query" - OrderConsume CacheKey = "order_consume" - NotifyConsume CacheKey = "notify_consume" NotifyRetryConsume CacheKey = "notify_retry_consume" OrderConsumeFailAlarmKey CacheKey = "order_consume_fail_alarm" @@ -29,13 +27,11 @@ var CacheKeyMap = map[CacheKey]time.Duration{ CmbOrderLockKey: 30 * time.Second, CmbQueryLockKey: 30 * time.Second, CmbProductQueryLockKey: 30 * time.Second, - OrderConsume: 60 * time.Second, OrderConsumeFailAlarmKey: 3600 * time.Second, // 1小时 OrderConsumeFailAlarmLockKey: 60 * time.Second, - NotifyConsume: 60 * time.Second, NotifyRetryConsume: 60 * time.Second, WechatNotifyRegisterTagCacheKey: 86400 * time.Second, // 1天 - WechatNotifyRegisterTagCacheLockKey: 30 * time.Second, + WechatNotifyRegisterTagCacheLockKey: 60 * time.Second, WechatNotifyConsumeLockKey: 30 * time.Second, } diff --git a/internal/biz/vo/order_type.go b/internal/biz/vo/order_type.go index 9fd8fdb..7ad1cd1 100644 --- a/internal/biz/vo/order_type.go +++ b/internal/biz/vo/order_type.go @@ -1,5 +1,9 @@ package vo +import ( + "fmt" +) + type OrderType uint8 const ( @@ -17,6 +21,10 @@ func (s OrderType) GetText() string { return "未知类型" } +func (s OrderType) String() string { + return fmt.Sprintf("%d", s) +} + func (s OrderType) GetValue() uint8 { return uint8(s) } diff --git a/internal/biz/vo/order_wechant_status.go b/internal/biz/vo/order_wechant_status.go deleted file mode 100644 index aceb109..0000000 --- a/internal/biz/vo/order_wechant_status.go +++ /dev/null @@ -1,54 +0,0 @@ -package vo - -type OrderWechatStatus uint8 - -const ( - OrderWechatStatusWait OrderWechatStatus = iota + 1 - OrderWechatStatusSuccess - OrderWechatStatusFail - OrderWechatStatusUse - OrderWechatStatusExpired -) - -func (s OrderWechatStatus) GetValue() uint8 { - return uint8(s) -} - -func (s OrderWechatStatus) IsWait() bool { - return s == OrderWechatStatusWait -} - -func (s OrderWechatStatus) IsSuccess() bool { - return s == OrderWechatStatusSuccess -} - -func (s OrderWechatStatus) IsFail() bool { - return s == OrderWechatStatusFail -} - -func (s OrderWechatStatus) IsUse() bool { - return s == OrderWechatStatusUse -} - -func (s OrderWechatStatus) IsExpired() bool { - return s == OrderWechatStatusExpired -} - -func (s OrderWechatStatus) CanNotify() bool { - return s.IsSuccess() || s.IsUse() || s.IsExpired() -} - -var OrderWechatStatusMap = map[OrderWechatStatus]string{ - OrderWechatStatusWait: "待发放", - OrderWechatStatusSuccess: "发放成功", - OrderWechatStatusFail: "发放失败", - OrderWechatStatusUse: "已使用", - OrderWechatStatusExpired: "已过期", -} - -func (s OrderWechatStatus) GetText() string { - if t, ok := OrderWechatStatusMap[s]; ok { - return t - } - return "未知状态" -} diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 3b85f09..41c517f 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -10,15 +10,17 @@ import ( ) type VoucherBiz struct { - bc *conf.Bootstrap - rdb *data.Rdb - Cmb *cmb.Cmb - ProductRepo repo.ProductRepo - OrderRepo repo.OrderRepo - OrderWechatRepo repo.OrderWechatRepo - OrderNotifyRepo repo.OrderNotifyRepo - MqSendMixRepo mixrepos.MQSendMixRepo - WechatCpnRepo wechatrepo.WechatCpnRepo + bc *conf.Bootstrap + rdb *data.Rdb + Cmb *cmb.Cmb + ProductRepo repo.ProductRepo + OrderRepo repo.OrderRepo + OrderNotifyRepo repo.OrderNotifyRepo + WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo + MqSendMixRepo mixrepos.MQSendMixRepo + GenerateMixRepo mixrepos.GenerateMixRepo + WechatCpnRepo wechatrepo.WechatCpnRepo + DingMixRepo mixrepos.DingMixRepo } func NewVoucherBiz( @@ -27,20 +29,24 @@ func NewVoucherBiz( Cmb *cmb.Cmb, ProductRepo repo.ProductRepo, OrderRepo repo.OrderRepo, - OrderWechatRepo repo.OrderWechatRepo, OrderNotifyRepo repo.OrderNotifyRepo, + WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo, MqSendMixRepo mixrepos.MQSendMixRepo, + GenerateMixRepo mixrepos.GenerateMixRepo, WechatCpnRepo wechatrepo.WechatCpnRepo, + DingMixRepo mixrepos.DingMixRepo, ) *VoucherBiz { return &VoucherBiz{ - bc: bc, - rdb: rdb, - Cmb: Cmb, - ProductRepo: ProductRepo, - OrderRepo: OrderRepo, - OrderWechatRepo: OrderWechatRepo, - OrderNotifyRepo: OrderNotifyRepo, - MqSendMixRepo: MqSendMixRepo, - WechatCpnRepo: WechatCpnRepo, + bc: bc, + rdb: rdb, + Cmb: Cmb, + ProductRepo: ProductRepo, + OrderRepo: OrderRepo, + OrderNotifyRepo: OrderNotifyRepo, + WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo, + MqSendMixRepo: MqSendMixRepo, + GenerateMixRepo: GenerateMixRepo, + WechatCpnRepo: WechatCpnRepo, + DingMixRepo: DingMixRepo, } } diff --git a/internal/biz/wechat_notify_consume.go b/internal/biz/wechat_notify_consume.go index b1c63a1..ee8e797 100644 --- a/internal/biz/wechat_notify_consume.go +++ b/internal/biz/wechat_notify_consume.go @@ -4,77 +4,75 @@ import ( "context" "fmt" "github.com/go-kratos/kratos/v2/log" + errPb "voucher/api/err" "voucher/internal/biz/bo" "voucher/internal/biz/vo" "voucher/internal/pkg/lock" ) -func (j *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *bo.WechatVoucherNotifyBo) error { +func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *bo.WechatVoucherNotifyBo) error { + + //req.PlainText.StockCreatorMchid = "1676203838" + //req.PlainText.StockID = "20215869" + //req.PlainText.CouponID = "96059179220" c := vo.WechatNotifyConsumeLockKey.BuildCache([]string{tag, req.PlainText.StockID, req.PlainText.CouponID}) - return lock.NewMutex(j.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { + return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - //req.PlainText.StockCreatorMchid = "1676203838" - //req.PlainText.StockID = "20215869" - //req.PlainText.CouponID = "96059179220" - - if req.PlainText.Status.IsSended() { - log.Warnf("券状态可用,忽略不处理,couponId:%s,stockId:%s,status:%s", - req.PlainText.CouponID, req.PlainText.StockID, req.PlainText.Status.GetText()) - return nil - } - - orderWechat, err := j.OrderWechatRepo.GetByMSCId(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID) + order, err := v.OrderRepo.GetByMBV(ctx, req.PlainText.StockCreatorMchid, req.PlainText.StockID, req.PlainText.CouponID) if err != nil { return err } - order, err := j.OrderRepo.GetByOrderNo(ctx, orderWechat.OrderNo) - if err != nil { - return fmt.Errorf("根据订单号%s获取订单信息失败:%s", orderWechat.OrderNo, err.Error()) - } - if req.PlainText.Status.IsUsed() { - if err = j.wechatVoucherUsed(ctx, order, orderWechat); err != nil { + + if err = v.used(ctx, order); err != nil { return err } + } else if req.PlainText.Status.IsExpired() { - if err = j.wechatVoucherExpired(ctx, order, orderWechat); err != nil { + + if err = v.expired(ctx, order); err != nil { return err } + } else { return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText()) } - return j.PushNotifyMQ(ctx, orderWechat.OrderNo, orderWechat.OutRequestNo) + if order.Type.IsCmb() { + + if orderNotify, err2 := v.Cmb.Notify(ctx, order); err2 != nil { + if !errPb.IsNeedRetryNotify(err2) { + return err2 + } + // 第一次通知失败重试入队 + // 状态回调接口失败需要支持重试 重试间隔为1分钟、2分钟、12分钟、60分钟、360分钟 + return v.PushNotifyRetryDelayMQ(ctx, 60, orderNotify.ID) + } + } + + return nil }) } -func (v *VoucherBiz) wechatVoucherUsed(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo) error { +func (v *VoucherBiz) used(ctx context.Context, order *bo.OrderBo) error { - if orderWechat.Status.IsUse() { - log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", orderWechat.OrderNo) + if order.Status.IsUse() { + log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", order.OrderNo) return nil } - if err := v.OrderWechatRepo.Used(ctx, orderWechat.ID); err != nil { - return err - } - return v.OrderRepo.Used(ctx, order.ID) } -func (j *VoucherBiz) wechatVoucherExpired(ctx context.Context, order *bo.OrderBo, orderWechat *bo.OrderWechatBo) error { +func (j *VoucherBiz) expired(ctx context.Context, order *bo.OrderBo) error { - if orderWechat.Status.IsExpired() { - log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", orderWechat.OrderNo) + if order.Status.IsExpired() { + log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo) return nil } - if err := j.OrderWechatRepo.Expired(ctx, orderWechat.ID); err != nil { - return err - } - return j.OrderRepo.Expired(ctx, order.ID) } diff --git a/internal/biz/wechatrepo/cpn.go b/internal/biz/wechatrepo/cpn.go index a81801e..bf13c38 100644 --- a/internal/biz/wechatrepo/cpn.go +++ b/internal/biz/wechatrepo/cpn.go @@ -8,8 +8,8 @@ import ( ) type WechatCpnRepo interface { - Order(ctx context.Context, orderWechat *bo.OrderWechatBo) (couponId string, err error) - Query(ctx context.Context, orderWechat *bo.OrderWechatBo) (vo.OrderWechatStatus, error) + Order(ctx context.Context, order *bo.OrderBo) (couponId string, err error) + Query(ctx context.Context, order *bo.OrderBo) (vo.OrderStatus, error) QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error) RegisterNotifyTag(ctx context.Context, stockID string) error } diff --git a/internal/data/model/order.gen.go b/internal/data/model/order.gen.go index 58947b0..63404f1 100644 --- a/internal/data/model/order.gen.go +++ b/internal/data/model/order.gen.go @@ -14,6 +14,7 @@ const TableNameOrder = "order" type Order struct { ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` OrderNo string `gorm:"column:order_no;not null" json:"order_no"` + VoucherNo string `gorm:"column:voucher_no;not null" json:"voucher_no"` OutBizNo string `gorm:"column:out_biz_no;not null;comment:外部交易号" json:"out_biz_no"` // 外部交易号 ProductNo string `gorm:"column:product_no;not null;comment:商品编号" json:"product_no"` // 商品编号 BatchNo string `gorm:"column:batch_no;not null;comment:立减金批次号" json:"batch_no"` // 立减金批次号 @@ -25,6 +26,8 @@ type Order struct { MerchantNo string `gorm:"column:merchant_no;not null;comment:创建批次号的商户号" json:"merchant_no"` // 创建批次号的商户号 NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"` Channel uint8 `gorm:"column:channel;not null;comment:1:微信 2:支付宝" json:"channel"` // 1:微信 2:支付宝 + Remark string `gorm:"column:remark;not null" json:"remark"` + Attach string `gorm:"column:attach;not null" json:"attach"` CreateTime *time.Time `gorm:"column:create_time" json:"create_time"` UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` } diff --git a/internal/data/model/order_notify.gen.go b/internal/data/model/order_notify.gen.go index 0a1c7af..4a6496e 100644 --- a/internal/data/model/order_notify.gen.go +++ b/internal/data/model/order_notify.gen.go @@ -12,19 +12,18 @@ const TableNameOrderNotify = "order_notify" // OrderNotify mapped from table type OrderNotify struct { - ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` - OrderNo string `gorm:"column:order_no;not null" json:"order_no"` - OutRequestNo string `gorm:"column:out_request_no;not null" json:"out_request_no"` - Status uint8 `gorm:"column:status;not null;comment:状态" json:"status"` - Event uint8 `gorm:"column:event;not null;comment:event" json:"event"` - Channel uint8 `gorm:"column:channel;not null;comment:channel" json:"channel"` - Type uint8 `gorm:"column:type;not null;comment:1:招行" json:"type"` - Request string `gorm:"column:request;not null" json:"request"` - Responses string `gorm:"column:responses" json:"responses"` - Remark string `gorm:"column:remark" json:"remark"` - NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"` - CreateTime *time.Time `gorm:"column:create_time;not null" json:"create_time"` - UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` + ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` + OrderNo string `gorm:"column:order_no;not null" json:"order_no"` + Status uint8 `gorm:"column:status;not null;comment:状态" json:"status"` + Event uint8 `gorm:"column:event;not null;comment:event" json:"event"` + Channel uint8 `gorm:"column:channel;not null;comment:channel" json:"channel"` + Type uint8 `gorm:"column:type;not null;comment:1:招行" json:"type"` + Request string `gorm:"column:request;not null" json:"request"` + Responses string `gorm:"column:responses" json:"responses"` + Remark string `gorm:"column:remark" json:"remark"` + NotifyUrl string `gorm:"column:notify_url;not null;comment:回调地址" json:"notify_url"` + CreateTime *time.Time `gorm:"column:create_time;not null" json:"create_time"` + UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` } // TableName OrderNotify's table name diff --git a/internal/data/model/order_wechat.gen.go b/internal/data/model/order_wechat.gen.go deleted file mode 100644 index 37179d0..0000000 --- a/internal/data/model/order_wechat.gen.go +++ /dev/null @@ -1,32 +0,0 @@ -// Code generated by gorm.io/gen. DO NOT EDIT. -// Code generated by gorm.io/gen. DO NOT EDIT. -// Code generated by gorm.io/gen. DO NOT EDIT. - -package model - -import ( - "time" -) - -const TableNameOrderWechat = "order_wechat" - -// OrderWechat mapped from table -type OrderWechat struct { - ID uint64 `gorm:"column:id;primaryKey;autoIncrement:true" json:"id"` - OrderNo string `gorm:"column:order_no;not null;comment:订单号" json:"order_no"` // 订单号 - OutRequestNo string `gorm:"column:out_request_no;not null;comment:请求单号" json:"out_request_no"` // 请求单号 - AppID string `gorm:"column:app_id;not null;comment:微信应用id" json:"app_id"` // 微信应用id - StockCreatorMchid string `gorm:"column:stock_creator_mchid;not null;comment:批次创建方商户号\n" json:"stock_creator_mchid"` // 批次创建方商户号 - OpenID string `gorm:"column:open_id;not null;comment:微信openid" json:"open_id"` // 微信openid - StockID string `gorm:"column:stock_id;not null;comment:批次id" json:"stock_id"` // 批次id - Status uint8 `gorm:"column:status;not null;comment:1:发放中 2:发放成功 3:发放失败" json:"status"` // 1:发放中 2:发放成功 3:发放失败 - CouponID string `gorm:"column:coupon_id;not null;comment:微信为代金券唯一分配的id" json:"coupon_id"` // 微信为代金券唯一分配的id - Remark string `gorm:"column:remark;not null;comment:备注说明" json:"remark"` // 备注说明 - CreateTime *time.Time `gorm:"column:create_time;not null" json:"create_time"` - UpdateTime *time.Time `gorm:"column:update_time" json:"update_time"` -} - -// TableName OrderWechat's table name -func (*OrderWechat) TableName() string { - return TableNameOrderWechat -} diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index a4eb85a..7d33a88 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -4,6 +4,7 @@ import ( "context" "gorm.io/gorm" "time" + "unicode/utf8" "voucher/internal/biz/bo" "voucher/internal/biz/repo" "voucher/internal/biz/vo" @@ -102,6 +103,22 @@ func (p *OrderRepoImpl) GetByOrderNo(ctx context.Context, orderNo string) (*bo.O return p.ToBo(info), nil } +func (p *OrderRepoImpl) GetByMBV(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error) { + info := &model.Order{} + + tx := p.DB(ctx).Where(model.Order{MerchantNo: merchantNo, BatchNo: batchNo, VoucherNo: voucherNo}).Find(&info) + + if tx.Error != nil { + return nil, tx.Error + } + + if tx.RowsAffected == 0 { + return nil, gorm.ErrRecordNotFound + } + + return p.ToBo(info), nil +} + func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error { now := time.Now() @@ -122,7 +139,7 @@ func (p *OrderRepoImpl) Ing(ctx context.Context, id uint64) error { return nil } -func (p *OrderRepoImpl) Success(ctx context.Context, id uint64) error { +func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string) error { now := time.Now() res := p.db.DB(ctx). @@ -132,6 +149,7 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64) error { }). Updates(model.Order{ Status: vo.OrderStatusSuccess.GetValue(), + VoucherNo: voucherNo, UpdateTime: &now, }) @@ -142,9 +160,16 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64) error { return nil } -func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64) error { +func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64, remark string) error { now := time.Now() + if utf8.RuneCountInString(remark) > 100 { + runes := []rune(remark) + if len(runes) > 100 { + remark = string(runes[:100]) + } + } + res := p.db.DB(ctx). Where(model.Order{ ID: id, @@ -152,6 +177,7 @@ func (p *OrderRepoImpl) Fail(ctx context.Context, id uint64) error { }). Updates(model.Order{ Status: vo.OrderStatusFail.GetValue(), + Remark: remark, UpdateTime: &now, }) diff --git a/internal/data/repoimpl/order_notify.go b/internal/data/repoimpl/order_notify.go index 0ddb472..99799f4 100644 --- a/internal/data/repoimpl/order_notify.go +++ b/internal/data/repoimpl/order_notify.go @@ -58,17 +58,16 @@ func (p *OrderNotifyRepoImpl) Create(ctx context.Context, req *bo.OrderNotifyBo) now := time.Now() info := &model.OrderNotify{ - OrderNo: req.OrderNo, - OutRequestNo: req.OutRequestNo, - Status: vo.OrderNotifyStatusWait.GetValue(), - Request: req.Request, - Responses: "{}", - NotifyUrl: req.NotifyUrl, - Channel: req.Channel.GetValue(), - Event: req.Event.GetValue(), - Type: req.Type.GetValue(), - CreateTime: &now, - UpdateTime: &now, + OrderNo: req.OrderNo, + Status: vo.OrderNotifyStatusWait.GetValue(), + Request: req.Request, + Responses: "{}", + NotifyUrl: req.NotifyUrl, + Channel: req.Channel.GetValue(), + Event: req.Event.GetValue(), + Type: req.Type.GetValue(), + CreateTime: &now, + UpdateTime: &now, } if err := p.db.DB(ctx).Create(info).Error; err != nil { diff --git a/internal/data/repoimpl/order_wechat.go b/internal/data/repoimpl/order_wechat.go deleted file mode 100644 index e0a7415..0000000 --- a/internal/data/repoimpl/order_wechat.go +++ /dev/null @@ -1,191 +0,0 @@ -package repoimpl - -import ( - "context" - "gorm.io/gorm" - "time" - "unicode/utf8" - "voucher/internal/biz/bo" - "voucher/internal/biz/repo" - "voucher/internal/biz/vo" - "voucher/internal/data" - "voucher/internal/data/model" -) - -// OrderWechatRepoImpl . -type OrderWechatRepoImpl struct { - Base[model.OrderWechat, bo.OrderWechatBo] - db *data.Db -} - -// NewOrderWechatRepoImpl . -func NewOrderWechatRepoImpl(db *data.Db) repo.OrderWechatRepo { - return &OrderWechatRepoImpl{db: db} -} - -func (p *OrderWechatRepoImpl) DB(ctx context.Context) *gorm.DB { - return p.db.DB(ctx).Model(model.OrderWechat{}) -} - -func (p *OrderWechatRepoImpl) Create(ctx context.Context, req *bo.OrderWechatBo) (*bo.OrderWechatBo, error) { - now := time.Now() - - info := &model.OrderWechat{ - OrderNo: req.OrderNo, - OutRequestNo: req.OutRequestNo, - AppID: req.AppID, - StockCreatorMchid: req.StockCreatorMchid, - OpenID: req.OpenID, - StockID: req.StockID, - Status: vo.OrderWechatStatusWait.GetValue(), - CreateTime: &now, - UpdateTime: &now, - } - - if err := p.db.DB(ctx).Create(info).Error; err != nil { - return nil, err - } - - return p.ToBo(info), nil -} - -func (p *OrderWechatRepoImpl) GetByMSCId(ctx context.Context, mchId, stockId, couponId string) (*bo.OrderWechatBo, error) { - info := &model.OrderWechat{} - - tx := p.DB(ctx).Where(model.OrderWechat{ - StockCreatorMchid: mchId, - StockID: stockId, - CouponID: couponId, - }).Find(&info) - - if tx.Error != nil { - return nil, tx.Error - } - - if tx.RowsAffected == 0 { - return nil, gorm.ErrRecordNotFound - } - - return p.ToBo(info), nil -} - -func (p *OrderWechatRepoImpl) GetByOutRequestNo(ctx context.Context, outRequestNo string) (*bo.OrderWechatBo, error) { - info := &model.OrderWechat{} - - tx := p.DB(ctx).Where(model.OrderWechat{OutRequestNo: outRequestNo}).Find(&info) - - if tx.Error != nil { - return nil, tx.Error - } - - if tx.RowsAffected == 0 { - return nil, gorm.ErrRecordNotFound - } - - return p.ToBo(info), nil -} - -func (p *OrderWechatRepoImpl) GetLastByOrderNo(ctx context.Context, orderNo string) (*bo.OrderWechatBo, error) { - info := &model.OrderWechat{} - - tx := p.DB(ctx).Where(model.OrderWechat{OrderNo: orderNo}).Order("id desc").Find(&info) - - if tx.Error != nil { - return nil, tx.Error - } - - if tx.RowsAffected == 0 { - return nil, gorm.ErrRecordNotFound - } - - return p.ToBo(info), nil -} - -func (p *OrderWechatRepoImpl) Success(ctx context.Context, id uint64, couponId string) error { - now := time.Now() - - res := p.db.DB(ctx). - Where(model.OrderWechat{ - ID: id, - Status: vo.OrderStatusWait.GetValue(), - }). - Updates(model.OrderWechat{ - Status: vo.OrderWechatStatusSuccess.GetValue(), - CouponID: couponId, - UpdateTime: &now, - }) - - if res.Error != nil { - return res.Error - } - - return nil -} - -func (p *OrderWechatRepoImpl) Fail(ctx context.Context, id uint64, remark string) error { - now := time.Now() - - if utf8.RuneCountInString(remark) > 100 { - runes := []rune(remark) - if len(runes) > 100 { - remark = string(runes[:100]) - } - } - - res := p.db.DB(ctx). - Where(model.OrderWechat{ - ID: id, - Status: vo.OrderStatusWait.GetValue(), - }). - Updates(model.OrderWechat{ - Status: vo.OrderWechatStatusFail.GetValue(), - Remark: remark, - UpdateTime: &now, - }) - - if res.Error != nil { - return res.Error - } - - return nil -} - -func (p *OrderWechatRepoImpl) Used(ctx context.Context, id uint64) error { - now := time.Now() - - res := p.db.DB(ctx). - Where(model.OrderWechat{ - ID: id, - Status: vo.OrderWechatStatusSuccess.GetValue(), - }). - Updates(model.OrderWechat{ - Status: vo.OrderWechatStatusUse.GetValue(), - UpdateTime: &now, - }) - - if res.Error != nil { - return res.Error - } - - return nil -} - -func (p *OrderWechatRepoImpl) Expired(ctx context.Context, id uint64) error { - now := time.Now() - - res := p.db.DB(ctx). - Where(model.OrderWechat{ - ID: id, - Status: vo.OrderWechatStatusSuccess.GetValue(), - }). - Updates(model.OrderWechat{ - Status: vo.OrderWechatStatusExpired.GetValue(), - UpdateTime: &now, - }) - - if res.Error != nil { - return res.Error - } - - return nil -} diff --git a/internal/data/repoimpl/provider_set.go b/internal/data/repoimpl/provider_set.go index cd396f9..b02119e 100644 --- a/internal/data/repoimpl/provider_set.go +++ b/internal/data/repoimpl/provider_set.go @@ -7,7 +7,6 @@ import ( // ProviderRepoImplSet is providers. var ProviderRepoImplSet = wire.NewSet( NewOrderRepoImpl, - NewOrderWechatRepoImpl, NewProductRepoImpl, NewOrderNotifyRepoImpl, NewWechatNotifyRegisterTagRepoImpl, diff --git a/internal/data/wechatrepoimpl/cpn.go b/internal/data/wechatrepoimpl/cpn.go index 6e8fd34..a9be1c9 100644 --- a/internal/data/wechatrepoimpl/cpn.go +++ b/internal/data/wechatrepoimpl/cpn.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/go-kratos/kratos/v2/log" "github.com/wechatpay-apiv3/wechatpay-go/core" "github.com/wechatpay-apiv3/wechatpay-go/services/cashcoupons" "io" @@ -34,15 +35,15 @@ func NewCpnRepoImpl(bc *conf.Bootstrap) (wechatrepo.WechatCpnRepo, error) { return &CpnRepoImpl{bc: bc, Server: server}, nil } -func (c *CpnRepoImpl) Order(ctx context.Context, orderWechat *bo.OrderWechatBo) (couponId string, err error) { +func (c *CpnRepoImpl) Order(ctx context.Context, order *bo.OrderBo) (couponId string, err error) { req := cashcoupons.SendCouponRequest{ - OutRequestNo: core.String(orderWechat.OutRequestNo), + OutRequestNo: core.String(order.OrderNo), // 微信为发券方商户分配的公众账号ID,接口传入的所有appid应该为公众号的appid(在mp.weixin.qq.com申请的),不能为APP的appid(在open.weixin.qq.com申请的)。 - Appid: core.String(orderWechat.AppID), - Openid: core.String(orderWechat.OpenID), - StockId: core.String(orderWechat.StockID), - StockCreatorMchid: core.String(orderWechat.StockCreatorMchid), + Appid: core.String(order.AppID), + Openid: core.String(order.Account), + StockId: core.String(order.BatchNo), + StockCreatorMchid: core.String(order.MerchantNo), } client, err := data.GetClient(ctx, c.Server) @@ -61,17 +62,19 @@ func (c *CpnRepoImpl) Order(ctx context.Context, orderWechat *bo.OrderWechatBo) return } + log.Errorf("请求微信返回错误=%s", string(bodyBytes)) + if err = json.Unmarshal(bodyBytes, &ErrBody); err != nil { return } - err = fmt.Errorf("微信返回错误=%s", ErrBody.Message) + err = fmt.Errorf(ErrBody.Message) return } if result.Response.StatusCode != CodeSuccess { - err = fmt.Errorf("Order微信返回错误StatusCode[%d]Status[%s]", result.Response.StatusCode, result.Response.Status) + err = fmt.Errorf("请求错误") return } @@ -80,12 +83,12 @@ func (c *CpnRepoImpl) Order(ctx context.Context, orderWechat *bo.OrderWechatBo) return } -func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderWechatBo) (vo.OrderWechatStatus, error) { +func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.OrderStatus, error) { req := cashcoupons.QueryCouponRequest{ - CouponId: core.String(orderWechat.CouponID), + CouponId: core.String(orderWechat.VoucherNo), Appid: core.String(orderWechat.AppID), - Openid: core.String(orderWechat.OpenID), + Openid: core.String(orderWechat.Account), } client, err := data.GetClient(ctx, c.Server) @@ -97,21 +100,25 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderWechatBo) resp, result, err := svc.QueryCoupon(ctx, req) if err != nil { + bodyBytes, err := io.ReadAll(result.Response.Body) if err != nil { return 0, err } + log.Errorf("请求微信返回错误=%s", string(bodyBytes)) + if err = json.Unmarshal(bodyBytes, &ErrBody); err != nil { return 0, err } + err = fmt.Errorf(ErrBody.Message) + return 0, fmt.Errorf("微信返回错误=%s", ErrBody.Message) } if result.Response.StatusCode != CodeSuccess { - err = fmt.Errorf("Query微信返回错误StatusCode[%d]Status[%s]", result.Response.StatusCode, result.Response.Status) - return 0, err + return 0, fmt.Errorf("请求错误") } return CpnStatus(*resp.Status).GetStatus() diff --git a/internal/server/consume.go b/internal/server/consume.go index 414c39d..740c200 100644 --- a/internal/server/consume.go +++ b/internal/server/consume.go @@ -32,18 +32,6 @@ func NewConsumer( SecretKey: conf.RocketMQ.SecretKey, } - if c := voucherService.GetOrderConfig(); c != nil { - if err := manager.Subscribe(context.Background(), cf, c, voucherService.OrderConsumer); err != nil { - panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) - } - } - - if c := voucherService.GetNotifyConfig(); c != nil { - if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyConsumer); err != nil { - panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) - } - } - if c := voucherService.GetNotifyRetryConfig(); c != nil { if err := manager.Subscribe(context.Background(), cf, c, voucherService.NotifyRetryConsumer); err != nil { panic(fmt.Sprintf("订阅失败: topicName:%s, groupName:%s, err:%v", c.TopicName, c.GroupName, err)) diff --git a/internal/service/cmb.go b/internal/service/cmb.go index 755db84..9584f96 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -18,7 +18,7 @@ func (s *VoucherService) CmbOrder(ctx http.Context) error { bizReply *v1.CmbOrderReply ) - orderNo, err := s.cmbOrder(ctx) + voucherNo, err := s.cmbOrder(ctx) if err != nil { log.Errorf("cmbOrder error: %v", err) @@ -31,7 +31,7 @@ func (s *VoucherService) CmbOrder(ctx http.Context) error { bizReply = &v1.CmbOrderReply{ RespCode: vo.CmbResponseStatusSuccess.GetValue(), RespMsg: "成功", - CodeNo: orderNo, + CodeNo: voucherNo, } } @@ -81,16 +81,17 @@ func (s *VoucherService) cmbOrder(ctx http.Context) (string, error) { ProductNo: bizContent.ActivityId, Account: bizContent.CmbUid, AppID: bizContent.AppId, + Attach: bizContent.Attach, AccountType: vo.OrderAccountTypeOpenId, Type: vo.OrderTypeCmb, } - orderNo, err := s.VoucherBiz.CmbOrder(ctx, boReq) + voucherNo, err := s.VoucherBiz.CmbOrder(ctx, boReq) if err != nil { return "", err } - return orderNo, nil + return voucherNo, nil } func (s *VoucherService) CmbQuery(ctx http.Context) error { diff --git a/internal/service/consume.go b/internal/service/consume.go index 3ade7dc..a2cd14c 100644 --- a/internal/service/consume.go +++ b/internal/service/consume.go @@ -5,78 +5,9 @@ import ( "errors" "github.com/go-kratos/kratos/v2/log" "strconv" - "strings" "voucher/internal/pkg/mq" ) -func (j *VoucherService) GetOrderConfig() *mq.ConsumerConfig { - elm, ok := j.bc.RocketMQ.EventMap["order"] - if !ok { - return nil - } - - if !elm.IsOpenConsumer { - log.Warnf("order MQ is not open") - return nil - } - - return &mq.ConsumerConfig{ - TopicName: elm.Topic, - GroupName: elm.Group, - PerCoroutineCnt: int(elm.PerCoroutineCnt), - } -} - -func (j *VoucherService) OrderConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { - - orderNo := msg.GetShardingKey() - if orderNo == "" { - log.Error("order 消费异常,获取 orderNo 失败") - return errors.New("order 消费异常,获取 orderNo 失败") - } - - if err := j.VoucherBiz.OrderConsume(ctx, orderNo); err != nil { - log.Errorf("order 消费异常,orderNo:%s,error: %s", orderNo, err.Error()) - } - - return nil -} - -func (j *VoucherService) GetNotifyConfig() *mq.ConsumerConfig { - elm, ok := j.bc.RocketMQ.EventMap["notify"] - if !ok { - return nil - } - - if !elm.IsOpenConsumer { - log.Warnf("notify MQ is not open") - return nil - } - - return &mq.ConsumerConfig{ - TopicName: elm.Topic, - GroupName: elm.Group, - PerCoroutineCnt: int(elm.PerCoroutineCnt), - } -} - -func (j *VoucherService) NotifyConsumer(ctx context.Context, msg *mq.ConsumerMessage) error { - - shardingKey := msg.GetShardingKey() - if shardingKey == "" { - log.Error("notify 消费异常,获取 shardingKey 失败") - return errors.New("orderNotify 消费异常,获取 orderNo 失败") - } - - rep := strings.Split(shardingKey, "_") - - if err := j.VoucherBiz.NotifyConsume(ctx, rep[0], rep[1]); err != nil { - log.Errorf("notify 消费异常,shardingKey:%s,error: %s", shardingKey, err.Error()) - } - - return nil -} - func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig { elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"] if !ok { diff --git a/internal/service/wechat_notify_consume.go b/internal/service/wechat_notify_consume.go index 73af37d..912676c 100644 --- a/internal/service/wechat_notify_consume.go +++ b/internal/service/wechat_notify_consume.go @@ -3,16 +3,26 @@ package service import ( "context" "encoding/json" + "github.com/go-kratos/kratos/v2/log" "voucher/internal/biz/bo" ) -func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { +func (v *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { - var x *bo.WechatVoucherNotifyBo + var req *bo.WechatVoucherNotifyBo - if err := json.Unmarshal([]byte(msg), &x); err != nil { + if err := json.Unmarshal([]byte(msg), &req); err != nil { return err } - return j.VoucherBiz.WechatNotifyConsumer(ctx, tag, x) + if req.PlainText.Status.IsSended() { + log.Warnf("券状态可用,忽略不处理,couponId:%s,stockId:%s,status:%s", + req.PlainText.CouponID, + req.PlainText.StockID, + req.PlainText.Status.GetText(), + ) + return nil + } + + return v.VoucherBiz.WechatNotifyConsumer(ctx, tag, req) }