Compare commits

...

2 Commits

Author SHA1 Message Date
qiyunfanbo126.com 0783881810 增加重连 2025-01-22 17:11:30 +08:00
qiyunfanbo126.com 26b0a27628 增加重连 2025-01-22 17:10:47 +08:00
2 changed files with 9 additions and 9 deletions

View File

@ -45,12 +45,12 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
func (t *TcpHelper) reconnect(port string) { func (t *TcpHelper) reconnect(port string) {
var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire) var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
if err == nil { if err == nil {
utils.Log(nil, "连接下游成功") //utils.Log(nil, "连接下游成功")
atomic.StoreInt32(t.Full, 0) atomic.StoreInt32(t.Full, 0)
t.client = conn t.client = conn
t.watch(t.client) t.watch(t.client)
} else { } else {
utils.Log(nil, "重连下游") //utils.Log(nil, "重连下游")
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
atomic.StoreInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
t.client = conn t.client = conn
@ -90,7 +90,7 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
} }
} else { } else {
if len(recvStr) > 0 { if len(recvStr) > 0 {
fmt.Println("结果recvStr:", recvStr) //fmt.Println("结果recvStr:", recvStr)
if recvStr == "5" { if recvStr == "5" {
fmt.Println("客户端繁忙") fmt.Println("客户端繁忙")
atomic.StoreInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
@ -114,7 +114,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
go func() { go func() {
defer func() { defer func() {
if err := recover(); err != nil { if err := recover(); err != nil {
fmt.Println("连接断开", err) //fmt.Println("连接断开", err)
} }
}() }()
for { for {
@ -122,7 +122,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
_, err := conn.Write([]byte("1\n")) _, err := conn.Write([]byte("1\n"))
if err != nil { if err != nil {
utils.Log(nil, "连接关闭", err) //utils.Log(nil, "连接关闭", err)
atomic.StoreInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
t.client.Close() t.client.Close()
t.reconnect(config.GetConf().OrderPort) t.reconnect(config.GetConf().OrderPort)
@ -134,7 +134,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
n, err := t.client.Read(buffer[:]) n, err := t.client.Read(buffer[:])
if err == nil && n > 0 { if err == nil && n > 0 {
recvStr := string(buffer[:n]) recvStr := string(buffer[:n])
fmt.Println("结果recvStr:", recvStr) //fmt.Println("结果recvStr:", recvStr)
if recvStr == "1" { if recvStr == "1" {
fmt.Println("客户端繁忙") fmt.Println("客户端繁忙")
atomic.StoreInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
@ -144,7 +144,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
} }
} else { } else {
atomic.StoreInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
utils.Log(nil, "连接关闭", err) //utils.Log(nil, "连接关闭", err)
t.client.Close() t.client.Close()
t.reconnect(config.GetConf().OrderPort) t.reconnect(config.GetConf().OrderPort)
} }

View File

@ -99,7 +99,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
continue continue
} else { } else {
utils.Log(nil, "对列放开") //utils.Log(nil, "对列放开")
} }
msg, err := consumer.ReadMessage(1 * time.Second) msg, err := consumer.ReadMessage(1 * time.Second)
if err == nil { if err == nil {
@ -141,7 +141,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
} }
} else { } else {
utils.Log(nil, "Error while consuming: %v\n", err) //utils.Log(nil, "Error while consuming: %v\n", err)
} }
} }
}() }()