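package rdsmq

import (
	"context"
	"fmt"
	"sync"
)

// ConsumerManager manages a set of consumers.
type ConsumerManager struct {
	ConsumerConfigs []*ConsumeConfig
}

// NewConsumerManager returns an empty ConsumerManager.
func NewConsumerManager() *ConsumerManager {
	return &ConsumerManager{}
}

// Add registers a consumer configuration with the manager.
func (r *ConsumerManager) Add(config *ConsumeConfig) {
	r.ConsumerConfigs = append(r.ConsumerConfigs, config)
}

// Start checks the Redis connection of each registered consumer, runs each
// consumer in its own goroutine, and blocks until all of them have returned.
// It panics if a connection check fails.
func (r *ConsumerManager) Start(ctx context.Context) {
	var wg sync.WaitGroup
	for _, c := range r.ConsumerConfigs {
		if _, err := c.Rdb.Ping(ctx).Result(); err != nil {
			panic(fmt.Sprintf("Redis connection failed: %v", err))
		}
		wg.Add(1)
		go func(co *ConsumeConfig) {
			// Deferred calls run last-in, first-out: the recovery handler
			// below logs any panic before wg.Done releases the waiter.
			defer wg.Done()
			defer func() {
				if err := recover(); err != nil {
					fmt.Printf("panic: %v\n", err)
				}
			}()
			co.Start(ctx)
		}(c)
	}
	wg.Wait()
}

// Stop stops every registered consumer.
func (r *ConsumerManager) Stop(ctx context.Context) {
	for _, c := range r.ConsumerConfigs {
		c.Stop(ctx)
	}
}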