From 3120c34de5bef2326f6d0b5aad99893f9bd5f834 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 18:03:36 +0800 Subject: [PATCH 01/20] order retry --- api/v1/cmb_cpn.proto | 10 ++++ internal/biz/cmb.go | 9 ++-- internal/biz/order.go | 43 ++++++++++++++++- internal/biz/repo/order.go | 1 + internal/data/repoimpl/order.go | 24 ++++++++- internal/server/wechat_notify_consumer.go | 59 ++++++++++++++++++++++- internal/service/cmb.go | 5 ++ third_party/swagger_ui/openapi.yaml | 25 ++++++++++ 8 files changed, 166 insertions(+), 10 deletions(-) diff --git a/api/v1/cmb_cpn.proto b/api/v1/cmb_cpn.proto index b92b7a5..a3cd78e 100644 --- a/api/v1/cmb_cpn.proto +++ b/api/v1/cmb_cpn.proto @@ -29,6 +29,12 @@ service Cmb { }; } + rpc OrderRetry (OrderRetryRequest) returns (Empty) { + option (google.api.http) = { + post: "/voucher/cmb/v1/orderRetry", + body: "*" + }; + } rpc OrderMock (CmbOrderRequest) returns (CmbRequest) { option (google.api.http) = { @@ -64,6 +70,10 @@ service Cmb { } } +message OrderRetryRequest { + repeated string transactionIds = 1 [json_name = "transactionIds"]; +} + message CmbRequest { // 请求公共参数 // 合作方唯一ID,32位定长 diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index c394d85..53acbf3 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -31,15 +31,14 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or if order != nil { - if order.Status.IsFail() { + if order.Status.IsFail() || order.Status.IsIng() { if err4 := v.orderRetry(ctx, order); err4 != nil { return orderNo, err4 } } - orderNo = order.OrderNo - return orderNo, err + return order.OrderNo, err } product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) @@ -52,9 +51,7 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or return orderNo, err3 } - orderNo = order.OrderNo - - return orderNo, nil + return order.OrderNo, nil } func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) { diff --git a/internal/biz/order.go b/internal/biz/order.go index 8cccf2c..174c181 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -11,6 +11,43 @@ import ( "voucher/internal/pkg/lock" ) +func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error { + + if len(outBizNos) > 0 { + + for _, outBizNo := range outBizNos { + + order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, outBizNo) + if err != nil { + return fmt.Errorf(fmt.Sprintf("获取订单%s异常:%v", outBizNo, err)) + } + + if !order.Status.IsIng() { + return fmt.Errorf(fmt.Sprintf("订单%s状态异常:%s", order.OrderNo, order.Status)) + } + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + } + + return v.OrderRepo.FindIngInBatches(ctx, func(ctx context.Context, rows []*bo.OrderBo) error { + + for _, order := range rows { + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + }) + +} + func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) { order, err := v.create(ctx, req, product) @@ -32,7 +69,11 @@ func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, produc return nil, err } - return order, v.success(ctx, order, voucherNo) + if err = v.success(ctx, order, voucherNo); err != nil { + return nil, err + } + + return order, nil } func (v *VoucherBiz) orderRetry(ctx context.Context, order *bo.OrderBo) error { diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index b2c72ea..a1c583a 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -7,6 +7,7 @@ import ( ) type OrderRepo interface { + FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error) GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error) diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index a8efa66..89c7ef1 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -31,6 +31,26 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.Order{}) } +func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + var results = make([]*model.Order, 0) + + result := p.DB(ctx). + Where("status = ?", vo.OrderStatusIng.GetValue()). + FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { + // tx.RowsAffected 提供当前批处理中记录的计数(the count of records in the current batch) + // 'batch' 变量表示当前批号(the current batch number) + // 返回 error 将阻止更多的批处理 + return fun(ctx, p.ToBos(results)) + }) + + if result.Error != nil { + return result.Error + } + + return nil +} + func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { var results = make([]*model.Order, 0) @@ -199,8 +219,8 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string tx := p.DB(ctx). Where(model.Order{ - ID: id, - Status: vo.OrderStatusIng.GetValue(), + ID: id, + //Status: vo.OrderStatusIng.GetValue(), }). Updates(model.Order{ Status: vo.OrderStatusSuccess.GetValue(), diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index d80ad2c..e9de469 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -8,6 +8,7 @@ import ( "github.com/go-kratos/kratos/v2/transport" "github.com/gogap/errors" "strings" + "sync/atomic" "time" "voucher/internal/conf" "voucher/internal/service" @@ -18,6 +19,9 @@ var _ transport.Server = (*WechatNotifyConsumer)(nil) type WechatNotifyConsumer struct { conf *conf.Bootstrap voucherService *service.VoucherService + + activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出 + shutdownFlag atomic.Bool // 关闭标记 } func NewWechatNotifyConsumer( @@ -128,6 +132,24 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer // 业务逻辑处理 func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) { + // 收到消息 + if w.shutdownFlag.Load() { + fmt.Println("wechat consumer 正在退出中,延期处理") + // 卡住,不再继续消费,等待退出 + time.Sleep(24 * time.Hour) + return + } + + // 标记活跃状态 + w.activeCnt.Add(1) + defer func() { + w.activeCnt.Add(-1) + if v := recover(); v != nil { + log.Errorf("wechat consumer 处理消息panic, ,%+v", v) + return + } + }() + log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) ctx := context.Background() @@ -138,6 +160,41 @@ func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntr // Stop 停止消息消费 func (w *WechatNotifyConsumer) Stop(_ context.Context) error { - fmt.Println("关闭 wechat consumer 中...") + + if !w.conf.WechatNotifyMQ.IsOpenConsumer { + fmt.Println("wechat consumer 关闭完成!") + return nil + } + + fmt.Println("wechat consumer 关闭中...") + + w.shutdownFlag.Store(true) + + //shutdown之间,保证正在处理的消费先提交 + _ = w.blockWaitFinish() + + fmt.Println("wechat consumer 关闭完成") + + return nil +} + +// blockWaitFinish 阻塞等待业务完成 +func (c *WechatNotifyConsumer) blockWaitFinish() error { + // 每1s检查下业务是否都处理完成 + + for { + cnt := c.activeCnt.Load() + if cnt == 0 { + fmt.Println("wechat consumer 无业务处理,正常退") + break + } else { + fmt.Printf("wechat consumer 等待消费者退出,%d 个正在运行\n", cnt) + } + time.Sleep(1 * time.Second) + } + + //防止极端情况下commit未完成 + // nolint + time.Sleep(1 * time.Second) return nil } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index 2a3043f..af6e7dc 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -43,6 +43,11 @@ func NewCmbService( } } +func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) { + + return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds()) +} + func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*v1.CmbReply, error) { req := &bo.CmbResponseBo{ diff --git a/third_party/swagger_ui/openapi.yaml b/third_party/swagger_ui/openapi.yaml index eb8440b..26d9946 100644 --- a/third_party/swagger_ui/openapi.yaml +++ b/third_party/swagger_ui/openapi.yaml @@ -60,6 +60,24 @@ paths: application/json: schema: $ref: '#/components/schemas/api.v1.CmbRequest' + /voucher/cmb/v1/orderRetry: + post: + tags: + - Cmb + operationId: Cmb_OrderRetry + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/api.v1.OrderRetryRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/api.v1.Empty' /voucher/cmb/v1/product/query: post: tags: @@ -263,5 +281,12 @@ components: properties: encryptBody: type: string + api.v1.OrderRetryRequest: + type: object + properties: + transactionIds: + type: array + items: + type: string tags: - name: Cmb From b0a1fdaef7e32d7fe525507b80ffee74e697f2ca Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:09:10 +0800 Subject: [PATCH 02/20] RegisterTag --- internal/biz/order.go | 17 ++++++++++++++--- internal/server/http.go | 1 + internal/service/cmb.go | 17 +++++++++++++++++ 3 files changed, 32 insertions(+), 3 deletions(-) diff --git a/internal/biz/order.go b/internal/biz/order.go index 174c181..09154f0 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -56,9 +56,9 @@ func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, produc } // 注册通知标签 order.MerchantNo 批次创建商户, order.BatchNo 商品批次号 - if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil { - return nil, err - } + //if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil { + // return nil, err + //} // 真实发放 voucherNo, err := v.WechatCpnRepo.Order(ctx, order) @@ -111,6 +111,17 @@ func (v *VoucherBiz) create(ctx context.Context, req *bo.OrderCreateReqBo, produ return v.OrderRepo.Create(ctx, o) } +func (this *VoucherBiz) RegisterTag(ctx context.Context, productNo string) error { + + stock, err := this.ProductRepo.GetByProductNo(ctx, productNo) + if err != nil { + return err + } + + // order.MerchantNo, order.BatchNo + return this.registerNotifyTag(ctx, stock.MchId, stock.BatchNo) +} + func (v *VoucherBiz) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) diff --git a/internal/server/http.go b/internal/server/http.go index 3e0023e..f573347 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -38,6 +38,7 @@ func NewHTTPServer( srv.Route("/voucher/").GET("notifyRetry/{id}", cmb.NotifyRetry) srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder) + srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag) v1.RegisterCmbHTTPServer(srv, cmb) diff --git a/internal/service/cmb.go b/internal/service/cmb.go index af6e7dc..e61f2e2 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -95,3 +95,20 @@ func (this *CmbService) QueryOrder(ctx http.Context) error { "data": str, }) } + +func (this *CmbService) RegisterTag(ctx http.Context) error { + + productNo := ctx.Vars().Get("product_no") + if productNo == "" { + return fmt.Errorf("product_no is empty") + } + + err := this.VoucherBiz.RegisterTag(ctx, productNo) + if err != nil { + return err + } + + return ctx.JSON(http2.StatusOK, map[string]interface{}{ + "data": productNo, + }) +} From 3b6c9923af512a11438b294bc087f9b02bc081b6 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:14:46 +0800 Subject: [PATCH 03/20] =?UTF-8?q?=E5=86=8C=E9=80=9A=E7=9F=A5=E6=A0=87?= =?UTF-8?q?=E7=AD=BE=20stock.MchId=20=E6=89=B9=E6=AC=A1=E5=88=9B=E5=BB=BA?= =?UTF-8?q?=E5=95=86=E6=88=B7,=20stock.BatchNo=20=E5=95=86=E5=93=81?= =?UTF-8?q?=E6=89=B9=E6=AC=A1=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/order.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/internal/biz/order.go b/internal/biz/order.go index 09154f0..b1bd4ed 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -55,11 +55,6 @@ func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, produc return nil, err } - // 注册通知标签 order.MerchantNo 批次创建商户, order.BatchNo 商品批次号 - //if err = v.registerNotifyTag(ctx, order.MerchantNo, order.BatchNo); err != nil { - // return nil, err - //} - // 真实发放 voucherNo, err := v.WechatCpnRepo.Order(ctx, order) if err != nil { @@ -111,6 +106,7 @@ func (v *VoucherBiz) create(ctx context.Context, req *bo.OrderCreateReqBo, produ return v.OrderRepo.Create(ctx, o) } +// RegisterTag 注册通知标签 stock.MchId 批次创建商户, stock.BatchNo 商品批次号 func (this *VoucherBiz) RegisterTag(ctx context.Context, productNo string) error { stock, err := this.ProductRepo.GetByProductNo(ctx, productNo) @@ -118,7 +114,6 @@ func (this *VoucherBiz) RegisterTag(ctx context.Context, productNo string) error return err } - // order.MerchantNo, order.BatchNo return this.registerNotifyTag(ctx, stock.MchId, stock.BatchNo) } From b523b035ad812dcda81fc509fcb211894f7336c7 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:22:17 +0800 Subject: [PATCH 04/20] =?UTF-8?q?=E9=87=8D=E8=AF=95=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/alarm.go | 69 +++++++++++++ internal/biz/order.go | 191 +---------------------------------- internal/biz/query.go | 44 ++++++++ internal/biz/register_tag.go | 102 +++++++++++++++++++ 4 files changed, 217 insertions(+), 189 deletions(-) create mode 100644 internal/biz/alarm.go create mode 100644 internal/biz/query.go create mode 100644 internal/biz/register_tag.go diff --git a/internal/biz/alarm.go b/internal/biz/alarm.go new file mode 100644 index 0000000..09945e2 --- /dev/null +++ b/internal/biz/alarm.go @@ -0,0 +1,69 @@ +package biz + +import ( + "context" + "fmt" + "github.com/redis/go-redis/v9" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" +) + +func (v *VoucherBiz) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error { + + // 1小时 内 指定的批次号 发放 发生错误 预警 + c := vo.OrderConsumeFailAlarmKey.BuildCache([]string{order.ProductNo}) + + _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err == nil { + // 缓存存在,直接返回 + return nil + } + + if err != redis.Nil { + return fmt.Errorf(fmt.Sprintf("alarm 获取redis缓存%s异常:%v", c.Key, err)) + } + + cl := vo.OrderConsumeFailAlarmLockKey.BuildCache([]string{order.ProductNo}) + + return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { + // 二次获取,判定处理,以免获取锁后又执行了一次 + + cacheValue, err3 := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err3 != nil && err3 != redis.Nil { + return fmt.Errorf(fmt.Sprintf("alarm 二次获取redis缓存%s异常:%v", c.Key, err)) + } + + if len(cacheValue) > 0 { + return nil // 有直接返回 + } + + // 通知 + if err = v.DingMixRepo.SendMarkdownMessage(ctx, "异常通知", v.alarmText(ctx, order, errMsg)); err != nil { + return err + } + + if err = v.rdb.Rdb.Set(ctx, c.Key, order.ProductNo, c.TTL).Err(); err != nil { + return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) + } + + return nil + }) +} + +func (v *VoucherBiz) alarmText(_ context.Context, order *bo.OrderBo, errMsg string) string { + + remarks := fmt.Sprintf("订单号:%s,商品编号:%s,原因:%s", order.OrderNo, order.ProductNo, errMsg) + + msg := "# " + + "

立减金发放平台报警通知

" + + "
\n" + + "" + + "不好了,订单发放发生异常了" + + "[%s]请尽快处理@相关人员。" + + "" + + return fmt.Sprintf(msg, remarks) +} diff --git a/internal/biz/order.go b/internal/biz/order.go index b1bd4ed..29db30e 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -3,12 +3,9 @@ package biz import ( "context" "fmt" - "github.com/go-kratos/kratos/v2/log" - "github.com/redis/go-redis/v9" err2 "voucher/api/err" "voucher/internal/biz/bo" "voucher/internal/biz/vo" - "voucher/internal/pkg/lock" ) func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error { @@ -106,102 +103,13 @@ func (v *VoucherBiz) create(ctx context.Context, req *bo.OrderCreateReqBo, produ return v.OrderRepo.Create(ctx, o) } -// RegisterTag 注册通知标签 stock.MchId 批次创建商户, stock.BatchNo 商品批次号 -func (this *VoucherBiz) RegisterTag(ctx context.Context, productNo string) error { - - stock, err := this.ProductRepo.GetByProductNo(ctx, productNo) - if err != nil { - return err - } - - return this.registerNotifyTag(ctx, stock.MchId, stock.BatchNo) -} - -func (v *VoucherBiz) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { - - c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) - - _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() - - if err == nil { - // 缓存存在,直接返回 - return nil - } - - if err != redis.Nil { - return fmt.Errorf(fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err)) - } - - cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) - - return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { - // 二次获取,判定处理,以免获取锁后又执行了一次 - - cacheValue, err3 := v.rdb.Rdb.Get(ctx, c.Key).Result() - - if err3 != nil && err3 != redis.Nil { - return fmt.Errorf(fmt.Sprintf("二次获取redis缓存%s异常:%v", c.Key, err)) - } - - if cacheValue != "" { - return nil // 有直接返回 - } - - wechatNotifyTag, err3 := v.WechatNotifyRegisterTagRepo.GetByStockIdAndMchId(ctx, stockCreatorMchID, stockID) - if err3 != nil && !err2.IsDbNotFound(err3) { - return err3 - } - - if wechatNotifyTag != nil { - if wechatNotifyTag.Tag != v.bc.WechatNotifyMQ.Tag { - return fmt.Errorf("tag不一致,请检查tag配置:%s", wechatNotifyTag.Tag) - } - - if wechatNotifyTag.Status.IsSuccess() { - return v.setCache(ctx, c, wechatNotifyTag) - } - } else { - wechatNotifyTag, err3 = v.createWechatNotifyRegisterTag(ctx, stockCreatorMchID, stockID) - if err3 != nil { - return err3 - } - } - - if err = v.WechatCpnRepo.RegisterNotifyTag(ctx, stockID); err != nil { - - return v.WechatNotifyRegisterTagRepo.Fail(ctx, wechatNotifyTag.ID, err.Error()) - } - - if err = v.WechatNotifyRegisterTagRepo.Success(ctx, wechatNotifyTag.ID); err != nil { - return err - } - - return v.setCache(ctx, c, wechatNotifyTag) - }) -} - -func (v *VoucherBiz) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { - return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ - StockID: stockID, - StockCreatorMchID: stockCreatorMchID, - Tag: v.bc.WechatNotifyMQ.Tag, - }) -} - -func (v *VoucherBiz) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { - - if err := v.rdb.Rdb.Set(ctx, c.Key, wechatNotifyTag.Tag, c.TTL).Err(); err != nil { - return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) - } - - return nil -} - func (v *VoucherBiz) ing(ctx context.Context, id uint64) error { + return v.OrderRepo.Ing(ctx, id) } func (v *VoucherBiz) success(ctx context.Context, order *bo.OrderBo, voucherNo string) error { + return v.OrderRepo.Success(ctx, order.ID, voucherNo) } @@ -218,86 +126,6 @@ func (v *VoucherBiz) fail(ctx context.Context, order *bo.OrderBo, errReq error) return v.alarm(ctx, order, errReq.Error()) } -func (v *VoucherBiz) alarm(ctx context.Context, order *bo.OrderBo, errMsg string) error { - - // 1小时 内 指定的批次号 发放 发生错误 预警 - c := vo.OrderConsumeFailAlarmKey.BuildCache([]string{order.ProductNo}) - - _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() - - if err == nil { - // 缓存存在,直接返回 - return nil - } - - if err != redis.Nil { - return fmt.Errorf(fmt.Sprintf("alarm 获取redis缓存%s异常:%v", c.Key, err)) - } - - cl := vo.OrderConsumeFailAlarmLockKey.BuildCache([]string{order.ProductNo}) - - return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { - // 二次获取,判定处理,以免获取锁后又执行了一次 - - cacheValue, err3 := v.rdb.Rdb.Get(ctx, c.Key).Result() - - if err3 != nil && err3 != redis.Nil { - return fmt.Errorf(fmt.Sprintf("alarm 二次获取redis缓存%s异常:%v", c.Key, err)) - } - - if len(cacheValue) > 0 { - return nil // 有直接返回 - } - - // 通知 - if err = v.DingMixRepo.SendMarkdownMessage(ctx, "异常通知", v.alarmText(ctx, order, errMsg)); err != nil { - return err - } - - if err = v.rdb.Rdb.Set(ctx, c.Key, order.ProductNo, c.TTL).Err(); err != nil { - return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) - } - - return nil - }) -} - -func (v *VoucherBiz) alarmText(_ context.Context, order *bo.OrderBo, errMsg string) string { - - remarks := fmt.Sprintf("订单号:%s,商品编号:%s,原因:%s", order.OrderNo, order.ProductNo, errMsg) - - msg := "# " + - "

立减金发放平台报警通知

" + - "
\n" + - "" + - "不好了,订单发放发生异常了" + - "[%s]请尽快处理@相关人员。" + - "" - - return fmt.Sprintf(msg, remarks) -} - -func (v *VoucherBiz) Query(ctx context.Context, order *bo.OrderBo) error { - - status, err := v.WechatCpnRepo.Query(ctx, order) - if err != nil { - return err - } - - if order.Status == status { - log.Warnf("券状态未改变:%s,忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo) - return nil - } - - if err = v.UpdateOrderStatus(ctx, order.ID, status); err != nil { - return err - } - - order.Status = status - - return nil -} - func (v *VoucherBiz) UpdateOrderStatus(ctx context.Context, orderId uint64, status vo.OrderStatus) error { if status.IsSuccess() { @@ -315,18 +143,3 @@ func (v *VoucherBiz) UpdateOrderStatus(ctx context.Context, orderId uint64, stat return fmt.Errorf("notice 未知券状态,orderId:%d,statuText:%s", orderId, status.GetText()) } - -func (v *VoucherBiz) QueryOrder(ctx context.Context, orderNo string) (string, error) { - - order, err3 := v.OrderRepo.GetByOrderNo(ctx, orderNo) - if err3 != nil { - return "", err3 - } - - status, err := v.WechatCpnRepo.Query(ctx, order) - if err != nil { - return "", err - } - - return fmt.Sprintf("orderNo:%s,订单状态:%s,微信查询返回状态:%s", orderNo, order.Status.GetText(), status.GetText()), nil -} diff --git a/internal/biz/query.go b/internal/biz/query.go new file mode 100644 index 0000000..5bdc0af --- /dev/null +++ b/internal/biz/query.go @@ -0,0 +1,44 @@ +package biz + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/biz/bo" +) + +func (v *VoucherBiz) Query(ctx context.Context, order *bo.OrderBo) error { + + status, err := v.WechatCpnRepo.Query(ctx, order) + if err != nil { + return err + } + + if order.Status == status { + log.Warnf("券状态未改变:%s,忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo) + return nil + } + + if err = v.UpdateOrderStatus(ctx, order.ID, status); err != nil { + return err + } + + order.Status = status + + return nil +} + +func (v *VoucherBiz) QueryOrder(ctx context.Context, orderNo string) (string, error) { + + order, err3 := v.OrderRepo.GetByOrderNo(ctx, orderNo) + if err3 != nil { + return "", err3 + } + + status, err := v.WechatCpnRepo.Query(ctx, order) + if err != nil { + return "", err + } + + return fmt.Sprintf("orderNo:%s,订单状态:%s,微信查询返回状态:%s", orderNo, order.Status.GetText(), status.GetText()), nil +} diff --git a/internal/biz/register_tag.go b/internal/biz/register_tag.go new file mode 100644 index 0000000..cb8314d --- /dev/null +++ b/internal/biz/register_tag.go @@ -0,0 +1,102 @@ +package biz + +import ( + "context" + "fmt" + "github.com/redis/go-redis/v9" + err2 "voucher/api/err" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" +) + +// RegisterTag 注册通知标签 stock.MchId 批次创建商户, stock.BatchNo 商品批次号 +func (this *VoucherBiz) RegisterTag(ctx context.Context, productNo string) error { + + stock, err := this.ProductRepo.GetByProductNo(ctx, productNo) + if err != nil { + return err + } + + return this.registerNotifyTag(ctx, stock.MchId, stock.BatchNo) +} + +func (v *VoucherBiz) registerNotifyTag(ctx context.Context, stockCreatorMchID, stockID string) error { + + c := vo.WechatNotifyRegisterTagCacheKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) + + _, err := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err == nil { + // 缓存存在,直接返回 + return nil + } + + if err != redis.Nil { + return fmt.Errorf(fmt.Sprintf("获取redis缓存%s异常:%v", c.Key, err)) + } + + cl := vo.WechatNotifyRegisterTagCacheLockKey.BuildCache([]string{v.bc.WechatNotifyMQ.Tag, stockCreatorMchID, stockID}) + + return lock.NewMutex(v.rdb.Rdb, cl.TTL).Lock(ctx, cl.Key, func(ctx context.Context) error { + // 二次获取,判定处理,以免获取锁后又执行了一次 + + cacheValue, err3 := v.rdb.Rdb.Get(ctx, c.Key).Result() + + if err3 != nil && err3 != redis.Nil { + return fmt.Errorf(fmt.Sprintf("二次获取redis缓存%s异常:%v", c.Key, err)) + } + + if cacheValue != "" { + return nil // 有直接返回 + } + + wechatNotifyTag, err3 := v.WechatNotifyRegisterTagRepo.GetByStockIdAndMchId(ctx, stockCreatorMchID, stockID) + if err3 != nil && !err2.IsDbNotFound(err3) { + return err3 + } + + if wechatNotifyTag != nil { + if wechatNotifyTag.Tag != v.bc.WechatNotifyMQ.Tag { + return fmt.Errorf("tag不一致,请检查tag配置:%s", wechatNotifyTag.Tag) + } + + if wechatNotifyTag.Status.IsSuccess() { + return v.setCache(ctx, c, wechatNotifyTag) + } + } else { + wechatNotifyTag, err3 = v.createWechatNotifyRegisterTag(ctx, stockCreatorMchID, stockID) + if err3 != nil { + return err3 + } + } + + if err = v.WechatCpnRepo.RegisterNotifyTag(ctx, stockID); err != nil { + + return v.WechatNotifyRegisterTagRepo.Fail(ctx, wechatNotifyTag.ID, err.Error()) + } + + if err = v.WechatNotifyRegisterTagRepo.Success(ctx, wechatNotifyTag.ID); err != nil { + return err + } + + return v.setCache(ctx, c, wechatNotifyTag) + }) +} + +func (v *VoucherBiz) createWechatNotifyRegisterTag(ctx context.Context, stockCreatorMchID, stockID string) (*bo.WechatNotifyRegisterTagBo, error) { + return v.WechatNotifyRegisterTagRepo.Create(ctx, &bo.WechatNotifyRegisterTagBo{ + StockID: stockID, + StockCreatorMchID: stockCreatorMchID, + Tag: v.bc.WechatNotifyMQ.Tag, + }) +} + +func (v *VoucherBiz) setCache(ctx context.Context, c *vo.Cache, wechatNotifyTag *bo.WechatNotifyRegisterTagBo) error { + + if err := v.rdb.Rdb.Set(ctx, c.Key, wechatNotifyTag.Tag, c.TTL).Err(); err != nil { + return fmt.Errorf(fmt.Sprintf("设置redis缓存%s异常:%v", c.Key, err)) + } + + return nil +} From 91f7db4caa9230a5d0ddc2ffcca4f8abc11fe716 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:28:32 +0800 Subject: [PATCH 05/20] =?UTF-8?q?=E9=87=8D=E8=AF=95=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/service/cmb.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/service/cmb.go b/internal/service/cmb.go index e61f2e2..6ceabdf 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -43,11 +43,6 @@ func NewCmbService( } } -func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) { - - return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds()) -} - func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*v1.CmbReply, error) { req := &bo.CmbResponseBo{ @@ -96,6 +91,11 @@ func (this *CmbService) QueryOrder(ctx http.Context) error { }) } +func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) { + + return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds()) +} + func (this *CmbService) RegisterTag(ctx http.Context) error { productNo := ctx.Vars().Get("product_no") From 832827bee2a70c4b1844962dffa68f5072821e23 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:38:51 +0800 Subject: [PATCH 06/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/cmb.go | 158 ---------------------------------- internal/biz/order.go | 56 ++++++------ internal/biz/product.go | 80 +++++++++++++++++ internal/biz/query.go | 38 ++++++++ internal/biz/retry.go | 45 ++++++++++ internal/service/cmb_order.go | 70 +++++++-------- 6 files changed, 229 insertions(+), 218 deletions(-) create mode 100644 internal/biz/product.go create mode 100644 internal/biz/retry.go diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go index 53acbf3..26ded5e 100644 --- a/internal/biz/cmb.go +++ b/internal/biz/cmb.go @@ -1,159 +1 @@ package biz - -import ( - "context" - "fmt" - "time" - err2 "voucher/api/err" - v1 "voucher/api/v1" - "voucher/internal/biz/bo" - "voucher/internal/biz/vo" - "voucher/internal/pkg/lock" -) - -func (v *VoucherBiz) GetByOutBizNo(ctx context.Context, req *bo.OrderCreateReqBo) (*bo.OrderBo, error) { - - order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, req.OutBizNo) - - if err != nil && !err2.IsDbNotFound(err) { - return nil, err - } - - return order, nil -} - -func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { - - order, err3 := v.GetByOutBizNo(ctx, req) - if err3 != nil { - return orderNo, err3 - } - - if order != nil { - - if order.Status.IsFail() || order.Status.IsIng() { - - if err4 := v.orderRetry(ctx, order); err4 != nil { - return orderNo, err4 - } - } - - return order.OrderNo, err - } - - product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) - if err3 != nil { - return orderNo, err3 - } - - order, err3 = v.order(ctx, req, product) - if err3 != nil { - return orderNo, err3 - } - - return order.OrderNo, nil -} - -func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) { - - c := vo.CmbQueryLockKey.BuildCache([]string{orderNo}) - - err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - - order, err3 := v.OrderRepo.GetByOrderNo(ctx, orderNo) - if err3 != nil { - return err3 - } - - if err = v.Query(ctx, order); err != nil { - return err - } - - status, err3 := order.Status.GetCmbStatusText() - if err3 != nil { - return err3 - } - - resp = &v1.CmbQueryReply{ - Ticket: order.OrderNo, - Status: status.GetValue(), - TransDate: time.Now().Format("20060102150405"), - OrgNo: v.bc.Cmb.OrgNo, - Ext: "", - } - - return nil - }) - - return -} - -func (v *VoucherBiz) CmbProductQuery(ctx context.Context, productNo string) (reps *v1.CmbQueryProductReply, err error) { - - c := vo.CmbProductQueryLockKey.BuildCache([]string{productNo}) - - err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { - - product, err3 := v.ProductRepo.GetByProductNo(ctx, productNo) - if err3 != nil { - return err3 - } - - if !product.Channel.IsWeChat() { - return fmt.Errorf("只支持微信") - } - - wechatResp, err4 := v.WechatCpnRepo.QueryProduct(ctx, product.MchId, product.BatchNo) - if err4 != nil { - return err4 - } - - reps = &v1.CmbQueryProductReply{ - RespCode: vo.CmbResponseStatusSuccess.GetValue(), - RespMsg: "成功", - ActivityName: product.Name, - ActivityId: product.ProductNo, - Amount: "", - MinAmount: "", - AvailableType: "", - AvailableDays: "", // 动态有效期天数 - StartTime: "", - EndTime: "", - AvailableStock: "", - Detail: *wechatResp.Description, - } - - inputFormat := time.RFC3339 - - if wechatResp.AvailableBeginTime != nil { - - availableBeginTime, _ := time.Parse(inputFormat, *wechatResp.AvailableBeginTime) - reps.StartTime = availableBeginTime.Format("2006-01-02 15:04:05.000") - reps.SaleStartTime = reps.StartTime - } - - if wechatResp.AvailableEndTime != nil { - availableEndTime, _ := time.Parse(inputFormat, *wechatResp.AvailableEndTime) - reps.EndTime = availableEndTime.Format("2006-01-02 15:04:05.000") - reps.SaleEndTime = reps.EndTime - } - - reps.Amount = fmt.Sprintf("%d", *wechatResp.StockUseRule.FixedNormalCoupon.CouponAmount) - reps.MinAmount = fmt.Sprintf("%d", *wechatResp.StockUseRule.FixedNormalCoupon.TransactionMinimum) - - availableStock := *wechatResp.StockUseRule.MaxCoupons - *wechatResp.DistributedCoupons - reps.AvailableStock = fmt.Sprintf("%d", availableStock) - - availableType, err3 := product.AvailableType.GetCmbAvailableType() - if err3 != nil { - return err3 - } - - reps.AvailableType = availableType.GetValue() - reps.AvailableDays = fmt.Sprintf("%d", product.AvailableDays) - - return nil - }) - - return -} diff --git a/internal/biz/order.go b/internal/biz/order.go index 29db30e..5cd6a61 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -8,41 +8,36 @@ import ( "voucher/internal/biz/vo" ) -func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error { +func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (orderNo string, err error) { - if len(outBizNos) > 0 { - - for _, outBizNo := range outBizNos { - - order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, outBizNo) - if err != nil { - return fmt.Errorf(fmt.Sprintf("获取订单%s异常:%v", outBizNo, err)) - } - - if !order.Status.IsIng() { - return fmt.Errorf(fmt.Sprintf("订单%s状态异常:%s", order.OrderNo, order.Status)) - } - - if err4 := v.orderRetry(ctx, order); err4 != nil { - return err4 - } - } - - return nil + order, err3 := v.GetByOutBizNo(ctx, req) + if err3 != nil { + return orderNo, err3 } - return v.OrderRepo.FindIngInBatches(ctx, func(ctx context.Context, rows []*bo.OrderBo) error { + if order != nil { - for _, order := range rows { + if order.Status.IsFail() || order.Status.IsIng() { if err4 := v.orderRetry(ctx, order); err4 != nil { - return err4 + return orderNo, err4 } } - return nil - }) + return order.OrderNo, err + } + product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) + if err3 != nil { + return orderNo, err3 + } + + order, err3 = v.order(ctx, req, product) + if err3 != nil { + return orderNo, err3 + } + + return order.OrderNo, nil } func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, product *bo.ProductBo) (*bo.OrderBo, error) { @@ -126,6 +121,17 @@ func (v *VoucherBiz) fail(ctx context.Context, order *bo.OrderBo, errReq error) return v.alarm(ctx, order, errReq.Error()) } +func (v *VoucherBiz) GetByOutBizNo(ctx context.Context, req *bo.OrderCreateReqBo) (*bo.OrderBo, error) { + + order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, req.OutBizNo) + + if err != nil && !err2.IsDbNotFound(err) { + return nil, err + } + + return order, nil +} + func (v *VoucherBiz) UpdateOrderStatus(ctx context.Context, orderId uint64, status vo.OrderStatus) error { if status.IsSuccess() { diff --git a/internal/biz/product.go b/internal/biz/product.go new file mode 100644 index 0000000..9d62b66 --- /dev/null +++ b/internal/biz/product.go @@ -0,0 +1,80 @@ +package biz + +import ( + "context" + "fmt" + "time" + v1 "voucher/api/v1" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" +) + +func (v *VoucherBiz) CmbProductQuery(ctx context.Context, productNo string) (reps *v1.CmbQueryProductReply, err error) { + + c := vo.CmbProductQueryLockKey.BuildCache([]string{productNo}) + + err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { + + product, err3 := v.ProductRepo.GetByProductNo(ctx, productNo) + if err3 != nil { + return err3 + } + + if !product.Channel.IsWeChat() { + return fmt.Errorf("只支持微信") + } + + wechatResp, err4 := v.WechatCpnRepo.QueryProduct(ctx, product.MchId, product.BatchNo) + if err4 != nil { + return err4 + } + + reps = &v1.CmbQueryProductReply{ + RespCode: vo.CmbResponseStatusSuccess.GetValue(), + RespMsg: "成功", + ActivityName: product.Name, + ActivityId: product.ProductNo, + Amount: "", + MinAmount: "", + AvailableType: "", + AvailableDays: "", // 动态有效期天数 + StartTime: "", + EndTime: "", + AvailableStock: "", + Detail: *wechatResp.Description, + } + + inputFormat := time.RFC3339 + + if wechatResp.AvailableBeginTime != nil { + + availableBeginTime, _ := time.Parse(inputFormat, *wechatResp.AvailableBeginTime) + reps.StartTime = availableBeginTime.Format("2006-01-02 15:04:05.000") + reps.SaleStartTime = reps.StartTime + } + + if wechatResp.AvailableEndTime != nil { + availableEndTime, _ := time.Parse(inputFormat, *wechatResp.AvailableEndTime) + reps.EndTime = availableEndTime.Format("2006-01-02 15:04:05.000") + reps.SaleEndTime = reps.EndTime + } + + reps.Amount = fmt.Sprintf("%d", *wechatResp.StockUseRule.FixedNormalCoupon.CouponAmount) + reps.MinAmount = fmt.Sprintf("%d", *wechatResp.StockUseRule.FixedNormalCoupon.TransactionMinimum) + + availableStock := *wechatResp.StockUseRule.MaxCoupons - *wechatResp.DistributedCoupons + reps.AvailableStock = fmt.Sprintf("%d", availableStock) + + availableType, err3 := product.AvailableType.GetCmbAvailableType() + if err3 != nil { + return err3 + } + + reps.AvailableType = availableType.GetValue() + reps.AvailableDays = fmt.Sprintf("%d", product.AvailableDays) + + return nil + }) + + return +} diff --git a/internal/biz/query.go b/internal/biz/query.go index 5bdc0af..5d8515c 100644 --- a/internal/biz/query.go +++ b/internal/biz/query.go @@ -4,9 +4,47 @@ import ( "context" "fmt" "github.com/go-kratos/kratos/v2/log" + "time" + v1 "voucher/api/v1" "voucher/internal/biz/bo" + "voucher/internal/biz/vo" + "voucher/internal/pkg/lock" ) +func (v *VoucherBiz) CmbQuery(ctx context.Context, orderNo string) (resp *v1.CmbQueryReply, err error) { + + c := vo.CmbQueryLockKey.BuildCache([]string{orderNo}) + + err = lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error { + + order, err3 := v.OrderRepo.GetByOrderNo(ctx, orderNo) + if err3 != nil { + return err3 + } + + if err = v.Query(ctx, order); err != nil { + return err + } + + status, err3 := order.Status.GetCmbStatusText() + if err3 != nil { + return err3 + } + + resp = &v1.CmbQueryReply{ + Ticket: order.OrderNo, + Status: status.GetValue(), + TransDate: time.Now().Format("20060102150405"), + OrgNo: v.bc.Cmb.OrgNo, + Ext: "", + } + + return nil + }) + + return +} + func (v *VoucherBiz) Query(ctx context.Context, order *bo.OrderBo) error { status, err := v.WechatCpnRepo.Query(ctx, order) diff --git a/internal/biz/retry.go b/internal/biz/retry.go new file mode 100644 index 0000000..2816fdf --- /dev/null +++ b/internal/biz/retry.go @@ -0,0 +1,45 @@ +package biz + +import ( + "context" + "fmt" + "voucher/internal/biz/bo" + "voucher/internal/biz/vo" +) + +func (v *VoucherBiz) OrderRetry(ctx context.Context, outBizNos []string) error { + + if len(outBizNos) > 0 { + + for _, outBizNo := range outBizNos { + + order, err := v.OrderRepo.GetByOutBizNo(ctx, vo.OrderTypeCmb, outBizNo) + if err != nil { + return fmt.Errorf(fmt.Sprintf("获取订单%s异常:%v", outBizNo, err)) + } + + if !order.Status.IsIng() { + return fmt.Errorf(fmt.Sprintf("订单%s状态异常:%s", order.OrderNo, order.Status)) + } + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + } + + return v.OrderRepo.FindIngInBatches(ctx, func(ctx context.Context, rows []*bo.OrderBo) error { + + for _, order := range rows { + + if err4 := v.orderRetry(ctx, order); err4 != nil { + return err4 + } + } + + return nil + }) + +} diff --git a/internal/service/cmb_order.go b/internal/service/cmb_order.go index 3351c1f..fa9e6fb 100644 --- a/internal/service/cmb_order.go +++ b/internal/service/cmb_order.go @@ -11,41 +11,6 @@ import ( "voucher/internal/biz/vo" ) -func (c *CmbService) OrderSuccess(ctx context.Context, orderNo string) (*v1.CmbReply, error) { - - bizReply := &v1.CmbOrderReply{ - RespCode: vo.CmbResponseStatusSuccess.GetValue(), - RespMsg: "成功", - CodeNo: orderNo, - } - - replyBizContent, _ := json.Marshal(bizReply) - - return c.GetResponse(ctx, replyBizContent) -} - -func (c *CmbService) OrderFail(ctx context.Context, err error) (*v1.CmbReply, error) { - - se := errors.FromError(err) - - if len(se.Reason) == 0 { - se.Reason = err2.CmbErr_CMB_UNKNOWN.String() - } - - log.Errorf("order fail: %v", se) - - bizReply := &v1.CmbOrderReply{ - RespCode: vo.CmbResponseStatusFail.GetValue(), - RespMsg: se.Message, - CodeNo: "", - ThirdErrCode: se.Reason, - } - - replyBizContent, _ := json.Marshal(bizReply) - - return c.GetResponse(ctx, replyBizContent) -} - func (c *CmbService) Order(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { orderNo, err := c.order(ctx, request) @@ -84,3 +49,38 @@ func (c *CmbService) order(ctx context.Context, request *v1.CmbRequest) (string, return orderNo, nil } + +func (c *CmbService) OrderSuccess(ctx context.Context, orderNo string) (*v1.CmbReply, error) { + + bizReply := &v1.CmbOrderReply{ + RespCode: vo.CmbResponseStatusSuccess.GetValue(), + RespMsg: "成功", + CodeNo: orderNo, + } + + replyBizContent, _ := json.Marshal(bizReply) + + return c.GetResponse(ctx, replyBizContent) +} + +func (c *CmbService) OrderFail(ctx context.Context, err error) (*v1.CmbReply, error) { + + se := errors.FromError(err) + + if len(se.Reason) == 0 { + se.Reason = err2.CmbErr_CMB_UNKNOWN.String() + } + + log.Errorf("order fail: %v", se) + + bizReply := &v1.CmbOrderReply{ + RespCode: vo.CmbResponseStatusFail.GetValue(), + RespMsg: se.Message, + CodeNo: "", + ThirdErrCode: se.Reason, + } + + replyBizContent, _ := json.Marshal(bizReply) + + return c.GetResponse(ctx, replyBizContent) +} From a8621cf93d204201f9d5ff0b712ecb1fd69db4dd Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:39:24 +0800 Subject: [PATCH 07/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/cmb.go | 1 - 1 file changed, 1 deletion(-) delete mode 100644 internal/biz/cmb.go diff --git a/internal/biz/cmb.go b/internal/biz/cmb.go deleted file mode 100644 index 26ded5e..0000000 --- a/internal/biz/cmb.go +++ /dev/null @@ -1 +0,0 @@ -package biz From 2e7f368e06254d4bb061127ab2a67d52fb21cbde Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:41:54 +0800 Subject: [PATCH 08/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/order.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/biz/order.go b/internal/biz/order.go index 5cd6a61..eaa147b 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -47,7 +47,6 @@ func (v *VoucherBiz) order(ctx context.Context, req *bo.OrderCreateReqBo, produc return nil, err } - // 真实发放 voucherNo, err := v.WechatCpnRepo.Order(ctx, order) if err != nil { if err3 := v.fail(ctx, order, err); err3 != nil { From 08a35b40704f984cd8d41a4efb8778f24791d50f Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 19:51:11 +0800 Subject: [PATCH 09/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/service/cmb_product.go | 40 +++++++++++++++---------------- internal/service/cmb_query.go | 42 ++++++++++++++++----------------- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/internal/service/cmb_product.go b/internal/service/cmb_product.go index 3595509..46632a4 100644 --- a/internal/service/cmb_product.go +++ b/internal/service/cmb_product.go @@ -10,6 +10,26 @@ import ( "voucher/internal/biz/vo" ) +func (c *CmbService) QueryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { + + bizReply, err := c.queryProduct(ctx, request) + if err != nil { + return c.QueryProductFail(ctx, err) + } + + return c.QueryProductSuccess(ctx, bizReply) +} + +func (c *CmbService) queryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryProductReply, error) { + + bizContent, err := c.CmbMixRepo.ProductQueryVerify(ctx, request) + if err != nil { + return nil, err + } + + return c.VoucherBiz.CmbProductQuery(ctx, bizContent.ActivityId) +} + func (c *CmbService) QueryProductSuccess(ctx context.Context, bizReply *v1.CmbQueryProductReply) (*v1.CmbReply, error) { replyBizContent, _ := json.Marshal(bizReply) @@ -37,23 +57,3 @@ func (c *CmbService) QueryProductFail(ctx context.Context, err error) (*v1.CmbRe return c.GetResponse(ctx, replyBizContent) } - -func (c *CmbService) QueryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { - - bizReply, err := c.queryProduct(ctx, request) - if err != nil { - return c.QueryProductFail(ctx, err) - } - - return c.QueryProductSuccess(ctx, bizReply) -} - -func (c *CmbService) queryProduct(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryProductReply, error) { - - bizContent, err := c.CmbMixRepo.ProductQueryVerify(ctx, request) - if err != nil { - return nil, err - } - - return c.VoucherBiz.CmbProductQuery(ctx, bizContent.ActivityId) -} diff --git a/internal/service/cmb_query.go b/internal/service/cmb_query.go index 88577a9..433d7db 100644 --- a/internal/service/cmb_query.go +++ b/internal/service/cmb_query.go @@ -8,6 +8,27 @@ import ( v1 "voucher/api/v1" ) +func (c *CmbService) Query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { + + orderNo, err := c.query(ctx, request) + + if err != nil { + return c.QueryFail(ctx, err) + } + + return c.QuerySuccess(ctx, orderNo) +} + +func (c *CmbService) query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryReply, error) { + + bizContent, err := c.CmbMixRepo.QueryVerify(ctx, request) + if err != nil { + return nil, err + } + + return c.VoucherBiz.CmbQuery(ctx, bizContent.CodeNo) +} + func (c *CmbService) QuerySuccess(ctx context.Context, bizReply *v1.CmbQueryReply) (*v1.CmbReply, error) { replyBizContent, _ := json.Marshal(bizReply) @@ -29,24 +50,3 @@ func (c *CmbService) QueryFail(ctx context.Context, err error) (*v1.CmbReply, er return c.GetResponse(ctx, replyBizContent) } - -func (c *CmbService) Query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbReply, error) { - - orderNo, err := c.query(ctx, request) - - if err != nil { - return c.QueryFail(ctx, err) - } - - return c.QuerySuccess(ctx, orderNo) -} - -func (c *CmbService) query(ctx context.Context, request *v1.CmbRequest) (*v1.CmbQueryReply, error) { - - bizContent, err := c.CmbMixRepo.QueryVerify(ctx, request) - if err != nil { - return nil, err - } - - return c.VoucherBiz.CmbQuery(ctx, bizContent.CodeNo) -} From 2d34354ef2ba8b2f3d16e3668b85f0ed87158d65 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 20:02:37 +0800 Subject: [PATCH 10/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/{notify_retry_consume.go => notify_retry.go} | 0 internal/biz/{wechat_notify_consume.go => wechat_notify.go} | 0 internal/server/{wechat_notify_consumer.go => wechat_consumer.go} | 0 internal/service/{cmb_mock.go => mock.go} | 0 internal/service/{cmb_order.go => order.go} | 0 internal/service/{cmb_product.go => product.go} | 0 internal/service/{cmb_query.go => query.go} | 0 7 files changed, 0 insertions(+), 0 deletions(-) rename internal/biz/{notify_retry_consume.go => notify_retry.go} (100%) rename internal/biz/{wechat_notify_consume.go => wechat_notify.go} (100%) rename internal/server/{wechat_notify_consumer.go => wechat_consumer.go} (100%) rename internal/service/{cmb_mock.go => mock.go} (100%) rename internal/service/{cmb_order.go => order.go} (100%) rename internal/service/{cmb_product.go => product.go} (100%) rename internal/service/{cmb_query.go => query.go} (100%) diff --git a/internal/biz/notify_retry_consume.go b/internal/biz/notify_retry.go similarity index 100% rename from internal/biz/notify_retry_consume.go rename to internal/biz/notify_retry.go diff --git a/internal/biz/wechat_notify_consume.go b/internal/biz/wechat_notify.go similarity index 100% rename from internal/biz/wechat_notify_consume.go rename to internal/biz/wechat_notify.go diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_consumer.go similarity index 100% rename from internal/server/wechat_notify_consumer.go rename to internal/server/wechat_consumer.go diff --git a/internal/service/cmb_mock.go b/internal/service/mock.go similarity index 100% rename from internal/service/cmb_mock.go rename to internal/service/mock.go diff --git a/internal/service/cmb_order.go b/internal/service/order.go similarity index 100% rename from internal/service/cmb_order.go rename to internal/service/order.go diff --git a/internal/service/cmb_product.go b/internal/service/product.go similarity index 100% rename from internal/service/cmb_product.go rename to internal/service/product.go diff --git a/internal/service/cmb_query.go b/internal/service/query.go similarity index 100% rename from internal/service/cmb_query.go rename to internal/service/query.go From a7e9fc653bceae405c35cae5fbda3e1a3614749e Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 20:35:43 +0800 Subject: [PATCH 11/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/cron_notice.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/biz/cron_notice.go b/internal/biz/cron_notice.go index fc6f45f..276eb7a 100644 --- a/internal/biz/cron_notice.go +++ b/internal/biz/cron_notice.go @@ -85,10 +85,10 @@ func (v *VoucherBiz) isCanNotice(ctx context.Context) error { // 二次获取,判定处理,以免获取锁后又执行了一次 - cacheValue, err := v.rdb.Rdb.Get(ctx, cache.Key).Result() + cacheValue, err2 := v.rdb.Rdb.Get(ctx, cache.Key).Result() - if err != nil && err != redis.Nil { - return fmt.Errorf(fmt.Sprintf("notice 二次获取redis缓存%s异常:%v", cache.Key, err)) + if err2 != nil && err2 != redis.Nil { + return fmt.Errorf(fmt.Sprintf("notice 二次获取redis缓存%s异常:%v", cache.Key, err2)) } if len(cacheValue) > 0 { @@ -113,7 +113,7 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error { } if order.Status == status { - log.Warnf("notice 券状态未改变:%s,忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo) + //log.Warnf("notice 券状态未改变:%s,忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo) return nil } @@ -140,7 +140,7 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error { func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) error { if !orderNotify.Event.CanNotify() { - log.Warnf("notice 券状态:%s,忽略不通知,orderNo:%s", orderNotify.Event.GetText(), order.OrderNo) + //log.Warnf("notice 券状态:%s,忽略不通知,orderNo:%s", orderNotify.Event.GetText(), order.OrderNo) return nil } From 84451f8aaabbc945820a8fcbcbaeb21b48d5b47b Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 20:40:53 +0800 Subject: [PATCH 12/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/pkg/helper/utils_test.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/pkg/helper/utils_test.go b/internal/pkg/helper/utils_test.go index 5ab23c1..64996c6 100644 --- a/internal/pkg/helper/utils_test.go +++ b/internal/pkg/helper/utils_test.go @@ -3,9 +3,25 @@ package helper import ( "fmt" "testing" + "time" ) func TestHashMod(t *testing.T) { serverId := HashMod("1dfsfdsfsddf12dddd5451212iodewnsanf2") fmt.Println(serverId) } + +func TestNoticeTime(t *testing.T) { + now := time.Now() + + // 获取七天前的日期 + noticeStartDay := now.AddDate(0, 0, -29) + // 获取七天前 00:00:00 的时间 + startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location()) + + noticeEndDay := now.AddDate(0, 0, -28) + // 获取昨天 23:59:59 的时间 + endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location()) + + t.Logf("startTime:%s,endTime:%s", startTime, endTime) +} From 28968842ed0fda5b8eca09b58c58c8d0776fb88d Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 21:14:36 +0800 Subject: [PATCH 13/20] =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/pkg/mq_http/mq_http_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/pkg/mq_http/mq_http_test.go b/internal/pkg/mq_http/mq_http_test.go index 81a903c..47e5758 100644 --- a/internal/pkg/mq_http/mq_http_test.go +++ b/internal/pkg/mq_http/mq_http_test.go @@ -37,7 +37,7 @@ func Test_WechatNotifyProducer(t *testing.T) { func Test_WechatNotifyProducer2(t *testing.T) { - tag := "voucher_notify_dev" + tag := "voucher_notify_pro" bodyStr := `{"id":"5465699d-de6a-5414-a8df-283167b577ca", "create_time":"2025-03-07T15:57:24+08:00", @@ -48,15 +48,15 @@ func Test_WechatNotifyProducer2(t *testing.T) { "associated_data":"coupon", "plain_text":{ "stock_creator_mchid":"1652465541", -"stock_id":"20259610", -"coupon_id":"97225743207", +"stock_id":"20393435", +"coupon_id":"101423873113", "coupon_name":"test", "description":"","status":"USED", "create_time":"2025-03-07T15:49:31+08:00", "coupon_type":"NORMAL", "no_cash":false, "singleitem":false, -"consume_information":{"consume_time":"2025-03-07T15:57:24+08:00","consume_mchid":"1800002761","transaction_id":"4200002544202503077103159055"}}}` +"consume_information":{"consume_time":"2025-05-13T15:57:24+08:00","consume_mchid":"1800002761","transaction_id":"4200002544202503077103159055"}}}` if err := wechatNotifyProducer(tag, bodyStr); err != nil { t.Errorf("入队失败 error = %v", err) From e3085019f41999945ac91a6549fba8ed41cba05b Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 21:29:22 +0800 Subject: [PATCH 14/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E6=88=90?= =?UTF-8?q?=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/server/wechat_consumer.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/server/wechat_consumer.go b/internal/server/wechat_consumer.go index e9de469..3becbb4 100644 --- a/internal/server/wechat_consumer.go +++ b/internal/server/wechat_consumer.go @@ -150,11 +150,9 @@ func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntr } }() - log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) - ctx := context.Background() if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { - log.Errorf("微信回调消费处理失败:%+v", err) + log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, messageTag:%s, message: %s, err:%+v", msg.MessageId, msg.MessageTag, msg.MessageBody, err) } } From 5924ee0c11e76ab148402d1016eb73dda47bd964 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 21:29:57 +0800 Subject: [PATCH 15/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E6=88=90?= =?UTF-8?q?=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/server/wechat_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/server/wechat_consumer.go b/internal/server/wechat_consumer.go index 3becbb4..3017144 100644 --- a/internal/server/wechat_consumer.go +++ b/internal/server/wechat_consumer.go @@ -152,7 +152,7 @@ func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntr ctx := context.Background() if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { - log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, messageTag:%s, message: %s, err:%+v", msg.MessageId, msg.MessageTag, msg.MessageBody, err) + log.Errorf("微信回调消费接收消息成功,处理失败 messageId:%s, messageTag:%s, message:%s, err:%+v", msg.MessageId, msg.MessageTag, msg.MessageBody, err) } } From f4cecc138b5b024d18b1f61229e2b260b1262213 Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 22:09:30 +0800 Subject: [PATCH 16/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E6=88=90?= =?UTF-8?q?=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/biz/order.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/biz/order.go b/internal/biz/order.go index eaa147b..96df93d 100644 --- a/internal/biz/order.go +++ b/internal/biz/order.go @@ -12,7 +12,7 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or order, err3 := v.GetByOutBizNo(ctx, req) if err3 != nil { - return orderNo, err3 + return "", err3 } if order != nil { @@ -20,7 +20,7 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or if order.Status.IsFail() || order.Status.IsIng() { if err4 := v.orderRetry(ctx, order); err4 != nil { - return orderNo, err4 + return "", err4 } } @@ -29,12 +29,12 @@ func (v *VoucherBiz) CmbOrder(ctx context.Context, req *bo.OrderCreateReqBo) (or product, err3 := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) if err3 != nil { - return orderNo, err3 + return "", err3 } order, err3 = v.order(ctx, req, product) if err3 != nil { - return orderNo, err3 + return "", err3 } return order.OrderNo, nil From 216500e4d10878e4800174d161305234471c2acd Mon Sep 17 00:00:00 2001 From: ziming Date: Wed, 28 May 2025 22:12:30 +0800 Subject: [PATCH 17/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E6=88=90?= =?UTF-8?q?=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/service/order.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/service/order.go b/internal/service/order.go index fa9e6fb..164fc1e 100644 --- a/internal/service/order.go +++ b/internal/service/order.go @@ -29,8 +29,6 @@ func (c *CmbService) order(ctx context.Context, request *v1.CmbRequest) (string, return "", err } - ctx2 := context.Background() - boReq := &bo.OrderCreateReqBo{ OutBizNo: bizContent.TransactionId, ProductNo: bizContent.ActivityId, @@ -42,7 +40,7 @@ func (c *CmbService) order(ctx context.Context, request *v1.CmbRequest) (string, NotifyUrl: c.bc.Cmb.NotifyUrl, } - orderNo, err := c.VoucherBiz.CmbOrder(ctx2, boReq) + orderNo, err := c.VoucherBiz.CmbOrder(ctx, boReq) if err != nil { return "", err } From 73a0321d99769dc0d00a5cf12bae31e0ba557c00 Mon Sep 17 00:00:00 2001 From: ziming Date: Thu, 29 May 2025 17:05:12 +0800 Subject: [PATCH 18/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E5=9B=9E=E8=B0=83?= =?UTF-8?q?=E6=B6=88=E8=B4=B9=E6=8E=A5=E6=94=B6=E6=B6=88=E6=81=AF=E6=88=90?= =?UTF-8?q?=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/data/wechatrepoimpl/cpn.go | 30 ++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/internal/data/wechatrepoimpl/cpn.go b/internal/data/wechatrepoimpl/cpn.go index c1ab450..c212f61 100644 --- a/internal/data/wechatrepoimpl/cpn.go +++ b/internal/data/wechatrepoimpl/cpn.go @@ -88,7 +88,11 @@ func (c *CpnRepoImpl) Order(ctx context.Context, order *bo.OrderBo) (string, err if err != nil { - return "", c.bodyErr(ctx, result) + if result.Response.Body != nil { + return "", c.bodyErr(ctx, result) + } + + return "", err } return *resp.CouponId, nil @@ -112,7 +116,11 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.Or resp, result, err := svc.QueryCoupon(ctx, req) if err != nil { - return 0, c.bodyErr(ctx, result) + if result.Response.Body != nil { + return 0, c.bodyErr(ctx, result) + } + + return 0, err } return CpnStatus(*resp.Status).GetStatus() @@ -139,7 +147,11 @@ func (c *CpnRepoImpl) QueryProduct(ctx context.Context, stockCreatorMchId, stock if err != nil { - return nil, c.bodyErr(ctx, result) + if result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err } return response, nil @@ -160,7 +172,11 @@ func (c *CpnRepoImpl) QueryCallback(ctx context.Context) (*cashcoupons.Callback, if err != nil { - return nil, c.bodyErr(ctx, result) + if result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err } return response, nil @@ -183,7 +199,11 @@ func (c *CpnRepoImpl) SetCallback(ctx context.Context, url string) (*cashcoupons if err != nil { - return nil, c.bodyErr(ctx, result) + if result.Response.Body != nil { + return nil, c.bodyErr(ctx, result) + } + + return nil, err } return response, nil From 451169b22da23576a1671331af1d763ec3542939 Mon Sep 17 00:00:00 2001 From: ziming Date: Thu, 29 May 2025 17:37:14 +0800 Subject: [PATCH 19/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/data/repoimpl/order.go | 21 ++++----------------- internal/service/order.go | 3 --- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 89c7ef1..d7780b0 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/go-kratos/kratos/v2/log" "gorm.io/gorm" "time" "unicode/utf8" @@ -37,10 +36,8 @@ func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx conte result := p.DB(ctx). Where("status = ?", vo.OrderStatusIng.GetValue()). - FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { - // tx.RowsAffected 提供当前批处理中记录的计数(the count of records in the current batch) - // 'batch' 变量表示当前批号(the current batch number) - // 返回 error 将阻止更多的批处理 + Limit(100). + FindInBatches(&results, 20, func(tx *gorm.DB, batch int) error { return fun(ctx, p.ToBos(results)) }) @@ -95,12 +92,9 @@ func (p *OrderRepoImpl) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderB UpdateTime: &now, } - db := p.DB(ctx) - tx := db.Create(info) + tx := p.DB(ctx).Create(info) if tx.Error != nil { - sqlDB, _ := db.DB() - log.Warnf("order create 当前打开连接数:%d,空闲连接数:%d", sqlDB.Stats().OpenConnections, sqlDB.Stats().Idle) return nil, fmt.Errorf("create db fail %w", tx.Error) } @@ -111,8 +105,7 @@ func (p *OrderRepoImpl) GetByOutBizNo(ctx context.Context, t vo.OrderType, outBi info := &model.Order{} - db := p.DB(ctx) - tx := db.Where(model.Order{Type: t.GetValue(), OutBizNo: outBizNo}).First(&info) + tx := p.DB(ctx).Where(model.Order{Type: t.GetValue(), OutBizNo: outBizNo}).First(&info) if tx.Error != nil { @@ -120,11 +113,6 @@ func (p *OrderRepoImpl) GetByOutBizNo(ctx context.Context, t vo.OrderType, outBi return nil, err2.ErrorDbNotFound("订单数据不存在") } - if errors.Is(tx.Error, context.DeadlineExceeded) { - sqlDB, _ := db.DB() - log.Warnf("order当前打开连接数:%d,空闲连接数:%d", sqlDB.Stats().OpenConnections, sqlDB.Stats().Idle) - } - return nil, fmt.Errorf("order db fail %w", tx.Error) } @@ -220,7 +208,6 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string tx := p.DB(ctx). Where(model.Order{ ID: id, - //Status: vo.OrderStatusIng.GetValue(), }). Updates(model.Order{ Status: vo.OrderStatusSuccess.GetValue(), diff --git a/internal/service/order.go b/internal/service/order.go index 164fc1e..398076b 100644 --- a/internal/service/order.go +++ b/internal/service/order.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "github.com/go-kratos/kratos/v2/errors" - "github.com/go-kratos/kratos/v2/log" err2 "voucher/api/err" v1 "voucher/api/v1" "voucher/internal/biz/bo" @@ -69,8 +68,6 @@ func (c *CmbService) OrderFail(ctx context.Context, err error) (*v1.CmbReply, er se.Reason = err2.CmbErr_CMB_UNKNOWN.String() } - log.Errorf("order fail: %v", se) - bizReply := &v1.CmbOrderReply{ RespCode: vo.CmbResponseStatusFail.GetValue(), RespMsg: se.Message, From 41df5bff23b8c7af5a2a36556c76bd00b42bd852 Mon Sep 17 00:00:00 2001 From: ziming Date: Thu, 29 May 2025 17:39:42 +0800 Subject: [PATCH 20/20] =?UTF-8?q?=E5=BE=AE=E4=BF=A1=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/data/wechatrepoimpl/cpn.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/data/wechatrepoimpl/cpn.go b/internal/data/wechatrepoimpl/cpn.go index c212f61..0bc9ea8 100644 --- a/internal/data/wechatrepoimpl/cpn.go +++ b/internal/data/wechatrepoimpl/cpn.go @@ -88,7 +88,7 @@ func (c *CpnRepoImpl) Order(ctx context.Context, order *bo.OrderBo) (string, err if err != nil { - if result.Response.Body != nil { + if result.Response != nil && result.Response.Body != nil { return "", c.bodyErr(ctx, result) } @@ -116,7 +116,7 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.Or resp, result, err := svc.QueryCoupon(ctx, req) if err != nil { - if result.Response.Body != nil { + if result.Response != nil && result.Response.Body != nil { return 0, c.bodyErr(ctx, result) } @@ -147,7 +147,7 @@ func (c *CpnRepoImpl) QueryProduct(ctx context.Context, stockCreatorMchId, stock if err != nil { - if result.Response.Body != nil { + if result.Response != nil && result.Response.Body != nil { return nil, c.bodyErr(ctx, result) } @@ -172,7 +172,7 @@ func (c *CpnRepoImpl) QueryCallback(ctx context.Context) (*cashcoupons.Callback, if err != nil { - if result.Response.Body != nil { + if result.Response != nil && result.Response.Body != nil { return nil, c.bodyErr(ctx, result) } @@ -199,7 +199,7 @@ func (c *CpnRepoImpl) SetCallback(ctx context.Context, url string) (*cashcoupons if err != nil { - if result.Response.Body != nil { + if result.Response != nil && result.Response.Body != nil { return nil, c.bodyErr(ctx, result) }