157 lines
3.8 KiB
Go
157 lines
3.8 KiB
Go
package rdsmq
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"github.com/redis/go-redis/v9"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type ConsumeConfig struct {
|
|
// Redis
|
|
Rdb *redis.Client
|
|
// 队列名称
|
|
QueueName string
|
|
// 限制启用的协程数量
|
|
NumWorkers uint32
|
|
// 处理完成后等待时间
|
|
WaitTime time.Duration
|
|
// 重试次数
|
|
RetryNum uint32
|
|
// 消费函数
|
|
Fn func(context.Context, string) error
|
|
// 日志
|
|
Logger Logger
|
|
// 协程控制
|
|
numWorkersChan chan struct{}
|
|
}
|
|
|
|
func (r *ConsumeConfig) init(_ context.Context) {
|
|
if r.RetryNum > 5 {
|
|
panic("RetryNum must be less than 5")
|
|
}
|
|
|
|
if r.NumWorkers == 0 {
|
|
r.NumWorkers = 10
|
|
}
|
|
|
|
r.numWorkersChan = make(chan struct{}, r.NumWorkers)
|
|
}
|
|
|
|
func (r *ConsumeConfig) Start(ctx context.Context) {
|
|
fmt.Printf("RdsMQ Starting to dequeue from [%s]", r.QueueName)
|
|
|
|
r.init(ctx)
|
|
defer r.close(ctx)
|
|
|
|
var wg sync.WaitGroup
|
|
for {
|
|
// 使用 BLPop 获取消息
|
|
results, err := r.Rdb.BLPop(ctx, 0, r.QueueName).Result()
|
|
if err != nil {
|
|
r.Logger.Errorf("BLPop on %s Failed to get message: %v", r.QueueName, err)
|
|
time.Sleep(time.Second * 10) // 等待一段时间
|
|
continue
|
|
}
|
|
|
|
// 处理消息
|
|
if len(results) < 2 {
|
|
panic(fmt.Sprintf("Invalid result length from BLPop on [%s]: %v", r.QueueName, results))
|
|
}
|
|
|
|
key := results[0] // 队列名称
|
|
value := results[1] // 消息内容
|
|
|
|
wg.Add(1)
|
|
go func(key, value string) {
|
|
|
|
defer func() {
|
|
wg.Done()
|
|
if err := recover(); err != nil {
|
|
r.Logger.Errorf("rds panic [%v]", err)
|
|
}
|
|
}()
|
|
|
|
r.numWorkersChan <- struct{}{} // 获取信号量
|
|
defer func() { <-r.numWorkersChan }() // 释放信号量
|
|
|
|
// 处理消息的业务逻辑
|
|
r.consumer(ctx, key, value)
|
|
}(key, value)
|
|
|
|
if r.WaitTime > 0 {
|
|
// 可选:合理设置等待时间
|
|
time.Sleep(r.WaitTime) // 等待一段时间
|
|
}
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (r *ConsumeConfig) consumer(ctx context.Context, queueName string, value string) {
|
|
if err := r.Fn(ctx, value); err == nil {
|
|
r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, r.RetryNum, err)
|
|
return
|
|
}
|
|
|
|
if r.RetryNum > 0 {
|
|
r.retry(ctx, queueName, value)
|
|
}
|
|
}
|
|
|
|
func (r *ConsumeConfig) retry(ctx context.Context, queueName string, value string) {
|
|
retryNum := uint32(0)
|
|
retryInterval := 5 * time.Second // 初始重试间隔
|
|
|
|
// 创建一个定时器,定期尝试处理消息
|
|
lockTicker := time.NewTicker(retryInterval)
|
|
defer lockTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-lockTicker.C:
|
|
if retryNum < r.RetryNum {
|
|
|
|
err := r.Fn(ctx, value)
|
|
if err == nil {
|
|
return // 成功处理消息,退出
|
|
}
|
|
|
|
// 错误分类处理
|
|
if isRecoverableError(err) {
|
|
r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, retryNum, err)
|
|
retryNum++
|
|
// 动态调整重试间隔
|
|
retryInterval = time.Duration(float64(retryInterval) * 1.5) // 每次增加 50%
|
|
lockTicker.Reset(retryInterval) // 重置定时器
|
|
} else {
|
|
r.Logger.Errorf("BLPop on %s Non-recoverable error for message [%s]: %v\n", queueName, value, err)
|
|
return // 立即退出
|
|
}
|
|
} else {
|
|
r.Logger.Warnf("BLPop on %s Max retries reached for message [%s]\n", queueName, value)
|
|
return // 达到最大重试次数,退出
|
|
}
|
|
case <-ctx.Done():
|
|
r.Logger.Warnf("BLPop on %s push consumer close tick. [%s]\n", queueName, value)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// 示例函数,判断错误是否可恢复
|
|
func isRecoverableError(_ error) bool {
|
|
// 根据具体情况实现错误分类逻辑
|
|
return true // 假设所有错误都可恢复,实际情况中应进行详细判断
|
|
}
|
|
|
|
func (r *ConsumeConfig) close(_ context.Context) {
|
|
if r.numWorkersChan != nil {
|
|
close(r.numWorkersChan)
|
|
}
|
|
}
|
|
|
|
func (r *ConsumeConfig) Stop(ctx context.Context) {
|
|
r.close(ctx)
|
|
}
|