58 lines
1.3 KiB
Go
58 lines
1.3 KiB
Go
package data
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
|
"github.com/apache/rocketmq-client-go/v2/producer"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"voucher/internal/conf"
|
|
"voucher/internal/pkg/mq"
|
|
)
|
|
|
|
type RocketMQ struct {
|
|
//Mq 的 producer
|
|
MqProducer *mq.Producer
|
|
}
|
|
|
|
func NewRocketMQ(c *conf.Bootstrap) (*RocketMQ, func(), error) {
|
|
mqProducer, errMq := buildMqProducer(c.RocketMQ)
|
|
|
|
cleanup := func() {
|
|
if mqProducer != nil {
|
|
if err := mqProducer.Shutdown(); err != nil {
|
|
log.Error("关闭 rocketMQ producer 失败:", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return &RocketMQ{MqProducer: mqProducer}, cleanup, errMq
|
|
}
|
|
|
|
// buildMqProducer rocket producer
|
|
func buildMqProducer(c *conf.RocketMQ) (*mq.Producer, error) {
|
|
if c == nil {
|
|
return nil, nil
|
|
}
|
|
var p *mq.Producer
|
|
var err error
|
|
if c.AccessKey != "" && c.SecretKey != "" {
|
|
p, err = mq.NewProducer(c.Addr, producer.WithCredentials(primitive.Credentials{
|
|
AccessKey: c.AccessKey,
|
|
SecretKey: c.SecretKey,
|
|
}))
|
|
} else {
|
|
p, err = mq.NewProducer(c.Addr)
|
|
}
|
|
if err != nil {
|
|
fmt.Println("创建 rocketMQ producer 失败: ", err)
|
|
return nil, err
|
|
}
|
|
err = p.Start()
|
|
if err != nil {
|
|
fmt.Println("rocketMQ producer start 失败: ", err)
|
|
return nil, err
|
|
}
|
|
//此时并没有发起连接,在使用时才会
|
|
return p, nil
|
|
}
|