voucher/internal/data/mq.go

108 lines
3.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package data
import (
"fmt"
mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
"github.com/go-kratos/kratos/v2/log"
"strconv"
"time"
"voucher/internal/conf"
"voucher/internal/pkg/mq"
)
type RocketMQ struct {
//Mq 的 producer
MqProducer *mq.Producer
}
func NewRocketMQ(c *conf.Bootstrap) (*RocketMQ, func(), error) {
mqProducer, errMq := buildMqProducer(c.RocketMQ)
cleanup := func() {
if mqProducer != nil {
if err := mqProducer.Shutdown(); err != nil {
log.Error("关闭 rocketMQ producer 失败:", err)
}
}
}
return &RocketMQ{MqProducer: mqProducer}, cleanup, errMq
}
// buildMqProducer rocket producer
func buildMqProducer(c *conf.RocketMQ) (*mq.Producer, error) {
if c == nil {
return nil, nil
}
var p *mq.Producer
var err error
if c.AccessKey != "" && c.SecretKey != "" {
p, err = mq.NewProducer(c.Addr, producer.WithCredentials(primitive.Credentials{
AccessKey: c.AccessKey,
SecretKey: c.SecretKey,
}))
} else {
p, err = mq.NewProducer(c.Addr)
}
if err != nil {
fmt.Println("创建 rocketMQ producer 失败: ", err)
return nil, err
}
err = p.Start()
if err != nil {
fmt.Println("rocketMQ producer start 失败: ", err)
return nil, err
}
//此时并没有发起连接,在使用时才会
return p, nil
}
func wechatNotifyProducer() {
// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
endpoint := "http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com"
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID阿里云身份验证标识。
accessKey := "LTAI5tPyV7FynQNTfEvbEBuX"
// AccessKey Secret阿里云身份验证密钥。
secretKey := "tZmTh8cV98xAQgtlRU0soWcb6Tpd4T"
// 消息所属的Topic在消息队列RocketMQ版控制台创建。
topic := "notify"
// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instanceId := "MQ_INST_1389288909295870_BYSoMttI"
tag := "voucher_notify_dev"
//tag := "voucher_notify_pro"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// 循环发送2条消息。
for i := 0; i < 2; i++ {
var msg mq_http_sdk.PublishMessageRequest
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", //消息内容。
MessageTag: tag, // 消息标签。
Properties: map[string]string{}, // 消息属性。
}
// 设置消息的Key。
msg.MessageKey = "MessageKey"
// 设置消息自定义属性。
msg.Properties["a"] = strconv.Itoa(i)
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}