From bd6bd6f32145ea532aee4289743de9d5aa6c6225 Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Sat, 8 Feb 2025 11:07:09 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0dispacher=E5=AF=B9=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/handlers/mq/quenue.go | 4 +- app/http/tcppool/single.go | 132 ++++++++++++++++++++----------------- app/utils/mq/kafka_v2.go | 18 ++--- 3 files changed, 83 insertions(+), 71 deletions(-) diff --git a/app/handlers/mq/quenue.go b/app/handlers/mq/quenue.go index 901e44a..184548b 100644 --- a/app/handlers/mq/quenue.go +++ b/app/handlers/mq/quenue.go @@ -26,8 +26,8 @@ func StartQunueServer() error { if config.GetConf().StartQunue == 1 { for i := 0; i < 1; i++ { fmt.Println("对列" + strconv.Itoa(i)) - startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊 - startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊 + startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) // + //startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //分发 } } select {} diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index 174be28..ba6a88c 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -6,11 +6,11 @@ import ( "encoding/json" "errors" "fmt" + "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/qit-team/snow-core/redis" "net" "quenue/app/utils" "quenue/config" - "strings" "sync" "sync/atomic" "time" @@ -28,6 +28,7 @@ type TcpHelper struct { client net.Conn lastTime int64 Full *int32 + Comsumer *kafka.Consumer } func (t *TcpHelper) Init(port string) *TcpHelper { @@ -37,6 +38,7 @@ func (t *TcpHelper) Init(port string) *TcpHelper { t.client = conn atomic.StoreInt32(t.Full, 0) t.watch(t.client) + t.heart() t.resend() } else { atomic.StoreInt32(t.Full, 1) @@ -53,6 +55,7 @@ func (t *TcpHelper) reconnect(port string) { atomic.StoreInt32(t.Full, 0) t.client = conn t.watch(t.client) + t.heart() } else { //utils.Log(nil, "重连下游") time.Sleep(1 * time.Second) @@ -73,44 +76,6 @@ func (t *TcpHelper) SendMsg(msg []byte) error { _, err := clinet.Write(msg) var end = time.Now().Unix() fmt.Println(end-start, "秒") - buf := make([]byte, 1024) - reader := bufio.NewReader(clinet) - line, err := reader.ReadString('\n') - if err != nil { - fmt.Println("Error reading from connection:", err) - return err - } - fmt.Println("结果:recvStr:", line) - // 将读取到的字符串转换为字节切片 - buf = []byte(line) - if err == nil { - if len(buf) > 0 { - recvStr := string(buf) - recvStr = strings.Replace(recvStr, "\n", "", 1) - if len(recvStr) > 2 { - var orderNo = recvStr - utils.LogFile(nil, "ack", orderNo) - if _, ok := OrderMap.Load(orderNo); !ok { - OrderMap.Store(orderNo, "1") - return nil - } - } else { - if len(recvStr) > 0 { - fmt.Println("结果:recvStr:", recvStr) - if recvStr == "5" { - fmt.Println("客户端繁忙") - atomic.StoreInt32(t.Full, 1) - } else if recvStr == "2" { - fmt.Println("客户端空闲") - atomic.StoreInt32(t.Full, 0) - } - } - - } - - } - return errors.New("无效ack") - } return err } func (t *TcpHelper) Close(conn net.Conn) { @@ -133,6 +98,10 @@ func (t *TcpHelper) resend() { var data = map[string]interface{}{} var nowTime = time.Now().Unix() for _, v := range rs { + if atomic.LoadInt32(t.Full) == 1 { + time.Sleep(1 * time.Second) + continue + } json.Unmarshal([]byte(v), &data) if (float64(nowTime) - data["send_time"].(float64)) > 60 { t.SendMsg([]byte(v)) @@ -146,6 +115,25 @@ func (t *TcpHelper) resend() { } +func (t *TcpHelper) heart() { + go func() { + defer func() { + if err := recover(); err != nil { + utils.Log(nil, "err", err) + } + }() + for { + if t.client == nil { + return + } + t.client.SetWriteDeadline(time.Now().Add(expire)) + t.client.Write([]byte("1\n")) + time.Sleep(2 * time.Second) + } + }() + +} + func (t *TcpHelper) watch(conn net.Conn) { go func() { defer func() { @@ -154,12 +142,8 @@ func (t *TcpHelper) watch(conn net.Conn) { } }() for { - if t.client == nil { - return - } - conn.SetWriteDeadline(time.Now().Add(expire)) - _, err := conn.Write([]byte("1\n")) + var err error if err != nil { //utils.Log(nil, "连接关闭", err) atomic.StoreInt32(t.Full, 1) @@ -167,32 +151,58 @@ func (t *TcpHelper) watch(conn net.Conn) { t.reconnect(config.GetConf().OrderPort) return } else { - var buffer = make([]byte, 1) + var buffer = make([]byte, 1024) // 持续读取数据 t.client.SetReadDeadline(time.Now().Add(expire)) - n, err := t.client.Read(buffer[:]) - if err == nil && n > 0 { - recvStr := string(buffer[:n]) + //n, err := t.client.Read(buffer[:]) + reader := bufio.NewReader(t.client) + line, err := reader.ReadString('\n') + buffer = []byte(line) + if err == nil && len(buffer) > 0 { + recvStr := string(buffer[:len(buffer)-1]) //fmt.Println("结果:recvStr:", recvStr) - if recvStr == "5" { - utils.Log(nil, "客户端繁忙") - atomic.StoreInt32(t.Full, 1) - } else if recvStr == "2" { - utils.Log(nil, "客户端空闲") - atomic.StoreInt32(t.Full, 0) - } else if recvStr == "6" { - utils.Log(nil, "客户端心跳") - atomic.StoreInt32(t.Full, 0) + if len(recvStr) > 1 { + //手动提交编译量 + var partion, ok = OrderMap.Load(recvStr) + if ok { + CommitOffset(t.Comsumer, partion.(kafka.TopicPartition)) + OrderMap.Delete(recvStr) + } + } else { + if recvStr == "5" { + utils.Log(nil, "客户端繁忙") + atomic.StoreInt32(t.Full, 1) + } else if recvStr == "2" { + utils.Log(nil, "客户端空闲") + atomic.StoreInt32(t.Full, 0) + } else if recvStr == "6" { + utils.Log(nil, "客户端心跳") + //atomic.StoreInt32(t.Full, 1) + } } + } else { atomic.StoreInt32(t.Full, 1) //utils.Log(nil, "连接关闭", err) - t.client.Close() - t.client = nil - t.reconnect(config.GetConf().OrderPort) + if t.client != nil { + t.client.Close() + conn = nil + t.client = nil + t.reconnect(config.GetConf().OrderPort) + } } } - time.Sleep(2 * time.Second) + //time.Sleep(2 * time.Second) } }() } +func CommitOffset(consumer *kafka.Consumer, tp kafka.TopicPartition) { + // 创建一个偏移量提交请求 + offsets := []kafka.TopicPartition{tp} + commit, err := consumer.CommitOffsets(offsets) + if err != nil { + utils.Log(nil, "Failed to commit offset: %v", err) + } else { + utils.Log(nil, "Committed offset: %v", commit) + } +} diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 0f4cbf1..3a2e400 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -58,7 +58,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { kfconfig := &kafka.ConfigMap{ "bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址 "group.id": group, // 消费者组ID - "auto.offset.reset": "latest", // 自动从最早的消息开始消费earliest,latest + "auto.offset.reset": "earliest", // 自动从最早的消息开始消费earliest,latest "heartbeat.interval.ms": 1000, "session.timeout.ms": 45000, "max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了 @@ -71,13 +71,14 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } }() utils.Log(nil, "kafka config", kfconfig) - var start = time.Now() - var end time.Time + //var start = time.Now() + //var end time.Time consumer, err := kafka.NewConsumer(kfconfig) if err != nil { utils.Log(nil, "comsume", err) } + tcppool.TcpFactory.Comsumer = consumer defer consumer.Close() err = consumer.Subscribe(name, func(c *kafka.Consumer, event kafka.Event) error { switch ev := event.(type) { @@ -110,6 +111,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } msg, err := consumer.ReadMessage(1 * time.Second) if err == nil { + tcppool.OrderMap.Store(string(msg.Key), msg.TopicPartition) utils.Log(nil, "offset", msg.TopicPartition.Offset) fmt.Println(msg.TopicPartition.Partition, "分区") var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error) @@ -146,11 +148,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { utils.LogFile(nil, "send msg", string(msg.Key), err) if err == nil { //手动提交编译量 - kk.commitOffset(consumer, msg.TopicPartition) - atomic.AddInt32(workNum, 1) - tcppool.OrderMap.Delete(string(msg.Key)) - end = time.Now() - utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum) + //kk.commitOffset(consumer, msg.TopicPartition) + //atomic.AddInt32(workNum, 1) + //tcppool.OrderMap.Delete(string(msg.Key)) + //end = time.Now() + //utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum) } } else {