85 lines
2.8 KiB
Go
85 lines
2.8 KiB
Go
|
package rdbmq
|
|||
|
|
|||
|
import (
|
|||
|
"context"
|
|||
|
"fmt"
|
|||
|
"time"
|
|||
|
|
|||
|
"com.snow.auto_monitor/config"
|
|||
|
goredis "github.com/go-redis/redis/v8"
|
|||
|
"github.com/qit-team/snow-core/redis"
|
|||
|
)
|
|||
|
|
|||
|
// const Nil = goredis.
|
|||
|
|
|||
|
func init() {
|
|||
|
//加载配置文件
|
|||
|
conf, err := config.Load("./.env")
|
|||
|
if err != nil {
|
|||
|
fmt.Println(err)
|
|||
|
}
|
|||
|
//注册redis类
|
|||
|
err = redis.Pr.Register(redis.SingletonMain, conf.Redis)
|
|||
|
if err != nil {
|
|||
|
fmt.Println(err)
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
func CreateStreamAndGroup(stream string, group string) (err error) {
|
|||
|
rdb := redis.GetRedis(redis.SingletonMain)
|
|||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
|||
|
defer cancel()
|
|||
|
|
|||
|
// 创建stream和group
|
|||
|
err = rdb.XGroupCreateMkStream(ctx, stream, group, "$").Err()
|
|||
|
|
|||
|
// XGroupCreate,创建一个消费者组
|
|||
|
// err = rdb.XGroupCreate(ctx, stream, group, "$").Err()
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
func WriteOne(stream string, key string, value interface{}) (err error) {
|
|||
|
rdb := redis.GetRedis(redis.SingletonMain)
|
|||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
|||
|
defer cancel()
|
|||
|
// XADD,添加消息到对尾(这个代码每运行一次就增加一次内容)
|
|||
|
err = rdb.XAdd(ctx, &goredis.XAddArgs{
|
|||
|
Stream: stream, // 设置流stream的 key,消息队列名
|
|||
|
NoMkStream: false, //为false,key不存在会新建
|
|||
|
MaxLen: 10000, //消息队列最大长度,队列长度超过设置最大长度后,旧消息会被删除
|
|||
|
Approx: false, //默认false,设为true时,模糊指定stram的长度
|
|||
|
ID: "*", //消息ID,* 表示由Redis自动生成
|
|||
|
Values: []interface{}{ //消息队列的内容,键值对形式
|
|||
|
key, value,
|
|||
|
},
|
|||
|
// MinID: "id",//超过设置长度值,丢弃小于MinID消息id
|
|||
|
// Limit: 1000, //限制长度,基本不用
|
|||
|
}).Err()
|
|||
|
return
|
|||
|
}
|
|||
|
|
|||
|
func ReadOne(stream string, group string, consumer string) (res map[string]interface{}, err error) {
|
|||
|
rdb := redis.GetRedis(redis.SingletonMain)
|
|||
|
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
|||
|
defer cancel()
|
|||
|
|
|||
|
// XReadGroup,读取消费者中消息
|
|||
|
readgroupval, err := rdb.XReadGroup(ctx, &goredis.XReadGroupArgs{
|
|||
|
// Streams第二个参数为ID,list of streams and ids, e.g. stream1 stream2 id1 id2
|
|||
|
// id为 >,表示最新未读消息ID,也是未被分配给其他消费者的最新消息
|
|||
|
// id为 0 或其他,表示可以获取已读但未确认的消息。这种情况下BLOCK和NOACK都会忽略
|
|||
|
// id为具体ID,表示获取这个消费者组的pending的历史消息,而不是新消息
|
|||
|
Streams: []string{stream, ">"},
|
|||
|
Group: group, //消费者组名
|
|||
|
Consumer: consumer, // 消费者名
|
|||
|
Count: 1,
|
|||
|
Block: 0,
|
|||
|
NoAck: true, // true-表示读取消息时确认消息
|
|||
|
}).Result()
|
|||
|
if err != nil {
|
|||
|
return nil, err
|
|||
|
}
|
|||
|
|
|||
|
return readgroupval[0].Messages[0].Values, err
|
|||
|
}
|