voucher
This commit is contained in:
parent
f348df3229
commit
b6a618b10c
|
|
@ -120,6 +120,12 @@ rdsMQ:
|
||||||
numWorkers: 1 #协程数量,不配置默认为10
|
numWorkers: 1 #协程数量,不配置默认为10
|
||||||
waitTime: 1s #处理完成后等待时间
|
waitTime: 1s #处理完成后等待时间
|
||||||
isOpen: false #是否启动消费 true/false
|
isOpen: false #是否启动消费 true/false
|
||||||
|
usedNotify:
|
||||||
|
name: "usedNotify"
|
||||||
|
retryNum: 1 #重试次数
|
||||||
|
numWorkers: 1 #协程数量,不配置默认为10
|
||||||
|
waitTime: 1s #处理完成后等待时间
|
||||||
|
isOpen: true #是否启动消费 true/false
|
||||||
|
|
||||||
aliYunSms:
|
aliYunSms:
|
||||||
accessKeyId:
|
accessKeyId:
|
||||||
|
|
|
||||||
|
|
@ -25,3 +25,11 @@ type RdsWechatQuery struct {
|
||||||
GoNum int `json:"go_num"` // 并发数
|
GoNum int `json:"go_num"` // 并发数
|
||||||
TimeSliceHours int64 `json:"time_slice_hours"` // 时间片"小时"
|
TimeSliceHours int64 `json:"time_slice_hours"` // 时间片"小时"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type WechatUsedQuery struct {
|
||||||
|
ProductNo string `json:"product_no"`
|
||||||
|
BatchNo string `json:"batch_no"`
|
||||||
|
OrderNo string `json:"order_no"`
|
||||||
|
StartTime string `json:"start_time"`
|
||||||
|
EndTime string `json:"end_time"`
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type OrderRepo interface {
|
type OrderRepo interface {
|
||||||
|
FinUsedInBatches(ctx context.Context, req *do.WechatUsedQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||||
SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||||
FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||||
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,11 @@ import (
|
||||||
|
|
||||||
func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, useNum *int) error {
|
func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, useNum *int) error {
|
||||||
|
|
||||||
|
if order.Status.IsExpired() {
|
||||||
|
_, err := v.cmb.Notify(ctx, order)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
status, err := v.wechatCpnRepo.Query(ctx, order)
|
status, err := v.wechatCpnRepo.Query(ctx, order)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,77 @@
|
||||||
|
package biz
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
|
"github.com/go-kratos/kratos/v2/transport/http"
|
||||||
|
"voucher/internal/biz/bo"
|
||||||
|
"voucher/internal/biz/do"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error {
|
||||||
|
|
||||||
|
queue := this.bc.RdsMQ.GetUsedNotify()
|
||||||
|
if queue == nil {
|
||||||
|
return fmt.Errorf("队列不存在")
|
||||||
|
}
|
||||||
|
|
||||||
|
msg, err := json.Marshal(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
strMsg := string(msg)
|
||||||
|
|
||||||
|
_, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("添加到队列失败:%v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error {
|
||||||
|
|
||||||
|
log.Warnf("核销重试通知处理,开始:%s", msg)
|
||||||
|
|
||||||
|
var req *do.WechatUsedQuery
|
||||||
|
|
||||||
|
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
errNum := 0
|
||||||
|
|
||||||
|
return this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||||||
|
|
||||||
|
for _, order := range rows {
|
||||||
|
|
||||||
|
event, err := order.Status.GetOrderNotifyEvent()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
notify := &bo.OrderNotifyBo{
|
||||||
|
OrderNo: order.OrderNo,
|
||||||
|
NotifyUrl: order.NotifyUrl,
|
||||||
|
Channel: order.Channel,
|
||||||
|
Event: event,
|
||||||
|
Type: order.Type,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = this.request(ctx, order, notify); err != nil {
|
||||||
|
errNum++
|
||||||
|
if errNum > 50 {
|
||||||
|
return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%v", err)
|
||||||
|
}
|
||||||
|
log.Warnf("核销重试通知处理,通知失败:%v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
@ -869,6 +869,7 @@ type RdsMQ struct {
|
||||||
WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
|
WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
|
||||||
RetryNotify *RdsMQ_Queue `protobuf:"bytes,4,opt,name=retryNotify,proto3" json:"retryNotify,omitempty"`
|
RetryNotify *RdsMQ_Queue `protobuf:"bytes,4,opt,name=retryNotify,proto3" json:"retryNotify,omitempty"`
|
||||||
OrderNotifyRetry *RdsMQ_Queue `protobuf:"bytes,5,opt,name=orderNotifyRetry,proto3" json:"orderNotifyRetry,omitempty"`
|
OrderNotifyRetry *RdsMQ_Queue `protobuf:"bytes,5,opt,name=orderNotifyRetry,proto3" json:"orderNotifyRetry,omitempty"`
|
||||||
|
UsedNotify *RdsMQ_Queue `protobuf:"bytes,6,opt,name=usedNotify,proto3" json:"usedNotify,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *RdsMQ) Reset() {
|
func (x *RdsMQ) Reset() {
|
||||||
|
|
@ -938,6 +939,13 @@ func (x *RdsMQ) GetOrderNotifyRetry() *RdsMQ_Queue {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *RdsMQ) GetUsedNotify() *RdsMQ_Queue {
|
||||||
|
if x != nil {
|
||||||
|
return x.UsedNotify
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type AliYunSms struct {
|
type AliYunSms struct {
|
||||||
state protoimpl.MessageState
|
state protoimpl.MessageState
|
||||||
sizeCache protoimpl.SizeCache
|
sizeCache protoimpl.SizeCache
|
||||||
|
|
@ -1687,7 +1695,7 @@ var file_conf_conf_proto_rawDesc = []byte{
|
||||||
0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68,
|
0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68,
|
||||||
0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43,
|
0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43,
|
||||||
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
|
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65,
|
||||||
0x3a, 0x02, 0x38, 0x01, 0x22, 0x87, 0x04, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d,
|
0x3a, 0x02, 0x38, 0x01, 0x22, 0xc4, 0x04, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d,
|
||||||
0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20,
|
0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20,
|
||||||
0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f,
|
0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f,
|
||||||
0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65,
|
0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65,
|
||||||
|
|
@ -1709,35 +1717,39 @@ var file_conf_conf_proto_rawDesc = []byte{
|
||||||
0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72,
|
0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72,
|
||||||
0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75,
|
0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75,
|
||||||
0x65, 0x75, 0x65, 0x52, 0x10, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79,
|
0x65, 0x75, 0x65, 0x52, 0x10, 0x6f, 0x72, 0x64, 0x65, 0x72, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79,
|
||||||
0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12,
|
0x52, 0x65, 0x74, 0x72, 0x79, 0x12, 0x3b, 0x0a, 0x0a, 0x75, 0x73, 0x65, 0x64, 0x4e, 0x6f, 0x74,
|
||||||
0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e,
|
0x69, 0x66, 0x79, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63,
|
||||||
0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02, 0x20,
|
0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51,
|
||||||
0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72,
|
0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0a, 0x75, 0x73, 0x65, 0x64, 0x4e, 0x6f, 0x74, 0x69,
|
||||||
0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x72,
|
0x66, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04,
|
||||||
0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f,
|
0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65,
|
||||||
0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75, 0x6d,
|
0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08,
|
||||||
0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54,
|
0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x74, 0x72,
|
||||||
0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
|
0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x72, 0x65, 0x74, 0x72,
|
||||||
0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61,
|
0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65,
|
||||||
0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xb9,
|
0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x57, 0x6f, 0x72,
|
||||||
0x01, 0x0a, 0x09, 0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x12, 0x20, 0x0a, 0x0b,
|
0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65,
|
||||||
0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
|
0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
|
||||||
0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x28,
|
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f,
|
||||||
0x0a, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65,
|
0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22, 0xb9, 0x01, 0x0a, 0x09,
|
||||||
0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b,
|
0x41, 0x6c, 0x69, 0x59, 0x75, 0x6e, 0x53, 0x6d, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x61, 0x63, 0x63,
|
||||||
0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70,
|
0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b,
|
||||||
0x6f, 0x69, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70,
|
0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x0f, 0x61,
|
||||||
0x6f, 0x69, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65,
|
0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53, 0x65, 0x63, 0x72, 0x65, 0x74, 0x18, 0x02,
|
||||||
0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65,
|
0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x4b, 0x65, 0x79, 0x53,
|
||||||
0x12, 0x28, 0x0a, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e,
|
0x65, 0x63, 0x72, 0x65, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e,
|
||||||
0x69, 0x6e, 0x67, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c,
|
0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e,
|
||||||
0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f,
|
0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x04, 0x20,
|
||||||
0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01,
|
0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x69, 0x67, 0x6e, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a,
|
||||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16,
|
0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65, 0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67,
|
||||||
0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06,
|
0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x74, 0x65, 0x6d, 0x70, 0x6c, 0x61, 0x74, 0x65,
|
||||||
0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65,
|
0x57, 0x61, 0x72, 0x6e, 0x69, 0x6e, 0x67, 0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12,
|
||||||
0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62,
|
0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28,
|
||||||
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61,
|
||||||
|
0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63,
|
||||||
|
0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63,
|
||||||
|
0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72,
|
||||||
|
0x6f, 0x74, 0x6f, 0x33,
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -1798,19 +1810,20 @@ var file_conf_conf_proto_depIdxs = []int32{
|
||||||
19, // 18: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue
|
19, // 18: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue
|
||||||
19, // 19: voucher.config.RdsMQ.retryNotify:type_name -> voucher.config.RdsMQ.Queue
|
19, // 19: voucher.config.RdsMQ.retryNotify:type_name -> voucher.config.RdsMQ.Queue
|
||||||
19, // 20: voucher.config.RdsMQ.orderNotifyRetry:type_name -> voucher.config.RdsMQ.Queue
|
19, // 20: voucher.config.RdsMQ.orderNotifyRetry:type_name -> voucher.config.RdsMQ.Queue
|
||||||
20, // 21: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration
|
19, // 21: voucher.config.RdsMQ.usedNotify:type_name -> voucher.config.RdsMQ.Queue
|
||||||
20, // 22: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration
|
20, // 22: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration
|
||||||
20, // 23: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration
|
20, // 23: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration
|
||||||
20, // 24: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration
|
20, // 24: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration
|
||||||
20, // 25: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration
|
20, // 25: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration
|
||||||
4, // 26: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap
|
20, // 26: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration
|
||||||
17, // 27: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap
|
4, // 27: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap
|
||||||
20, // 28: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration
|
17, // 28: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap
|
||||||
29, // [29:29] is the sub-list for method output_type
|
20, // 29: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration
|
||||||
29, // [29:29] is the sub-list for method input_type
|
30, // [30:30] is the sub-list for method output_type
|
||||||
29, // [29:29] is the sub-list for extension type_name
|
30, // [30:30] is the sub-list for method input_type
|
||||||
29, // [29:29] is the sub-list for extension extendee
|
30, // [30:30] is the sub-list for extension type_name
|
||||||
0, // [0:29] is the sub-list for field type_name
|
30, // [30:30] is the sub-list for extension extendee
|
||||||
|
0, // [0:30] is the sub-list for field type_name
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() { file_conf_conf_proto_init() }
|
func init() { file_conf_conf_proto_init() }
|
||||||
|
|
|
||||||
|
|
@ -135,6 +135,7 @@ message RdsMQ {
|
||||||
Queue wechatRetry = 3;
|
Queue wechatRetry = 3;
|
||||||
Queue retryNotify = 4;
|
Queue retryNotify = 4;
|
||||||
Queue orderNotifyRetry = 5;
|
Queue orderNotifyRetry = 5;
|
||||||
|
Queue usedNotify = 6;
|
||||||
}
|
}
|
||||||
|
|
||||||
message AliYunSms {
|
message AliYunSms {
|
||||||
|
|
|
||||||
|
|
@ -74,7 +74,7 @@ func (p *OrderRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.FindIn
|
||||||
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
||||||
|
|
||||||
tx := p.DB(ctx).
|
tx := p.DB(ctx).
|
||||||
Where("`status` = ?", vo.OrderStatusSuccess.GetValue()).
|
Where("`status` in (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue(), vo.OrderStatusExpired.GetValue()}).
|
||||||
Where("activity_id = ''")
|
Where("activity_id = ''")
|
||||||
|
|
||||||
if req.ProductNo != "" {
|
if req.ProductNo != "" {
|
||||||
|
|
@ -109,6 +109,42 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *OrderRepoImpl) FinUsedInBatches(ctx context.Context, req *do.WechatUsedQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
||||||
|
|
||||||
|
var results = make([]*model.Order, 0)
|
||||||
|
|
||||||
|
tx := p.DB(ctx).
|
||||||
|
Where("`status` = ?", vo.OrderStatusUse.GetValue()).
|
||||||
|
Where("activity_id = ''")
|
||||||
|
|
||||||
|
if req.StartTime != "" {
|
||||||
|
tx = tx.Where("last_use_time > ?", req.StartTime)
|
||||||
|
}
|
||||||
|
if req.EndTime != "" {
|
||||||
|
tx = tx.Where("last_use_time <= ?", req.EndTime)
|
||||||
|
}
|
||||||
|
if req.ProductNo != "" {
|
||||||
|
tx = tx.Where("product_no = ?", req.ProductNo)
|
||||||
|
}
|
||||||
|
if req.BatchNo != "" {
|
||||||
|
tx = tx.Where("batch_no = ?", req.ProductNo)
|
||||||
|
}
|
||||||
|
if req.OrderNo != "" {
|
||||||
|
tx = tx.Where("order_no = ?", req.OrderNo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 显式清除排序,移除默认的 ORDER BY
|
||||||
|
result := tx.Order("receive_success_time asc").FindInBatches(&results, 500, func(tx *gorm.DB, batch int) error {
|
||||||
|
return fun(ctx, p.ToBos(results))
|
||||||
|
})
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
return result.Error
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
||||||
|
|
||||||
var results = make([]*model.Order, 0)
|
var results = make([]*model.Order, 0)
|
||||||
|
|
@ -117,7 +153,7 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s
|
||||||
Where("batch_no = ?", batchNo).
|
Where("batch_no = ?", batchNo).
|
||||||
Where("`status` = ?", vo.OrderStatusFail.GetValue()).
|
Where("`status` = ?", vo.OrderStatusFail.GetValue()).
|
||||||
Order("receive_success_time asc"). // 显式清除排序,移除默认的 ORDER BY
|
Order("receive_success_time asc"). // 显式清除排序,移除默认的 ORDER BY
|
||||||
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
|
FindInBatches(&results, 200, func(tx *gorm.DB, batch int) error {
|
||||||
return fun(ctx, p.ToBos(results))
|
return fun(ctx, p.ToBos(results))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -55,6 +55,8 @@ func NewHTTPServer(
|
||||||
srv.Route("/voucher/").POST("warningBudget/{id}", cmb.WarningBudget)
|
srv.Route("/voucher/").POST("warningBudget/{id}", cmb.WarningBudget)
|
||||||
// 指定重复通知对应单子数据
|
// 指定重复通知对应单子数据
|
||||||
srv.Route("/voucher/").POST("specifyNotification", cmb.SpecifyNotification)
|
srv.Route("/voucher/").POST("specifyNotification", cmb.SpecifyNotification)
|
||||||
|
// 订单使用通知下游
|
||||||
|
srv.Route("/voucher/").POST("UsedNotifyPush", cmb.UsedNotifyPush)
|
||||||
|
|
||||||
v1.RegisterCmbHTTPServer(srv, cmb)
|
v1.RegisterCmbHTTPServer(srv, cmb)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -46,6 +46,10 @@ func NewRdbConsumer(
|
||||||
manager.Add(cf4)
|
manager.Add(cf4)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cf5 := voucherService.GetUsedNotifyConfig(); cf5 != nil {
|
||||||
|
manager.Add(cf5)
|
||||||
|
}
|
||||||
|
|
||||||
return &RdbConsumer{hLog: hLog, conf: conf, manager: manager}
|
return &RdbConsumer{hLog: hLog, conf: conf, manager: manager}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -222,3 +222,31 @@ func (this *CmbService) SpecifyNotification(ctx http.Context) error {
|
||||||
|
|
||||||
return ctx.String(http2.StatusOK, string(bodyBytes))
|
return ctx.String(http2.StatusOK, string(bodyBytes))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *CmbService) UsedNotifyPush(ctx http.Context) error {
|
||||||
|
|
||||||
|
bodyBytes, err := io.ReadAll(ctx.Request().Body)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var req *do.WechatUsedQuery
|
||||||
|
if err = json.Unmarshal(bodyBytes, &req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if req == nil {
|
||||||
|
return fmt.Errorf("req is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.StartTime == "" && req.EndTime == "" {
|
||||||
|
return fmt.Errorf("start time or end time is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = this.VoucherBiz.UsedNotifyPush(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.String(http2.StatusOK, string(bodyBytes))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,45 @@
|
||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
|
"voucher/internal/pkg/rdsmq"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (s *VoucherService) GetUsedNotifyConfig() *rdsmq.ConsumeConfig {
|
||||||
|
|
||||||
|
queue := s.bc.RdsMQ.GetUsedNotify()
|
||||||
|
if queue == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !queue.GetIsOpen() {
|
||||||
|
log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &rdsmq.ConsumeConfig{
|
||||||
|
Rdb: s.rdb.Rdb,
|
||||||
|
QueueName: queue.Name,
|
||||||
|
NumWorkers: queue.NumWorkers,
|
||||||
|
WaitTime: queue.GetWaitTime().AsDuration(),
|
||||||
|
RetryNum: queue.RetryNum,
|
||||||
|
Fn: s.HandleUsedNotify,
|
||||||
|
Logger: s.logHelper,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *VoucherService) HandleUsedNotify(ctx context.Context, msg string) error {
|
||||||
|
|
||||||
|
if msg == "" {
|
||||||
|
s.logHelper.Errorf("RdsMQ used notify error: msg is empty")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.VoucherBiz.UsedNotify(ctx, msg); err != nil {
|
||||||
|
s.logHelper.Error(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue