package mq

//
//import (
//	"PaymentCenter/app/utils"
//	"PaymentCenter/config"
//	"context"
//	"fmt"
//	"github.com/Shopify/sarama"
//	"github.com/qit-team/snow-core/redis"
//
//	"strconv"
//	"sync"
//)
//
//type KafkaMq struct {
//}
//
//// 同步
//func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
//	//kafConfig := sarama.NewConfig()
//	//kafConfig.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
//	//kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
//	//kafConfig.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
//	//
//	//// 构造一个消息
//	//msg := &sarama.ProducerMessage{}
//	//msg.Topic = name
//	//var data, _ = json.Marshal(log)
//	//msg.Value = sarama.StringEncoder(string(data))
//	//// 连接kafka
//	//client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig)
//	//if err != nil {
//	//	fmt.Println("producer closed, err:", err)
//	//	return nil
//	//}
//	//defer client.Close()
//	//// 发送消息
//	//pid, offset, err := client.SendMessage(msg)
//	//if err != nil {
//	//	utils.Log(nil, "send msg failed, err:", err, pid, offset)
//	//	return nil
//	//}
//	return nil
//}
//
//func (n KafkaMq) Consume(name string, hand interface{}) {
//	consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil)
//	if err != nil {
//		utils.Log(nil, "kafka comsume", err.Error())
//		return
//	}
//	partitionList, err := consumer.Partitions(name) // 根据topic取到所有的分区
//	if err != nil {
//		utils.Log(nil, "kafka comsume", err.Error())
//		return
//	}
//	//utils.Log(nil,"kafka comsume",name,partitionList)
//	for partition := range partitionList { // 遍历所有的分区
//		// 针对每个分区创建一个对应的分区消费者
//		var offsetReDis, _ = redis.GetRedis().Incr(context.Background(), "kafka_consume:"+strconv.Itoa(int(partition))).Result() //保证多消费者不重复消费
//		var offset int64 = sarama.OffsetNewest
//		if offsetReDis > 0 {
//			//offset = int64(offsetReDis)
//		}
//		pc, err := consumer.ConsumePartition(name, int32(partition), offset)
//		//utils.Log(nil,"partion",int32(partition))
//		if err != nil {
//			fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
//			return
//		}
//		defer pc.AsyncClose()
//		var wg sync.WaitGroup
//		wg.Add(1)
//		// 异步从每个分区消费信息
//		go func(sarama.PartitionConsumer) {
//			for msg := range pc.Messages() {
//				defer wg.Done()
//				var handler = hand.(func(tag uint64, ch interface{}, msg []byte))
//				handler(0, nil, msg.Value)
//				//utils.Log(nil,"hand msg",string(msg.Value),msg.Offset)
//			}
//		}(pc)
//		wg.Wait()
//	}
//}
//func (n KafkaMq) DelyConsume(name string, hand interface{}) {
//
//}