package tcppool import ( "errors" "fmt" "net" "quenue/app/utils" "quenue/config" "sync" "sync/atomic" "time" ) var ( full int32 = 0 TcpFactory = TcpHelper{Full: &full} lockSingle sync.Once ) type TcpHelper struct { client net.Conn lastTime int64 Full *int32 } func (t *TcpHelper) Init(port string) *TcpHelper { lockSingle.Do(func() { var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 10*time.Second) if err == nil { t.client = conn atomic.StoreInt32(t.Full, 0) t.watch(t.client) } else { atomic.StoreInt32(t.Full, 1) t.reconnect(port) utils.Log(nil, config.GetConf().Url+":"+port) } }) return t } func (t *TcpHelper) reconnect(port string) { var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 5*time.Second) if err == nil { utils.Log(nil, "连接下游成功") atomic.StoreInt32(t.Full, 0) t.client = conn t.watch(t.client) } else { utils.Log(nil, "重连下游") time.Sleep(1 * time.Second) atomic.StoreInt32(t.Full, 1) t.client = conn t.reconnect(port) } } func (t *TcpHelper) SendMsg(msg []byte) error { msg = append(msg, '\n') t = t.Init(config.GetConf().OrderPort) if t.client == nil { return errors.New("客户端连接失败") } var start = time.Now().Unix() var clinet = t.Init(config.GetConf().OrderPort).client clinet.SetReadDeadline(time.Now().Add(time.Second * 5)) _, err := clinet.Write(msg) var end = time.Now().Unix() fmt.Println(end-start, "秒") var buffer [1]byte // 持续读取数据 n, err := t.client.Read(buffer[:]) if err == nil { if n > 0 { recvStr := string(buffer[:n]) fmt.Println("结果:recvStr:", recvStr) if recvStr == "1" { fmt.Println("客户端繁忙") atomic.StoreInt32(t.Full, 1) } else if recvStr == "2" { fmt.Println("客户端空闲") atomic.StoreInt32(t.Full, 0) } } } return err } func (t *TcpHelper) Close(conn net.Conn) { t.client.Close() } func (t *TcpHelper) watch(conn net.Conn) { go func() { defer func() { if err := recover(); err != nil { fmt.Println("连接断开", err) } }() for { conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) _, err := conn.Write([]byte("1")) if err != nil { utils.Log(nil, "连接关闭", err) atomic.StoreInt32(t.Full, 1) t.client.Close() t.reconnect(config.GetConf().OrderPort) return } else { var buffer [1]byte // 持续读取数据 n, err := t.client.Read(buffer[:]) if err == nil && n > 0 { recvStr := string(buffer[:n]) fmt.Println("结果:recvStr:", recvStr) if recvStr == "1" { fmt.Println("客户端繁忙") atomic.StoreInt32(t.Full, 1) } else if recvStr == "2" { fmt.Println("客户端空闲") atomic.StoreInt32(t.Full, 0) } } else { atomic.StoreInt32(t.Full, 1) utils.Log(nil, "连接关闭", err) } } time.Sleep(1 * time.Second) } }() }