diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 0da3b71..400a505 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -71,19 +71,19 @@ func (this *VoucherBiz) Get(stockNo string) bool { return false } -func (this *VoucherBiz) Add(stockNo string) { +func (this *VoucherBiz) Add(uid string) { this.mu.Lock() defer this.mu.Unlock() - this.queryMap[stockNo] = true + this.queryMap[uid] = true } -func (this *VoucherBiz) Remove(stockNo string) { +func (this *VoucherBiz) Remove(uid string) { this.mu.Lock() defer this.mu.Unlock() - if _, ok := this.queryMap[stockNo]; ok { - delete(this.queryMap, stockNo) + if _, ok := this.queryMap[uid]; ok { + delete(this.queryMap, uid) } } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index cb7e176..9540a61 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -5,20 +5,23 @@ import ( "encoding/json" "fmt" "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/http" "time" "voucher/internal/biz/bo" "voucher/internal/biz/do" ) -func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { +func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error { - if v.Get(req.ProductNo) { - return fmt.Errorf("正在处理中") + if v.Get("CMB_WECHAT_QUERY") { + return fmt.Errorf("此台服务队列正在处理中,ip:", ctx.Header().Get("X-Forwarded-For")) } - _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) - if err != nil { - return err + if req.ProductNo != "" { + _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) + if err != nil { + return err + } } queue := v.bc.RdsMQ.GetWechatQuery() @@ -31,26 +34,26 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e return err } + v.Add("CMB_WECHAT_QUERY") + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() if err != nil { return fmt.Errorf("添加到队列失败:%v", err) } - v.Add(req.ProductNo) - return nil } func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { + defer v.Remove("CMB_WECHAT_QUERY") + var req *do.WechatQuery if err := json.Unmarshal([]byte(msg), &req); err != nil { return err } - defer v.Remove(req.ProductNo) - start := time.Now() log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 36cccf3..3ff09e3 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -33,10 +33,11 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { - tx := p.DB(ctx). - Where("product_no = ?", req.ProductNo). - Where("status = ?", vo.OrderStatusSuccess.GetValue()) + tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) + if req.ProductNo != "" { + tx = tx.Where("product_no = ?", req.ProductNo) + } if req.StartTime != "" { tx = tx.Where("receive_success_time >= ?", req.StartTime) } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index b064056..fd72b54 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -118,11 +118,6 @@ func (this *CmbService) RegisterTag(ctx http.Context) error { func (this *CmbService) PushWechatQuery(ctx http.Context) error { - productNo := ctx.Vars().Get("product_no") - if productNo == "" { - return fmt.Errorf("product_no is empty") - } - bodyBytes, err := io.ReadAll(ctx.Request().Body) if err != nil { return err @@ -137,7 +132,11 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { return fmt.Errorf("req is empty") } - req.ProductNo = productNo + productNo := ctx.Vars().Get("product_no") + if productNo == "" { + req.ProductNo = productNo + } + if req.StartTime == "" || req.EndTime == "" { return fmt.Errorf("start_time or end_time is empty") } @@ -147,7 +146,7 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { } return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": productNo, + "data": req, }) }