159 lines
3.7 KiB
Go
159 lines
3.7 KiB
Go
package tcppool
|
||
|
||
import (
|
||
"bufio"
|
||
"errors"
|
||
"fmt"
|
||
"net"
|
||
"quenue/app/utils"
|
||
"quenue/config"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
)
|
||
|
||
var (
|
||
full int32 = 0
|
||
TcpFactory = TcpHelper{Full: &full}
|
||
lockSingle sync.Once
|
||
OrderMap = sync.Map{}
|
||
expire = 10 * time.Second
|
||
)
|
||
|
||
// TcpHelper holds one TCP connection to the downstream service together with
// a flag describing whether that downstream can currently take work.
type TcpHelper struct {
	// client is the live connection; it stays nil until a dial succeeds.
	client net.Conn

	// lastTime is not read or written by any code visible in this file.
	lastTime int64

	// Full points at an int32 that is accessed atomically:
	// 1 means busy or unreachable, 0 means available.
	Full *int32
}
|
||
|
||
func (t *TcpHelper) Init(port string) *TcpHelper {
|
||
lockSingle.Do(func() {
|
||
var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
|
||
if err == nil {
|
||
t.client = conn
|
||
atomic.StoreInt32(t.Full, 0)
|
||
t.watch(t.client)
|
||
} else {
|
||
atomic.StoreInt32(t.Full, 1)
|
||
t.reconnect(port)
|
||
utils.Log(nil, config.GetConf().Url+":"+port)
|
||
}
|
||
})
|
||
return t
|
||
}
|
||
func (t *TcpHelper) reconnect(port string) {
|
||
var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
|
||
if err == nil {
|
||
//utils.Log(nil, "连接下游成功")
|
||
atomic.StoreInt32(t.Full, 0)
|
||
t.client = conn
|
||
t.watch(t.client)
|
||
} else {
|
||
//utils.Log(nil, "重连下游")
|
||
time.Sleep(1 * time.Second)
|
||
atomic.StoreInt32(t.Full, 1)
|
||
t.client = conn
|
||
t.reconnect(port)
|
||
}
|
||
}
|
||
func (t *TcpHelper) SendMsg(msg []byte) error {
|
||
msg = append(msg, '\n')
|
||
t = t.Init(config.GetConf().OrderPort)
|
||
if t.client == nil {
|
||
return errors.New("客户端连接失败")
|
||
}
|
||
var start = time.Now().Unix()
|
||
var clinet = t.Init(config.GetConf().OrderPort).client
|
||
clinet.SetReadDeadline(time.Now().Add(time.Second * 10))
|
||
_, err := clinet.Write(msg)
|
||
var end = time.Now().Unix()
|
||
fmt.Println(end-start, "秒")
|
||
buf := make([]byte, 1024)
|
||
reader := bufio.NewReader(clinet)
|
||
line, err := reader.ReadString('\n')
|
||
if err != nil {
|
||
fmt.Println("Error reading from connection:", err)
|
||
return err
|
||
}
|
||
// 将读取到的字符串转换为字节切片
|
||
buf = []byte(line)
|
||
if err == nil {
|
||
if len(buf) > 0 {
|
||
recvStr := string(buf)
|
||
recvStr = strings.Replace(recvStr, "\n", "", 1)
|
||
if len(recvStr) > 2 {
|
||
var orderNo = recvStr
|
||
if _, ok := OrderMap.Load(orderNo); !ok {
|
||
OrderMap.Store(orderNo, "1")
|
||
return nil
|
||
}
|
||
} else {
|
||
if len(recvStr) > 0 {
|
||
//fmt.Println("结果:recvStr:", recvStr)
|
||
if recvStr == "5" {
|
||
fmt.Println("客户端繁忙")
|
||
atomic.StoreInt32(t.Full, 1)
|
||
} else if recvStr == "2" {
|
||
fmt.Println("客户端空闲")
|
||
atomic.StoreInt32(t.Full, 0)
|
||
}
|
||
}
|
||
|
||
}
|
||
|
||
}
|
||
return errors.New("无效ack")
|
||
}
|
||
return err
|
||
}
|
||
func (t *TcpHelper) Close(conn net.Conn) {
|
||
t.client.Close()
|
||
}
|
||
func (t *TcpHelper) watch(conn net.Conn) {
|
||
go func() {
|
||
defer func() {
|
||
if err := recover(); err != nil {
|
||
//fmt.Println("连接断开", err)
|
||
}
|
||
}()
|
||
for {
|
||
conn.SetWriteDeadline(time.Now().Add(expire))
|
||
_, err := conn.Write([]byte("1\n"))
|
||
|
||
if err != nil {
|
||
//utils.Log(nil, "连接关闭", err)
|
||
atomic.StoreInt32(t.Full, 1)
|
||
t.client.Close()
|
||
t.reconnect(config.GetConf().OrderPort)
|
||
return
|
||
} else {
|
||
var buffer = make([]byte, 1)
|
||
// 持续读取数据
|
||
t.client.SetReadDeadline(time.Now().Add(expire))
|
||
n, err := t.client.Read(buffer[:])
|
||
if err == nil && n > 0 {
|
||
recvStr := string(buffer[:n])
|
||
//fmt.Println("结果:recvStr:", recvStr)
|
||
if recvStr == "5" {
|
||
fmt.Println("客户端繁忙")
|
||
atomic.StoreInt32(t.Full, 1)
|
||
} else if recvStr == "2" {
|
||
fmt.Println("客户端空闲")
|
||
atomic.StoreInt32(t.Full, 0)
|
||
} else if recvStr == "6" {
|
||
fmt.Println("客户端心跳")
|
||
atomic.StoreInt32(t.Full, 0)
|
||
}
|
||
} else {
|
||
atomic.StoreInt32(t.Full, 1)
|
||
//utils.Log(nil, "连接关闭", err)
|
||
t.client.Close()
|
||
t.reconnect(config.GetConf().OrderPort)
|
||
}
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
}()
|
||
}
|