From 8adc24ef98e0826ee7e66104d00b9b148b0ca211 Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Wed, 22 Jan 2025 09:34:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9F=AD=E7=BA=BF=E5=90=8Ecp?= =?UTF-8?q?u=E9=A3=99=E5=8D=87=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/http/tcppool/single.go | 49 +++++++++++++++------ app/utils/mq/kafka_v2.go | 21 ++++++--- app/utils/tcp.go | 88 ++++++++++++++++++++++++++++++++++++++ main.go | 4 ++ 4 files changed, 141 insertions(+), 21 deletions(-) create mode 100644 app/utils/tcp.go diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index ebb9326..b41242a 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -1,11 +1,13 @@ package tcppool import ( + "bufio" "errors" "fmt" "net" "quenue/app/utils" "quenue/config" + "strings" "sync" "sync/atomic" "time" @@ -15,6 +17,7 @@ var ( full int32 = 0 TcpFactory = TcpHelper{Full: &full} lockSingle sync.Once + OrderMap = sync.Map{} ) type TcpHelper struct { @@ -65,21 +68,38 @@ func (t *TcpHelper) SendMsg(msg []byte) error { _, err := clinet.Write(msg) var end = time.Now().Unix() fmt.Println(end-start, "秒") - var buffer [1]byte - // 持续读取数据 - n, err := t.client.Read(buffer[:]) + buf := make([]byte, 1024) + reader := bufio.NewReader(clinet) + line, err := reader.ReadString('\n') + if err != nil { + fmt.Println("Error reading from connection:", err) + return err + } + // 将读取到的字符串转换为字节切片 + buf = []byte(line) if err == nil { - if n > 0 { - recvStr := string(buffer[:n]) - fmt.Println("结果:recvStr:", recvStr) - if recvStr == "1" { - fmt.Println("客户端繁忙") - atomic.StoreInt32(t.Full, 1) - } else if recvStr == "2" { - fmt.Println("客户端空闲") - atomic.StoreInt32(t.Full, 0) + if len(buf) > 0 { + recvStr := string(buf) + recvStr = strings.Replace(recvStr, "\n", "", 1) + if len(recvStr) > 2 { + var orderNo = recvStr + if _, ok := OrderMap.Load(orderNo); !ok { + OrderMap.Store(orderNo, "1") + return nil + } + } else { + fmt.Println("结果:recvStr:", recvStr) + if recvStr == "5" { + fmt.Println("客户端繁忙") + atomic.StoreInt32(t.Full, 1) + } else if recvStr == "2" { + fmt.Println("客户端空闲") + atomic.StoreInt32(t.Full, 0) + } } + } + return errors.New("无效ack") } return err } @@ -95,7 +115,7 @@ func (t *TcpHelper) watch(conn net.Conn) { }() for { conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) - _, err := conn.Write([]byte("1")) + _, err := conn.Write([]byte("1\n")) if err != nil { utils.Log(nil, "连接关闭", err) @@ -104,8 +124,9 @@ func (t *TcpHelper) watch(conn net.Conn) { t.reconnect(config.GetConf().OrderPort) return } else { - var buffer [1]byte + var buffer = make([]byte, 1) // 持续读取数据 + t.client.SetReadDeadline(time.Now().Add(time.Second * 5)) n, err := t.client.Read(buffer[:]) if err == nil && n > 0 { recvStr := string(buffer[:n]) diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index f0f9d63..099fe43 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -4,10 +4,12 @@ import ( "encoding/json" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" + "math/rand/v2" "quenue/app/http/entities" "quenue/app/http/tcppool" "quenue/app/utils" config "quenue/config" + "strconv" "sync/atomic" "time" ) @@ -51,6 +53,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { "max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了 "enable.auto.commit": false, } + defer func() { + if err := recover(); err != nil { + fmt.Println("消费中断", err) + } + }() utils.Log(nil, "kafka config", kfconfig) var start = time.Now() var end time.Time @@ -83,8 +90,8 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { go func() { for { if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { - utils.Log(nil, "对列阻塞") - time.Sleep(50 * time.Millisecond) + //utils.Log(nil, "对列阻塞") + time.Sleep(100 * time.Millisecond) continue } else { utils.Log(nil, "对列放开") @@ -108,7 +115,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } err = json.Unmarshal(msg.Value, &mqsg.Body) } else { - mqsg.Key = "100000001" + mqsg.Key = time.Now().Format("20060102150405") + strconv.Itoa(int(rand.Int32N(1000000))) mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} } @@ -122,10 +129,10 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { if err == nil { //手动提交编译量 kk.commitOffset(consumer, msg.TopicPartition) - if msg.TopicPartition.Offset == 104939 { - end = time.Now() - utils.Log(nil, "消费耗时", end.Sub(start)) - } + tcppool.OrderMap.Delete(string(msg.Key)) + end = time.Now() + utils.Log(nil, "消费耗时", end.Sub(start)) + } } else { utils.Log(nil, "Error while consuming: %v\n", err) diff --git a/app/utils/tcp.go b/app/utils/tcp.go new file mode 100644 index 0000000..4746b80 --- /dev/null +++ b/app/utils/tcp.go @@ -0,0 +1,88 @@ +package utils + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "sync/atomic" + "time" +) + +var ( + workNum int32 = 0 + workSingleNum = &workNum + maxNum = 2000 + handNum int32 = 0 + handSingleNum = &handNum + busy int32 = 0 + busySingle = &busy +) + +func handLogacal(conn net.Conn, data []byte) { + conn.Write([]byte("2\n")) + time.Sleep(1 * time.Second) + atomic.AddInt32(workSingleNum, -1) + atomic.AddInt32(handSingleNum, 1) + atomic.StoreInt32(busySingle, 0) + fmt.Println(*handSingleNum, "处理条数") + fmt.Println(string(data), "消息") + +} + +func handleConnection(conn net.Conn) { + for { + buf := make([]byte, 1024) + reader := bufio.NewReader(conn) + line, err := reader.ReadString('\n') + if err != nil { + fmt.Println("Error reading from connection:", err) + return + } + // 将读取到的字符串转换为字节切片 + buf = []byte(line) + if err == nil && len(buf) > 1 { + if atomic.LoadInt32(workSingleNum) >= int32(maxNum) { + if atomic.LoadInt32(busySingle) == 0 { + fmt.Println("繁忙") + atomic.StoreInt32(busySingle, 1) + conn.Write([]byte("5\n")) + continue + } + + } else { + if len(buf) > 2 { + var data map[string]interface{} + json.Unmarshal(buf, &data) + fmt.Println("收到", *workSingleNum, maxNum, string(buf)) + conn.Write([]byte(data["serial_number"].(string) + "\n")) + go handLogacal(conn, buf) + atomic.AddInt32(workSingleNum, 1) + } else { + conn.Write([]byte("2\n")) + } + + } + + } + + } +} + +func StartTcpServer() { + listener, err := net.Listen("tcp", ":8080") + if err != nil { + fmt.Println("Error starting server:", err) + return + } + defer listener.Close() + + for { + conn, err := listener.Accept() + if err != nil { + fmt.Println("Error accepting connection:", err) + continue + } + go handleConnection(conn) + } +} diff --git a/main.go b/main.go index 3a416ea..3d2632b 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "quenue/app/handlers/mq" "quenue/app/http/routes" "quenue/app/jobs" + "quenue/app/utils" "quenue/bootstrap" "quenue/config" _ "quenue/docs" @@ -96,6 +97,7 @@ func startServer(opts *config.Options) (err error) { if err != nil { return } + //err = tcppool.TcpFactory.Init("9502").SendMsg([]byte("123")) //引导程序 err = bootstrap.Bootstrap(conf) @@ -116,6 +118,8 @@ func startServer(opts *config.Options) (err error) { err = server.ExecuteCommand(opts.Command, console.RegisterCommand) case "mq": mq.StartQunueServer() //消费消息 + case "tcp": + utils.StartTcpServer() default: err = errors.New("no server start") }