增加队列

This commit is contained in:
qiyunfanbo126.com 2024-05-27 10:39:55 +08:00
parent 192d2abff7
commit 806f3b9781
6 changed files with 270 additions and 0 deletions

21
app/mq/hanlers.go Normal file
View File

@ -0,0 +1,21 @@
package mq
// 单聊
func SingleTalk(tag uint64, ch interface{}, msg []byte) {
//var data entities.SingTalkReq
//err := json.Unmarshal(msg, &data)
//utils.Log(nil, "msg", data)
//if err == nil {
// conn := netool.GetConnManagger().GetConnection(data.Msg.To)
// var sendOk = false
// if conn != nil && conn.IsActive() {
// if utils.WriteMsg(msgid.SINGLE_MSG, data, conn) {
// sendOk = true
// }
// }
// if !sendOk {
// common.PikaTool.Zadd(utils.GetRealKey(common2.USER_MSG)+data.Msg.To, data, time.Now().Unix())
// }
//}
}

27
app/mq/quenue.go Normal file
View File

@ -0,0 +1,27 @@
package mq
import (
"qteam/app/utils/mq"
)
func startQunue(name string, method interface{}, mqTp string, tp int, exhange string) {
if tp == 1 {
go mq.MqManager.GetMqByName(mqTp).Consume(name, method)
} else {
go mq.MqManager.GetMqByName(mqTp).DelyConsume(name, method)
}
}
// 队列服务
func StartQunueServer() error {
StartServer()
select {}
return nil
}
// 开启队列
func StartServer() error {
//startWorkers()
return nil
}

36
app/utils/mq/mqmanager.go Normal file
View File

@ -0,0 +1,36 @@
package mq
import (
common3 "qteam/app/constants/common"
mq "qteam/app/utils/mq/mqs"
"sync"
)
var (
MqManager = CMqManager{}
once sync.Once
)
type Imq interface {
Produce(name string, log interface{}, delayTime int, args ...interface{}) error
Consume(name string, hand interface{})
DelyConsume(name string, hand interface{})
}
type CMqManager struct {
mqs map[string]Imq
}
func (this *CMqManager) InitMq() {
this.mqs = make(map[string]Imq)
//this.mqs[common.MQ_RABBIT] = RabbitMq{}
//this.mqs[common.MQ_NSQ] = NsqMq{}
this.mqs[common3.MQ_NATS] = mq.NatsMq{}
this.mqs[common3.MQ_KFK] = mq.KafkaMq{}
}
func (this *CMqManager) GetMqByName(name string) Imq {
once.Do(func() {
this.InitMq()
})
return this.mqs[name]
}

89
app/utils/mq/mqs/kafka.go Normal file
View File

@ -0,0 +1,89 @@
package mq
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/qit-team/snow-core/redis"
"qteam/app/utils"
"qteam/config"
"strconv"
"sync"
)
type KafkaMq struct {
}
// 同步
func (n KafkaMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
kafConfig := sarama.NewConfig()
kafConfig.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follow都确认
kafConfig.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
kafConfig.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = name
var data, _ = json.Marshal(log)
msg.Value = sarama.StringEncoder(string(data))
// 连接kafka
client, err := sarama.NewSyncProducer(config.GetConf().KafkaUrl, kafConfig)
if err != nil {
fmt.Println("producer closed, err:", err)
return nil
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
utils.Log(nil, "send msg failed, err:", err, pid, offset)
return nil
}
return nil
}
func (n KafkaMq) Consume(name string, hand interface{}) {
consumer, err := sarama.NewConsumer(config.GetConf().KafkaUrl, nil)
if err != nil {
utils.Log(nil, "kafka comsume", err.Error())
return
}
partitionList, err := consumer.Partitions(name) // 根据topic取到所有的分区
if err != nil {
utils.Log(nil, "kafka comsume", err.Error())
return
}
//utils.Log(nil,"kafka comsume",name,partitionList)
for partition := range partitionList { // 遍历所有的分区
// 针对每个分区创建一个对应的分区消费者
var offsetReDis, _ = redis.GetRedis().Incr(context.Background(), "kafka_consume:"+strconv.Itoa(int(partition))).Result() //保证多消费者不重复消费
var offset int64 = sarama.OffsetNewest
if offsetReDis > 0 {
//offset = int64(offsetReDis)
}
pc, err := consumer.ConsumePartition(name, int32(partition), offset)
//utils.Log(nil,"partion",int32(partition))
if err != nil {
fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
return
}
defer pc.AsyncClose()
var wg sync.WaitGroup
wg.Add(1)
// 异步从每个分区消费信息
go func(sarama.PartitionConsumer) {
for msg := range pc.Messages() {
defer wg.Done()
var handler = hand.(func(tag uint64, ch interface{}, msg []byte))
handler(0, nil, msg.Value)
//utils.Log(nil,"hand msg",string(msg.Value),msg.Offset)
}
}(pc)
wg.Wait()
}
}
func (n KafkaMq) DelyConsume(name string, hand interface{}) {
}

48
app/utils/mq/mqs/nats.go Normal file
View File

@ -0,0 +1,48 @@
package mq
import (
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
_ "github.com/nats-io/nats.go"
"github.com/streadway/amqp"
"qteam/app/utils"
"qteam/config"
)
type NatsMq struct {
Address string
nc *nats.Conn
}
func (n NatsMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
name = config.GetConf().ServiceName + "_" + name
fmt.Println("nats produce", name)
nc, _ := nats.Connect(n.Address)
defer nc.Close()
var content, err = json.Marshal(log)
nc.Publish(name, content)
return err
}
func (n NatsMq) Consume(name string, hand interface{}) {
if n.nc == nil || n.nc.IsClosed() == true {
if n.nc != nil {
n.nc.Close()
}
n.nc, _ = nats.Connect(n.Address)
}
nc := n.nc
fmt.Println("nats comsume", name)
//defer nc.Close()
_, err := nc.Subscribe(name, func(m *nats.Msg) {
utils.Log(nil, "Received a message: %s", string(m.Data))
var handler = hand.(func(tag uint64, ch *amqp.Channel, msg []byte))
handler(0, nil, m.Data)
})
fmt.Println("ttt", err)
}
func (n NatsMq) DelyConsume(name string, hand interface{}) {
}

View File

@ -9,6 +9,7 @@ import (
"github.com/qit-team/snow-core/redis"
"net"
"qteam/config"
"reflect"
"runtime"
"strconv"
"strings"
@ -79,3 +80,51 @@ func GeneratorToken(playerName string, playerId string) string {
}
return tk
}
func EntityCopy(dst, src interface{}) {
dstValue := reflect.ValueOf(dst).Elem()
srcValue := reflect.ValueOf(src).Elem()
for i := 0; i < srcValue.NumField(); i++ {
srcField := srcValue.Field(i)
srcName := srcValue.Type().Field(i).Name
dstFieldByName := dstValue.FieldByName(srcName)
if dstFieldByName.IsValid() {
switch dstFieldByName.Kind() {
case reflect.Ptr:
switch srcField.Kind() {
case reflect.Ptr:
if srcField.IsNil() {
dstFieldByName.Set(reflect.New(dstFieldByName.Type().Elem()))
} else {
dstFieldByName.Set(srcField)
}
default:
dstFieldByName.Set(srcField.Addr())
}
default:
switch srcField.Kind() {
case reflect.Ptr:
if srcField.IsNil() {
dstFieldByName.Set(reflect.Zero(dstFieldByName.Type()))
} else {
dstFieldByName.Set(srcField.Elem())
}
default:
if srcField.Type().Name() == "Time" {
if (srcField.Interface().(time.Time).Unix()) < 1 {
dstFieldByName.Set(reflect.ValueOf(""))
} else {
dstFieldByName.Set(reflect.ValueOf(srcField.Interface().(time.Time).Format("2006-01-02 15:04:05")))
}
} else {
dstFieldByName.Set(srcField)
}
}
}
}
}
}