增加连接修复

This commit is contained in:
qiyunfanbo126.com 2025-01-16 21:33:38 +08:00
parent 7dc125cc22
commit 573d68de01
3 changed files with 34 additions and 18 deletions

View File

@ -24,7 +24,7 @@ type TcpHelper struct {
func (t *TcpHelper) Init(port string) *TcpHelper { func (t *TcpHelper) Init(port string) *TcpHelper {
lockSingle.Do(func() { lockSingle.Do(func() {
var conn, err = net.Dial("tcp", "192.168.110.50:"+port) var conn, err = net.DialTimeout("tcp", "192.168.110.50:"+port, 10*time.Second)
if err == nil { if err == nil {
t.client = conn t.client = conn
t.watch(t.client) t.watch(t.client)
@ -35,13 +35,14 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
func (t *TcpHelper) reconnect(port string) { func (t *TcpHelper) reconnect(port string) {
var conn, err = net.Dial("tcp", "192.168.110.50:"+port) var conn, err = net.Dial("tcp", "192.168.110.50:"+port)
if err == nil { if err == nil {
atomic.StoreInt32(t.Full, 0)
t.client = conn t.client = conn
t.watch(t.client) t.watch(t.client)
} }
} }
func (t *TcpHelper) SendMsg(msg []byte) error { func (t *TcpHelper) SendMsg(msg []byte) error {
msg = append(msg, '\n')
_, err := t.Init(config.GetConf().OrderPort).client.Write(msg) _, err := t.Init(config.GetConf().OrderPort).client.Write(msg)
var buffer [1]byte var buffer [1]byte
// 持续读取数据 // 持续读取数据
n, err := t.client.Read(buffer[:]) n, err := t.client.Read(buffer[:])
@ -50,10 +51,11 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
recvStr := string(buffer[:n]) recvStr := string(buffer[:n])
fmt.Println("结果recvStr:", recvStr) fmt.Println("结果recvStr:", recvStr)
if recvStr == "1" { if recvStr == "1" {
fmt.Println("满了") fmt.Println("客户端繁忙")
atomic.AddInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
} else if recvStr == "2" { } else if recvStr == "2" {
atomic.AddInt32(t.Full, -1) fmt.Println("客户端空闲")
atomic.StoreInt32(t.Full, 0)
} }
} }
@ -65,8 +67,9 @@ func (t *TcpHelper) Close(conn net.Conn) {
} }
func (t *TcpHelper) watch(conn net.Conn) { func (t *TcpHelper) watch(conn net.Conn) {
go func() { go func() {
fmt.Println("watch")
for { for {
fmt.Println("watch")
conn.SetWriteDeadline(time.Now().Add(time.Second * 3)) conn.SetWriteDeadline(time.Now().Add(time.Second * 3))
_, err := conn.Write([]byte("1")) _, err := conn.Write([]byte("1"))
@ -75,8 +78,23 @@ func (t *TcpHelper) watch(conn net.Conn) {
t.client.Close() t.client.Close()
t.reconnect(config.GetConf().OrderPort) t.reconnect(config.GetConf().OrderPort)
return return
} else {
var buffer [1]byte
// 持续读取数据
n, err := t.client.Read(buffer[:])
if err == nil && n > 0 {
recvStr := string(buffer[:n])
fmt.Println("结果recvStr:", recvStr)
if recvStr == "1" {
fmt.Println("客户端繁忙")
atomic.StoreInt32(t.Full, 1)
} else if recvStr == "2" {
fmt.Println("客户端空闲")
atomic.StoreInt32(t.Full, 0)
}
}
} }
time.Sleep(time.Second)
} }
}() }()
} }

View File

@ -2,7 +2,6 @@ package mqs
import ( import (
"encoding/json" "encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka" "github.com/confluentinc/confluent-kafka-go/kafka"
"quenue/app/http/entities" "quenue/app/http/entities"
"quenue/app/http/tcppool" "quenue/app/http/tcppool"
@ -64,21 +63,23 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
for i := 0; i < config.GetConf().Num; i++ { for i := 0; i < config.GetConf().Num; i++ {
go func() { go func() {
for { for {
fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full)) //fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full))
if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
utils.Log(nil, "对列阻塞") utils.Log(nil, "对列阻塞")
time.Sleep(200 * time.Millisecond) time.Sleep(1000 * time.Millisecond)
consumer.GetRebalanceProtocol()
continue continue
} else {
utils.Log(nil, "对列放开")
} }
msg, err := consumer.ReadMessage(-1) msg, err := consumer.ReadMessage(-1)
fmt.Println("read msg", ci)
if err == nil { if err == nil {
utils.Log(nil, "offset", msg.TopicPartition.Offset) utils.Log(nil, "offset", msg.TopicPartition.Offset)
var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error) var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error)
var mqsg = entities.MqMessage{} var mqsg = entities.MqMessage{}
mqsg.Key = string(msg.Key) mqsg.Key = string(msg.Key)
mqsg.Property = make(map[string]interface{}) mqsg.Property = make(map[string]interface{})
if msg.Headers != nil { if false {
for _, v := range msg.Headers { for _, v := range msg.Headers {
if v.Key == "property" { if v.Key == "property" {
json.Unmarshal(v.Value, &mqsg.Property) json.Unmarshal(v.Value, &mqsg.Property)
@ -87,8 +88,9 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
} }
err = json.Unmarshal(msg.Value, &mqsg.Body) err = json.Unmarshal(msg.Value, &mqsg.Body)
} else { } else {
mqsg.Key = "100000001"
mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"}
mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0} mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"}
} }
var data, _ = json.Marshal(mqsg) var data, _ = json.Marshal(mqsg)
err = handler(0, nil, data) err = handler(0, nil, data)

View File

@ -116,11 +116,7 @@ func startServer(opts *config.Options) (err error) {
if err != nil { if err != nil {
return return
} }
//for i := 0; i < 2; i++ {
// mqs.MqManager.GetMqByName(common.MQ_KFK).Produce(common.ORDER_RESEND_TOPICAL, time.Now().Format(time.DateTime), 0)
//}
////
//select {}
pidFile := opts.GenPidFile() pidFile := opts.GenPidFile()
config.GetConf().KafkaUrl = kafka config.GetConf().KafkaUrl = kafka
config.GetConf().OrderPort = port config.GetConf().OrderPort = port