修复连接
This commit is contained in:
parent
573d68de01
commit
a4fdcb89a3
|
@ -70,7 +70,7 @@ func (t *TcpHelper) watch(conn net.Conn) {
|
|||
|
||||
for {
|
||||
fmt.Println("watch")
|
||||
conn.SetWriteDeadline(time.Now().Add(time.Second * 3))
|
||||
conn.SetWriteDeadline(time.Now().Add(time.Second * 15))
|
||||
_, err := conn.Write([]byte("1"))
|
||||
|
||||
if err != nil {
|
||||
|
|
|
@ -2,6 +2,7 @@ package mqs
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/confluentinc/confluent-kafka-go/kafka"
|
||||
"quenue/app/http/entities"
|
||||
"quenue/app/http/tcppool"
|
||||
|
@ -45,18 +46,35 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
|
|||
"bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址
|
||||
"group.id": config.GetConf().KafkaGroup, // 消费者组ID
|
||||
"auto.offset.reset": "earliest", // 自动从最早的消息开始消费
|
||||
"heartbeat.interval.ms": 3000,
|
||||
"session.timeout.ms": 8000,
|
||||
"heartbeat.interval.ms": 1000,
|
||||
"session.timeout.ms": 45000,
|
||||
"max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了
|
||||
"enable.auto.commit": false,
|
||||
}
|
||||
|
||||
var start = time.Now()
|
||||
var end time.Time
|
||||
consumer, err := kafka.NewConsumer(kfconfig)
|
||||
|
||||
if err != nil {
|
||||
utils.Log(nil, "comsume", err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
err = consumer.Subscribe(name, nil)
|
||||
err = consumer.Subscribe(name, func(c *kafka.Consumer, event kafka.Event) error {
|
||||
switch ev := event.(type) {
|
||||
case kafka.AssignedPartitions:
|
||||
fmt.Printf("Assigned partitions: %v\n", ev.Partitions)
|
||||
// 在这里处理分区分配
|
||||
c.Assign(ev.Partitions)
|
||||
case kafka.RevokedPartitions:
|
||||
fmt.Printf("Revoked partitions: %v\n", ev.Partitions)
|
||||
// 在这里处理分区撤销
|
||||
c.Unassign()
|
||||
default:
|
||||
fmt.Printf("Ignored event: %s\n", ev)
|
||||
}
|
||||
return nil
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
utils.Log(nil, "Failed to subscribe:", err)
|
||||
}
|
||||
|
@ -66,15 +84,15 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
|
|||
//fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full))
|
||||
if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
|
||||
utils.Log(nil, "对列阻塞")
|
||||
time.Sleep(1000 * time.Millisecond)
|
||||
consumer.GetRebalanceProtocol()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
continue
|
||||
} else {
|
||||
utils.Log(nil, "对列放开")
|
||||
}
|
||||
msg, err := consumer.ReadMessage(-1)
|
||||
msg, err := consumer.ReadMessage(1 * time.Hour)
|
||||
if err == nil {
|
||||
utils.Log(nil, "offset", msg.TopicPartition.Offset)
|
||||
|
||||
var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error)
|
||||
var mqsg = entities.MqMessage{}
|
||||
mqsg.Key = string(msg.Key)
|
||||
|
@ -97,6 +115,10 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
|
|||
if err == nil {
|
||||
//手动提交编译量
|
||||
kk.commitOffset(consumer, msg.TopicPartition)
|
||||
if msg.TopicPartition.Offset == 104939 {
|
||||
end = time.Now()
|
||||
utils.Log(nil, "消费耗时", end.Sub(start))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
utils.Log(nil, "Error while consuming: %v\n", err)
|
||||
|
|
Loading…
Reference in New Issue