diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index e4b84ec..f4091be 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -70,7 +70,7 @@ func (t *TcpHelper) watch(conn net.Conn) { for { fmt.Println("watch") - conn.SetWriteDeadline(time.Now().Add(time.Second * 3)) + conn.SetWriteDeadline(time.Now().Add(time.Second * 15)) _, err := conn.Write([]byte("1")) if err != nil { diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 3a733fd..08e3017 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -2,6 +2,7 @@ package mqs import ( "encoding/json" + "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" "quenue/app/http/entities" "quenue/app/http/tcppool" @@ -45,18 +46,35 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { "bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址 "group.id": config.GetConf().KafkaGroup, // 消费者组ID "auto.offset.reset": "earliest", // 自动从最早的消息开始消费 - "heartbeat.interval.ms": 3000, - "session.timeout.ms": 8000, + "heartbeat.interval.ms": 1000, + "session.timeout.ms": 45000, + "max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了 "enable.auto.commit": false, } - + var start = time.Now() + var end time.Time consumer, err := kafka.NewConsumer(kfconfig) if err != nil { utils.Log(nil, "comsume", err) } defer consumer.Close() - err = consumer.Subscribe(name, nil) + err = consumer.Subscribe(name, func(c *kafka.Consumer, event kafka.Event) error { + switch ev := event.(type) { + case kafka.AssignedPartitions: + fmt.Printf("Assigned partitions: %v\n", ev.Partitions) + // 在这里处理分区分配 + c.Assign(ev.Partitions) + case kafka.RevokedPartitions: + fmt.Printf("Revoked partitions: %v\n", ev.Partitions) + // 在这里处理分区撤销 + c.Unassign() + default: + fmt.Printf("Ignored event: %s\n", ev) + } + return nil + return nil + }) if err != nil { utils.Log(nil, "Failed to subscribe:", err) } @@ -66,15 +84,15 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { //fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full)) if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { utils.Log(nil, "对列阻塞") - time.Sleep(1000 * time.Millisecond) - consumer.GetRebalanceProtocol() + time.Sleep(50 * time.Millisecond) continue } else { utils.Log(nil, "对列放开") } - msg, err := consumer.ReadMessage(-1) + msg, err := consumer.ReadMessage(1 * time.Hour) if err == nil { utils.Log(nil, "offset", msg.TopicPartition.Offset) + var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error) var mqsg = entities.MqMessage{} mqsg.Key = string(msg.Key) @@ -97,6 +115,10 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { if err == nil { //手动提交编译量 kk.commitOffset(consumer, msg.TopicPartition) + if msg.TopicPartition.Offset == 104939 { + end = time.Now() + utils.Log(nil, "消费耗时", end.Sub(start)) + } } } else { utils.Log(nil, "Error while consuming: %v\n", err)