XinYeYouKu/app/utils/mq/mqs/nats.go

49 lines
1.1 KiB
Go

package mq
import (
"encoding/json"
"fmt"
"github.com/nats-io/nats.go"
_ "github.com/nats-io/nats.go"
"github.com/streadway/amqp"
"qteam/app/utils"
"qteam/config"
)
// NatsMq publishes and subscribes to messages over NATS.
type NatsMq struct {
// Address is the server URL passed to nats.Connect.
Address string
// nc is an optional existing connection; Consume uses it when it is
// non-nil and not closed, and dials Address otherwise.
nc *nats.Conn
}
// Produce publishes log, encoded as JSON, to the NATS subject
// "<ServiceName>_<name>", where ServiceName is read from config.GetConf().
//
// A fresh connection to n.Address is opened for the call and closed before
// returning. delayTime and args are accepted but not used by this
// implementation.
//
// It returns a non-nil error when the payload cannot be encoded, the
// connection cannot be established, or the publish call fails.
func (n NatsMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
	name = config.GetConf().ServiceName + "_" + name
	fmt.Println("nats produce", name)

	// Encode before dialing so an unencodable payload is rejected without
	// ever publishing an empty message.
	content, err := json.Marshal(log)
	if err != nil {
		return fmt.Errorf("nats produce %q: encoding payload: %w", name, err)
	}

	nc, err := nats.Connect(n.Address)
	if err != nil {
		return fmt.Errorf("nats produce %q: connecting to %q: %w", name, n.Address, err)
	}
	defer nc.Close()

	if err := nc.Publish(name, content); err != nil {
		return fmt.Errorf("nats produce %q: publishing: %w", name, err)
	}
	return nil
}
// Consume subscribes to the NATS subject name and forwards each message
// payload to hand.
//
// hand must be a func(tag uint64, ch *amqp.Channel, msg []byte); it is called
// as hand(0, nil, data) for every received message. If hand has any other
// type, the mismatch is printed and no subscription is made, rather than
// panicking later inside the message callback.
//
// The connection in n.nc is used when it is non-nil and open; otherwise a new
// connection to n.Address is dialed. Because the receiver is a value, a newly
// dialed connection cannot be stored back on the caller's NatsMq, so it is
// held only by the subscription created here.
//
// Failures are reported on standard output because the method has no return
// value.
//
// NOTE(review): unlike Produce, the subject is not prefixed with the service
// name here; presumably callers pass the full subject. Confirm.
func (n NatsMq) Consume(name string, hand interface{}) {
	// Validate the handler once, up front, instead of on every message.
	handler, ok := hand.(func(tag uint64, ch *amqp.Channel, msg []byte))
	if !ok {
		fmt.Printf("nats consume %s: unsupported handler type %T\n", name, hand)
		return
	}

	nc := n.nc
	if nc == nil || nc.IsClosed() {
		conn, err := nats.Connect(n.Address)
		if err != nil {
			// Surface the real dial error instead of letting Subscribe fail
			// later on a nil connection.
			fmt.Println("nats connect", n.Address, err)
			return
		}
		nc = conn
	}
	fmt.Println("nats comsume", name)

	// The connection is intentionally not closed here: the subscription
	// delivers messages asynchronously after this method returns.
	_, err := nc.Subscribe(name, func(m *nats.Msg) {
		utils.Log(nil, "Received a message: %s", string(m.Data))
		handler(0, nil, m.Data)
	})
	fmt.Println("ttt", err)
}
// DelyConsume is a no-op in this implementation: name and hand are ignored.
//
// NOTE(review): the name looks like a misspelling of "DelayConsume", and the
// method presumably exists to satisfy a shared consumer interface. Confirm
// before renaming.
func (n NatsMq) DelyConsume(name string, hand interface{}) {
}