package timeslicequery

import (
	"context"
	"encoding/json"
	"fmt"
	"runtime"
	"sync"
	"time"

	"github.com/go-kratos/kratos/v2/log"
	"github.com/hashicorp/go-multierror"
	"golang.org/x/sync/errgroup"

	"voucher/internal/biz/bo"
	"voucher/internal/biz/do"
)

// RetryQueryNotice decodes msg as a JSON-encoded do.RetryQueryNotice and runs
// the retry query first against the order repository and then against the
// backup order repository. The backup pass is skipped when the first pass
// returns an error.
func (v *Query) RetryQueryNotice(ctx context.Context, msg string) error {
	var req *do.RetryQueryNotice
	if err := json.Unmarshal([]byte(msg), &req); err != nil {
		return fmt.Errorf("unmarshal retry query notice: %w", err)
	}
	if err := v.RetryQueryNoticeOrder(ctx, req); err != nil {
		return err
	}
	return v.RetryQueryNoticeOrderBak(ctx, req)
}

// RetryQueryNoticeOrder feeds every batch of orders produced by
// v.orderRepo.FindRetryQuery through retryQueryNoticeOrder using a bounded
// pool of workers.
//
// It returns the FindRetryQuery error if there was one, otherwise the first
// worker error, otherwise the combined set of recovered worker panics (nil
// when none occurred).
func (v *Query) RetryQueryNoticeOrder(ctx context.Context, req *do.RetryQueryNotice) error {
	return retryQueryInBatches(
		"order",
		func(handle func(ctx context.Context, rows []*bo.OrderBo) error) error {
			return v.orderRepo.FindRetryQuery(ctx, req, handle)
		},
		v.retryQueryNoticeOrder,
	)
}

// retryQueryNoticeOrder processes a single order. When the order's own status
// is already expired it only calls v.cmb.Notify; otherwise it queries
// v.wechatCpnRepo and dispatches on the returned status. An unrecognised
// status is logged and treated as success.
func (v *Query) retryQueryNoticeOrder(ctx context.Context, order *bo.OrderBo) error {
	if order.Status.IsExpired() {
		_, err := v.cmb.Notify(ctx, order)
		return err
	}

	status, err := v.wechatCpnRepo.Query(ctx, order)
	if err != nil {
		return err
	}

	switch {
	case status.IsUse():
		return v.queryUsed(ctx, order)
	case status.IsSuccess():
		return v.querySuccess(ctx, order)
	case status.IsExpired():
		return v.queryExpired(ctx, order)
	default:
		log.Warnf("微信券查询order,未知状态orderNo:%s,status:%d", order.OrderNo, status)
		return nil
	}
}

// RetryQueryNoticeOrderBak is the backup-repository counterpart of
// RetryQueryNoticeOrder: batches come from v.orderBakRepo.FindRetryQuery and
// each order is handled by retryQueryNoticeOrderBal. Error precedence is the
// same as for RetryQueryNoticeOrder.
func (v *Query) RetryQueryNoticeOrderBak(ctx context.Context, req *do.RetryQueryNotice) error {
	return retryQueryInBatches(
		"orderBak",
		func(handle func(ctx context.Context, rows []*bo.OrderBo) error) error {
			return v.orderBakRepo.FindRetryQuery(ctx, req, handle)
		},
		v.retryQueryNoticeOrderBal,
	)
}

// retryQueryNoticeOrderBal processes a single order from the backup
// repository. It mirrors retryQueryNoticeOrder but dispatches to the *Bak
// handlers.
func (v *Query) retryQueryNoticeOrderBal(ctx context.Context, order *bo.OrderBo) error {
	if order.Status.IsExpired() {
		_, err := v.cmb.Notify(ctx, order)
		return err
	}

	status, err := v.wechatCpnRepo.Query(ctx, order)
	if err != nil {
		return err
	}

	switch {
	case status.IsUse():
		return v.queryUsedBak(ctx, order)
	case status.IsSuccess():
		return v.querySuccessBak(ctx, order)
	case status.IsExpired():
		return v.queryExpiredBak(ctx, order)
	default:
		log.Warnf("微信券查询orderBak,未知状态orderNo:%s,status:%d", order.OrderNo, status)
		return nil
	}
}

// retryQueryInBatches is the shared driver behind RetryQueryNoticeOrder and
// RetryQueryNoticeOrderBak.
//
// find must invoke the supplied handler once per batch of orders. Each batch
// is processed on its own goroutine (at most `workers` at a time) by calling
// process for every order in it. label is interpolated into log and error
// messages to tell the two data sources apart.
//
// All started workers are always awaited before returning, even when find
// itself fails, so no goroutine outlives the call. The result is, in order of
// precedence: the error from find, the first error returned by a worker, or
// the recovered worker panics combined with multierror (nil if there were
// none).
func retryQueryInBatches(
	label string,
	find func(handle func(ctx context.Context, rows []*bo.OrderBo) error) error,
	process func(ctx context.Context, order *bo.OrderBo) error,
) error {
	const (
		workers   = 5  // maximum number of batches processed concurrently
		maxErrors = 20 // a worker aborts once the shared error count exceeds this
	)

	start := time.Now()

	// mu guards everything in this var block; the workers update it concurrently.
	var (
		mu     sync.Mutex
		num    int // orders attempted
		sucNum int // orders processed without error
		errNum int // orders whose processing returned an error
		panics []error
	)

	eg := new(errgroup.Group)
	eg.SetLimit(workers)

	findErr := find(func(ctx context.Context, rows []*bo.OrderBo) error {
		// eg.Go blocks while the worker limit is reached, which throttles find.
		eg.Go(func() error {
			defer func() {
				r := recover()
				if r == nil {
					return
				}
				// runtime.Caller(1) from a deferred function points into the
				// runtime's panic machinery, so record the goroutine stack
				// instead to keep the real panic site visible.
				buf := make([]byte, 8<<10)
				buf = buf[:runtime.Stack(buf, false)]
				mu.Lock()
				panics = append(panics, fmt.Errorf("panic: %v, stack: %s", r, buf))
				mu.Unlock()
			}()

			for _, order := range rows {
				mu.Lock()
				num++
				mu.Unlock()

				err := process(ctx, order)
				if err == nil {
					mu.Lock()
					sucNum++
					mu.Unlock()
					continue
				}

				logFields := map[string]string{
					"order_no":  order.OrderNo,
					"coupon_id": order.VoucherNo,
					"open_id":   order.Account,
					"stock_id":  order.BatchNo,
					"err":       err.Error(),
				}
				log.Errorf("微信券查询%s,错误:%+v", label, logFields)

				// The counter is cumulative across all workers and is not
				// reset by a later success.
				mu.Lock()
				errNum++
				exceeded := errNum > maxErrors
				mu.Unlock()
				if exceeded {
					return fmt.Errorf("微信券查询%s,已经连续发生20次错误%+v", label, logFields)
				}
			}
			return nil
		})
		return nil
	})

	// Wait for every started worker before looking at any error, so that no
	// goroutine is left running after this function returns.
	waitErr := eg.Wait()
	if findErr != nil {
		return findErr
	}
	if waitErr != nil {
		return fmt.Errorf("微信券查询%s,任务执行失败: %w", label, waitErr)
	}

	// eg.Wait has returned, so the counters can be read without the lock.
	logFields := map[string]any{
		"num":     num,
		"sucNum":  sucNum,
		"errNum":  errNum,
		"elapsed": time.Since(start).String(),
	}
	log.Warnf("微信券查询%s,处理完毕:%+v", label, logFields)

	// Surface recovered panics to the caller; result stays nil when there
	// were none.
	var result error
	for _, p := range panics {
		result = multierror.Append(result, p)
	}
	return result
}