diff --git a/app/handlers/mq/quenue.go b/app/handlers/mq/quenue.go
index 901e44a..184548b 100644
--- a/app/handlers/mq/quenue.go
+++ b/app/handlers/mq/quenue.go
@@ -26,8 +26,8 @@ func StartQunueServer() error {
 	if config.GetConf().StartQunue == 1 {
 		for i := 0; i < 1; i++ {
 			fmt.Println("对列" + strconv.Itoa(i))
-			startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i)         //单聊
-			startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊
+			startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //
+			//startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //分发
 		}
 	}
 	select {}
diff --git a/app/http/tcppool/single.go b/app/http/tcppool/single.go
index 174be28..ba6a88c 100644
--- a/app/http/tcppool/single.go
+++ b/app/http/tcppool/single.go
@@ -6,11 +6,11 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/qit-team/snow-core/redis"
 	"net"
 	"quenue/app/utils"
 	"quenue/config"
-	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -28,6 +28,7 @@ type TcpHelper struct {
 	client   net.Conn
 	lastTime int64
 	Full     *int32
+	Comsumer *kafka.Consumer
 }
 
 func (t *TcpHelper) Init(port string) *TcpHelper {
@@ -37,6 +38,7 @@ func (t *TcpHelper) Init(port string) *TcpHelper {
 			t.client = conn
 			atomic.StoreInt32(t.Full, 0)
 			t.watch(t.client)
+			t.heart()
 			t.resend()
 		} else {
 			atomic.StoreInt32(t.Full, 1)
@@ -53,6 +55,7 @@ func (t *TcpHelper) reconnect(port string) {
 		atomic.StoreInt32(t.Full, 0)
 		t.client = conn
 		t.watch(t.client)
+		t.heart()
 	} else {
 		//utils.Log(nil, "重连下游")
 		time.Sleep(1 * time.Second)
@@ -73,44 +76,6 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
 	_, err := clinet.Write(msg)
 	var end = time.Now().Unix()
 	fmt.Println(end-start, "秒")
-	buf := make([]byte, 1024)
-	reader := bufio.NewReader(clinet)
-	line, err := reader.ReadString('\n')
-	if err != nil {
-		fmt.Println("Error reading from connection:", err)
-		return err
-	}
-	fmt.Println("结果:recvStr:", line)
-	// 将读取到的字符串转换为字节切片
-	buf = []byte(line)
-	if err == nil {
-		if len(buf) > 0 {
-			recvStr := string(buf)
-			recvStr = strings.Replace(recvStr, "\n", "", 1)
-			if len(recvStr) > 2 {
-				var orderNo = recvStr
-				utils.LogFile(nil, "ack", orderNo)
-				if _, ok := OrderMap.Load(orderNo); !ok {
-					OrderMap.Store(orderNo, "1")
-					return nil
-				}
-			} else {
-				if len(recvStr) > 0 {
-					fmt.Println("结果:recvStr:", recvStr)
-					if recvStr == "5" {
-						fmt.Println("客户端繁忙")
-						atomic.StoreInt32(t.Full, 1)
-					} else if recvStr == "2" {
-						fmt.Println("客户端空闲")
-						atomic.StoreInt32(t.Full, 0)
-					}
-				}
-
-			}
-
-		}
-		return errors.New("无效ack")
-	}
 	return err
 }
 func (t *TcpHelper) Close(conn net.Conn) {
@@ -133,6 +98,10 @@ func (t *TcpHelper) resend() {
 				var data = map[string]interface{}{}
 				var nowTime = time.Now().Unix()
 				for _, v := range rs {
+					if atomic.LoadInt32(t.Full) == 1 {
+						time.Sleep(1 * time.Second)
+						continue
+					}
 					json.Unmarshal([]byte(v), &data)
 					if (float64(nowTime) - data["send_time"].(float64)) > 60 {
 						t.SendMsg([]byte(v))
@@ -146,6 +115,25 @@ func (t *TcpHelper) resend() {
 
 }
 
+func (t *TcpHelper) heart() {
+	go func() {
+		defer func() {
+			if err := recover(); err != nil {
+				utils.Log(nil, "err", err)
+			}
+		}()
+		for {
+			if t.client == nil {
+				return
+			}
+			t.client.SetWriteDeadline(time.Now().Add(expire))
+			t.client.Write([]byte("1\n"))
+			time.Sleep(2 * time.Second)
+		}
+	}()
+
+}
+
 func (t *TcpHelper) watch(conn net.Conn) {
 	go func() {
 		defer func() {
@@ -154,12 +142,8 @@ func (t *TcpHelper) watch(conn net.Conn) {
 			}
 		}()
 		for {
-			if t.client == nil {
-				return
-			}
-			conn.SetWriteDeadline(time.Now().Add(expire))
-			_, err := conn.Write([]byte("1\n"))
 
+			var err error
 			if err != nil {
 				//utils.Log(nil, "连接关闭", err)
 				atomic.StoreInt32(t.Full, 1)
@@ -167,32 +151,58 @@ func (t *TcpHelper) watch(conn net.Conn) {
 				t.reconnect(config.GetConf().OrderPort)
 				return
 			} else {
-				var buffer = make([]byte, 1)
+				var buffer = make([]byte, 1024)
 				// 持续读取数据
 				t.client.SetReadDeadline(time.Now().Add(expire))
-				n, err := t.client.Read(buffer[:])
-				if err == nil && n > 0 {
-					recvStr := string(buffer[:n])
+				//n, err := t.client.Read(buffer[:])
+				reader := bufio.NewReader(t.client)
+				line, err := reader.ReadString('\n')
+				buffer = []byte(line)
+				if err == nil && len(buffer) > 0 {
+					recvStr := string(buffer[:len(buffer)-1])
 					//fmt.Println("结果:recvStr:", recvStr)
-					if recvStr == "5" {
-						utils.Log(nil, "客户端繁忙")
-						atomic.StoreInt32(t.Full, 1)
-					} else if recvStr == "2" {
-						utils.Log(nil, "客户端空闲")
-						atomic.StoreInt32(t.Full, 0)
-					} else if recvStr == "6" {
-						utils.Log(nil, "客户端心跳")
-						atomic.StoreInt32(t.Full, 0)
+					if len(recvStr) > 1 {
+						//手动提交编译量
+						var partion, ok = OrderMap.Load(recvStr)
+						if ok {
+							CommitOffset(t.Comsumer, partion.(kafka.TopicPartition))
+							OrderMap.Delete(recvStr)
+						}
+					} else {
+						if recvStr == "5" {
+							utils.Log(nil, "客户端繁忙")
+							atomic.StoreInt32(t.Full, 1)
+						} else if recvStr == "2" {
+							utils.Log(nil, "客户端空闲")
+							atomic.StoreInt32(t.Full, 0)
+						} else if recvStr == "6" {
+							utils.Log(nil, "客户端心跳")
+							//atomic.StoreInt32(t.Full, 1)
+						}
 					}
+
 				} else {
 					atomic.StoreInt32(t.Full, 1)
 					//utils.Log(nil, "连接关闭", err)
-					t.client.Close()
-					t.client = nil
-					t.reconnect(config.GetConf().OrderPort)
+					if t.client != nil {
+						t.client.Close()
+						conn = nil
+						t.client = nil
+						t.reconnect(config.GetConf().OrderPort)
+					}
 				}
 			}
-			time.Sleep(2 * time.Second)
+			//time.Sleep(2 * time.Second)
 		}
 	}()
 }
+func CommitOffset(consumer *kafka.Consumer, tp kafka.TopicPartition) {
+	// 创建一个偏移量提交请求
+	offsets := []kafka.TopicPartition{tp}
+	commit, err := consumer.CommitOffsets(offsets)
+	if err != nil {
+		utils.Log(nil, "Failed to commit offset: %v", err)
+	} else {
+		utils.Log(nil, "Committed offset: %v", commit)
+	}
+}
diff --git a/app/utils/mq/kafka_v2.go b/app/utils/mq/kafka_v2.go
index 0f4cbf1..3a2e400 100644
--- a/app/utils/mq/kafka_v2.go
+++ b/app/utils/mq/kafka_v2.go
@@ -58,7 +58,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
 	kfconfig := &kafka.ConfigMap{
 		"bootstrap.servers":     config.GetConf().KafkaUrl, // Kafka服务器地址
 		"group.id":              group,                     // 消费者组ID
-		"auto.offset.reset":     "latest",                  // 自动从最早的消息开始消费earliest,latest
+		"auto.offset.reset":     "earliest",                // 自动从最早的消息开始消费earliest,latest
 		"heartbeat.interval.ms": 1000,
 		"session.timeout.ms":    45000,
 		"max.poll.interval.ms":  300000, // 5 分钟, 防止积压的时候认为掉线了
@@ -71,13 +71,14 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
 		}
 	}()
 	utils.Log(nil, "kafka config", kfconfig)
-	var start = time.Now()
-	var end time.Time
+	//var start = time.Now()
+	//var end time.Time
 	consumer, err := kafka.NewConsumer(kfconfig)
 
 	if err != nil {
 		utils.Log(nil, "comsume", err)
 	}
+	tcppool.TcpFactory.Comsumer = consumer
 	defer consumer.Close()
 	err = consumer.Subscribe(name, func(c *kafka.Consumer, event kafka.Event) error {
 		switch ev := event.(type) {
@@ -110,6 +111,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
 				}
 				msg, err := consumer.ReadMessage(1 * time.Second)
 				if err == nil {
+					tcppool.OrderMap.Store(string(msg.Key), msg.TopicPartition)
 					utils.Log(nil, "offset", msg.TopicPartition.Offset)
 					fmt.Println(msg.TopicPartition.Partition, "分区")
 					var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error)
@@ -146,11 +148,11 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
 					utils.LogFile(nil, "send msg", string(msg.Key), err)
 					if err == nil {
 						//手动提交编译量
-						kk.commitOffset(consumer, msg.TopicPartition)
-						atomic.AddInt32(workNum, 1)
-						tcppool.OrderMap.Delete(string(msg.Key))
-						end = time.Now()
-						utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum)
+						//kk.commitOffset(consumer, msg.TopicPartition)
+						//atomic.AddInt32(workNum, 1)
+						//tcppool.OrderMap.Delete(string(msg.Key))
+						//end = time.Now()
+						//utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum)
 
 					}
 				} else {