Merge branch 'pro' into kx

This commit is contained in:
ziming 2025-06-09 09:30:33 +08:00
commit a367a4530c
9 changed files with 88 additions and 36 deletions

View File

@ -0,0 +1,7 @@
package do
type WechatQuery struct {
ProductNo string `json:"product_no"`
StartTime string `json:"start_time"`
EndTime string `json:"end_time"`
}

View File

@ -3,11 +3,12 @@ package repo
import ( import (
"context" "context"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
) )
type OrderRepo interface { type OrderRepo interface {
FinSucByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error
FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error

View File

@ -2,15 +2,17 @@ package biz
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"time" "time"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
) )
func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) error { func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error {
product, err := v.ProductRepo.GetByProductNo(ctx, productNo) _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
if err != nil { if err != nil {
return err return err
} }
@ -20,7 +22,12 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) erro
return fmt.Errorf("队列不存在") return fmt.Errorf("队列不存在")
} }
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, product.BatchNo).Result() msg, err := json.Marshal(req)
if err != nil {
return err
}
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
if err != nil { if err != nil {
return fmt.Errorf("添加到队列失败:%v", err) return fmt.Errorf("添加到队列失败:%v", err)
} }
@ -28,21 +35,27 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) erro
return nil return nil
} }
func (v *VoucherBiz) WechatQuery(ctx context.Context, batchNo string) error { func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
var req *do.WechatQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err
}
start := time.Now() start := time.Now()
log.Warnf("微信券查询处理开始:%s,batchNo:%s", start.String(), batchNo) log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
fmt.Printf("微信券查询处理开始:%s,batchNo:%s", start.String(), batchNo) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
num := 0 num := 0
err := v.OrderRepo.FinSucByStockIdInBatches(ctx, batchNo, func(ctx context.Context, rows []*bo.OrderBo) error { err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
for _, order := range rows { for _, order := range rows {
num += 1 num += 1
if err := v.wechatQuery(ctx, order); err != nil { if err := v.wechatQuery(ctx, order); err != nil {
log.Errorf("微信查询券订单状态发生错误,batchNo:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v", log.Errorf("微信查询券订单状态发生错误,msg:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v",
batchNo, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err) msg, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err)
} }
} }
@ -52,8 +65,8 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, batchNo string) error {
return nil return nil
}) })
log.Warnf("微信券查询处理耗时:%s,batchNo:%s,处理%d单", time.Now().Sub(start).String(), batchNo, num) log.Warnf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num)
fmt.Printf("微信券查询处理耗时:%s,batchNo:%s,处理%d单", time.Now().Sub(start).String(), batchNo, num) fmt.Printf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num)
return err return err
} }

View File

@ -9,6 +9,7 @@ import (
"unicode/utf8" "unicode/utf8"
err2 "voucher/api/err" err2 "voucher/api/err"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/repo" "voucher/internal/biz/repo"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
"voucher/internal/data" "voucher/internal/data"
@ -30,16 +31,25 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB {
return p.db.DB(ctx).Model(model.Order{}) return p.db.DB(ctx).Model(model.Order{})
} }
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
tx := p.DB(ctx).
Where("product_no = ?", req.ProductNo).
Where("status = ?", vo.OrderStatusSuccess.GetValue())
if req.StartTime != "" {
tx = tx.Where("receive_success_time >= ?", req.StartTime)
}
if req.EndTime != "" {
tx = tx.Where("receive_success_time <= ?", req.EndTime)
}
var results = make([]*model.Order, 0) var results = make([]*model.Order, 0)
result := p.DB(ctx). result := tx.FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error {
Where("batch_no = ?", batchNo).
Where("status = ?", vo.OrderStatusSuccess.GetValue()). return fun(ctx, p.ToBos(results))
FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { })
return fun(ctx, p.ToBos(results))
})
if result.Error != nil { if result.Error != nil {
return result.Error return result.Error
@ -55,7 +65,7 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s
result := p.DB(ctx). result := p.DB(ctx).
Where("batch_no = ?", batchNo). Where("batch_no = ?", batchNo).
Where("status = ?", vo.OrderStatusFail.GetValue()). Where("status = ?", vo.OrderStatusFail.GetValue()).
Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat"). //Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat").
FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error {
return fun(ctx, p.ToBos(results)) return fun(ctx, p.ToBos(results))
}) })

View File

@ -36,11 +36,11 @@ func NewHTTPServer(
return ctx.String(http2.StatusOK, "pong") return ctx.String(http2.StatusOK, "pong")
}) })
srv.Route("/voucher/").GET("notifyRetry/{id}", cmb.NotifyRetry) srv.Route("/voucher/").POST("notifyRetry/{id}", cmb.NotifyRetry)
srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder) srv.Route("/voucher/").POST("queryOrder/{order_no}", cmb.QueryOrder)
srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag) srv.Route("/voucher/").POST("registerTag/{product_no}", cmb.RegisterTag)
srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatQuery) srv.Route("/voucher/").POST("pushWechatQuery/{product_no}", cmb.PushWechatQuery)
srv.Route("/voucher/").GET("pushWechatRetry/{product_no}", cmb.PushWechatRetry) srv.Route("/voucher/").POST("pushWechatRetry/{product_no}", cmb.PushWechatRetry)
v1.RegisterCmbHTTPServer(srv, cmb) v1.RegisterCmbHTTPServer(srv, cmb)

View File

@ -26,11 +26,11 @@ func NewRdbConsumer(
) *RdbConsumer { ) *RdbConsumer {
manager := rdsmq.NewConsumerManager() manager := rdsmq.NewConsumerManager()
if cf := voucherService.GetConfig(); cf != nil { if cf := voucherService.GetWechatQueryConfig(); cf != nil {
manager.Add(cf) manager.Add(cf)
} }
if cf2 := voucherService.GetWechatConfig(); cf2 != nil { if cf2 := voucherService.GetWechatRetryConfig(); cf2 != nil {
manager.Add(cf2) manager.Add(cf2)
} }

View File

@ -2,15 +2,18 @@ package service
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http" "github.com/go-kratos/kratos/v2/transport/http"
"github.com/robfig/cron" "github.com/robfig/cron"
"io"
http2 "net/http" http2 "net/http"
"strconv" "strconv"
v1 "voucher/api/v1" v1 "voucher/api/v1"
"voucher/internal/biz" "voucher/internal/biz"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do"
"voucher/internal/biz/mixrepos" "voucher/internal/biz/mixrepos"
"voucher/internal/biz/vo" "voucher/internal/biz/vo"
"voucher/internal/biz/wechatrepo" "voucher/internal/biz/wechatrepo"
@ -120,11 +123,29 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
return fmt.Errorf("product_no is empty") return fmt.Errorf("product_no is empty")
} }
err := this.VoucherBiz.PushWechatQuery(ctx, productNo) bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil { if err != nil {
return err return err
} }
var req *do.WechatQuery
if err = json.Unmarshal(bodyBytes, &req); err != nil {
return err
}
if req == nil {
return fmt.Errorf("req is empty")
}
req.ProductNo = productNo
if req.StartTime == "" || req.EndTime == "" {
return fmt.Errorf("start_time or end_time is empty")
}
if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil {
return err
}
return ctx.JSON(http2.StatusOK, map[string]interface{}{ return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": productNo, "data": productNo,
}) })

View File

@ -7,7 +7,7 @@ import (
"voucher/internal/pkg/rdsmq" "voucher/internal/pkg/rdsmq"
) )
func (s *VoucherService) GetConfig() *rdsmq.ConsumeConfig { func (s *VoucherService) GetWechatQueryConfig() *rdsmq.ConsumeConfig {
queue := s.bc.RdsMQ.GetWechatQuery() queue := s.bc.RdsMQ.GetWechatQuery()
if queue == nil { if queue == nil {
@ -25,19 +25,19 @@ func (s *VoucherService) GetConfig() *rdsmq.ConsumeConfig {
NumWorkers: queue.NumWorkers, NumWorkers: queue.NumWorkers,
WaitTime: queue.GetWaitTime().AsDuration(), WaitTime: queue.GetWaitTime().AsDuration(),
RetryNum: queue.RetryNum, RetryNum: queue.RetryNum,
Fn: s.Handle, Fn: s.WechatQueryHandle,
Logger: s.logHelper, Logger: s.logHelper,
} }
} }
func (s *VoucherService) Handle(ctx context.Context, batchNo string) error { func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) error {
if batchNo == "" { if msg == "" {
s.logHelper.Errorf("wechat query error: batchNo is empty") s.logHelper.Errorf("wechat query error: batchNo is empty")
return nil return nil
} }
if err := s.VoucherBiz.WechatQuery(ctx, batchNo); err != nil { if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil {
s.logHelper.Error(err) s.logHelper.Error(err)
} }

View File

@ -7,7 +7,7 @@ import (
"voucher/internal/pkg/rdsmq" "voucher/internal/pkg/rdsmq"
) )
func (s *VoucherService) GetWechatConfig() *rdsmq.ConsumeConfig { func (s *VoucherService) GetWechatRetryConfig() *rdsmq.ConsumeConfig {
queue := s.bc.RdsMQ.GetWechatRetry() queue := s.bc.RdsMQ.GetWechatRetry()
if queue == nil { if queue == nil {
@ -25,12 +25,12 @@ func (s *VoucherService) GetWechatConfig() *rdsmq.ConsumeConfig {
NumWorkers: queue.NumWorkers, NumWorkers: queue.NumWorkers,
WaitTime: queue.GetWaitTime().AsDuration(), WaitTime: queue.GetWaitTime().AsDuration(),
RetryNum: queue.RetryNum, RetryNum: queue.RetryNum,
Fn: s.HandleWechat, Fn: s.HandleWechatRetry,
Logger: s.logHelper, Logger: s.logHelper,
} }
} }
func (s *VoucherService) HandleWechat(ctx context.Context, batchNo string) error { func (s *VoucherService) HandleWechatRetry(ctx context.Context, batchNo string) error {
if batchNo == "" { if batchNo == "" {
s.logHelper.Errorf("RdsMQ keySend error: batchNo is empty") s.logHelper.Errorf("RdsMQ keySend error: batchNo is empty")