package transfersys import ( "encoding/json" "errors" "fmt" "time" models "com.snow.auto_monitor/app/models/orders" rdbmq "com.snow.auto_monitor/app/utils/rdbmq" ) // func TestRDB() (err error) { // rdbmq.CreateGroup("orders", "orders_one") // return // } // func WriteGroup() (err error) { // err = rdbmq.WriteOne("orders", time.Now().Format("2006-01-02 15:04:05"), "test") // return // } func ReadGroup() (order *models.Orders, err error) { res, err := rdbmq.ReadOne("orders", "orders_one", "cosumer_one") if err != nil { if err.Error() == "context deadline exceeded" { return nil, errors.New("no data") } return nil, err } key := "" for k := range res { key = k break } jsonstr := res[key].(string) json.Unmarshal([]byte(jsonstr), &order) return order, nil } func WriteOrder() { fmt.Println("开始执行周期任务:WriteOrder") // 创建分组 err := rdbmq.CreateStreamAndGroup("orders", "orders_one") if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { fmt.Println("分组失败") return } // 创建一个新的Ticker,每3秒钟触发一次 ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() // 在函数结束时停止Ticker for range ticker.C { for i := 0; i < 10; i++ { res, _, err := models.GetInstance().GetIdleOrder() if err != nil { fmt.Println(err.Error()) continue } if res != nil { err = rdbmq.WriteOne("orders", res.OrderNo, res) if err != nil { fmt.Println(err.Error()) continue } } } } } func init() { go WriteOrder() }