增加连接修复

This commit is contained in:
qiyunfanbo126.com 2025-01-16 17:58:56 +08:00
parent 141682065f
commit 7dc125cc22
4 changed files with 32 additions and 49 deletions

View File

@ -13,8 +13,11 @@ import (
"time" "time"
) )
var lock sync.Once var (
var TcpPoolFactory = &TcpPool{Full: new(int32)} fullNum int32 = 1
lock sync.Once
TcpPoolFactory = &TcpPool{Full: &full}
)
type TcpPool struct { type TcpPool struct {
client easypool.Pool client easypool.Pool

View File

@ -1,9 +1,7 @@
package tcppool package tcppool
import ( import (
"bufio"
"fmt" "fmt"
"io"
"net" "net"
"quenue/app/utils" "quenue/app/utils"
"quenue/config" "quenue/config"
@ -29,55 +27,22 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
var conn, err = net.Dial("tcp", "192.168.110.50:"+port) var conn, err = net.Dial("tcp", "192.168.110.50:"+port)
if err == nil { if err == nil {
t.client = conn t.client = conn
//t.watch(t.client) t.watch(t.client)
} }
}) })
return t return t
} }
func (t *TcpHelper) handRead(conn net.Conn) { func (t *TcpHelper) reconnect(port string) {
//defer func() { var conn, err = net.Dial("tcp", "192.168.110.50:"+port)
// if err := recover(); err != nil { if err == nil {
// utils.Log(nil, "tcp read err", err) t.client = conn
// } t.watch(t.client)
//}()
for {
if time.Now().Unix()-t.lastTime > 10 {
//t.isDie = true
//t.client.Close()
//t.client = nil
//return
}
fmt.Println("read")
reader := bufio.NewReader(conn)
var buffer [256]byte
// 持续读取数据
n, err := reader.Read(buffer[:])
if err == io.EOF {
continue
}
if err != nil {
//if t.client != nil {
// t.client.Close()
// t.client = nil
//}
utils.Log(nil, "Error reading data:", err)
continue
}
recvStr := string(buffer[:n])
if recvStr == "1" {
atomic.AddInt32(t.Full, 1)
} else if recvStr == "ping" {
conn.Write([]byte("pong"))
t.lastTime = time.Now().Unix()
}
} }
select {}
} }
func (t *TcpHelper) SendMsg(msg []byte) error { func (t *TcpHelper) SendMsg(msg []byte) error {
_, err := t.Init(config.GetConf().OrderPort).client.Write(msg) _, err := t.Init(config.GetConf().OrderPort).client.Write(msg)
var buffer [256]byte var buffer [1]byte
// 持续读取数据 // 持续读取数据
n, err := t.client.Read(buffer[:]) n, err := t.client.Read(buffer[:])
if err == nil { if err == nil {
@ -99,5 +64,19 @@ func (t *TcpHelper) Close(conn net.Conn) {
t.client.Close() t.client.Close()
} }
func (t *TcpHelper) watch(conn net.Conn) { func (t *TcpHelper) watch(conn net.Conn) {
go t.handRead(conn) go func() {
fmt.Println("watch")
for {
conn.SetWriteDeadline(time.Now().Add(time.Second * 3))
_, err := conn.Write([]byte("1"))
if err != nil {
utils.Log(nil, "连接关闭", err)
t.client.Close()
t.reconnect(config.GetConf().OrderPort)
return
}
time.Sleep(time.Second)
}
}()
} }

View File

@ -64,9 +64,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
for i := 0; i < config.GetConf().Num; i++ { for i := 0; i < config.GetConf().Num; i++ {
go func() { go func() {
for { for {
if atomic.LoadInt32(tcppool.TcpPoolFactory.Full) == 1 { fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full))
utils.Log(nil, "tcp is die") if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
time.Sleep(1 * time.Second) utils.Log(nil, "对列阻塞")
time.Sleep(200 * time.Millisecond)
continue
} }
msg, err := consumer.ReadMessage(-1) msg, err := consumer.ReadMessage(-1)
fmt.Println("read msg", ci) fmt.Println("read msg", ci)

View File

@ -76,7 +76,6 @@ func main() {
//解析启动命令 //解析启动命令
initConfig() initConfig()
opts := config.GetOptions() opts := config.GetOptions()
if opts.ShowVersion { if opts.ShowVersion {
fmt.Printf("%s\ncommit %s\nbuilt on %s\n", server.Version, server.BuildCommit, server.BuildDate) fmt.Printf("%s\ncommit %s\nbuilt on %s\n", server.Version, server.BuildCommit, server.BuildDate)
os.Exit(0) os.Exit(0)