104 lines
3.2 KiB
Go
104 lines
3.2 KiB
Go
package rdbmq
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"time"
|
||
|
||
"com.snow.auto_monitor/config"
|
||
goredis "github.com/go-redis/redis/v8"
|
||
"github.com/qit-team/snow-core/redis"
|
||
)
|
||
|
||
// const Nil = goredis.
|
||
|
||
func init() {
|
||
//加载配置文件
|
||
conf, err := config.Load("./.env")
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
}
|
||
//注册redis类
|
||
err = redis.Pr.Register(redis.SingletonMain, conf.Redis)
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
}
|
||
}
|
||
|
||
func CreateStreamAndGroup(stream string, group string) (err error) {
|
||
rdb := redis.GetRedis(redis.SingletonMain)
|
||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer cancel()
|
||
|
||
// 创建stream和group
|
||
err = rdb.XGroupCreateMkStream(ctx, stream, group, "$").Err()
|
||
|
||
// XGroupCreate,创建一个消费者组
|
||
// err = rdb.XGroupCreate(ctx, stream, group, "$").Err()
|
||
return
|
||
}
|
||
|
||
func WriteOne(stream string, key string, value interface{}) (err error) {
|
||
rdb := redis.GetRedis(redis.SingletonMain)
|
||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer cancel()
|
||
// XADD,添加消息到对尾(这个代码每运行一次就增加一次内容)
|
||
err = rdb.XAdd(ctx, &goredis.XAddArgs{
|
||
Stream: stream, // 设置流stream的 key,消息队列名
|
||
NoMkStream: false, //为false,key不存在会新建
|
||
MaxLen: 10000, //消息队列最大长度,队列长度超过设置最大长度后,旧消息会被删除
|
||
Approx: false, //默认false,设为true时,模糊指定stram的长度
|
||
ID: "*", //消息ID,* 表示由Redis自动生成
|
||
Values: []interface{}{ //消息队列的内容,键值对形式
|
||
key, value,
|
||
},
|
||
// MinID: "id",//超过设置长度值,丢弃小于MinID消息id
|
||
// Limit: 1000, //限制长度,基本不用
|
||
}).Err()
|
||
return
|
||
}
|
||
|
||
func AckOne(stream string, group string, id string) (err error) {
|
||
rdb := redis.GetRedis(redis.SingletonMain)
|
||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer cancel()
|
||
|
||
// XACK,确认消息
|
||
err = rdb.XAck(ctx, stream, group, id).Err()
|
||
if err != nil {
|
||
fmt.Println(err)
|
||
return
|
||
}
|
||
_, err = rdb.XDel(ctx, stream, id).Result()
|
||
if err != nil {
|
||
fmt.Println("删除消息失败:", err)
|
||
}
|
||
return
|
||
}
|
||
|
||
func ReadOne(stream string, group string, consumer string) (id string, res map[string]interface{}, err error) {
|
||
rdb := redis.GetRedis(redis.SingletonMain)
|
||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer cancel()
|
||
|
||
// XReadGroup,读取消费者中消息
|
||
readgroupval, err := rdb.XReadGroup(ctx, &goredis.XReadGroupArgs{
|
||
// Streams第二个参数为ID,list of streams and ids, e.g. stream1 stream2 id1 id2
|
||
// id为 >,表示最新未读消息ID,也是未被分配给其他消费者的最新消息
|
||
// id为 0 或其他,表示可以获取已读但未确认的消息。这种情况下BLOCK和NOACK都会忽略
|
||
// id为具体ID,表示获取这个消费者组的pending的历史消息,而不是新消息
|
||
Streams: []string{stream, ">"},
|
||
Group: group, //消费者组名
|
||
Consumer: consumer, // 消费者名
|
||
Count: 1,
|
||
Block: 0,
|
||
NoAck: true, // true-表示读取消息时确认消息
|
||
}).Result()
|
||
|
||
if err != nil {
|
||
return "", nil, err
|
||
}
|
||
|
||
return readgroupval[0].Messages[0].ID, readgroupval[0].Messages[0].Values, err
|
||
}
|