Merge branch 'pro' into kx
# Conflicts: # internal/biz/voucher.go
This commit is contained in:
commit
9012f17bc0
|
|
@ -8,13 +8,13 @@ server:
|
||||||
data:
|
data:
|
||||||
db:
|
db:
|
||||||
driver: mysql
|
driver: mysql
|
||||||
source: root:lansexiongdi6,@tcp(47.97.27.195:3306)/voucher?parseTime=True&loc=Local
|
source: root:lansexiongdi6,@tcp(47.108.53.72:3306)/voucher?parseTime=True&loc=Local
|
||||||
maxIdle: 200 #最大的空闲连接数
|
maxIdle: 200 #最大的空闲连接数
|
||||||
maxOpen: 1000 #最大连接数,0表示不受限制
|
maxOpen: 1000 #最大连接数,0表示不受限制
|
||||||
maxLifetime: 300s #连接复用的最大生命周期
|
maxLifetime: 300s #连接复用的最大生命周期
|
||||||
isDebug: false
|
isDebug: false
|
||||||
redis: #没有则注释此属性
|
redis: #没有则注释此属性
|
||||||
addr: 47.97.27.195:6379
|
addr: 47.108.53.72:6379
|
||||||
password: lansexiongdi@666
|
password: lansexiongdi@666
|
||||||
readTimeout: 5s
|
readTimeout: 5s
|
||||||
writeTimeout: 5s
|
writeTimeout: 5s
|
||||||
|
|
|
||||||
|
|
@ -51,8 +51,6 @@ func (v *VoucherBiz) ExecuteNotice(ctx context.Context, req *bo.FindInBatchesUse
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@ package biz
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/go-kratos/kratos/v2/log"
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
|
"sync"
|
||||||
v1 "voucher/api/v1"
|
v1 "voucher/api/v1"
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
"voucher/internal/biz/cmb"
|
"voucher/internal/biz/cmb"
|
||||||
|
|
@ -29,6 +30,9 @@ type VoucherBiz struct {
|
||||||
CmbMixRepo mixrepos.CmbMixRepo
|
CmbMixRepo mixrepos.CmbMixRepo
|
||||||
KxMixRepo mixrepos.KxMixRepo
|
KxMixRepo mixrepos.KxMixRepo
|
||||||
UseLogRepo repo.UseLogRepo
|
UseLogRepo repo.UseLogRepo
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
queryMap map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewVoucherBiz(
|
func NewVoucherBiz(
|
||||||
|
|
@ -62,6 +66,35 @@ func NewVoucherBiz(
|
||||||
CmbMixRepo: CmbMixRepo,
|
CmbMixRepo: CmbMixRepo,
|
||||||
KxMixRepo: KxMixRepo,
|
KxMixRepo: KxMixRepo,
|
||||||
UseLogRepo: UseLogRepo,
|
UseLogRepo: UseLogRepo,
|
||||||
|
queryMap: make(map[string]bool),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *VoucherBiz) Get(stockNo string) bool {
|
||||||
|
this.mu.Lock()
|
||||||
|
defer this.mu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := this.queryMap[stockNo]; ok {
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *VoucherBiz) Add(uid string) {
|
||||||
|
this.mu.Lock()
|
||||||
|
defer this.mu.Unlock()
|
||||||
|
|
||||||
|
this.queryMap[uid] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *VoucherBiz) Remove(uid string) {
|
||||||
|
|
||||||
|
this.mu.Lock()
|
||||||
|
defer this.mu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := this.queryMap[uid]; ok {
|
||||||
|
delete(this.queryMap, uid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,17 +5,24 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/go-kratos/kratos/v2/log"
|
"github.com/go-kratos/kratos/v2/log"
|
||||||
|
"github.com/go-kratos/kratos/v2/transport/http"
|
||||||
"time"
|
"time"
|
||||||
"voucher/internal/biz/bo"
|
"voucher/internal/biz/bo"
|
||||||
"voucher/internal/biz/do"
|
"voucher/internal/biz/do"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) error {
|
func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
|
||||||
|
|
||||||
|
if v.Get("CMB_WECHAT_QUERY") {
|
||||||
|
return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.ProductNo != "" {
|
||||||
_, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
|
_, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
queue := v.bc.RdsMQ.GetWechatQuery()
|
queue := v.bc.RdsMQ.GetWechatQuery()
|
||||||
if queue == nil {
|
if queue == nil {
|
||||||
|
|
@ -27,8 +34,11 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
v.Add("CMB_WECHAT_QUERY")
|
||||||
|
|
||||||
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
|
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
v.Remove("CMB_WECHAT_QUERY")
|
||||||
return fmt.Errorf("添加到队列失败:%v", err)
|
return fmt.Errorf("添加到队列失败:%v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -37,6 +47,8 @@ func (v *VoucherBiz) PushWechatQuery(ctx context.Context, req *do.WechatQuery) e
|
||||||
|
|
||||||
func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
|
func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
|
||||||
|
|
||||||
|
defer v.Remove("CMB_WECHAT_QUERY")
|
||||||
|
|
||||||
var req *do.WechatQuery
|
var req *do.WechatQuery
|
||||||
|
|
||||||
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
||||||
|
|
@ -44,12 +56,16 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
log.Warnf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
|
startStr := time.Now().String()
|
||||||
fmt.Printf("微信券查询处理开始:%s,msg:%s", start.String(), msg)
|
|
||||||
|
|
||||||
|
log.Warnf("微信券查询处理开始:%s,msg:%s", startStr, msg)
|
||||||
|
fmt.Printf("微信券查询处理开始:%s,msg:%s", startStr, msg)
|
||||||
|
|
||||||
|
n := 0
|
||||||
num := 0
|
num := 0
|
||||||
err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
err := v.OrderRepo.FinSucByStockIdInBatches(ctx, req, func(ctx context.Context, rows []*bo.OrderBo) error {
|
||||||
|
|
||||||
|
n += 1
|
||||||
for _, order := range rows {
|
for _, order := range rows {
|
||||||
|
|
||||||
num += 1
|
num += 1
|
||||||
|
|
@ -60,13 +76,13 @@ func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(1 * time.Second)
|
log.Warnf("微信券查询处理第:%d组,已执行条数:%d,执行开始时间:%s,已耗时:%s", n, num, startStr, time.Now().Sub(start).String())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
log.Warnf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num)
|
log.Warnf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg)
|
||||||
fmt.Printf("微信券查询处理耗时:%s,msg:%s,处理%d单", time.Now().Sub(start).String(), msg, num)
|
fmt.Printf("微信券查询处理耗时:%s,处理%d组,处理%d单,msg:%s", time.Now().Sub(start).String(), n, num, msg)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -33,10 +33,11 @@ func (p *OrderRepoImpl) DB(ctx context.Context) *gorm.DB {
|
||||||
|
|
||||||
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
func (p *OrderRepoImpl) FinSucByStockIdInBatches(ctx context.Context, req *do.WechatQuery, fun func(ctx context.Context, rows []*bo.OrderBo) error) error {
|
||||||
|
|
||||||
tx := p.DB(ctx).
|
tx := p.DB(ctx).Where("status = ?", vo.OrderStatusSuccess.GetValue())
|
||||||
Where("product_no = ?", req.ProductNo).
|
|
||||||
Where("status = ?", vo.OrderStatusSuccess.GetValue())
|
|
||||||
|
|
||||||
|
if req.ProductNo != "" {
|
||||||
|
tx = tx.Where("product_no = ?", req.ProductNo)
|
||||||
|
}
|
||||||
if req.StartTime != "" {
|
if req.StartTime != "" {
|
||||||
tx = tx.Where("receive_success_time >= ?", req.StartTime)
|
tx = tx.Where("receive_success_time >= ?", req.StartTime)
|
||||||
}
|
}
|
||||||
|
|
@ -100,7 +101,6 @@ func (p *OrderRepoImpl) FindInBatches(ctx context.Context, w *bo.FindInBatchesUs
|
||||||
var results = make([]*model.Order, 0)
|
var results = make([]*model.Order, 0)
|
||||||
|
|
||||||
result := p.DB(ctx).
|
result := p.DB(ctx).
|
||||||
Where("type = ?", w.Type).
|
|
||||||
Where("status IN (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue()}).
|
Where("status IN (?)", []uint8{vo.OrderStatusSuccess.GetValue(), vo.OrderStatusUse.GetValue()}).
|
||||||
Where("receive_success_time BETWEEN ? AND ?", w.StartTime, w.EndTime).
|
Where("receive_success_time BETWEEN ? AND ?", w.StartTime, w.EndTime).
|
||||||
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
|
FindInBatches(&results, 100, func(tx *gorm.DB, batch int) error {
|
||||||
|
|
|
||||||
|
|
@ -15,11 +15,11 @@ func TestNoticeTime(t *testing.T) {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
// 获取七天前的日期
|
// 获取七天前的日期
|
||||||
noticeStartDay := now.AddDate(0, 0, -29)
|
noticeStartDay := now.AddDate(0, 0, -15)
|
||||||
// 获取七天前 00:00:00 的时间
|
// 获取七天前 00:00:00 的时间
|
||||||
startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())
|
startTime := time.Date(noticeStartDay.Year(), noticeStartDay.Month(), noticeStartDay.Day(), 0, 0, 0, 0, noticeStartDay.Location())
|
||||||
|
|
||||||
noticeEndDay := now.AddDate(0, 0, -28)
|
noticeEndDay := now.AddDate(0, 0, -1)
|
||||||
// 获取昨天 23:59:59 的时间
|
// 获取昨天 23:59:59 的时间
|
||||||
endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())
|
endTime := time.Date(noticeEndDay.Year(), noticeEndDay.Month(), noticeEndDay.Day(), 23, 59, 59, 0, noticeEndDay.Location())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,8 +49,8 @@ func Test_WechatNotifyProducer2(t *testing.T) {
|
||||||
"associated_data":"coupon",
|
"associated_data":"coupon",
|
||||||
"plain_text":{
|
"plain_text":{
|
||||||
"stock_creator_mchid":"1652465541",
|
"stock_creator_mchid":"1652465541",
|
||||||
"stock_id":"20393435",
|
"stock_id":"20393759",
|
||||||
"coupon_id":"101423873113",
|
"coupon_id":"104611498109",
|
||||||
"coupon_name":"test",
|
"coupon_name":"test",
|
||||||
"description":"","status":"USED",
|
"description":"","status":"USED",
|
||||||
"create_time":"2025-03-07T15:49:31+08:00",
|
"create_time":"2025-03-07T15:49:31+08:00",
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ func (r *ConsumeConfig) init(_ context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ConsumeConfig) Start(ctx context.Context) {
|
func (r *ConsumeConfig) Start(ctx context.Context) {
|
||||||
fmt.Printf("RdsMQ Starting to dequeue from [%s]", r.QueueName)
|
fmt.Printf("RdsMQ Starting to dequeue from [%s] \n", r.QueueName)
|
||||||
|
|
||||||
r.init(ctx)
|
r.init(ctx)
|
||||||
defer r.close(ctx)
|
defer r.close(ctx)
|
||||||
|
|
|
||||||
|
|
@ -118,11 +118,6 @@ func (this *CmbService) RegisterTag(ctx http.Context) error {
|
||||||
|
|
||||||
func (this *CmbService) PushWechatQuery(ctx http.Context) error {
|
func (this *CmbService) PushWechatQuery(ctx http.Context) error {
|
||||||
|
|
||||||
productNo := ctx.Vars().Get("product_no")
|
|
||||||
if productNo == "" {
|
|
||||||
return fmt.Errorf("product_no is empty")
|
|
||||||
}
|
|
||||||
|
|
||||||
bodyBytes, err := io.ReadAll(ctx.Request().Body)
|
bodyBytes, err := io.ReadAll(ctx.Request().Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -137,7 +132,11 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
|
||||||
return fmt.Errorf("req is empty")
|
return fmt.Errorf("req is empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
productNo := ctx.Vars().Get("product_no")
|
||||||
|
if productNo == "" {
|
||||||
req.ProductNo = productNo
|
req.ProductNo = productNo
|
||||||
|
}
|
||||||
|
|
||||||
if req.StartTime == "" || req.EndTime == "" {
|
if req.StartTime == "" || req.EndTime == "" {
|
||||||
return fmt.Errorf("start_time or end_time is empty")
|
return fmt.Errorf("start_time or end_time is empty")
|
||||||
}
|
}
|
||||||
|
|
@ -147,7 +146,7 @@ func (this *CmbService) PushWechatQuery(ctx http.Context) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
return ctx.JSON(http2.StatusOK, map[string]interface{}{
|
||||||
"data": productNo,
|
"data": req,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -38,7 +38,7 @@ func (s *VoucherService) WechatQueryHandle(ctx context.Context, msg string) erro
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil {
|
if err := s.VoucherBiz.WechatQuery(ctx, msg); err != nil {
|
||||||
s.logHelper.Error(err)
|
s.logHelper.Errorf("wechat query error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue