voucher/internal/biz/used_notify.go

105 lines
2.1 KiB
Go

package biz
import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"sync/atomic"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/transport/http"
	"golang.org/x/sync/errgroup"

	"voucher/internal/biz/bo"
	"voucher/internal/biz/do"
)
// UsedNotifyPush JSON-encodes req and pushes it, via RPush, onto the list
// whose name comes from the used-notify queue configuration.
//
// It returns an error when the queue configuration is missing, when req
// cannot be marshalled, or when the push itself fails.
func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error {
	usedQueue := this.bc.RdsMQ.GetUsedNotify()
	if usedQueue == nil {
		return fmt.Errorf("队列不存在")
	}

	payload, marshalErr := json.Marshal(req)
	if marshalErr != nil {
		return marshalErr
	}

	// The payload is sent as a string, exactly as it was marshalled.
	if _, pushErr := this.rdb.Rdb.RPush(ctx, usedQueue.Name, string(payload)).Result(); pushErr != nil {
		return fmt.Errorf("添加到队列失败:%v", pushErr)
	}
	return nil
}
// UsedNotify processes one queued retry message.
//
// msg is the JSON encoding of a do.WechatUsedQuery. Orders are read in
// batches through OrderRepo.FinUsedInBatches and each order is handed to
// usedNotify on a worker goroutine, with at most three workers running at a
// time. A failed notification is logged and tolerated; once more than
// maxFailures notifications have failed, each further failure is reported as
// an error.
//
// The function always waits for every started worker before returning. It
// returns the JSON decoding error, otherwise the batch-iteration error if
// there is one, otherwise the first error reported by a worker.
func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error {
	// Number of failed notifications tolerated before workers start
	// reporting an error (the value is also spelled out in the message below).
	const maxFailures = 50

	log.Warnf("核销重试通知处理,开始:%s", msg)
	var req *do.WechatUsedQuery
	if err := json.Unmarshal([]byte(msg), &req); err != nil {
		return err
	}

	// errNum is shared by all workers, so it is only touched atomically.
	var errNum int32
	eg := new(errgroup.Group)
	eg.SetLimit(3)
	// NOTE(review): workers may still be running after the batch callback
	// returns; this assumes the callback's ctx stays usable until then —
	// confirm against FinUsedInBatches.
	batchErr := this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		for _, order := range rows {
			order := order // per-iteration copy; closures share the loop variable before Go 1.22
			eg.Go(func() error {
				err := this.usedNotify(ctx, order)
				if err == nil {
					return nil
				}
				if atomic.AddInt32(&errNum, 1) > maxFailures {
					return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%w", err)
				}
				log.Warnf("核销重试通知处理,通知失败:%v", err)
				return nil
			})
		}
		return nil
	})

	// Wait unconditionally so that no worker outlives this call, even when
	// the batch iteration itself failed. Wait yields only the first worker error.
	waitErr := eg.Wait()
	if batchErr != nil {
		return batchErr
	}
	return waitErr
}
// usedNotify builds the notification payload for a single order and sends it
// through this.request.
//
// It returns the error from resolving the order's notify event or from
// this.request. A panic raised along the way is recovered, logged together
// with the goroutine's stack trace, and turned into a non-nil error so that
// the caller does not mistake a crashed notification for a successful one.
func (this *VoucherBiz) usedNotify(ctx context.Context, order *bo.OrderBo) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// A fixed-skip runtime.Caller from inside a deferred function
			// points into the runtime's panic machinery rather than at the
			// code that panicked, so record the stack trace instead.
			stack := make([]byte, 4096)
			stack = stack[:runtime.Stack(stack, false)]
			log.Errorf("核销重试通知处理,发生错误:req:%s,err:%v,stack:%s", order.OrderNo, r, stack)
			// Surface the panic through the named result.
			err = fmt.Errorf("核销重试通知处理,发生错误:%v", r)
		}
	}()

	event, err := order.Status.GetOrderNotifyEvent()
	if err != nil {
		return err
	}
	notify := &bo.OrderNotifyBo{
		OrderNo:   order.OrderNo,
		NotifyUrl: order.NotifyUrl,
		Channel:   order.Channel,
		Event:     event,
		Type:      order.Type,
	}
	return this.request(ctx, order, notify)
}