voucher/internal/biz/timeslicequery/mq.go

131 lines
3.1 KiB
Go

package timeslicequery
import (
"context"
"encoding/json"
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"time"
"voucher/internal/biz/do"
"voucher/internal/pkg/timeslice"
)
func (v *Query) Push(ctx http.Context, req *do.RdsWechatQuery) (string, error) {
if req.StartTime == "" || req.EndTime == "" {
return "", fmt.Errorf("时间参数不能为空")
}
queue := v.bc.RdsMQ.GetWechatTimeSliceQuery()
if queue == nil {
return "", fmt.Errorf("队列不存在")
}
if queue.Name == "" {
return "", fmt.Errorf("队列不存在")
}
if queue.IsOpen == false {
return "", fmt.Errorf("队列未开启")
}
if req.ProductNo != "" {
_, err := v.productRepo.GetByProductNo(ctx, req.ProductNo)
if err != nil {
return "", err
}
}
b, err := json.Marshal(req)
if err != nil {
return "", err
}
strMsg := string(b)
uid := v.uid(strMsg)
if v.Get(uid) {
return "", fmt.Errorf("此台服务队列正在处理中,%s-%s,ip:%s", uid, strMsg, ctx.Header().Get("X-Forwarded-For"))
}
v.Add(uid)
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
v.Remove(uid)
return "", fmt.Errorf("添加到队列失败:%v", err)
}
return strMsg, nil
}
func (v *Query) getManager(msg string) (*timeslice.Manager, error) {
var req *do.RdsWechatQuery
if err := json.Unmarshal([]byte(msg), &req); err != nil {
return nil, err
}
if req.StartTime == "" || req.EndTime == "" {
return nil, fmt.Errorf("时间参数不能为空")
}
start, err := time.Parse(time.DateTime, req.StartTime)
if err != nil {
return nil, err
}
end, err := time.Parse(time.DateTime, req.EndTime)
if err != nil {
return nil, err
}
manager := &timeslice.Manager{
StartTime: start,
EndTime: end,
ProductNo: req.ProductNo,
GoNum: timeslice.DefaultGoNum, // 协程数量
TimeSliceHours: timeslice.DefaultTimeSliceHours, // 时间间隔
}
if req.GoNum > 0 {
manager.GoNum = req.GoNum
}
if req.TimeSliceHours > 0 {
manager.TimeSliceHours = req.TimeSliceHours
}
return manager, nil
}
func (v *Query) Consumer(ctx context.Context, msg string) error {
defer v.Remove(v.uid(msg))
req, err := v.getManager(msg)
if err != nil {
return err
}
reqStr := req.String()
executeStart := time.Now()
executeStartStr := executeStart.Format(time.DateTime)
log.Warnf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
fmt.Printf("微信券查询处理,开始:%s,msg:%s,manager%s", executeStartStr, msg, reqStr)
if err = v.execute(ctx, req); err != nil {
log.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
return fmt.Errorf("微信券查询处理,失败:%s,msg:%s,err:%v", executeStartStr, msg, err)
}
executeEnd := time.Now()
log.Warnf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
fmt.Printf("微信券查询处理,耗时:%s,结束时间%s,manager%s", executeEnd.Sub(executeStart).String(), executeEnd.String(), reqStr)
return nil
}