diff --git a/app/http/entities/order.go b/app/http/entities/order.go index dd680ff..302dccc 100644 --- a/app/http/entities/order.go +++ b/app/http/entities/order.go @@ -4,4 +4,5 @@ type MqMessage struct { Body map[string]interface{} `json:"body"` Property map[string]interface{} `json:"property"` Key string `json:"serial_number"` + SendTime int64 `json:"send_time"` } diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index d4723f4..5f1f429 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -2,8 +2,11 @@ package tcppool import ( "bufio" + "context" + "encoding/json" "errors" "fmt" + "github.com/qit-team/snow-core/redis" "net" "quenue/app/utils" "quenue/config" @@ -34,6 +37,7 @@ func (t *TcpHelper) Init(port string) *TcpHelper { t.client = conn atomic.StoreInt32(t.Full, 0) t.watch(t.client) + t.resend() } else { atomic.StoreInt32(t.Full, 1) t.reconnect(port) @@ -110,6 +114,30 @@ func (t *TcpHelper) SendMsg(msg []byte) error { func (t *TcpHelper) Close(conn net.Conn) { t.client.Close() } + +func (t *TcpHelper) resend() { + go func() { + defer func() { + if err := recover(); err != nil { + utils.Log(nil, "重发失败", err) + } + rs, err := redis.GetRedis().HGetAll(context.Background(), "kafka_message").Result() + if err == nil { + var data = map[string]interface{}{} + var nowTime = time.Now().Unix() + for _, v := range rs { + json.Unmarshal([]byte(v), &data) + if (data["send_time"].(float64) - float64(nowTime)) > 60 { + t.SendMsg([]byte(v)) + } + } + } + time.Sleep(3 * time.Minute) + }() + }() + +} + func (t *TcpHelper) watch(conn net.Conn) { go func() { defer func() { diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index 59ee0c1..6dddb7c 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -1,9 +1,11 @@ package mqs import ( + "context" "encoding/json" "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/qit-team/snow-core/redis" "math/rand/v2" "quenue/app/http/entities" "quenue/app/http/tcppool" @@ -109,6 +111,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { var mqsg = entities.MqMessage{} mqsg.Key = string(msg.Key) mqsg.Property = make(map[string]interface{}) + if len(msg.Value) > 0 { if msg.Headers != nil { for _, v := range msg.Headers { @@ -124,12 +127,14 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} } + mqsg.SendTime = time.Now().Unix() if mqsg.Key == "734760617161662465" { fmt.Println("消费消息", mqsg.Key) } else { fmt.Println(mqsg.Key) } var data, _ = json.Marshal(mqsg) + _, err = redis.GetRedis().HSet(context.Background(), "kafka_message", string(msg.Key), data).Result() err = handler(0, nil, data) if err == nil { //手动提交编译量 diff --git a/app/utils/tcp.go b/app/utils/tcp.go index c00206a..cc05139 100644 --- a/app/utils/tcp.go +++ b/app/utils/tcp.go @@ -2,8 +2,10 @@ package utils import ( "bufio" + "context" "encoding/json" "fmt" + "github.com/qit-team/snow-core/redis" "net" "sync/atomic" "time" @@ -42,6 +44,9 @@ func handleConnection(conn net.Conn) { // 将读取到的字符串转换为字节切片 buf = []byte(line) if err == nil && len(buf) > 2 { + var data map[string]interface{} + json.Unmarshal(buf, &data) + redis.GetRedis().HDel(context.Background(), "kafka_message", data["serial_number"].(string)).Result() if atomic.LoadInt32(workSingleNum) >= int32(maxNum) { if atomic.LoadInt32(busySingle) == 0 { fmt.Println("繁忙") @@ -51,10 +56,11 @@ func handleConnection(conn net.Conn) { } } else { - var data map[string]interface{} - json.Unmarshal(buf, &data) + fmt.Println("收到", *workSingleNum, maxNum, string(buf)) conn.Write([]byte(data["serial_number"].(string) + "\n")) + fmt.Println(data["serial_number"].(string), "ggggggggggggg") + go handLogacal(conn, buf) atomic.AddInt32(workSingleNum, 1) conn.Write([]byte("2\n"))