增加dispacher对列

This commit is contained in:
qiyunfanbo126.com 2025-02-07 14:01:41 +08:00
parent 6bc8ae6108
commit bed1010283
7 changed files with 62 additions and 11 deletions

View File

@ -26,7 +26,8 @@ func StartQunueServer() error {
if config.GetConf().StartQunue == 1 {
for i := 0; i < 1; i++ {
fmt.Println("对列" + strconv.Itoa(i))
startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊
startQunue(config.GetConf().Topical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊
startQunue(config.GetConf().DispatchTopical, OrderCharge, common.MQ_KFK_V2, 0, "", i) //单聊
}
}
select {}

View File

@ -5,4 +5,5 @@ type MqMessage struct {
Property map[string]interface{} `json:"property"`
Key string `json:"serial_number"`
SendTime int64 `json:"send_time"`
Topical string `json:"topic"`
}

View File

@ -57,7 +57,7 @@ func (t *TcpHelper) reconnect(port string) {
//utils.Log(nil, "重连下游")
time.Sleep(1 * time.Second)
atomic.StoreInt32(t.Full, 1)
t.client = conn
//t.client = conn
t.reconnect(port)
}
}
@ -80,6 +80,7 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
fmt.Println("Error reading from connection:", err)
return err
}
fmt.Println("结果recvStr:", line)
// 将读取到的字符串转换为字节切片
buf = []byte(line)
if err == nil {
@ -88,13 +89,14 @@ func (t *TcpHelper) SendMsg(msg []byte) error {
recvStr = strings.Replace(recvStr, "\n", "", 1)
if len(recvStr) > 2 {
var orderNo = recvStr
utils.LogFile(nil, "ack", orderNo)
if _, ok := OrderMap.Load(orderNo); !ok {
OrderMap.Store(orderNo, "1")
return nil
}
} else {
if len(recvStr) > 0 {
//fmt.Println("结果recvStr:", recvStr)
fmt.Println("结果recvStr:", recvStr)
if recvStr == "5" {
fmt.Println("客户端繁忙")
atomic.StoreInt32(t.Full, 1)
@ -121,6 +123,11 @@ func (t *TcpHelper) resend() {
if err := recover(); err != nil {
utils.Log(nil, "重发失败", err)
}
}()
for {
if t.client == nil {
return
}
rs, err := redis.GetRedis().HGetAll(context.Background(), "kafka_message").Result()
if err == nil {
var data = map[string]interface{}{}
@ -130,10 +137,11 @@ func (t *TcpHelper) resend() {
if (float64(nowTime) - data["send_time"].(float64)) > 60 {
t.SendMsg([]byte(v))
}
time.Sleep(200 * time.Millisecond)
}
}
time.Sleep(3 * time.Minute)
}()
}
}()
}
@ -142,10 +150,13 @@ func (t *TcpHelper) watch(conn net.Conn) {
go func() {
defer func() {
if err := recover(); err != nil {
//fmt.Println("连接断开", err)
fmt.Println("连接断开", err)
}
}()
for {
if t.client == nil {
return
}
conn.SetWriteDeadline(time.Now().Add(expire))
_, err := conn.Write([]byte("1\n"))
@ -177,10 +188,11 @@ func (t *TcpHelper) watch(conn net.Conn) {
atomic.StoreInt32(t.Full, 1)
//utils.Log(nil, "连接关闭", err)
t.client.Close()
t.client = nil
t.reconnect(config.GetConf().OrderPort)
}
}
time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
}
}()
}

View File

@ -51,15 +51,20 @@ func (kk KafkaV2Mq) Produce(name string, log interface{}, delayTime int, args ..
func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
utils.Log(nil, "hand", ci)
var group = config.GetConf().KafkaGroup
if name == config.GetConf().DispatchTopical {
group = config.GetConf().DidpatchKafkaGroup
}
kfconfig := &kafka.ConfigMap{
"bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址
"group.id": config.GetConf().KafkaGroup, // 消费者组ID
"auto.offset.reset": "latest", // 自动从最早的消息开始消费
"bootstrap.servers": config.GetConf().KafkaUrl, // Kafka服务器地址
"group.id": group, // 消费者组ID
"auto.offset.reset": "latest", // 自动从最早的消息开始消费earliest,latest
"heartbeat.interval.ms": 1000,
"session.timeout.ms": 45000,
"max.poll.interval.ms": 300000, // 5 分钟, 防止积压的时候认为掉线了
"enable.auto.commit": false,
}
defer func() {
if err := recover(); err != nil {
fmt.Println("消费中断", err)
@ -111,7 +116,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
var mqsg = entities.MqMessage{}
mqsg.Key = string(msg.Key)
mqsg.Property = make(map[string]interface{})
mqsg.Topical = name
if len(msg.Value) > 0 {
if msg.Headers != nil {
for _, v := range msg.Headers {
@ -128,6 +133,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"}
}
mqsg.SendTime = time.Now().Unix()
utils.LogFile(nil, "msg", string(msg.Key))
if mqsg.Key == "734760617161662465" {
fmt.Println("消费消息", mqsg.Key)
} else {
@ -137,6 +143,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
var data, _ = json.Marshal(mqsg)
_, err = redis.GetRedis().HSet(context.Background(), "kafka_message", string(msg.Key), data).Result()
err = handler(0, nil, data)
utils.LogFile(nil, "send msg", string(msg.Key), err)
if err == nil {
//手动提交编译量
kk.commitOffset(consumer, msg.TopicPartition)
@ -147,7 +154,7 @@ func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
}
} else {
//utils.Log(nil, "Error while consuming: %v\n", err)
utils.Log(nil, "Error while consuming: %v\n", err)
}
}
}()

View File

@ -1,9 +1,12 @@
package utils
import (
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"os"
"runtime"
"strconv"
"time"
)
@ -11,4 +14,22 @@ func Log(c *gin.Context, name string, msg ...interface{}) {
_, file, line, _ := runtime.Caller(1)
var datetime = time.Now().Format(time.DateTime)
fmt.Println(name, msg, file, line, datetime)
}
func LogFile(c *gin.Context, name string, msg ...interface{}) {
_, rfile, line, _ := runtime.Caller(1)
var datetime = time.Now().Format(time.DateTime)
file, err := os.OpenFile("log.txt", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println("Error opening file:", err)
return
}
defer file.Close()
var data, _ = json.Marshal(map[string]interface{}{"data": msg})
// 写入内容到文件末尾
_, err = file.WriteString("file:" + rfile + " line:" + strconv.Itoa(line) + " 时间:" + datetime + "" + name + ":" + string(data) + "!\n")
if err != nil {
fmt.Println("Error writing to file:", err)
return
}
}

View File

@ -35,6 +35,8 @@ type Config struct {
Num int `toml:"Num"`
Url string `toml:"Url"`
Topical string `toml:"Topical"`
DispatchTopical string `toml:"DispatchTopical"`
DidpatchKafkaGroup string `toml:"DidPatchKafkaGroup"`
}
func newConfig() *Config {

View File

@ -98,6 +98,13 @@ func startServer(opts *config.Options) (err error) {
return
}
//err = tcppool.TcpFactory.Init("9502").SendMsg([]byte("123"))
//for i := 0; i < 10000; i++ {
// var mqsg = entities.MqMessage{}
// mqsg.Key = time.Now().Format("20060102150405") + strconv.Itoa(int(rand.Int32N(1000000)))
// mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"}
// mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"}
// mqs.MqManager.GetMqByName(common.MQ_KFK).Produce("platform_all", "", 0)
//}
//引导程序
err = bootstrap.Bootstrap(conf)