diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index 9c431c1..e4b84ec 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -24,7 +24,7 @@ type TcpHelper struct { func (t *TcpHelper) Init(port string) *TcpHelper { lockSingle.Do(func() { - var conn, err = net.Dial("tcp", "192.168.110.50:"+port) + var conn, err = net.DialTimeout("tcp", "192.168.110.50:"+port, 10*time.Second) if err == nil { t.client = conn t.watch(t.client) @@ -35,13 +35,14 @@ func (t *TcpHelper) Init(port string) *TcpHelper { func (t *TcpHelper) reconnect(port string) { var conn, err = net.Dial("tcp", "192.168.110.50:"+port) if err == nil { + atomic.StoreInt32(t.Full, 0) t.client = conn t.watch(t.client) } } func (t *TcpHelper) SendMsg(msg []byte) error { + msg = append(msg, '\n') _, err := t.Init(config.GetConf().OrderPort).client.Write(msg) - var buffer [1]byte // 持续读取数据 n, err := t.client.Read(buffer[:]) @@ -50,10 +51,11 @@ func (t *TcpHelper) SendMsg(msg []byte) error { recvStr := string(buffer[:n]) fmt.Println("结果:recvStr:", recvStr) if recvStr == "1" { - fmt.Println("满了") - atomic.AddInt32(t.Full, 1) + fmt.Println("客户端繁忙") + atomic.StoreInt32(t.Full, 1) } else if recvStr == "2" { - atomic.AddInt32(t.Full, -1) + fmt.Println("客户端空闲") + atomic.StoreInt32(t.Full, 0) } } @@ -65,8 +67,9 @@ func (t *TcpHelper) Close(conn net.Conn) { } func (t *TcpHelper) watch(conn net.Conn) { go func() { - fmt.Println("watch") + for { + fmt.Println("watch") conn.SetWriteDeadline(time.Now().Add(time.Second * 3)) _, err := conn.Write([]byte("1")) @@ -75,8 +78,23 @@ func (t *TcpHelper) watch(conn net.Conn) { t.client.Close() t.reconnect(config.GetConf().OrderPort) return + } else { + var buffer [1]byte + // 持续读取数据 + n, err := t.client.Read(buffer[:]) + if err == nil && n > 0 { + recvStr := string(buffer[:n]) + fmt.Println("结果:recvStr:", recvStr) + if recvStr == "1" { + fmt.Println("客户端繁忙") + atomic.StoreInt32(t.Full, 1) + } else if recvStr == "2" { + fmt.Println("客户端空闲") + atomic.StoreInt32(t.Full, 0) + } + } } - time.Sleep(time.Second) + } }() } diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index f2ddf29..3a733fd 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -2,7 +2,6 @@ package mqs import ( "encoding/json" - "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "quenue/app/http/entities" "quenue/app/http/tcppool" @@ -64,21 +63,23 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { for i := 0; i < config.GetConf().Num; i++ { go func() { for { - fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full)) + //fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full)) if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { utils.Log(nil, "对列阻塞") - time.Sleep(200 * time.Millisecond) + time.Sleep(1000 * time.Millisecond) + consumer.GetRebalanceProtocol() continue + } else { + utils.Log(nil, "对列放开") } msg, err := consumer.ReadMessage(-1) - fmt.Println("read msg", ci) if err == nil { utils.Log(nil, "offset", msg.TopicPartition.Offset) var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error) var mqsg = entities.MqMessage{} mqsg.Key = string(msg.Key) mqsg.Property = make(map[string]interface{}) - if msg.Headers != nil { + if false { for _, v := range msg.Headers { if v.Key == "property" { json.Unmarshal(v.Value, &mqsg.Property) @@ -87,8 +88,9 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } err = json.Unmarshal(msg.Value, &mqsg.Body) } else { + mqsg.Key = "100000001" mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} - mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0} + mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} } var data, _ = json.Marshal(mqsg) err = handler(0, nil, data) diff --git a/main.go b/main.go index 25f66f9..a8ca9c4 100644 --- a/main.go +++ b/main.go @@ -116,11 +116,7 @@ func startServer(opts *config.Options) (err error) { if err != nil { return } - //for i := 0; i < 2; i++ { - // mqs.MqManager.GetMqByName(common.MQ_KFK).Produce(common.ORDER_RESEND_TOPICAL, time.Now().Format(time.DateTime), 0) - //} - //// - //select {} + pidFile := opts.GenPidFile() config.GetConf().KafkaUrl = kafka config.GetConf().OrderPort = port