voucher/internal/biz/cron_notice.go

240 lines
5.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package biz
import (
"context"
"errors"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/redis/go-redis/v9"
"golang.org/x/sync/errgroup"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/vo"
"voucher/internal/pkg/lock"
)
// isCanNotice decides whether this instance may run the scheduled order
// notification.
//
// It returns an error when either notice-day setting is zero, when the batch
// notice marker key already exists in Redis (checked once before and once
// inside the lock callback), or when a Redis call fails. When it returns nil
// the marker key has been written with the value
// "<NoticeStartDays>_<NoticeEndDays>".
func (v *VoucherBiz) isCanNotice(ctx context.Context) error {
	if v.bc.Cmb.NoticeStartDays == 0 {
		return errors.New("订单定时通知,noticeStartDays eq 0")
	}
	if v.bc.Cmb.NoticeEndDays == 0 {
		return errors.New("订单定时通知,noticeEndDays eq 0")
	}

	cache := vo.CmbBatchNoticeCacheKey.BuildCache([]string{""})
	_, err := v.rdb.Rdb.Get(ctx, cache.Key).Result()
	if err == nil {
		// The marker exists, so a run has already been claimed.
		return errors.New("订单定时通知,notice 获取redis缓存存在已被执行,本台服务不做执行")
	}
	if !errors.Is(err, redis.Nil) {
		return fmt.Errorf("订单定时通知,notice 获取redis缓存%s异常:%w", cache.Key, err)
	}

	c := vo.CmbBatchNoticeLockKey.BuildCache([]string{""})
	// Presumably lock.NewMutex(...).Lock runs the callback while holding a
	// Redis-backed lock on c.Key — confirm against the lock package.
	return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
		// Read the marker a second time inside the callback so that a run
		// claimed between the first read and this point is not repeated.
		cacheValue, getErr := v.rdb.Rdb.Get(ctx, cache.Key).Result()
		if getErr != nil && !errors.Is(getErr, redis.Nil) {
			return fmt.Errorf("订单定时通知,notice 二次获取redis缓存%s异常:%w", cache.Key, getErr)
		}
		if len(cacheValue) > 0 {
			return errors.New("订单定时通知,notice 二次获取redis缓存存在已被执行,本台服务不做执行")
		}

		// NOTE(review): the marker is stored with the lock key's TTL (c.TTL);
		// confirm that this is the intended expiry for the marker.
		marker := fmt.Sprintf("%d_%d", v.bc.Cmb.NoticeStartDays, v.bc.Cmb.NoticeEndDays)
		if setErr := v.rdb.Rdb.Set(ctx, cache.Key, marker, c.TTL).Err(); setErr != nil {
			return fmt.Errorf("notice 设置redis缓存%s异常:%w", cache.Key, setErr)
		}
		log.Warnf("订单定时通知,notice 获取redis缓存,不存在,开始处理")
		return nil
	})
}
// Notice runs the scheduled order notification for a configured day range.
//
// It first calls isCanNotice and returns its error unchanged. Otherwise it
// builds a window from 00:00:00 on the day NoticeStartDays before now to
// 23:59:59 on the day NoticeEndDays before now, and hands that window to
// timeSliceQuery.
func (v *VoucherBiz) Notice(ctx context.Context) error {
	if err := v.isCanNotice(ctx); err != nil {
		return err
	}

	now := time.Now()
	// clockOnDay shifts now by the given number of days and pins the wall
	// clock to hour:minute:second in the shifted day's own location.
	clockOnDay := func(days, hour, minute, second int) time.Time {
		day := now.AddDate(0, 0, days)
		year, month, dom := day.Date()
		return time.Date(year, month, dom, hour, minute, second, 0, day.Location())
	}

	// Start of the day NoticeStartDays ago.
	startTime := clockOnDay(int(-v.bc.Cmb.NoticeStartDays), 0, 0, 0)
	// Last whole second of the day NoticeEndDays ago.
	endTime := clockOnDay(int(-v.bc.Cmb.NoticeEndDays), 23, 59, 59)

	return v.timeSliceQuery(ctx, startTime, endTime)
}
// timeSliceQuery splits [startTime, endTime] into two-hour windows and runs
// ExecuteNotice for each window on an errgroup limited to 8 concurrent
// goroutines.
//
// Every window except the last ends one second before the next one starts;
// the last window is clamped to endTime. The loop body does not run when
// startTime is not before endTime. The returned error is the first non-nil
// error reported by any window (including a recovered panic), or nil.
func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time.Time) error {
	log.Warnf("订单定时通知,开始处理,按每两个小时分片处理,范围:%s到%s", startTime.Format(time.DateTime), endTime.Format(time.DateTime))

	const sliceSpan = 2 * time.Hour
	eg := new(errgroup.Group)
	eg.SetLimit(8)

	for start := startTime; start.Before(endTime); start = start.Add(sliceSpan) {
		// Per-iteration copies: the goroutine keeps pointers to these, so it
		// must not share the loop variable, which pre-Go 1.22 loop semantics
		// reuse and advance across iterations.
		sliceStart := start
		sliceEnd := start.Add(sliceSpan)
		if sliceEnd.After(endTime) {
			sliceEnd = endTime
		} else {
			sliceEnd = sliceEnd.Add(-time.Second)
		}

		eg.Go(func() (err error) {
			req := &bo.FindInBatchesUseBo{
				StartTime: &sliceStart,
				EndTime:   &sliceEnd,
			}
			defer func() {
				if r := recover(); r != nil {
					log.Errorf("订单定时通知,发生错误:req:%+v,err:%v", req, r)
					// Report the panic to the group instead of letting the
					// window look like it succeeded.
					err = fmt.Errorf("panic:%v", r)
				}
			}()

			// Skip the window if the context is already done.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			return v.ExecuteNotice(ctx, req)
		})
	}

	return eg.Wait() // only the first error is returned
}
// ExecuteNotice passes req to OrderRepo.FindInBatches and calls notice for
// every order handed to the batch callback.
//
// A failure from notice is logged and does not stop the batch; the callback
// itself always returns nil. After FindInBatches returns, a summary is logged
// with the searched range, the number of orders seen, the number counted as
// notified and the elapsed time. The return value is whatever FindInBatches
// returned.
func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error {
	var (
		seen     int
		notified int
	)
	begin := time.Now()

	handleBatch := func(ctx context.Context, rows []*bo.OrderBo) error {
		for i := range rows {
			seen++
			if noticeErr := v.notice(ctx, rows[i], &notified); noticeErr != nil {
				// Best effort: one failing order must not abort the batch.
				log.Errorf("订单定时通知,err:%v", noticeErr)
			}
		}
		return nil
	}
	err := v.OrderRepo.FindInBatches(ctx, req, handleBatch)

	searchTime := req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime)
	log.Warnf("订单定时通知,%+v", map[string]interface{}{
		"searchTime": searchTime,
		"num":        seen,
		"notifyNum":  notified,
		"elapsed":    time.Since(begin).String(),
	})
	return err
}
// notice handles a single order: it queries the current status through
// WechatCpnRepo, and when that differs from order.Status it sends a
// notification via request, stores the new status with UpdateOrderStatus and
// increments *notifyNum.
//
// It returns nil without doing anything further when the status is unchanged.
// Any error from the query, the event lookup, the request or the status
// update is returned as is. A panic anywhere in the body is recovered and
// returned as a "panic:..." error.
func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, notifyNum *int) (respErr error) {
	// Batch notifications are not persisted; the volume would be very large.
	defer func() {
		if r := recover(); r != nil {
			respErr = fmt.Errorf("panic:%v", r)
		}
	}()

	if order == nil {
		return errors.New("order is nil")
	}

	latest, queryErr := v.WechatCpnRepo.Query(ctx, order)
	if queryErr != nil {
		return queryErr
	}
	if latest == order.Status {
		// Coupon status has not changed; nothing to do.
		return nil
	}

	event, eventErr := latest.GetOrderNotifyEvent()
	if eventErr != nil {
		return eventErr
	}

	payload := &bo.OrderNotifyBo{
		OrderNo:   order.OrderNo,
		NotifyUrl: order.NotifyUrl,
		Channel:   order.Channel,
		Event:     event,
		Type:      order.Type,
	}
	if sendErr := v.request(ctx, order, payload); sendErr != nil {
		return sendErr
	}
	if updateErr := v.UpdateOrderStatus(ctx, order.ID, latest); updateErr != nil {
		return updateErr
	}

	*notifyNum++
	return nil
}
// request builds the notification request for order through Cmb.NotifyRequest
// and sends it to order.NotifyUrl through CmbMixRepo.Request.
//
// It returns an error when order or notify is nil, when building the request
// fails or yields nil, or when sending fails (the send error is wrapped
// together with the order identifiers). It returns nil without sending
// anything when notify.Event.CanNotify() is false. A panic in the body is
// recovered and returned as an error carrying the order number.
func (v *VoucherBiz) request(ctx context.Context, order *bo.OrderBo, notify *bo.OrderNotifyBo) (respErr error) {
	// Guard before installing the recover handler: the handler and the error
	// messages below all read fields of order.
	if order == nil {
		return errors.New("order is nil")
	}
	defer func() {
		if r := recover(); r != nil {
			respErr = fmt.Errorf("panic:%v,orderNo:%s", r, order.OrderNo)
		}
	}()

	if notify == nil {
		return errors.New("notify is nil")
	}
	if !notify.Event.CanNotify() {
		return nil // event is not notifiable; skip
	}

	request, err := v.Cmb.NotifyRequest(ctx, order, notify)
	if err != nil {
		return err
	}
	if request == nil {
		// There is no underlying error on this path, so the message carries
		// only the order identifiers.
		return fmt.Errorf("request is nil,orderNo:%s,outBizNo:%s,stockId:%s", order.OrderNo, order.OutBizNo, order.BatchNo)
	}

	if _, err = v.CmbMixRepo.Request(ctx, request, order.NotifyUrl); err != nil {
		return fmt.Errorf("orderNo:%s,outBizNo:%s,stockId:%s,err:%w", order.OrderNo, order.OutBizNo, order.BatchNo, err)
	}
	return nil
}