package data

import (
	"fmt"

	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
	"github.com/go-kratos/kratos/v2/log"

	"voucher/internal/conf"
	"voucher/internal/pkg/mq"
)

// RocketMQ bundles the RocketMQ clients used by the data layer.
type RocketMQ struct {
	// MqProducer is the message producer. It is nil when no RocketMQ
	// configuration was supplied.
	MqProducer *mq.Producer
}

// NewRocketMQ builds the RocketMQ producer described by c.RocketMQ.
//
// It returns the wrapper, a cleanup function that shuts the producer down,
// and any error raised while creating or starting the producer. On error the
// returned *RocketMQ is nil and the cleanup function is a no-op. The cleanup
// function is never nil, so callers may defer it unconditionally.
//
// A nil c, or a nil c.RocketMQ, yields a RocketMQ whose MqProducer is nil and
// a nil error.
func NewRocketMQ(c *conf.Bootstrap) (*RocketMQ, func(), error) {
	// Guard against a nil bootstrap so it behaves like a missing RocketMQ
	// section instead of panicking on the field access.
	var mqConf *conf.RocketMQ
	if c != nil {
		mqConf = c.RocketMQ
	}

	mqProducer, err := buildMqProducer(mqConf)
	if err != nil {
		return nil, func() {}, err
	}

	cleanup := func() {
		if mqProducer == nil {
			return
		}
		if err := mqProducer.Shutdown(); err != nil {
			log.Error("关闭 rocketMQ producer 失败:", err)
		}
	}
	return &RocketMQ{MqProducer: mqProducer}, cleanup, nil
}

// buildMqProducer creates and starts a RocketMQ producer for the address in c.
//
// Credentials are attached only when both AccessKey and SecretKey are set.
// A nil c returns (nil, nil). Failures are returned wrapped with context
// rather than printed, so the caller decides how to report them.
func buildMqProducer(c *conf.RocketMQ) (*mq.Producer, error) {
	if c == nil {
		return nil, nil
	}

	var opts []producer.Option
	if c.AccessKey != "" && c.SecretKey != "" {
		opts = append(opts, producer.WithCredentials(primitive.Credentials{
			AccessKey: c.AccessKey,
			SecretKey: c.SecretKey,
		}))
	}

	p, err := mq.NewProducer(c.Addr, opts...)
	if err != nil {
		return nil, fmt.Errorf("创建 rocketMQ producer 失败: %w", err)
	}

	// NOTE(review): a producer whose Start fails is not shut down here;
	// confirm whether mq.Producer.Shutdown is safe to call in that state.
	if err := p.Start(); err != nil {
		return nil, fmt.Errorf("rocketMQ producer start 失败: %w", err)
	}

	// Original author's note: no connection has been initiated at this point;
	// it is established when the producer is first used.
	return p, nil
}