package mq

import (
    "context"
    "encoding/json"
    "fmt"
    "strconv"
    "sync"

    "github.com/Shopify/sarama"
    "github.com/qit-team/snow-core/redis"

    "qteam/app/utils"
    "qteam/config"
)

type KafkaMq struct {
}

// Produce sends a message to Kafka synchronously.
func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
    kafConfig := sarama.NewConfig()
    kafConfig.Producer.RequiredAcks = sarama.WaitForAll          // wait until the leader and all followers have acknowledged
    kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
    kafConfig.Producer.Return.Successes = true                   // delivered messages are reported on the Successes channel

    // Build the message.
    msg := &sarama.ProducerMessage{}
    msg.Topic = name
    data, err := json.Marshal(log)
    if err != nil {
        utils.Log(nil, "marshal msg failed, err:", err)
        return err
    }
    msg.Value = sarama.StringEncoder(string(data))

    // Connect to Kafka.
    client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig)
    if err != nil {
        utils.Log(nil, "create producer failed, err:", err)
        return err
    }
    defer client.Close()

    // Send the message.
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        utils.Log(nil, "send msg failed, err:", err, pid, offset)
        return err
    }
    return nil
}
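
// Usage sketch (illustrative only; the topic name and payload are hypothetical,
// and config.GetConf().KafkaUrl is assumed to point at a reachable broker):
//
//    var mq KafkaMq
//    payload := map[string]string{"order_id": "1001"}
//    if err := mq.Produce("order_events", payload, 0); err != nil {
//        utils.Log(nil, "produce failed, err:", err)
//    }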

// Consume reads every partition of the topic and feeds each message to hand,
// which must be a func(tag uint64, ch interface{}, msg []byte).
func (n KafkaMq) Consume(name string, hand interface{}) {
    handler, ok := hand.(func(tag uint64, ch interface{}, msg []byte))
    if !ok {
        utils.Log(nil, "kafka consume", "invalid handler type")
        return
    }

    consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil)
    if err != nil {
        utils.Log(nil, "kafka consume", err.Error())
        return
    }
    defer consumer.Close()

    // Fetch all partitions of the topic.
    partitionList, err := consumer.Partitions(name)
    if err != nil {
        utils.Log(nil, "kafka consume", err.Error())
        return
    }

    var wg sync.WaitGroup
    // Start one partition consumer per partition.
    for _, partition := range partitionList {
        // Track a per-partition counter in Redis so that multiple consumers do not
        // re-consume the same messages (the stored value is currently not applied).
        offsetReDis, _ := redis.GetRedis().Incr(context.Background(), "kafka_consume:"+strconv.Itoa(int(partition))).Result()
        var offset int64 = sarama.OffsetNewest
        if offsetReDis > 0 {
            // offset = offsetReDis // restoring the stored offset is currently disabled
        }

        pc, err := consumer.ConsumePartition(name, partition, offset)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d, err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()

        wg.Add(1)
        // Consume messages from this partition asynchronously.
        go func(pc sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range pc.Messages() {
                handler(0, nil, msg.Value)
            }
        }(pc)
    }
    wg.Wait()
}
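
// Handler sketch (illustrative): Consume expects hand to have the signature
// below; the tag and ch arguments are unused by this Kafka implementation:
//
//    handler := func(tag uint64, ch interface{}, msg []byte) {
//        utils.Log(nil, "got message", string(msg))
//    }
//    KafkaMq{}.Consume("order_events", handler)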

// DelyConsume is a placeholder for delayed consumption; it is not implemented yet.
func (n KafkaMq) DelyConsume(name string, hand interface{}) {
}