修复kafka
This commit is contained in:
		
							parent
							
								
									bd6bd6f321
								
							
						
					
					
						commit
						ce0dd22425
					
				|  | @ -58,13 +58,13 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { | ||||||
| 	kfconfig := &kafka.ConfigMap{ | 	kfconfig := &kafka.ConfigMap{ | ||||||
| 		"bootstrap.servers":     config.GetConf().KafkaUrl, // Kafka服务器地址
 | 		"bootstrap.servers":     config.GetConf().KafkaUrl, // Kafka服务器地址
 | ||||||
| 		"group.id":              group,                     // 消费者组ID
 | 		"group.id":              group,                     // 消费者组ID
 | ||||||
| 		"auto.offset.reset":     "earliest",                // 自动从最早的消息开始消费earliest,latest
 | 		"auto.offset.reset":     "latest",                  // 自动从最早的消息开始消费earliest,latest
 | ||||||
| 		"heartbeat.interval.ms": 1000, | 		"heartbeat.interval.ms": 1000, | ||||||
| 		"session.timeout.ms":    45000, | 		"session.timeout.ms":    45000, | ||||||
| 		"max.poll.interval.ms":  300000, // 5 分钟, 防止积压的时候认为掉线了
 | 		"max.poll.interval.ms":  300000, // 5 分钟, 防止积压的时候认为掉线了
 | ||||||
| 		"enable.auto.commit":    false, | 		"enable.auto.commit":    false, | ||||||
| 	} | 	} | ||||||
| 
 | 	kfconfig.SetKey("security.protocol", "plaintext") | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if err := recover(); err != nil { | 		if err := recover(); err != nil { | ||||||
| 			fmt.Println("消费中断", err) | 			fmt.Println("消费中断", err) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue