增加重连
This commit is contained in:
		
							parent
							
								
									6da589a937
								
							
						
					
					
						commit
						46adc9c617
					
				|  | @ -127,7 +127,7 @@ func (t *TcpHelper) resend() { | ||||||
| 				var nowTime = time.Now().Unix() | 				var nowTime = time.Now().Unix() | ||||||
| 				for _, v := range rs { | 				for _, v := range rs { | ||||||
| 					json.Unmarshal([]byte(v), &data) | 					json.Unmarshal([]byte(v), &data) | ||||||
| 					if (data["send_time"].(float64) - float64(nowTime)) > 60 { | 					if (float64(nowTime) - data["send_time"].(float64)) > 60 { | ||||||
| 						t.SendMsg([]byte(v)) | 						t.SendMsg([]byte(v)) | ||||||
| 					} | 					} | ||||||
| 				} | 				} | ||||||
|  |  | ||||||
|  | @ -103,6 +103,8 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { | ||||||
| 				} else { | 				} else { | ||||||
| 					//utils.Log(nil, "对列放开")
 | 					//utils.Log(nil, "对列放开")
 | ||||||
| 				} | 				} | ||||||
|  | 
 | ||||||
|  | 				time.Sleep(3 * time.Minute) | ||||||
| 				msg, err := consumer.ReadMessage(1 * time.Second) | 				msg, err := consumer.ReadMessage(1 * time.Second) | ||||||
| 				if err == nil { | 				if err == nil { | ||||||
| 					utils.Log(nil, "offset", msg.TopicPartition.Offset) | 					utils.Log(nil, "offset", msg.TopicPartition.Offset) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue