WechatQuery

This commit is contained in:
ziming 2025-06-06 11:39:26 +08:00
parent 11ff1c8540
commit 9325bde2a3
7 changed files with 80 additions and 28 deletions

View File

@ -0,0 +1,7 @@
package do
type WechatQuery struct {
ProductNo string `json:"product_no"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
}

View File

@ -3,11 +3,12 @@ package repo
import ( import (
"context" "context"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
) )
type OrderRepo interface { type OrderRepo interface {
FinSucByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error

View File

@ -2,15 +2,17 @@ package biz
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"time" "time"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
) )
func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) error { func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error {
product, err := v.ProductRepo.GetByProductNo(ctx, productNo) _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
if err != nil { if err != nil {
return err return err
} }
@ -20,7 +22,12 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) erro
return fmt.Errorf("队列不存在") return fmt.Errorf("队列不存在")
} }
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, product.BatchNo).Result() msg, err := json.Marshal(req)
if err != nil {
return err
}
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
if err != nil { if err != nil {
return fmt.Errorf("添加到队列失败:%v", err) return fmt.Errorf("添加到队列失败:%v", err)
} }
@ -28,21 +35,27 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) erro
return nil return nil
} }
func (v *VoucherBiz) WechatQuery(ctx context.Context, batchNo string) error { func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
start := time.Now() start := time.Now()
log.Warnf("微信券查询处理开始:%s,batchNo:%s", start.String(), batchNo) log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
fmt.Printf("微信券查询处理开始:%s,batchNo:%s", start.String(), batchNo) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
var req *do.WechatQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err
}
num := 0 num := 0
err := v.OrderRepo.FinSucByStockIdInBatches(ctx, batchNo, func(ctx context.Context, rows []*bo.OrderBo) error { err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows { for _, order := range rows {
num += 1 num += 1
if err := v.wechatQuery(ctx, order); err != nil { if err := v.wechatQuery(ctx, order); err != nil {
log.Errorf("微信查询券订单状态发生错误,batchNo:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v", log.Errorf("微信查询券订单状态发生错误,msg:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v",
batchNo, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err) msg, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err)
} }
} }
@ -52,8 +65,8 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, batchNo string) error {
return nil return nil
}) })
log.Warnf("微信券查询处理耗时:%s,batchNo:%s,处理%d单", time.Now().Sub(start).String(), batchNo, num) log.Warnf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num)
fmt.Printf("微信券查询处理耗时:%s,batchNo:%s,处理%d单", time.Now().Sub(start).String(), batchNo, num) fmt.Printf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num)
return err return err
} }

View File

@ -9,6 +9,7 @@ import (
"unicode/utf8" "unicode/utf8"
err2 "voucher/api/err" err2 "voucher/api/err"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/repo" "voucher/internal/biz/repo"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
"voucher/internal/data" "voucher/internal/data"
@ -30,16 +31,25 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB {
return p.db.DB(ctx).Model(model.Order{}) return p.db.DB(ctx).Model(model.Order{})
} }
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
tx := p.DB(ctx).
Where("product_no = ?", req.ProductNo).
Where("status = ?", vo.OrderStatusSuccess.GetValue())
if req.StartTime != "" {
tx = tx.Where("receive_success_time >= ?", req.StartTime)
}
if req.EndTime != "" {
tx = tx.Where("receive_success_time <= ?", req.EndTime)
}
var results = make([]*model.Order, 0) var results = make([]*model.Order, 0)
result := p.DB(ctx). result := tx.FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error {
Where("batch_no = ?", batchNo).
Where("status = ?", vo.OrderStatusSuccess.GetValue()). return fun(ctx, p.ToBos(results))
FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { })
return fun(ctx, p.ToBos(results))
})
if result.Error != nil { if result.Error != nil {
return result.Error return result.Error

View File

@ -36,11 +36,11 @@ func NewHTTPServer(
return ctx.String(http2.StatusOK, "pong") return ctx.String(http2.StatusOK, "pong")
}) })
srv.Route("/voucher/").GET("notifyRetry/{id}", cmb.NotifyRetry) srv.Route("/voucher/").POST("notifyRetry/{id}", cmb.NotifyRetry)
srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder) srv.Route("/voucher/").POST("queryOrder/{order_no}", cmb.QueryOrder)
srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag) srv.Route("/voucher/").POST("registerTag/{product_no}", cmb.RegisterTag)
srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatQuery) srv.Route("/voucher/").POST("pushWechatQuery/{product_no}", cmb.PushWechatQuery)
srv.Route("/voucher/").GET("pushWechatRetry/{product_no}", cmb.PushWechatRetry) srv.Route("/voucher/").POST("pushWechatRetry/{product_no}", cmb.PushWechatRetry)
v1.RegisterCmbHTTPServer(srv, cmb) v1.RegisterCmbHTTPServer(srv, cmb)

View File

@ -2,15 +2,18 @@ package service
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http" "github.com/go-kratos/kratos/v2/transport/http"
"github.com/robfig/cron" "github.com/robfig/cron"
"io"
http2 "net/http" http2 "net/http"
"strconv" "strconv"
v1 "voucher/api/v1" v1 "voucher/api/v1"
"voucher/internal/biz" "voucher/internal/biz"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/mixrepos" "voucher/internal/biz/mixrepos"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
"voucher/internal/biz/wechatrepo" "voucher/internal/biz/wechatrepo"
@ -120,11 +123,29 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
return fmt.Errorf("product_no is empty") return fmt.Errorf("product_no is empty")
} }
err := this.VoucherBiz.PushWechatQuery(ctx, productNo) bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil { if err != nil {
return err return err
} }
var req *do.WechatQuery
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
if req == nil {
return fmt.Errorf("req is empty")
}
req.ProductNo = productNo
if req.StartTime == "" || req.EndTime == "" {
return fmt.Errorf("start_time or end_time is empty")
}
if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{ return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": productNo, "data": productNo,
}) })

View File

@ -30,14 +30,14 @@ func (s *VoucherService) GetWechatQueryConfig() *rdsmq.ConsumeConfig {
} }
} }
func (s *VoucherService) WechatQueryHandle(ctx context.Context, batchNo string) error { func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) error {
if batchNo == "" { if msg == "" {
s.logHelper.Errorf("wechat query error: batchNo is empty") s.logHelper.Errorf("wechat query error: batchNo is empty")
return nil return nil
} }
if err := s.VoucherBiz.WechatQuery(ctx, batchNo); err != nil { if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil {
s.logHelper.Error(err) s.logHelper.Error(err)
} }