timeSliceQueryPush
This commit is contained in:
parent
153dd55b4d
commit
3aa1ad9520
55
READEME.md
55
READEME.md
|
|
@ -1,59 +1,8 @@
|
|||
# <p align="center">营销系统后台API</p>
|
||||
# <p align="center">招行立减金券系统</p>
|
||||
|
||||
### 参与开发
|
||||
[请参阅](https://tvd8jq9lqkp.feishu.cn/wiki/LNWVweZ64iY2UBkkTkZcezy0n5h?from=from_copylink)
|
||||
* * *
|
||||
### 主要工作
|
||||
+ 后台接口API
|
||||
+ 发券API
|
||||
* * *
|
||||
### 规则说明
|
||||
+ 路由前缀都为 __/admin__ 开始,路由规则全小写+下划线,例如:/admin/v1/demo_1
|
||||
* * *
|
||||
### 构建部署
|
||||
+ 采用多阶段构建,以获得最小体积的容器镜像
|
||||
````bash
|
||||
cd /项目根目录 && make deploy folder=./configs_dev marketing=marketing_backend container_name=marketing_backend http_port=8090
|
||||
````
|
||||
* * *
|
||||
### docker环境下开发
|
||||
+ 一、[下载Docker Desktop安装程序](https://www.docker.com/products/docker-desktop)
|
||||
+ 二、在项目根目录下执行命令
|
||||
```shell
|
||||
docker build -f Dockerfile_win -t 镜像名称 .
|
||||
docker run --privileged -itd --name 容器名称 --restart=always -v ./:/src 镜像名称
|
||||
docker ps
|
||||
docker exec -it 容器名称 sh
|
||||
make init
|
||||
make all
|
||||
```
|
||||
|
||||
### windows非docker开发
|
||||
1 安装插件(配置goproxy,GOPROXY=https://goproxy.cn,direct)
|
||||
```shell
|
||||
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
|
||||
go install github.com/go-kratos/kratos/cmd/kratos/v2@latest
|
||||
go install github.com/go-kratos/kratos/cmd/protoc-gen-go-http/v2@latest
|
||||
go install github.com/google/gnostic/cmd/protoc-gen-openapi@latest
|
||||
go install github.com/google/wire/cmd/wire@latest
|
||||
go install github.com/go-kratos/kratos/cmd/protoc-gen-go-errors/v2@latest
|
||||
go install gorm.io/gen/tools/gentool@latest
|
||||
```
|
||||
2生成相应rpo
|
||||
|
||||
命令:kratos proto client api/helloworld/v1/demo.proto
|
||||
|
||||
位置:api和internal下面的conf
|
||||
|
||||
3 wire生成依赖
|
||||
cd cmd/server
|
||||
wire
|
||||
|
||||
4 配置编译
|
||||
|
||||

|
||||
|
||||
5 生成service
|
||||
kratos proto server api/helloworld/v1/demo.proto -t internal/service
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/robfig/cron"
|
||||
"voucher/internal/biz"
|
||||
"voucher/internal/biz/cmb"
|
||||
"voucher/internal/biz/timeslicequery"
|
||||
"voucher/internal/conf"
|
||||
"voucher/internal/data"
|
||||
"voucher/internal/data/mixrepoimpl"
|
||||
|
|
@ -33,6 +34,7 @@ func wireApp(*conf.Bootstrap, log.Logger, *log2.AccessLogger) (*kratos.App, func
|
|||
repoimpl.ProviderRepoImplSet,
|
||||
wechatrepoimpl.ProviderWechatReposImplSet,
|
||||
mixrepoimpl.ProviderMixRepoImplSet,
|
||||
timeslicequery.ProviderSetTimeSliceQuery,
|
||||
log2.NewLogHelper,
|
||||
cron.New,
|
||||
newApp,
|
||||
|
|
|
|||
|
|
@ -85,18 +85,24 @@ cron:
|
|||
command: "0 0 1 * * ?" # 每天凌晨1点执行一次
|
||||
|
||||
rdsMQ:
|
||||
wechatQuery: #发放结算
|
||||
wechatQuery:
|
||||
name: "wechatQuery"
|
||||
retryNum: 1 #重试次数
|
||||
numWorkers: 2 #协程数量,不配置默认为10
|
||||
waitTime: 1s #处理完成后等待时间
|
||||
isOpen: false #是否启动消费 true/false
|
||||
wechatTimeSliceQuery:
|
||||
name: "wechatTimeSliceQuery"
|
||||
retryNum: 1 #重试次数
|
||||
numWorkers: 3 #协程数量,不配置默认为10
|
||||
waitTime: 1s #处理完成后等待时间
|
||||
isOpen: true #是否启动消费 true/false
|
||||
orderRetry: #发放结算
|
||||
orderRetry:
|
||||
name: "orderRetry"
|
||||
retryNum: 1 #重试次数
|
||||
numWorkers: 2 #协程数量,不配置默认为10
|
||||
waitTime: 1s #处理完成后等待时间
|
||||
isOpen: true #是否启动消费 true/false
|
||||
isOpen: false #是否启动消费 true/false
|
||||
|
||||
#配置日志
|
||||
logs:
|
||||
|
|
|
|||
|
|
@ -91,7 +91,12 @@ rdsMQ:
|
|||
numWorkers: 1 #协程数量,不配置默认为10
|
||||
waitTime: 1s #处理完成后等待时间
|
||||
isOpen: true #是否启动消费 true/false
|
||||
|
||||
wechatTimeSliceQuery:
|
||||
name: "wechatTimeSliceQuery"
|
||||
retryNum: 1 #重试次数
|
||||
numWorkers: 3 #协程数量,不配置默认为10
|
||||
waitTime: 1s #处理完成后等待时间
|
||||
isOpen: true #是否启动消费 true/false
|
||||
#配置日志
|
||||
logs:
|
||||
business: business.log #业务日志路径:如果不写日志,则不配置或配置为空
|
||||
|
|
|
|||
|
|
@ -0,0 +1,14 @@
|
|||
# <p align="center">券状态同步</p>
|
||||
* * *
|
||||
### 主要工作
|
||||
+ 券状态查询同步
|
||||
* * *
|
||||
### 规则说明
|
||||
+ 按照时间分片处理,按照2小时为一个时间片启用一个协程消费处理
|
||||
+ 协程最大可同时运行指定数量,设置为2
|
||||
* * *
|
||||
### 使用方式
|
||||
+ 消费处理,按照时间范围上报消费
|
||||
+ 每次请求按照时间片分别启用2个协程消费处理,也就是每个请求可以同时启用2个协程消费处理
|
||||
+ 请不要无休止的访问,请按照时间片进行访问,并且不要重复的时间片访问,增加系统负载,特殊发券日期量较大,建议缩短时间范围上报,多切分上报处理
|
||||
* * *
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
package timeslicequery
|
||||
|
||||
import (
|
||||
"github.com/nacos-group/nacos-sdk-go/util"
|
||||
"sync"
|
||||
"voucher/internal/biz/cmb"
|
||||
"voucher/internal/biz/mixrepos"
|
||||
"voucher/internal/biz/repo"
|
||||
"voucher/internal/biz/wechatrepo"
|
||||
"voucher/internal/conf"
|
||||
"voucher/internal/data"
|
||||
)
|
||||
|
||||
type Query struct {
|
||||
mu sync.RWMutex
|
||||
queryMap map[string]bool
|
||||
|
||||
bc *conf.Bootstrap
|
||||
rdb *data.Rdb
|
||||
cmb *cmb.Cmb
|
||||
|
||||
productRepo repo.ProductRepo
|
||||
orderRepo repo.OrderRepo
|
||||
|
||||
wechatCpnRepo wechatrepo.WechatCpnRepo
|
||||
mqSendMixRepo mixrepos.MQSendMixRepo
|
||||
}
|
||||
|
||||
func NewQuery(
|
||||
bc *conf.Bootstrap,
|
||||
rdb *data.Rdb,
|
||||
cmb *cmb.Cmb,
|
||||
productRepo repo.ProductRepo,
|
||||
orderRepo repo.OrderRepo,
|
||||
wechatCpnRepo wechatrepo.WechatCpnRepo,
|
||||
mqSendMixRepo mixrepos.MQSendMixRepo) *Query {
|
||||
return &Query{
|
||||
queryMap: make(map[string]bool),
|
||||
bc: bc,
|
||||
rdb: rdb,
|
||||
cmb: cmb,
|
||||
productRepo: productRepo,
|
||||
orderRepo: orderRepo,
|
||||
wechatCpnRepo: wechatCpnRepo,
|
||||
mqSendMixRepo: mqSendMixRepo}
|
||||
}
|
||||
|
||||
func (v *Query) uid(no string) string {
|
||||
return util.Md5("query" + no)
|
||||
}
|
||||
|
||||
func (this *Query) GetAll() map[string]bool {
|
||||
return this.queryMap
|
||||
}
|
||||
|
||||
func (this *Query) Get(uid string) bool {
|
||||
|
||||
this.mu.Lock()
|
||||
defer this.mu.Unlock()
|
||||
|
||||
if _, ok := this.queryMap[uid]; ok {
|
||||
return ok
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (this *Query) Add(uid string) {
|
||||
|
||||
this.mu.Lock()
|
||||
defer this.mu.Unlock()
|
||||
|
||||
this.queryMap[uid] = true
|
||||
}
|
||||
|
||||
func (this *Query) Remove(uid string) {
|
||||
|
||||
this.mu.Lock()
|
||||
defer this.mu.Unlock()
|
||||
|
||||
if _, ok := this.queryMap[uid]; ok {
|
||||
delete(this.queryMap, uid)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,86 @@
|
|||
package timeslicequery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"time"
|
||||
"voucher/internal/biz/bo"
|
||||
"voucher/internal/biz/do"
|
||||
"voucher/internal/pkg/timeslice"
|
||||
)
|
||||
|
||||
func (v *Query) execute(ctx context.Context, req *timeslice.Manager) error {
|
||||
|
||||
managerStartStr := req.StartTime.Format(time.DateTime)
|
||||
managerEndStr := req.EndTime.Format(time.DateTime)
|
||||
|
||||
taskCount, err := timeslice.NewManager(v.callbackFunc).Run(ctx, req)
|
||||
if err != nil {
|
||||
log.Errorf("%s到%s,发生错误:%v", managerStartStr, managerEndStr, err)
|
||||
}
|
||||
|
||||
fmt.Printf("%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount)
|
||||
log.Warnf("%s到%s,总任务数:%d", managerStartStr, managerEndStr, taskCount)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error {
|
||||
|
||||
startTimeStr := req.Process.Manager.StartTime.Format(time.DateTime)
|
||||
endTimeStr := req.Process.Manager.EndTime.Format(time.DateTime)
|
||||
|
||||
currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime)
|
||||
currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime)
|
||||
|
||||
start := time.Now()
|
||||
startStr := start.Format(time.DateTime)
|
||||
|
||||
x := &do.WechatQuery{
|
||||
StartTime: currentStartTimeStr,
|
||||
EndTime: currentEndTimeStr,
|
||||
ProductNo: req.Process.Manager.ProductNo,
|
||||
}
|
||||
|
||||
n := 0
|
||||
num := 0
|
||||
notifyNum := 0
|
||||
|
||||
err := v.orderRepo.FinSucByStockIdInBatches(ctx, x, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||||
|
||||
n += 1
|
||||
for _, order := range rows {
|
||||
|
||||
num += 1
|
||||
if err := v.wechatQuery(ctx, order, ¬ifyNum); err != nil {
|
||||
logFields := map[string]string{
|
||||
"order_no": order.OrderNo,
|
||||
"coupon_id": order.VoucherNo,
|
||||
"open_id": order.Account,
|
||||
"err": err.Error(),
|
||||
}
|
||||
log.Errorf("%s到%s,第%d个任务,第%d组,发生错误:+v", startTimeStr, endTimeStr, req.TaskID, n, logFields)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
end := time.Now()
|
||||
|
||||
logFields := map[string]interface{}{
|
||||
"总处理组数": n,
|
||||
"总处理条数": num,
|
||||
"总通知条数": notifyNum,
|
||||
"执行任务开始时间": startStr,
|
||||
"执行任务结束时间": end.Format(time.DateTime),
|
||||
"任务处理开始时间": currentStartTimeStr,
|
||||
"任务处理结束时间": currentEndTimeStr,
|
||||
"总处理耗时": end.Sub(start).String(),
|
||||
}
|
||||
log.Warnf("%s到%s,第%d个任务,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
@ -0,0 +1,118 @@
|
|||
package timeslicequery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"github.com/go-kratos/kratos/v2/transport/http"
|
||||
"time"
|
||||
"voucher/internal/biz/do"
|
||||
"voucher/internal/pkg/timeslice"
|
||||
)
|
||||
|
||||
func (v *Query) Push(ctx http.Context, req *do.WechatQuery) (string, error) {
|
||||
|
||||
if req.StartTime == "" || req.EndTime == "" {
|
||||
return "", fmt.Errorf("时间参数不能为空")
|
||||
}
|
||||
|
||||
queue := v.bc.RdsMQ.GetWechatTimeSliceQuery()
|
||||
if queue == nil {
|
||||
return "", fmt.Errorf("队列不存在")
|
||||
}
|
||||
|
||||
if queue.Name == "" {
|
||||
return "", fmt.Errorf("队列不存在")
|
||||
}
|
||||
|
||||
if queue.IsOpen == false {
|
||||
return "", fmt.Errorf("队列未开启")
|
||||
}
|
||||
|
||||
if req.ProductNo != "" {
|
||||
_, err := v.productRepo.GetByProductNo(ctx, req.ProductNo)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
b, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
strMsg := string(b)
|
||||
|
||||
uid := v.uid(strMsg)
|
||||
if v.Get(uid) {
|
||||
return "", fmt.Errorf("此台服务队列正在处理中,%s-%s,ip:%s", uid, strMsg, ctx.Header().Get("X-Forwarded-For"))
|
||||
}
|
||||
|
||||
v.Add(uid)
|
||||
|
||||
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
|
||||
if err != nil {
|
||||
v.Remove(uid)
|
||||
return "", fmt.Errorf("添加到队列失败:%v", err)
|
||||
}
|
||||
|
||||
return strMsg, nil
|
||||
}
|
||||
|
||||
func (v *Query) getManager(_ context.Context, msg string) (*timeslice.Manager, error) {
|
||||
|
||||
var req *do.WechatQuery
|
||||
|
||||
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if req.StartTime == "" || req.EndTime == "" {
|
||||
return nil, fmt.Errorf("时间参数不能为空")
|
||||
}
|
||||
|
||||
start, err := time.Parse(time.DateTime, req.StartTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
end, err := time.Parse(time.DateTime, req.EndTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ×lice.Manager{
|
||||
StartTime: start,
|
||||
EndTime: end,
|
||||
ProductNo: req.ProductNo,
|
||||
GoNum: 2, // 协程数量
|
||||
TimeSliceHours: 2, // 时间间隔
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *Query) Consumer(ctx context.Context, msg string) error {
|
||||
|
||||
defer v.Remove(v.uid(msg))
|
||||
|
||||
req, err := v.getManager(ctx, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
executeStart := time.Now()
|
||||
executeStartStr := executeStart.Format(time.DateTime)
|
||||
|
||||
log.Warnf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg)
|
||||
fmt.Printf("微信券查询处理开始:%s,msg:%s", executeStartStr, msg)
|
||||
|
||||
if err = v.execute(ctx, req); err != nil {
|
||||
log.Errorf("微信券查询处理失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
|
||||
return fmt.Errorf("微信券查询处理失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
|
||||
}
|
||||
|
||||
executeEnd := time.Now()
|
||||
log.Warnf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg)
|
||||
fmt.Printf("微信券查询处理耗时:%s,结束时间%s,msg:%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), msg)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -0,0 +1,7 @@
|
|||
package timeslicequery
|
||||
|
||||
import (
|
||||
"github.com/google/wire"
|
||||
)
|
||||
|
||||
var ProviderSetTimeSliceQuery = wire.NewSet(NewQuery)
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
package timeslicequery
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"voucher/internal/biz/bo"
|
||||
)
|
||||
|
||||
func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
|
||||
|
||||
status, err := v.wechatCpnRepo.Query(ctx, order)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if status.IsUse() {
|
||||
return v.queryUsed(ctx, order, notifyNum)
|
||||
} else if status.IsExpired() {
|
||||
return v.queryExpired(ctx, order)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *Query) queryUsed(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
|
||||
|
||||
*notifyNum += 1
|
||||
|
||||
if order.Status.IsUse() {
|
||||
return v.notify(ctx, order)
|
||||
}
|
||||
|
||||
if err := v.orderRepo.Used(ctx, order.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return v.notify(ctx, order)
|
||||
}
|
||||
|
||||
func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error {
|
||||
|
||||
if order.Status.IsExpired() {
|
||||
log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := v.orderRepo.Expired(ctx, order.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil // 过期不做通知
|
||||
}
|
||||
|
||||
func (v *Query) notify(ctx context.Context, order *bo.OrderBo) error {
|
||||
|
||||
order, err := v.orderRepo.GetByID(ctx, order.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = v.cmb.Notify(ctx, order); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -840,8 +840,9 @@ type RdsMQ struct {
|
|||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"`
|
||||
WechatRetry *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
|
||||
WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"`
|
||||
WechatTimeSliceQuery *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatTimeSliceQuery,proto3" json:"wechatTimeSliceQuery,omitempty"`
|
||||
WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
|
||||
}
|
||||
|
||||
func (x *RdsMQ) Reset() {
|
||||
|
|
@ -883,6 +884,13 @@ func (x *RdsMQ) GetWechatQuery() *RdsMQ_Queue {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *RdsMQ) GetWechatTimeSliceQuery() *RdsMQ_Queue {
|
||||
if x != nil {
|
||||
return x.WechatTimeSliceQuery
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *RdsMQ) GetWechatRetry() *RdsMQ_Queue {
|
||||
if x != nil {
|
||||
return x.WechatRetry
|
||||
|
|
@ -1552,32 +1560,37 @@ var file_conf_conf_proto_rawDesc = []byte{
|
|||
0x6b, 0x65, 0x79, 0x12, 0x35, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01,
|
||||
0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e,
|
||||
0x66, 0x69, 0x67, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
|
||||
0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xae,
|
||||
0x4d, 0x61, 0x70, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xff,
|
||||
0x02, 0x0a, 0x05, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68,
|
||||
0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e,
|
||||
0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52,
|
||||
0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68,
|
||||
0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61,
|
||||
0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76,
|
||||
0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64,
|
||||
0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68, 0x61,
|
||||
0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75, 0x65,
|
||||
0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
|
||||
0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18, 0x02,
|
||||
0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08,
|
||||
0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08,
|
||||
0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d, 0x57,
|
||||
0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e, 0x75,
|
||||
0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69, 0x74,
|
||||
0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f, 0x6f,
|
||||
0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75, 0x72,
|
||||
0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x22,
|
||||
0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e,
|
||||
0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69, 0x6e,
|
||||
0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15, 0x76,
|
||||
0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x3b,
|
||||
0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x61, 0x74, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x4f, 0x0a, 0x14, 0x77, 0x65, 0x63, 0x68, 0x61,
|
||||
0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x6c, 0x69, 0x63, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e,
|
||||
0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52, 0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65,
|
||||
0x75, 0x65, 0x52, 0x14, 0x77, 0x65, 0x63, 0x68, 0x61, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x53, 0x6c,
|
||||
0x69, 0x63, 0x65, 0x51, 0x75, 0x65, 0x72, 0x79, 0x12, 0x3d, 0x0a, 0x0b, 0x77, 0x65, 0x63, 0x68,
|
||||
0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e,
|
||||
0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x52,
|
||||
0x64, 0x73, 0x4d, 0x51, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x52, 0x0b, 0x77, 0x65, 0x63, 0x68,
|
||||
0x61, 0x74, 0x52, 0x65, 0x74, 0x72, 0x79, 0x1a, 0xa6, 0x01, 0x0a, 0x05, 0x51, 0x75, 0x65, 0x75,
|
||||
0x65, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4f, 0x70, 0x65, 0x6e, 0x12, 0x1a, 0x0a,
|
||||
0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52,
|
||||
0x08, 0x72, 0x65, 0x74, 0x72, 0x79, 0x4e, 0x75, 0x6d, 0x12, 0x1e, 0x0a, 0x0a, 0x6e, 0x75, 0x6d,
|
||||
0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x6e,
|
||||
0x75, 0x6d, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x12, 0x35, 0x0a, 0x08, 0x77, 0x61, 0x69,
|
||||
0x74, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x67, 0x6f,
|
||||
0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x44, 0x75,
|
||||
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x77, 0x61, 0x69, 0x74, 0x54, 0x69, 0x6d, 0x65,
|
||||
0x22, 0x3a, 0x0a, 0x04, 0x4c, 0x6f, 0x67, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x62, 0x75, 0x73, 0x69,
|
||||
0x6e, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x62, 0x75, 0x73, 0x69,
|
||||
0x6e, 0x65, 0x73, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x02,
|
||||
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x42, 0x17, 0x5a, 0x15,
|
||||
0x76, 0x6f, 0x75, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x63, 0x70, 0x6e, 0x2f, 0x63, 0x6f, 0x6e, 0x66,
|
||||
0x3b, 0x63, 0x6f, 0x6e, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
@ -1632,20 +1645,21 @@ var file_conf_conf_proto_depIdxs = []int32{
|
|||
15, // 13: voucher.config.RocketMQ.eventMap:type_name -> voucher.config.RocketMQ.EventMapEntry
|
||||
17, // 14: voucher.config.Cron.commandMap:type_name -> voucher.config.Cron.CommandMapEntry
|
||||
18, // 15: voucher.config.RdsMQ.wechatQuery:type_name -> voucher.config.RdsMQ.Queue
|
||||
18, // 16: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue
|
||||
19, // 17: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration
|
||||
19, // 18: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration
|
||||
19, // 19: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration
|
||||
19, // 20: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration
|
||||
19, // 21: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration
|
||||
4, // 22: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap
|
||||
16, // 23: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap
|
||||
19, // 24: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration
|
||||
25, // [25:25] is the sub-list for method output_type
|
||||
25, // [25:25] is the sub-list for method input_type
|
||||
25, // [25:25] is the sub-list for extension type_name
|
||||
25, // [25:25] is the sub-list for extension extendee
|
||||
0, // [0:25] is the sub-list for field type_name
|
||||
18, // 16: voucher.config.RdsMQ.wechatTimeSliceQuery:type_name -> voucher.config.RdsMQ.Queue
|
||||
18, // 17: voucher.config.RdsMQ.wechatRetry:type_name -> voucher.config.RdsMQ.Queue
|
||||
19, // 18: voucher.config.Server.HTTP.timeout:type_name -> google.protobuf.Duration
|
||||
19, // 19: voucher.config.Data.Database.maxLifetime:type_name -> google.protobuf.Duration
|
||||
19, // 20: voucher.config.Data.Redis.readTimeout:type_name -> google.protobuf.Duration
|
||||
19, // 21: voucher.config.Data.Redis.writeTimeout:type_name -> google.protobuf.Duration
|
||||
19, // 22: voucher.config.Data.Redis.connMaxIdleTime:type_name -> google.protobuf.Duration
|
||||
4, // 23: voucher.config.RocketMQ.EventMapEntry.value:type_name -> voucher.config.EventMap
|
||||
16, // 24: voucher.config.Cron.CommandMapEntry.value:type_name -> voucher.config.Cron.CommandMap
|
||||
19, // 25: voucher.config.RdsMQ.Queue.waitTime:type_name -> google.protobuf.Duration
|
||||
26, // [26:26] is the sub-list for method output_type
|
||||
26, // [26:26] is the sub-list for method input_type
|
||||
26, // [26:26] is the sub-list for extension type_name
|
||||
26, // [26:26] is the sub-list for extension extendee
|
||||
0, // [0:26] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_conf_conf_proto_init() }
|
||||
|
|
|
|||
|
|
@ -128,7 +128,8 @@ message RdsMQ {
|
|||
google.protobuf.Duration waitTime = 5;
|
||||
}
|
||||
Queue wechatQuery = 1;
|
||||
Queue wechatRetry = 2;
|
||||
Queue wechatTimeSliceQuery = 2;
|
||||
Queue wechatRetry = 3;
|
||||
}
|
||||
|
||||
message Logs {
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We
|
|||
tx = tx.Where("product_no = ?", req.ProductNo)
|
||||
}
|
||||
if req.StartTime != "" {
|
||||
tx = tx.Where("receive_success_time >= ?", req.StartTime)
|
||||
tx = tx.Where("receive_success_time > ?", req.StartTime)
|
||||
}
|
||||
if req.EndTime != "" {
|
||||
tx = tx.Where("receive_success_time <= ?", req.EndTime)
|
||||
|
|
|
|||
|
|
@ -8,11 +8,15 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
const TimeSliceHours = 2
|
||||
|
||||
type Callback func(ctx context.Context, req *Task) error
|
||||
|
||||
type ManagerSrv struct {
|
||||
callback func(ctx context.Context, req *Task) error
|
||||
callback Callback
|
||||
}
|
||||
|
||||
func NewManager(callback func(ctx context.Context, req *Task) error) *ManagerSrv {
|
||||
func NewManager(callback Callback) *ManagerSrv {
|
||||
return &ManagerSrv{callback: callback}
|
||||
}
|
||||
|
||||
|
|
@ -22,17 +26,24 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
|
|||
return 0, fmt.Errorf("start_time不能大于end_time")
|
||||
}
|
||||
|
||||
totalHours := req.EndTime.Sub(req.StartTime).Hours()
|
||||
taskCount := int(totalHours / 2)
|
||||
if req.GoNum == 0 {
|
||||
return 0, fmt.Errorf("协程数量不能为0")
|
||||
}
|
||||
if req.GoNum > 100 {
|
||||
return 0, fmt.Errorf("协程数量不能大于100")
|
||||
}
|
||||
|
||||
// 如果剩余时间不足2小时,增加任务数
|
||||
if totalHours-float64(taskCount)*float64(2) > 0 {
|
||||
totalHours := req.EndTime.Sub(req.StartTime).Hours()
|
||||
taskCount := int(totalHours / TimeSliceHours)
|
||||
|
||||
// 如果剩余时间不足 TimeSliceHours 小时,增加任务数
|
||||
if totalHours-float64(taskCount)*float64(TimeSliceHours) > 0 {
|
||||
taskCount++
|
||||
}
|
||||
|
||||
processReq := &Process{
|
||||
manager: req,
|
||||
taskCount: taskCount,
|
||||
Manager: req,
|
||||
TaskCount: taskCount,
|
||||
}
|
||||
|
||||
return taskCount, m.process(ctx, processReq)
|
||||
|
|
@ -40,31 +51,24 @@ func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
|
|||
|
||||
func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
|
||||
|
||||
if req.taskCount == 0 {
|
||||
if req.TaskCount == 0 {
|
||||
return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围")
|
||||
}
|
||||
|
||||
if req.manager.GoNum == 0 {
|
||||
return fmt.Errorf("协程数量不能为0")
|
||||
}
|
||||
if req.manager.GoNum > 100 {
|
||||
return fmt.Errorf("协程数量不能大于100")
|
||||
}
|
||||
|
||||
// 设置最大并发任务数为 5
|
||||
eg := new(errgroup.Group)
|
||||
eg.SetLimit(req.manager.GoNum)
|
||||
eg.SetLimit(req.Manager.GoNum)
|
||||
|
||||
errs := make([]error, 0) // 用于存储所有错误
|
||||
|
||||
// 为每个任务分配开始和结束时间
|
||||
for i := 0; i < req.taskCount; i++ {
|
||||
// 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间
|
||||
for i := 0; i < req.TaskCount; i++ {
|
||||
|
||||
currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour)
|
||||
currentEnd := currentStart.Add(2 * time.Hour)
|
||||
currentStart := req.Manager.StartTime.Add(time.Duration(i) * TimeSliceHours * time.Hour)
|
||||
currentEnd := currentStart.Add(TimeSliceHours * time.Hour)
|
||||
|
||||
if currentEnd.After(req.manager.EndTime) {
|
||||
currentEnd = req.manager.EndTime
|
||||
if currentEnd.After(req.Manager.EndTime) {
|
||||
currentEnd = req.Manager.EndTime
|
||||
}
|
||||
taskID := i + 1
|
||||
|
||||
|
|
|
|||
|
|
@ -49,10 +49,10 @@ func TestNewManager(t *testing.T) {
|
|||
// 模拟任务执行,休眠随机时间
|
||||
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
|
||||
// 生成任务执行结果
|
||||
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
|
||||
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
|
||||
results = append(results, result)
|
||||
//return nil
|
||||
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
|
||||
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
|
@ -179,8 +179,8 @@ func CallbackFunc(start, end time.Time) error {
|
|||
|
||||
func callbackFunc(_ context.Context, req *Task) error {
|
||||
|
||||
managerStartTimeStr := req.Process.manager.StartTime.Format(time.DateTime)
|
||||
managerEndTimeStr := req.Process.manager.EndTime.Format(time.DateTime)
|
||||
managerStartTimeStr := req.Process.Manager.StartTime.Format(time.DateTime)
|
||||
managerEndTimeStr := req.Process.Manager.EndTime.Format(time.DateTime)
|
||||
|
||||
currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime)
|
||||
currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime)
|
||||
|
|
@ -225,9 +225,7 @@ func callbackFunc(_ context.Context, req *Task) error {
|
|||
|
||||
// 生成任务执行结果
|
||||
result := fmt.Sprintf("%s到%s,第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields)
|
||||
|
||||
fmt.Printf(result)
|
||||
|
||||
//return fmt.Errorf(result)
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,20 +5,21 @@ import (
|
|||
)
|
||||
|
||||
type Manager struct {
|
||||
StartTime time.Time
|
||||
EndTime time.Time
|
||||
ProductNo string
|
||||
GoNum int
|
||||
StartTime time.Time // 开始时间
|
||||
EndTime time.Time // 结束时间
|
||||
ProductNo string // 产品编号
|
||||
GoNum int // 并发数
|
||||
TimeSliceHours int // 时间片"小时"
|
||||
}
|
||||
|
||||
type Process struct {
|
||||
manager *Manager
|
||||
taskCount int
|
||||
Manager *Manager
|
||||
TaskCount int // 任务数
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Process *Process
|
||||
CurrentStartTime time.Time
|
||||
CurrentEndTime time.Time
|
||||
TaskID int
|
||||
CurrentStartTime time.Time // 时间片开始时间
|
||||
CurrentEndTime time.Time // 时间片结束时间
|
||||
TaskID int // 任务ID
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ func NewHTTPServer(
|
|||
srv.Route("/voucher/").POST("queryOrder/{order_no}", cmb.QueryOrder)
|
||||
srv.Route("/voucher/").POST("registerTag/{product_no}", cmb.RegisterTag)
|
||||
srv.Route("/voucher/").POST("pushWechatQuery", cmb.PushWechatQuery)
|
||||
srv.Route("/voucher/").POST("timeSliceQueryPush", cmb.TimeSliceQueryPush)
|
||||
srv.Route("/voucher/").POST("pushWechatRetry/{product_no}", cmb.PushWechatRetry)
|
||||
|
||||
v1.RegisterCmbHTTPServer(srv, cmb)
|
||||
|
|
|
|||
|
|
@ -2,19 +2,16 @@ package service
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/go-kratos/kratos/v2/log"
|
||||
"github.com/go-kratos/kratos/v2/transport/http"
|
||||
"github.com/robfig/cron"
|
||||
"io"
|
||||
http2 "net/http"
|
||||
"strconv"
|
||||
v1 "voucher/api/v1"
|
||||
"voucher/internal/biz"
|
||||
"voucher/internal/biz/bo"
|
||||
"voucher/internal/biz/do"
|
||||
"voucher/internal/biz/mixrepos"
|
||||
"voucher/internal/biz/timeslicequery"
|
||||
"voucher/internal/biz/vo"
|
||||
"voucher/internal/biz/wechatrepo"
|
||||
"voucher/internal/conf"
|
||||
|
|
@ -23,11 +20,12 @@ import (
|
|||
var _ v1.CmbHTTPServer = (*CmbService)(nil)
|
||||
|
||||
type CmbService struct {
|
||||
bc *conf.Bootstrap
|
||||
cron *cron.Cron
|
||||
VoucherBiz *biz.VoucherBiz
|
||||
CmbMixRepo mixrepos.CmbMixRepo
|
||||
WechatCpnRepo wechatrepo.WechatCpnRepo
|
||||
bc *conf.Bootstrap
|
||||
cron *cron.Cron
|
||||
VoucherBiz *biz.VoucherBiz
|
||||
CmbMixRepo mixrepos.CmbMixRepo
|
||||
WechatCpnRepo wechatrepo.WechatCpnRepo
|
||||
timeSliceQuery *timeslicequery.Query
|
||||
}
|
||||
|
||||
func NewCmbService(
|
||||
|
|
@ -36,13 +34,15 @@ func NewCmbService(
|
|||
VoucherBiz *biz.VoucherBiz,
|
||||
CmbMixRepo mixrepos.CmbMixRepo,
|
||||
WechatCpnRepo wechatrepo.WechatCpnRepo,
|
||||
timeSliceQuery *timeslicequery.Query,
|
||||
) *CmbService {
|
||||
return &CmbService{
|
||||
bc: bc,
|
||||
cron: cron,
|
||||
VoucherBiz: VoucherBiz,
|
||||
CmbMixRepo: CmbMixRepo,
|
||||
WechatCpnRepo: WechatCpnRepo,
|
||||
bc: bc,
|
||||
cron: cron,
|
||||
VoucherBiz: VoucherBiz,
|
||||
CmbMixRepo: CmbMixRepo,
|
||||
WechatCpnRepo: WechatCpnRepo,
|
||||
timeSliceQuery: timeSliceQuery,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -63,37 +63,6 @@ func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*
|
|||
return reply, nil
|
||||
}
|
||||
|
||||
func (this *CmbService) NotifyRetry(ctx http.Context) error {
|
||||
id := ctx.Vars().Get("id")
|
||||
if id == "" {
|
||||
return fmt.Errorf("id is empty")
|
||||
}
|
||||
|
||||
orderNotifyId, err := strconv.ParseUint(id, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return this.VoucherBiz.PushNotifyRetryDelayMQ(ctx, 1, orderNotifyId)
|
||||
}
|
||||
|
||||
func (this *CmbService) QueryOrder(ctx http.Context) error {
|
||||
|
||||
orderNo := ctx.Vars().Get("order_no")
|
||||
if orderNo == "" {
|
||||
return fmt.Errorf("orderNo is empty")
|
||||
}
|
||||
|
||||
str, err := this.VoucherBiz.QueryOrder(ctx, orderNo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": str,
|
||||
})
|
||||
}
|
||||
|
||||
func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) {
|
||||
|
||||
return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds())
|
||||
|
|
@ -115,49 +84,3 @@ func (this *CmbService) RegisterTag(ctx http.Context) error {
|
|||
"data": productNo,
|
||||
})
|
||||
}
|
||||
|
||||
func (this *CmbService) PushWechatQuery(ctx http.Context) error {
|
||||
|
||||
bodyBytes, err := io.ReadAll(ctx.Request().Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req *do.WechatQuery
|
||||
if err = json.Unmarshal(bodyBytes, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if req == nil {
|
||||
return fmt.Errorf("req is empty")
|
||||
}
|
||||
|
||||
if req.StartTime == "" || req.EndTime == "" {
|
||||
return fmt.Errorf("start_time or end_time is empty")
|
||||
}
|
||||
|
||||
if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": req,
|
||||
})
|
||||
}
|
||||
|
||||
func (this *CmbService) PushWechatRetry(ctx http.Context) error {
|
||||
|
||||
productNo := ctx.Vars().Get("product_no")
|
||||
if productNo == "" {
|
||||
return fmt.Errorf("product_no is empty")
|
||||
}
|
||||
|
||||
err := this.VoucherBiz.PushWechatRetry(ctx, productNo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": productNo,
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,118 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/go-kratos/kratos/v2/transport/http"
|
||||
"io"
|
||||
http2 "net/http"
|
||||
"strconv"
|
||||
"voucher/internal/biz/do"
|
||||
)
|
||||
|
||||
func (this *CmbService) NotifyRetry(ctx http.Context) error {
|
||||
id := ctx.Vars().Get("id")
|
||||
if id == "" {
|
||||
return fmt.Errorf("id is empty")
|
||||
}
|
||||
|
||||
orderNotifyId, err := strconv.ParseUint(id, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return this.VoucherBiz.PushNotifyRetryDelayMQ(ctx, 1, orderNotifyId)
|
||||
}
|
||||
|
||||
func (this *CmbService) QueryOrder(ctx http.Context) error {
|
||||
|
||||
orderNo := ctx.Vars().Get("order_no")
|
||||
if orderNo == "" {
|
||||
return fmt.Errorf("orderNo is empty")
|
||||
}
|
||||
|
||||
str, err := this.VoucherBiz.QueryOrder(ctx, orderNo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": str,
|
||||
})
|
||||
}
|
||||
|
||||
func (this *CmbService) PushWechatQuery(ctx http.Context) error {
|
||||
|
||||
bodyBytes, err := io.ReadAll(ctx.Request().Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req *do.WechatQuery
|
||||
if err = json.Unmarshal(bodyBytes, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if req == nil {
|
||||
return fmt.Errorf("req is empty")
|
||||
}
|
||||
|
||||
if req.StartTime == "" || req.EndTime == "" {
|
||||
return fmt.Errorf("start_time or end_time is empty")
|
||||
}
|
||||
|
||||
if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": req,
|
||||
})
|
||||
}
|
||||
|
||||
func (this *CmbService) TimeSliceQueryPush(ctx http.Context) error {
|
||||
|
||||
bodyBytes, err := io.ReadAll(ctx.Request().Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req *do.WechatQuery
|
||||
if err = json.Unmarshal(bodyBytes, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if req == nil {
|
||||
return fmt.Errorf("req is empty")
|
||||
}
|
||||
|
||||
if req.StartTime == "" || req.EndTime == "" {
|
||||
return fmt.Errorf("start_time or end_time is empty")
|
||||
}
|
||||
|
||||
reps, err := this.timeSliceQuery.Push(ctx, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": reps,
|
||||
})
|
||||
}
|
||||
|
||||
func (this *CmbService) PushWechatRetry(ctx http.Context) error {
|
||||
|
||||
productNo := ctx.Vars().Get("product_no")
|
||||
if productNo == "" {
|
||||
return fmt.Errorf("product_no is empty")
|
||||
}
|
||||
|
||||
err := this.VoucherBiz.PushWechatRetry(ctx, productNo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||
"data": productNo,
|
||||
})
|
||||
}
|
||||
|
|
@ -10,6 +10,7 @@ import (
|
|||
"time"
|
||||
"voucher/internal/biz"
|
||||
"voucher/internal/biz/bo"
|
||||
"voucher/internal/biz/timeslicequery"
|
||||
"voucher/internal/conf"
|
||||
"voucher/internal/data"
|
||||
"voucher/internal/pkg/mq"
|
||||
|
|
@ -21,6 +22,8 @@ type VoucherService struct {
|
|||
VoucherBiz *biz.VoucherBiz
|
||||
rdb *data.Rdb
|
||||
logHelper *log.Helper
|
||||
|
||||
timeSliceQuery *timeslicequery.Query
|
||||
}
|
||||
|
||||
func NewVoucherService(
|
||||
|
|
@ -29,13 +32,15 @@ func NewVoucherService(
|
|||
VoucherBiz *biz.VoucherBiz,
|
||||
rdb *data.Rdb,
|
||||
logHelper *log.Helper,
|
||||
timeSliceQuery *timeslicequery.Query,
|
||||
) *VoucherService {
|
||||
return &VoucherService{
|
||||
bc: bc,
|
||||
cron: cron,
|
||||
VoucherBiz: VoucherBiz,
|
||||
rdb: rdb,
|
||||
logHelper: logHelper,
|
||||
bc: bc,
|
||||
cron: cron,
|
||||
VoucherBiz: VoucherBiz,
|
||||
rdb: rdb,
|
||||
logHelper: logHelper,
|
||||
timeSliceQuery: timeSliceQuery,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,3 +43,40 @@ func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) erro
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *VoucherService) GetWechatTimeSliceQueryConfig() *rdsmq.ConsumeConfig {
|
||||
|
||||
queue := s.bc.RdsMQ.GetWechatTimeSliceQuery()
|
||||
if queue == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !queue.GetIsOpen() {
|
||||
log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name))
|
||||
return nil
|
||||
}
|
||||
|
||||
return &rdsmq.ConsumeConfig{
|
||||
Rdb: s.rdb.Rdb,
|
||||
QueueName: queue.Name,
|
||||
NumWorkers: queue.NumWorkers,
|
||||
WaitTime: queue.GetWaitTime().AsDuration(),
|
||||
RetryNum: queue.RetryNum,
|
||||
Fn: s.WechatTimeSliceQueryHandle,
|
||||
Logger: s.logHelper,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *VoucherService) WechatTimeSliceQueryHandle(ctx context.Context, msg string) error {
|
||||
|
||||
if msg == "" {
|
||||
s.logHelper.Errorf("wechat TimeSlice query error: batchNo is empty")
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.timeSliceQuery.Consumer(ctx, msg); err != nil {
|
||||
s.logHelper.Errorf("wechat TimeSlice query msg:%s error: %v", msg, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue