voucher/internal/biz/retry_notify.go

123 lines
3.3 KiB
Go

package biz
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"time"
"voucher/internal/biz/bo"
)
func (this *VoucherBiz) PushRetryNotify(ctx http.Context, req *bo.FindInBatchesBo) error {
queue := this.bc.RdsMQ.GetRetryNotify()
if queue == nil {
return fmt.Errorf("队列不存在")
}
msg, err := json.Marshal(req)
if err != nil {
return err
}
strMsg := string(msg)
_, err = this.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
return fmt.Errorf("添加到队列失败:%v", err)
}
return nil
}
func (this *VoucherBiz) RetryNotify(ctx context.Context, msg string) error {
var req *bo.FindInBatchesBo
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err
}
if err := this.RetryNotifyOrder(ctx, req); err != nil {
return err
}
return this.RetryNotifyOrderBack(ctx, req)
}
func (this *VoucherBiz) RetryNotifyOrder(ctx context.Context, req *bo.FindInBatchesBo) error {
start := time.Now()
startStr := time.Now().String()
log.Warnf("重试通知处理:%s", startStr)
fmt.Printf("重试通知处理:%s", startStr)
n := 0
eNum := 0
notifyNum := 0
err := this.OrderRepo.SpecifyFindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
n += 1
for _, order := range rows {
eNum += 1
if _, err := this.Cmb.Notify(ctx, order); err != nil {
log.Errorf("重试通知处理,发生错误,orderNo:%s,outBizNo:%s,voucherNo:%s,err:%v", order.OrderNo, order.OutBizNo, order.VoucherNo, err)
} else {
notifyNum += 1
}
}
log.Warnf("重试通知处理,第:%d组,已执行条数:%d,通知成功条数:%d,", n, eNum, notifyNum)
return nil
})
endTime := time.Now()
log.Warnf("重试通知处理,耗时:%s,结束时间:%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
fmt.Printf("重试通知处理,耗时:%s,结束时间%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
return err
}
func (this *VoucherBiz) RetryNotifyOrderBack(ctx context.Context, req *bo.FindInBatchesBo) error {
start := time.Now()
startStr := time.Now().String()
log.Warnf("重试通知处理Bak:%s", startStr)
fmt.Printf("重试通知处理Bak:%s", startStr)
n := 0
eNum := 0
notifyNum := 0
err := this.OrderBakRepo.SpecifyFindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
n += 1
for _, order := range rows {
eNum += 1
if _, err := this.Cmb.Notify(ctx, order); err != nil {
log.Errorf("重试通知处理Bak,发生错误,orderNo:%s,outBizNo:%s,voucherNo:%s,err:%v", order.OrderNo, order.OutBizNo, order.VoucherNo, err)
} else {
notifyNum += 1
}
}
log.Warnf("重试通知处理Bak,第:%d组,已执行条数:%d,通知成功条数:%d,", n, eNum, notifyNum)
return nil
})
endTime := time.Now()
log.Warnf("重试通知处理Bak,耗时:%s,结束时间:%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
fmt.Printf("重试通知处理Bak,耗时:%s,结束时间%s,处理%d组,处理%d单,通知成功条数:%d", endTime.Sub(start).String(), endTime.String(), n, eNum, notifyNum)
return err
}