Merge branch 'pro' into kx

# Conflicts:
#	READEME.md
#	internal/biz/wechat_notify.go
#	internal/conf/conf.pb.go
This commit is contained in:
ziming 2025-06-13 10:27:06 +08:00
commit 744e7d4169
37 changed files with 1532 additions and 213 deletions

View File

@ -1,15 +1,10 @@
# <p align="center">招行立减金</p>
# <p align="center">招行立减金券系统</p>
### 参与开发
[请参阅](https://tvd8jq9lqkp.feishu.cn/wiki/LNWVweZ64iY2UBkkTkZcezy0n5h?from=from_copylink)
* * *
### 主要工作
+ 对接招行,调用微信发券
+ 发券API
* * *
### 性能要求
````text
并发要求每秒200-300 持续5分钟即可
````
### 压测说明
1、调用接口 POST[http://open.cszfan.com/voucher/cmb/v1/orderMock]
+ 参数如下
@ -39,4 +34,4 @@
2、调用接口 POST[http://open.cszfan.com/voucher/cmb/v1/order]
````text
将接口1返回的数据作为接口2的请求参数发起请求http状态码200为成功其它属于异常
````
````

View File

@ -12,6 +12,7 @@ import (
"github.com/robfig/cron"
"voucher/internal/biz"
"voucher/internal/biz/cmb"
"voucher/internal/biz/timeslicequery"
"voucher/internal/conf"
"voucher/internal/data"
"voucher/internal/data/mixrepoimpl"
@ -33,6 +34,7 @@ func wireApp(*conf.Bootstrap, log.Logger, *log2.AccessLogger) (*kratos.App, func
repoimpl.ProviderRepoImplSet,
wechatrepoimpl.ProviderWechatReposImplSet,
mixrepoimpl.ProviderMixRepoImplSet,
timeslicequery.ProviderSetTimeSliceQuery,
log2.NewLogHelper,
cron.New,
newApp,

View File

@ -86,18 +86,24 @@ cron:
command: "0 0 1 * * ?" # 每天凌晨1点执行一次
rdsMQ:
wechatQuery: #发放结算
wechatQuery:
name: "wechatQuery"
retryNum: 1 #重试次数
numWorkers: 2 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: false #是否启动消费 true/false
wechatTimeSliceQuery:
name: "wechatTimeSliceQuery"
retryNum: 1 #重试次数
numWorkers: 3 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false
orderRetry: #发放结算
orderRetry:
name: "orderRetry"
retryNum: 1 #重试次数
numWorkers: 2 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false
isOpen: false #是否启动消费 true/false
#配置日志
logs:

View File

@ -91,7 +91,12 @@ rdsMQ:
numWorkers: 1 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false
wechatTimeSliceQuery:
name: "wechatTimeSliceQuery"
retryNum: 1 #重试次数
numWorkers: 3 #协程数量不配置默认为10
waitTime: 1s #处理完成后等待时间
isOpen: true #是否启动消费 true/false
#配置日志
logs:
business: business.log #业务日志路径:如果不写日志,则不配置或配置为空

2
go.mod
View File

@ -63,6 +63,8 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect

4
go.sum
View File

@ -150,6 +150,10 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0=
github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=

View File

@ -45,7 +45,6 @@ type OrderCreateReqBo struct {
}
type FindInBatchesUseBo struct {
Type vo.OrderType
StartTime *time.Time
EndTime *time.Time
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
@ -13,57 +14,14 @@ import (
"voucher/internal/pkg/lock"
)
func (v *VoucherBiz) Notice(ctx context.Context) error {
if err := v.isCanNotice(ctx); err != nil {
return err
}
now := time.Now()
// 获取七天前的日期
noticeStartDay := now.AddDate(0, 0, int(-v.bc.Cmb.NoticeStartDays))
// 获取七天前 00:00:00 的时间
startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())
noticeEndDay := now.AddDate(0, 0, int(-v.bc.Cmb.NoticeEndDays))
// 获取昨天 23:59:59 的时间
endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())
req := &bo.FindInBatchesUseBo{
Type: vo.OrderTypeCmb,
StartTime: &startTime,
EndTime: &endTime,
}
return v.ExecuteNotice(ctx, req)
}
func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error {
return v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows {
if err := v.notice(ctx, order); err != nil {
log.Errorf("招行查询券订单状态发生错误,orderNo:%s,err:%v", order.OrderNo, err)
}
}
return nil
})
}
func (v *VoucherBiz) isCanNotice(ctx context.Context) error {
if v.bc.Cmb.NoticeStartDays == 0 {
return errors.New("noticeStartDays eq 0")
return errors.New("订单定时通知,noticeStartDays eq 0")
}
if v.bc.Cmb.NoticeEndDays == 0 {
return errors.New("noticeEndDays eq 0")
return errors.New("订单定时通知,noticeEndDays eq 0")
}
cache := vo.CmbBatchNoticeCacheKey.BuildCache([]string{""})
@ -71,11 +29,11 @@ func (v *VoucherBiz) isCanNotice(ctx context.Context) error {
_, err := v.rdb.Rdb.Get(ctx, cache.Key).Result()
if err == nil {
return fmt.Errorf("notice 获取redis缓存存在已被执行,本台服务不做执行")
return fmt.Errorf("订单定时通知,notice 获取redis缓存存在已被执行,本台服务不做执行")
}
if err != redis.Nil {
return fmt.Errorf(fmt.Sprintf("notice 获取redis缓存%s异常:%v", cache.Key, err))
return fmt.Errorf(fmt.Sprintf("订单定时通知,notice 获取redis缓存%s异常:%v", cache.Key, err))
}
c := vo.CmbBatchNoticeLockKey.BuildCache([]string{""})
@ -87,23 +45,123 @@ func (v *VoucherBiz) isCanNotice(ctx context.Context) error {
cacheValue, err2 := v.rdb.Rdb.Get(ctx, cache.Key).Result()
if err2 != nil && err2 != redis.Nil {
return fmt.Errorf(fmt.Sprintf("notice 二次获取redis缓存%s异常:%v", cache.Key, err2))
return fmt.Errorf(fmt.Sprintf("订单定时通知,notice 二次获取redis缓存%s异常:%v", cache.Key, err2))
}
if len(cacheValue) > 0 {
return fmt.Errorf("notice 二次获取redis缓存存在已被执行,本台服务不做执行")
return fmt.Errorf("订单定时通知,notice 二次获取redis缓存存在已被执行,本台服务不做执行")
}
if err = v.rdb.Rdb.Set(ctx, cache.Key, fmt.Sprintf("%d_%d", v.bc.Cmb.NoticeStartDays, v.bc.Cmb.NoticeEndDays), c.TTL).Err(); err != nil {
return fmt.Errorf(fmt.Sprintf("notice 设置redis缓存%s异常:%v", cache.Key, err))
}
log.Warnf("notice 获取redis缓存,不存在,开始处理")
log.Warnf("订单定时通知,notice 获取redis缓存,不存在,开始处理")
return nil
})
}
func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error {
func (v *VoucherBiz) Notice(ctx context.Context) error {
if err := v.isCanNotice(ctx); err != nil {
return err
}
now := time.Now()
// 获取 NoticeStartDays 天前的日期
noticeStartDay := now.AddDate(0, 0, int(-v.bc.Cmb.NoticeStartDays))
// 获取 NoticeStartDays 天前 00:00:00 的时间
startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())
// 获取 NoticeEndDays 天前的日期
noticeEndDay := now.AddDate(0, 0, int(-v.bc.Cmb.NoticeEndDays))
// 获取 NoticeEndDays 天 23:59:59 的时间
endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())
return v.timeSliceQuery(ctx, startTime, endTime)
}
func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time.Time) error {
log.Warnf("订单定时通知,开始处理,按每两个小时分片处理,范围:%s到%s", startTime.Format(time.DateTime), endTime.Format(time.DateTime))
duration := 2 * time.Hour
eg := new(errgroup.Group)
eg.SetLimit(10)
for start := startTime; start.Before(endTime); start = start.Add(duration) {
end := start.Add(duration) // 计算每次请求的结束时间
if end.After(endTime) {
end = endTime
} else {
end = end.Add(-1 * time.Second)
}
req := &bo.FindInBatchesUseBo{
StartTime: &start,
EndTime: &end,
}
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 继续执行
}
defer func() {
if err := recover(); err != nil {
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v", req, err)
}
}()
return v.ExecuteNotice(ctx, req)
})
}
return eg.Wait() // 仅返回第一个错误
}
func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error {
start := time.Now()
num := 0
notifyNum := 0
errNum := 0
err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows {
num += 1
if err := v.notice(ctx, order, &notifyNum); err != nil {
errNum += 1
log.Error(err)
}
}
return nil
})
logFields := map[string]interface{}{
"searchTime": req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime),
"num": num,
"notifyNum": notifyNum,
"elapsed": time.Now().Sub(start).String(),
}
log.Warnf("订单定时通知,%+v", logFields)
return err
}
func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
// 批量通知不做数据存储,量会很大
resp, err := v.WechatCpnRepo.QueryCoupon(ctx, order)
@ -117,8 +175,7 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error {
}
if order.Status == status {
//log.Warnf("notice 券状态未改变:%s忽略不处理,orderNo:%s", order.Status.GetText(), order.OrderNo)
return nil
return nil // 券状态未改变,忽略不处理
}
event, err := status.GetOrderNotifyEvent()
@ -134,20 +191,21 @@ func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo) error {
Type: order.Type,
}
if err = v.cmbNotice(ctx, order, orderNotify); err != nil {
if err = v.cmbNotice(ctx, order, orderNotify, notifyNum); err != nil {
return err
}
return v.UpdateOrderStatus(ctx, order.ID, status)
}
func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo) error {
func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error {
if !orderNotify.Event.CanNotify() {
//log.Warnf("notice 券状态:%s忽略不通知,orderNo:%s", orderNotify.Event.GetText(), order.OrderNo)
return nil
return nil // 不可通知,忽略
}
*notifyNum += 1
request, err := v.Cmb.NotifyRequest(ctx, order, orderNotify)
if err != nil {
return err
@ -155,11 +213,11 @@ func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNoti
reply, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
if err != nil {
return fmt.Errorf("orderNo:%s,outBizNo:%s,%s", order.OrderNo, order.OutBizNo, err.Error())
return fmt.Errorf("订单定时通知,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
}
if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
return errors.New(reply.RespMsg)
return errors.New("订单定时通知,招行返回错误:" + reply.RespMsg)
}
return nil

View File

@ -4,4 +4,13 @@ type WechatQuery struct {
ProductNo string `json:"product_no"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
OrderNo string `json:"order_no"`
}
type RdsWechatQuery struct {
ProductNo string `json:"product_no"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
GoNum int `json:"go_num"` // 并发数
TimeSliceHours int64 `json:"time_slice_hours"` // 时间片"小时"
}

View File

@ -0,0 +1,14 @@
# <p align="center">券状态同步</p>
* * *
### 主要工作
+ 券状态查询同步
* * *
### 规则说明
+ 按照时间分片处理按照2小时为一个时间片启用一个协程消费处理
+ 协程最大可同时运行指定数量设置为2
* * *
### 使用方式
+ 消费处理,按照时间范围上报消费
+ 每次请求按照时间片分别启用2个协程消费处理也就是每个请求可以同时启用2个协程消费处理
+ 请不要无休止的访问,请按照时间片进行访问,并且不要重复的时间片访问,增加系统负载,特殊发券日期量较大,建议缩短时间范围上报,多切分上报处理
* * *

View File

@ -0,0 +1,84 @@
package timeslicequery
import (
"github.com/nacos-group/nacos-sdk-go/util"
"sync"
"voucher/internal/biz/cmb"
"voucher/internal/biz/mixrepos"
"voucher/internal/biz/repo"
"voucher/internal/biz/wechatrepo"
"voucher/internal/conf"
"voucher/internal/data"
)
type Query struct {
mu sync.RWMutex
queryMap map[string]bool
bc *conf.Bootstrap
rdb *data.Rdb
cmb *cmb.Cmb
productRepo repo.ProductRepo
orderRepo repo.OrderRepo
wechatCpnRepo wechatrepo.WechatCpnRepo
mqSendMixRepo mixrepos.MQSendMixRepo
}
func NewQuery(
bc *conf.Bootstrap,
rdb *data.Rdb,
cmb *cmb.Cmb,
productRepo repo.ProductRepo,
orderRepo repo.OrderRepo,
wechatCpnRepo wechatrepo.WechatCpnRepo,
mqSendMixRepo mixrepos.MQSendMixRepo) *Query {
return &Query{
queryMap: make(map[string]bool),
bc: bc,
rdb: rdb,
cmb: cmb,
productRepo: productRepo,
orderRepo: orderRepo,
wechatCpnRepo: wechatCpnRepo,
mqSendMixRepo: mqSendMixRepo}
}
func (v *Query) uid(no string) string {
return util.Md5("query" + no)
}
func (this *Query) GetAll() map[string]bool {
return this.queryMap
}
func (this *Query) Get(uid string) bool {
this.mu.Lock()
defer this.mu.Unlock()
if _, ok := this.queryMap[uid]; ok {
return ok
}
return false
}
func (this *Query) Add(uid string) {
this.mu.Lock()
defer this.mu.Unlock()
this.queryMap[uid] = true
}
func (this *Query) Remove(uid string) {
this.mu.Lock()
defer this.mu.Unlock()
if _, ok := this.queryMap[uid]; ok {
delete(this.queryMap, uid)
}
}

View File

@ -0,0 +1,88 @@
package timeslicequery
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/pkg/timeslice"
)
func (v *Query) execute(ctx context.Context, req *timeslice.Manager) error {
managerStartStr := req.StartTime.Format(time.DateTime)
managerEndStr := req.EndTime.Format(time.DateTime)
taskCount, err := timeslice.NewManager(v.callbackFunc).Run(ctx, req)
if err != nil {
log.Errorf("微信券查询处理,%s到%s,失败:%v", managerStartStr, managerEndStr, err)
}
fmt.Printf("微信券查询处理,%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount)
log.Warnf("微信券查询处理,%s到%s,总任务数:%d", managerStartStr, managerEndStr, taskCount)
return nil
}
func (v *Query) callbackFunc(ctx context.Context, req *timeslice.Task) error {
startTimeStr := req.Process.Manager.StartTime.Format(time.DateTime)
endTimeStr := req.Process.Manager.EndTime.Format(time.DateTime)
currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime)
currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime)
start := time.Now()
x := &do.WechatQuery{
StartTime: currentStartTimeStr,
EndTime: currentEndTimeStr,
ProductNo: req.Process.Manager.ProductNo,
}
num := 0
notifyNum := 0
errNum := 0
err := v.orderRepo.FinSucByStockIdInBatches(ctx, x, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows {
num += 1
if err := v.wechatQuery(ctx, order, &notifyNum); err != nil {
errNum += 1
logFields := map[string]string{
"order_no": order.OrderNo,
"coupon_id": order.VoucherNo,
"open_id": order.Account,
"stock_id": order.BatchNo,
"err": err.Error(),
}
log.Errorf("微信券查询处理,%s到%s,taskId:%d,错误:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
if errNum > 20 {
return fmt.Errorf("微信券查询处理,%s到%s,第%d个任务,已经连续发生20次错误%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
}
}
}
return nil
})
end := time.Now()
logFields := map[string]interface{}{
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
"num": num,
"notifyNum": notifyNum,
"elapsed": end.Sub(start).String(),
}
log.Warnf("微信券查询处理,%s到%s,taskId:%d,处理完毕:%+v", startTimeStr, endTimeStr, req.TaskID, logFields)
return err
}

View File

@ -0,0 +1,130 @@
package timeslicequery
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"time"
"voucher/internal/biz/do"
"voucher/internal/pkg/timeslice"
)
func (v *Query) Push(ctx http.Context, req *do.RdsWechatQuery) (string, error) {
if req.StartTime == "" || req.EndTime == "" {
return "", fmt.Errorf("时间参数不能为空")
}
queue := v.bc.RdsMQ.GetWechatTimeSliceQuery()
if queue == nil {
return "", fmt.Errorf("队列不存在")
}
if queue.Name == "" {
return "", fmt.Errorf("队列不存在")
}
if queue.IsOpen == false {
return "", fmt.Errorf("队列未开启")
}
if req.ProductNo != "" {
_, err := v.productRepo.GetByProductNo(ctx, req.ProductNo)
if err != nil {
return "", err
}
}
b, err := json.Marshal(req)
if err != nil {
return "", err
}
strMsg := string(b)
uid := v.uid(strMsg)
if v.Get(uid) {
return "", fmt.Errorf("此台服务队列正在处理中,%s-%s,ip:%s", uid, strMsg, ctx.Header().Get("X-Forwarded-For"))
}
v.Add(uid)
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
v.Remove(uid)
return "", fmt.Errorf("添加到队列失败:%v", err)
}
return strMsg, nil
}
func (v *Query) getManager(msg string) (*timeslice.Manager, error) {
var req *do.RdsWechatQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return nil, err
}
if req.StartTime == "" || req.EndTime == "" {
return nil, fmt.Errorf("时间参数不能为空")
}
start, err := time.Parse(time.DateTime, req.StartTime)
if err != nil {
return nil, err
}
end, err := time.Parse(time.DateTime, req.EndTime)
if err != nil {
return nil, err
}
manager := &timeslice.Manager{
StartTime: start,
EndTime: end,
ProductNo: req.ProductNo,
GoNum: timeslice.DefaultGoNum, // 协程数量
TimeSliceHours: timeslice.DefaultTimeSliceHours, // 时间间隔
}
if req.GoNum > 0 {
manager.GoNum = req.GoNum
}
if req.TimeSliceHours > 0 {
manager.TimeSliceHours = req.TimeSliceHours
}
return manager, nil
}
func (v *Query) Consumer(ctx context.Context, msg string) error {
defer v.Remove(v.uid(msg))
req, err := v.getManager(msg)
if err != nil {
return err
}
reqStr := req.String()
executeStart := time.Now()
executeStartStr := executeStart.Format(time.DateTime)
log.Warnf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
fmt.Printf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
if err = v.execute(ctx, req); err != nil {
log.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
return fmt.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
}
executeEnd := time.Now()
log.Warnf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
fmt.Printf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
return nil
}

View File

@ -0,0 +1,7 @@
package timeslicequery
import (
"github.com/google/wire"
)
var ProviderSetTimeSliceQuery = wire.NewSet(NewQuery)

View File

@ -0,0 +1,66 @@
package timeslicequery
import (
"context"
"github.com/go-kratos/kratos/v2/log"
"voucher/internal/biz/bo"
)
func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
status, err := v.wechatCpnRepo.Query(ctx, order)
if err != nil {
return err
}
if status.IsUse() {
return v.queryUsed(ctx, order, notifyNum)
} else if status.IsExpired() {
return v.queryExpired(ctx, order)
}
return nil
}
func (v *Query) queryUsed(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
*notifyNum += 1
if order.Status.IsUse() {
return v.notify(ctx, order)
}
if err := v.orderRepo.Used(ctx, order.ID); err != nil {
return err
}
return v.notify(ctx, order)
}
func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsExpired() {
log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo)
return nil
}
if err := v.orderRepo.Expired(ctx, order.ID); err != nil {
return err
}
return nil // 过期不做通知
}
func (v *Query) notify(ctx context.Context, order *bo.OrderBo) error {
order, err := v.orderRepo.GetByID(ctx, order.ID)
if err != nil {
return err
}
if _, err = v.cmb.Notify(ctx, order); err != nil {
return err
}
return nil
}

View File

@ -40,7 +40,7 @@ func (s OrderNotifyEvent) IsExpired() bool {
}
func (s OrderNotifyEvent) CanNotify() bool {
return s.IsSendDEd() || s.IsUsed() || s.IsExpired()
return s.IsSendDEd() || s.IsUsed()
}
var OrderNotifyEventMapCmbStatus = map[OrderNotifyEvent]CmbStatus{

View File

@ -70,11 +70,11 @@ func NewVoucherBiz(
}
}
func (this *VoucherBiz) Get(stockNo string) bool {
func (this *VoucherBiz) Get(uid string) bool {
this.mu.Lock()
defer this.mu.Unlock()
if _, ok := this.queryMap[stockNo]; ok {
if _, ok := this.queryMap[uid]; ok {
return ok
}

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"gorm.io/gorm"
"time"
errPb "voucher/api/err"
@ -39,7 +38,6 @@ func (v *VoucherBiz) WechatNotifyConsumer(ctx context.Context, tag string, req *
}
return fmt.Errorf("未知通知类型:%s", req.PlainText.Status.GetText())
})
}
@ -120,7 +118,7 @@ func (this *VoucherBiz) getOrder(ctx context.Context, req *bo.WechatVoucherNotif
func (v *VoucherBiz) notifyUsed(ctx context.Context, order *bo.OrderBo, req *bo.WechatVoucherNotifyBo) error {
if order.Status.IsUse() {
log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo)
// 券状态已是已使用,忽略不处理
return nil
}
@ -144,7 +142,7 @@ func (v *VoucherBiz) notifyUsed(ctx context.Context, order *bo.OrderBo, req *bo.
func (v *VoucherBiz) available(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsSuccess() {
log.Warnf("券状态已是可使用,忽略不处理,orderNo:%s", order.OrderNo)
// 券状态已是可使用,忽略不处理
return nil
}
@ -158,7 +156,7 @@ func (v *VoucherBiz) available(ctx context.Context, order *bo.OrderBo) error {
func (v *VoucherBiz) expired(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsExpired() {
log.Warnf("券状态已是已过期,忽略不处理,orderNo:%s", order.OrderNo)
// 券状态已是已过期,忽略不处理
return nil
}
@ -172,12 +170,7 @@ func (v *VoucherBiz) expired(ctx context.Context, order *bo.OrderBo) error {
func (v *VoucherBiz) notify(ctx context.Context, order *bo.OrderBo) error {
if order.Type.IsCmb() {
return v.cmbNotify(ctx, order.ID)
}
return fmt.Errorf("未知渠道订单类型:%s", order.Type.GetText())
return v.cmbNotify(ctx, order.ID)
}
func (v *VoucherBiz) cmbNotify(ctx context.Context, orderId uint64) error {

View File

@ -6,16 +6,17 @@ import (
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/nacos-group/nacos-sdk-go/util"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
func (v *VoucherBiz) uid(_ context.Context, msg string) string {
return util.Md5(msg)
}
if v.Get("CMB_WECHAT_QUERY") {
return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For"))
}
func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
if req.ProductNo != "" {
_, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
@ -34,11 +35,18 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro
return err
}
v.Add("CMB_WECHAT_QUERY")
strMsg := string(msg)
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
uid := v.uid(ctx, strMsg)
if v.Get(uid) {
return fmt.Errorf("此台服务队列正在处理中,key:%s,ip:%s", uid, ctx.Header().Get("X-Forwarded-For"))
}
v.Add(uid)
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
v.Remove("CMB_WECHAT_QUERY")
v.Remove(uid)
return fmt.Errorf("添加到队列失败:%v", err)
}
@ -47,7 +55,7 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro
func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
defer v.Remove("CMB_WECHAT_QUERY")
defer v.Remove(v.uid(ctx, msg))
var req *do.WechatQuery
@ -63,31 +71,34 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
n := 0
num := 0
notifyNum := 0
err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
n += 1
for _, order := range rows {
num += 1
if err := v.wechatQuery(ctx, order); err != nil {
log.Errorf("微信查询券订单状态发生错误,msg:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v",
msg, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err)
if err := v.wechatQuery(ctx, order, &notifyNum); err != nil {
log.Errorf("微信查询券订单状态发生错误,msg:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,stockId:%s,err:%v",
msg, order.OrderNo, order.VoucherNo, order.AppID, order.Account, order.BatchNo, err)
}
}
log.Warnf("微信券查询处理第:%d组,已执行条数:%d,执行开始时间:%s已耗时:%s", n, num, startStr, time.Now().Sub(start).String())
groupTime := time.Now()
log.Warnf("微信券查询处理第:%d组,已执行条数:%d,核销通知条数:%d,执行开始时间:%s当前时间:%s,已耗时:%s", n, num, notifyNum, startStr, groupTime.String(), groupTime.Sub(start).String())
return nil
})
log.Warnf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg)
fmt.Printf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg)
endTime := time.Now()
log.Warnf("微信券查询处理耗时:%s,结束时间:%s,处理%d组,处理%d单,核销通知条数:%d,msg:%s", endTime.Sub(start).String(), endTime.String(), n, num, notifyNum, msg)
fmt.Printf("微信券查询处理耗时:%s,结束时间%s,处理%d组,处理%d单,核销通知条数:%d,msg:%s", endTime.Sub(start).String(), endTime.String(), n, num, notifyNum, msg)
return err
}
func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo) error {
func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
status, err := v.WechatCpnRepo.Query(ctx, order)
if err != nil {
@ -95,7 +106,7 @@ func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo) error {
}
if status.IsUse() {
return v.queryUsed(ctx, order)
return v.queryUsed(ctx, order, notifyNum)
} else if status.IsExpired() {
return v.expired(ctx, order)
}
@ -103,11 +114,12 @@ func (v *VoucherBiz) wechatQuery(ctx context.Context, order *bo.OrderBo) error {
return nil
}
func (v *VoucherBiz) queryUsed(ctx context.Context, order *bo.OrderBo) error {
func (v *VoucherBiz) queryUsed(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
*notifyNum += 1
if order.Status.IsUse() {
log.Warnf("券状态已是已使用,忽略不处理,orderNo:%s", order.OrderNo)
return nil
return v.notify(ctx, order)
}
if err := v.OrderRepo.Used(ctx, order.ID); err != nil {

View File

@ -848,8 +848,9 @@ type RdsMQ struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"`
WechatRetry *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
WechatQuery *RdsMQ_Queue `protobuf:"bytes,1,opt,name=wechatQuery,proto3" json:"wechatQuery,omitempty"`
WechatTimeSliceQuery *RdsMQ_Queue `protobuf:"bytes,2,opt,name=wechatTimeSliceQuery,proto3" json:"wechatTimeSliceQuery,omitempty"`
WechatRetry *RdsMQ_Queue `protobuf:"bytes,3,opt,name=wechatRetry,proto3" json:"wechatRetry,omitempty"`
}
func (x *RdsMQ) Reset() {

View File

@ -129,7 +129,8 @@ message RdsMQ {
google.protobuf.Duration waitTime = 5;
}
Queue wechatQuery = 1;
Queue wechatRetry = 2;
Queue wechatTimeSliceQuery = 2;
Queue wechatRetry = 3;
}
message Logs {

View File

@ -39,11 +39,14 @@ func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.We
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.StartTime != "" {
tx = tx.Where("receive_success_time >= ?", req.StartTime)
tx = tx.Where("receive_success_time > ?", req.StartTime)
}
if req.EndTime != "" {
tx = tx.Where("receive_success_time <= ?", req.EndTime)
}
if req.OrderNo != "" {
tx = tx.Where("order_no = ?", req.OrderNo)
}
var results = make([]*model.Order, 0)
@ -66,7 +69,6 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s
result := p.DB(ctx).
Where("batch_no = ?", batchNo).
Where("status = ?", vo.OrderStatusFail.GetValue()).
//Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat").
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results))
})
@ -96,13 +98,13 @@ func (p *OrderRepoImpl) FindIngInBatches(ctx context.Context, fun func(ctx conte
return nil
}
func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
func (p *OrderRepoImpl) FindInBatches(ctx context.Context, req *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
var results = make([]*model.Order, 0)
result := p.DB(ctx).
Where("status IN (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue()}).
Where("receive_success_time BETWEEN ? AND ?", w.StartTime, w.EndTime).
Where("receive_success_time BETWEEN ? AND ?", req.StartTime, req.EndTime).
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
// tx.RowsAffected 提供当前批处理中记录的计数the count of records in the current batch
// 'batch' 变量表示当前批号the current batch number
@ -275,6 +277,7 @@ func (p *OrderRepoImpl) Success(ctx context.Context, id uint64, voucherNo string
Updates(model.Order{
Status: vo.OrderStatusSuccess.GetValue(),
VoucherNo: voucherNo,
Remark: "成功",
ReceiveSuccessTime: &now,
UpdateTime: &now,
})
@ -296,6 +299,7 @@ func (p *OrderRepoImpl) Available(ctx context.Context, id uint64) error {
}).
Updates(model.Order{
Status: vo.OrderStatusSuccess.GetValue(),
Remark: "重置为成功,领取成功时间重置",
ReceiveSuccessTime: &now, // 领取成功时间重置
UpdateTime: &now,
})
@ -343,6 +347,7 @@ func (p *OrderRepoImpl) Used(ctx context.Context, id uint64) error {
}).
Updates(model.Order{
Status: vo.OrderStatusUse.GetValue(),
Remark: "核销",
LastUseTime: &now,
UpdateTime: &now,
})
@ -364,6 +369,7 @@ func (p *OrderRepoImpl) NotifyUsed(ctx context.Context, id uint64, transactionId
Updates(model.Order{
Status: vo.OrderStatusUse.GetValue(),
TransactionId: transactionId,
Remark: "微信回调核销",
LastUseTime: &now,
UpdateTime: &now,
})
@ -384,6 +390,7 @@ func (p *OrderRepoImpl) Expired(ctx context.Context, id uint64) error {
}).
Updates(model.Order{
Status: vo.OrderStatusExpired.GetValue(),
Remark: "过期",
UpdateTime: &now,
})

View File

@ -149,6 +149,34 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.Or
return CpnStatus(*resp.Status).GetStatus()
}
func (c *CpnRepoImpl) QueryCoupon(ctx context.Context, orderWechat *bo.OrderBo) (*cashcoupons.Coupon, error) {
req := cashcoupons.QueryCouponRequest{
CouponId: core.String(orderWechat.VoucherNo),
Appid: core.String(orderWechat.AppID),
Openid: core.String(orderWechat.Account),
}
client, err := c.GetClient(ctx)
if err != nil {
return nil, err
}
svc := cashcoupons.CouponApiService{Client: client}
resp, result, err := svc.QueryCoupon(ctx, req)
if err != nil {
if result.Response != nil && result.Response.Body != nil {
return nil, c.bodyErr(ctx, result)
}
return nil, err
}
return resp, nil
}
func (c *CpnRepoImpl) QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error) {
if stockCreatorMchId == "" || stockId == "" {

View File

@ -1,6 +1,8 @@
package helper
import (
"crypto/md5"
"encoding/hex"
"hash/fnv"
"math"
"os"
@ -18,3 +20,14 @@ func HashMod(hashStr string) int {
hashValue := hash.Sum32()
return int(math.Mod(float64(hashValue), 32))
}
func Md5(str string) string {
// 创建一个 MD5 哈希对象
hash := md5.New()
// 写入待加密的数据
hash.Write([]byte(str))
// 获取 MD5 哈希值
hashBytes := hash.Sum(nil)
// 将 MD5 哈希值转换为16进制字符串
return hex.EncodeToString(hashBytes)
}

View File

@ -25,3 +25,24 @@ func TestNoticeTime(t *testing.T) {
t.Logf("startTime:%s,endTime:%s", startTime, endTime)
}
func TestNum(t *testing.T) {
useNum := 0
used(&useNum)
t.Log(useNum)
}
func used(useNum *int) {
queryUsed(useNum)
queryUsed(useNum)
*useNum += 1
}
func queryUsed(useNum *int) {
*useNum += 1
}
func TestMd5(t *testing.T) {
s := Md5(`{"product_no":"","start_time":"2025-04-20 09:00:00","end_time":"2025-05-01 00:00:00"}`)
t.Log(s)
}

View File

@ -89,11 +89,13 @@ func (r *ConsumeConfig) Start(ctx context.Context) {
}
func (r *ConsumeConfig) consumer(ctx context.Context, queueName string, value string) {
if err := r.Fn(ctx, value); err == nil {
r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, r.RetryNum, err)
err := r.Fn(ctx, value)
if err == nil {
return
}
r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, r.RetryNum, err)
if r.RetryNum > 0 {
r.retry(ctx, queueName, value)
}

View File

@ -0,0 +1,119 @@
package script
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
const (
URL = "http://127.0.0.1:15000/voucher/timeSliceQueryPush"
DEV_URL = "http://open.cszfan.com/voucher/timeSliceQueryPush"
PRO_URL = "https://voucher.86698.cn/voucher/timeSliceQueryPush"
)
const (
SINGLE_URL = "http://127.0.0.1:15000//voucher/pushWechatQuery"
DEV_SINGLE_URL = "http://open.cszfan.com//voucher/pushWechatQuery"
PRO_SINGLE_URL = "https://voucher.86698.cn//voucher/pushWechatQuery"
)
func timeSliceQueryPush(startTime, endTime time.Time, duration time.Duration, requestURL string) error {
// 每指定间隔时间发送一次请求
for t := startTime; t.Before(endTime); t = t.Add(duration) {
end := t.Add(duration) // 计算每次请求的结束时间
if end.After(endTime) {
end = endTime
}
// 创建请求体
requestBody := map[string]any{
"go_num": 2, // 并发数量
"time_slice_hours": 1, // 时间间隔
"product_no": "",
"start_time": t.Format(time.DateTime),
"end_time": end.Format(time.DateTime),
}
// 将请求体转换为 JSON 格式
bodyBytes, err := json.Marshal(requestBody)
if err != nil {
return err
}
fmt.Printf("body:%s\n", string(bodyBytes))
// 发送请求
if err2 := sendRequest(bodyBytes, requestURL); err2 != nil {
fmt.Printf("Error sending request: %v\n", err2)
}
// 等待一段时间后再发送下一个请求
time.Sleep(1 * time.Second) // 可以根据需要调整间隔时间
}
return nil
}
func pushWechatQuery(startTime, endTime time.Time, duration time.Duration, requestURL string) error {
// 每指定间隔时间发送一次请求
for t := startTime; t.Before(endTime); t = t.Add(duration) {
end := t.Add(duration) // 计算每次请求的结束时间
if end.After(endTime) {
end = endTime
}
// 创建请求体
requestBody := map[string]any{
"order_no": "",
"product_no": "",
"start_time": t.Format(time.DateTime),
"end_time": end.Format(time.DateTime),
}
// 将请求体转换为 JSON 格式
bodyBytes, err := json.Marshal(requestBody)
if err != nil {
return err
}
fmt.Printf("requestBody:%s\n", string(bodyBytes))
// 发送请求
if err2 := sendRequest(bodyBytes, requestURL); err2 != nil {
fmt.Printf("Error sending request: %v\n", err2)
}
// 等待一段时间后再发送下一个请求
time.Sleep(1 * time.Second) // 可以根据需要调整间隔时间
}
return nil
}
func sendRequest(body []byte, requestURL string) error {
resp, err := http.Post(requestURL, "application/json", bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("failed to send POST request: %v", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应体失败: %w", err)
}
if resp.StatusCode == http.StatusOK {
fmt.Printf("responsBody:%s", string(bodyBytes))
}
return fmt.Errorf("failed with status code: %d", resp.StatusCode)
}

View File

@ -0,0 +1,157 @@
package script
import (
"encoding/json"
"fmt"
"golang.org/x/sync/errgroup"
"testing"
"time"
"voucher/internal/biz/bo"
)
func Test_timeSliceQueryPush(t *testing.T) {
startTime, err := time.Parse(time.DateTime, "2025-05-01 00:00:00")
if err != nil {
t.Error(err)
return
}
endTime, err := time.Parse(time.DateTime, "2025-05-01 02:00:03")
if err != nil {
t.Error(err)
return
}
duration := 5 * time.Hour
//requestUrl := URL
requestUrl := DEV_URL
if err = timeSliceQueryPush(startTime, endTime, duration, requestUrl); err != nil {
t.Error(err)
}
}
func Test_pushWechatQuery(t *testing.T) {
startTime, err := time.Parse(time.DateTime, "2025-05-31 00:00:00")
if err != nil {
t.Error(err)
return
}
endTime, err := time.Parse(time.DateTime, "2025-05-01 10:00:00")
if err != nil {
t.Error(err)
return
}
duration := 1 * time.Hour
requestUrl := SINGLE_URL
if err = pushWechatQuery(startTime, endTime, duration, requestUrl); err != nil {
t.Error(err)
}
}
func Test_moreTime(t *testing.T) {
startTime, err := time.Parse(time.DateTime, "2025-05-31 00:00:00")
if err != nil {
t.Error(err)
return
}
endTime, err := time.Parse(time.DateTime, "2025-05-31 10:00:00")
if err != nil {
t.Error(err)
return
}
//duration := 240 * time.Hour
duration := 1 * time.Hour
for start := startTime; start.Before(endTime); start = start.Add(duration) {
end := start.Add(duration) // 计算每次请求的结束时间
if end.After(endTime) {
end = endTime
}
// 创建请求体
requestBody := map[string]any{
"start_time": start.Format(time.DateTime),
"end_time": end.Format(time.DateTime),
"go_num": 2, // 并发数量
"time_slice_hours": 1, // 时间间隔
"product_no": "",
}
// 将请求体转换为 JSON 格式
bodyBytes, err2 := json.Marshal(requestBody)
if err2 != nil {
t.Error(err)
return
}
fmt.Printf("body:%s\n", string(bodyBytes))
}
}
func Test_goMoreTime(t *testing.T) {
startTime, err := time.Parse(time.DateTime, "2025-05-01 00:00:00")
if err != nil {
t.Error(err)
return
}
//endTime, err := time.Parse(time.DateTime, "2025-05-31 23:59:59")
endTime, err := time.Parse(time.DateTime, "2025-05-01 23:59:59")
if err != nil {
t.Error(err)
return
}
duration := 1 * time.Hour
eg := new(errgroup.Group)
eg.SetLimit(5)
for start := startTime; start.Before(endTime); start = start.Add(duration) {
end := start.Add(duration) // 计算每次请求的结束时间
if end.After(endTime) {
end = endTime
}
req := &bo.FindInBatchesUseBo{
StartTime: &start,
EndTime: &end,
}
// 将请求体转换为 JSON 格式
reqStr, err2 := json.Marshal(req)
if err2 != nil {
t.Error(err)
return
}
eg.Go(func() error {
// 任务逻辑...
time.Sleep(2 * time.Second)
return fmt.Errorf("任务失败")
})
fmt.Printf("%s\n", string(reqStr))
}
err = eg.Wait() // 仅返回第一个错误
if err != nil {
fmt.Println(err)
} else {
fmt.Println("所有任务完成")
}
}

View File

@ -0,0 +1,133 @@
package timeslice
import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
const (
DefaultGoNum = 2
DefaultTimeSliceHours = 2
maxGlobalGoroutines = 1000
)
type Callback func(ctx context.Context, req *Task) error
type ManagerSrv struct {
callback Callback
}
func NewManager(callback Callback) *ManagerSrv {
return &ManagerSrv{callback: callback}
}
func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
if req.StartTime.After(req.EndTime) {
return 0, fmt.Errorf("start_time不能大于end_time")
}
if req.GoNum == 0 {
return 0, fmt.Errorf("协程数量不能为0")
}
if req.GoNum > maxGlobalGoroutines {
return 0, fmt.Errorf("协程数量不能大于%d", maxGlobalGoroutines)
}
timeSliceHours := float64(req.TimeSliceHours)
totalHours := req.EndTime.Sub(req.StartTime).Hours()
taskCount := int(totalHours / timeSliceHours)
// 如果剩余时间不足 TimeSliceHours 小时,增加任务数
if totalHours-float64(taskCount)*timeSliceHours > 0 {
taskCount++
}
processReq := &Process{
Manager: req,
TaskCount: taskCount,
}
return taskCount, m.process(ctx, processReq)
}
func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
if req.TaskCount == 0 {
return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围")
}
// 设置最大并发任务数为 5
eg := new(errgroup.Group)
eg.SetLimit(req.Manager.GoNum)
var mu sync.Mutex
errs := make([]error, 0, req.TaskCount)
// 为每个任务按指定的时间片 TimeSliceHours 分配开始和结束时间
for i := 0; i < req.TaskCount; i++ {
currentStart := req.Manager.StartTime.Add(time.Duration(i) * time.Duration(req.Manager.TimeSliceHours) * time.Hour)
currentEnd := currentStart.Add(time.Duration(req.Manager.TimeSliceHours) * time.Hour)
if currentEnd.After(req.Manager.EndTime) {
currentEnd = req.Manager.EndTime
}
taskID := i + 1
eg.Go(func() error {
select {
case <-ctx.Done():
mu.Lock()
errs = append(errs, fmt.Errorf("任务 %d 被上下文取消", taskID))
mu.Unlock()
return ctx.Err()
default:
// 继续执行
}
defer func() {
if err := recover(); err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("任务 %d panic: %v", taskID, err))
mu.Unlock()
}
}()
taskReq := &Task{
CurrentStartTime: currentStart,
CurrentEndTime: currentEnd,
TaskID: taskID,
Process: req,
}
if err := m.callback(ctx, taskReq); err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("任务 %d 执行失败: %v", taskID, err))
mu.Unlock()
}
return nil
})
}
// 等待所有任务完成
if err := eg.Wait(); err != nil {
return fmt.Errorf("任务执行失败: %v", err)
}
var result error
// 收集错误
for _, err2 := range errs {
result = multierror.Append(result, err2)
}
return result
}

View File

@ -0,0 +1,231 @@
package timeslice
import (
"context"
"errors"
"fmt"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"math/rand"
"sync"
"testing"
"time"
)
func ProcessTasks() error {
eg := new(errgroup.Group)
eg.SetLimit(5)
for i := 0; i < 5; i++ {
eg.Go(func() error {
// 任务逻辑...
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("任务失败")
})
}
return eg.Wait() // 仅返回第一个错误
}
func TestNewManager(t *testing.T) {
// 解析起始时间和结束时间
start, err := time.Parse(time.DateTime, "2023-01-01 00:00:00")
if err != nil {
t.Fatalf("查询失败: %v", err)
return
}
//end, err := time.Parse(time.DateTime, "2023-01-31 02:00:01")
end, err := time.Parse(time.DateTime, "2023-01-02 02:00:01")
if err != nil {
t.Fatalf("查询失败: %v", err)
return
}
var results []string
callback := func(ctx context.Context, req *Task) error {
// 模拟任务执行,休眠随机时间
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
// 生成任务执行结果
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
results = append(results, result)
//return nil
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.Process.Manager.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
}
startTime := time.Now()
startStr := time.Now().String()
srv := NewManager(callback)
taskCount, err := srv.Run(context.Background(), &Manager{
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
TimeSliceHours: 2,
})
// 输出结果
fmt.Printf("总任务数:%d\n", taskCount)
for _, result := range results {
fmt.Printf("%v\n", result)
}
if err != nil {
multiErr, ok := err.(*multierror.Error)
if ok {
for i, err := range multiErr.Errors {
fmt.Printf("错误 %d: %v\n", i+1, err)
}
} else {
// 不是多错误,单独处理
t.Error("单个错误", err)
}
}
endTime := time.Now()
fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s\n", startStr, endTime.Sub(startTime).String(), endTime.String())
}
func TestBatchCallBackFunc(t *testing.T) {
// 初始化开始时间和结束时间
start, err := time.Parse(time.DateTime, "2025-01-01 01:00:00")
if err != nil {
t.Fatalf("%v", err)
return
}
end, err := time.Parse(time.DateTime, "2025-01-01 05:00:01")
if err != nil {
t.Fatalf("%v", err)
return
}
// 确保开始时间小于结束时间
if start.After(end) {
t.Fatalf("start_time不能大于end_time")
return
}
var wg sync.WaitGroup
// 按照每天的时间单位分片
duration := 24 * time.Hour // 每个时间段为一天
// 循环处理时间段
for current := start; current.Before(end); current = current.Add(duration) {
// 计算当前时间段的结束时间
next := current.Add(duration)
// 如果下一个结束时间超过了实际的结束时间,则调整为实际结束时间
if next.After(end) {
next = end
}
t.Logf("\n处理时间: %s到%s", current.Format(time.DateTime), next.Format(time.DateTime))
wg.Add(1)
// 启动goroutine处理每个时间段
go func(startTime, endTime time.Time) {
defer func() {
wg.Done()
if err2 := recover(); err2 != nil {
t.Error("panic", err2)
}
}()
if err3 := CallbackFunc(startTime, endTime); err3 != nil {
t.Errorf("任务执行失败:%v\n", err3)
}
}(current, next)
}
// 等待所有的 goroutine 完成
wg.Wait()
}
func CallbackFunc(start, end time.Time) error {
nowTime := time.Now()
managerStartStr := start.Format(time.DateTime)
managerEndStr := end.Format(time.DateTime)
req := &Manager{
StartTime: start,
EndTime: end,
ProductNo: "no123456",
GoNum: 2,
TimeSliceHours: 1,
}
taskCount, err := NewManager(callbackFunc).Run(context.Background(), req)
if err != nil {
var multiErr *multierror.Error
if errors.As(err, &multiErr) {
for i, err := range multiErr.Errors {
fmt.Printf("%s到%s,错误 %d: %v\n", managerStartStr, managerEndStr, i+1, err)
}
} else {
// 不是多错误,单独处理
fmt.Printf("单个错误%v", err)
}
}
fmt.Printf("%s到%s,总任务数:%d\n", managerStartStr, managerEndStr, taskCount)
endTime := time.Now()
fmt.Printf("%s到%s,处理耗时:%s,开始处理时间:%s,结束时间%s\n", managerStartStr, managerEndStr, endTime.Sub(nowTime).String(), nowTime.Format(time.DateTime), endTime.Format(time.DateTime))
return nil
}
func callbackFunc(_ context.Context, req *Task) error {
managerStartTimeStr := req.Process.Manager.StartTime.Format(time.DateTime)
managerEndTimeStr := req.Process.Manager.EndTime.Format(time.DateTime)
currentStartTimeStr := req.CurrentStartTime.Format(time.DateTime)
currentEndTimeStr := req.CurrentEndTime.Format(time.DateTime)
start := time.Now()
startStr := start.Format(time.DateTime)
groupNum := 0
allNum := 0
notifyNum := 0
for i := 0; i < 3; i++ {
groupStartTime := time.Now()
// 模拟任务执行,休眠随机时间
time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
groupNum += 1
allNum = allNum + (i+1)*100
logFields := map[string]interface{}{
"duration": time.Now().Sub(groupStartTime).String(),
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
}
fmt.Printf("%s到%s,第%d个任务第%d组,处理完毕, %+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, i+1, logFields)
}
end := time.Now()
logFields := map[string]interface{}{
"groupNum": groupNum,
"allNum": allNum,
"startTime": startStr,
"endTime": end.Format(time.DateTime),
"searchTime": currentStartTimeStr + "到" + currentEndTimeStr,
"notifyNum": notifyNum,
"duration": end.Sub(start).String(),
}
// 生成任务执行结果
result := fmt.Sprintf("%s到%s第%d个任务,处理完毕,%+v\n", managerStartTimeStr, managerEndTimeStr, req.TaskID, logFields)
fmt.Printf(result)
//return fmt.Errorf(result)
return nil
}

View File

@ -0,0 +1,41 @@
package timeslice
import (
"encoding/json"
"time"
)
type Manager struct {
StartTime time.Time // 开始时间
EndTime time.Time // 结束时间
ProductNo string // 产品编号
GoNum int // 并发数
TimeSliceHours int64 // 时间片"小时"
}
func (m *Manager) String() string {
b, _ := json.Marshal(m)
return string(b)
}
type Process struct {
Manager *Manager
TaskCount int // 任务数
}
func (m *Process) String() string {
b, _ := json.Marshal(m)
return string(b)
}
type Task struct {
Process *Process
CurrentStartTime time.Time // 时间片开始时间
CurrentEndTime time.Time // 时间片结束时间
TaskID int // 任务ID
}
func (m *Task) String() string {
b, _ := json.Marshal(m)
return string(b)
}

View File

@ -40,6 +40,7 @@ func NewHTTPServer(
srv.Route("/voucher/").POST("queryOrder/{order_no}", cmb.QueryOrder)
srv.Route("/voucher/").POST("registerTag/{product_no}", cmb.RegisterTag)
srv.Route("/voucher/").POST("pushWechatQuery", cmb.PushWechatQuery)
srv.Route("/voucher/").POST("timeSliceQueryPush", cmb.TimeSliceQueryPush)
srv.Route("/voucher/").POST("pushWechatRetry/{product_no}", cmb.PushWechatRetry)
v1.RegisterCmbHTTPServer(srv, cmb)

View File

@ -30,6 +30,10 @@ func NewRdbConsumer(
manager.Add(cf)
}
if cf1 := voucherService.GetWechatTimeSliceQueryConfig(); cf1 != nil {
manager.Add(cf1)
}
if cf2 := voucherService.GetWechatRetryConfig(); cf2 != nil {
manager.Add(cf2)
}

View File

@ -2,19 +2,16 @@ package service
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/robfig/cron"
"io"
http2 "net/http"
"strconv"
v1 "voucher/api/v1"
"voucher/internal/biz"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/mixrepos"
"voucher/internal/biz/timeslicequery"
"voucher/internal/biz/vo"
"voucher/internal/biz/wechatrepo"
"voucher/internal/conf"
@ -23,11 +20,12 @@ import (
var _ v1.CmbHTTPServer = (*CmbService)(nil)
type CmbService struct {
bc *conf.Bootstrap
cron *cron.Cron
VoucherBiz *biz.VoucherBiz
CmbMixRepo mixrepos.CmbMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo
bc *conf.Bootstrap
cron *cron.Cron
VoucherBiz *biz.VoucherBiz
CmbMixRepo mixrepos.CmbMixRepo
WechatCpnRepo wechatrepo.WechatCpnRepo
timeSliceQuery *timeslicequery.Query
}
func NewCmbService(
@ -36,13 +34,15 @@ func NewCmbService(
VoucherBiz *biz.VoucherBiz,
CmbMixRepo mixrepos.CmbMixRepo,
WechatCpnRepo wechatrepo.WechatCpnRepo,
timeSliceQuery *timeslicequery.Query,
) *CmbService {
return &CmbService{
bc: bc,
cron: cron,
VoucherBiz: VoucherBiz,
CmbMixRepo: CmbMixRepo,
WechatCpnRepo: WechatCpnRepo,
bc: bc,
cron: cron,
VoucherBiz: VoucherBiz,
CmbMixRepo: CmbMixRepo,
WechatCpnRepo: WechatCpnRepo,
timeSliceQuery: timeSliceQuery,
}
}
@ -63,37 +63,6 @@ func (c *CmbService) GetResponse(ctx context.Context, replyBizContent []byte) (*
return reply, nil
}
func (this *CmbService) NotifyRetry(ctx http.Context) error {
id := ctx.Vars().Get("id")
if id == "" {
return fmt.Errorf("id is empty")
}
orderNotifyId, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return err
}
return this.VoucherBiz.PushNotifyRetryDelayMQ(ctx, 1, orderNotifyId)
}
func (this *CmbService) QueryOrder(ctx http.Context) error {
orderNo := ctx.Vars().Get("order_no")
if orderNo == "" {
return fmt.Errorf("orderNo is empty")
}
str, err := this.VoucherBiz.QueryOrder(ctx, orderNo)
if err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": str,
})
}
func (c *CmbService) OrderRetry(ctx context.Context, request *v1.OrderRetryRequest) (*v1.Empty, error) {
return nil, c.VoucherBiz.OrderRetry(ctx, request.GetTransactionIds())
@ -115,49 +84,3 @@ func (this *CmbService) RegisterTag(ctx http.Context) error {
"data": productNo,
})
}
func (this *CmbService) PushWechatQuery(ctx http.Context) error {
bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil {
return err
}
var req *do.WechatQuery
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
if req == nil {
return fmt.Errorf("req is empty")
}
if req.StartTime == "" || req.EndTime == "" {
return fmt.Errorf("start_time or end_time is empty")
}
if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": req,
})
}
func (this *CmbService) PushWechatRetry(ctx http.Context) error {
productNo := ctx.Vars().Get("product_no")
if productNo == "" {
return fmt.Errorf("product_no is empty")
}
err := this.VoucherBiz.PushWechatRetry(ctx, productNo)
if err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": productNo,
})
}

View File

@ -10,6 +10,7 @@ import (
"time"
"voucher/internal/biz"
"voucher/internal/biz/bo"
"voucher/internal/biz/timeslicequery"
"voucher/internal/conf"
"voucher/internal/data"
"voucher/internal/pkg/mq"
@ -21,6 +22,8 @@ type VoucherService struct {
VoucherBiz *biz.VoucherBiz
rdb *data.Rdb
logHelper *log.Helper
timeSliceQuery *timeslicequery.Query
}
func NewVoucherService(
@ -29,13 +32,15 @@ func NewVoucherService(
VoucherBiz *biz.VoucherBiz,
rdb *data.Rdb,
logHelper *log.Helper,
timeSliceQuery *timeslicequery.Query,
) *VoucherService {
return &VoucherService{
bc: bc,
cron: cron,
VoucherBiz: VoucherBiz,
rdb: rdb,
logHelper: logHelper,
bc: bc,
cron: cron,
VoucherBiz: VoucherBiz,
rdb: rdb,
logHelper: logHelper,
timeSliceQuery: timeSliceQuery,
}
}
@ -67,7 +72,9 @@ func (s *VoucherService) CronOrderNotice(ctx context.Context) error {
}
func (s *VoucherService) OrderNotice(ctx context.Context) {
start := time.Now()
log.Errorf("订单定时通知,执行开始: %s", start.Format(time.DateTime))
if err := s.VoucherBiz.Notice(ctx); err != nil {
log.Errorf("订单定时通知,执行失败: %v", err)
@ -76,11 +83,10 @@ func (s *VoucherService) OrderNotice(ctx context.Context) {
end := time.Now()
elapsed := end.Sub(start)
log.Warnf("订单定时通知,开始执行时间%s,执行结束时间%s,代码块执行耗时: %s", start.Format(time.DateTime), end.Format(time.DateTime), elapsed)
return
}
func (j *VoucherService) GetNotifyRetryConfig() *mq.ConsumerConfig {
elm, ok := j.bc.RocketMQ.EventMap["notifyRetry"]
if !ok {
return nil

120
internal/service/script.go Normal file
View File

@ -0,0 +1,120 @@
package service
import (
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/transport/http"
"io"
http2 "net/http"
"strconv"
"voucher/internal/biz/do"
)
func (this *CmbService) NotifyRetry(ctx http.Context) error {
id := ctx.Vars().Get("id")
if id == "" {
return fmt.Errorf("id is empty")
}
orderNotifyId, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return err
}
return this.VoucherBiz.PushNotifyRetryDelayMQ(ctx, 1, orderNotifyId)
}
func (this *CmbService) QueryOrder(ctx http.Context) error {
orderNo := ctx.Vars().Get("order_no")
if orderNo == "" {
return fmt.Errorf("orderNo is empty")
}
str, err := this.VoucherBiz.QueryOrder(ctx, orderNo)
if err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": str,
})
}
func (this *CmbService) PushWechatQuery(ctx http.Context) error {
bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil {
return err
}
var req *do.WechatQuery
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
if req == nil {
return fmt.Errorf("req is empty")
}
if req.StartTime == "" || req.EndTime == "" {
return fmt.Errorf("start_time or end_time is empty")
}
if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": req,
})
}
func (this *CmbService) TimeSliceQueryPush(ctx http.Context) error {
bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil {
return err
}
var req *do.RdsWechatQuery
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
if req == nil {
return fmt.Errorf("req is empty")
}
if req.StartTime == "" || req.EndTime == "" {
return fmt.Errorf("start_time or end_time is empty")
}
if req.GoNum > 10 {
return fmt.Errorf("协程数量不能大于10")
}
_, err = this.timeSliceQuery.Push(ctx, req)
if err != nil {
return err
}
return ctx.JSON(http2.StatusOK, req)
}
func (this *CmbService) PushWechatRetry(ctx http.Context) error {
productNo := ctx.Vars().Get("product_no")
if productNo == "" {
return fmt.Errorf("product_no is empty")
}
err := this.VoucherBiz.PushWechatRetry(ctx, productNo)
if err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": productNo,
})
}

View File

@ -43,3 +43,40 @@ func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) erro
return nil
}
func (s *VoucherService) GetWechatTimeSliceQueryConfig() *rdsmq.ConsumeConfig {
queue := s.bc.RdsMQ.GetWechatTimeSliceQuery()
if queue == nil {
return nil
}
if !queue.GetIsOpen() {
log.Warn(fmt.Sprintf("[%s]RdsMQ is not open", queue.Name))
return nil
}
return &rdsmq.ConsumeConfig{
Rdb: s.rdb.Rdb,
QueueName: queue.Name,
NumWorkers: queue.NumWorkers,
WaitTime: queue.GetWaitTime().AsDuration(),
RetryNum: queue.RetryNum,
Fn: s.WechatTimeSliceQueryHandle,
Logger: s.logHelper,
}
}
func (s *VoucherService) WechatTimeSliceQueryHandle(ctx context.Context, msg string) error {
if msg == "" {
s.logHelper.Errorf("wechat TimeSlice query error: batchNo is empty")
return nil
}
if err := s.timeSliceQuery.Consumer(ctx, msg); err != nil {
s.logHelper.Errorf("wechat TimeSlice query msg:%s error: %v", msg, err)
}
return nil
}