76 lines
1.6 KiB
Go
76 lines
1.6 KiB
Go
package pkg
|
||
|
||
import (
|
||
"ai_scheduler/internal/config"
|
||
"ai_scheduler/internal/entitys"
|
||
"sync"
|
||
)
|
||
|
||
type SafeChannelPool struct {
|
||
pool chan chan entitys.ResponseData // 存储空闲 channel 的队列
|
||
bufSize int // channel 缓冲大小
|
||
mu sync.Mutex
|
||
closed bool
|
||
}
|
||
|
||
func NewSafeChannelPool(c *config.Config) (*SafeChannelPool, func()) {
|
||
pool := &SafeChannelPool{
|
||
pool: make(chan chan entitys.ResponseData, c.Sys.ChannelPoolLen),
|
||
bufSize: c.Sys.ChannelPoolSize,
|
||
}
|
||
|
||
cleanup := pool.Close
|
||
return pool, cleanup
|
||
}
|
||
|
||
// 从池中获取 channel(若无空闲则创建新 channel)
|
||
func (p *SafeChannelPool) Get() chan entitys.ResponseData {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
if p.closed {
|
||
return make(chan entitys.ResponseData, p.bufSize)
|
||
}
|
||
|
||
select {
|
||
case ch := <-p.pool: // 从池中取
|
||
return ch
|
||
default: // 池为空,创建新 channel
|
||
return make(chan entitys.ResponseData, p.bufSize)
|
||
}
|
||
}
|
||
|
||
// 将 channel 放回池中(必须确保 channel 已清空!)
|
||
func (p *SafeChannelPool) Put(ch chan entitys.ResponseData) {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
if p.closed {
|
||
return
|
||
}
|
||
|
||
// 清空 channel(防止复用时读取旧数据)
|
||
go func() {
|
||
for range ch {
|
||
// 丢弃所有数据(或根据业务需求处理)
|
||
}
|
||
}()
|
||
|
||
select {
|
||
case p.pool <- ch: // 尝试放回池中
|
||
default: // 池已满,直接关闭 channel(避免泄漏)
|
||
close(ch)
|
||
}
|
||
return
|
||
}
|
||
|
||
// 关闭池(释放所有资源)
|
||
func (p *SafeChannelPool) Close() {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
|
||
p.closed = true
|
||
close(p.pool) // 关闭池队列
|
||
// 需额外逻辑关闭所有内部 channel(此处简化)
|
||
}
|