增加重连

This commit is contained in:
qiyunfanbo126.com 2025-01-22 15:10:39 +08:00
parent 2a93e1cc38
commit 322a051f32
1 changed files with 12 additions and 11 deletions

View File

@ -18,6 +18,7 @@ var (
TcpFactory = TcpHelper{Full: &full} TcpFactory = TcpHelper{Full: &full}
lockSingle sync.Once lockSingle sync.Once
OrderMap = sync.Map{} OrderMap = sync.Map{}
expire = 10 * time.Second
) )
type TcpHelper struct { type TcpHelper struct {
@ -28,7 +29,7 @@ type TcpHelper struct {
func (t *TcpHelper) Init(port string) *TcpHelper { func (t *TcpHelper) Init(port string) *TcpHelper {
lockSingle.Do(func() { lockSingle.Do(func() {
var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 10*time.Second) var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
if err == nil { if err == nil {
t.client = conn t.client = conn
atomic.StoreInt32(t.Full, 0) atomic.StoreInt32(t.Full, 0)
@ -42,7 +43,7 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
return t return t
} }
func (t *TcpHelper) reconnect(port string) { func (t *TcpHelper) reconnect(port string) {
var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 5*time.Second) var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
if err == nil { if err == nil {
utils.Log(nil, "连接下游成功") utils.Log(nil, "连接下游成功")
atomic.StoreInt32(t.Full, 0) atomic.StoreInt32(t.Full, 0)
@ -64,7 +65,7 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
} }
var start = time.Now().Unix() var start = time.Now().Unix()
var clinet = t.Init(config.GetConf().OrderPort).client var clinet = t.Init(config.GetConf().OrderPort).client
clinet.SetReadDeadline(time.Now().Add(time.Second * 5)) clinet.SetReadDeadline(time.Now().Add(time.Second * 10))
_, err := clinet.Write(msg) _, err := clinet.Write(msg)
var end = time.Now().Unix() var end = time.Now().Unix()
fmt.Println(end-start, "秒") fmt.Println(end-start, "秒")
@ -111,13 +112,13 @@ func (t *TcpHelper) Close(conn net.Conn) {
} }
func (t *TcpHelper) watch(conn net.Conn) { func (t *TcpHelper) watch(conn net.Conn) {
go func() { go func() {
defer func() { //defer func() {
if err := recover(); err != nil { // if err := recover(); err != nil {
fmt.Println("连接断开", err) // fmt.Println("连接断开", err)
} // }
}() //}()
for { for {
conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) conn.SetWriteDeadline(time.Now().Add(expire))
_, err := conn.Write([]byte("1\n")) _, err := conn.Write([]byte("1\n"))
if err != nil { if err != nil {
@ -129,7 +130,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
} else { } else {
var buffer = make([]byte, 1) var buffer = make([]byte, 1)
// 持续读取数据 // 持续读取数据
t.client.SetReadDeadline(time.Now().Add(time.Second * 5)) t.client.SetReadDeadline(time.Now().Add(expire))
n, err := t.client.Read(buffer[:]) n, err := t.client.Read(buffer[:])
if err == nil && n > 0 { if err == nil && n > 0 {
recvStr := string(buffer[:n]) recvStr := string(buffer[:n])
@ -144,7 +145,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
} else { } else {
atomic.StoreInt32(t.Full, 1) atomic.StoreInt32(t.Full, 1)
utils.Log(nil, "连接关闭", err) utils.Log(nil, "连接关闭", err)
t.reconnect(config.GetConf().OrderPort) //t.reconnect(config.GetConf().OrderPort)
} }
} }
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)