From 7dc125cc2239a32f19e93b1841e90c2e49e48726 Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Thu, 16 Jan 2025 17:58:56 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=BF=9E=E6=8E=A5=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/http/tcppool/pool.go | 7 ++-- app/http/tcppool/single.go | 65 +++++++++++++------------------------- app/utils/mq/kafka_v2.go | 8 +++-- main.go | 1 - 4 files changed, 32 insertions(+), 49 deletions(-) diff --git a/app/http/tcppool/pool.go b/app/http/tcppool/pool.go index 0cc010a..2b940b2 100644 --- a/app/http/tcppool/pool.go +++ b/app/http/tcppool/pool.go @@ -13,8 +13,11 @@ import ( "time" ) -var lock sync.Once -var TcpPoolFactory = &TcpPool{Full: new(int32)} +var ( + fullNum int32 = 1 + lock sync.Once + TcpPoolFactory = &TcpPool{Full: &full} +) type TcpPool struct { client easypool.Pool diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index cdd49a7..9c431c1 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -1,9 +1,7 @@ package tcppool import ( - "bufio" "fmt" - "io" "net" "quenue/app/utils" "quenue/config" @@ -29,55 +27,22 @@ func (t *TcpHelper) Init(port string) *TcpHelper { var conn, err = net.Dial("tcp", "192.168.110.50:"+port) if err == nil { t.client = conn - //t.watch(t.client) + t.watch(t.client) } }) return t } -func (t *TcpHelper) handRead(conn net.Conn) { - //defer func() { - // if err := recover(); err != nil { - // utils.Log(nil, "tcp read err", err) - // } - //}() - for { - if time.Now().Unix()-t.lastTime > 10 { - //t.isDie = true - //t.client.Close() - //t.client = nil - //return - } - fmt.Println("read") - reader := bufio.NewReader(conn) - var buffer [256]byte - // 持续读取数据 - n, err := reader.Read(buffer[:]) - if err == io.EOF { - continue - } - if err != nil { - //if t.client != nil { - // t.client.Close() - // t.client = nil - //} - utils.Log(nil, "Error reading data:", err) - continue - - } - recvStr := string(buffer[:n]) - if recvStr == "1" { - atomic.AddInt32(t.Full, 1) - } else if recvStr == "ping" { - conn.Write([]byte("pong")) - t.lastTime = time.Now().Unix() - } +func (t *TcpHelper) reconnect(port string) { + var conn, err = net.Dial("tcp", "192.168.110.50:"+port) + if err == nil { + t.client = conn + t.watch(t.client) } - select {} } func (t *TcpHelper) SendMsg(msg []byte) error { _, err := t.Init(config.GetConf().OrderPort).client.Write(msg) - var buffer [256]byte + var buffer [1]byte // 持续读取数据 n, err := t.client.Read(buffer[:]) if err == nil { @@ -99,5 +64,19 @@ func (t *TcpHelper) Close(conn net.Conn) { t.client.Close() } func (t *TcpHelper) watch(conn net.Conn) { - go t.handRead(conn) + go func() { + fmt.Println("watch") + for { + conn.SetWriteDeadline(time.Now().Add(time.Second * 3)) + _, err := conn.Write([]byte("1")) + + if err != nil { + utils.Log(nil, "连接关闭", err) + t.client.Close() + t.reconnect(config.GetConf().OrderPort) + return + } + time.Sleep(time.Second) + } + }() } diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 0f04ca4..f2ddf29 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -64,9 +64,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { for i := 0; i < config.GetConf().Num; i++ { go func() { for { - if atomic.LoadInt32(tcppool.TcpPoolFactory.Full) == 1 { - utils.Log(nil, "tcp is die") - time.Sleep(1 * time.Second) + fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full)) + if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { + utils.Log(nil, "对列阻塞") + time.Sleep(200 * time.Millisecond) + continue } msg, err := consumer.ReadMessage(-1) fmt.Println("read msg", ci) diff --git a/main.go b/main.go index 4dabb45..25f66f9 100644 --- a/main.go +++ b/main.go @@ -76,7 +76,6 @@ func main() { //解析启动命令 initConfig() opts := config.GetOptions() - if opts.ShowVersion { fmt.Printf("%s\ncommit %s\nbuilt on %s\n", server.Version, server.BuildCommit, server.BuildDate) os.Exit(0)