package mq import ( "context" "fmt" "github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" semconv "go.opentelemetry.io/otel/semconv/v1.20.0" "go.opentelemetry.io/otel/trace" "strings" ) type Producer struct { ProducerClient rocketmq.Producer IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已 } // WithProducerCredentials 设置生产者的凭证 func WithProducerCredentials(accessKey, secretKey, securityToken string) producer.Option { return producer.WithCredentials(primitive.Credentials{ AccessKey: accessKey, SecretKey: secretKey, SecurityToken: securityToken, }) } // NewProducer 创建一个生产者 // nameServer: 连接地址,多个中间用,号隔开 // opts: 配置项 func NewProducer(nameServer string, opts ...producer.Option) (*Producer, error) { //检查参数 if nameServer == "" { return nil, fmt.Errorf("rocketMQ NameServer 不能为空") } //创建生产者 nameServers := strings.Split(nameServer, ",") opts = append(opts, producer.WithNameServer(nameServers)) p, err := rocketmq.NewProducer(opts...) if err != nil { fmt.Println("创建 rocketMQ producer 失败: ", err) return nil, err } //此时并没有发起连接,在使用时才会连接 return &Producer{ProducerClient: p, IsOpenTelemetry: true}, nil } // DisableTelemetry 关闭链路追踪 func (p *Producer) DisableTelemetry() { p.IsOpenTelemetry = false } // Start 启动生产者 func (p *Producer) Start() error { return p.ProducerClient.Start() } // Shutdown 关闭生产者 func (p *Producer) Shutdown() error { return p.ProducerClient.Shutdown() } // SendOption 发送消息选项 type SendOption func(*primitive.Message) // WithSendKeysOption 设置消息的key func WithSendKeysOption(keys []string) SendOption { return func(msg *primitive.Message) { msg.WithKeys(keys) } } // WithSendShardingKeysOption 设置消息的key func WithSendShardingKeysOption(key string) SendOption { return func(msg *primitive.Message) { msg.WithShardingKey(key) } } // WithSendTagOption 设置消息的Tag func WithSendTagOption(tag string) SendOption { return func(msg *primitive.Message) { msg.WithTag(tag) } } // WithSendDelayLevelOption 设置消息的延迟级别 // reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h // delay level starts from 1. for example, if we set param level=1, then the delay time is 1s. func WithSendDelayLevelOption(level int) SendOption { return func(msg *primitive.Message) { msg.WithDelayTimeLevel(level) } } const openTelemetryPropertyName = "traceparent" // WithOpenTelemetryOption 设置消息的链接追踪信息 func WithOpenTelemetryOption(value string) SendOption { return func(msg *primitive.Message) { msg.WithProperty(openTelemetryPropertyName, value) } } // WithSendWithPropertyOption 设置消息的属性 func WithSendWithPropertyOption(key, value string) SendOption { return func(msg *primitive.Message) { msg.WithProperty(key, value) } } // SendSync 同步发送消息 // topic: 主题 // sendOptions: 发送选项,如WithSendTagOption // bodyList: 支持发送多个 func (p *Producer) SendSync(ctx context.Context, topic string, body []byte, sendOptions ...SendOption) error { return p.BatchSendSync(ctx, topic, sendOptions, body) } // SendAsync 异步发送消息 // topic: 主题 // sendOptions: 发送选项,如WithSendTagOption // callbackFn: 回调函数,无论成功与否都会回调,失败时err!=nil // bodyList: 支持发送多个 func (p *Producer) SendAsync(ctx context.Context, topic string, body []byte, callbackFn func(error), sendOptions ...SendOption) error { return p.BatchSendAsync(ctx, topic, callbackFn, sendOptions, body) } // BatchSendSync 同步发送消息 // topic: 主题 // sendOptions: 发送选项,如WithSendTagOption // bodyList: 支持发送多个 func (p *Producer) BatchSendSync(ctx context.Context, topic string, sendOptions []SendOption, bodyList ...[]byte) error { if err := p.checkSend(topic); err != nil { return err } msgList := make([]*primitive.Message, len(bodyList)) for i, body := range bodyList { msgList[i] = &primitive.Message{ Topic: topic, Body: body, } for _, option := range sendOptions { option(msgList[i]) } } // 链路追踪 var err error if p.IsOpenTelemetry { _, span := p.generateTraceSpan(ctx, topic, msgList) defer func() { // 记录错误 if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } span.End() }() } _, err = p.ProducerClient.SendSync(context.Background(), msgList...) if err != nil { return err } return nil } // generateTraceSpan 生成链路追踪的span var produceTraceContext = propagation.TraceContext{} func (p *Producer) generateTraceSpan(ctx context.Context, topic string, msgList []*primitive.Message) (context.Context, trace.Span) { tracer := otel.GetTracerProvider().Tracer("LSXD_Util") spanName := fmt.Sprintf("%s %s", topic, semconv.MessagingOperationPublish.Value.AsString()) spanCtx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindProducer)) span.SetAttributes( semconv.MessagingSystem("RocketMQ"), semconv.MessagingDestinationName(topic), semconv.MessagingBatchMessageCount(len(msgList)), semconv.MessagingRocketmqMessageKeys(msgList[0].GetKeys()), semconv.MessagingRocketmqMessageTag(msgList[0].GetTags()), ) //将span的trace数据写入 carrier := propagation.MapCarrier{} produceTraceContext.Inject(spanCtx, carrier) traceParent := carrier[openTelemetryPropertyName] for _, message := range msgList { message.WithProperty(openTelemetryPropertyName, traceParent) } return spanCtx, span } // BatchSendAsync 异步发送消息 // topic: 主题 // callbackFn: 回调方法,无论成功与否都会回调,失败时err!=nil // sendOptions: 发送选项,如WithSendTagOption // bodyList: 支持发送多个 func (p *Producer) BatchSendAsync(ctx context.Context, topic string, callbackFn func(error), sendOptions []SendOption, bodyList ...[]byte) error { if err := p.checkSend(topic); err != nil { return err } msgList := make([]*primitive.Message, len(bodyList)) for i, body := range bodyList { msgList[i] = &primitive.Message{ Topic: topic, Body: body, } for _, option := range sendOptions { option(msgList[i]) } } var err error var span trace.Span if p.IsOpenTelemetry { _, span = p.generateTraceSpan(ctx, topic, msgList) } err = p.ProducerClient.SendAsync(context.Background(), func(ctxErr context.Context, result *primitive.SendResult, err error) { if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } span.End() callbackFn(err) }, msgList...) return err } // checkTopicName 检查topicName是否符合规范 // 名字分三部分,由字母和.号组成:${模块名}_${业务名}_${事件},如:trade-order-created // 权限: func (p *Producer) checkSend(topicName string) error { arr := strings.Split(topicName, "_") isOk := len(arr) == 3 && arr[0] != "" && arr[1] != "" && arr[2] != "" if !isOk { return fmt.Errorf("topic名称不符合规范") } // 检查权限:待完善 return nil }