From 8dc94316142ae1c45d90d4a15a8582e57f7be0e9 Mon Sep 17 00:00:00 2001 From: ziming Date: Mon, 9 Jun 2025 17:45:34 +0800 Subject: [PATCH 1/7] pro order use notify --- configs/config.yaml | 4 ++-- internal/biz/voucher.go | 34 ++++++++++++++++++++++++++++ internal/biz/wechat_query.go | 8 +++++++ internal/pkg/mq_http/mq_http_test.go | 4 ++-- internal/pkg/rdsmq/rdsmq.go | 2 +- 5 files changed, 47 insertions(+), 5 deletions(-) diff --git a/configs/config.yaml b/configs/config.yaml index 47aba0c..decdbea 100644 --- a/configs/config.yaml +++ b/configs/config.yaml @@ -8,13 +8,13 @@ server: data: db: driver: mysql - source: root:lansexiongdi6,@tcp(47.97.27.195:3306)/voucher?parseTime=True&loc=Local + source: root:lansexiongdi6,@tcp(47.108.53.72:3306)/voucher?parseTime=True&loc=Local maxIdle: 200 #最大的空闲连接数 maxOpen: 1000 #最大连接数,0表示不受限制 maxLifetime: 300s #连接复用的最大生命周期 isDebug: false redis: #没有则注释此属性 - addr: 47.97.27.195:6379 + addr: 47.108.53.72:6379 password: lansexiongdi@666 readTimeout: 5s writeTimeout: 5s diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index de5de99..0da3b71 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -1,6 +1,7 @@ package biz import ( + "sync" "voucher/internal/biz/cmb" "voucher/internal/biz/mixrepos" "voucher/internal/biz/repo" @@ -22,6 +23,9 @@ type VoucherBiz struct { WechatCpnRepo wechatrepo.WechatCpnRepo DingMixRepo mixrepos.DingMixRepo CmbMixRepo mixrepos.CmbMixRepo + + mu sync.RWMutex + queryMap map[string]bool } func NewVoucherBiz( @@ -51,5 +55,35 @@ func NewVoucherBiz( WechatCpnRepo: WechatCpnRepo, DingMixRepo: DingMixRepo, CmbMixRepo: CmbMixRepo, + + queryMap: make(map[string]bool), + } +} + +func (this *VoucherBiz) Get(stockNo string) bool { + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[stockNo]; ok { + return ok + } + + return false +} + +func (this *VoucherBiz) Add(stockNo string) { + this.mu.Lock() + defer this.mu.Unlock() + + this.queryMap[stockNo] = true +} + +func (this *VoucherBiz) Remove(stockNo string) { + + this.mu.Lock() + defer this.mu.Unlock() + + if _, ok := this.queryMap[stockNo]; ok { + delete(this.queryMap, stockNo) } } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 2098bc9..cb7e176 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -12,6 +12,10 @@ import ( func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { + if v.Get(req.ProductNo) { + return fmt.Errorf("正在处理中") + } + _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) if err != nil { return err @@ -32,6 +36,8 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e return fmt.Errorf("添加到队列失败:%v", err) } + v.Add(req.ProductNo) + return nil } @@ -43,6 +49,8 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { return err } + defer v.Remove(req.ProductNo) + start := time.Now() log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) diff --git a/internal/pkg/mq_http/mq_http_test.go b/internal/pkg/mq_http/mq_http_test.go index 47e5758..b2e4105 100644 --- a/internal/pkg/mq_http/mq_http_test.go +++ b/internal/pkg/mq_http/mq_http_test.go @@ -48,8 +48,8 @@ func Test_WechatNotifyProducer2(t *testing.T) { "associated_data":"coupon", "plain_text":{ "stock_creator_mchid":"1652465541", -"stock_id":"20393435", -"coupon_id":"101423873113", +"stock_id":"20393759", +"coupon_id":"104611498109", "coupon_name":"test", "description":"","status":"USED", "create_time":"2025-03-07T15:49:31+08:00", diff --git a/internal/pkg/rdsmq/rdsmq.go b/internal/pkg/rdsmq/rdsmq.go index e1ff23f..3ef5b7c 100644 --- a/internal/pkg/rdsmq/rdsmq.go +++ b/internal/pkg/rdsmq/rdsmq.go @@ -40,7 +40,7 @@ func (r *ConsumeConfig) init(_ context.Context) { } func (r *ConsumeConfig) Start(ctx context.Context) { - fmt.Printf("RdsMQ Starting to dequeue from [%s]", r.QueueName) + fmt.Printf("RdsMQ Starting to dequeue from [%s] \n", r.QueueName) r.init(ctx) defer r.close(ctx) From 7ee72814bded95d08ae52d1aae13bf303039720d Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 09:37:55 +0800 Subject: [PATCH 2/7] query --- internal/biz/voucher.go | 10 +++++----- internal/biz/wechat_query.go | 23 +++++++++++++---------- internal/data/repoimpl/order.go | 7 ++++--- internal/service/cmb.go | 13 ++++++------- 4 files changed, 28 insertions(+), 25 deletions(-) diff --git a/internal/biz/voucher.go b/internal/biz/voucher.go index 0da3b71..400a505 100644 --- a/internal/biz/voucher.go +++ b/internal/biz/voucher.go @@ -71,19 +71,19 @@ func (this *VoucherBiz) Get(stockNo string) bool { return false } -func (this *VoucherBiz) Add(stockNo string) { +func (this *VoucherBiz) Add(uid string) { this.mu.Lock() defer this.mu.Unlock() - this.queryMap[stockNo] = true + this.queryMap[uid] = true } -func (this *VoucherBiz) Remove(stockNo string) { +func (this *VoucherBiz) Remove(uid string) { this.mu.Lock() defer this.mu.Unlock() - if _, ok := this.queryMap[stockNo]; ok { - delete(this.queryMap, stockNo) + if _, ok := this.queryMap[uid]; ok { + delete(this.queryMap, uid) } } diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index cb7e176..9540a61 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -5,20 +5,23 @@ import ( "encoding/json" "fmt" "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/transport/http" "time" "voucher/internal/biz/bo" "voucher/internal/biz/do" ) -func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { +func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error { - if v.Get(req.ProductNo) { - return fmt.Errorf("正在处理中") + if v.Get("CMB_WECHAT_QUERY") { + return fmt.Errorf("此台服务队列正在处理中,ip:", ctx.Header().Get("X-Forwarded-For")) } - _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) - if err != nil { - return err + if req.ProductNo != "" { + _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) + if err != nil { + return err + } } queue := v.bc.RdsMQ.GetWechatQuery() @@ -31,26 +34,26 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e return err } + v.Add("CMB_WECHAT_QUERY") + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() if err != nil { return fmt.Errorf("添加到队列失败:%v", err) } - v.Add(req.ProductNo) - return nil } func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { + defer v.Remove("CMB_WECHAT_QUERY") + var req *do.WechatQuery if err := json.Unmarshal([]byte(msg), &req); err != nil { return err } - defer v.Remove(req.ProductNo) - start := time.Now() log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 36cccf3..3ff09e3 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -33,10 +33,11 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { - tx := p.DB(ctx). - Where("product_no = ?", req.ProductNo). - Where("status = ?", vo.OrderStatusSuccess.GetValue()) + tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue()) + if req.ProductNo != "" { + tx = tx.Where("product_no = ?", req.ProductNo) + } if req.StartTime != "" { tx = tx.Where("receive_success_time >= ?", req.StartTime) } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index b064056..fd72b54 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -118,11 +118,6 @@ func (this *CmbService) RegisterTag(ctx http.Context) error { func (this *CmbService) PushWechatQuery(ctx http.Context) error { - productNo := ctx.Vars().Get("product_no") - if productNo == "" { - return fmt.Errorf("product_no is empty") - } - bodyBytes, err := io.ReadAll(ctx.Request().Body) if err != nil { return err @@ -137,7 +132,11 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { return fmt.Errorf("req is empty") } - req.ProductNo = productNo + productNo := ctx.Vars().Get("product_no") + if productNo == "" { + req.ProductNo = productNo + } + if req.StartTime == "" || req.EndTime == "" { return fmt.Errorf("start_time or end_time is empty") } @@ -147,7 +146,7 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { } return ctx.JSON(http2.StatusOK, map[string]interface{}{ - "data": productNo, + "data": req, }) } From 5ce7dfbb88471f08292d984ebca52d62a8409c65 Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 09:39:05 +0800 Subject: [PATCH 3/7] query --- internal/biz/wechat_query.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 9540a61..d9c7534 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -38,6 +38,7 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() if err != nil { + v.Remove("CMB_WECHAT_QUERY") return fmt.Errorf("添加到队列失败:%v", err) } From 5ff8c8a0e6fd7dd1c21a51c77dce61317ececaf7 Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 09:50:20 +0800 Subject: [PATCH 4/7] query --- internal/biz/cron_notice.go | 12 +++++------- internal/biz/wechat_query.go | 16 ++++++++++------ internal/data/repoimpl/order.go | 1 - internal/pkg/helper/utils_test.go | 4 ++-- 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/internal/biz/cron_notice.go b/internal/biz/cron_notice.go index 276eb7a..605cb34 100644 --- a/internal/biz/cron_notice.go +++ b/internal/biz/cron_notice.go @@ -18,11 +18,6 @@ func (v *VoucherBiz) Notice(ctx context.Context) error { return err } - return v.ExecuteNotice(ctx) -} - -func (v *VoucherBiz) ExecuteNotice(ctx context.Context) error { - now := time.Now() // 获取七天前的日期 @@ -40,6 +35,11 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context) error { EndTime: &endTime, } + return v.ExecuteNotice(ctx, req) +} + +func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUseBo) error { + return v.OrderRepo.FindInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { for _, order := range rows { @@ -50,8 +50,6 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context) error { } - time.Sleep(1 * time.Second) - return nil }) diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index d9c7534..4bc1781 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -14,7 +14,7 @@ import ( func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error { if v.Get("CMB_WECHAT_QUERY") { - return fmt.Errorf("此台服务队列正在处理中,ip:", ctx.Header().Get("X-Forwarded-For")) + return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For")) } if req.ProductNo != "" { @@ -56,12 +56,16 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { } start := time.Now() - log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) - fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) + startStr := time.Now().String() + log.Warnf("微信券查询处理开始:%s,msg:%s", startStr, msg) + fmt.Printf("微信券查询处理开始:%s,msg:%s", startStr, msg) + + n := 0 num := 0 err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { + n += 1 for _, order := range rows { num += 1 @@ -72,13 +76,13 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { } - time.Sleep(1 * time.Second) + log.Warnf("微信券查询处理第 %d 批次,执行开始时间:%s,当前执行耗时:%s", n, startStr, time.Now().Sub(start).String()) return nil }) - log.Warnf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num) - fmt.Printf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num) + log.Warnf("微信券查询处理耗时:%s,处理%d单,msg:%s", time.Now().Sub(start).String(), num, msg) + fmt.Printf("微信券查询处理耗时:%s,处理%d单,msg:%s", time.Now().Sub(start).String(), num, msg) return err } diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index 3ff09e3..6c803bc 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -101,7 +101,6 @@ func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUs var results = make([]*model.Order, 0) result := p.DB(ctx). - Where("type = ?", w.Type). Where("status IN (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue()}). Where("receive_success_time BETWEEN ? AND ?", w.StartTime, w.EndTime). FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error { diff --git a/internal/pkg/helper/utils_test.go b/internal/pkg/helper/utils_test.go index 64996c6..8f14839 100644 --- a/internal/pkg/helper/utils_test.go +++ b/internal/pkg/helper/utils_test.go @@ -15,11 +15,11 @@ func TestNoticeTime(t *testing.T) { now := time.Now() // 获取七天前的日期 - noticeStartDay := now.AddDate(0, 0, -29) + noticeStartDay := now.AddDate(0, 0, -15) // 获取七天前 00:00:00 的时间 startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location()) - noticeEndDay := now.AddDate(0, 0, -28) + noticeEndDay := now.AddDate(0, 0, -1) // 获取昨天 23:59:59 的时间 endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location()) From c18f25d55e01fcb566cc9584ecee473f59b1cb3a Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 09:51:55 +0800 Subject: [PATCH 5/7] query --- internal/biz/wechat_query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 4bc1781..97f06d4 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -76,7 +76,7 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { } - log.Warnf("微信券查询处理第 %d 批次,执行开始时间:%s,当前执行耗时:%s", n, startStr, time.Now().Sub(start).String()) + log.Warnf("微信券查询处理第 %d 组,执行开始时间:%s,当前执行耗时:%s", n, startStr, time.Now().Sub(start).String()) return nil }) From 3410510d811a0d77f3d2d05b9f94001edd409609 Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 09:54:04 +0800 Subject: [PATCH 6/7] query --- internal/biz/wechat_query.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index 97f06d4..a7711aa 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -76,13 +76,13 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { } - log.Warnf("微信券查询处理第 %d 组,执行开始时间:%s,当前执行耗时:%s", n, startStr, time.Now().Sub(start).String()) + log.Warnf("微信券查询处理第:%d组,已执行条数:%d,执行开始时间:%s,已耗时:%s", n, num, startStr, time.Now().Sub(start).String()) return nil }) - log.Warnf("微信券查询处理耗时:%s,处理%d单,msg:%s", time.Now().Sub(start).String(), num, msg) - fmt.Printf("微信券查询处理耗时:%s,处理%d单,msg:%s", time.Now().Sub(start).String(), num, msg) + log.Warnf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg) + fmt.Printf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg) return err } From 07b62a97255a756f621ad14ba1261c164809454a Mon Sep 17 00:00:00 2001 From: ziming Date: Tue, 10 Jun 2025 09:55:37 +0800 Subject: [PATCH 7/7] query --- internal/service/wechat_query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/service/wechat_query.go b/internal/service/wechat_query.go index f210d53..390964e 100644 --- a/internal/service/wechat_query.go +++ b/internal/service/wechat_query.go @@ -38,7 +38,7 @@ func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) erro } if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil { - s.logHelper.Error(err) + s.logHelper.Errorf("wechat query error: %v", err) } return nil