49 lines
1.1 KiB
Go
49 lines
1.1 KiB
Go
|
package mq
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"github.com/nats-io/nats.go"
|
||
|
_ "github.com/nats-io/nats.go"
|
||
|
"github.com/streadway/amqp"
|
||
|
"qteam/app/utils"
|
||
|
"qteam/config"
|
||
|
)
|
||
|
|
||
|
type NatsMq struct {
|
||
|
Address string
|
||
|
nc *nats.Conn
|
||
|
}
|
||
|
|
||
|
func (n NatsMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
|
||
|
name = config.GetConf().ServiceName + "_" + name
|
||
|
fmt.Println("nats produce", name)
|
||
|
nc, _ := nats.Connect(n.Address)
|
||
|
defer nc.Close()
|
||
|
var content, err = json.Marshal(log)
|
||
|
nc.Publish(name, content)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (n NatsMq) Consume(name string, hand interface{}) {
|
||
|
if n.nc == nil || n.nc.IsClosed() == true {
|
||
|
if n.nc != nil {
|
||
|
n.nc.Close()
|
||
|
}
|
||
|
n.nc, _ = nats.Connect(n.Address)
|
||
|
}
|
||
|
nc := n.nc
|
||
|
fmt.Println("nats comsume", name)
|
||
|
//defer nc.Close()
|
||
|
_, err := nc.Subscribe(name, func(m *nats.Msg) {
|
||
|
utils.Log(nil, "Received a message: %s", string(m.Data))
|
||
|
var handler = hand.(func(tag uint64, ch *amqp.Channel, msg []byte))
|
||
|
handler(0, nil, m.Data)
|
||
|
})
|
||
|
fmt.Println("ttt", err)
|
||
|
}
|
||
|
|
||
|
func (n NatsMq) DelyConsume(name string, hand interface{}) {
|
||
|
|
||
|
}
|