voucher/internal/biz/used_notify.go

77 lines
1.5 KiB
Go

package biz
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
func (this *VoucherBiz) UsedNotifyPush(ctx http.Context, req *do.WechatUsedQuery) error {
queue := this.bc.RdsMQ.GetUsedNotify()
if queue == nil {
return fmt.Errorf("队列不存在")
}
msg, err := json.Marshal(req)
if err != nil {
return err
}
strMsg := string(msg)
_, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
return fmt.Errorf("添加到队列失败:%v", err)
}
return nil
}
func (this *VoucherBiz) UsedNotify(ctx context.Context, msg string) error {
log.Warnf("核销重试通知处理,开始:%s", msg)
var req *do.WechatUsedQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err
}
errNum := 0
return this.OrderRepo.FinUsedInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows {
event, err := order.Status.GetOrderNotifyEvent()
if err != nil {
return err
}
notify := &bo.OrderNotifyBo{
OrderNo: order.OrderNo,
NotifyUrl: order.NotifyUrl,
Channel: order.Channel,
Event: event,
Type: order.Type,
}
if err = this.request(ctx, order, notify); err != nil {
errNum++
if errNum > 50 {
return fmt.Errorf("核销重试通知处理,通知失败次数超过50次,请检查:%v", err)
}
log.Warnf("核销重试通知处理,通知失败:%v", err)
}
}
return nil
})
}