package biz

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/redis/go-redis/v9"
	"golang.org/x/sync/errgroup"

	"voucher/internal/biz/bo"
	"voucher/internal/biz/vo"
	"voucher/internal/data/wechatrepoimpl"
	"voucher/internal/pkg/lock"
)

// isCanNotice decides whether this instance may run the scheduled order
// notification. It returns nil only when the run is allowed.
//
// It returns an error when:
//   - NoticeStartDays or NoticeEndDays is zero in the configuration;
//   - the "already executed" marker key exists in redis;
//   - redis access fails;
//   - the lock helper or the callback run through it reports an error.
//
// On success the marker key is written so that later callers are rejected
// for as long as the key lives.
func (v *VoucherBiz) isCanNotice(ctx context.Context) error {
	if v.bc.Cmb.NoticeStartDays == 0 {
		return errors.New("订单定时通知,noticeStartDays eq 0")
	}
	if v.bc.Cmb.NoticeEndDays == 0 {
		return errors.New("订单定时通知,noticeEndDays eq 0")
	}

	// Cheap pre-check before taking the lock: if the marker exists, skip.
	cache := vo.CmbBatchNoticeCacheKey.BuildCache([]string{""})
	_, err := v.rdb.Rdb.Get(ctx, cache.Key).Result()
	if err == nil {
		return errors.New("订单定时通知,notice 获取redis缓存存在,已被执行,本台服务不做执行")
	}
	if !errors.Is(err, redis.Nil) {
		return fmt.Errorf("订单定时通知,notice 获取redis缓存%s异常:%w", cache.Key, err)
	}

	c := vo.CmbBatchNoticeLockKey.BuildCache([]string{""})
	return lock.NewMutex(v.rdb.Rdb, c.TTL).Lock(ctx, c.Key, func(ctx context.Context) error {
		// Read the marker a second time inside the callback so that a run
		// completed by someone else between the first check and now is
		// not repeated.
		cacheValue, getErr := v.rdb.Rdb.Get(ctx, cache.Key).Result()
		if getErr != nil && !errors.Is(getErr, redis.Nil) {
			return fmt.Errorf("订单定时通知,notice 二次获取redis缓存%s异常:%w", cache.Key, getErr)
		}
		if len(cacheValue) > 0 {
			return errors.New("订单定时通知,notice 二次获取redis缓存存在,已被执行,本台服务不做执行")
		}

		// NOTE(review): the marker is stored with the lock key's TTL (c.TTL),
		// not cache.TTL — confirm this is intended.
		marker := fmt.Sprintf("%d_%d", v.bc.Cmb.NoticeStartDays, v.bc.Cmb.NoticeEndDays)
		if setErr := v.rdb.Rdb.Set(ctx, cache.Key, marker, c.TTL).Err(); setErr != nil {
			return fmt.Errorf("notice 设置redis缓存%s异常:%w", cache.Key, setErr)
		}
		log.Warnf("订单定时通知,notice 获取redis缓存,不存在,开始处理")
		return nil
	})
}

// Notice runs the scheduled order notification for the configured window.
//
// The window starts at 00:00:00 of the day NoticeStartDays days ago and ends
// at 23:59:59 of the day NoticeEndDays days ago (local time of time.Now).
// It returns the error from isCanNotice when the run is not allowed, or the
// error reported by timeSliceQuery.
func (v *VoucherBiz) Notice(ctx context.Context) error {
	if err := v.isCanNotice(ctx); err != nil {
		return err
	}

	now := time.Now()

	// Negate after converting to int so an unsigned config field cannot
	// wrap around to a huge positive day count.
	noticeStartDay := now.AddDate(0, 0, -int(v.bc.Cmb.NoticeStartDays))
	// 00:00:00 of the start day.
	startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())

	noticeEndDay := now.AddDate(0, 0, -int(v.bc.Cmb.NoticeEndDays))
	// 23:59:59 of the end day.
	endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())

	return v.timeSliceQuery(ctx, startTime, endTime)
}

// timeSliceQuery splits [startTime, endTime] into two-hour slices and runs
// ExecuteNotice for each slice, with at most 10 slices in flight.
//
// Every slice except the last ends one second before the next slice starts;
// the last slice ends at endTime. A slice that starts after ctx is done
// returns ctx.Err() without doing any work. A panic inside a slice is logged
// and reported as that slice's error. The result is the first non-nil error
// from any slice, as returned by errgroup.Group.Wait.
func (v *VoucherBiz) timeSliceQuery(ctx context.Context, startTime, endTime time.Time) error {
	log.Warnf("订单定时通知,开始处理,按每两个小时分片处理,范围:%s到%s", startTime.Format(time.DateTime), endTime.Format(time.DateTime))

	const sliceDuration = 2 * time.Hour

	eg := new(errgroup.Group)
	eg.SetLimit(10)

	for start := startTime; start.Before(endTime); start = start.Add(sliceDuration) {
		// Copy the loop variable: the request keeps a pointer to it, and
		// before Go 1.22 all iterations would share one variable.
		sliceStart := start
		sliceEnd := sliceStart.Add(sliceDuration)
		if sliceEnd.After(endTime) {
			sliceEnd = endTime
		} else {
			// Keep adjacent slices from overlapping on the boundary second.
			sliceEnd = sliceEnd.Add(-time.Second)
		}

		req := &bo.FindInBatchesUseBo{
			StartTime: &sliceStart,
			EndTime:   &sliceEnd,
		}

		eg.Go(func() (err error) {
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			defer func() {
				if r := recover(); r != nil {
					log.Errorf("订单定时通知,发生错误:req:%+v,err:%v", req, r)
					// Surface the panic instead of reporting success.
					err = fmt.Errorf("订单定时通知,发生错误:%v", r)
				}
			}()
			return v.ExecuteNotice(ctx, req)
		})
	}

	return eg.Wait() // only the first error is returned
}

// ExecuteNotice processes all orders that OrderRepo.FindInBatches yields for
// req and calls notice on each one.
//
// A failure on a single order is logged and counted but does not stop the
// batch. The returned error is whatever FindInBatches itself returns. A
// summary (time range, orders seen, notifications attempted, failures,
// elapsed time) is logged at the end. req.StartTime and req.EndTime must be
// non-nil.
func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error {
	begin := time.Now()
	num := 0
	notifyNum := 0
	errNum := 0

	err := v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		for _, order := range rows {
			num++
			if noticeErr := v.notice(ctx, order, &notifyNum); noticeErr != nil {
				errNum++
				log.Error(noticeErr)
			}
		}
		return nil
	})

	logFields := map[string]any{
		"searchTime": req.StartTime.Format(time.DateTime) + "到" + req.EndTime.Format(time.DateTime),
		"num":        num,
		"notifyNum":  notifyNum,
		"errNum":     errNum,
		"elapsed":    time.Since(begin).String(),
	}
	log.Warnf("订单定时通知,%+v", logFields)

	return err
}

// notice queries the coupon for one order via WechatCpnRepo.QueryCoupon and,
// when the mapped status differs from order.Status, sends a notification
// through cmbNotice and then calls UpdateOrderStatus.
//
// notifyNum is passed through to cmbNotice, which increments it for every
// notification it attempts. An order whose status is unchanged is skipped
// and nil is returned.
func (v *VoucherBiz) notice(ctx context.Context, order *bo.OrderBo, notifyNum *int) error {
	// Batch notification does not persist the query result; the volume
	// would be very large.
	resp, err := v.WechatCpnRepo.QueryCoupon(ctx, order)
	if err != nil {
		return err
	}
	if resp.Status == nil {
		// Guard the dereference below against a reply without a status.
		return fmt.Errorf("订单定时通知,查询券状态为空,orderNo:%s", order.OrderNo)
	}

	status, err := wechatrepoimpl.CpnStatus(*resp.Status).GetStatus()
	if err != nil {
		return err
	}
	if order.Status == status {
		return nil // coupon status unchanged, nothing to do
	}

	event, err := status.GetOrderNotifyEvent()
	if err != nil {
		return err
	}

	orderNotify := &bo.OrderNotifyBo{
		OrderNo:   order.OrderNo,
		NotifyUrl: order.NotifyUrl,
		Channel:   order.Channel,
		Event:     event,
		Type:      order.Type,
	}
	if err = v.cmbNotice(ctx, order, orderNotify, notifyNum); err != nil {
		return err
	}

	return v.UpdateOrderStatus(ctx, order.ID, status)
}

// cmbNotice sends orderNotify for order to order.NotifyUrl through
// CmbMixRepo.Request.
//
// When orderNotify.Event.CanNotify() is false nothing is sent and nil is
// returned. Otherwise *notifyNum is incremented before the request is built,
// so it counts attempts rather than successes. An error is returned when the
// request cannot be built, the call fails, or the reply's RespCode is not the
// success value.
func (v *VoucherBiz) cmbNotice(ctx context.Context, order *bo.OrderBo, orderNotify *bo.OrderNotifyBo, notifyNum *int) error {
	if !orderNotify.Event.CanNotify() {
		return nil // event is not notifiable, ignore
	}
	*notifyNum++

	request, err := v.Cmb.NotifyRequest(ctx, order, orderNotify)
	if err != nil {
		return err
	}

	reply, err := v.CmbMixRepo.Request(ctx, request, order.NotifyUrl)
	if err != nil {
		return fmt.Errorf("订单定时通知,orderNo:%s,outBizNo:%s,stockId:%s,err:%w", order.OrderNo, order.OutBizNo, order.BatchNo, err)
	}
	if reply.RespCode != vo.CmbResponseStatusSuccess.GetValue() {
		return errors.New("订单定时通知,招行返回错误:" + reply.RespMsg)
	}

	return nil
}