voucher/internal/pkg/rdsmq/manager.go

48 lines
889 B
Go

package rdsmq
import (
"context"
"fmt"
"sync"
)
// ConsumerManager is the consumer manager: it holds a list of consumer
// configurations and starts or stops them as a group (see Start and Stop).
// None of its methods synchronize access to the list.
type ConsumerManager struct {
// ConsumerConfigs is the list of registered consumer configurations.
// Add appends to it; Start and Stop iterate over it in slice order.
ConsumerConfigs []*ConsumeConfig
}
// NewConsumerManager returns a pointer to a zero-valued ConsumerManager,
// i.e. one with no consumer configurations registered yet.
func NewConsumerManager() *ConsumerManager {
	manager := new(ConsumerManager)
	return manager
}
// Add registers config by appending it to the end of ConsumerConfigs.
// The pointer is stored as given; it is neither copied nor validated.
func (r *ConsumerManager) Add(config *ConsumeConfig) {
	extended := append(r.ConsumerConfigs, config)
	r.ConsumerConfigs = extended
}
// Start launches every registered consumer and blocks until all of them
// have returned.
//
// For each configuration, in slice order, it first pings the configuration's
// Rdb client and panics with a "Redis connection failed" message if the ping
// returns an error. Consumers launched earlier in the loop are already
// running when that panic is raised. If the ping succeeds, the
// configuration's Start method is run in its own goroutine with ctx.
//
// A panic raised inside a consumer goroutine is recovered and printed to
// standard output instead of crashing the process; that goroutine then
// counts as finished.
func (r *ConsumerManager) Start(ctx context.Context) {
	var wg sync.WaitGroup
	for _, c := range r.ConsumerConfigs {
		if _, err := c.Rdb.Ping(ctx).Result(); err != nil {
			panic(fmt.Sprintf("Redis connection failed: %v", err))
		}
		wg.Add(1)
		// The config is passed as an argument so each goroutine gets its own
		// copy of the pointer regardless of the Go version's loop semantics.
		go func(co *ConsumeConfig) {
			// Registered first so it runs last: the WaitGroup is released
			// only after a recovered panic has been reported, so Start
			// cannot return before the message is written.
			defer wg.Done()
			defer func() {
				if err := recover(); err != nil {
					fmt.Printf("panic: %v\n", err)
				}
			}()
			co.Start(ctx)
		}(c)
	}
	wg.Wait()
}
// Stop calls Stop(ctx) on every registered consumer configuration, one after
// another, in the order they appear in ConsumerConfigs.
func (r *ConsumerManager) Stop(ctx context.Context) {
	// Snapshot the slice header once, as a range loop would.
	configs := r.ConsumerConfigs
	for i := 0; i < len(configs); i++ {
		configs[i].Stop(ctx)
	}
}