// mq/app/utils/mq/kafka_v2.go
package mqs
import (
"context"
"encoding/json"
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/qit-team/snow-core/redis"
"math/rand/v2"
"quenue/app/http/entities"
"quenue/app/http/tcppool"
"quenue/app/utils"
config "quenue/config"
"strconv"
"sync/atomic"
"time"
)
var (
	// now is the backing storage for the processed-message counter.
	now int32 = 0
	// workNum points at now; it is read/incremented via sync/atomic in Consume.
	workNum = &now
)

// KafkaV2Mq is a stateless Kafka producer/consumer wrapper; all connection
// settings are read from config.GetConf() on each call.
type KafkaV2Mq struct {
}
// Produce JSON-encodes log and publishes it to the Kafka topic name,
// blocking up to 15s for delivery. delayTime and args are accepted for
// interface compatibility but are not used by the Kafka implementation.
// Returns the first error encountered (producer init, marshal, or produce).
func (kk KafkaV2Mq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
	kfconfig := &kafka.ConfigMap{
		"bootstrap.servers": config.GetConf().KafkaUrl, // Kafka broker address
	}
	producer, err := kafka.NewProducer(kfconfig)
	if err != nil {
		// Must return here: continuing with a nil producer would panic on
		// the deferred Close and on Produce below.
		utils.Log(nil, "kafka productor init error", err)
		return err
	}
	defer producer.Close()

	logData, err := json.Marshal(log)
	if err != nil {
		// Don't publish a corrupt/empty payload if the value can't be encoded.
		utils.Log(nil, "kafka marshal payload error", err)
		return err
	}
	message := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &name, Partition: kafka.PartitionAny},
		Value:          logData, // json.Marshal already returns []byte
	}
	if err = producer.Produce(message, nil); err != nil {
		utils.Log(nil, "Failed to produce message:", err)
		return err
	}
	utils.Log(nil, "kafka produce", err)
	// Wait up to 15s for outstanding deliveries before the deferred Close.
	producer.Flush(15 * 1000)
	return nil
}
// Consume subscribes to topic name with manual offset commits and fans the
// message handling out to config.GetConf().Num goroutines that all read from
// the same consumer. hand must be a func(tag uint64, ch interface{}, msg []byte) error;
// ci is only logged. This function never returns (it parks on select{}).
//
// NOTE(review): all worker goroutines call ReadMessage on one shared
// *kafka.Consumer — confirm this is safe for the pinned confluent-kafka-go
// version; the consumer is generally not meant for concurrent polling.
func (kk KafkaV2Mq) Consume(name string, hand interface{}, ci int) {
	utils.Log(nil, "hand", ci)
	kfconfig := &kafka.ConfigMap{
		"bootstrap.servers":     config.GetConf().KafkaUrl,   // Kafka broker address
		"group.id":              config.GetConf().KafkaGroup, // consumer group ID
		"auto.offset.reset":     "earliest",                  // start from the earliest message when no committed offset exists
		"heartbeat.interval.ms": 1000,
		"session.timeout.ms":    45000,
		"max.poll.interval.ms":  300000, // 5 minutes, so a processing backlog doesn't get the member evicted
		"enable.auto.commit":    false,  // offsets are committed manually in commitOffset after a successful handle
	}
	// Catch panics from any code below so the consumer process logs instead of crashing.
	defer func() {
		if err := recover(); err != nil {
			fmt.Println("消费中断", err)
		}
	}()
	utils.Log(nil, "kafka config", kfconfig)
	// start is taken once at startup; the "消费耗时" log below therefore measures
	// time since Consume began, not per-message latency.
	var start = time.Now()
	var end time.Time
	consumer, err := kafka.NewConsumer(kfconfig)
	if err != nil {
		// NOTE(review): only logged — execution continues and a nil consumer
		// would panic in the workers below. Consider returning/aborting here.
		utils.Log(nil, "comsume", err)
	}
	defer consumer.Close()
	// Rebalance callback: mirror broker-driven partition assignment/revocation
	// onto this consumer instance.
	err = consumer.Subscribe(name, func(c *kafka.Consumer, event kafka.Event) error {
		switch ev := event.(type) {
		case kafka.AssignedPartitions:
			fmt.Printf("Assigned partitions: %v\n", ev.Partitions)
			// Handle the partition assignment here.
			c.Assign(ev.Partitions)
		case kafka.RevokedPartitions:
			fmt.Printf("Revoked partitions: %v\n", ev.Partitions)
			// Handle the partition revocation here.
			c.Unassign()
		default:
			fmt.Printf("Ignored event: %s\n", ev)
		}
		return nil
	})
	if err != nil {
		utils.Log(nil, "Failed to subscribe:", err)
	}
	fmt.Println(config.GetConf().Num)
	// Fan out: Num workers poll the shared consumer in a busy loop.
	for i := 0; i < config.GetConf().Num; i++ {
		go func() {
			for {
				// Backpressure: while the downstream TCP pool is full, park and retry.
				if atomic.LoadInt32(tcppool.TcpFactory.Full) == 1 {
					//utils.Log(nil, "queue blocked")
					time.Sleep(100 * time.Millisecond)
					continue
				} else {
					//utils.Log(nil, "queue open")
				}
				msg, err := consumer.ReadMessage(1 * time.Second)
				if err == nil {
					utils.Log(nil, "offset", msg.TopicPartition.Offset)
					fmt.Println(msg.TopicPartition.Partition, "分区")
					// hand is trusted to have this exact signature; a mismatch panics
					// and is swallowed by the deferred recover above.
					var handler = hand.(func(tag uint64, ch interface{}, msg []byte) error)
					var mqsg = entities.MqMessage{}
					mqsg.Key = string(msg.Key)
					mqsg.Property = make(map[string]interface{})
					if len(msg.Value) > 0 {
						if msg.Headers != nil {
							for _, v := range msg.Headers {
								// A "property" header is decoded as JSON into Property;
								// every header (including "property") is also stored as a raw string.
								if v.Key == "property" {
									json.Unmarshal(v.Value, &mqsg.Property)
								}
								mqsg.Property[string(v.Key)] = string(v.Value)
							}
						}
						err = json.Unmarshal(msg.Value, &mqsg.Body)
					} else {
						// Empty payload: synthesize a test message with a timestamp+random key.
						// NOTE(review): hard-coded test fixture data — presumably for internal
						// smoke testing; confirm it is intended in production builds.
						mqsg.Key = time.Now().Format("20060102150405") + strconv.Itoa(int(rand.Int32N(1000000)))
						mqsg.Property = map[string]interface{}{"reseller_id": 20001, "order_product_id": 104, "platform_product_id": 592, "serial_number": "100000001", "platform_tag": "InternalTest", "tag": "PInternalTest"}
						mqsg.Body = map[string]interface{}{"reseller_id": 20001, "order_price": 12.01, "account_type": 0, "serial_number": "100000001"}
					}
					mqsg.SendTime = time.Now().Unix()
					// Debug trace for one specific message key.
					if mqsg.Key == "734760617161662465" {
						fmt.Println("消费消息", mqsg.Key)
					} else {
						fmt.Println(mqsg.Key)
					}
					var data, _ = json.Marshal(mqsg)
					// Persist the message in a redis hash keyed by the original Kafka key
					// before handing it to the handler (errors are overwritten below).
					_, err = redis.GetRedis().HSet(context.Background(), "kafka_message", string(msg.Key), data).Result()
					err = handler(0, nil, data)
					if err == nil {
						// Manually commit the offset only after successful handling
						// (at-least-once delivery).
						kk.commitOffset(consumer, msg.TopicPartition)
						atomic.AddInt32(workNum, 1)
						tcppool.OrderMap.Delete(string(msg.Key))
						end = time.Now()
						utils.Log(nil, "消费耗时", end.Sub(start), "消息数", *workNum)
					}
				} else {
					// ReadMessage timeouts land here once per second; intentionally quiet.
					//utils.Log(nil, "Error while consuming: %v\n", err)
				}
			}
		}()
	}
	// Block forever; the workers above own the consume loop.
	select {}
}
// commitOffset synchronously commits the single partition offset tp on
// consumer, logging the outcome. Used after a message handler succeeds so
// delivery is at-least-once.
func (kk *KafkaV2Mq) commitOffset(consumer *kafka.Consumer, tp kafka.TopicPartition) {
	// Commit exactly the offset of the message just processed.
	offsets := []kafka.TopicPartition{tp}
	commit, err := consumer.CommitOffsets(offsets)
	if err != nil {
		// utils.Log is a variadic logger, not printf-style: pass err as its own
		// argument instead of an unformatted "%v" verb in the message string.
		utils.Log(nil, "Failed to commit offset:", err)
	} else {
		utils.Log(nil, "Committed offset:", commit)
	}
}