package rdsmq import ( "context" "fmt" "github.com/redis/go-redis/v9" "sync" "time" ) type ConsumeConfig struct { // Redis Rdb *redis.Client // 队列名称 QueueName string // 限制启用的协程数量 NumWorkers uint32 // 处理完成后等待时间 WaitTime time.Duration // 重试次数 RetryNum uint32 // 消费函数 Fn func(context.Context, string) error // 日志 Logger Logger // 协程控制 numWorkersChan chan struct{} } func (r *ConsumeConfig) init(_ context.Context) { if r.RetryNum > 5 { panic("RetryNum must be less than 5") } if r.NumWorkers == 0 { r.NumWorkers = 10 } r.numWorkersChan = make(chan struct{}, r.NumWorkers) } func (r *ConsumeConfig) Start(ctx context.Context) { fmt.Printf("RdsMQ Starting to dequeue from [%s] \n", r.QueueName) r.init(ctx) defer r.close(ctx) var wg sync.WaitGroup for { // 使用 BLPop 获取消息 results, err := r.Rdb.BLPop(ctx, 0, r.QueueName).Result() if err != nil { r.Logger.Errorf("BLPop on %s Failed to get message: %v", r.QueueName, err) time.Sleep(time.Second * 10) // 等待一段时间 continue } // 处理消息 if len(results) < 2 { panic(fmt.Sprintf("Invalid result length from BLPop on [%s]: %v", r.QueueName, results)) } key := results[0] // 队列名称 value := results[1] // 消息内容 wg.Add(1) go func(key, value string) { defer func() { wg.Done() if err := recover(); err != nil { r.Logger.Errorf("rds panic [%v]", err) } }() r.numWorkersChan <- struct{}{} // 获取信号量 defer func() { <-r.numWorkersChan }() // 释放信号量 // 处理消息的业务逻辑 r.consumer(ctx, key, value) }(key, value) if r.WaitTime > 0 { // 可选:合理设置等待时间 time.Sleep(r.WaitTime) // 等待一段时间 } } wg.Wait() } func (r *ConsumeConfig) consumer(ctx context.Context, queueName string, value string) { err := r.Fn(ctx, value) if err == nil { return } r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, r.RetryNum, err) if r.RetryNum > 0 { r.retry(ctx, queueName, value) } } func (r *ConsumeConfig) retry(ctx context.Context, queueName string, value string) { retryNum := uint32(0) retryInterval := 5 * time.Second // 初始重试间隔 // 创建一个定时器,定期尝试处理消息 lockTicker := time.NewTicker(retryInterval) defer lockTicker.Stop() for { select { case <-lockTicker.C: if retryNum < r.RetryNum { err := r.Fn(ctx, value) if err == nil { return // 成功处理消息,退出 } // 错误分类处理 if isRecoverableError(err) { r.Logger.Errorf("BLPop on %s Failed to process message [%s] after retry %d times, err[%v]\n", queueName, value, retryNum, err) retryNum++ // 动态调整重试间隔 retryInterval = time.Duration(float64(retryInterval) * 1.5) // 每次增加 50% lockTicker.Reset(retryInterval) // 重置定时器 } else { r.Logger.Errorf("BLPop on %s Non-recoverable error for message [%s]: %v\n", queueName, value, err) return // 立即退出 } } else { r.Logger.Warnf("BLPop on %s Max retries reached for message [%s]\n", queueName, value) return // 达到最大重试次数,退出 } case <-ctx.Done(): r.Logger.Warnf("BLPop on %s push consumer close tick. [%s]\n", queueName, value) return } } } // 示例函数,判断错误是否可恢复 func isRecoverableError(_ error) bool { // 根据具体情况实现错误分类逻辑 return true // 假设所有错误都可恢复,实际情况中应进行详细判断 } func (r *ConsumeConfig) close(_ context.Context) { if r.numWorkersChan != nil { close(r.numWorkersChan) } } func (r *ConsumeConfig) Stop(ctx context.Context) { r.close(ctx) }