diff --git a/app/mq/hanlers.go b/app/mq/hanlers.go new file mode 100644 index 0000000..9990bac --- /dev/null +++ b/app/mq/hanlers.go @@ -0,0 +1,21 @@ +package mq + +// 单聊 +func SingleTalk(tag uint64, ch interface{}, msg []byte) { + //var data entities.SingTalkReq + //err := json.Unmarshal(msg, &data) + //utils.Log(nil, "msg", data) + //if err == nil { + // conn := netool.GetConnManagger().GetConnection(data.Msg.To) + // var sendOk = false + // if conn != nil && conn.IsActive() { + // if utils.WriteMsg(msgid.SINGLE_MSG, data, conn) { + // sendOk = true + // } + // } + // if !sendOk { + // common.PikaTool.Zadd(utils.GetRealKey(common2.USER_MSG)+data.Msg.To, data, time.Now().Unix()) + // } + //} + +} diff --git a/app/mq/quenue.go b/app/mq/quenue.go new file mode 100644 index 0000000..c93ebee --- /dev/null +++ b/app/mq/quenue.go @@ -0,0 +1,27 @@ +package mq + +import ( + "qteam/app/utils/mq" +) + +func startQunue(name string, method interface{}, mqTp string, tp int, exhange string) { + if tp == 1 { + go mq.MqManager.GetMqByName(mqTp).Consume(name, method) + } else { + go mq.MqManager.GetMqByName(mqTp).DelyConsume(name, method) + } + +} + +// 队列服务 +func StartQunueServer() error { + StartServer() + select {} + return nil +} + +// 开启队列 +func StartServer() error { + //startWorkers() + return nil +} diff --git a/app/utils/mq/mqmanager.go b/app/utils/mq/mqmanager.go new file mode 100644 index 0000000..7247e1f --- /dev/null +++ b/app/utils/mq/mqmanager.go @@ -0,0 +1,36 @@ +package mq + +import ( + common3 "qteam/app/constants/common" + mq "qteam/app/utils/mq/mqs" + "sync" +) + +var ( + MqManager = CMqManager{} + once sync.Once +) + +type Imq interface { + Produce(name string, log interface{}, delayTime int, args ...interface{}) error + Consume(name string, hand interface{}) + DelyConsume(name string, hand interface{}) +} + +type CMqManager struct { + mqs map[string]Imq +} + +func (this *CMqManager) InitMq() { + this.mqs = make(map[string]Imq) + //this.mqs[common.MQ_RABBIT] = RabbitMq{} + //this.mqs[common.MQ_NSQ] = NsqMq{} + this.mqs[common3.MQ_NATS] = mq.NatsMq{} + this.mqs[common3.MQ_KFK] = mq.KafkaMq{} +} +func (this *CMqManager) GetMqByName(name string) Imq { + once.Do(func() { + this.InitMq() + }) + return this.mqs[name] +} diff --git a/app/utils/mq/mqs/kafka.go b/app/utils/mq/mqs/kafka.go new file mode 100644 index 0000000..3103cf1 --- /dev/null +++ b/app/utils/mq/mqs/kafka.go @@ -0,0 +1,89 @@ +package mq + +import ( + "context" + "encoding/json" + "fmt" + "github.com/Shopify/sarama" + "github.com/qit-team/snow-core/redis" + "qteam/app/utils" + "qteam/config" + + "strconv" + "sync" +) + +type KafkaMq struct { +} + +// 同步 +func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error { + kafConfig := sarama.NewConfig() + kafConfig.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认 + kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition + kafConfig.Producer.Return.Successes = true // 成功交付的消息将在success channel返回 + + // 构造一个消息 + msg := &sarama.ProducerMessage{} + msg.Topic = name + var data, _ = json.Marshal(log) + msg.Value = sarama.StringEncoder(string(data)) + // 连接kafka + client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig) + if err != nil { + fmt.Println("producer closed, err:", err) + return nil + } + defer client.Close() + // 发送消息 + pid, offset, err := client.SendMessage(msg) + if err != nil { + utils.Log(nil, "send msg failed, err:", err, pid, offset) + return nil + } + return nil +} + +func (n KafkaMq) Consume(name string, hand interface{}) { + consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil) + if err != nil { + utils.Log(nil, "kafka comsume", err.Error()) + return + } + partitionList, err := consumer.Partitions(name) // 根据topic取到所有的分区 + if err != nil { + utils.Log(nil, "kafka comsume", err.Error()) + return + } + //utils.Log(nil,"kafka comsume",name,partitionList) + for partition := range partitionList { // 遍历所有的分区 + // 针对每个分区创建一个对应的分区消费者 + var offsetReDis, _ = redis.GetRedis().Incr(context.Background(), "kafka_consume:"+strconv.Itoa(int(partition))).Result() //保证多消费者不重复消费 + var offset int64 = sarama.OffsetNewest + if offsetReDis > 0 { + //offset = int64(offsetReDis) + } + pc, err := consumer.ConsumePartition(name, int32(partition), offset) + //utils.Log(nil,"partion",int32(partition)) + if err != nil { + fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err) + return + } + defer pc.AsyncClose() + var wg sync.WaitGroup + wg.Add(1) + // 异步从每个分区消费信息 + go func(sarama.PartitionConsumer) { + for msg := range pc.Messages() { + defer wg.Done() + var handler = hand.(func(tag uint64, ch interface{}, msg []byte)) + handler(0, nil, msg.Value) + //utils.Log(nil,"hand msg",string(msg.Value),msg.Offset) + } + }(pc) + wg.Wait() + } +} +func (n KafkaMq) DelyConsume(name string, hand interface{}) { + +} diff --git a/app/utils/mq/mqs/nats.go b/app/utils/mq/mqs/nats.go new file mode 100644 index 0000000..3805f81 --- /dev/null +++ b/app/utils/mq/mqs/nats.go @@ -0,0 +1,48 @@ +package mq + +import ( + "encoding/json" + "fmt" + "github.com/nats-io/nats.go" + _ "github.com/nats-io/nats.go" + "github.com/streadway/amqp" + "qteam/app/utils" + "qteam/config" +) + +type NatsMq struct { + Address string + nc *nats.Conn +} + +func (n NatsMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error { + name = config.GetConf().ServiceName + "_" + name + fmt.Println("nats produce", name) + nc, _ := nats.Connect(n.Address) + defer nc.Close() + var content, err = json.Marshal(log) + nc.Publish(name, content) + return err +} + +func (n NatsMq) Consume(name string, hand interface{}) { + if n.nc == nil || n.nc.IsClosed() == true { + if n.nc != nil { + n.nc.Close() + } + n.nc, _ = nats.Connect(n.Address) + } + nc := n.nc + fmt.Println("nats comsume", name) + //defer nc.Close() + _, err := nc.Subscribe(name, func(m *nats.Msg) { + utils.Log(nil, "Received a message: %s", string(m.Data)) + var handler = hand.(func(tag uint64, ch *amqp.Channel, msg []byte)) + handler(0, nil, m.Data) + }) + fmt.Println("ttt", err) +} + +func (n NatsMq) DelyConsume(name string, hand interface{}) { + +} diff --git a/app/utils/util.go b/app/utils/util.go index a3d0a93..7d3cb3b 100644 --- a/app/utils/util.go +++ b/app/utils/util.go @@ -9,6 +9,7 @@ import ( "github.com/qit-team/snow-core/redis" "net" "qteam/config" + "reflect" "runtime" "strconv" "strings" @@ -79,3 +80,51 @@ func GeneratorToken(playerName string, playerId string) string { } return tk } +func EntityCopy(dst, src interface{}) { + dstValue := reflect.ValueOf(dst).Elem() + srcValue := reflect.ValueOf(src).Elem() + + for i := 0; i < srcValue.NumField(); i++ { + srcField := srcValue.Field(i) + + srcName := srcValue.Type().Field(i).Name + dstFieldByName := dstValue.FieldByName(srcName) + + if dstFieldByName.IsValid() { + switch dstFieldByName.Kind() { + case reflect.Ptr: + switch srcField.Kind() { + case reflect.Ptr: + if srcField.IsNil() { + dstFieldByName.Set(reflect.New(dstFieldByName.Type().Elem())) + } else { + dstFieldByName.Set(srcField) + } + default: + dstFieldByName.Set(srcField.Addr()) + } + default: + switch srcField.Kind() { + case reflect.Ptr: + if srcField.IsNil() { + dstFieldByName.Set(reflect.Zero(dstFieldByName.Type())) + } else { + dstFieldByName.Set(srcField.Elem()) + } + default: + if srcField.Type().Name() == "Time" { + if (srcField.Interface().(time.Time).Unix()) < 1 { + dstFieldByName.Set(reflect.ValueOf("")) + } else { + dstFieldByName.Set(reflect.ValueOf(srcField.Interface().(time.Time).Format("2006-01-02 15:04:05"))) + } + + } else { + dstFieldByName.Set(srcField) + } + + } + } + } + } +}