增加dispacher对列

This commit is contained in:
qiyunfanbo126.com 2025-02-08 11:07:09 +08:00
parent bed1010283
commit bd6bd6f321
3 changed files with 83 additions and 71 deletions

View File

@ -26,8 +26,8 @@ func StartQunueServer() error {
if config.GetConf().StartQunue == 1 {
for i := 0; i < 1; i++ {
fmt.Println("对列" + strconv.Itoa(i))
startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊
startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊
startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //
//startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //分发
}
}
select {}

View File

@ -6,11 +6,11 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/qit-team/snow-core/redis"
"net"
"quenue/app/utils"
"quenue/config"
"strings"
"sync"
"sync/atomic"
"time"
@ -28,6 +28,7 @@ type TcpHelper struct {
client net.Conn
lastTime int64
Full *int32
Comsumer *kafka.Consumer
}
func (t *TcpHelper) Init(port string) *TcpHelper {
@ -37,6 +38,7 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
t.client = conn
atomic.StoreInt32(t.Full, 0)
t.watch(t.client)
t.heart()
t.resend()
} else {
atomic.StoreInt32(t.Full, 1)
@ -53,6 +55,7 @@ func (t *TcpHelper) reconnect(port string) {
atomic.StoreInt32(t.Full, 0)
t.client = conn
t.watch(t.client)
t.heart()
} else {
//utils.Log(nil, "重连下游")
time.Sleep(1 * time.Second)
@ -73,44 +76,6 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
_, err := clinet.Write(msg)
var end = time.Now().Unix()
fmt.Println(end-start, "秒")
buf := make([]byte, 1024)
reader := bufio.NewReader(clinet)
line, err := reader.ReadString('\n')
if err != nil {
fmt.Println("Error reading from connection:", err)
return err
}
fmt.Println("结果recvStr:", line)
// 将读取到的字符串转换为字节切片
buf = []byte(line)
if err == nil {
if len(buf) > 0 {
recvStr := string(buf)
recvStr = strings.Replace(recvStr, "\n", "", 1)
if len(recvStr) > 2 {
var orderNo = recvStr
utils.LogFile(nil, "ack", orderNo)
if _, ok := OrderMap.Load(orderNo); !ok {
OrderMap.Store(orderNo, "1")
return nil
}
} else {
if len(recvStr) > 0 {
fmt.Println("结果recvStr:", recvStr)
if recvStr == "5" {
fmt.Println("客户端繁忙")
atomic.StoreInt32(t.Full, 1)
} else if recvStr == "2" {
fmt.Println("客户端空闲")
atomic.StoreInt32(t.Full, 0)
}
}
}
}
return errors.New("无效ack")
}
return err
}
func (t *TcpHelper) Close(conn net.Conn) {
@ -133,6 +98,10 @@ func (t *TcpHelper) resend() {
var data = map[string]interface{}{}
var nowTime = time.Now().Unix()
for _, v := range rs {
if atomic.LoadInt32(t.Full) == 1 {
time.Sleep(1 * time.Second)
continue
}
json.Unmarshal([]byte(v), &data)
if (float64(nowTime) - data["send_time"].(float64)) > 60 {
t.SendMsg([]byte(v))
@ -146,6 +115,25 @@ func (t *TcpHelper) resend() {
}
func (t *TcpHelper) heart() {
go func() {
defer func() {
if err := recover(); err != nil {
utils.Log(nil, "err", err)
}
}()
for {
if t.client == nil {
return
}
t.client.SetWriteDeadline(time.Now().Add(expire))
t.client.Write([]byte("1\n"))
time.Sleep(2 * time.Second)
}
}()
}
func (t *TcpHelper) watch(conn net.Conn) {
go func() {
defer func() {
@ -154,12 +142,8 @@ func (t *TcpHelper) watch(conn net.Conn) {
}
}()
for {
if t.client == nil {
return
}
conn.SetWriteDeadline(time.Now().Add(expire))
_, err := conn.Write([]byte("1\n"))
var err error
if err != nil {
//utils.Log(nil, "连接关闭", err)
atomic.StoreInt32(t.Full, 1)
@ -167,32 +151,58 @@ func (t *TcpHelper) watch(conn net.Conn) {
t.reconnect(config.GetConf().OrderPort)
return
} else {
var buffer = make([]byte, 1)
var buffer = make([]byte, 1024)
// 持续读取数据
t.client.SetReadDeadline(time.Now().Add(expire))
n, err := t.client.Read(buffer[:])
if err == nil && n > 0 {
recvStr := string(buffer[:n])
//n, err := t.client.Read(buffer[:])
reader := bufio.NewReader(t.client)
line, err := reader.ReadString('\n')
buffer = []byte(line)
if err == nil && len(buffer) > 0 {
recvStr := string(buffer[:len(buffer)-1])
//fmt.Println("结果recvStr:", recvStr)
if recvStr == "5" {
utils.Log(nil, "客户端繁忙")
atomic.StoreInt32(t.Full, 1)
} else if recvStr == "2" {
utils.Log(nil, "客户端空闲")
atomic.StoreInt32(t.Full, 0)
} else if recvStr == "6" {
utils.Log(nil, "客户端心跳")
atomic.StoreInt32(t.Full, 0)
if len(recvStr) > 1 {
//手动提交编译量
var partion, ok = OrderMap.Load(recvStr)
if ok {
CommitOffset(t.Comsumer, partion.(kafka.TopicPartition))
OrderMap.Delete(recvStr)
}
} else {
if recvStr == "5" {
utils.Log(nil, "客户端繁忙")
atomic.StoreInt32(t.Full, 1)
} else if recvStr == "2" {
utils.Log(nil, "客户端空闲")
atomic.StoreInt32(t.Full, 0)
} else if recvStr == "6" {
utils.Log(nil, "客户端心跳")
//atomic.StoreInt32(t.Full, 1)
}
}
} else {
atomic.StoreInt32(t.Full, 1)
//utils.Log(nil, "连接关闭", err)
t.client.Close()
t.client = nil
t.reconnect(config.GetConf().OrderPort)
if t.client != nil {
t.client.Close()
conn = nil
t.client = nil
t.reconnect(config.GetConf().OrderPort)
}
}
}
time.Sleep(2 * time.Second)
//time.Sleep(2 * time.Second)
}
}()
}
func CommitOffset(consumer *kafka.Consumer, tp kafka.TopicPartition) {
// 创建一个偏移量提交请求
offsets := []kafka.TopicPartition{tp}
commit, err := consumer.CommitOffsets(offsets)
if err != nil {
utils.Log(nil, "Failed to commit offset: %v", err)
} else {
utils.Log(nil, "Committed offset: %v", commit)
}
}

View File

@ -58,7 +58,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
kfconfig := &kafka.ConfigMap{
"bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址
"group.id": group, // 消费者组ID
"auto.offset.reset": "latest", // 自动从最早的消息开始消费earliest,latest
"auto.offset.reset": "earliest", // 自动从最早的消息开始消费earliest,latest
"heartbeat.interval.ms": 1000,
"session.timeout.ms": 45000,
"max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了
@ -71,13 +71,14 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
}
}()
utils.Log(nil, "kafka config", kfconfig)
var start = time.Now()
var end time.Time
//var start = time.Now()
//var end time.Time
consumer, err := kafka.NewConsumer(kfconfig)
if err != nil {
utils.Log(nil, "comsume", err)
}
tcppool.TcpFactory.Comsumer = consumer
defer consumer.Close()
err = consumer.Subscribe(name, func(c *kafka.Consumer, event kafka.Event) error {
switch ev := event.(type) {
@ -110,6 +111,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
}
msg, err := consumer.ReadMessage(1 * time.Second)
if err == nil {
tcppool.OrderMap.Store(string(msg.Key), msg.TopicPartition)
utils.Log(nil, "offset", msg.TopicPartition.Offset)
fmt.Println(msg.TopicPartition.Partition, "分区")
var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error)
@ -146,11 +148,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
utils.LogFile(nil, "send msg", string(msg.Key), err)
if err == nil {
//手动提交编译量
kk.commitOffset(consumer, msg.TopicPartition)
atomic.AddInt32(workNum, 1)
tcppool.OrderMap.Delete(string(msg.Key))
end = time.Now()
utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum)
//kk.commitOffset(consumer, msg.TopicPartition)
//atomic.AddInt32(workNum, 1)
//tcppool.OrderMap.Delete(string(msg.Key))
//end = time.Now()
//utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum)
}
} else {