完善消费

This commit is contained in:
qiyunfanbo126.com 2025-01-20 17:11:48 +08:00
parent 49034f102c
commit f5d5cf28d1
2 changed files with 23 additions and 10 deletions

View File

@ -28,19 +28,29 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 10*time.Second) var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, 10*time.Second)
if err == nil { if err == nil {
t.client = conn t.client = conn
atomic.StoreInt32(t.Full, 0)
t.watch(t.client) t.watch(t.client)
} else { } else {
atomic.StoreInt32(t.Full, 1)
t.reconnect(port)
utils.Log(nil, config.GetConf().Url+":"+port) utils.Log(nil, config.GetConf().Url+":"+port)
} }
}) })
return t return t
} }
func (t *TcpHelper) reconnect(port string) { func (t *TcpHelper) reconnect(port string) {
var conn, err = net.Dial("tcp", "192.168.110.50:"+port) var conn, err = net.Dial("tcp", config.GetConf().Url+":"+port)
if err == nil { if err == nil {
utils.Log(nil, "连接下游失败")
atomic.StoreInt32(t.Full, 0) atomic.StoreInt32(t.Full, 0)
t.client = conn t.client = conn
t.watch(t.client) t.watch(t.client)
} else {
utils.Log(nil, "重连下游")
time.Sleep(1 * time.Second)
atomic.StoreInt32(t.Full, 1)
t.client = conn
t.reconnect(port)
} }
} }
func (t *TcpHelper) SendMsg(msg []byte) error { func (t *TcpHelper) SendMsg(msg []byte) error {
@ -50,9 +60,10 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
return errors.New("客户端连接失败") return errors.New("客户端连接失败")
} }
var start = time.Now().Unix() var start = time.Now().Unix()
_, err := t.Init(config.GetConf().OrderPort).client.Write(msg) var clinet = t.Init(config.GetConf().OrderPort).client
clinet.SetReadDeadline(time.Now().Add(time.Second * 5))
_, err := clinet.Write(msg)
var end = time.Now().Unix() var end = time.Now().Unix()
fmt.Println(end-start, "秒") fmt.Println(end-start, "秒")
var buffer [1]byte var buffer [1]byte
// 持续读取数据 // 持续读取数据
@ -69,7 +80,6 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
atomic.StoreInt32(t.Full, 0) atomic.StoreInt32(t.Full, 0)
} }
} }
} }
return err return err
} }
@ -78,14 +88,13 @@ func (t *TcpHelper) Close(conn net.Conn) {
} }
func (t *TcpHelper) watch(conn net.Conn) { func (t *TcpHelper) watch(conn net.Conn) {
go func() { go func() {
for { for {
fmt.Println("watch") conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
conn.SetWriteDeadline(time.Now().Add(time.Second * 15))
_, err := conn.Write([]byte("1")) _, err := conn.Write([]byte("1"))
if err != nil { if err != nil {
utils.Log(nil, "连接关闭", err) utils.Log(nil, "连接关闭", err)
atomic.StoreInt32(t.Full, 1)
t.client.Close() t.client.Close()
t.reconnect(config.GetConf().OrderPort) t.reconnect(config.GetConf().OrderPort)
return return

View File

@ -82,7 +82,6 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
for i := 0; i < config.GetConf().Num; i++ { for i := 0; i < config.GetConf().Num; i++ {
go func() { go func() {
for { for {
//fmt.Println(atomic.LoadInt32(tcppool.TcpFactory.Full))
if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 { if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
utils.Log(nil, "对列阻塞") utils.Log(nil, "对列阻塞")
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
@ -93,12 +92,12 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
msg, err := consumer.ReadMessage(1 * time.Second) msg, err := consumer.ReadMessage(1 * time.Second)
if err == nil { if err == nil {
utils.Log(nil, "offset", msg.TopicPartition.Offset) utils.Log(nil, "offset", msg.TopicPartition.Offset)
fmt.Println(msg.TopicPartition.Partition, "分区")
var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error) var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error)
var mqsg = entities.MqMessage{} var mqsg = entities.MqMessage{}
mqsg.Key = string(msg.Key) mqsg.Key = string(msg.Key)
mqsg.Property = make(map[string]interface{}) mqsg.Property = make(map[string]interface{})
if len(msg.Value) > 0 { if false {
if msg.Headers != nil { if msg.Headers != nil {
for _, v := range msg.Headers { for _, v := range msg.Headers {
if v.Key == "property" { if v.Key == "property" {
@ -113,6 +112,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"}
mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"}
} }
if mqsg.Key == "734760617161662465" {
fmt.Println("消费消息", mqsg.Key)
} else {
fmt.Println(mqsg.Key)
}
var data, _ = json.Marshal(mqsg) var data, _ = json.Marshal(mqsg)
err = handler(0, nil, data) err = handler(0, nil, data)
if err == nil { if err == nil {