131 lines
3.1 KiB
Go
131 lines
3.1 KiB
Go
package timeslicequery
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/go-kratos/kratos/v2/transport/http"
|
|
"time"
|
|
"voucher/internal/biz/do"
|
|
"voucher/internal/pkg/timeslice"
|
|
)
|
|
|
|
func (v *Query) Push(ctx http.Context, req *do.RdsWechatQuery) (string, error) {
|
|
|
|
if req.StartTime == "" || req.EndTime == "" {
|
|
return "", fmt.Errorf("时间参数不能为空")
|
|
}
|
|
|
|
queue := v.bc.RdsMQ.GetWechatTimeSliceQuery()
|
|
if queue == nil {
|
|
return "", fmt.Errorf("队列不存在")
|
|
}
|
|
|
|
if queue.Name == "" {
|
|
return "", fmt.Errorf("队列不存在")
|
|
}
|
|
|
|
if queue.IsOpen == false {
|
|
return "", fmt.Errorf("队列未开启")
|
|
}
|
|
|
|
if req.ProductNo != "" {
|
|
_, err := v.productRepo.GetByProductNo(ctx, req.ProductNo)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
|
|
b, err := json.Marshal(req)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
strMsg := string(b)
|
|
|
|
uid := v.uid(strMsg)
|
|
if v.Get(uid) {
|
|
return "", fmt.Errorf("此台服务队列正在处理中,%s-%s,ip:%s", uid, strMsg, ctx.Header().Get("X-Forwarded-For"))
|
|
}
|
|
|
|
v.Add(uid)
|
|
|
|
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
|
|
if err != nil {
|
|
v.Remove(uid)
|
|
return "", fmt.Errorf("添加到队列失败:%v", err)
|
|
}
|
|
|
|
return strMsg, nil
|
|
}
|
|
|
|
func (v *Query) getManager(msg string) (*timeslice.Manager, error) {
|
|
|
|
var req *do.RdsWechatQuery
|
|
|
|
if err := json.Unmarshal([]byte(msg), &req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if req.StartTime == "" || req.EndTime == "" {
|
|
return nil, fmt.Errorf("时间参数不能为空")
|
|
}
|
|
|
|
start, err := time.Parse(time.DateTime, req.StartTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
end, err := time.Parse(time.DateTime, req.EndTime)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
manager := ×lice.Manager{
|
|
StartTime: start,
|
|
EndTime: end,
|
|
ProductNo: req.ProductNo,
|
|
GoNum: timeslice.DefaultGoNum, // 协程数量
|
|
TimeSliceHours: timeslice.DefaultTimeSliceHours, // 时间间隔
|
|
}
|
|
|
|
if req.GoNum > 0 {
|
|
manager.GoNum = req.GoNum
|
|
}
|
|
|
|
if req.TimeSliceHours > 0 {
|
|
manager.TimeSliceHours = req.TimeSliceHours
|
|
}
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
func (v *Query) Consumer(ctx context.Context, msg string) error {
|
|
|
|
defer v.Remove(v.uid(msg))
|
|
|
|
req, err := v.getManager(msg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reqStr := req.String()
|
|
|
|
executeStart := time.Now()
|
|
executeStartStr := executeStart.Format(time.DateTime)
|
|
|
|
log.Warnf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
|
|
fmt.Printf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
|
|
|
|
if err = v.execute(ctx, req); err != nil {
|
|
log.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
|
|
return fmt.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
|
|
}
|
|
|
|
executeEnd := time.Now()
|
|
log.Warnf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
|
|
fmt.Printf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
|
|
|
|
return nil
|
|
}
|