This commit is contained in:
ziming 2025-08-01 11:37:42 +08:00
parent 913de4afe0
commit 3c5de0b144
11 changed files with 261 additions and 46 deletions

View File

@ -101,13 +101,19 @@ rdsMQ:
retryNum: 1 #重试次数 retryNum: 1 #重试次数
numWorkers: 3 #协程数量不配置默认为10 numWorkers: 3 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间 waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false isOpen: false #是否启动消费 true/false
orderRetry: orderRetry:
name: "orderRetry" name: "orderRetry"
retryNum: 1 #重试次数 retryNum: 1 #重试次数
numWorkers: 2 #协程数量不配置默认为10 numWorkers: 2 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间 waitTime: 1s #处理完成后等待时间
isOpen: false #是否启动消费 true/false isOpen: false #是否启动消费 true/false
retryNotify:
name: "retryNotify"
retryNum: 1 #重试次数
numWorkers: 1 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: false #是否启动消费 true/false
aliYunSms: aliYunSms:
accessKeyId: accessKeyId:

View File

@ -8,6 +8,7 @@ import (
) )
type OrderRepo interface { type OrderRepo interface {
SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error

View File

@ -6,5 +6,5 @@ import (
) )
type OrderBakRepo interface { type OrderBakRepo interface {
FindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
} }

View File

@ -0,0 +1,122 @@
package biz
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"time"
"voucher/internal/biz/bo"
)
func (this *VoucherBiz) PushRetryNotify(ctx http.Context, req *bo.FindInBatchesBo) error {
queue := this.bc.RdsMQ.GetRetryNotify()
if queue == nil {
return fmt.Errorf("队列不存在")
}
msg, err := json.Marshal(req)
if err != nil {
return err
}
strMsg := string(msg)
_, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
return fmt.Errorf("添加到队列失败:%v", err)
}
return nil
}
func (this *VoucherBiz) RetryNotify(ctx context.Context, msg string) error {
var req *bo.FindInBatchesBo
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err
}
if err := this.RetryNotifyOrder(ctx, req); err != nil {
return err
}
return this.RetryNotifyOrderBack(ctx, req)
}
func (this *VoucherBiz) RetryNotifyOrder(ctx context.Context, req *bo.FindInBatchesBo) error {
start := time.Now()
startStr := time.Now().String()
log.Warnf("重试通知处理:%s", startStr)
fmt.Printf("重试通知处理:%s", startStr)
n := 0
eNum := 0
notifyNum := 0
err := this.OrderRepo.SpecifyFindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
n += 1
for _, order := range rows {
eNum += 1
if _, err := this.Cmb.Notify(ctx, order); err != nil {
log.Errorf("重试通知处理,发生错误,orderNo:%s,outBizNo:%s,voucherNo:%s,err:%v", order.OrderNo, order.OutBizNo, order.VoucherNo, err)
} else {
notifyNum += 1
}
}
log.Warnf("重试通知处理,第:%d组,已执行条数:%d,通知成功条数:%d,", n, eNum, notifyNum)
return nil
})
endTime := time.Now()
log.Warnf("重试通知处理,耗时:%s,结束时间:%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
fmt.Printf("重试通知处理,耗时:%s,结束时间%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
return err
}
func (this *VoucherBiz) RetryNotifyOrderBack(ctx context.Context, req *bo.FindInBatchesBo) error {
start := time.Now()
startStr := time.Now().String()
log.Warnf("重试通知处理Bak:%s", startStr)
fmt.Printf("重试通知处理Bak:%s", startStr)
n := 0
eNum := 0
notifyNum := 0
err := this.OrderBakRepo.SpecifyFindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
n += 1
for _, order := range rows {
eNum += 1
if _, err := this.Cmb.Notify(ctx, order); err != nil {
log.Errorf("重试通知处理Bak,发生错误,orderNo:%s,outBizNo:%s,voucherNo:%s,err:%v", order.OrderNo, order.OutBizNo, order.VoucherNo, err)
} else {
notifyNum += 1
}
}
log.Warnf("重试通知处理Bak,第:%d组,已执行条数:%d,通知成功条数:%d,", n, eNum, notifyNum)
return nil
})
endTime := time.Now()
log.Warnf("重试通知处理Bak,耗时:%s,结束时间:%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
fmt.Printf("重试通知处理Bak,耗时:%s,结束时间%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
return err
}

View File

@ -16,6 +16,7 @@ type VoucherBiz struct {
Cmb *cmb.Cmb Cmb *cmb.Cmb
ProductRepo repo.ProductRepo ProductRepo repo.ProductRepo
OrderRepo repo.OrderRepo OrderRepo repo.OrderRepo
OrderBakRepo repo.OrderBakRepo
OrderNotifyRepo repo.OrderNotifyRepo OrderNotifyRepo repo.OrderNotifyRepo
WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo
MqSendMixRepo mixrepos.MQSendMixRepo MqSendMixRepo mixrepos.MQSendMixRepo
@ -35,6 +36,7 @@ func NewVoucherBiz(
Cmb *cmb.Cmb, Cmb *cmb.Cmb,
ProductRepo repo.ProductRepo, ProductRepo repo.ProductRepo,
OrderRepo repo.OrderRepo, OrderRepo repo.OrderRepo,
OrderBakRepo repo.OrderBakRepo,
OrderNotifyRepo repo.OrderNotifyRepo, OrderNotifyRepo repo.OrderNotifyRepo,
WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo, WechatNotifyRegisterTagRepo repo.WechatNotifyRegisterTagRepo,
MqSendMixRepo mixrepos.MQSendMixRepo, MqSendMixRepo mixrepos.MQSendMixRepo,
@ -50,6 +52,7 @@ func NewVoucherBiz(
Cmb: Cmb, Cmb: Cmb,
ProductRepo: ProductRepo, ProductRepo: ProductRepo,
OrderRepo: OrderRepo, OrderRepo: OrderRepo,
OrderBakRepo: OrderBakRepo,
OrderNotifyRepo: OrderNotifyRepo, OrderNotifyRepo: OrderNotifyRepo,
WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo, WechatNotifyRegisterTagRepo: WechatNotifyRegisterTagRepo,
MqSendMixRepo: MqSendMixRepo, MqSendMixRepo: MqSendMixRepo,

View File

@ -859,6 +859,7 @@ type RdsMQ struct {
WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"` WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"`
WechatTimeSliceQuery *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatTimeSliceQuery,proto3" json:"wechatTimeSliceQuery,omitempty"` WechatTimeSliceQuery *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatTimeSliceQuery,proto3" json:"wechatTimeSliceQuery,omitempty"`
WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"` WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
RetryNotify *RdsMQ_Queue `protobuf:"bytes,4,opt,name=retryNotify,proto3" json:"retryNotify,omitempty"`
} }
func (x *RdsMQ) Reset() { func (x *RdsMQ) Reset() {
@ -914,6 +915,13 @@ func (x *RdsMQ) GetWechatRetry() *RdsMQ_Queue {
return nil return nil
} }
func (x *RdsMQ) GetRetryNotify() *RdsMQ_Queue {
if x != nil {
return x.RetryNotify
}
return nil
}
type AliYunSms struct { type AliYunSms struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -1662,7 +1670,7 @@ var file_conf_conf_proto_rawDesc = []byte{
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x22, 0xff, 0x02, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x22, 0xbe, 0x03, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65,
0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65,
@ -1675,35 +1683,39 @@ var file_conf_conf_proto_rawDesc = []byte{
0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65,
0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x63, 0x68, 0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x72, 0x65, 0x74,
0x65, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x72, 0x79, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b,
0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e,
0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x72, 0x65, 0x74,
0x1a, 0x0a, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x72, 0x79, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65,
0x0d, 0x52, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e,
0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a,
0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x0a, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d,
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x52, 0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75,
0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a,
0x6d, 0x65, 0x22, 0xb9, 0x01, 0x0a, 0x09, 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61,
0x12, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44,
0x49, 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d,
0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x22, 0xb9, 0x01, 0x0a, 0x09, 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x12,
0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01,
0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49,
0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65,
0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65,
0x4e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65,
0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65,
0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e,
0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e,
0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57,
0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x65,
0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a,
0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73,
0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73,
0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75,
0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f,
0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -1762,19 +1774,20 @@ var file_conf_conf_proto_depIdxs = []int32{
19, // 16: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue 19, // 16: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue
19, // 17: voucher.config.RdsMQ.wechatTimeSliceQuery:type_name -> voucher.config.RdsMQ.Queue 19, // 17: voucher.config.RdsMQ.wechatTimeSliceQuery:type_name -> voucher.config.RdsMQ.Queue
19, // 18: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue 19, // 18: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue
20, // 19: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration 19, // 19: voucher.config.RdsMQ.retryNotify:type_name -> voucher.config.RdsMQ.Queue
20, // 20: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration 20, // 20: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration
20, // 21: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration 20, // 21: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration
20, // 22: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration 20, // 22: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration
20, // 23: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration 20, // 23: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration
4, // 24: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap 20, // 24: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration
17, // 25: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap 4, // 25: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap
20, // 26: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration 17, // 26: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap
27, // [27:27] is the sub-list for method output_type 20, // 27: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration
27, // [27:27] is the sub-list for method input_type 28, // [28:28] is the sub-list for method output_type
27, // [27:27] is the sub-list for extension type_name 28, // [28:28] is the sub-list for method input_type
27, // [27:27] is the sub-list for extension extendee 28, // [28:28] is the sub-list for extension type_name
0, // [0:27] is the sub-list for field type_name 28, // [28:28] is the sub-list for extension extendee
0, // [0:28] is the sub-list for field type_name
} }
func init() { file_conf_conf_proto_init() } func init() { file_conf_conf_proto_init() }

View File

@ -132,6 +132,7 @@ message RdsMQ {
Queue wechatQuery = 1; Queue wechatQuery = 1;
Queue wechatTimeSliceQuery = 2; Queue wechatTimeSliceQuery = 2;
Queue wechatRetry = 3; Queue wechatRetry = 3;
Queue retryNotify = 4;
} }
message AliYunSms { message AliYunSms {

View File

@ -31,6 +31,44 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB {
return p.db.DB(ctx).Model(model.Order{}) return p.db.DB(ctx).Model(model.Order{})
} }
func (p *OrderRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue())
if req.ProductNo != "" {
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.StartTime != nil && req.EndTime != nil {
tx = tx.Where("receive_success_time BETWEEN ? AND ?", req.StartTime, req.EndTime)
}
if req.OrderNo != nil {
tx = tx.Where("order_no IN (?)", req.OrderNo)
}
if req.OutBizNo != nil {
tx = tx.Where("out_biz_no IN (?)", req.OutBizNo)
}
if req.VoucherNo != nil {
tx = tx.Where("voucher_no IN (?)", req.VoucherNo)
}
var results = make([]*model.Order, 0)
result := tx.FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results))
})
if result.Error != nil {
return result.Error
}
return nil
}
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue())

View File

@ -25,7 +25,7 @@ func (p *OrderBakRepoImpl) DB(ctx context.Context) *gorm.DB {
return p.db.DB(ctx).Model(model.OrderBak{}) return p.db.DB(ctx).Model(model.OrderBak{})
} }
func (p *OrderBakRepoImpl) FindInBatches(ctx context.Context, req *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { func (p *OrderBakRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue())

View File

@ -44,6 +44,8 @@ func NewHTTPServer(
srv.Route("/voucher/").POST("timeSliceQueryPush", cmb.TimeSliceQueryPush) srv.Route("/voucher/").POST("timeSliceQueryPush", cmb.TimeSliceQueryPush)
srv.Route("/voucher/").POST("pushWechatRetry/{batch_no}", cmb.PushWechatRetry) srv.Route("/voucher/").POST("pushWechatRetry/{batch_no}", cmb.PushWechatRetry)
srv.Route("/voucher/").POST("warningBudget/{id}", cmb.WarningBudget) srv.Route("/voucher/").POST("warningBudget/{id}", cmb.WarningBudget)
// 指定重复通知对应单子数据
srv.Route("/voucher/").POST("specifyNotification", cmb.SpecifyNotification)
v1.RegisterCmbHTTPServer(srv, cmb) v1.RegisterCmbHTTPServer(srv, cmb)

View File

@ -7,6 +7,7 @@ import (
"io" "io"
http2 "net/http" http2 "net/http"
"strconv" "strconv"
"voucher/internal/biz/bo"
"voucher/internal/biz/do" "voucher/internal/biz/do"
) )
@ -154,3 +155,31 @@ func (this *CmbService) WarningBudget(ctx http.Context) error {
"data": id, "data": id,
}) })
} }
func (this *CmbService) SpecifyNotification(ctx http.Context) error {
bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil {
return err
}
var req *bo.FindInBatchesBo
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
if req == nil {
return fmt.Errorf("req is empty")
}
if req.OutBizNo == nil && req.OrderNo == nil && req.VoucherNo == nil {
return fmt.Errorf("out_biz_no or order_no or voucher_no is empty")
}
err = this.VoucherBiz.PushRetryNotify(ctx, req)
if err != nil {
return err
}
return ctx.String(http2.StatusOK, string(bodyBytes))
}