diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index 5df2ad0..a515ace 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -28,19 +28,29 @@ func (t *TcpHelper) Init(port string) *TcpHelper { var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 10*time.Second) if err == nil { t.client = conn + atomic.StoreInt32(t.Full, 0) t.watch(t.client) } else { + atomic.StoreInt32(t.Full, 1) + t.reconnect(port) utils.Log(nil, config.GetConf().Url+":"+port) } }) return t } func (t *TcpHelper) reconnect(port string) { - var conn, err = net.Dial("tcp", "192.168.110.50:"+port) + var conn, err = net.Dial("tcp", config.GetConf().Url+":"+port) if err == nil { + utils.Log(nil, "连接下游失败") atomic.StoreInt32(t.Full, 0) t.client = conn t.watch(t.client) + } else { + utils.Log(nil, "重连下游") + time.Sleep(1 * time.Second) + atomic.StoreInt32(t.Full, 1) + t.client = conn + t.reconnect(port) } } func (t *TcpHelper) SendMsg(msg []byte) error { @@ -50,9 +60,10 @@ func (t *TcpHelper) SendMsg(msg []byte) error { return errors.New("客户端连接失败") } var start = time.Now().Unix() - _, err := t.Init(config.GetConf().OrderPort).client.Write(msg) + var clinet = t.Init(config.GetConf().OrderPort).client + clinet.SetReadDeadline(time.Now().Add(time.Second * 5)) + _, err := clinet.Write(msg) var end = time.Now().Unix() - fmt.Println(end-start, "秒") var buffer [1]byte // 持续读取数据 @@ -69,7 +80,6 @@ func (t *TcpHelper) SendMsg(msg []byte) error { atomic.StoreInt32(t.Full, 0) } } - } return err } @@ -78,14 +88,13 @@ func (t *TcpHelper) Close(conn net.Conn) { } func (t *TcpHelper) watch(conn net.Conn) { go func() { - for { - fmt.Println("watch") - conn.SetWriteDeadline(time.Now().Add(time.Second * 15)) + conn.SetWriteDeadline(time.Now().Add(time.Second * 5)) _, err := conn.Write([]byte("1")) if err != nil { utils.Log(nil, "连接关闭", err) + atomic.StoreInt32(t.Full, 1) t.client.Close() t.reconnect(config.GetConf().OrderPort) return diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 4ccd72b..f0f9d63 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -82,7 +82,6 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { for i := 0; i < config.GetConf().Num; i++ { go func() { for { - //fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full)) if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { utils.Log(nil, "对列阻塞") time.Sleep(50 * time.Millisecond) @@ -93,12 +92,12 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { msg, err := consumer.ReadMessage(1 * time.Second) if err == nil { utils.Log(nil, "offset", msg.TopicPartition.Offset) - + fmt.Println(msg.TopicPartition.Partition, "分区") var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error) var mqsg = entities.MqMessage{} mqsg.Key = string(msg.Key) mqsg.Property = make(map[string]interface{}) - if len(msg.Value) > 0 { + if false { if msg.Headers != nil { for _, v := range msg.Headers { if v.Key == "property" { @@ -113,6 +112,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} } + if mqsg.Key == "734760617161662465" { + fmt.Println("消费消息", mqsg.Key) + } else { + fmt.Println(mqsg.Key) + } var data, _ = json.Marshal(mqsg) err = handler(0, nil, data) if err == nil {