From bed101028311acffeeed193c833315019cbbf299 Mon Sep 17 00:00:00 2001 From: "qiyunfanbo126.com" <815699> Date: Fri, 7 Feb 2025 14:01:41 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0dispacher=E5=AF=B9=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/handlers/mq/quenue.go | 3 ++- app/http/entities/order.go | 1 + app/http/tcppool/single.go | 22 +++++++++++++++++----- app/utils/mq/kafka_v2.go | 17 ++++++++++++----- app/utils/util.go | 21 +++++++++++++++++++++ config/config.go | 2 ++ main.go | 7 +++++++ 7 files changed, 62 insertions(+), 11 deletions(-) diff --git a/app/handlers/mq/quenue.go b/app/handlers/mq/quenue.go index 12c3db9..901e44a 100644 --- a/app/handlers/mq/quenue.go +++ b/app/handlers/mq/quenue.go @@ -26,7 +26,8 @@ func StartQunueServer() error { if config.GetConf().StartQunue == 1 { for i := 0; i < 1; i++ { fmt.Println("对列" + strconv.Itoa(i)) - startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊 + startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊 + startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊 } } select {} diff --git a/app/http/entities/order.go b/app/http/entities/order.go index 302dccc..ebeb142 100644 --- a/app/http/entities/order.go +++ b/app/http/entities/order.go @@ -5,4 +5,5 @@ type MqMessage struct { Property map[string]interface{} `json:"property"` Key string `json:"serial_number"` SendTime int64 `json:"send_time"` + Topical string `json:"topic"` } diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go index a8579a6..174be28 100644 --- a/app/http/tcppool/single.go +++ b/app/http/tcppool/single.go @@ -57,7 +57,7 @@ func (t *TcpHelper) reconnect(port string) { //utils.Log(nil, "重连下游") time.Sleep(1 * time.Second) atomic.StoreInt32(t.Full, 1) - t.client = conn + //t.client = conn t.reconnect(port) } } @@ -80,6 +80,7 @@ func (t *TcpHelper) SendMsg(msg []byte) error { fmt.Println("Error reading from connection:", err) return err } + fmt.Println("结果:recvStr:", line) // 将读取到的字符串转换为字节切片 buf = []byte(line) if err == nil { @@ -88,13 +89,14 @@ func (t *TcpHelper) SendMsg(msg []byte) error { recvStr = strings.Replace(recvStr, "\n", "", 1) if len(recvStr) > 2 { var orderNo = recvStr + utils.LogFile(nil, "ack", orderNo) if _, ok := OrderMap.Load(orderNo); !ok { OrderMap.Store(orderNo, "1") return nil } } else { if len(recvStr) > 0 { - //fmt.Println("结果:recvStr:", recvStr) + fmt.Println("结果:recvStr:", recvStr) if recvStr == "5" { fmt.Println("客户端繁忙") atomic.StoreInt32(t.Full, 1) @@ -121,6 +123,11 @@ func (t *TcpHelper) resend() { if err := recover(); err != nil { utils.Log(nil, "重发失败", err) } + }() + for { + if t.client == nil { + return + } rs, err := redis.GetRedis().HGetAll(context.Background(), "kafka_message").Result() if err == nil { var data = map[string]interface{}{} @@ -130,10 +137,11 @@ func (t *TcpHelper) resend() { if (float64(nowTime) - data["send_time"].(float64)) > 60 { t.SendMsg([]byte(v)) } + time.Sleep(200 * time.Millisecond) } } time.Sleep(3 * time.Minute) - }() + } }() } @@ -142,10 +150,13 @@ func (t *TcpHelper) watch(conn net.Conn) { go func() { defer func() { if err := recover(); err != nil { - //fmt.Println("连接断开", err) + fmt.Println("连接断开", err) } }() for { + if t.client == nil { + return + } conn.SetWriteDeadline(time.Now().Add(expire)) _, err := conn.Write([]byte("1\n")) @@ -177,10 +188,11 @@ func (t *TcpHelper) watch(conn net.Conn) { atomic.StoreInt32(t.Full, 1) //utils.Log(nil, "连接关闭", err) t.client.Close() + t.client = nil t.reconnect(config.GetConf().OrderPort) } } - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) } }() } diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go index a5b8bbe..0f4cbf1 100644 --- a/app/utils/mq/kafka_v2.go +++ b/app/utils/mq/kafka_v2.go @@ -51,15 +51,20 @@ func (kk KafkaV2Mq) Produce(name string, log interface{}, delayTime int, args .. func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { utils.Log(nil, "hand", ci) + var group = config.GetConf().KafkaGroup + if name == config.GetConf().DispatchTopical { + group = config.GetConf().DidpatchKafkaGroup + } kfconfig := &kafka.ConfigMap{ - "bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址 - "group.id": config.GetConf().KafkaGroup, // 消费者组ID - "auto.offset.reset": "latest", // 自动从最早的消息开始消费 + "bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址 + "group.id": group, // 消费者组ID + "auto.offset.reset": "latest", // 自动从最早的消息开始消费earliest,latest "heartbeat.interval.ms": 1000, "session.timeout.ms": 45000, "max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了 "enable.auto.commit": false, } + defer func() { if err := recover(); err != nil { fmt.Println("消费中断", err) @@ -111,7 +116,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { var mqsg = entities.MqMessage{} mqsg.Key = string(msg.Key) mqsg.Property = make(map[string]interface{}) - + mqsg.Topical = name if len(msg.Value) > 0 { if msg.Headers != nil { for _, v := range msg.Headers { @@ -128,6 +133,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} } mqsg.SendTime = time.Now().Unix() + utils.LogFile(nil, "msg", string(msg.Key)) if mqsg.Key == "734760617161662465" { fmt.Println("消费消息", mqsg.Key) } else { @@ -137,6 +143,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { var data, _ = json.Marshal(mqsg) _, err = redis.GetRedis().HSet(context.Background(), "kafka_message", string(msg.Key), data).Result() err = handler(0, nil, data) + utils.LogFile(nil, "send msg", string(msg.Key), err) if err == nil { //手动提交编译量 kk.commitOffset(consumer, msg.TopicPartition) @@ -147,7 +154,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) { } } else { - //utils.Log(nil, "Error while consuming: %v\n", err) + utils.Log(nil, "Error while consuming: %v\n", err) } } }() diff --git a/app/utils/util.go b/app/utils/util.go index 618c649..0632f8c 100644 --- a/app/utils/util.go +++ b/app/utils/util.go @@ -1,9 +1,12 @@ package utils import ( + "encoding/json" "fmt" "github.com/gin-gonic/gin" + "os" "runtime" + "strconv" "time" ) @@ -11,4 +14,22 @@ func Log(c *gin.Context, name string, msg ...interface{}) { _, file, line, _ := runtime.Caller(1) var datetime = time.Now().Format(time.DateTime) fmt.Println(name, msg, file, line, datetime) + +} +func LogFile(c *gin.Context, name string, msg ...interface{}) { + _, rfile, line, _ := runtime.Caller(1) + var datetime = time.Now().Format(time.DateTime) + file, err := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + fmt.Println("Error opening file:", err) + return + } + defer file.Close() + var data, _ = json.Marshal(map[string]interface{}{"data": msg}) + // 写入内容到文件末尾 + _, err = file.WriteString("file:" + rfile + " line:" + strconv.Itoa(line) + " 时间:" + datetime + ":" + name + ":" + string(data) + "!\n") + if err != nil { + fmt.Println("Error writing to file:", err) + return + } } diff --git a/config/config.go b/config/config.go index ee59235..ec4bb53 100644 --- a/config/config.go +++ b/config/config.go @@ -35,6 +35,8 @@ type Config struct { Num int `toml:"Num"` Url string `toml:"Url"` Topical string `toml:"Topical"` + DispatchTopical string `toml:"DispatchTopical"` + DidpatchKafkaGroup string `toml:"DidPatchKafkaGroup"` } func newConfig() *Config { diff --git a/main.go b/main.go index 3d2632b..14503af 100644 --- a/main.go +++ b/main.go @@ -98,6 +98,13 @@ func startServer(opts *config.Options) (err error) { return } //err = tcppool.TcpFactory.Init("9502").SendMsg([]byte("123")) + //for i := 0; i < 10000; i++ { + // var mqsg = entities.MqMessage{} + // mqsg.Key = time.Now().Format("20060102150405") + strconv.Itoa(int(rand.Int32N(1000000))) + // mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"} + // mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"} + // mqs.MqManager.GetMqByName(common.MQ_KFK).Produce("platform_all", "", 0) + //} //引导程序 err = bootstrap.Bootstrap(conf)