105 lines
2.1 KiB
Go
105 lines
2.1 KiB
Go
package biz
|
|
|
|
import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"sync/atomic"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/go-kratos/kratos/v2/transport/http"
	"golang.org/x/sync/errgroup"

	"voucher/internal/biz/bo"
	"voucher/internal/biz/do"
)
|
|
|
|
func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error {
|
|
|
|
queue := this.bc.RdsMQ.GetUsedNotify()
|
|
if queue == nil {
|
|
return fmt.Errorf("队列不存在")
|
|
}
|
|
|
|
msg, err := json.Marshal(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
strMsg := string(msg)
|
|
|
|
_, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
|
|
if err != nil {
|
|
return fmt.Errorf("添加到队列失败:%v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error {
|
|
|
|
log.Warnf("核销重试通知处理,开始:%s", msg)
|
|
|
|
var req *do.WechatUsedQuery
|
|
|
|
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
|
return err
|
|
}
|
|
|
|
errNum := 0
|
|
|
|
eg := new(errgroup.Group)
|
|
eg.SetLimit(3)
|
|
|
|
err := this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
|
|
|
for _, order := range rows {
|
|
|
|
eg.Go(func() error {
|
|
|
|
if err := this.usedNotify(ctx, order); err != nil {
|
|
errNum++
|
|
if errNum > 50 {
|
|
return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%v", err)
|
|
}
|
|
log.Warnf("核销重试通知处理,通知失败:%v", err)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return eg.Wait() // 仅返回第一个错误
|
|
}
|
|
|
|
func (this *VoucherBiz) usedNotify(ctx context.Context, order *bo.OrderBo) error {
|
|
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
|
|
log.Errorf("核销重试通知处理,发生错误:req:%s,err:%v,file:%s,line:%d", order.OrderNo, err, file, line)
|
|
}
|
|
}()
|
|
|
|
event, err := order.Status.GetOrderNotifyEvent()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
notify := &bo.OrderNotifyBo{
|
|
OrderNo: order.OrderNo,
|
|
NotifyUrl: order.NotifyUrl,
|
|
Channel: order.Channel,
|
|
Event: event,
|
|
Type: order.Type,
|
|
}
|
|
|
|
return this.request(ctx, order, notify)
|
|
}
|