This commit is contained in:
ziming 2025-06-10 19:01:15 +08:00
parent f418a277c3
commit 87222148ec
13 changed files with 387 additions and 10 deletions

2
go.mod
View File

@ -63,6 +63,8 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect

4
go.sum
View File

@ -150,6 +150,10 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grokify/html-strip-tags-go v0.0.1 h1:0fThFwLbW7P/kOiTBs03FsJSV9RM2M/Q/MOnCQxKMo0=
github.com/grokify/html-strip-tags-go v0.0.1/go.mod h1:2Su6romC5/1VXOQMaWL2yb618ARB8iVo6/DR99A6d78=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=

View File

@ -0,0 +1,104 @@
package timeslice
import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
"time"
)
type ManagerSrv struct {
callback func(ctx context.Context, req *Task) error
}
func NewManager(callback func(ctx context.Context, req *Task) error) *ManagerSrv {
return &ManagerSrv{callback: callback}
}
func (m *ManagerSrv) Run(ctx context.Context, req *Manager) (int, error) {
if req.StartTime.After(req.EndTime) {
return 0, fmt.Errorf("start_time不能大于end_time")
}
totalHours := req.EndTime.Sub(req.StartTime).Hours()
taskCount := int(totalHours / 2)
// 如果剩余时间不足2小时增加任务数
if totalHours-float64(taskCount)*float64(2) > 0 {
taskCount++
}
processReq := &Process{
manager: req,
taskCount: taskCount,
}
return taskCount, m.process(ctx, processReq)
}
func (m *ManagerSrv) process(ctx context.Context, req *Process) error {
if req.taskCount == 0 {
return fmt.Errorf("该时间范围无可执行任务次数,请检查时间范围")
}
if req.manager.GoNum == 0 {
return fmt.Errorf("协程数量不能为0")
}
if req.manager.GoNum > 100 {
return fmt.Errorf("协程数量不能大于100")
}
// 设置最大并发任务数为 5
eg := new(errgroup.Group)
eg.SetLimit(req.manager.GoNum)
errs := make([]error, 0) // 用于存储所有错误
// 为每个任务分配开始和结束时间
for i := 0; i < req.taskCount; i++ {
currentStart := req.manager.StartTime.Add(time.Duration(i) * 2 * time.Hour)
currentEnd := currentStart.Add(2 * time.Hour)
if currentEnd.After(req.manager.EndTime) {
currentEnd = req.manager.EndTime
}
eg.Go(func() error {
defer func() {
if err := recover(); err != nil {
errs = append(errs, fmt.Errorf("panic: %v", err))
}
}()
taskID := i + 1
taskReq := &Task{
CurrentStartTime: currentStart,
CurrentEndTime: currentEnd,
TaskID: taskID,
ProductNo: req.manager.ProductNo,
}
if err := m.callback(ctx, taskReq); err != nil {
errs = append(errs, err)
return err
}
return nil
})
}
// 等待所有任务完成
_ = eg.Wait()
var result error
// 收集错误
for _, err2 := range errs {
result = multierror.Append(result, err2)
}
return result
}

View File

@ -0,0 +1,78 @@
package timeslice
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"math/rand"
"testing"
"time"
)
func ProcessTasks() error {
eg := new(errgroup.Group)
eg.SetLimit(5)
for i := 0; i < 5; i++ {
eg.Go(func() error {
// 任务逻辑...
time.Sleep(100 * time.Millisecond)
return fmt.Errorf("任务失败")
})
}
return eg.Wait() // 仅返回第一个错误
}
func TestNewManager(t *testing.T) {
// 解析起始时间和结束时间
start, err := time.Parse(time.DateTime, "2023-01-01 00:00:00")
if err != nil {
t.Fatalf("查询失败: %v", err)
return
}
//end, err := time.Parse(time.DateTime, "2023-01-31 02:00:01")
end, err := time.Parse(time.DateTime, "2023-01-02 02:00:01")
if err != nil {
t.Fatalf("查询失败: %v", err)
return
}
var results []string
callback := func(ctx context.Context, req *Task) error {
// 模拟任务执行,休眠随机时间
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 生成任务执行结果
result := fmt.Sprintf("任务批次 %d-%s: %s 至 %s 处理完成", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
results = append(results, result)
//return nil
return fmt.Errorf("任务执行失败:%d-%s,时间%s-%s", req.TaskID, req.ProductNo, req.CurrentStartTime.Format(time.DateTime), req.CurrentEndTime.Format(time.DateTime))
}
startTime := time.Now()
startStr := time.Now().String()
srv := NewManager(callback)
taskCount, err := srv.Run(context.Background(), &Manager{
StartTime: start,
EndTime: end,
ProductNo: "123456",
GoNum: 2,
})
// 输出结果
fmt.Printf("总任务数:%d\n", taskCount)
for _, result := range results {
fmt.Printf("%v\n", result)
}
if err != nil {
t.Error(err)
}
endTime := time.Now()
fmt.Printf("处理耗时:%s,开始处理时间:%s,结束时间%s", startStr, endTime.Sub(startTime).String(), endTime.String())
}

View File

@ -0,0 +1,24 @@
package timeslice
import (
"time"
)
type Manager struct {
StartTime time.Time
EndTime time.Time
ProductNo string
GoNum int
}
type Process struct {
manager *Manager
taskCount int
}
type Task struct {
CurrentStartTime time.Time
CurrentEndTime time.Time
TaskID int
ProductNo string
}

View File

@ -60,11 +60,11 @@ func NewVoucherBiz(
}
}
func (this *VoucherBiz) Get(stockNo string) bool {
func (this *VoucherBiz) Get(uid string) bool {
this.mu.Lock()
defer this.mu.Unlock()
if _, ok := this.queryMap[stockNo]; ok {
if _, ok := this.queryMap[uid]; ok {
return ok
}

View File

@ -6,16 +6,17 @@ import (
"fmt"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/nacos-group/nacos-sdk-go/util"
"time"
"voucher/internal/biz/bo"
"voucher/internal/biz/do"
)
func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
func (v *VoucherBiz) uid(_ context.Context, msg string) string {
return util.Md5(msg)
}
if v.Get("CMB_WECHAT_QUERY") {
return fmt.Errorf("此台服务队列正在处理中,ip:%s", ctx.Header().Get("X-Forwarded-For"))
}
func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) error {
if req.ProductNo != "" {
_, err := v.ProductRepo.GetByProductNo(ctx, req.ProductNo)
@ -34,11 +35,18 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro
return err
}
v.Add("CMB_WECHAT_QUERY")
strMsg := string(msg)
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, msg).Result()
uid := v.uid(ctx, strMsg)
if v.Get(uid) {
return fmt.Errorf("此台服务队列正在处理中,key:%s,ip:%s", uid, ctx.Header().Get("X-Forwarded-For"))
}
v.Add(uid)
_, err = v.rdb.Rdb.RPush(ctx, queue.Name, strMsg).Result()
if err != nil {
v.Remove("CMB_WECHAT_QUERY")
v.Remove(uid)
return fmt.Errorf("添加到队列失败:%v", err)
}
@ -47,7 +55,7 @@ func (v *VoucherBiz) PushWechatQuery(ctx http.Context, req *do.WechatQuery) erro
func (v *VoucherBiz) WechatQuery(ctx context.Context, msg string) error {
defer v.Remove("CMB_WECHAT_QUERY")
defer v.Remove(v.uid(ctx, msg))
var req *do.WechatQuery

View File

@ -10,6 +10,7 @@ import (
type WechatCpnRepo interface {
Order(ctx context.Context, order *bo.OrderBo) (couponId string, err error)
Query(ctx context.Context, order *bo.OrderBo) (vo.OrderStatus, error)
QueryCoupon(ctx context.Context, orderWechat *bo.OrderBo) (*cashcoupons.Coupon, error)
QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error)
QueryCallback(ctx context.Context) (*cashcoupons.Callback, error)
SetCallback(ctx context.Context, url string) (*cashcoupons.SetCallbackResponse, error)

View File

@ -126,6 +126,34 @@ func (c *CpnRepoImpl) Query(ctx context.Context, orderWechat *bo.OrderBo) (vo.Or
return CpnStatus(*resp.Status).GetStatus()
}
func (c *CpnRepoImpl) QueryCoupon(ctx context.Context, orderWechat *bo.OrderBo) (*cashcoupons.Coupon, error) {
req := cashcoupons.QueryCouponRequest{
CouponId: core.String(orderWechat.VoucherNo),
Appid: core.String(orderWechat.AppID),
Openid: core.String(orderWechat.Account),
}
client, err := c.GetClient(ctx)
if err != nil {
return nil, err
}
svc := cashcoupons.CouponApiService{Client: client}
resp, result, err := svc.QueryCoupon(ctx, req)
if err != nil {
if result.Response != nil && result.Response.Body != nil {
return nil, c.bodyErr(ctx, result)
}
return nil, err
}
return resp, nil
}
func (c *CpnRepoImpl) QueryProduct(ctx context.Context, stockCreatorMchId, stockId string) (*cashcoupons.Stock, error) {
if stockCreatorMchId == "" || stockId == "" {

View File

@ -1,6 +1,8 @@
package helper
import (
"crypto/md5"
"encoding/hex"
"hash/fnv"
"math"
"os"
@ -18,3 +20,14 @@ func HashMod(hashStr string) int {
hashValue := hash.Sum32()
return int(math.Mod(float64(hashValue), 32))
}
func Md5(str string) string {
// 创建一个 MD5 哈希对象
hash := md5.New()
// 写入待加密的数据
hash.Write([]byte(str))
// 获取 MD5 哈希值
hashBytes := hash.Sum(nil)
// 将 MD5 哈希值转换为16进制字符串
return hex.EncodeToString(hashBytes)
}

View File

@ -41,3 +41,8 @@ func used(useNum *int) {
func queryUsed(useNum *int) {
*useNum += 1
}
func TestMd5(t *testing.T) {
s := Md5(`{"product_no":"","start_time":"2025-04-20 09:00:00","end_time":"2025-05-01 00:00:00"}`)
t.Log(s)
}

View File

@ -0,0 +1,66 @@
package script
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
func script(startTime, endTime time.Time, duration time.Duration) error {
// 每指定间隔时间发送一次请求
for t := startTime; t.Before(endTime); t = t.Add(duration) {
end := t.Add(duration) // 计算每次请求的结束时间
// 发送请求
if err := sendRequest(t, end); err != nil {
fmt.Printf("Error sending request: %v\n", err)
}
// 等待一段时间后再发送下一个请求
time.Sleep(1 * time.Second) // 可以根据需要调整间隔时间
}
return nil
}
func sendRequest(startTime, endTime time.Time) error {
url := "https://gateway.dev.cdlsxd.cn/voucher/cmb/v1/orderQuery"
// 创建请求体
requestBody := map[string]interface{}{
"product_no": "",
"start_time": startTime.Format(time.DateTime),
"end_time": endTime.Format(time.DateTime),
}
// 将请求体转换为 JSON 格式
jsonData, err := json.Marshal(requestBody)
if err != nil {
return fmt.Errorf("failed to marshal JSON: %v", err)
}
// 发送 POST 请求
resp, err := http.Post(url, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to send POST request: %v", err)
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("读取响应体失败: %w", err)
}
if resp.StatusCode == http.StatusOK {
fmt.Printf("Request sent successfully,body:%s", string(bodyBytes))
} else {
return fmt.Errorf("failed with status code: %d", resp.StatusCode)
}
return nil
}

View File

@ -0,0 +1,44 @@
package script
import (
"testing"
"time"
)
func Test_script(t *testing.T) {
startTime, err := time.Parse(time.DateTime, "2025-05-01 00:00:00")
if err != nil {
t.Error(err)
return
}
endTime, err := time.Parse(time.DateTime, "2025-05-31 00:00:00")
if err != nil {
t.Error(err)
return
}
duration := 24 * time.Hour
if err = script(startTime, endTime, duration); err != nil {
t.Error(err)
}
}
func Test_script2(t *testing.T) {
startTime, err := time.Parse(time.DateTime, "2025-05-31 00:00:00")
if err != nil {
t.Error(err)
return
}
endTime := time.Now()
duration := 1 * time.Hour
if err = script(startTime, endTime, duration); err != nil {
t.Error(err)
}
}