com.snow.auto_monitor/app/utils/rdbmq/rdbmq.go

85 lines
2.8 KiB
Go
Raw Normal View History

2024-07-12 18:11:21 +08:00
package rdbmq
import (
"context"
"fmt"
"time"
"com.snow.auto_monitor/config"
goredis "github.com/go-redis/redis/v8"
"github.com/qit-team/snow-core/redis"
)
// const Nil = goredis.
func init() {
//加载配置文件
conf, err := config.Load("./.env")
if err != nil {
fmt.Println(err)
}
//注册redis类
err = redis.Pr.Register(redis.SingletonMain, conf.Redis)
if err != nil {
fmt.Println(err)
}
}
func CreateStreamAndGroup(stream string, group string) (err error) {
rdb := redis.GetRedis(redis.SingletonMain)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// 创建stream和group
err = rdb.XGroupCreateMkStream(ctx, stream, group, "$").Err()
// XGroupCreate创建一个消费者组
// err = rdb.XGroupCreate(ctx, stream, group, "$").Err()
return
}
func WriteOne(stream string, key string, value interface{}) (err error) {
rdb := redis.GetRedis(redis.SingletonMain)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// XADD添加消息到对尾这个代码每运行一次就增加一次内容
err = rdb.XAdd(ctx, &goredis.XAddArgs{
Stream: stream, // 设置流stream的 key消息队列名
NoMkStream: false, //为falsekey不存在会新建
MaxLen: 10000, //消息队列最大长度,队列长度超过设置最大长度后,旧消息会被删除
Approx: false, //默认false设为true时模糊指定stram的长度
ID: "*", //消息ID* 表示由Redis自动生成
Values: []interface{}{ //消息队列的内容,键值对形式
key, value,
},
// MinID: "id",//超过设置长度值丢弃小于MinID消息id
// Limit: 1000, //限制长度,基本不用
}).Err()
return
}
func ReadOne(stream string, group string, consumer string) (res map[string]interface{}, err error) {
rdb := redis.GetRedis(redis.SingletonMain)
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
// XReadGroup读取消费者中消息
readgroupval, err := rdb.XReadGroup(ctx, &goredis.XReadGroupArgs{
// Streams第二个参数为IDlist of streams and ids, e.g. stream1 stream2 id1 id2
// id为 >表示最新未读消息ID也是未被分配给其他消费者的最新消息
// id为 0 或其他表示可以获取已读但未确认的消息。这种情况下BLOCK和NOACK都会忽略
// id为具体ID表示获取这个消费者组的pending的历史消息而不是新消息
Streams: []string{stream, ">"},
Group: group, //消费者组名
Consumer: consumer, // 消费者名
Count: 1,
Block: 0,
NoAck: true, // true-表示读取消息时确认消息
}).Result()
if err != nil {
return nil, err
}
return readgroupval[0].Messages[0].Values, err
}