2024-07-12 18:11:21 +08:00
|
|
|
|
package transfersys
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
models "com.snow.auto_monitor/app/models/orders"
|
|
|
|
|
rdbmq "com.snow.auto_monitor/app/utils/rdbmq"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// func TestRDB() (err error) {
|
|
|
|
|
// rdbmq.CreateGroup("orders", "orders_one")
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
// func WriteGroup() (err error) {
|
|
|
|
|
// err = rdbmq.WriteOne("orders", time.Now().Format("2006-01-02 15:04:05"), "test")
|
|
|
|
|
// return
|
|
|
|
|
// }
|
|
|
|
|
|
2024-09-10 14:26:07 +08:00
|
|
|
|
func ReadGroup(queue_no int64) (order *models.Orders, err error) {
|
|
|
|
|
streamName := "orders"
|
|
|
|
|
groupName := "orders_one"
|
|
|
|
|
consumerName := "cosumer_one"
|
2024-09-11 12:01:27 +08:00
|
|
|
|
if queue_no != 0 {
|
2024-09-10 14:26:07 +08:00
|
|
|
|
streamName = fmt.Sprintf("orders_%d", queue_no)
|
|
|
|
|
groupName = fmt.Sprintf("orders_%d", queue_no)
|
|
|
|
|
consumerName = fmt.Sprintf("cosumer_%d", queue_no)
|
|
|
|
|
}
|
|
|
|
|
id, res, err := rdbmq.ReadOne(streamName, groupName, consumerName)
|
2024-07-12 18:11:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
if err.Error() == "context deadline exceeded" {
|
|
|
|
|
return nil, errors.New("no data")
|
|
|
|
|
}
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
key := ""
|
|
|
|
|
for k := range res {
|
|
|
|
|
key = k
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
jsonstr := res[key].(string)
|
|
|
|
|
json.Unmarshal([]byte(jsonstr), &order)
|
2024-07-26 15:09:57 +08:00
|
|
|
|
|
2024-09-10 14:26:07 +08:00
|
|
|
|
//确认信息
|
|
|
|
|
err = rdbmq.AckOne(streamName, groupName, id)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
2024-07-12 18:11:21 +08:00
|
|
|
|
return order, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func WriteOrder() {
|
2024-07-26 17:56:54 +08:00
|
|
|
|
fmt.Println("开始执行周期任务:WriteOrder")
|
2024-09-10 14:26:07 +08:00
|
|
|
|
// 创建默认分组
|
2024-07-12 18:11:21 +08:00
|
|
|
|
err := rdbmq.CreateStreamAndGroup("orders", "orders_one")
|
|
|
|
|
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
|
|
|
|
fmt.Println("分组失败")
|
2024-08-21 10:18:34 +08:00
|
|
|
|
fmt.Println(err)
|
2024-07-12 18:11:21 +08:00
|
|
|
|
return
|
|
|
|
|
}
|
2024-09-10 14:26:07 +08:00
|
|
|
|
// 创建价格分组
|
2024-09-13 18:06:37 +08:00
|
|
|
|
var queue_nos = [...]int{-1, 10, 20, 50, 100, 1000, 10210, 10220, 10250, 102100, 1021000}
|
2024-09-10 14:26:07 +08:00
|
|
|
|
for _, value := range queue_nos {
|
|
|
|
|
err = rdbmq.CreateStreamAndGroup(fmt.Sprintf("orders_%d", value), fmt.Sprintf("orders_%d", value))
|
|
|
|
|
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
|
|
|
|
fmt.Println("分组失败")
|
|
|
|
|
fmt.Println(err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2024-07-12 18:11:21 +08:00
|
|
|
|
// 创建一个新的Ticker,每3秒钟触发一次
|
|
|
|
|
ticker := time.NewTicker(3 * time.Second)
|
|
|
|
|
defer ticker.Stop() // 在函数结束时停止Ticker
|
|
|
|
|
for range ticker.C {
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
|
|
res, _, err := models.GetInstance().GetIdleOrder()
|
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println(err.Error())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if res != nil {
|
2024-09-10 14:26:07 +08:00
|
|
|
|
//获取产品信息
|
|
|
|
|
streamName := "orders"
|
|
|
|
|
if res.QueueNo != 0 {
|
|
|
|
|
streamName = fmt.Sprintf("orders_%d", res.QueueNo)
|
|
|
|
|
}
|
|
|
|
|
err = rdbmq.WriteOne(streamName, res.OrderNo, res)
|
2024-07-12 18:11:21 +08:00
|
|
|
|
if err != nil {
|
|
|
|
|
fmt.Println(err.Error())
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
go WriteOrder()
|
|
|
|
|
}
|