package pkg import ( "ai_scheduler/internal/config" "ai_scheduler/internal/entitys" "errors" "sync" ) type SafeChannelPool struct { pool chan chan entitys.ResponseData // 存储空闲 channel 的队列 bufSize int // channel 缓冲大小 mu sync.Mutex closed bool } func NewSafeChannelPool(c *config.Config) (*SafeChannelPool, func()) { pool := &SafeChannelPool{ pool: make(chan chan entitys.ResponseData, c.Sys.ChannelPoolLen), bufSize: c.Sys.ChannelPoolSize, } cleanup := pool.Close return pool, cleanup } // 从池中获取 channel(若无空闲则创建新 channel) func (p *SafeChannelPool) Get() (chan entitys.ResponseData, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { return nil, errors.New("pool is closed") } select { case ch := <-p.pool: // 从池中取 return ch, nil default: // 池为空,创建新 channel return make(chan entitys.ResponseData, p.bufSize), nil } } // 将 channel 放回池中(必须确保 channel 已清空!) func (p *SafeChannelPool) Put(ch chan entitys.ResponseData) error { p.mu.Lock() defer p.mu.Unlock() if p.closed { return errors.New("pool is closed") } // 清空 channel(防止复用时读取旧数据) go func() { for range ch { // 丢弃所有数据(或根据业务需求处理) } }() select { case p.pool <- ch: // 尝试放回池中 default: // 池已满,直接关闭 channel(避免泄漏) close(ch) } return nil } // 关闭池(释放所有资源) func (p *SafeChannelPool) Close() { p.mu.Lock() defer p.mu.Unlock() p.closed = true close(p.pool) // 关闭池队列 // 需额外逻辑关闭所有内部 channel(此处简化) }