diff --git a/configs/config.yaml b/configs/config.yaml index 31a43e5..ffd5222 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -120,6 +120,12 @@ rdsMQ: numWorkers: 1 #协程数量,不配置默认为10 waitTime: 1s #处理完成后等待时间 isOpen: false #是否启动消费 true/false + usedNotify: + name: "usedNotify" + retryNum: 1 #重试次数 + numWorkers: 1 #协程数量,不配置默认为10 + waitTime: 1s #处理完成后等待时间 + isOpen: true #是否启动消费 true/false aliYunSms: accessKeyId: diff --git a/internal/biz/do/rds_mq.go b/internal/biz/do/rds_mq.go index e9c17f6..3009437 100644 --- a/internal/biz/do/rds_mq.go +++ b/internal/biz/do/rds_mq.go @@ -25,3 +25,11 @@ type RdsWechatQuery struct { GoNum int `json:"go_num"` // 并发数 TimeSliceHours int64 `json:"time_slice_hours"` // 时间片"小时" } + +type WechatUsedQuery struct { + ProductNo string `json:"product_no"` + BatchNo string `json:"batch_no"` + OrderNo string `json:"order_no"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` +} diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index e4be63a..3cc8bbc 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -8,6 +8,7 @@ import ( ) type OrderRepo interface { + FinUsedInBatches(ctx context.Context, req *do.WechatUsedQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error diff --git a/internal/biz/timeslicequery/query.go b/internal/biz/timeslicequery/query.go index 998046c..eb1f58c 100644 --- a/internal/biz/timeslicequery/query.go +++ b/internal/biz/timeslicequery/query.go @@ -7,6 +7,11 @@ import ( func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, useNum *int) error { + if order.Status.IsExpired() { + _, err := v.cmb.Notify(ctx, order) + return err + } + status, err := v.wechatCpnRepo.Query(ctx, order) if err != nil { return err diff --git a/internal/biz/used_notify.go b/internal/biz/used_notify.go new file mode 100644 index 0000000..f93017b --- /dev/null +++ b/internal/biz/used_notify.go @@ -0,0 +1,77 @@ +package biz + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/http" + "voucher/internal/biz/bo" + "voucher/internal/biz/do" +) + +func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error { + + queue := this.bc.RdsMQ.GetUsedNotify() + if queue == nil { + return fmt.Errorf("队列不存在") + } + + msg, err := json.Marshal(req) + if err != nil { + return err + } + + strMsg := string(msg) + + _, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result() + if err != nil { + return fmt.Errorf("添加到队列失败:%v", err) + } + + return nil +} + +func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error { + + log.Warnf("核销重试通知处理,开始:%s", msg) + + var req *do.WechatUsedQuery + + if err := json.Unmarshal([]byte(msg), &req); err != nil { + return err + } + + errNum := 0 + + return this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { + + for _, order := range rows { + + event, err := order.Status.GetOrderNotifyEvent() + if err != nil { + return err + } + + notify := &bo.OrderNotifyBo{ + OrderNo: order.OrderNo, + NotifyUrl: order.NotifyUrl, + Channel: order.Channel, + Event: event, + Type: order.Type, + } + + if err = this.request(ctx, order, notify); err != nil { + errNum++ + if errNum > 50 { + return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%v", err) + } + log.Warnf("核销重试通知处理,通知失败:%v", err) + return err + } + + } + + return nil + }) +} diff --git a/internal/conf/conf.pb.go b/internal/conf/conf.pb.go index f4b735c..10176f4 100644 --- a/internal/conf/conf.pb.go +++ b/internal/conf/conf.pb.go @@ -869,6 +869,7 @@ type RdsMQ struct { WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"` RetryNotify *RdsMQ_Queue `protobuf:"bytes,4,opt,name=retryNotify,proto3" json:"retryNotify,omitempty"` OrderNotifyRetry *RdsMQ_Queue `protobuf:"bytes,5,opt,name=orderNotifyRetry,proto3" json:"orderNotifyRetry,omitempty"` + UsedNotify *RdsMQ_Queue `protobuf:"bytes,6,opt,name=usedNotify,proto3" json:"usedNotify,omitempty"` } func (x *RdsMQ) Reset() { @@ -938,6 +939,13 @@ func (x *RdsMQ) GetOrderNotifyRetry() *RdsMQ_Queue { return nil } +func (x *RdsMQ) GetUsedNotify() *RdsMQ_Queue { + if x != nil { + return x.UsedNotify + } + return nil +} + type AliYunSms struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1687,7 +1695,7 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x3a, 0x02, 0x38, 0x01, 0x22, 0x87, 0x04, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, + 0x3a, 0x02, 0x38, 0x01, 0x22, 0xc4, 0x04, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, @@ -1709,35 +1717,39 @@ var file_conf_conf_proto_rawDesc = []byte{ 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x10, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, - 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12, - 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, - 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, - 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x72, - 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, - 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75, 0x6d, - 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, - 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, - 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xb9, - 0x01, 0x0a, 0x09, 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x12, 0x20, 0x0a, 0x0b, - 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x28, - 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, - 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, - 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, - 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65, - 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65, - 0x12, 0x28, 0x0a, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, - 0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, - 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, - 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, - 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, - 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x3b, 0x0a, 0x0a, 0x75, 0x73, 0x65, 0x64, 0x4e, 0x6f, 0x74, + 0x69, 0x66, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, + 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, + 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0a, 0x75, 0x73, 0x65, 0x64, 0x4e, 0x6f, 0x74, 0x69, + 0x66, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, + 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x74, 0x72, + 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x72, 0x65, 0x74, 0x72, + 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, + 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xb9, 0x01, 0x0a, 0x09, + 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, + 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61, + 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, + 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, + 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, + 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, + 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, + 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, + 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, + 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, + 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, + 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1798,19 +1810,20 @@ var file_conf_conf_proto_depIdxs = []int32{ 19, // 18: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue 19, // 19: voucher.config.RdsMQ.retryNotify:type_name -> voucher.config.RdsMQ.Queue 19, // 20: voucher.config.RdsMQ.orderNotifyRetry:type_name -> voucher.config.RdsMQ.Queue - 20, // 21: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration - 20, // 22: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration - 20, // 23: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration - 20, // 24: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration - 20, // 25: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration - 4, // 26: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap - 17, // 27: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap - 20, // 28: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration - 29, // [29:29] is the sub-list for method output_type - 29, // [29:29] is the sub-list for method input_type - 29, // [29:29] is the sub-list for extension type_name - 29, // [29:29] is the sub-list for extension extendee - 0, // [0:29] is the sub-list for field type_name + 19, // 21: voucher.config.RdsMQ.usedNotify:type_name -> voucher.config.RdsMQ.Queue + 20, // 22: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration + 20, // 23: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration + 20, // 24: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration + 20, // 25: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration + 20, // 26: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration + 4, // 27: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap + 17, // 28: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap + 20, // 29: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration + 30, // [30:30] is the sub-list for method output_type + 30, // [30:30] is the sub-list for method input_type + 30, // [30:30] is the sub-list for extension type_name + 30, // [30:30] is the sub-list for extension extendee + 0, // [0:30] is the sub-list for field type_name } func init() { file_conf_conf_proto_init() } diff --git a/internal/conf/conf.proto b/internal/conf/conf.proto index b31dadd..5787df9 100644 --- a/internal/conf/conf.proto +++ b/internal/conf/conf.proto @@ -135,6 +135,7 @@ message RdsMQ { Queue wechatRetry = 3; Queue retryNotify = 4; Queue orderNotifyRetry = 5; + Queue usedNotify = 6; } message AliYunSms { diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 830974b..6a70b7a 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -74,7 +74,7 @@ func (p *OrderRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.FindIn func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { tx := p.DB(ctx). - Where("`status` = ?", vo.OrderStatusSuccess.GetValue()). + Where("`status` in (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue(), vo.OrderStatusExpired.GetValue()}). Where("activity_id = ''") if req.ProductNo != "" { @@ -109,6 +109,42 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We return nil } +func (p *OrderRepoImpl) FinUsedInBatches(ctx context.Context, req *do.WechatUsedQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + var results = make([]*model.Order, 0) + + tx := p.DB(ctx). + Where("`status` = ?", vo.OrderStatusUse.GetValue()). + Where("activity_id = ''") + + if req.StartTime != "" { + tx = tx.Where("last_use_time > ?", req.StartTime) + } + if req.EndTime != "" { + tx = tx.Where("last_use_time <= ?", req.EndTime) + } + if req.ProductNo != "" { + tx = tx.Where("product_no = ?", req.ProductNo) + } + if req.BatchNo != "" { + tx = tx.Where("batch_no = ?", req.ProductNo) + } + if req.OrderNo != "" { + tx = tx.Where("order_no = ?", req.OrderNo) + } + + // 显式清除排序,移除默认的 ORDER BY + result := tx.Order("receive_success_time asc").FindInBatches(&results, 500, func(tx *gorm.DB, batch int) error { + return fun(ctx, p.ToBos(results)) + }) + + if result.Error != nil { + return result.Error + } + + return nil +} + func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { var results = make([]*model.Order, 0) @@ -117,7 +153,7 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s Where("batch_no = ?", batchNo). Where("`status` = ?", vo.OrderStatusFail.GetValue()). Order("receive_success_time asc"). // 显式清除排序,移除默认的 ORDER BY - FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { + FindInBatches(&results, 200, func(tx *gorm.DB, batch int) error { return fun(ctx, p.ToBos(results)) }) diff --git a/internal/server/http.go b/internal/server/http.go index 5ac8555..55e33d6 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -55,6 +55,8 @@ func NewHTTPServer( srv.Route("/voucher/").POST("warningBudget/{id}", cmb.WarningBudget) // 指定重复通知对应单子数据 srv.Route("/voucher/").POST("specifyNotification", cmb.SpecifyNotification) + // 订单使用通知下游 + srv.Route("/voucher/").POST("UsedNotifyPush", cmb.UsedNotifyPush) v1.RegisterCmbHTTPServer(srv, cmb) diff --git a/internal/server/rds_consume.go b/internal/server/rds_consume.go index 52e2805..c1c9636 100644 --- a/internal/server/rds_consume.go +++ b/internal/server/rds_consume.go @@ -46,6 +46,10 @@ func NewRdbConsumer( manager.Add(cf4) } + if cf5 := voucherService.GetUsedNotifyConfig(); cf5 != nil { + manager.Add(cf5) + } + return &RdbConsumer{hLog: hLog, conf: conf, manager: manager} } diff --git a/internal/service/script.go b/internal/service/script.go index e9e1678..62b3282 100644 --- a/internal/service/script.go +++ b/internal/service/script.go @@ -222,3 +222,31 @@ func (this *CmbService) SpecifyNotification(ctx http.Context) error { return ctx.String(http2.StatusOK, string(bodyBytes)) } + +func (this *CmbService) UsedNotifyPush(ctx http.Context) error { + + bodyBytes, err := io.ReadAll(ctx.Request().Body) + if err != nil { + return err + } + + var req *do.WechatUsedQuery + if err = json.Unmarshal(bodyBytes, &req); err != nil { + return err + } + + if req == nil { + return fmt.Errorf("req is empty") + } + + if req.StartTime == "" && req.EndTime == "" { + return fmt.Errorf("start time or end time is empty") + } + + err = this.VoucherBiz.UsedNotifyPush(ctx, req) + if err != nil { + return err + } + + return ctx.String(http2.StatusOK, string(bodyBytes)) +} diff --git a/internal/service/used_notify.go b/internal/service/used_notify.go new file mode 100644 index 0000000..860a707 --- /dev/null +++ b/internal/service/used_notify.go @@ -0,0 +1,45 @@ +package service + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/v2/log" + "voucher/internal/pkg/rdsmq" +) + +func (s *VoucherService) GetUsedNotifyConfig() *rdsmq.ConsumeConfig { + + queue := s.bc.RdsMQ.GetUsedNotify() + if queue == nil { + return nil + } + + if !queue.GetIsOpen() { + log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name)) + return nil + } + + return &rdsmq.ConsumeConfig{ + Rdb: s.rdb.Rdb, + QueueName: queue.Name, + NumWorkers: queue.NumWorkers, + WaitTime: queue.GetWaitTime().AsDuration(), + RetryNum: queue.RetryNum, + Fn: s.HandleUsedNotify, + Logger: s.logHelper, + } +} + +func (s *VoucherService) HandleUsedNotify(ctx context.Context, msg string) error { + + if msg == "" { + s.logHelper.Errorf("RdsMQ used notify error: msg is empty") + return nil + } + + if err := s.VoucherBiz.UsedNotify(ctx, msg); err != nil { + s.logHelper.Error(err) + } + + return nil +}