46 lines
993 B
Go
46 lines
993 B
Go
package rdbdq
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"time"
|
||
|
||
goredis "github.com/go-redis/redis/v8"
|
||
"github.com/qit-team/snow-core/redis"
|
||
)
|
||
|
||
func WriteOne(value string, executeIime int64) (err error) {
|
||
rdb := redis.GetRedis(redis.SingletonMain)
|
||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer cancel()
|
||
|
||
err = rdb.ZAdd(ctx, "dealy_queue", &goredis.Z{
|
||
Score: float64(executeIime),
|
||
Member: value,
|
||
}).Err()
|
||
return
|
||
}
|
||
|
||
func ReadLatestOne() (res string, err error) {
|
||
rdb := redis.GetRedis(redis.SingletonMain)
|
||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||
defer cancel()
|
||
|
||
// XReadGroup,读取消费者中消息
|
||
tasks, err := rdb.ZRangeByScore(ctx, "dealy_queue", &goredis.ZRangeBy{
|
||
Min: "-inf",
|
||
Max: fmt.Sprintf("%d", time.Now().Unix()),
|
||
}).Result()
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if len(tasks) > 0 {
|
||
res := tasks[0]
|
||
err = rdb.ZRem(ctx, "dealy_queue", res).Err()
|
||
return res, nil
|
||
}
|
||
|
||
return "", nil
|
||
}
|