76 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			76 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Go
		
	
	
	
package pkg
 | 
						||
 | 
						||
import (
 | 
						||
	"ai_scheduler/internal/config"
 | 
						||
	"ai_scheduler/internal/entitys"
 | 
						||
	"sync"
 | 
						||
)
 | 
						||
 | 
						||
type SafeChannelPool struct {
 | 
						||
	pool    chan chan entitys.ResponseData // 存储空闲 channel 的队列
 | 
						||
	bufSize int                            // channel 缓冲大小
 | 
						||
	mu      sync.Mutex
 | 
						||
	closed  bool
 | 
						||
}
 | 
						||
 | 
						||
func NewSafeChannelPool(c *config.Config) (*SafeChannelPool, func()) {
 | 
						||
	pool := &SafeChannelPool{
 | 
						||
		pool:    make(chan chan entitys.ResponseData, c.Sys.ChannelPoolLen),
 | 
						||
		bufSize: c.Sys.ChannelPoolSize,
 | 
						||
	}
 | 
						||
 | 
						||
	cleanup := pool.Close
 | 
						||
	return pool, cleanup
 | 
						||
}
 | 
						||
 | 
						||
// 从池中获取 channel(若无空闲则创建新 channel)
 | 
						||
func (p *SafeChannelPool) Get() chan entitys.ResponseData {
 | 
						||
	p.mu.Lock()
 | 
						||
	defer p.mu.Unlock()
 | 
						||
 | 
						||
	if p.closed {
 | 
						||
		return make(chan entitys.ResponseData, p.bufSize)
 | 
						||
	}
 | 
						||
 | 
						||
	select {
 | 
						||
	case ch := <-p.pool: // 从池中取
 | 
						||
		return ch
 | 
						||
	default: // 池为空,创建新 channel
 | 
						||
		return make(chan entitys.ResponseData, p.bufSize)
 | 
						||
	}
 | 
						||
}
 | 
						||
 | 
						||
// 将 channel 放回池中(必须确保 channel 已清空!)
 | 
						||
func (p *SafeChannelPool) Put(ch chan entitys.ResponseData) {
 | 
						||
	p.mu.Lock()
 | 
						||
	defer p.mu.Unlock()
 | 
						||
 | 
						||
	if p.closed {
 | 
						||
		return
 | 
						||
	}
 | 
						||
 | 
						||
	// 清空 channel(防止复用时读取旧数据)
 | 
						||
	go func() {
 | 
						||
		for range ch {
 | 
						||
			// 丢弃所有数据(或根据业务需求处理)
 | 
						||
		}
 | 
						||
	}()
 | 
						||
 | 
						||
	select {
 | 
						||
	case p.pool <- ch: // 尝试放回池中
 | 
						||
	default: // 池已满,直接关闭 channel(避免泄漏)
 | 
						||
		close(ch)
 | 
						||
	}
 | 
						||
	return
 | 
						||
}
 | 
						||
 | 
						||
// 关闭池(释放所有资源)
 | 
						||
func (p *SafeChannelPool) Close() {
 | 
						||
	p.mu.Lock()
 | 
						||
	defer p.mu.Unlock()
 | 
						||
 | 
						||
	p.closed = true
 | 
						||
	close(p.pool) // 关闭池队列
 | 
						||
	// 需额外逻辑关闭所有内部 channel(此处简化)
 | 
						||
}
 |