package transfersys import ( "encoding/json" "errors" "fmt" "time" models "com.snow.auto_monitor/app/models/orders" rdbmq "com.snow.auto_monitor/app/utils/rdbmq" ) // func TestRDB() (err error) { // rdbmq.CreateGroup("orders", "orders_one") // return // } // func WriteGroup() (err error) { // err = rdbmq.WriteOne("orders", time.Now().Format("2006-01-02 15:04:05"), "test") // return // } func ReadGroup(queue_no int64) (order *models.Orders, err error) { streamName := "orders" groupName := "orders_one" consumerName := "cosumer_one" if queue_no > 0 { streamName = fmt.Sprintf("orders_%d", queue_no) groupName = fmt.Sprintf("orders_%d", queue_no) consumerName = fmt.Sprintf("cosumer_%d", queue_no) } id, res, err := rdbmq.ReadOne(streamName, groupName, consumerName) if err != nil { if err.Error() == "context deadline exceeded" { return nil, errors.New("no data") } return nil, err } key := "" for k := range res { key = k break } jsonstr := res[key].(string) json.Unmarshal([]byte(jsonstr), &order) //确认信息 err = rdbmq.AckOne(streamName, groupName, id) if err != nil { return nil, err } return order, nil } func WriteOrder() { fmt.Println("开始执行周期任务:WriteOrder") // 创建默认分组 err := rdbmq.CreateStreamAndGroup("orders", "orders_one") if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { fmt.Println("分组失败") fmt.Println(err) return } // 创建价格分组 var queue_nos = [...]int{10, 20, 50, 100, 1000} for _, value := range queue_nos { err = rdbmq.CreateStreamAndGroup(fmt.Sprintf("orders_%d", value), fmt.Sprintf("orders_%d", value)) if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { fmt.Println("分组失败") fmt.Println(err) return } } // 创建一个新的Ticker,每3秒钟触发一次 ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() // 在函数结束时停止Ticker for range ticker.C { for i := 0; i < 10; i++ { res, _, err := models.GetInstance().GetIdleOrder() if err != nil { fmt.Println(err.Error()) continue } if res != nil { //获取产品信息 streamName := "orders" if res.QueueNo != 0 { streamName = fmt.Sprintf("orders_%d", res.QueueNo) } err = rdbmq.WriteOne(streamName, res.OrderNo, res) if err != nil { fmt.Println(err.Error()) continue } } } } } func init() { go WriteOrder() }