query order notify

This commit is contained in:
ziming 2026-03-05 14:14:58 +08:00
parent 42b88fd9f9
commit 9fc1ab9c82
16 changed files with 625 additions and 17 deletions

View File

@ -33,3 +33,15 @@ type WechatUsedQuery struct {
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
}
type RetryQueryNotice struct {
ProductNo string `json:"product_no"`
ReceiveSuccessStartTime string `json:"receive_success_start_time"`
ReceiveSuccessEndTime string `json:"receive_success_end_time"`
OrderNos []string `json:"order_nos"`
BatchNos []string `json:"batch_nos"`
OutBizNos []string `json:"out_biz_nos"`
VoucherNos []string `json:"voucher_nos"`
}

View File

@ -9,6 +9,16 @@ import (
"voucher/internal/biz/do"
)
func (this *VoucherBiz) PushRetryOrderNotice(ctx http.Context, bodyBytes []byte) error {
_, err := this.rdb.Rdb.RPush(ctx, "retryQueryNotice", string(bodyBytes)).Result()
if err != nil {
return fmt.Errorf("添加到队列失败:%v", err)
}
return nil
}
func (this *VoucherBiz) PushOrderNotifyRetry(ctx http.Context, req *do.OrderNotifyRetry) error {
queue := this.bc.RdsMQ.GetOrderNotifyRetry()

View File

@ -15,6 +15,7 @@ type OrderRepo interface {
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindRetryQuery(ctx context.Context, req *do.RetryQueryNotice, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
GetByOutBizNo(ctx context.Context, t vo.OrderType, outBizNo string) (*bo.OrderBo, error)
GetByOrderNo(ctx context.Context, orderNo string) (*bo.OrderBo, error)
GetByCouponId(ctx context.Context, merchantNo, batchNo, voucherNo string) (*bo.OrderBo, error)

View File

@ -1,10 +0,0 @@
package repo
import (
"context"
"voucher/internal/biz/bo"
)
type OrderBakRepo interface {
SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
}

View File

@ -0,0 +1,17 @@
package repo
import (
"context"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
type OrderBakRepo interface {
SpecifyFindInBatches(ctx context.Context, w *bo.FindInBatchesBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindRetryQuery(ctx context.Context, req *do.RetryQueryNotice, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error)
Used(ctx context.Context, id uint64) error
Available(ctx context.Context, id uint64) error
Expired(ctx context.Context, id uint64) error
}

View File

@ -19,8 +19,9 @@ type Query struct {
rdb *data.Rdb
cmb *cmb.Cmb
productRepo repo.ProductRepo
orderRepo repo.OrderRepo
productRepo repo.ProductRepo
orderRepo repo.OrderRepo
orderBakRepo repo.OrderBakRepo
wechatCpnRepo wechatrepo.WechatCpnRepo
mqSendMixRepo mixrepos.MQSendMixRepo
@ -32,6 +33,7 @@ func NewQuery(
cmb *cmb.Cmb,
productRepo repo.ProductRepo,
orderRepo repo.OrderRepo,
orderBakRepo repo.OrderBakRepo,
wechatCpnRepo wechatrepo.WechatCpnRepo,
mqSendMixRepo mixrepos.MQSendMixRepo) *Query {
return &Query{
@ -41,6 +43,7 @@ func NewQuery(
cmb: cmb,
productRepo: productRepo,
orderRepo: orderRepo,
orderBakRepo: orderBakRepo,
wechatCpnRepo: wechatCpnRepo,
mqSendMixRepo: mqSendMixRepo}
}

View File

@ -32,7 +32,8 @@ func (v *Query) wechatQuery(ctx context.Context, order *bo.OrderBo, useNum *int)
func (v *Query) queryUsed(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsUse() {
return v.notify(ctx, order)
_, err := v.cmb.Notify(ctx, order)
return err
}
if err := v.orderRepo.Used(ctx, order.ID); err != nil {
@ -45,7 +46,8 @@ func (v *Query) queryUsed(ctx context.Context, order *bo.OrderBo) error {
func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsExpired() {
return nil
_, err := v.cmb.Notify(ctx, order)
return err
}
if err := v.orderRepo.Expired(ctx, order.ID); err != nil {
@ -55,6 +57,20 @@ func (v *Query) queryExpired(ctx context.Context, order *bo.OrderBo) error {
return v.notify(ctx, order)
}
func (v *Query) querySuccess(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsSuccess() {
_, err := v.cmb.Notify(ctx, order)
return err
}
if err := v.orderRepo.Available(ctx, order.ID); err != nil {
return err
}
return v.notify(ctx, order)
}
func (v *Query) notify(ctx context.Context, order *bo.OrderBo) error {
order, err := v.orderRepo.GetByID(ctx, order.ID)

View File

@ -0,0 +1,62 @@
package timeslicequery
import (
"context"
"voucher/internal/biz/bo"
)
func (v *Query) queryUsedBak(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsUse() {
_, err := v.cmb.Notify(ctx, order)
return err
}
if err := v.orderBakRepo.Used(ctx, order.ID); err != nil {
return err
}
return v.notify(ctx, order)
}
func (v *Query) queryExpiredBak(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsExpired() {
_, err := v.cmb.Notify(ctx, order)
return err
}
if err := v.orderBakRepo.Expired(ctx, order.ID); err != nil {
return err
}
return v.notify(ctx, order)
}
func (v *Query) querySuccessBak(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsSuccess() {
_, err := v.cmb.Notify(ctx, order)
return err
}
if err := v.orderBakRepo.Available(ctx, order.ID); err != nil {
return err
}
return v.notify(ctx, order)
}
func (v *Query) notifyBak(ctx context.Context, order *bo.OrderBo) error {
order, err := v.orderBakRepo.GetByID(ctx, order.ID)
if err != nil {
return err
}
if _, err = v.cmb.Notify(ctx, order); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,246 @@
package timeslicequery
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"runtime"
"sync"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
func (v *Query) RetryQueryNotice(ctx context.Context, msg string) error {
var req *do.RetryQueryNotice
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err
}
err := v.RetryQueryNoticeOrder(ctx, req)
if err != nil {
return err
}
return v.RetryQueryNoticeOrderBak(ctx, req)
}
func (v *Query) RetryQueryNoticeOrder(ctx context.Context, req *do.RetryQueryNotice) error {
start := time.Now()
num := 0
errNum := 0
sucNum := 0
var mu sync.Mutex
errs := make([]error, 0)
eg := new(errgroup.Group)
eg.SetLimit(5)
err := v.orderRepo.FindRetryQuery(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
// 获取调用栈信息
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
mu.Lock()
errs = append(errs, fmt.Errorf("panic: %v,file:%s, line:%d", err, file, line))
mu.Unlock()
}
}()
for _, order := range rows {
if err := v.retryQueryNoticeOrder(ctx, order); err != nil {
logFields := map[string]string{
"order_no": order.OrderNo,
"coupon_id": order.VoucherNo,
"open_id": order.Account,
"stock_id": order.BatchNo,
"err": err.Error(),
}
log.Errorf("微信券查询order,错误:%+v", logFields)
errNum++
if errNum > 20 {
return fmt.Errorf("微信券查询order,已经连续发生20次错误%+v", logFields)
}
} else {
sucNum++
}
}
return nil
})
return nil
})
// 等待所有任务完成
if err := eg.Wait(); err != nil {
return fmt.Errorf("微信券查询order任务执行失败: %v", err)
}
end := time.Now()
logFields := map[string]any{
"num": num,
"sucNum": sucNum,
"errNum": errNum,
"elapsed": end.Sub(start).String(),
}
log.Warnf("微信券查询order,处理完毕:%+v", logFields)
// 收集错误
var result error
for _, err2 := range errs {
result = multierror.Append(result, err2)
}
return err
}
func (v *Query) retryQueryNoticeOrder(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsExpired() {
_, err := v.cmb.Notify(ctx, order)
return err
}
status, err := v.wechatCpnRepo.Query(ctx, order)
if err != nil {
return err
}
if status.IsUse() {
return v.queryUsed(ctx, order)
} else if status.IsSuccess() {
return v.querySuccess(ctx, order)
} else if status.IsExpired() {
return v.queryExpired(ctx, order)
}
return nil
}
func (v *Query) RetryQueryNoticeOrderBak(ctx context.Context, req *do.RetryQueryNotice) error {
start := time.Now()
num := 0
errNum := 0
sucNum := 0
var mu sync.Mutex
errs := make([]error, 0)
eg := new(errgroup.Group)
eg.SetLimit(5)
err := v.orderBakRepo.FindRetryQuery(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
// 获取调用栈信息
_, file, line, _ := runtime.Caller(1) // 1 表示获取当前调用者的调用信息
mu.Lock()
errs = append(errs, fmt.Errorf("panic: %v,file:%s, line:%d", err, file, line))
mu.Unlock()
}
}()
for _, order := range rows {
if err := v.retryQueryNoticeOrderBal(ctx, order); err != nil {
logFields := map[string]string{
"order_no": order.OrderNo,
"coupon_id": order.VoucherNo,
"open_id": order.Account,
"stock_id": order.BatchNo,
"err": err.Error(),
}
log.Errorf("微信券查询orderBak,错误:%+v", logFields)
errNum++
if errNum > 20 {
return fmt.Errorf("微信券查询orderBak,已经连续发生20次错误%+v", logFields)
}
} else {
sucNum++
}
}
return nil
})
return nil
})
if err != nil {
return err
}
// 等待所有任务完成
if err2 := eg.Wait(); err2 != nil {
return fmt.Errorf("微信券查询orderBak任务执行失败: %v", err2)
}
end := time.Now()
logFields := map[string]any{
"num": num,
"sucNum": sucNum,
"errNum": errNum,
"elapsed": end.Sub(start).String(),
}
log.Warnf("微信券查询orderBak,处理完毕:%+v", logFields)
// 收集错误
var result error
for _, err2 := range errs {
result = multierror.Append(result, err2)
}
return result
}
func (v *Query) retryQueryNoticeOrderBal(ctx context.Context, order *bo.OrderBo) error {
if order.Status.IsExpired() {
_, err := v.cmb.Notify(ctx, order)
return err
}
status, err := v.wechatCpnRepo.Query(ctx, order)
if err != nil {
return err
}
if status.IsUse() {
return v.queryUsedBak(ctx, order)
} else if status.IsSuccess() {
return v.querySuccessBak(ctx, order)
} else if status.IsExpired() {
return v.queryExpiredBak(ctx, order)
}
return nil
}

View File

@ -206,6 +206,61 @@ func (p *OrderRepoImpl) FindInBatches(ctx context.Context, req *bo.FindInBatches
return nil
}
func (p *OrderRepoImpl) FindRetryQuery(ctx context.Context, req *do.RetryQueryNotice, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
statusArr := []uint8{
vo.OrderStatusSuccess.GetValue(),
vo.OrderStatusUse.GetValue(),
vo.OrderStatusExpired.GetValue(),
}
tx := p.DB(ctx).
Where("`status` in (?)", statusArr).
Where("activity_id = ''")
if req.ProductNo != "" {
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.ReceiveSuccessStartTime != "" {
tx = tx.Where("receive_success_time > ?", req.ReceiveSuccessStartTime)
}
if req.ReceiveSuccessEndTime != "" {
tx = tx.Where("receive_success_time <= ?", req.ReceiveSuccessEndTime)
}
if req.ProductNo != "" {
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.OrderNos != nil {
tx = tx.Where("order_no IN (?)", req.OrderNos)
}
if req.OutBizNos != nil {
tx = tx.Where("out_biz_no IN (?)", req.OutBizNos)
}
if req.VoucherNos != nil {
tx = tx.Where("voucher_no IN (?)", req.VoucherNos)
}
var results = make([]*model.Order, 0)
tx.Order("receive_success_time asc") // 显式清除排序,移除默认的 ORDER BY
result := tx.FindInBatches(&results, 1000, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results))
})
if result.Error != nil {
return result.Error
}
return nil
}
func (p *OrderRepoImpl) Create(ctx context.Context, req *bo.OrderBo) (*bo.OrderBo, error) {
now := time.Now()

View File

@ -2,9 +2,15 @@ package repoimpl
import (
"context"
"errors"
"fmt"
"gorm.io/gorm"
"time"
err2 "voucher/api/err"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/repo"
"voucher/internal/biz/vo"
"voucher/internal/data"
"voucher/internal/data/model"
)
@ -61,3 +67,142 @@ func (p *OrderBakRepoImpl) SpecifyFindInBatches(ctx context.Context, req *bo.Fin
return nil
}
func (p *OrderBakRepoImpl) FindRetryQuery(ctx context.Context, req *do.RetryQueryNotice, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
statusArr := []uint8{
vo.OrderStatusSuccess.GetValue(),
vo.OrderStatusUse.GetValue(),
vo.OrderStatusExpired.GetValue(),
}
tx := p.DB(ctx).
Where("`status` in (?)", statusArr).
Where("activity_id = ''")
if req.ProductNo != "" {
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.ReceiveSuccessStartTime != "" {
tx = tx.Where("receive_success_time > ?", req.ReceiveSuccessStartTime)
}
if req.ReceiveSuccessEndTime != "" {
tx = tx.Where("receive_success_time <= ?", req.ReceiveSuccessEndTime)
}
if req.ProductNo != "" {
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.OrderNos != nil {
tx = tx.Where("order_no IN (?)", req.OrderNos)
}
if req.OutBizNos != nil {
tx = tx.Where("out_biz_no IN (?)", req.OutBizNos)
}
if req.VoucherNos != nil {
tx = tx.Where("voucher_no IN (?)", req.VoucherNos)
}
var results = make([]*model.OrderBak, 0)
tx.Order("receive_success_time asc") // 显式清除排序,移除默认的 ORDER BY
result := tx.FindInBatches(&results, 1000, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results))
})
if result.Error != nil {
return result.Error
}
return nil
}
func (p *OrderBakRepoImpl) GetByID(ctx context.Context, id uint64) (*bo.OrderBo, error) {
info := &model.OrderBak{}
tx := p.DB(ctx).Where(model.OrderBak{ID: id}).First(&info)
if tx.Error != nil {
if errors.Is(tx.Error, gorm.ErrRecordNotFound) {
return nil, err2.ErrorDbNotFound("订单数据不存在")
}
return nil, fmt.Errorf("order db fail %w", tx.Error)
}
if tx.RowsAffected == 0 {
return nil, err2.ErrorDbNotFound("订单数据不存在")
}
return p.ToBo(info), nil
}
func (p *OrderBakRepoImpl) Used(ctx context.Context, id uint64) error {
now := time.Now()
tx := p.DB(ctx).
Where(model.OrderBak{
ID: id,
}).
Updates(model.OrderBak{
Status: vo.OrderStatusUse.GetValue(),
Remark: "核销",
LastUseTime: &now,
UpdateTime: &now,
})
if tx.Error != nil {
return fmt.Errorf("update db fail %w", tx.Error)
}
return nil
}
func (p *OrderBakRepoImpl) Expired(ctx context.Context, id uint64) error {
now := time.Now()
tx := p.DB(ctx).
Where(model.OrderBak{
ID: id,
}).
Updates(model.OrderBak{
Status: vo.OrderStatusExpired.GetValue(),
Remark: "过期",
UpdateTime: &now,
})
if tx.Error != nil {
return fmt.Errorf("update db fail %w", tx.Error)
}
return nil
}
func (p *OrderBakRepoImpl) Available(ctx context.Context, id uint64) error {
now := time.Now()
tx := p.DB(ctx).
Where(model.OrderBak{
ID: id,
Status: vo.OrderStatusUse.GetValue(),
}).
Updates(model.OrderBak{
Status: vo.OrderStatusSuccess.GetValue(),
Remark: "重置为成功,领取成功时间重置",
ReceiveSuccessTime: &now, // 领取成功时间重置
UpdateTime: &now,
})
if tx.Error != nil {
return fmt.Errorf("update db fail %w", tx.Error)
}
return nil
}

View File

@ -42,6 +42,7 @@ func NewHTTPServer(
// 订单通知重试 -- 不健全
srv.Route("/voucher/").POST("orderNotifyRetry", cmb.OrderNotifyRetry)
srv.Route("/voucher/").POST("retryOrderNotice", cmb.RetryOrderNotice)
// 重试通知
srv.Route("/voucher/").POST("notifyRetry/{id}", cmb.NotifyRetry)
// 查询订单状态及微信状态

View File

@ -26,6 +26,10 @@ func NewRdbConsumer(
) *RdbConsumer {
manager := rdsmq.NewConsumerManager()
if cfTr := voucherService.GetRetryQueryNoticeConfig(); cfTr != nil {
manager.Add(cfTr)
}
if cf := voucherService.GetWechatQueryConfig(); cf != nil {
manager.Add(cf)
}

View File

@ -11,6 +11,25 @@ import (
"voucher/internal/biz/do"
)
func (this *CmbService) RetryOrderNotice(ctx http.Context) error {
bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil {
return err
}
if bodyBytes == nil {
return fmt.Errorf("bodyBytes is empty")
}
var req *do.RetryQueryNotice
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
return this.VoucherBiz.PushRetryOrderNotice(ctx, bodyBytes)
}
func (this *CmbService) OrderNotifyRetry(ctx http.Context) error {
bodyBytes, err := io.ReadAll(ctx.Request().Body)

View File

@ -80,3 +80,30 @@ func (s *VoucherService) WechatTimeSliceQueryHandle(ctx context.Context, msg str
return nil
}
func (s *VoucherService) GetRetryQueryNoticeConfig() *rdsmq.ConsumeConfig {
return &rdsmq.ConsumeConfig{
Rdb: s.rdb.Rdb,
QueueName: "retryQueryNotice",
NumWorkers: 1,
WaitTime: 30,
RetryNum: 1,
Fn: s.RetryQueryNotice,
Logger: s.logHelper,
}
}
func (s *VoucherService) RetryQueryNotice(ctx context.Context, msg string) error {
if msg == "" {
s.logHelper.Errorf("wechat TimeSlice query error: batchNo is empty")
return nil
}
if err := s.timeSliceQuery.RetryQueryNotice(ctx, msg); err != nil {
s.logHelper.Errorf("retry query notice msg:%s error: %v", msg, err)
}
return nil
}

View File

@ -102,9 +102,9 @@ func QueryCoupon() {
return
}
appId := "wxd27e255810842ba8"
openId := "o3dEt5Wq3v-bEBXXkzvIlMgMh7Kc"
couponId := "149248300483"
appId := "wx619991cc795028f5"
openId := "oSNb4fnWoktz7YVNIXE5bvoB3-1w"
couponId := "106048490308"
req := cashcoupons.QueryCouponRequest{
CouponId: core.String(couponId),