增加重连
This commit is contained in:
		
							parent
							
								
									705fa97062
								
							
						
					
					
						commit
						26b0a27628
					
				|  | @ -45,12 +45,12 @@ func (t *TcpHelper) Init(port string) *TcpHelper { | ||||||
| func (t *TcpHelper) reconnect(port string) { | func (t *TcpHelper) reconnect(port string) { | ||||||
| 	var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire) | 	var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire) | ||||||
| 	if err == nil { | 	if err == nil { | ||||||
| 		utils.Log(nil, "连接下游成功") | 		//utils.Log(nil, "连接下游成功")
 | ||||||
| 		atomic.StoreInt32(t.Full, 0) | 		atomic.StoreInt32(t.Full, 0) | ||||||
| 		t.client = conn | 		t.client = conn | ||||||
| 		t.watch(t.client) | 		t.watch(t.client) | ||||||
| 	} else { | 	} else { | ||||||
| 		utils.Log(nil, "重连下游") | 		//utils.Log(nil, "重连下游")
 | ||||||
| 		time.Sleep(1 * time.Second) | 		time.Sleep(1 * time.Second) | ||||||
| 		atomic.StoreInt32(t.Full, 1) | 		atomic.StoreInt32(t.Full, 1) | ||||||
| 		t.client = conn | 		t.client = conn | ||||||
|  | @ -90,7 +90,7 @@ func (t *TcpHelper) SendMsg(msg []byte) error { | ||||||
| 				} | 				} | ||||||
| 			} else { | 			} else { | ||||||
| 				if len(recvStr) > 0 { | 				if len(recvStr) > 0 { | ||||||
| 					fmt.Println("结果:recvStr:", recvStr) | 					//fmt.Println("结果:recvStr:", recvStr)
 | ||||||
| 					if recvStr == "5" { | 					if recvStr == "5" { | ||||||
| 						fmt.Println("客户端繁忙") | 						fmt.Println("客户端繁忙") | ||||||
| 						atomic.StoreInt32(t.Full, 1) | 						atomic.StoreInt32(t.Full, 1) | ||||||
|  | @ -114,7 +114,7 @@ func (t *TcpHelper) watch(conn net.Conn) { | ||||||
| 	go func() { | 	go func() { | ||||||
| 		defer func() { | 		defer func() { | ||||||
| 			if err := recover(); err != nil { | 			if err := recover(); err != nil { | ||||||
| 				fmt.Println("连接断开", err) | 				//fmt.Println("连接断开", err)
 | ||||||
| 			} | 			} | ||||||
| 		}() | 		}() | ||||||
| 		for { | 		for { | ||||||
|  | @ -122,7 +122,7 @@ func (t *TcpHelper) watch(conn net.Conn) { | ||||||
| 			_, err := conn.Write([]byte("1\n")) | 			_, err := conn.Write([]byte("1\n")) | ||||||
| 
 | 
 | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				utils.Log(nil, "连接关闭", err) | 				//utils.Log(nil, "连接关闭", err)
 | ||||||
| 				atomic.StoreInt32(t.Full, 1) | 				atomic.StoreInt32(t.Full, 1) | ||||||
| 				t.client.Close() | 				t.client.Close() | ||||||
| 				t.reconnect(config.GetConf().OrderPort) | 				t.reconnect(config.GetConf().OrderPort) | ||||||
|  | @ -144,7 +144,7 @@ func (t *TcpHelper) watch(conn net.Conn) { | ||||||
| 					} | 					} | ||||||
| 				} else { | 				} else { | ||||||
| 					atomic.StoreInt32(t.Full, 1) | 					atomic.StoreInt32(t.Full, 1) | ||||||
| 					utils.Log(nil, "连接关闭", err) | 					//utils.Log(nil, "连接关闭", err)
 | ||||||
| 					t.client.Close() | 					t.client.Close() | ||||||
| 					t.reconnect(config.GetConf().OrderPort) | 					t.reconnect(config.GetConf().OrderPort) | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
|  | @ -99,7 +99,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { | ||||||
| 					time.Sleep(100 * time.Millisecond) | 					time.Sleep(100 * time.Millisecond) | ||||||
| 					continue | 					continue | ||||||
| 				} else { | 				} else { | ||||||
| 					utils.Log(nil, "对列放开") | 					//utils.Log(nil, "对列放开")
 | ||||||
| 				} | 				} | ||||||
| 				msg, err := consumer.ReadMessage(1 * time.Second) | 				msg, err := consumer.ReadMessage(1 * time.Second) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
|  | @ -141,7 +141,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { | ||||||
| 
 | 
 | ||||||
| 					} | 					} | ||||||
| 				} else { | 				} else { | ||||||
| 					utils.Log(nil, "Error while consuming: %v\n", err) | 					//utils.Log(nil, "Error while consuming: %v\n", err)
 | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		}() | 		}() | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue