package biz

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"sync/atomic"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/transport/http"
	"golang.org/x/sync/errgroup"

	"voucher/internal/biz/bo"
	"voucher/internal/biz/do"
)

// UsedNotifyPush JSON-encodes req and appends it (RPUSH) to the Redis list
// named by the queue returned from this.bc.RdsMQ.GetUsedNotify().
//
// It returns an error when that queue is nil, when req cannot be marshalled,
// or when the RPUSH call fails (the Redis error is wrapped).
func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error {
	queue := this.bc.RdsMQ.GetUsedNotify()
	if queue == nil {
		return fmt.Errorf("队列不存在")
	}

	msg, err := json.Marshal(req)
	if err != nil {
		return err
	}

	if _, err = this.rdb.Rdb.RPush(ctx, queue.Name, string(msg)).Result(); err != nil {
		return fmt.Errorf("添加到队列失败:%w", err)
	}
	return nil
}

// UsedNotify decodes msg (JSON) into a do.WechatUsedQuery, iterates the
// matching orders in batches via this.OrderRepo.FinUsedInBatches, and calls
// usedNotify for every order with at most 3 notifications in flight.
//
// Individual notification failures are logged and counted; once more than 50
// have failed, each further failure is turned into an error. The function
// returns the decode error, the batch-iteration error, or the first error
// produced by a notification goroutine, in that order of priority.
func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error {
	log.Warnf("核销重试通知处理,开始:%s", msg)

	var req *do.WechatUsedQuery
	if err := json.Unmarshal([]byte(msg), &req); err != nil {
		return err
	}

	// errNum is updated from several goroutines at once, so it must only be
	// touched through atomic operations.
	var errNum int32

	eg := new(errgroup.Group)
	eg.SetLimit(3)

	err := this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		for _, order := range rows {
			order := order // per-iteration copy: required for closures before Go 1.22
			eg.Go(func() error {
				if err := this.usedNotify(ctx, order); err != nil {
					if atomic.AddInt32(&errNum, 1) > 50 {
						return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%w", err)
					}
					log.Warnf("核销重试通知处理,通知失败:%v", err)
				}
				return nil
			})
		}
		return nil
	})

	// Always wait, even when batching failed, so that no notification
	// goroutine outlives this call. Wait reports only the first error.
	waitErr := eg.Wait()
	if err != nil {
		return err
	}
	return waitErr
}

// usedNotify builds a bo.OrderNotifyBo from order and sends it through
// this.request.
//
// A panic raised while doing so is recovered, logged together with the order
// number, and reported to the caller as a non-nil error so that it is not
// mistaken for a successful notification.
func (this *VoucherBiz) usedNotify(ctx context.Context, order *bo.OrderBo) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// skip=1 is the frame that invoked this deferred closure.
			// NOTE(review): during a panic that frame likely belongs to the
			// runtime rather than the faulting line — confirm whether a full
			// stack trace would be more useful here.
			_, file, line, _ := runtime.Caller(1)
			log.Errorf("核销重试通知处理,发生错误:req:%s,err:%v,file:%s,line:%d", order.OrderNo, r, file, line)
			err = fmt.Errorf("usedNotify panic: %v", r)
		}
	}()

	event, err := order.Status.GetOrderNotifyEvent()
	if err != nil {
		return err
	}

	notify := &bo.OrderNotifyBo{
		OrderNo:   order.OrderNo,
		NotifyUrl: order.NotifyUrl,
		Channel:   order.Channel,
		Event:     event,
		Type:      order.Type,
	}
	return this.request(ctx, order, notify)
}