diff --git a/internal/biz/do/rds_mq.go b/internal/biz/do/rds_mq.go new file mode 100644 index 0000000..b89b5a7 --- /dev/null +++ b/internal/biz/do/rds_mq.go @@ -0,0 +1,7 @@ +package do + +type WechatQuery struct { + ProductNo string `json:"product_no"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` +} diff --git a/internal/biz/repo/order.go b/internal/biz/repo/order.go index e3520d8..294ce0b 100644 --- a/internal/biz/repo/order.go +++ b/internal/biz/repo/order.go @@ -3,11 +3,12 @@ package repo import ( "context" "voucher/internal/biz/bo" + "voucher/internal/biz/do" "voucher/internal/biz/vo" ) type OrderRepo interface { - FinSucByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error + FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FinFailByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindIngInBatches(ctx context.Context, fun func(ctx context.Context, rows []*bo.OrderBo) error) error FindInBatches(ctx context.Context, w *bo.FindInBatchesUseBo, fun func(ctx context.Context, rows []*bo.OrderBo) error) error diff --git a/internal/biz/wechat_query.go b/internal/biz/wechat_query.go index b725138..c3b027d 100644 --- a/internal/biz/wechat_query.go +++ b/internal/biz/wechat_query.go @@ -2,15 +2,17 @@ package biz import ( "context" + "encoding/json" "fmt" "github.com/go-kratos/kratos/v2/log" "time" "voucher/internal/biz/bo" + "voucher/internal/biz/do" ) -func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) error { +func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error { - product, err := v.ProductRepo.GetByProductNo(ctx, productNo) + _, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo) if err != nil { return err } @@ -20,7 +22,12 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) erro return fmt.Errorf("队列不存在") } - _, err = v.rdb.Rdb.RPush(ctx, queue.Name, product.BatchNo).Result() + msg, err := json.Marshal(req) + if err != nil { + return err + } + + _, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result() if err != nil { return fmt.Errorf("添加到队列失败:%v", err) } @@ -28,21 +35,27 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, productNo string) erro return nil } -func (v *VoucherBiz) WechatQuery(ctx context.Context, batchNo string) error { +func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error { + + var req *do.WechatQuery + + if err := json.Unmarshal([]byte(msg), &req); err != nil { + return err + } start := time.Now() - log.Warnf("微信券查询处理开始:%s,batchNo:%s", start.String(), batchNo) - fmt.Printf("微信券查询处理开始:%s,batchNo:%s", start.String(), batchNo) + log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg) + fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg) num := 0 - err := v.OrderRepo.FinSucByStockIdInBatches(ctx, batchNo, func(ctx context.Context, rows []*bo.OrderBo) error { + err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error { for _, order := range rows { num += 1 if err := v.wechatQuery(ctx, order); err != nil { - log.Errorf("微信查询券订单状态发生错误,batchNo:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v", - batchNo, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err) + log.Errorf("微信查询券订单状态发生错误,msg:%s,orderNo:%s,couponId:%s,appId:%s,openId:%s,err:%v", + msg, order.OrderNo, order.VoucherNo, order.AppID, order.Account, err) } } @@ -52,8 +65,8 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, batchNo string) error { return nil }) - log.Warnf("微信券查询处理耗时:%s,batchNo:%s,处理%d单", time.Now().Sub(start).String(), batchNo, num) - fmt.Printf("微信券查询处理耗时:%s,batchNo:%s,处理%d单", time.Now().Sub(start).String(), batchNo, num) + log.Warnf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num) + fmt.Printf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num) return err } diff --git a/internal/data/repoimpl/order.go b/internal/data/repoimpl/order.go index d00826f..5348a85 100644 --- a/internal/data/repoimpl/order.go +++ b/internal/data/repoimpl/order.go @@ -9,6 +9,7 @@ import ( "unicode/utf8" err2 "voucher/api/err" "voucher/internal/biz/bo" + "voucher/internal/biz/do" "voucher/internal/biz/repo" "voucher/internal/biz/vo" "voucher/internal/data" @@ -30,16 +31,25 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB { return p.db.DB(ctx).Model(model.Order{}) } -func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, batchNo string, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { +func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error { + + tx := p.DB(ctx). + Where("product_no = ?", req.ProductNo). + Where("status = ?", vo.OrderStatusSuccess.GetValue()) + + if req.StartTime != "" { + tx = tx.Where("receive_success_time >= ?", req.StartTime) + } + if req.EndTime != "" { + tx = tx.Where("receive_success_time <= ?", req.EndTime) + } var results = make([]*model.Order, 0) - result := p.DB(ctx). - Where("batch_no = ?", batchNo). - Where("status = ?", vo.OrderStatusSuccess.GetValue()). - FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { - return fun(ctx, p.ToBos(results)) - }) + result := tx.FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { + + return fun(ctx, p.ToBos(results)) + }) if result.Error != nil { return result.Error @@ -55,7 +65,7 @@ func (p *OrderRepoImpl) FinFailByStockIdInBatches(ctx context.Context, batchNo s result := p.DB(ctx). Where("batch_no = ?", batchNo). Where("status = ?", vo.OrderStatusFail.GetValue()). - Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat"). + //Where("remark <> ?", "error: code = 500 reason = WechatFAIL message = 微信返回错误:该用户账号异常,无法领券。商家可联系微信支付或让用户联系微信支付客服处理。 metadat"). FindInBatches(&results, 50, func(tx *gorm.DB, batch int) error { return fun(ctx, p.ToBos(results)) }) diff --git a/internal/server/http.go b/internal/server/http.go index c89501c..4395103 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -36,11 +36,11 @@ func NewHTTPServer( return ctx.String(http2.StatusOK, "pong") }) - srv.Route("/voucher/").GET("notifyRetry/{id}", cmb.NotifyRetry) - srv.Route("/voucher/").GET("queryOrder/{order_no}", cmb.QueryOrder) - srv.Route("/voucher/").GET("registerTag/{product_no}", cmb.RegisterTag) - srv.Route("/voucher/").GET("pushWechatQuery/{product_no}", cmb.PushWechatQuery) - srv.Route("/voucher/").GET("pushWechatRetry/{product_no}", cmb.PushWechatRetry) + srv.Route("/voucher/").POST("notifyRetry/{id}", cmb.NotifyRetry) + srv.Route("/voucher/").POST("queryOrder/{order_no}", cmb.QueryOrder) + srv.Route("/voucher/").POST("registerTag/{product_no}", cmb.RegisterTag) + srv.Route("/voucher/").POST("pushWechatQuery/{product_no}", cmb.PushWechatQuery) + srv.Route("/voucher/").POST("pushWechatRetry/{product_no}", cmb.PushWechatRetry) v1.RegisterCmbHTTPServer(srv, cmb) diff --git a/internal/server/rds_consume.go b/internal/server/rds_consume.go index 9eed6ef..895f7d3 100644 --- a/internal/server/rds_consume.go +++ b/internal/server/rds_consume.go @@ -26,11 +26,11 @@ func NewRdbConsumer( ) *RdbConsumer { manager := rdsmq.NewConsumerManager() - if cf := voucherService.GetConfig(); cf != nil { + if cf := voucherService.GetWechatQueryConfig(); cf != nil { manager.Add(cf) } - if cf2 := voucherService.GetWechatConfig(); cf2 != nil { + if cf2 := voucherService.GetWechatRetryConfig(); cf2 != nil { manager.Add(cf2) } diff --git a/internal/service/cmb.go b/internal/service/cmb.go index bcbdc12..b064056 100644 --- a/internal/service/cmb.go +++ b/internal/service/cmb.go @@ -2,15 +2,18 @@ package service import ( "context" + "encoding/json" "fmt" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport/http" "github.com/robfig/cron" + "io" http2 "net/http" "strconv" v1 "voucher/api/v1" "voucher/internal/biz" "voucher/internal/biz/bo" + "voucher/internal/biz/do" "voucher/internal/biz/mixrepos" "voucher/internal/biz/vo" "voucher/internal/biz/wechatrepo" @@ -120,11 +123,29 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error { return fmt.Errorf("product_no is empty") } - err := this.VoucherBiz.PushWechatQuery(ctx, productNo) + bodyBytes, err := io.ReadAll(ctx.Request().Body) if err != nil { return err } + var req *do.WechatQuery + if err = json.Unmarshal(bodyBytes, &req); err != nil { + return err + } + + if req == nil { + return fmt.Errorf("req is empty") + } + + req.ProductNo = productNo + if req.StartTime == "" || req.EndTime == "" { + return fmt.Errorf("start_time or end_time is empty") + } + + if err = this.VoucherBiz.PushWechatQuery(ctx, req); err != nil { + return err + } + return ctx.JSON(http2.StatusOK, map[string]interface{}{ "data": productNo, }) diff --git a/internal/service/wechat_query.go b/internal/service/wechat_query.go index 35a2a9f..f210d53 100644 --- a/internal/service/wechat_query.go +++ b/internal/service/wechat_query.go @@ -7,7 +7,7 @@ import ( "voucher/internal/pkg/rdsmq" ) -func (s *VoucherService) GetConfig() *rdsmq.ConsumeConfig { +func (s *VoucherService) GetWechatQueryConfig() *rdsmq.ConsumeConfig { queue := s.bc.RdsMQ.GetWechatQuery() if queue == nil { @@ -25,19 +25,19 @@ func (s *VoucherService) GetConfig() *rdsmq.ConsumeConfig { NumWorkers: queue.NumWorkers, WaitTime: queue.GetWaitTime().AsDuration(), RetryNum: queue.RetryNum, - Fn: s.Handle, + Fn: s.WechatQueryHandle, Logger: s.logHelper, } } -func (s *VoucherService) Handle(ctx context.Context, batchNo string) error { +func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) error { - if batchNo == "" { + if msg == "" { s.logHelper.Errorf("wechat query error: batchNo is empty") return nil } - if err := s.VoucherBiz.WechatQuery(ctx, batchNo); err != nil { + if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil { s.logHelper.Error(err) } diff --git a/internal/service/wechat_retry.go b/internal/service/wechat_retry.go index 34cc754..1ba4d76 100644 --- a/internal/service/wechat_retry.go +++ b/internal/service/wechat_retry.go @@ -7,7 +7,7 @@ import ( "voucher/internal/pkg/rdsmq" ) -func (s *VoucherService) GetWechatConfig() *rdsmq.ConsumeConfig { +func (s *VoucherService) GetWechatRetryConfig() *rdsmq.ConsumeConfig { queue := s.bc.RdsMQ.GetWechatRetry() if queue == nil { @@ -25,12 +25,12 @@ func (s *VoucherService) GetWechatConfig() *rdsmq.ConsumeConfig { NumWorkers: queue.NumWorkers, WaitTime: queue.GetWaitTime().AsDuration(), RetryNum: queue.RetryNum, - Fn: s.HandleWechat, + Fn: s.HandleWechatRetry, Logger: s.logHelper, } } -func (s *VoucherService) HandleWechat(ctx context.Context, batchNo string) error { +func (s *VoucherService) HandleWechatRetry(ctx context.Context, batchNo string) error { if batchNo == "" { s.logHelper.Errorf("RdsMQ keySend error: batchNo is empty")