优化短线后cpu飙升问题
This commit is contained in:
parent
8adc24ef98
commit
679154720f
|
@ -88,14 +88,17 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("结果:recvStr:", recvStr)
|
if len(recvStr) > 0 {
|
||||||
if recvStr == "5" {
|
fmt.Println("结果:recvStr:", recvStr)
|
||||||
fmt.Println("客户端繁忙")
|
if recvStr == "5" {
|
||||||
atomic.StoreInt32(t.Full, 1)
|
fmt.Println("客户端繁忙")
|
||||||
} else if recvStr == "2" {
|
atomic.StoreInt32(t.Full, 1)
|
||||||
fmt.Println("客户端空闲")
|
} else if recvStr == "2" {
|
||||||
atomic.StoreInt32(t.Full, 0)
|
fmt.Println("客户端空闲")
|
||||||
|
atomic.StoreInt32(t.Full, 0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,11 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
now int32 = 0
|
||||||
|
workNum = &now
|
||||||
|
)
|
||||||
|
|
||||||
type KafkaV2Mq struct {
|
type KafkaV2Mq struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +95,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
|
if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
|
||||||
//utils.Log(nil, "对列阻塞")
|
utils.Log(nil, "对列阻塞")
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
|
@ -104,7 +109,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
|
||||||
var mqsg = entities.MqMessage{}
|
var mqsg = entities.MqMessage{}
|
||||||
mqsg.Key = string(msg.Key)
|
mqsg.Key = string(msg.Key)
|
||||||
mqsg.Property = make(map[string]interface{})
|
mqsg.Property = make(map[string]interface{})
|
||||||
if false {
|
if len(msg.Value) > 0 {
|
||||||
if msg.Headers != nil {
|
if msg.Headers != nil {
|
||||||
for _, v := range msg.Headers {
|
for _, v := range msg.Headers {
|
||||||
if v.Key == "property" {
|
if v.Key == "property" {
|
||||||
|
@ -129,9 +134,10 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
//手动提交编译量
|
//手动提交编译量
|
||||||
kk.commitOffset(consumer, msg.TopicPartition)
|
kk.commitOffset(consumer, msg.TopicPartition)
|
||||||
|
atomic.AddInt32(workNum, 1)
|
||||||
tcppool.OrderMap.Delete(string(msg.Key))
|
tcppool.OrderMap.Delete(string(msg.Key))
|
||||||
end = time.Now()
|
end = time.Now()
|
||||||
utils.Log(nil, "消费耗时", end.Sub(start))
|
utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum)
|
||||||
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue