diff --git a/configs/config.yaml b/configs/config.yaml index 8af4ea6..6938286 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -8,13 +8,13 @@ server: data: db: driver: mysql - source: root:lansexiongdi6,@tcp(47.97.27.195:3306)/voucher?parseTime=True&loc=Local + source: root:lansexiongdi6,@tcp(47.108.53.72:3306)/voucher?parseTime=True&loc=Local maxIdle: 200 #最大的空闲连接数 maxOpen: 1000 #最大连接数,0表示不受限制 maxLifetime: 300s #连接复用的最大生命周期 isDebug: false redis: #没有则注释此属性 - addr: 47.97.27.195:6379 + addr: 47.108.53.72:6379 password: lansexiongdi@666 readTimeout: 5s writeTimeout: 5s diff --git a/internal/biz/cron_notice.go b/internal/biz/cron_notice.go index 547e60f..2ef5cbd 100644 --- a/internal/biz/cron_notice.go +++ b/internal/biz/cron_notice.go @@ -51,8 +51,6 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse } - time.Sleep(1 * time.Second) - return nil }) diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 8bb0d84..fda519a 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -3,6 +3,7 @@ package biz import ( "context" "github.com/go-kratos/kratos/v2/log" + "sync" v1 "voucher/api/v1" "voucher/internal/biz/bo" "voucher/internal/biz/cmb" @@ -29,6 +30,9 @@ type VoucherBiz struct { CmbMixRepo mixrepos.CmbMixRepo KxMixRepo mixrepos.KxMixRepo UseLogRepo repo.UseLogRepo + + mu sync.RWMutex + queryMap map[string]bool } func NewVoucherBiz( @@ -62,6 +66,35 @@ func NewVoucherBiz( CmbMixRepo: CmbMixRepo, KxMixRepo: KxMixRepo, UseLogRepo: UseLogRepo, + queryMap: make(map[string]bool), + } +} + +func (this *VoucherBiz) Get(stockNo string) bool { + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[stockNo]; ok { + return ok + } + + return false +} + +func (this *VoucherBiz) Add(uid string) { + this.mu.Lock() + defer this.mu.Unlock() + + this.queryMap[uid] = true +} + +func (this *VoucherBiz) Remove(uid string) { + + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[uid]; ok { + delete(this.queryMap, uid) } } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 2098bc9..a7711aa 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -5,16 +5,23 @@ import ( "encoding/json" "fmt" "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/http" "time" "voucher/internal/biz/bo" "voucher/internal/biz/do" ) -func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { +func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error { - _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) - if err != nil { - return err + if v.Get("CMB_WECHAT_QUERY") { + return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For")) + } + + if req.ProductNo != "" { + _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) + if err != nil { + return err + } } queue := v.bc.RdsMQ.GetWechatQuery() @@ -27,8 +34,11 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e return err } + v.Add("CMB_WECHAT_QUERY") + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() if err != nil { + v.Remove("CMB_WECHAT_QUERY") return fmt.Errorf("添加到队列失败:%v", err) } @@ -37,6 +47,8 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { + defer v.Remove("CMB_WECHAT_QUERY") + var req *do.WechatQuery if err := json.Unmarshal([]byte(msg), &req); err != nil { @@ -44,12 +56,16 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { } start := time.Now() - log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) - fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) + startStr := time.Now().String() + log.Warnf("微信券查询处理开始:%s,msg:%s", startStr, msg) + fmt.Printf("微信券查询处理开始:%s,msg:%s", startStr, msg) + + n := 0 num := 0 err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { + n += 1 for _, order := range rows { num += 1 @@ -60,13 +76,13 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { } - time.Sleep(1 * time.Second) + log.Warnf("微信券查询处理第:%d组,已执行条数:%d,执行开始时间:%s,已耗时:%s", n, num, startStr, time.Now().Sub(start).String()) return nil }) - log.Warnf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num) - fmt.Printf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num) + log.Warnf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg) + fmt.Printf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg) return err } diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 36cccf3..6c803bc 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -33,10 +33,11 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { - tx := p.DB(ctx). - Where("product_no = ?", req.ProductNo). - Where("status = ?", vo.OrderStatusSuccess.GetValue()) + tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) + if req.ProductNo != "" { + tx = tx.Where("product_no = ?", req.ProductNo) + } if req.StartTime != "" { tx = tx.Where("receive_success_time >= ?", req.StartTime) } @@ -100,7 +101,6 @@ func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUs var results = make([]*model.Order, 0) result := p.DB(ctx). - Where("type = ?", w.Type). Where("status IN (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue()}). Where("receive_success_time BETWEEN ? AND ?", w.StartTime, w.EndTime). FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { diff --git a/internal/pkg/helper/utils_test.go b/internal/pkg/helper/utils_test.go index 64996c6..8f14839 100644 --- a/internal/pkg/helper/utils_test.go +++ b/internal/pkg/helper/utils_test.go @@ -15,11 +15,11 @@ func TestNoticeTime(t *testing.T) { now := time.Now() // 获取七天前的日期 - noticeStartDay := now.AddDate(0, 0, -29) + noticeStartDay := now.AddDate(0, 0, -15) // 获取七天前 00:00:00 的时间 startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location()) - noticeEndDay := now.AddDate(0, 0, -28) + noticeEndDay := now.AddDate(0, 0, -1) // 获取昨天 23:59:59 的时间 endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location()) diff --git a/internal/pkg/mq_http/mq_http_test.go b/internal/pkg/mq_http/mq_http_test.go index 2102a2b..96cf45b 100644 --- a/internal/pkg/mq_http/mq_http_test.go +++ b/internal/pkg/mq_http/mq_http_test.go @@ -49,8 +49,8 @@ func Test_WechatNotifyProducer2(t *testing.T) { "associated_data":"coupon", "plain_text":{ "stock_creator_mchid":"1652465541", -"stock_id":"20393435", -"coupon_id":"101423873113", +"stock_id":"20393759", +"coupon_id":"104611498109", "coupon_name":"test", "description":"","status":"USED", "create_time":"2025-03-07T15:49:31+08:00", diff --git a/internal/pkg/rdsmq/rdsmq.go b/internal/pkg/rdsmq/rdsmq.go index e1ff23f..3ef5b7c 100644 --- a/internal/pkg/rdsmq/rdsmq.go +++ b/internal/pkg/rdsmq/rdsmq.go @@ -40,7 +40,7 @@ func (r *ConsumeConfig) init(_ context.Context) { } func (r *ConsumeConfig) Start(ctx context.Context) { - fmt.Printf("RdsMQ Starting to dequeue from [%s]", r.QueueName) + fmt.Printf("RdsMQ Starting to dequeue from [%s] \n", r.QueueName) r.init(ctx) defer r.close(ctx) diff --git a/internal/service/cmb.go b/internal/service/cmb.go index b064056..fd72b54 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -118,11 +118,6 @@ func (this *CmbService) RegisterTag(ctx http.Context) error { func (this *CmbService) PushWechatQuery(ctx http.Context) error { - productNo := ctx.Vars().Get("product_no") - if productNo == "" { - return fmt.Errorf("product_no is empty") - } - bodyBytes, err := io.ReadAll(ctx.Request().Body) if err != nil { return err @@ -137,7 +132,11 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { return fmt.Errorf("req is empty") } - req.ProductNo = productNo + productNo := ctx.Vars().Get("product_no") + if productNo == "" { + req.ProductNo = productNo + } + if req.StartTime == "" || req.EndTime == "" { return fmt.Errorf("start_time or end_time is empty") } @@ -147,7 +146,7 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { } return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": productNo, + "data": req, }) } diff --git a/internal/service/wechat_query.go b/internal/service/wechat_query.go index f210d53..390964e 100644 --- a/internal/service/wechat_query.go +++ b/internal/service/wechat_query.go @@ -38,7 +38,7 @@ func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) erro } if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil { - s.logHelper.Error(err) + s.logHelper.Errorf("wechat query error: %v", err) } return nil