247 lines
5.1 KiB
Go
247 lines
5.1 KiB
Go
package timeslicequery
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/go-kratos/kratos/v2/log"
|
||
"github.com/hashicorp/go-multierror"
|
||
"golang.org/x/sync/errgroup"
|
||
"runtime"
|
||
"sync"
|
||
"time"
|
||
"voucher/internal/biz/bo"
|
||
"voucher/internal/biz/do"
|
||
)
|
||
|
||
func (v *Query) RetryQueryNotice(ctx context.Context, msg string) error {
|
||
|
||
var req *do.RetryQueryNotice
|
||
|
||
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
||
return err
|
||
}
|
||
|
||
err := v.RetryQueryNoticeOrder(ctx, req)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
return v.RetryQueryNoticeOrderBak(ctx, req)
|
||
}
|
||
|
||
func (v *Query) RetryQueryNoticeOrder(ctx context.Context, req *do.RetryQueryNotice) error {
|
||
|
||
start := time.Now()
|
||
num := 0
|
||
errNum := 0
|
||
sucNum := 0
|
||
|
||
var mu sync.Mutex
|
||
errs := make([]error, 0)
|
||
|
||
eg := new(errgroup.Group)
|
||
eg.SetLimit(5)
|
||
|
||
err := v.orderRepo.FindRetryQuery(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||
|
||
eg.Go(func() error {
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
// 获取调用栈信息
|
||
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
|
||
|
||
mu.Lock()
|
||
errs = append(errs, fmt.Errorf("panic: %v,file:%s, line:%d", err, file, line))
|
||
mu.Unlock()
|
||
}
|
||
}()
|
||
|
||
for _, order := range rows {
|
||
|
||
if err := v.retryQueryNoticeOrder(ctx, order); err != nil {
|
||
|
||
logFields := map[string]string{
|
||
"order_no": order.OrderNo,
|
||
"coupon_id": order.VoucherNo,
|
||
"open_id": order.Account,
|
||
"stock_id": order.BatchNo,
|
||
"err": err.Error(),
|
||
}
|
||
|
||
log.Errorf("微信券查询order,错误:%+v", logFields)
|
||
|
||
errNum++
|
||
|
||
if errNum > 20 {
|
||
return fmt.Errorf("微信券查询order,已经连续发生20次错误%+v", logFields)
|
||
}
|
||
|
||
} else {
|
||
sucNum++
|
||
}
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
return nil
|
||
})
|
||
|
||
// 等待所有任务完成
|
||
if err := eg.Wait(); err != nil {
|
||
return fmt.Errorf("微信券查询order,任务执行失败: %v", err)
|
||
}
|
||
|
||
logFields := map[string]any{
|
||
"num": num,
|
||
"sucNum": sucNum,
|
||
"errNum": errNum,
|
||
"elapsed": time.Now().Sub(start).String(),
|
||
}
|
||
log.Warnf("微信券查询order,处理完毕:%+v", logFields)
|
||
|
||
// 收集错误
|
||
var result error
|
||
for _, err2 := range errs {
|
||
result = multierror.Append(result, err2)
|
||
}
|
||
|
||
return err
|
||
}
|
||
|
||
func (v *Query) retryQueryNoticeOrder(ctx context.Context, order *bo.OrderBo) error {
|
||
|
||
if order.Status.IsExpired() {
|
||
_, err := v.cmb.Notify(ctx, order)
|
||
return err
|
||
}
|
||
|
||
status, err := v.wechatCpnRepo.Query(ctx, order)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if status.IsUse() {
|
||
return v.queryUsed(ctx, order)
|
||
} else if status.IsSuccess() {
|
||
return v.querySuccess(ctx, order)
|
||
} else if status.IsExpired() {
|
||
return v.queryExpired(ctx, order)
|
||
} else {
|
||
log.Warnf("微信券查询order,未知状态orderNo:%s,status:%d", order.OrderNo, status)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (v *Query) RetryQueryNoticeOrderBak(ctx context.Context, req *do.RetryQueryNotice) error {
|
||
|
||
start := time.Now()
|
||
num := 0
|
||
errNum := 0
|
||
sucNum := 0
|
||
|
||
var mu sync.Mutex
|
||
errs := make([]error, 0)
|
||
|
||
eg := new(errgroup.Group)
|
||
eg.SetLimit(5)
|
||
|
||
err := v.orderBakRepo.FindRetryQuery(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||
|
||
eg.Go(func() error {
|
||
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
// 获取调用栈信息
|
||
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
|
||
|
||
mu.Lock()
|
||
errs = append(errs, fmt.Errorf("panic: %v,file:%s, line:%d", err, file, line))
|
||
mu.Unlock()
|
||
}
|
||
}()
|
||
|
||
for _, order := range rows {
|
||
if err := v.retryQueryNoticeOrderBal(ctx, order); err != nil {
|
||
|
||
logFields := map[string]string{
|
||
"order_no": order.OrderNo,
|
||
"coupon_id": order.VoucherNo,
|
||
"open_id": order.Account,
|
||
"stock_id": order.BatchNo,
|
||
"err": err.Error(),
|
||
}
|
||
|
||
log.Errorf("微信券查询orderBak,错误:%+v", logFields)
|
||
|
||
errNum++
|
||
|
||
if errNum > 20 {
|
||
return fmt.Errorf("微信券查询orderBak,已经连续发生20次错误%+v", logFields)
|
||
}
|
||
|
||
} else {
|
||
sucNum++
|
||
}
|
||
}
|
||
|
||
return nil
|
||
})
|
||
|
||
return nil
|
||
})
|
||
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 等待所有任务完成
|
||
if err2 := eg.Wait(); err2 != nil {
|
||
return fmt.Errorf("微信券查询orderBak,任务执行失败: %v", err2)
|
||
}
|
||
|
||
logFields := map[string]any{
|
||
"num": num,
|
||
"sucNum": sucNum,
|
||
"errNum": errNum,
|
||
"elapsed": time.Now().Sub(start).String(),
|
||
}
|
||
log.Warnf("微信券查询orderBak,处理完毕:%+v", logFields)
|
||
|
||
// 收集错误
|
||
var result error
|
||
for _, err2 := range errs {
|
||
result = multierror.Append(result, err2)
|
||
}
|
||
|
||
return result
|
||
}
|
||
|
||
func (v *Query) retryQueryNoticeOrderBal(ctx context.Context, order *bo.OrderBo) error {
|
||
|
||
if order.Status.IsExpired() {
|
||
_, err := v.cmb.Notify(ctx, order)
|
||
return err
|
||
}
|
||
|
||
status, err := v.wechatCpnRepo.Query(ctx, order)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
if status.IsUse() {
|
||
return v.queryUsedBak(ctx, order)
|
||
} else if status.IsSuccess() {
|
||
return v.querySuccessBak(ctx, order)
|
||
} else if status.IsExpired() {
|
||
return v.queryExpiredBak(ctx, order)
|
||
} else {
|
||
log.Warnf("微信券查询orderBak,未知状态orderNo:%s,status:%d", order.OrderNo, status)
|
||
}
|
||
|
||
return nil
|
||
}
|