118 lines
2.7 KiB
Go
118 lines
2.7 KiB
Go
package transfersys
|
||
|
||
import (
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"time"
|
||
|
||
models "com.snow.auto_monitor/app/models/orders"
|
||
queueServ "com.snow.auto_monitor/app/services/queuelist"
|
||
rdbmq "com.snow.auto_monitor/app/utils/rdbmq"
|
||
)
|
||
|
||
// func TestRDB() (err error) {
|
||
// rdbmq.CreateGroup("orders", "orders_one")
|
||
// return
|
||
// }
|
||
|
||
// func WriteGroup() (err error) {
|
||
// err = rdbmq.WriteOne("orders", time.Now().Format("2006-01-02 15:04:05"), "test")
|
||
// return
|
||
// }
|
||
|
||
func ReadGroup(queue_no int64) (order *models.Orders, err error) {
|
||
streamName := "orders"
|
||
groupName := "orders_one"
|
||
consumerName := "cosumer_one"
|
||
if queue_no != 0 {
|
||
streamName = fmt.Sprintf("orders_%d", queue_no)
|
||
groupName = fmt.Sprintf("orders_%d", queue_no)
|
||
consumerName = fmt.Sprintf("cosumer_%d", queue_no)
|
||
}
|
||
id, res, err := rdbmq.ReadOne(streamName, groupName, consumerName)
|
||
if err != nil {
|
||
if err.Error() == "context deadline exceeded" {
|
||
return nil, errors.New("no data")
|
||
}
|
||
return nil, err
|
||
}
|
||
key := ""
|
||
for k := range res {
|
||
key = k
|
||
break
|
||
}
|
||
jsonstr := res[key].(string)
|
||
json.Unmarshal([]byte(jsonstr), &order)
|
||
|
||
//确认信息
|
||
err = rdbmq.AckOne(streamName, groupName, id)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return order, nil
|
||
}
|
||
|
||
func WriteOrder() {
|
||
fmt.Println("开始执行周期任务:WriteOrder")
|
||
// 创建默认分组
|
||
err := rdbmq.CreateStreamAndGroup("orders", "orders_one")
|
||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||
fmt.Println("分组失败")
|
||
fmt.Println(err)
|
||
return
|
||
}
|
||
|
||
queues, err := queueServ.Search(0, 0, "", 0, "", "", 0, 0)
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
return
|
||
}
|
||
queue_nos := make([]int64, 0)
|
||
for _, queue := range queues {
|
||
if queue.QueueNo == 0 {
|
||
continue
|
||
}
|
||
queue_nos = append(queue_nos, queue.QueueNo)
|
||
}
|
||
|
||
// 创建价格分组
|
||
for _, value := range queue_nos {
|
||
err = rdbmq.CreateStreamAndGroup(fmt.Sprintf("orders_%d", value), fmt.Sprintf("orders_%d", value))
|
||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||
fmt.Println("分组失败")
|
||
fmt.Println(err)
|
||
return
|
||
}
|
||
}
|
||
|
||
// 创建一个新的Ticker,每3秒钟触发一次
|
||
ticker := time.NewTicker(3 * time.Second)
|
||
defer ticker.Stop() // 在函数结束时停止Ticker
|
||
for range ticker.C {
|
||
for i := 0; i < 10; i++ {
|
||
res, _, err := models.GetInstance().GetIdleOrder()
|
||
if err != nil {
|
||
fmt.Println(err.Error())
|
||
continue
|
||
}
|
||
if res != nil {
|
||
//获取产品信息
|
||
streamName := "orders"
|
||
if res.QueueNo != 0 {
|
||
streamName = fmt.Sprintf("orders_%d", res.QueueNo)
|
||
}
|
||
err = rdbmq.WriteOne(streamName, res.OrderNo, res)
|
||
if err != nil {
|
||
fmt.Println(err.Error())
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func init() {
|
||
go WriteOrder()
|
||
}
|