ai_scheduler/internal/pkg/channel_pool.go

77 lines
1.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package pkg
import (
"ai_scheduler/internal/config"
"ai_scheduler/internal/entitys"
"errors"
"sync"
)
// SafeChannelPool is a mutex-guarded pool of reusable
// chan entitys.ResponseData values. Idle channels are parked in a buffered
// queue; Get hands one out (or allocates a new one) and Put returns it.
type SafeChannelPool struct {
pool chan chan entitys.ResponseData // queue holding the idle channels
bufSize int // buffer capacity used when a new channel is created
// mu is held by Get, Put and Close for their whole duration.
mu sync.Mutex
// closed is set by Close; Get and Put return an error once it is true.
closed bool
}
// NewSafeChannelPool builds a pool from the system configuration.
//
// c.Sys.ChannelPoolLen is the capacity of the idle queue (how many idle
// channels can be parked), and c.Sys.ChannelPoolSize is the buffer size given
// to every channel the pool creates. The second return value is the pool's
// Close method, handed back as a cleanup function.
func NewSafeChannelPool(c *config.Config) (*SafeChannelPool, func()) {
	idle := make(chan chan entitys.ResponseData, c.Sys.ChannelPoolLen)
	p := &SafeChannelPool{pool: idle, bufSize: c.Sys.ChannelPoolSize}
	return p, p.Close
}
// Get hands out a channel from the pool. When no idle channel is available a
// fresh one is allocated with the configured buffer size. It returns an error
// once the pool has been closed.
func (p *SafeChannelPool) Get() (chan entitys.ResponseData, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return nil, errors.New("pool is closed")
	}

	var out chan entitys.ResponseData
	select {
	case out = <-p.pool:
		// Reuse an idle channel.
	default:
		// Nothing idle: allocate a new channel.
		out = make(chan entitys.ResponseData, p.bufSize)
	}
	return out, nil
}
// Put returns ch to the pool so that a later Get can reuse it.
//
// The caller must have stopped sending on and receiving from ch before
// calling Put. Any values still buffered in ch are discarded so the next user
// does not observe stale data. If the idle queue is full, ch is closed
// instead of being pooled.
//
// Put returns an error if the pool is closed or ch is nil. A channel that is
// already closed cannot be reused; it is silently dropped and nil is returned.
func (p *SafeChannelPool) Put(ch chan entitys.ResponseData) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return errors.New("pool is closed")
	}
	if ch == nil {
		// A nil channel would block every future user forever, and
		// close(nil) panics, so reject it up front.
		return errors.New("channel is nil")
	}

	// Drain synchronously and without blocking. A background
	// "for range ch" goroutine only terminates when ch is closed, so for a
	// pooled channel it would leak and steal the messages of whoever
	// obtains the channel next.
drain:
	for {
		select {
		case _, ok := <-ch:
			if !ok {
				// ch was already closed by the caller: pooling it would make
				// the next sender panic, and closing it again would panic
				// here. Just drop it.
				return nil
			}
		default:
			break drain
		}
	}

	select {
	case p.pool <- ch: // park the drained channel for reuse
	default:
		// Idle queue is full: retire the channel.
		close(ch)
	}
	return nil
}
// Close shuts the pool down. After Close, Get and Put return an error.
//
// Close is idempotent: it is also handed out as the cleanup function by
// NewSafeChannelPool, so it may be invoked more than once, and closing the
// idle queue twice would panic.
//
// Idle channels still parked in the queue are discarded so they can be
// garbage collected. They are not closed: an unreferenced channel needs no
// close to be reclaimed, and closing could panic if a caller still holds a
// reference and sends on it.
func (p *SafeChannelPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}
	p.closed = true

	if p.pool == nil {
		// Zero-value pool: nothing to close, and ranging over a nil channel
		// would block forever.
		return
	}
	close(p.pool)
	// Ranging over a closed channel yields the remaining buffered items and
	// then stops, which empties the idle queue.
	for range p.pool {
	}
}