mq/app/http/tcppool/pool.go

107 lines
2.0 KiB
Go

package tcppool
import (
"bufio"
"github.com/nange/easypool"
_ "github.com/nange/easypool"
"io"
"net"
"quenue/app/utils"
"quenue/config"
"sync"
"sync/atomic"
"time"
)
var lock sync.Once
var TcpPoolFactory = &TcpPool{Full: new(int32)}
type TcpPool struct {
client easypool.Pool
Full *int32
lastTime int64
isDie bool
}
func (t *TcpPool) initPool(port string) *TcpPool {
lock.Do(func() {
factory := func() (net.Conn, error) {
var conn, err = net.Dial("tcp", "192.168.110.50:"+port)
return conn, err
}
config := &easypool.PoolConfig{
InitialCap: 1,
MaxCap: 2,
MaxIdle: 1,
Idletime: 30 * time.Second,
MaxLifetime: 10 * time.Minute,
Factory: factory,
}
pool, err := easypool.NewHeapPool(config)
if err == nil {
t.isDie = false
t.client = pool
} else {
utils.Log(nil, "tcp err", err)
time.Sleep(3 * time.Second)
}
})
return t
}
func (t *TcpPool) handRead(conn net.Conn) {
//defer func() {
// if err := recover(); err != nil {
// utils.Log(nil, "tcp read err", err)
// }
//}()
for {
if time.Now().Unix()-t.lastTime > 10 {
//t.isDie = true
//t.client.Close()
//t.client = nil
//return
}
reader := bufio.NewReader(conn)
var buffer [256]byte
// 持续读取数据
n, err := reader.Read(buffer[:])
if err == io.EOF {
continue
}
if err != nil {
t.isDie = true
if t.client != nil {
t.client.Close()
t.client = nil
}
return
utils.Log(nil, "Error reading data:", err)
}
recvStr := string(buffer[:n])
if recvStr == "1" {
atomic.AddInt32(t.Full, 1)
} else if recvStr == "ping" {
conn.Write([]byte("pong"))
t.lastTime = time.Now().Unix()
}
}
}
func (t *TcpPool) watch(conn net.Conn) {
go t.handRead(conn)
}
func (t *TcpPool) SendMsg(msg []byte) error {
conn, err := t.initPool(config.GetConf().OrderPort).client.Get()
if err != nil {
utils.Log(nil, "get tcp err", err)
return err
}
//var data, _ = json.Marshal(msg)
t.watch(conn)
_, err = conn.Write(msg)
conn.Close()
return err
}