package tcppool

import (
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/qit-team/snow-core/redis"
	"net"
	"quenue/app/utils"
	"quenue/config"
	"sync"
	"sync/atomic"
	"time"
)

var (
	full       int32 = 0
	TcpFactory       = TcpHelper{Full: &full}
	lockSingle sync.Once
	OrderMap   = sync.Map{}
	expire     = 10 * time.Second
)

type TcpHelper struct {
	client   net.Conn
	lastTime int64
	Full     *int32
	Comsumer *kafka.Consumer
}

func (t *TcpHelper) Init(port string) *TcpHelper {
	lockSingle.Do(func() {
		var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
		if err == nil {
			t.client = conn
			atomic.StoreInt32(t.Full, 0)
			t.watch(t.client)
			t.heart()
			t.resend()
		} else {
			atomic.StoreInt32(t.Full, 1)
			t.reconnect(port)
			utils.Log(nil, config.GetConf().Url+":"+port)
		}
	})
	return t
}
func (t *TcpHelper) reconnect(port string) {
	var conn, err = net.DialTimeout("tcp", config.GetConf().Url+":"+port, expire)
	if err == nil {
		//utils.Log(nil, "连接下游成功")
		atomic.StoreInt32(t.Full, 0)
		t.client = conn
		t.watch(t.client)
		t.heart()
	} else {
		//utils.Log(nil, "重连下游")
		time.Sleep(1 * time.Second)
		atomic.StoreInt32(t.Full, 1)
		//t.client = conn
		t.reconnect(port)
	}
}
func (t *TcpHelper) SendMsg(msg []byte) error {
	msg = append(msg, '\n')
	t = t.Init(config.GetConf().OrderPort)
	if t.client == nil {
		return errors.New("客户端连接失败")
	}
	var start = time.Now().Unix()
	var clinet = t.Init(config.GetConf().OrderPort).client
	clinet.SetReadDeadline(time.Now().Add(time.Second * 10))
	_, err := clinet.Write(msg)
	var end = time.Now().Unix()
	fmt.Println(end-start, "秒")
	return err
}
func (t *TcpHelper) Close(conn net.Conn) {
	t.client.Close()
}

func (t *TcpHelper) resend() {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				utils.Log(nil, "重发失败", err)
			}
		}()
		for {
			if t.client == nil {
				return
			}
			rs, err := redis.GetRedis().HGetAll(context.Background(), "kafka_message").Result()
			if err == nil {
				var data = map[string]interface{}{}
				var nowTime = time.Now().Unix()
				for _, v := range rs {
					if atomic.LoadInt32(t.Full) == 1 {
						time.Sleep(1 * time.Second)
						continue
					}
					json.Unmarshal([]byte(v), &data)
					if (float64(nowTime) - data["send_time"].(float64)) > 60 {
						t.SendMsg([]byte(v))
					}
					time.Sleep(200 * time.Millisecond)
				}
			}
			time.Sleep(3 * time.Minute)
		}
	}()

}

func (t *TcpHelper) heart() {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				utils.Log(nil, "err", err)
			}
		}()
		for {
			if t.client == nil {
				return
			}
			t.client.SetWriteDeadline(time.Now().Add(expire))
			t.client.Write([]byte("1\n"))
			time.Sleep(2 * time.Second)
		}
	}()

}

func (t *TcpHelper) watch(conn net.Conn) {
	go func() {
		defer func() {
			if err := recover(); err != nil {
				fmt.Println("连接断开", err)
			}
		}()
		for {

			var err error
			if err != nil {
				//utils.Log(nil, "连接关闭", err)
				atomic.StoreInt32(t.Full, 1)
				t.client.Close()
				t.reconnect(config.GetConf().OrderPort)
				return
			} else {
				var buffer = make([]byte, 1024)
				// 持续读取数据
				t.client.SetReadDeadline(time.Now().Add(expire))
				//n, err := t.client.Read(buffer[:])
				reader := bufio.NewReader(t.client)
				line, err := reader.ReadString('\n')
				buffer = []byte(line)
				if err == nil && len(buffer) > 0 {
					recvStr := string(buffer[:len(buffer)-1])
					//fmt.Println("结果:recvStr:", recvStr)
					if len(recvStr) > 1 {
						//手动提交编译量
						var partion, ok = OrderMap.Load(recvStr)
						if ok {
							CommitOffset(t.Comsumer, partion.(kafka.TopicPartition))
							OrderMap.Delete(recvStr)
						}
					} else {
						if recvStr == "5" {
							utils.Log(nil, "客户端繁忙")
							atomic.StoreInt32(t.Full, 1)
						} else if recvStr == "2" {
							utils.Log(nil, "客户端空闲")
							atomic.StoreInt32(t.Full, 0)
						} else if recvStr == "6" {
							utils.Log(nil, "客户端心跳")
							//atomic.StoreInt32(t.Full, 1)
						}
					}

				} else {
					atomic.StoreInt32(t.Full, 1)
					//utils.Log(nil, "连接关闭", err)
					if t.client != nil {
						t.client.Close()
						conn = nil
						t.client = nil
						t.reconnect(config.GetConf().OrderPort)
					}
				}
			}
			//time.Sleep(2 * time.Second)
		}
	}()
}
func CommitOffset(consumer *kafka.Consumer, tp kafka.TopicPartition) {
	// 创建一个偏移量提交请求
	offsets := []kafka.TopicPartition{tp}
	commit, err := consumer.CommitOffsets(offsets)
	if err != nil {
		utils.Log(nil, "Failed to commit offset: %v", err)
	} else {
		utils.Log(nil, "Committed offset: %v", commit)
	}
}