diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 3a2e400..ad2d658 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -58,13 +58,13 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { kfconfig := &kafka.ConfigMap{ "bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址 "group.id": group, // 消费者组ID - "auto.offset.reset": "earliest", // 自动从最早的消息开始消费earliest,latest + "auto.offset.reset": "latest", // 自动从最早的消息开始消费earliest,latest "heartbeat.interval.ms": 1000, "session.timeout.ms": 45000, "max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了 "enable.auto.commit": false, } - + kfconfig.SetKey("security.protocol", "plaintext") defer func() { if err := recover(); err != nil { fmt.Println("消费中断", err)