From 3c5de0b144d96999be851d9933f244b19820567d Mon Sep 17 00:00:00 2001 From: ziming Date: Fri, 1 Aug 2025 11:37:42 +0800 Subject: [PATCH] =?UTF-8?q?=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- configs/config.yaml | 8 +- internal/biz/repo/order.go | 1 + internal/biz/repo/orderBak.go | 2 +- internal/biz/retry_notify.go | 122 ++++++++++++++++++++++++++++ internal/biz/voucher.go | 3 + internal/conf/conf.pb.go | 99 ++++++++++++---------- internal/conf/conf.proto | 1 + internal/data/repoimpl/order.go | 38 +++++++++ internal/data/repoimpl/order_bak.go | 2 +- internal/server/http.go | 2 + internal/service/script.go | 29 +++++++ 11 files changed, 261 insertions(+), 46 deletions(-) create mode 100644 internal/biz/retry_notify.go diff --git a/configs/config.yaml b/configs/config.yaml index f0357a2..68d4e6c 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -101,13 +101,19 @@ rdsMQ: retryNum: 1 #重试次数 numWorkers: 3 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 - isOpen: true #是否启动消费 true/false + isOpen: false #是否启动消费 true/false orderRetry: name: "orderRetry" retryNum: 1 #重试次数 numWorkers: 2 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 isOpen: false #是否启动消费 true/false + retryNotify: + name: "retryNotify" + retryNum: 1 #重试次数 + numWorkers: 1 #协程数量,不配置默认为10 + waitTime: 1s #处理完成后等待时间 + isOpen: false #是否启动消费 true/false aliYunSms: accessKeyId: diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index 99306dd..e4be63a 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -8,6 +8,7 @@ import ( ) type OrderRepo interface { + SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error diff --git a/internal/biz/repo/orderBak.go b/internal/biz/repo/orderBak.go index 8d865cd..2c60b03 100644 --- a/internal/biz/repo/orderBak.go +++ b/internal/biz/repo/orderBak.go @@ -6,5 +6,5 @@ import ( ) type OrderBakRepo interface { - FindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error + SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error } diff --git a/internal/biz/retry_notify.go b/internal/biz/retry_notify.go new file mode 100644 index 0000000..df4d5e3 --- /dev/null +++ b/internal/biz/retry_notify.go @@ -0,0 +1,122 @@ +package biz + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/http" + "time" + "voucher/internal/biz/bo" +) + +func (this *VoucherBiz) PushRetryNotify(ctx http.Context, req *bo.FindInBatchesBo) error { + + queue := this.bc.RdsMQ.GetRetryNotify() + if queue == nil { + return fmt.Errorf("队列不存在") + } + + msg, err := json.Marshal(req) + if err != nil { + return err + } + + strMsg := string(msg) + + _, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result() + if err != nil { + return fmt.Errorf("添加到队列失败:%v", err) + } + + return nil +} + +func (this *VoucherBiz) RetryNotify(ctx context.Context, msg string) error { + + var req *bo.FindInBatchesBo + + if err := json.Unmarshal([]byte(msg), &req); err != nil { + return err + } + + if err := this.RetryNotifyOrder(ctx, req); err != nil { + return err + } + + return this.RetryNotifyOrderBack(ctx, req) +} + +func (this *VoucherBiz) RetryNotifyOrder(ctx context.Context, req *bo.FindInBatchesBo) error { + + start := time.Now() + startStr := time.Now().String() + + log.Warnf("重试通知处理:%s", startStr) + fmt.Printf("重试通知处理:%s", startStr) + + n := 0 + eNum := 0 + notifyNum := 0 + err := this.OrderRepo.SpecifyFindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { + + n += 1 + for _, order := range rows { + + eNum += 1 + if _, err := this.Cmb.Notify(ctx, order); err != nil { + log.Errorf("重试通知处理,发生错误,orderNo:%s,outBizNo:%s,voucherNo:%s,err:%v", order.OrderNo, order.OutBizNo, order.VoucherNo, err) + } else { + notifyNum += 1 + } + + } + + log.Warnf("重试通知处理,第:%d组,已执行条数:%d,通知成功条数:%d,", n, eNum, notifyNum) + + return nil + }) + + endTime := time.Now() + log.Warnf("重试通知处理,耗时:%s,结束时间:%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum) + fmt.Printf("重试通知处理,耗时:%s,结束时间%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum) + + return err +} + +func (this *VoucherBiz) RetryNotifyOrderBack(ctx context.Context, req *bo.FindInBatchesBo) error { + + start := time.Now() + startStr := time.Now().String() + + log.Warnf("重试通知处理Bak:%s", startStr) + fmt.Printf("重试通知处理Bak:%s", startStr) + + n := 0 + eNum := 0 + notifyNum := 0 + err := this.OrderBakRepo.SpecifyFindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { + + n += 1 + for _, order := range rows { + + eNum += 1 + if _, err := this.Cmb.Notify(ctx, order); err != nil { + log.Errorf("重试通知处理Bak,发生错误,orderNo:%s,outBizNo:%s,voucherNo:%s,err:%v", order.OrderNo, order.OutBizNo, order.VoucherNo, err) + } else { + notifyNum += 1 + } + + } + + log.Warnf("重试通知处理Bak,第:%d组,已执行条数:%d,通知成功条数:%d,", n, eNum, notifyNum) + + return nil + }) + + endTime := time.Now() + log.Warnf("重试通知处理Bak,耗时:%s,结束时间:%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum) + fmt.Printf("重试通知处理Bak,耗时:%s,结束时间%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum) + + return err +} diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index cf56f38..c57ba0c 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -16,6 +16,7 @@ type VoucherBiz struct { Cmb *cmb.Cmb ProductRepo repo.ProductRepo OrderRepo repo.OrderRepo + OrderBakRepo repo.OrderBakRepo OrderNotifyRepo repo.OrderNotifyRepo WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo MqSendMixRepo mixrepos.MQSendMixRepo @@ -35,6 +36,7 @@ func NewVoucherBiz( Cmb *cmb.Cmb, ProductRepo repo.ProductRepo, OrderRepo repo.OrderRepo, + OrderBakRepo repo.OrderBakRepo, OrderNotifyRepo repo.OrderNotifyRepo, WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo, MqSendMixRepo mixrepos.MQSendMixRepo, @@ -50,6 +52,7 @@ func NewVoucherBiz( Cmb: Cmb, ProductRepo: ProductRepo, OrderRepo: OrderRepo, + OrderBakRepo: OrderBakRepo, OrderNotifyRepo: OrderNotifyRepo, WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo, MqSendMixRepo: MqSendMixRepo, diff --git a/internal/conf/conf.pb.go b/internal/conf/conf.pb.go index 462ee8e..ea34485 100644 --- a/internal/conf/conf.pb.go +++ b/internal/conf/conf.pb.go @@ -859,6 +859,7 @@ type RdsMQ struct { WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"` WechatTimeSliceQuery *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatTimeSliceQuery,proto3" json:"wechatTimeSliceQuery,omitempty"` WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"` + RetryNotify *RdsMQ_Queue `protobuf:"bytes,4,opt,name=retryNotify,proto3" json:"retryNotify,omitempty"` } func (x *RdsMQ) Reset() { @@ -914,6 +915,13 @@ func (x *RdsMQ) GetWechatRetry() *RdsMQ_Queue { return nil } +func (x *RdsMQ) GetRetryNotify() *RdsMQ_Queue { + if x != nil { + return x.RetryNotify + } + return nil +} + type AliYunSms struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1662,7 +1670,7 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, - 0x22, 0xff, 0x02, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, + 0x22, 0xbe, 0x03, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, @@ -1675,35 +1683,39 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, - 0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, - 0x65, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, - 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, - 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0d, 0x52, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, - 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, - 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, - 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, - 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, - 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, - 0x6d, 0x65, 0x22, 0xb9, 0x01, 0x0a, 0x09, 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, - 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, - 0x49, 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, - 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, - 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, - 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, - 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, - 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, - 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, - 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, - 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, - 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, - 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, - 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, - 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x72, 0x65, 0x74, + 0x72, 0x79, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, + 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, + 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x72, 0x65, 0x74, + 0x72, 0x79, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, + 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, + 0x0a, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, + 0x52, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, + 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, + 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, + 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, + 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, + 0x65, 0x22, 0xb9, 0x01, 0x0a, 0x09, 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x12, + 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, + 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, + 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, + 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, + 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, + 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, + 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x65, + 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, + 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, + 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, + 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, + 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, + 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1762,19 +1774,20 @@ var file_conf_conf_proto_depIdxs = []int32{ 19, // 16: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue 19, // 17: voucher.config.RdsMQ.wechatTimeSliceQuery:type_name -> voucher.config.RdsMQ.Queue 19, // 18: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue - 20, // 19: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration - 20, // 20: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration - 20, // 21: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration - 20, // 22: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration - 20, // 23: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration - 4, // 24: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap - 17, // 25: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap - 20, // 26: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration - 27, // [27:27] is the sub-list for method output_type - 27, // [27:27] is the sub-list for method input_type - 27, // [27:27] is the sub-list for extension type_name - 27, // [27:27] is the sub-list for extension extendee - 0, // [0:27] is the sub-list for field type_name + 19, // 19: voucher.config.RdsMQ.retryNotify:type_name -> voucher.config.RdsMQ.Queue + 20, // 20: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration + 20, // 21: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration + 20, // 22: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration + 20, // 23: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration + 20, // 24: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration + 4, // 25: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap + 17, // 26: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap + 20, // 27: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration + 28, // [28:28] is the sub-list for method output_type + 28, // [28:28] is the sub-list for method input_type + 28, // [28:28] is the sub-list for extension type_name + 28, // [28:28] is the sub-list for extension extendee + 0, // [0:28] is the sub-list for field type_name } func init() { file_conf_conf_proto_init() } diff --git a/internal/conf/conf.proto b/internal/conf/conf.proto index 738da25..985c0da 100644 --- a/internal/conf/conf.proto +++ b/internal/conf/conf.proto @@ -132,6 +132,7 @@ message RdsMQ { Queue wechatQuery = 1; Queue wechatTimeSliceQuery = 2; Queue wechatRetry = 3; + Queue retryNotify = 4; } message AliYunSms { diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index ff2badd..fd2d587 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -31,6 +31,44 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.Order{}) } +func (p *OrderRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) + + if req.ProductNo != "" { + tx = tx.Where("product_no = ?", req.ProductNo) + } + + if req.StartTime != nil && req.EndTime != nil { + tx = tx.Where("receive_success_time BETWEEN ? AND ?", req.StartTime, req.EndTime) + } + + if req.OrderNo != nil { + tx = tx.Where("order_no IN (?)", req.OrderNo) + } + + if req.OutBizNo != nil { + tx = tx.Where("out_biz_no IN (?)", req.OutBizNo) + } + + if req.VoucherNo != nil { + tx = tx.Where("voucher_no IN (?)", req.VoucherNo) + } + + var results = make([]*model.Order, 0) + + result := tx.FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { + + return fun(ctx, p.ToBos(results)) + }) + + if result.Error != nil { + return result.Error + } + + return nil +} + func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) diff --git a/internal/data/repoimpl/order_bak.go b/internal/data/repoimpl/order_bak.go index 954579b..6ace422 100644 --- a/internal/data/repoimpl/order_bak.go +++ b/internal/data/repoimpl/order_bak.go @@ -25,7 +25,7 @@ func (p *OrderBakRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.OrderBak{}) } -func (p *OrderBakRepoImpl) FindInBatches(ctx context.Context, req *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { +func (p *OrderBakRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) diff --git a/internal/server/http.go b/internal/server/http.go index 505217d..0c52e1b 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -44,6 +44,8 @@ func NewHTTPServer( srv.Route("/voucher/").POST("timeSliceQueryPush", cmb.TimeSliceQueryPush) srv.Route("/voucher/").POST("pushWechatRetry/{batch_no}", cmb.PushWechatRetry) srv.Route("/voucher/").POST("warningBudget/{id}", cmb.WarningBudget) + // 指定重复通知对应单子数据 + srv.Route("/voucher/").POST("specifyNotification", cmb.SpecifyNotification) v1.RegisterCmbHTTPServer(srv, cmb) diff --git a/internal/service/script.go b/internal/service/script.go index dd1cdd6..d756cd7 100644 --- a/internal/service/script.go +++ b/internal/service/script.go @@ -7,6 +7,7 @@ import ( "io" http2 "net/http" "strconv" + "voucher/internal/biz/bo" "voucher/internal/biz/do" ) @@ -154,3 +155,31 @@ func (this *CmbService) WarningBudget(ctx http.Context) error { "data": id, }) } + +func (this *CmbService) SpecifyNotification(ctx http.Context) error { + + bodyBytes, err := io.ReadAll(ctx.Request().Body) + if err != nil { + return err + } + + var req *bo.FindInBatchesBo + if err = json.Unmarshal(bodyBytes, &req); err != nil { + return err + } + + if req == nil { + return fmt.Errorf("req is empty") + } + + if req.OutBizNo == nil && req.OrderNo == nil && req.VoucherNo == nil { + return fmt.Errorf("out_biz_no or order_no or voucher_no is empty") + } + + err = this.VoucherBiz.PushRetryNotify(ctx, req) + if err != nil { + return err + } + + return ctx.String(http2.StatusOK, string(bodyBytes)) +}