242 lines
6.4 KiB
Go
242 lines
6.4 KiB
Go
package biz
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"github.com/go-kratos/kratos/v2/log"
|
||
"github.com/redis/go-redis/v9"
|
||
"golang.org/x/sync/errgroup"
|
||
"runtime"
|
||
"time"
|
||
"voucher/internal/biz/bo"
|
||
"voucher/internal/biz/vo"
|
||
"voucher/internal/pkg/lock"
|
||
)
|
||
|
||
func (this *VoucherBiz) isCanNotice(ctx context.Context) error {
|
||
|
||
if this.bc.Cmb.NoticeStartDays == 0 {
|
||
return errors.New("订单定时通知,noticeStartDays eq 0")
|
||
}
|
||
|
||
if this.bc.Cmb.NoticeEndDays == 0 {
|
||
return errors.New("订单定时通知,noticeEndDays eq 0")
|
||
}
|
||
|
||
cache := vo.CmbBatchNoticeCacheKey.BuildCache([]string{""})
|
||
|
||
_, err := this.rdb.Rdb.Get(ctx, cache.Key).Result()
|
||
|
||
if err == nil {
|
||
return fmt.Errorf("订单定时通知,notice 获取redis缓存存在,已被执行,本台服务不做执行")
|
||
}
|
||
|
||
if err != redis.Nil {
|
||
return fmt.Errorf(fmt.Sprintf("订单定时通知,notice 获取redis缓存%s异常:%v", cache.Key, err))
|
||
}
|
||
|
||
c := vo.CmbBatchNoticeLockKey.BuildCache([]string{""})
|
||
|
||
return lock.NewMutex(this.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
|
||
|
||
// 二次获取,判定处理,以免获取锁后又执行了一次
|
||
|
||
cacheValue, err2 := this.rdb.Rdb.Get(ctx, cache.Key).Result()
|
||
|
||
if err2 != nil && err2 != redis.Nil {
|
||
return fmt.Errorf(fmt.Sprintf("订单定时通知,notice 二次获取redis缓存%s异常:%v", cache.Key, err2))
|
||
}
|
||
|
||
if len(cacheValue) > 0 {
|
||
return fmt.Errorf("订单定时通知,notice 二次获取redis缓存存在,已被执行,本台服务不做执行")
|
||
}
|
||
|
||
if err = this.rdb.Rdb.Set(ctx, cache.Key, fmt.Sprintf("%d_%d", this.bc.Cmb.NoticeStartDays, this.bc.Cmb.NoticeEndDays), c.TTL).Err(); err != nil {
|
||
return fmt.Errorf(fmt.Sprintf("notice 设置redis缓存%s异常:%v", cache.Key, err))
|
||
}
|
||
|
||
log.Warnf("订单定时通知,notice 获取redis缓存,不存在,开始处理")
|
||
return nil
|
||
})
|
||
}
|
||
|
||
func (this *VoucherBiz) Notice(ctx context.Context) error {
|
||
|
||
if err := this.isCanNotice(ctx); err != nil {
|
||
return err
|
||
}
|
||
|
||
now := time.Now()
|
||
|
||
// 获取 NoticeStartDays 天前的日期
|
||
noticeStartDay := now.AddDate(0, 0, int(-this.bc.Cmb.NoticeStartDays))
|
||
// 获取 NoticeStartDays 天前 00:00:00 的时间
|
||
startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())
|
||
|
||
// 获取 NoticeEndDays 天前的日期
|
||
noticeEndDay := now.AddDate(0, 0, int(-this.bc.Cmb.NoticeEndDays))
|
||
// 获取 NoticeEndDays 天 23:59:59 的时间
|
||
endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())
|
||
|
||
return this.timeSliceQuery(ctx, startTime, endTime)
|
||
}
|
||
|
||
func (this *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time.Time) error {
|
||
|
||
log.Warnf("订单定时通知,开始处理,按每两个小时分片处理,范围:%s到%s", startTime.Format(time.DateTime), endTime.Format(time.DateTime))
|
||
|
||
duration := 2 * time.Hour
|
||
|
||
eg := new(errgroup.Group)
|
||
eg.SetLimit(10)
|
||
|
||
for start := startTime; start.Before(endTime); start = start.Add(duration) {
|
||
|
||
end := start.Add(duration) // 计算每次请求的结束时间
|
||
if end.After(endTime) {
|
||
end = endTime
|
||
} else {
|
||
end = end.Add(-1 * time.Second)
|
||
}
|
||
|
||
eg.Go(func() error {
|
||
|
||
req := &bo.FindInBatchesUseBo{
|
||
StartTime: &start,
|
||
EndTime: &end,
|
||
}
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
|
||
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v,file:%s,line:%d", req, err, file, line)
|
||
}
|
||
}()
|
||
|
||
return this.ExecuteNotice(ctx, req)
|
||
})
|
||
}
|
||
|
||
return eg.Wait() // 仅返回第一个错误
|
||
}
|
||
|
||
func (this *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error {
|
||
|
||
start := time.Now()
|
||
|
||
num := 0
|
||
useNum := 0
|
||
sucNum := 0
|
||
|
||
err := this.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||
|
||
for _, order := range rows {
|
||
|
||
num += 1
|
||
if err := this.notice(ctx, order, &useNum, &sucNum); err != nil {
|
||
log.Errorf("订单定时通知,err:%v", err)
|
||
}
|
||
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
logFields := map[string]interface{}{
|
||
"sTime": req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime),
|
||
"num": num, // 查询总量
|
||
"useNum": useNum, // 核销通知数量
|
||
"sucNum": sucNum, // 重置为成功数量
|
||
"elapsed": time.Now().Sub(start).String(),
|
||
}
|
||
log.Warnf("订单定时通知,%+v", logFields)
|
||
|
||
return err
|
||
}
|
||
|
||
func (this *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, useNum, sucNum *int) (respErr error) {
|
||
// 批量通知不做数据存储,量会很大
|
||
|
||
status, err := this.WechatCpnRepo.Query(ctx, order)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if order.Status == status {
|
||
return nil // 券状态未改变,忽略不处理
|
||
}
|
||
|
||
order.Status = status
|
||
|
||
event, err := status.GetOrderNotifyEvent()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
notify := &bo.OrderNotifyBo{
|
||
OrderNo: order.OrderNo,
|
||
NotifyUrl: order.NotifyUrl,
|
||
Channel: order.Channel,
|
||
Event: event,
|
||
Type: order.Type,
|
||
}
|
||
|
||
if err = this.request(ctx, order, notify); err != nil {
|
||
return err
|
||
}
|
||
|
||
if err = this.UpdateOrderStatus(ctx, order.ID, status); err != nil {
|
||
return err
|
||
}
|
||
|
||
if event.IsUsed() {
|
||
*useNum += 1
|
||
} else if event.IsSendDEd() {
|
||
*sucNum += 1
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (this *VoucherBiz) request(ctx context.Context, order *bo.OrderBo, notify *bo.OrderNotifyBo) (respErr error) {
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
// 打印堆栈信息
|
||
stackBuf := make([]byte, 1024)
|
||
stackSize := runtime.Stack(stackBuf, false)
|
||
// 获取调用栈信息
|
||
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
|
||
respErr = fmt.Errorf("request panic:%v, orderNo:%s, file:%s, line:%d, stack: %s", err, order.OrderNo, file, line, stackBuf[:stackSize])
|
||
}
|
||
}()
|
||
|
||
if order == nil {
|
||
return fmt.Errorf("request order is nil")
|
||
}
|
||
|
||
if notify == nil {
|
||
return fmt.Errorf("notify is nil")
|
||
}
|
||
|
||
if !notify.Event.CanNotify() {
|
||
return nil // 不可通知,忽略
|
||
}
|
||
|
||
request, err := this.Cmb.NotifyRequest(ctx, order, notify)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if request == nil {
|
||
return fmt.Errorf("request is nil,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo)
|
||
}
|
||
|
||
if _, err = this.CmbMixRepo.Request(ctx, request, order.NotifyUrl); err != nil {
|
||
return fmt.Errorf("orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
|
||
}
|
||
|
||
return nil
|
||
}
|