diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index 3449f01..5df2ad0 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -1,6 +1,7 @@ package tcppool import ( + "errors" "fmt" "net" "quenue/app/utils" @@ -44,7 +45,15 @@ func (t *TcpHelper) reconnect(port string) { } func (t *TcpHelper) SendMsg(msg []byte) error { msg = append(msg, '\n') + t = t.Init(config.GetConf().OrderPort) + if t.client == nil { + return errors.New("客户端连接失败") + } + var start = time.Now().Unix() _, err := t.Init(config.GetConf().OrderPort).client.Write(msg) + var end = time.Now().Unix() + + fmt.Println(end-start, "秒") var buffer [1]byte // 持续读取数据 n, err := t.client.Read(buffer[:]) diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 45563b5..a0a620b 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -74,11 +74,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { fmt.Printf("Ignored event: %s\n", ev) } return nil - return nil }) if err != nil { utils.Log(nil, "Failed to subscribe:", err) } + fmt.Println(config.GetConf().Num) for i := 0; i < config.GetConf().Num; i++ { go func() { for { @@ -90,7 +90,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } else { utils.Log(nil, "对列放开") } - msg, err := consumer.ReadMessage(1 * time.Hour) + msg, err := consumer.ReadMessage(1 * time.Second) if err == nil { utils.Log(nil, "offset", msg.TopicPartition.Offset)