From 495cbe025f57017b9090a530b3776e8dbfde172a Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Fri, 17 Jan 2025 23:32:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/http/tcppool/single.go | 9 +++++++++ app/utils/mq/kafka_v2.go | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index 3449f01..5df2ad0 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -1,6 +1,7 @@ package tcppool import ( + "errors" "fmt" "net" "quenue/app/utils" @@ -44,7 +45,15 @@ func (t *TcpHelper) reconnect(port string) { } func (t *TcpHelper) SendMsg(msg []byte) error { msg = append(msg, '\n') + t = t.Init(config.GetConf().OrderPort) + if t.client == nil { + return errors.New("客户端连接失败") + } + var start = time.Now().Unix() _, err := t.Init(config.GetConf().OrderPort).client.Write(msg) + var end = time.Now().Unix() + + fmt.Println(end-start, "秒") var buffer [1]byte // 持续读取数据 n, err := t.client.Read(buffer[:]) diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 45563b5..a0a620b 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -74,11 +74,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { fmt.Printf("Ignored event: %s\n", ev) } return nil - return nil }) if err != nil { utils.Log(nil, "Failed to subscribe:", err) } + fmt.Println(config.GetConf().Num) for i := 0; i < config.GetConf().Num; i++ { go func() { for { @@ -90,7 +90,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } else { utils.Log(nil, "对列放开") } - msg, err := consumer.ReadMessage(1 * time.Hour) + msg, err := consumer.ReadMessage(1 * time.Second) if err == nil { utils.Log(nil, "offset", msg.TopicPartition.Offset)