package mq // //import ( // "PaymentCenter/app/utils" // "PaymentCenter/config" // "context" // "fmt" // "github.com/Shopify/sarama" // "github.com/qit-team/snow-core/redis" // // "strconv" // "sync" //) // //type KafkaMq struct { //} // //// 同步 //func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error { // //kafConfig := sarama.NewConfig() // //kafConfig.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 // //kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition // //kafConfig.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 // // // //// 构造一个消息 // //msg := &sarama.ProducerMessage{} // //msg.Topic = name // //var data, _ = json.Marshal(log) // //msg.Value = sarama.StringEncoder(string(data)) // //// 连接kafka // //client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig) // //if err != nil { // // fmt.Println("producer closed, err:", err) // // return nil // //} // //defer client.Close() // //// 发送消息 // //pid, offset, err := client.SendMessage(msg) // //if err != nil { // // utils.Log(nil, "send msg failed, err:", err, pid, offset) // // return nil // //} // return nil //} // //func (n KafkaMq) Consume(name string, hand interface{}) { // consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil) // if err != nil { // utils.Log(nil, "kafka comsume", err.Error()) // return // } // partitionList, err := consumer.Partitions(name) // 根据topic取到所有的分区 // if err != nil { // utils.Log(nil, "kafka comsume", err.Error()) // return // } // //utils.Log(nil,"kafka comsume",name,partitionList) // for partition := range partitionList { // 遍历所有的分区 // // 针对每个分区创建一个对应的分区消费者 // var offsetReDis, _ = redis.GetRedis().Incr(context.Background(), "kafka_consume:"+strconv.Itoa(int(partition))).Result() //保证多消费者不重复消费 // var offset int64 = sarama.OffsetNewest // if offsetReDis > 0 { // //offset = int64(offsetReDis) // } // pc, err := consumer.ConsumePartition(name, int32(partition), offset) // //utils.Log(nil,"partion",int32(partition)) // if err != nil { // fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) // return // } // defer pc.AsyncClose() // var wg sync.WaitGroup // wg.Add(1) // // 异步从每个分区消费信息 // go func(sarama.PartitionConsumer) { // for msg := range pc.Messages() { // defer wg.Done() // var handler = hand.(func(tag uint64, ch interface{}, msg []byte)) // handler(0, nil, msg.Value) // //utils.Log(nil,"hand msg",string(msg.Value),msg.Offset) // } // }(pc) // wg.Wait() // } //} //func (n KafkaMq) DelyConsume(name string, hand interface{}) { // //}