diff --git a/configs/config.yaml b/configs/config.yaml index 47aba0c..decdbea 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -8,13 +8,13 @@ server: data: db: driver: mysql - source: root:lansexiongdi6,@tcp(47.97.27.195:3306)/voucher?parseTime=True&loc=Local + source: root:lansexiongdi6,@tcp(47.108.53.72:3306)/voucher?parseTime=True&loc=Local maxIdle: 200 #最大的空闲连接数 maxOpen: 1000 #最大连接数,0表示不受限制 maxLifetime: 300s #连接复用的最大生命周期 isDebug: false redis: #没有则注释此属性 - addr: 47.97.27.195:6379 + addr: 47.108.53.72:6379 password: lansexiongdi@666 readTimeout: 5s writeTimeout: 5s diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index de5de99..0da3b71 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -1,6 +1,7 @@ package biz import ( + "sync" "voucher/internal/biz/cmb" "voucher/internal/biz/mixrepos" "voucher/internal/biz/repo" @@ -22,6 +23,9 @@ type VoucherBiz struct { WechatCpnRepo wechatrepo.WechatCpnRepo DingMixRepo mixrepos.DingMixRepo CmbMixRepo mixrepos.CmbMixRepo + + mu sync.RWMutex + queryMap map[string]bool } func NewVoucherBiz( @@ -51,5 +55,35 @@ func NewVoucherBiz( WechatCpnRepo: WechatCpnRepo, DingMixRepo: DingMixRepo, CmbMixRepo: CmbMixRepo, + + queryMap: make(map[string]bool), + } +} + +func (this *VoucherBiz) Get(stockNo string) bool { + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[stockNo]; ok { + return ok + } + + return false +} + +func (this *VoucherBiz) Add(stockNo string) { + this.mu.Lock() + defer this.mu.Unlock() + + this.queryMap[stockNo] = true +} + +func (this *VoucherBiz) Remove(stockNo string) { + + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[stockNo]; ok { + delete(this.queryMap, stockNo) } } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 2098bc9..cb7e176 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -12,6 +12,10 @@ import ( func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { + if v.Get(req.ProductNo) { + return fmt.Errorf("正在处理中") + } + _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) if err != nil { return err @@ -32,6 +36,8 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e return fmt.Errorf("添加到队列失败:%v", err) } + v.Add(req.ProductNo) + return nil } @@ -43,6 +49,8 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { return err } + defer v.Remove(req.ProductNo) + start := time.Now() log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) diff --git a/internal/pkg/mq_http/mq_http_test.go b/internal/pkg/mq_http/mq_http_test.go index 47e5758..b2e4105 100644 --- a/internal/pkg/mq_http/mq_http_test.go +++ b/internal/pkg/mq_http/mq_http_test.go @@ -48,8 +48,8 @@ func Test_WechatNotifyProducer2(t *testing.T) { "associated_data":"coupon", "plain_text":{ "stock_creator_mchid":"1652465541", -"stock_id":"20393435", -"coupon_id":"101423873113", +"stock_id":"20393759", +"coupon_id":"104611498109", "coupon_name":"test", "description":"","status":"USED", "create_time":"2025-03-07T15:49:31+08:00", diff --git a/internal/pkg/rdsmq/rdsmq.go b/internal/pkg/rdsmq/rdsmq.go index e1ff23f..3ef5b7c 100644 --- a/internal/pkg/rdsmq/rdsmq.go +++ b/internal/pkg/rdsmq/rdsmq.go @@ -40,7 +40,7 @@ func (r *ConsumeConfig) init(_ context.Context) { } func (r *ConsumeConfig) Start(ctx context.Context) { - fmt.Printf("RdsMQ Starting to dequeue from [%s]", r.QueueName) + fmt.Printf("RdsMQ Starting to dequeue from [%s] \n", r.QueueName) r.init(ctx) defer r.close(ctx)