This commit is contained in:
ziming 2025-06-10 09:37:55 +08:00
parent 8dc9431614
commit 7ee72814bd
4 changed files with 28 additions and 25 deletions

View File

@ -71,19 +71,19 @@ func (this *VoucherBiz) Get(stockNo string) bool {
return false return false
} }
func (this *VoucherBiz) Add(stockNo string) { func (this *VoucherBiz) Add(uid string) {
this.mu.Lock() this.mu.Lock()
defer this.mu.Unlock() defer this.mu.Unlock()
this.queryMap[stockNo] = true this.queryMap[uid] = true
} }
func (this *VoucherBiz) Remove(stockNo string) { func (this *VoucherBiz) Remove(uid string) {
this.mu.Lock() this.mu.Lock()
defer this.mu.Unlock() defer this.mu.Unlock()
if _, ok := this.queryMap[stockNo]; ok { if _, ok := this.queryMap[uid]; ok {
delete(this.queryMap, stockNo) delete(this.queryMap, uid)
} }
} }

View File

@ -5,20 +5,23 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"time" "time"
"voucher/internal/biz/bo" "voucher/internal/biz/bo"
"voucher/internal/biz/do" "voucher/internal/biz/do"
) )
func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
if v.Get(req.ProductNo) { if v.Get("CMB_WECHAT_QUERY") {
return fmt.Errorf("正在处理中") return fmt.Errorf("此台服务队列正在处理中,ip:", ctx.Header().Get("X-Forwarded-For"))
} }
_, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) if req.ProductNo != "" {
if err != nil { _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
return err if err != nil {
return err
}
} }
queue := v.bc.RdsMQ.GetWechatQuery() queue := v.bc.RdsMQ.GetWechatQuery()
@ -31,26 +34,26 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e
return err return err
} }
v.Add("CMB_WECHAT_QUERY")
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
if err != nil { if err != nil {
return fmt.Errorf("添加到队列失败:%v", err) return fmt.Errorf("添加到队列失败:%v", err)
} }
v.Add(req.ProductNo)
return nil return nil
} }
func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
defer v.Remove("CMB_WECHAT_QUERY")
var req *do.WechatQuery var req *do.WechatQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil { if err := json.Unmarshal([]byte(msg), &req); err != nil {
return err return err
} }
defer v.Remove(req.ProductNo)
start := time.Now() start := time.Now()
log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg)

View File

@ -33,10 +33,11 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB {
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
tx := p.DB(ctx). tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue())
Where("product_no = ?", req.ProductNo).
Where("status = ?", vo.OrderStatusSuccess.GetValue())
if req.ProductNo != "" {
tx = tx.Where("product_no = ?", req.ProductNo)
}
if req.StartTime != "" { if req.StartTime != "" {
tx = tx.Where("receive_success_time >= ?", req.StartTime) tx = tx.Where("receive_success_time >= ?", req.StartTime)
} }

View File

@ -118,11 +118,6 @@ func (this *CmbService) RegisterTag(ctx http.Context) error {
func (this *CmbService) PushWechatQuery(ctx http.Context) error { func (this *CmbService) PushWechatQuery(ctx http.Context) error {
productNo := ctx.Vars().Get("product_no")
if productNo == "" {
return fmt.Errorf("product_no is empty")
}
bodyBytes, err := io.ReadAll(ctx.Request().Body) bodyBytes, err := io.ReadAll(ctx.Request().Body)
if err != nil { if err != nil {
return err return err
@ -137,7 +132,11 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
return fmt.Errorf("req is empty") return fmt.Errorf("req is empty")
} }
req.ProductNo = productNo productNo := ctx.Vars().Get("product_no")
if productNo == "" {
req.ProductNo = productNo
}
if req.StartTime == "" || req.EndTime == "" { if req.StartTime == "" || req.EndTime == "" {
return fmt.Errorf("start_time or end_time is empty") return fmt.Errorf("start_time or end_time is empty")
} }
@ -147,7 +146,7 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
} }
return ctx.JSON(http2.StatusOK, map[string]interface{}{ return ctx.JSON(http2.StatusOK, map[string]interface{}{
"data": productNo, "data": req,
}) })
} }