package mq import ( "context" "encoding/json" "fmt" "github.com/Shopify/sarama" "github.com/qit-team/snow-core/redis" "qteam/app/utils" "qteam/config" "strconv" "sync" ) type KafkaMq struct { } // 同步 func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error { kafConfig := sarama.NewConfig() kafConfig.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition kafConfig.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 // 构造一个消息 msg := &sarama.ProducerMessage{} msg.Topic = name var data, _ = json.Marshal(log) msg.Value = sarama.StringEncoder(string(data)) // 连接kafka client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig) if err != nil { fmt.Println("producer closed, err:", err) return nil } defer client.Close() // 发送消息 pid, offset, err := client.SendMessage(msg) if err != nil { utils.Log(nil, "send msg failed, err:", err, pid, offset) return nil } return nil } func (n KafkaMq) Consume(name string, hand interface{}) { consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil) if err != nil { utils.Log(nil, "kafka comsume", err.Error()) return } partitionList, err := consumer.Partitions(name) // 根据topic取到所有的分区 if err != nil { utils.Log(nil, "kafka comsume", err.Error()) return } //utils.Log(nil,"kafka comsume",name,partitionList) for partition := range partitionList { // 遍历所有的分区 // 针对每个分区创建一个对应的分区消费者 var offsetReDis, _ = redis.GetRedis().Incr(context.Background(), "kafka_consume:"+strconv.Itoa(int(partition))).Result() //保证多消费者不重复消费 var offset int64 = sarama.OffsetNewest if offsetReDis > 0 { //offset = int64(offsetReDis) } pc, err := consumer.ConsumePartition(name, int32(partition), offset) //utils.Log(nil,"partion",int32(partition)) if err != nil { fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) return } defer pc.AsyncClose() var wg sync.WaitGroup wg.Add(1) // 异步从每个分区消费信息 go func(sarama.PartitionConsumer) { for msg := range pc.Messages() { defer wg.Done() var handler = hand.(func(tag uint64, ch interface{}, msg []byte)) handler(0, nil, msg.Value) //utils.Log(nil,"hand msg",string(msg.Value),msg.Offset) } }(pc) wg.Wait() } } func (n KafkaMq) DelyConsume(name string, hand interface{}) { }