// Package mq contains message-queue adapters; this file provides the
// NATS-backed implementation.
package mq

import (
	"encoding/json"
	"fmt"

	"github.com/nats-io/nats.go"
	_ "github.com/nats-io/nats.go"
	"github.com/streadway/amqp"

	"qteam/app/utils"
	"qteam/config"
)

// NatsMq publishes to and subscribes from a NATS server.
type NatsMq struct {
	// Address is the server URL handed to nats.Connect.
	Address string
	// nc is the connection used by Consume. Methods use value receivers, so a
	// connection opened inside Consume is stored only on that call's copy of
	// the struct.
	nc *nats.Conn
}

// Produce JSON-encodes log and publishes it on the subject
// "<ServiceName>_<name>", where ServiceName comes from config.GetConf().
//
// A new connection is opened for every call and closed before returning.
// delayTime and args are accepted but not used by this implementation.
//
// It returns an error if encoding, connecting or publishing fails. Nothing is
// published when encoding fails.
func (n NatsMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
	name = config.GetConf().ServiceName + "_" + name
	fmt.Println("nats produce", name)

	// Encode before connecting so a bad payload never reaches the server and
	// never costs a connection.
	content, err := json.Marshal(log)
	if err != nil {
		return fmt.Errorf("marshal message for %q: %w", name, err)
	}

	// The address is deliberately left out of the error text: a NATS URL may
	// embed credentials.
	nc, err := nats.Connect(n.Address)
	if err != nil {
		return fmt.Errorf("connect to nats: %w", err)
	}
	defer nc.Close()

	if err := nc.Publish(name, content); err != nil {
		return fmt.Errorf("publish to %q: %w", name, err)
	}
	return nil
}

// Consume subscribes to the subject name and passes every received payload to
// hand, which must be a func(tag uint64, ch *amqp.Channel, msg []byte). The
// handler is always invoked with tag 0 and a nil channel.
//
// Consume has no error return, so failures (wrong handler type, connection
// failure) are printed and the call returns without subscribing.
//
// NOTE(review): unlike Produce, the subject is used as given and is not
// prefixed with the service name — confirm callers pass the full subject.
func (n NatsMq) Consume(name string, hand interface{}) {
	// Validate the handler once, up front, instead of panicking inside the
	// subscription callback when the first message arrives.
	handler, ok := hand.(func(tag uint64, ch *amqp.Channel, msg []byte))
	if !ok {
		fmt.Printf("nats consume %s: unsupported handler type %T\n", name, hand)
		return
	}

	if n.nc == nil || n.nc.IsClosed() {
		nc, err := nats.Connect(n.Address)
		if err != nil {
			fmt.Println("nats connect failed", err)
			return
		}
		// n is a copy of the caller's value, so this assignment is visible
		// only for the remainder of this call.
		n.nc = nc
	}

	fmt.Println("nats comsume", name)

	// The connection is intentionally left open: closing it here would end
	// the subscription as soon as Consume returns.
	_, err := n.nc.Subscribe(name, func(m *nats.Msg) {
		utils.Log(nil, "Received a message: %s", string(m.Data))
		handler(0, nil, m.Data)
	})
	fmt.Println("ttt", err)
}

// DelyConsume is a no-op in this implementation; both arguments are ignored.
// NOTE(review): presumably delayed consumption is not supported on NATS and
// this exists only to satisfy a shared queue interface — confirm.
func (n NatsMq) DelyConsume(name string, hand interface{}) {
}