From 679154720f57e6089e84b6e02639501ca7b456d3 Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Wed, 22 Jan 2025 13:50:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9F=AD=E7=BA=BF=E5=90=8Ecp?= =?UTF-8?q?u=E9=A3=99=E5=8D=87=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/http/tcppool/single.go | 17 ++++++++++------- app/utils/mq/kafka_v2.go | 12 +++++++++--- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index b41242a..5aeac14 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -88,14 +88,17 @@ func (t *TcpHelper) SendMsg(msg []byte) error { return nil } } else { - fmt.Println("结果:recvStr:", recvStr) - if recvStr == "5" { - fmt.Println("客户端繁忙") - atomic.StoreInt32(t.Full, 1) - } else if recvStr == "2" { - fmt.Println("客户端空闲") - atomic.StoreInt32(t.Full, 0) + if len(recvStr) > 0 { + fmt.Println("结果:recvStr:", recvStr) + if recvStr == "5" { + fmt.Println("客户端繁忙") + atomic.StoreInt32(t.Full, 1) + } else if recvStr == "2" { + fmt.Println("客户端空闲") + atomic.StoreInt32(t.Full, 0) + } } + } } diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 099fe43..b9bce05 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -14,6 +14,11 @@ import ( "time" ) +var ( + now int32 = 0 + workNum = &now +) + type KafkaV2Mq struct { } @@ -90,7 +95,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { go func() { for { if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { - //utils.Log(nil, "对列阻塞") + utils.Log(nil, "对列阻塞") time.Sleep(100 * time.Millisecond) continue } else { @@ -104,7 +109,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { var mqsg = entities.MqMessage{} mqsg.Key = string(msg.Key) mqsg.Property = make(map[string]interface{}) - if false { + if len(msg.Value) > 0 { if msg.Headers != nil { for _, v := range msg.Headers { if v.Key == "property" { @@ -129,9 +134,10 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { if err == nil { //手动提交编译量 kk.commitOffset(consumer, msg.TopicPartition) + atomic.AddInt32(workNum, 1) tcppool.OrderMap.Delete(string(msg.Key)) end = time.Now() - utils.Log(nil, "消费耗时", end.Sub(start)) + utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum) } } else {