voucher/internal/biz/timeslicequery/retry_query_notice.go

247 lines
5.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package timeslicequery
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"runtime"
"sync"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
// RetryQueryNotice decodes msg as a JSON-encoded do.RetryQueryNotice and runs
// the retry query in two stages: first RetryQueryNoticeOrder, then
// RetryQueryNoticeOrderBak with the same request.
//
// It returns the JSON decoding error if msg cannot be unmarshalled, otherwise
// the first error reported by either stage. The second stage is skipped when
// the first one fails.
func (v *Query) RetryQueryNotice(ctx context.Context, msg string) error {
	var req *do.RetryQueryNotice

	payload := []byte(msg)
	if decodeErr := json.Unmarshal(payload, &req); decodeErr != nil {
		return decodeErr
	}

	if orderErr := v.RetryQueryNoticeOrder(ctx, req); orderErr != nil {
		return orderErr
	}

	return v.RetryQueryNoticeOrderBak(ctx, req)
}
// RetryQueryNoticeOrder re-queries every batch of orders that
// orderRepo.FindRetryQuery yields for req. Each batch is handed to its own
// goroutine, with at most five batches in flight at a time, and every order in
// a batch is passed to retryQueryNoticeOrder.
//
// Returned error, in order of precedence:
//   - a wrapped worker error, when a worker aborted because the total number
//     of failed orders exceeded 20;
//   - the error returned by FindRetryQuery itself;
//   - a multierror aggregating every panic recovered in a worker;
//   - nil when none of the above occurred.
//
// Failures of individual orders are logged and counted but do not, on their
// own, make the call fail.
func (v *Query) RetryQueryNoticeOrder(ctx context.Context, req *do.RetryQueryNotice) error {
	start := time.Now()

	// mu guards the counters and panicErrs: all of them are written from the
	// worker goroutines, which run concurrently.
	var (
		mu        sync.Mutex
		num       int // orders processed
		errNum    int // orders whose retry query failed
		sucNum    int // orders whose retry query succeeded
		panicErrs []error
	)

	eg := new(errgroup.Group)
	eg.SetLimit(5)

	findErr := v.orderRepo.FindRetryQuery(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		eg.Go(func() error {
			defer func() {
				if r := recover(); r != nil {
					// NOTE(review): called from a deferred function, Caller(1)
					// most likely reports a frame inside the runtime's panic
					// handling rather than the line that panicked — confirm,
					// and consider capturing a full stack trace instead.
					_, file, line, _ := runtime.Caller(1)
					mu.Lock()
					panicErrs = append(panicErrs, fmt.Errorf("panic: %v,file:%s, line:%d", r, file, line))
					mu.Unlock()
				}
			}()

			for _, order := range rows {
				queryErr := v.retryQueryNoticeOrder(ctx, order)

				mu.Lock()
				num++
				if queryErr == nil {
					sucNum++
					mu.Unlock()
					continue
				}
				errNum++
				// errNum is a running total across all batches (it is never
				// reset on success), so this trips on the 21st failure overall.
				tooManyErrors := errNum > 20
				mu.Unlock()

				logFields := map[string]string{
					"order_no":  order.OrderNo,
					"coupon_id": order.VoucherNo,
					"open_id":   order.Account,
					"stock_id":  order.BatchNo,
					"err":       queryErr.Error(),
				}
				log.Errorf("微信券查询order,错误:%+v", logFields)
				if tooManyErrors {
					return fmt.Errorf("微信券查询order,已经连续发生20次错误%+v", logFields)
				}
			}
			return nil
		})
		return nil
	})

	// Wait for every launched batch to finish; after this point the counters
	// can be read without holding mu.
	if err := eg.Wait(); err != nil {
		return fmt.Errorf("微信券查询order任务执行失败: %w", err)
	}

	logFields := map[string]any{
		"num":     num,
		"sucNum":  sucNum,
		"errNum":  errNum,
		"elapsed": time.Since(start).String(),
	}
	log.Warnf("微信券查询order,处理完毕:%+v", logFields)

	if findErr != nil {
		return findErr
	}

	// Surface recovered panics to the caller instead of dropping them.
	var result error
	for _, panicErr := range panicErrs {
		result = multierror.Append(result, panicErr)
	}
	return result
}
// retryQueryNoticeOrder handles a single order of the retry query.
//
// When the order's own status already reports expired, the order is only
// passed to cmb.Notify and that call's error is returned. Otherwise the coupon
// status is fetched through wechatCpnRepo.Query and dispatched to queryUsed,
// querySuccess or queryExpired. A status matching none of these is logged as a
// warning and treated as success (nil).
func (v *Query) retryQueryNoticeOrder(ctx context.Context, order *bo.OrderBo) error {
	if order.Status.IsExpired() {
		_, notifyErr := v.cmb.Notify(ctx, order)
		return notifyErr
	}

	status, queryErr := v.wechatCpnRepo.Query(ctx, order)
	if queryErr != nil {
		return queryErr
	}

	switch {
	case status.IsUse():
		return v.queryUsed(ctx, order)
	case status.IsSuccess():
		return v.querySuccess(ctx, order)
	case status.IsExpired():
		return v.queryExpired(ctx, order)
	default:
		log.Warnf("微信券查询order,未知状态orderNo:%s,status:%d", order.OrderNo, status)
	}
	return nil
}
// RetryQueryNoticeOrderBak re-queries every batch of orders that
// orderBakRepo.FindRetryQuery yields for req. Each batch is handed to its own
// goroutine, with at most five batches in flight at a time, and every order in
// a batch is passed to retryQueryNoticeOrderBal.
//
// Returned error, in order of precedence:
//   - the error returned by FindRetryQuery itself;
//   - a wrapped worker error, when a worker aborted because the total number
//     of failed orders exceeded 20;
//   - a multierror aggregating every panic recovered in a worker;
//   - nil when none of the above occurred.
//
// Failures of individual orders are logged and counted but do not, on their
// own, make the call fail.
func (v *Query) RetryQueryNoticeOrderBak(ctx context.Context, req *do.RetryQueryNotice) error {
	start := time.Now()

	// mu guards the counters and panicErrs: all of them are written from the
	// worker goroutines, which run concurrently.
	var (
		mu        sync.Mutex
		num       int // orders processed
		errNum    int // orders whose retry query failed
		sucNum    int // orders whose retry query succeeded
		panicErrs []error
	)

	eg := new(errgroup.Group)
	eg.SetLimit(5)

	findErr := v.orderBakRepo.FindRetryQuery(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
		eg.Go(func() error {
			defer func() {
				if r := recover(); r != nil {
					// NOTE(review): called from a deferred function, Caller(1)
					// most likely reports a frame inside the runtime's panic
					// handling rather than the line that panicked — confirm,
					// and consider capturing a full stack trace instead.
					_, file, line, _ := runtime.Caller(1)
					mu.Lock()
					panicErrs = append(panicErrs, fmt.Errorf("panic: %v,file:%s, line:%d", r, file, line))
					mu.Unlock()
				}
			}()

			for _, order := range rows {
				queryErr := v.retryQueryNoticeOrderBal(ctx, order)

				mu.Lock()
				num++
				if queryErr == nil {
					sucNum++
					mu.Unlock()
					continue
				}
				errNum++
				// errNum is a running total across all batches (it is never
				// reset on success), so this trips on the 21st failure overall.
				tooManyErrors := errNum > 20
				mu.Unlock()

				logFields := map[string]string{
					"order_no":  order.OrderNo,
					"coupon_id": order.VoucherNo,
					"open_id":   order.Account,
					"stock_id":  order.BatchNo,
					"err":       queryErr.Error(),
				}
				log.Errorf("微信券查询orderBak,错误:%+v", logFields)
				if tooManyErrors {
					return fmt.Errorf("微信券查询orderBak,已经连续发生20次错误%+v", logFields)
				}
			}
			return nil
		})
		return nil
	})

	// Always wait, even when FindRetryQuery failed: batches launched before the
	// failure are still running and must not outlive this call. After Wait the
	// counters can be read without holding mu.
	waitErr := eg.Wait()

	if findErr != nil {
		return findErr
	}
	if waitErr != nil {
		return fmt.Errorf("微信券查询orderBak任务执行失败: %w", waitErr)
	}

	logFields := map[string]any{
		"num":     num,
		"sucNum":  sucNum,
		"errNum":  errNum,
		"elapsed": time.Since(start).String(),
	}
	log.Warnf("微信券查询orderBak,处理完毕:%+v", logFields)

	// Surface recovered panics to the caller.
	var result error
	for _, panicErr := range panicErrs {
		result = multierror.Append(result, panicErr)
	}
	return result
}
// retryQueryNoticeOrderBal handles a single order of the orderBak retry query.
//
// When the order's own status already reports expired, the order is only
// passed to cmb.Notify and that call's error is returned. Otherwise the coupon
// status is fetched through wechatCpnRepo.Query and dispatched to
// queryUsedBak, querySuccessBak or queryExpiredBak. A status matching none of
// these is logged as a warning and treated as success (nil).
func (v *Query) retryQueryNoticeOrderBal(ctx context.Context, order *bo.OrderBo) error {
	if order.Status.IsExpired() {
		_, notifyErr := v.cmb.Notify(ctx, order)
		return notifyErr
	}

	status, queryErr := v.wechatCpnRepo.Query(ctx, order)
	if queryErr != nil {
		return queryErr
	}

	switch {
	case status.IsUse():
		return v.queryUsedBak(ctx, order)
	case status.IsSuccess():
		return v.querySuccessBak(ctx, order)
	case status.IsExpired():
		return v.queryExpiredBak(ctx, order)
	default:
		log.Warnf("微信券查询orderBak,未知状态orderNo:%s,status:%d", order.OrderNo, status)
	}
	return nil
}