voucher/internal/pkg/rdsmq/rdsmq.go

157 lines
3.8 KiB
Go

package rdsmq
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"sync"
"time"
)
type ConsumeConfig struct {
// Redis
Rdb *redis.Client
// 队列名称
QueueName string
// 限制启用的协程数量
NumWorkers uint32
// 处理完成后等待时间
WaitTime time.Duration
// 重试次数
RetryNum uint32
// 消费函数
Fn func(context.Context, string) error
// 日志
Logger Logger
// 协程控制
numWorkersChan chan struct{}
}
func (r *ConsumeConfig) init(_ context.Context) {
if r.RetryNum > 5 {
panic("RetryNum must be less than 5")
}
if r.NumWorkers == 0 {
r.NumWorkers = 10
}
r.numWorkersChan = make(chan struct{}, r.NumWorkers)
}
func (r *ConsumeConfig) Start(ctx context.Context) {
fmt.Printf("RdsMQ Starting to dequeue from [%s] \n", r.QueueName)
r.init(ctx)
defer r.close(ctx)
var wg sync.WaitGroup
for {
// 使用 BLPop 获取消息
results, err := r.Rdb.BLPop(ctx, 0, r.QueueName).Result()
if err != nil {
r.Logger.Errorf("BLPop on %s Failed to get message: %v", r.QueueName, err)
time.Sleep(time.Second * 10) // 等待一段时间
continue
}
// 处理消息
if len(results) < 2 {
panic(fmt.Sprintf("Invalid result length from BLPop on [%s]: %v", r.QueueName, results))
}
key := results[0] // 队列名称
value := results[1] // 消息内容
wg.Add(1)
go func(key, value string) {
defer func() {
wg.Done()
if err := recover(); err != nil {
r.Logger.Errorf("rds panic [%v]", err)
}
}()
r.numWorkersChan <- struct{}{} // 获取信号量
defer func() { <-r.numWorkersChan }() // 释放信号量
// 处理消息的业务逻辑
r.consumer(ctx, key, value)
}(key, value)
if r.WaitTime > 0 {
// 可选:合理设置等待时间
time.Sleep(r.WaitTime) // 等待一段时间
}
}
wg.Wait()
}
func (r *ConsumeConfig) consumer(ctx context.Context, queueName string, value string) {
if err := r.Fn(ctx, value); err == nil {
r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, r.RetryNum, err)
return
}
if r.RetryNum > 0 {
r.retry(ctx, queueName, value)
}
}
func (r *ConsumeConfig) retry(ctx context.Context, queueName string, value string) {
retryNum := uint32(0)
retryInterval := 5 * time.Second // 初始重试间隔
// 创建一个定时器,定期尝试处理消息
lockTicker := time.NewTicker(retryInterval)
defer lockTicker.Stop()
for {
select {
case <-lockTicker.C:
if retryNum < r.RetryNum {
err := r.Fn(ctx, value)
if err == nil {
return // 成功处理消息,退出
}
// 错误分类处理
if isRecoverableError(err) {
r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, retryNum, err)
retryNum++
// 动态调整重试间隔
retryInterval = time.Duration(float64(retryInterval) * 1.5) // 每次增加 50%
lockTicker.Reset(retryInterval) // 重置定时器
} else {
r.Logger.Errorf("BLPop on %s Non-recoverable error for message [%s]: %v\n", queueName, value, err)
return // 立即退出
}
} else {
r.Logger.Warnf("BLPop on %s Max retries reached for message [%s]\n", queueName, value)
return // 达到最大重试次数,退出
}
case <-ctx.Done():
r.Logger.Warnf("BLPop on %s push consumer close tick. [%s]\n", queueName, value)
return
}
}
}
// 示例函数,判断错误是否可恢复
func isRecoverableError(_ error) bool {
// 根据具体情况实现错误分类逻辑
return true // 假设所有错误都可恢复,实际情况中应进行详细判断
}
func (r *ConsumeConfig) close(_ context.Context) {
if r.numWorkersChan != nil {
close(r.numWorkersChan)
}
}
func (r *ConsumeConfig) Stop(ctx context.Context) {
r.close(ctx)
}