ai_scheduler/internal/pkg/channel_pool.go

76 lines
1.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pkg
import (
"ai_scheduler/internal/config"
"errors"
"sync"
)
type SafeChannelPool struct {
pool chan chan []byte // 存储空闲 channel 的队列
bufSize int // channel 缓冲大小
mu sync.Mutex
closed bool
}
func NewSafeChannelPool(c *config.Config) (*SafeChannelPool, func()) {
pool := &SafeChannelPool{
pool: make(chan chan []byte, c.Sys.ChannelPoolLen),
bufSize: c.Sys.ChannelPoolSize,
}
cleanup := pool.Close
return pool, cleanup
}
// 从池中获取 channel若无空闲则创建新 channel
func (p *SafeChannelPool) Get() (chan []byte, error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil, errors.New("pool is closed")
}
select {
case ch := <-p.pool: // 从池中取
return ch, nil
default: // 池为空,创建新 channel
return make(chan []byte, p.bufSize), nil
}
}
// 将 channel 放回池中(必须确保 channel 已清空!)
func (p *SafeChannelPool) Put(ch chan []byte) error {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return errors.New("pool is closed")
}
// 清空 channel防止复用时读取旧数据
go func() {
for range ch {
// 丢弃所有数据(或根据业务需求处理)
}
}()
select {
case p.pool <- ch: // 尝试放回池中
default: // 池已满,直接关闭 channel避免泄漏
close(ch)
}
return nil
}
// 关闭池(释放所有资源)
func (p *SafeChannelPool) Close() {
p.mu.Lock()
defer p.mu.Unlock()
p.closed = true
close(p.pool) // 关闭池队列
// 需额外逻辑关闭所有内部 channel此处简化
}