优化短线后cpu飙升问题

This commit is contained in:
qiyunfanbo126.com 2025-01-22 09:34:43 +08:00
parent 4e7b508212
commit 8adc24ef98
4 changed files with 141 additions and 21 deletions

View File

@ -1,11 +1,13 @@
package tcppool package tcppool
import ( import (
"bufio"
"errors" "errors"
"fmt" "fmt"
"net" "net"
"quenue/app/utils" "quenue/app/utils"
"quenue/config" "quenue/config"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -15,6 +17,7 @@ var (
full int32 = 0 full int32 = 0
TcpFactory = TcpHelper{Full: &full} TcpFactory = TcpHelper{Full: &full}
lockSingle sync.Once lockSingle sync.Once
OrderMap = sync.Map{}
) )
type TcpHelper struct { type TcpHelper struct {
@ -65,21 +68,38 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
_, err := clinet.Write(msg) _, err := clinet.Write(msg)
var end = time.Now().Unix() var end = time.Now().Unix()
fmt.Println(end-start, "秒") fmt.Println(end-start, "秒")
var buffer [1]byte buf := make([]byte, 1024)
// 持续读取数据 reader := bufio.NewReader(clinet)
n, err := t.client.Read(buffer[:]) line, err := reader.ReadString('\n')
if err != nil {
fmt.Println("Error reading from connection:", err)
return err
}
// 将读取到的字符串转换为字节切片
buf = []byte(line)
if err == nil { if err == nil {
if n > 0 { if len(buf) > 0 {
recvStr := string(buffer[:n]) recvStr := string(buf)
fmt.Println("结果recvStr:", recvStr) recvStr = strings.Replace(recvStr, "\n", "", 1)
if recvStr == "1" { if len(recvStr) > 2 {
fmt.Println("客户端繁忙") var orderNo = recvStr
atomic.StoreInt32(t.Full, 1) if _, ok := OrderMap.Load(orderNo); !ok {
} else if recvStr == "2" { OrderMap.Store(orderNo, "1")
fmt.Println("客户端空闲") return nil
atomic.StoreInt32(t.Full, 0) }
} else {
fmt.Println("结果recvStr:", recvStr)
if recvStr == "5" {
fmt.Println("客户端繁忙")
atomic.StoreInt32(t.Full, 1)
} else if recvStr == "2" {
fmt.Println("客户端空闲")
atomic.StoreInt32(t.Full, 0)
}
} }
} }
return errors.New("无效ack")
} }
return err return err
} }
@ -95,7 +115,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
}() }()
for { for {
conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
_, err := conn.Write([]byte("1")) _, err := conn.Write([]byte("1\n"))
if err != nil { if err != nil {
utils.Log(nil, "连接关闭", err) utils.Log(nil, "连接关闭", err)
@ -104,8 +124,9 @@ func (t *TcpHelper) watch(conn net.Conn) {
t.reconnect(config.GetConf().OrderPort) t.reconnect(config.GetConf().OrderPort)
return return
} else { } else {
var buffer [1]byte var buffer = make([]byte, 1)
// 持续读取数据 // 持续读取数据
t.client.SetReadDeadline(time.Now().Add(time.Second * 5))
n, err := t.client.Read(buffer[:]) n, err := t.client.Read(buffer[:])
if err == nil && n > 0 { if err == nil && n > 0 {
recvStr := string(buffer[:n]) recvStr := string(buffer[:n])

View File

@ -4,10 +4,12 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/confluentinc/confluent-kafka-go/kafka" "github.com/confluentinc/confluent-kafka-go/kafka"
"math/rand/v2"
"quenue/app/http/entities" "quenue/app/http/entities"
"quenue/app/http/tcppool" "quenue/app/http/tcppool"
"quenue/app/utils" "quenue/app/utils"
config "quenue/config" config "quenue/config"
"strconv"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@ -51,6 +53,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
"max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了 "max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了
"enable.auto.commit": false, "enable.auto.commit": false,
} }
defer func() {
if err := recover(); err != nil {
fmt.Println("消费中断", err)
}
}()
utils.Log(nil, "kafka config", kfconfig) utils.Log(nil, "kafka config", kfconfig)
var start = time.Now() var start = time.Now()
var end time.Time var end time.Time
@ -83,8 +90,8 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
go func() { go func() {
for { for {
if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
utils.Log(nil, "对列阻塞") //utils.Log(nil, "对列阻塞")
time.Sleep(50 * time.Millisecond) time.Sleep(100 * time.Millisecond)
continue continue
} else { } else {
utils.Log(nil, "对列放开") utils.Log(nil, "对列放开")
@ -108,7 +115,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
} }
err = json.Unmarshal(msg.Value, &mqsg.Body) err = json.Unmarshal(msg.Value, &mqsg.Body)
} else { } else {
mqsg.Key = "100000001" mqsg.Key = time.Now().Format("20060102150405") + strconv.Itoa(int(rand.Int32N(1000000)))
mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"}
mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"}
} }
@ -122,10 +129,10 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
if err == nil { if err == nil {
//手动提交编译量 //手动提交编译量
kk.commitOffset(consumer, msg.TopicPartition) kk.commitOffset(consumer, msg.TopicPartition)
if msg.TopicPartition.Offset == 104939 { tcppool.OrderMap.Delete(string(msg.Key))
end = time.Now() end = time.Now()
utils.Log(nil, "消费耗时", end.Sub(start)) utils.Log(nil, "消费耗时", end.Sub(start))
}
} }
} else { } else {
utils.Log(nil, "Error while consuming: %v\n", err) utils.Log(nil, "Error while consuming: %v\n", err)

88
app/utils/tcp.go Normal file
View File

@ -0,0 +1,88 @@
package utils
import (
"bufio"
"encoding/json"
"fmt"
"net"
"sync/atomic"
"time"
)
var (
workNum int32 = 0
workSingleNum = &workNum
maxNum = 2000
handNum int32 = 0
handSingleNum = &handNum
busy int32 = 0
busySingle = &busy
)
func handLogacal(conn net.Conn, data []byte) {
conn.Write([]byte("2\n"))
time.Sleep(1 * time.Second)
atomic.AddInt32(workSingleNum, -1)
atomic.AddInt32(handSingleNum, 1)
atomic.StoreInt32(busySingle, 0)
fmt.Println(*handSingleNum, "处理条数")
fmt.Println(string(data), "消息")
}
func handleConnection(conn net.Conn) {
for {
buf := make([]byte, 1024)
reader := bufio.NewReader(conn)
line, err := reader.ReadString('\n')
if err != nil {
fmt.Println("Error reading from connection:", err)
return
}
// 将读取到的字符串转换为字节切片
buf = []byte(line)
if err == nil && len(buf) > 1 {
if atomic.LoadInt32(workSingleNum) >= int32(maxNum) {
if atomic.LoadInt32(busySingle) == 0 {
fmt.Println("繁忙")
atomic.StoreInt32(busySingle, 1)
conn.Write([]byte("5\n"))
continue
}
} else {
if len(buf) > 2 {
var data map[string]interface{}
json.Unmarshal(buf, &data)
fmt.Println("收到", *workSingleNum, maxNum, string(buf))
conn.Write([]byte(data["serial_number"].(string) + "\n"))
go handLogacal(conn, buf)
atomic.AddInt32(workSingleNum, 1)
} else {
conn.Write([]byte("2\n"))
}
}
}
}
}
func StartTcpServer() {
listener, err := net.Listen("tcp", ":8080")
if err != nil {
fmt.Println("Error starting server:", err)
return
}
defer listener.Close()
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println("Error accepting connection:", err)
continue
}
go handleConnection(conn)
}
}

View File

@ -12,6 +12,7 @@ import (
"quenue/app/handlers/mq" "quenue/app/handlers/mq"
"quenue/app/http/routes" "quenue/app/http/routes"
"quenue/app/jobs" "quenue/app/jobs"
"quenue/app/utils"
"quenue/bootstrap" "quenue/bootstrap"
"quenue/config" "quenue/config"
_ "quenue/docs" _ "quenue/docs"
@ -96,6 +97,7 @@ func startServer(opts *config.Options) (err error) {
if err != nil { if err != nil {
return return
} }
//err = tcppool.TcpFactory.Init("9502").SendMsg([]byte("123"))
//引导程序 //引导程序
err = bootstrap.Bootstrap(conf) err = bootstrap.Bootstrap(conf)
@ -116,6 +118,8 @@ func startServer(opts *config.Options) (err error) {
err = server.ExecuteCommand(opts.Command, console.RegisterCommand) err = server.ExecuteCommand(opts.Command, console.RegisterCommand)
case "mq": case "mq":
mq.StartQunueServer() //消费消息 mq.StartQunueServer() //消费消息
case "tcp":
utils.StartTcpServer()
default: default:
err = errors.New("no server start") err = errors.New("no server start")
} }