voucher/internal/data/mq.go

58 lines
1.3 KiB
Go

package data
import (
"fmt"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/go-kratos/kratos/v2/log"
"voucher/internal/conf"
"voucher/internal/pkg/mq"
)
// RocketMQ bundles the RocketMQ client handles owned by the data layer.
type RocketMQ struct {
// MqProducer is the message producer; NewRocketMQ leaves it nil when no
// RocketMQ configuration is supplied.
MqProducer *mq.Producer
}
// NewRocketMQ builds the RocketMQ wrapper from the bootstrap configuration.
//
// It returns the wrapper, a cleanup function that shuts the producer down,
// and any error raised while creating or starting the producer. On error the
// returned *RocketMQ is nil and the cleanup function is a no-op, so callers
// may still invoke it unconditionally. When c.RocketMQ is nil no producer is
// created and the wrapper's MqProducer field is nil.
func NewRocketMQ(c *conf.Bootstrap) (*RocketMQ, func(), error) {
	mqProducer, err := buildMqProducer(c.RocketMQ)
	if err != nil {
		// Nothing was started, so there is nothing to release; do not hand
		// back a half-initialised value alongside the error.
		return nil, func() {}, err
	}

	cleanup := func() {
		// The producer is nil when RocketMQ is not configured.
		if mqProducer == nil {
			return
		}
		if shutdownErr := mqProducer.Shutdown(); shutdownErr != nil {
			log.Error("关闭 rocketMQ producer 失败:", shutdownErr)
		}
	}
	return &RocketMQ{MqProducer: mqProducer}, cleanup, nil
}
// buildMqProducer creates and starts a RocketMQ producer from c.
//
// It returns (nil, nil) when c is nil, i.e. when RocketMQ is not configured.
// Credentials are attached only when both AccessKey and SecretKey are
// non-empty. Creation and start failures are returned wrapped with context
// (use errors.Is / errors.As to inspect the cause) instead of being printed
// to stdout, so the caller decides how to report them.
func buildMqProducer(c *conf.RocketMQ) (*mq.Producer, error) {
	if c == nil {
		return nil, nil
	}

	var (
		p   *mq.Producer
		err error
	)
	if c.AccessKey != "" && c.SecretKey != "" {
		p, err = mq.NewProducer(c.Addr, producer.WithCredentials(primitive.Credentials{
			AccessKey: c.AccessKey,
			SecretKey: c.SecretKey,
		}))
	} else {
		p, err = mq.NewProducer(c.Addr)
	}
	if err != nil {
		return nil, fmt.Errorf("创建 rocketMQ producer 失败: %w", err)
	}

	if err = p.Start(); err != nil {
		return nil, fmt.Errorf("rocketMQ producer start 失败: %w", err)
	}

	// Original author's note: no connection is opened at this point; it is
	// only established when the producer is first used.
	// NOTE(review): confirm against the mq.Producer wrapper's Start behaviour.
	return p, nil
}