package biz

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/transport/http"

	"voucher/internal/biz/bo"
	"voucher/internal/biz/do"
)

// UsedNotifyPush serializes req as JSON and appends it to the tail of the
// "used notify" Redis list so it can be processed later.
//
// It returns an error when the queue configuration is missing, when req cannot
// be marshaled, or when the Redis RPUSH fails. The Redis error is wrapped with
// %w so callers can inspect it with errors.Is / errors.As.
func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error {
	queue := this.bc.RdsMQ.GetUsedNotify()
	if queue == nil {
		return fmt.Errorf("队列不存在")
	}

	msg, err := json.Marshal(req)
	if err != nil {
		return err
	}

	if _, err := this.rdb.Rdb.RPush(ctx, queue.Name, string(msg)).Result(); err != nil {
		return fmt.Errorf("添加到队列失败:%w", err)
	}
	return nil
}

// UsedNotify handles one queued "used notify" message.
//
// msg is decoded as JSON into a do.WechatUsedQuery, which is handed to
// OrderRepo.FinUsedInBatches together with a callback. For every order passed
// to the callback, the notify event is derived from the order status, an
// OrderNotifyBo is built, and this.request is called with it.
//
// The first error from decoding, event lookup, or this.request is returned
// from the callback; how FinUsedInBatches reacts to a callback error is not
// visible here.
func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error {
	log.Warnf("核销重试通知处理,开始:%s", msg)

	var req *do.WechatUsedQuery
	if err := json.Unmarshal([]byte(msg), &req); err != nil {
		return err
	}

	// errNum is shared by every invocation of the callback below.
	// NOTE(review): each failure also returns err immediately, so the >50
	// threshold can only trigger if FinUsedInBatches keeps calling the callback
	// after an error — confirm whether the intent was to tolerate failures.
	errNum := 0
	return this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		for _, order := range rows {
			event, err := order.Status.GetOrderNotifyEvent()
			if err != nil {
				return err
			}

			notify := &bo.OrderNotifyBo{
				OrderNo:   order.OrderNo,
				NotifyUrl: order.NotifyUrl,
				Channel:   order.Channel,
				Event:     event,
				Type:      order.Type,
			}

			if err = this.request(ctx, order, notify); err != nil {
				errNum++
				if errNum > 50 {
					// Wrap with %w so the underlying cause stays inspectable.
					return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%w", err)
				}
				log.Warnf("核销重试通知处理,通知失败:%v", err)
				return err
			}
		}
		return nil
	})
}