voucher/internal/biz/cron_notice.go

242 lines
6.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package biz
import (
"context"
"errors"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"runtime"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
)
func (v *VoucherBiz) isCanNotice(ctx context.Context) error {
if v.bc.Cmb.NoticeStartDays == 0 {
return errors.New("订单定时通知,noticeStartDays eq 0")
}
if v.bc.Cmb.NoticeEndDays == 0 {
return errors.New("订单定时通知,noticeEndDays eq 0")
}
cache := vo.CmbBatchNoticeCacheKey.BuildCache([]string{""})
_, err := v.rdb.Rdb.Get(ctx, cache.Key).Result()
if err == nil {
return fmt.Errorf("订单定时通知,notice 获取redis缓存存在已被执行,本台服务不做执行")
}
if err != redis.Nil {
return fmt.Errorf(fmt.Sprintf("订单定时通知,notice 获取redis缓存%s异常:%v", cache.Key, err))
}
c := vo.CmbBatchNoticeLockKey.BuildCache([]string{""})
return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
// 二次获取,判定处理,以免获取锁后又执行了一次
cacheValue, err2 := v.rdb.Rdb.Get(ctx, cache.Key).Result()
if err2 != nil && err2 != redis.Nil {
return fmt.Errorf(fmt.Sprintf("订单定时通知,notice 二次获取redis缓存%s异常:%v", cache.Key, err2))
}
if len(cacheValue) > 0 {
return fmt.Errorf("订单定时通知,notice 二次获取redis缓存存在已被执行,本台服务不做执行")
}
if err = v.rdb.Rdb.Set(ctx, cache.Key, fmt.Sprintf("%d_%d", v.bc.Cmb.NoticeStartDays, v.bc.Cmb.NoticeEndDays), c.TTL).Err(); err != nil {
return fmt.Errorf(fmt.Sprintf("notice 设置redis缓存%s异常:%v", cache.Key, err))
}
log.Warnf("订单定时通知,notice 获取redis缓存,不存在,开始处理")
return nil
})
}
func (v *VoucherBiz) Notice(ctx context.Context) error {
if err := v.isCanNotice(ctx); err != nil {
return err
}
now := time.Now()
// 获取 NoticeStartDays 天前的日期
noticeStartDay := now.AddDate(0, 0, int(-v.bc.Cmb.NoticeStartDays))
// 获取 NoticeStartDays 天前 00:00:00 的时间
startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())
// 获取 NoticeEndDays 天前的日期
noticeEndDay := now.AddDate(0, 0, int(-v.bc.Cmb.NoticeEndDays))
// 获取 NoticeEndDays 天 23:59:59 的时间
endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())
return v.timeSliceQuery(ctx, startTime, endTime)
}
func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time.Time) error {
log.Warnf("订单定时通知,开始处理,按每两个小时分片处理,范围:%s到%s", startTime.Format(time.DateTime), endTime.Format(time.DateTime))
duration := 2 * time.Hour
eg := new(errgroup.Group)
eg.SetLimit(10)
for start := startTime; start.Before(endTime); start = start.Add(duration) {
end := start.Add(duration) // 计算每次请求的结束时间
if end.After(endTime) {
end = endTime
} else {
end = end.Add(-1 * time.Second)
}
eg.Go(func() error {
req := &bo.FindInBatchesUseBo{
StartTime: &start,
EndTime: &end,
}
defer func() {
if err := recover(); err != nil {
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
log.Errorf("订单定时通知,发生错误:req:%+v,err:%v,file:%s,line:%d", req, err, file, line)
}
}()
return v.ExecuteNotice(ctx, req)
})
}
return eg.Wait() // 仅返回第一个错误
}
func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error {
start := time.Now()
num := 0
useNum := 0
sucNum := 0
err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows {
num += 1
if err := v.notice(ctx, order, &useNum, &sucNum); err != nil {
log.Errorf("订单定时通知,err:%v", err)
}
}
return nil
})
logFields := map[string]interface{}{
"sTime": req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime),
"num": num, // 查询总量
"useNum": useNum, // 核销通知数量
"sucNum": sucNum, // 重置为成功数量
"elapsed": time.Now().Sub(start).String(),
}
log.Warnf("订单定时通知,%+v", logFields)
return err
}
func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, useNum, sucNum *int) (respErr error) {
// 批量通知不做数据存储,量会很大
status, err := v.WechatCpnRepo.Query(ctx, order)
if err != nil {
return err
}
if order.Status == status {
return nil // 券状态未改变,忽略不处理
}
order.Status = status
event, err := status.GetOrderNotifyEvent()
if err != nil {
return err
}
notify := &bo.OrderNotifyBo{
OrderNo: order.OrderNo,
NotifyUrl: order.NotifyUrl,
Channel: order.Channel,
Event: event,
Type: order.Type,
}
if err = v.request(ctx, order, notify); err != nil {
return err
}
if err = v.UpdateOrderStatus(ctx, order.ID, status); err != nil {
return err
}
if event.IsUsed() {
*useNum += 1
} else if event.IsSendDEd() {
*sucNum += 1
}
return nil
}
func (v *VoucherBiz) request(ctx context.Context, order *bo.OrderBo, notify *bo.OrderNotifyBo) (respErr error) {
defer func() {
if err := recover(); err != nil {
// 打印堆栈信息
stackBuf := make([]byte, 1024)
stackSize := runtime.Stack(stackBuf, false)
// 获取调用栈信息
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
respErr = fmt.Errorf("request panic:%v, orderNo:%s, file:%s, line:%d, stack: %s", err, order.OrderNo, file, line, stackBuf[:stackSize])
}
}()
if order == nil {
return fmt.Errorf("request order is nil")
}
if notify == nil {
return fmt.Errorf("notify is nil")
}
if !notify.Event.CanNotify() {
return nil // 不可通知,忽略
}
request, err := v.Cmb.NotifyRequest(ctx, order, notify)
if err != nil {
return err
}
if request == nil {
return fmt.Errorf("request is nil,orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo)
}
if _, err = v.CmbMixRepo.Request(ctx, request, order.NotifyUrl); err != nil {
return fmt.Errorf("orderNo:%s,outBizNo:%s,stockId:%s,err:%s", order.OrderNo, order.OutBizNo, order.BatchNo, err.Error())
}
return nil
}