From 25498049544bd952ad1e0d31aad3bca842903f7a Mon Sep 17 00:00:00 2001 From: renzhiyuan <465386466@qq.com> Date: Fri, 21 Feb 2025 11:07:33 +0800 Subject: [PATCH] first push --- README.md | 76 +++++++++++ consumer_config.go | 32 +++++ consumer_conn_config.go | 9 ++ consumer_manager.go | 289 ++++++++++++++++++++++++++++++++++++++++ consumer_message.go | 64 +++++++++ consumer_test.go | 62 +++++++++ go.mod | 38 ++++++ producer.go | 266 ++++++++++++++++++++++++++++++++++++ producer_test.go | 87 ++++++++++++ recover.go | 40 ++++++ 10 files changed, 963 insertions(+) create mode 100644 README.md create mode 100644 consumer_config.go create mode 100644 consumer_conn_config.go create mode 100644 consumer_manager.go create mode 100644 consumer_message.go create mode 100644 consumer_test.go create mode 100644 go.mod create mode 100644 producer.go create mode 100644 producer_test.go create mode 100644 recover.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..3cfaa93 --- /dev/null +++ b/README.md @@ -0,0 +1,76 @@ +# 事件总线 +统一的事件总线,低层使用RocketMQ,进一步封装:约束命名规范、权限验证、链路追踪等 + +## 发送 +[参考:](/event/producer_test.go) +1. 生成发送实例 + ```go + producer, err := event.NewProducer("192.168.6.107:9876") + + // 带有验证凭证的示例 + producer, err := NewProducer("192.168.6.107:9876", producer.WithProducerCredentials("accessKey", "secretKey", "securityToken")) + ``` + 生成时可选的配置项有: + * event.WithProducerCredentials 设置生产者的凭证 +2. 在Data层中注入生成的实例 + +3. 在biz中声明event的interface + +4. 在data层中实现biz声明的interface,同repository类似 + +5. 使用的方法: + * SendSync 同步发送 + * SendAsync 异步发送 + * BatchSendSync 批量同步发送 + * BatchSendAsync 批量异步发送 + +6. 默认支持链路追踪,如要关闭,请调用DisableTraceTelemetry方法 + +## 消费 +[参考:](/event/producer_test.go) +### 一、随着Kratos服务一起启动 +1. 在server中新增consumer.go + ```go + // ConsumerServer 消费者Server + type ConsumerServer struct { + manager *event.ConsumerManager + } + + // NewConsumerServer 工厂方法 + func NewConsumerServer( + //注入一些依赖的对象(包括biz,同http、grpc的Sever类似) + ) *ConsumerServer { + manager := event.NewConsumerManager() + + // 添加一些订阅方法,示例: + _ = manager.Subscribe(ctx, connConf, consumerConf, func(message *ConsumerMessage) error { + // mock 业务耗时 + time.Sleep(10 * time.Second) + // 返回nil才会commit,否则会重试 + return nil + }) + + return &ConsumerServer{manager:manager} + } + + func (c *ConsumerServer) Start(ctx context.Context) error { + return c.manager.Start() + } + + func (c *ConsumerServer) Stop(ctx context.Context) error { + return c.manager.Stop() + } + + ``` + +2. 添加进provider_set.go +3. 打开main.go + ```go + // 在newApp方法中注入ConsumerServer,并添加到 + serverOption := kratos.Server( + //在这里面添加ConsumerServer的实例 + ) + ``` +### 二、使用cli模式启动 +参考上面的启动方式,顺序为:subscribe -> start -> stop + diff --git a/consumer_config.go b/consumer_config.go new file mode 100644 index 0000000..b130a7b --- /dev/null +++ b/consumer_config.go @@ -0,0 +1,32 @@ +package mq + +import "github.com/apache/rocketmq-client-go/v2" + +// ConsumerConfig 消费者配置 +type ConsumerConfig struct { + TopicName string // 必填,主题名称 + + // GroupName 必填,消费者分组名称,填写消费的业务名称,例如:CreatePay-创建支付单 + // 实际注册时,会自动在前面加上TopicName + GroupName string + + // ConsumerCnt,消费者个数,默认为1,不能超过MQ的消费Queue个数 + // 需要保证顺序消费的场景才推荐使用,否则推荐配置PerCoroutineCnt参数来开启并发消费即可 + ConsumerCnt int + + // PerCoroutineCnt 每个consumer的协程个数,默认为20,它与ConsumerCnt都能实现并发消费,区别在于此参数是一个消费者,多个协程并发消费,性能更高 + // 由于是单消费者的并发处理,可能存在后拉取的比先拉取的先处理完,即无法保证严格的顺序消费要求(但大部分场景都没有顺序要求) + // SDK在收到消息时会启动一个新协程运行回调函数,最大运行协程数不超过此配置 + // 示例:2,则实际的并发数是4,正常消费2个,重试2个协程 + PerCoroutineCnt int + + // RetryCnt 消费最大重试次数,间隔时间查看https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy + // 我们默认为38次,目的是给异常业务方保留最多48小时去修复,超过之后进入死信队列,3天后会被自动删除 + RetryCnt *int + + // 指定消费的tag,为空则消费所有tag + Tags []string + + //消费者实例 + pushConsumers []rocketmq.PushConsumer +} diff --git a/consumer_conn_config.go b/consumer_conn_config.go new file mode 100644 index 0000000..acbab44 --- /dev/null +++ b/consumer_conn_config.go @@ -0,0 +1,9 @@ +package mq + +// ConsumerConnConfig 消费者配置 +type ConsumerConnConfig struct { + NameServers string // 必填,多个用,号隔开 + AccessKey string // 连接rocketMQ的accessKey + SecretKey string // 连接rocketMQ的secretKey + SecurityToken string // 连接rocketMQ的securityToken +} diff --git a/consumer_manager.go b/consumer_manager.go new file mode 100644 index 0000000..d191d19 --- /dev/null +++ b/consumer_manager.go @@ -0,0 +1,289 @@ +package mq + +import ( + "context" + "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/consumer" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" + "os" + "strings" + "sync/atomic" + "time" +) + +// ConsumerManager 消费者管理器 +type ConsumerManager struct { + consumerConfigs []*ConsumerConfig + activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出 + shutdownFlag atomic.Bool // 关闭标记 + + logger Logger + IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已 +} + +// NewConsumerManager 创建一个消费者管理器 +func NewConsumerManager(logger Logger) *ConsumerManager { + return &ConsumerManager{logger: logger, IsOpenTelemetry: true} +} +func (c *ConsumerManager) DisableOpenTelemetry() { + c.IsOpenTelemetry = false +} + +// Subscribe 订阅一个主题 +func (c *ConsumerManager) Subscribe(ctx context.Context, connConf *ConsumerConnConfig, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, opts ...consumer.Option) error { + // 检查参数规范 + if !c.checkGroupName(consumerConf.TopicName, consumerConf.GroupName) { + return fmt.Errorf("groupName不符合规范,前缀必须是\"${topicName}_\"开头,如trade_recharge_dispatcher_pay") + } + if consumerConf.RetryCnt == nil || *consumerConf.RetryCnt < 0 { + retryCnt := 38 + consumerConf.RetryCnt = &retryCnt + } + if consumerConf.ConsumerCnt <= 0 { + consumerConf.ConsumerCnt = 1 + } + if consumerConf.PerCoroutineCnt <= 0 { + consumerConf.PerCoroutineCnt = 20 + } + + credentials := primitive.Credentials{ + AccessKey: connConf.AccessKey, + SecretKey: connConf.SecretKey, + SecurityToken: connConf.SecurityToken, + } + //限制groupName名称必须与topic捆绑 + nameServers := strings.Split(connConf.NameServers, ",") + opts = append(opts, + consumer.WithNameServer(nameServers), + consumer.WithCredentials(credentials), + consumer.WithConsumerModel(consumer.Clustering), + // 不要开启此参数,开启顺序消费,通过返回consumer.ConsumeRetryLater将会失效,需要自己实现重试机制 + //key.WithConsumerOrder(true), + consumer.WithGroupName(consumerConf.GroupName), + consumer.WithConsumeGoroutineNums(consumerConf.PerCoroutineCnt), + consumer.WithRetry(*consumerConf.RetryCnt), + // 不启用批消费,多个一起消费的特点是则其中一个失败则整体都会失败 + consumer.WithConsumeMessageBatchMaxSize(1), + //key.WithPullThresholdForQueue(50), + ) + // 启动指定个数的consumer + hostName, _ := os.Hostname() + now := time.Now().Unix() + for i := 0; i < consumerConf.ConsumerCnt; i++ { + currOpts := make([]consumer.Option, len(opts)) + copy(currOpts, opts) + currOpts = append(currOpts, consumer.WithInstance(fmt.Sprintf("%s:%s:%d:%d", hostName, consumerConf.GroupName, now, i+1))) + pushConsumer, err := rocketmq.NewPushConsumer(currOpts...) + if err != nil { + return err + } + selector := consumer.MessageSelector{} + // 过滤tag + if len(consumerConf.Tags) > 0 { + selector.Type = consumer.TAG + selector.Expression = strings.Join(consumerConf.Tags, " || ") + } + + err = pushConsumer.Subscribe(consumerConf.TopicName, selector, func(subCtx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) { + return c.callbackForReceive(subCtx, consumerConf, fn, ext...) + }) + if err != nil { + return err + } + consumerConf.pushConsumers = append(consumerConf.pushConsumers, pushConsumer) + } + + c.consumerConfigs = append(c.consumerConfigs, consumerConf) + return nil +} + +var consumerPropagator = propagation.TraceContext{} + +// callbackForReceive 收到消息的回调 +func (c *ConsumerManager) callbackForReceive(ctx context.Context, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, ext ...*primitive.MessageExt) (cr consumer.ConsumeResult, fnErr error) { + // 收到消息 + if c.shutdownFlag.Load() { + cr = consumer.ConsumeRetryLater + fnErr = fmt.Errorf("正在退出中,延期处理:%s,%s", consumerConf.TopicName, consumerConf.GroupName) + // 卡住,不再继续消费,等待退出 + // 测试发现在重试主题消费时,返回retryLater有时会被commit掉,导致消息丢失 + time.Sleep(24 * time.Hour) + return + } + + // 标记活跃状态 + c.activeCnt.Add(1) + defer func() { + c.activeCnt.Add(-1) + if v := recover(); v != nil { + cr = consumer.ConsumeRetryLater + fnErr = errors.Errorf("处理消息panic, groupName=%s,%+v", consumerConf.GroupName, v) + c.LogErrorf("%+v", fnErr) + return + } + }() + + var tracer trace.Tracer + if c.IsOpenTelemetry { + tracer = otel.GetTracerProvider().Tracer("MQ") + } + + // WithConsumeMessageBatchMaxSize 配置大于1时,ext才会存在多个,它的特点是要么全成功或全失败 + cr = consumer.ConsumeSuccess + spanName := fmt.Sprintf("%s %s %s", consumerConf.TopicName, semconv.MessagingOperationProcess.Value.AsString(), consumerConf.GroupName) + for _, v := range ext { + func() { + message := &ConsumerMessage{ + MsgId: v.MsgId, + Topic: v.Topic, + Body: v.Body, + ReconsumeTimes: v.ReconsumeTimes, + CompressedBody: v.CompressedBody, + Flag: v.Flag, + TransactionId: v.TransactionId, + Batch: v.Batch, + Compress: v.Compress, + Properties: v.GetProperties(), + } + // 链路追踪 + var span trace.Span + if tracer != nil { + if traceParent, ok := message.Properties[openTelemetryPropertyName]; ok { + var mapCarrier propagation.MapCarrier = map[string]string{ + openTelemetryPropertyName: traceParent, + } + ctx = consumerPropagator.Extract(ctx, mapCarrier) + } + ctx, span = tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindConsumer)) + span.SetAttributes( + semconv.MessagingSystem("RocketMQ"), + semconv.MessagingSourceName(consumerConf.TopicName), + semconv.MessagingRocketmqClientGroup(consumerConf.GroupName), + semconv.MessagingRocketmqMessageKeys(message.GetKeys()...), + semconv.MessagingRocketmqMessageTag(message.GetTags()), + semconv.MessagingRocketmqMessageDelayTimeLevel(message.GetDelayTimeLevel()), + semconv.MessageIDKey.String(message.MsgId), + ) + defer func() { + // 记录追踪信息 + spanErr := fnErr + var panicVal any + if panicVal = recover(); panicVal != nil { + spanErr = fmt.Errorf("%s消费者处理方法panic:%s", consumerConf.GroupName, panicVal) + } + if spanErr != nil { + span.RecordError(spanErr) + span.SetStatus(codes.Error, spanErr.Error()) + } + + span.End() + + if panicVal != nil { + panic(panicVal) + } + }() + } + + // 回调业务函数 + currErr := fn(ctx, message) + + if currErr != nil { + cr = consumer.ConsumeRetryLater + fnErr = currErr + c.LogErrorf("%s消费者处理方法返回失败,body=%s:%+v", consumerConf.GroupName, string(message.Body), currErr) + } + }() + } + return +} + +// Start 启动所有消费者 +func (c *ConsumerManager) Start(_ context.Context) error { + for _, consumerConf := range c.consumerConfigs { + for _, pushConsumer := range consumerConf.pushConsumers { + err := pushConsumer.Start() + if err != nil { + return err + } + } + } + return nil +} + +// Stop 停止所有消费者 +func (c *ConsumerManager) Stop(_ context.Context) error { + fmt.Println("开始停止消费者") + c.shutdownFlag.Store(true) + + for _, consumerConf := range c.consumerConfigs { + for _, pushConsumer := range consumerConf.pushConsumers { + pushConsumer.Suspend() // 似乎没起使用 + } + } + fmt.Println("已suspend所有消费者") + + //shutdown之间,保证正在处理的消费先提交 + _ = c.blockWaitFinish() + + var err error = nil + for _, consumerConf := range c.consumerConfigs { + for _, pushConsumer := range consumerConf.pushConsumers { + if closeErr := pushConsumer.Shutdown(); closeErr != nil { + err = closeErr + fmt.Println("消费者shutdown失败", closeErr) + } + } + } + + fmt.Println("已shutdown所有消费者") + + return err +} + +// blockWaitFinish 阻塞等待业务完成 +func (c *ConsumerManager) blockWaitFinish() error { + // 每1s检查下业务是否都处理完成 + + for { + cnt := c.activeCnt.Load() + if cnt == 0 { + //无业务处理,正常退 + break + } else { + fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt) + } + time.Sleep(1 * time.Second) + } + + //防止极端情况下commit未完成 + // nolint + time.Sleep(1 * time.Second) + return nil +} + +// LogErrorf 记录错误日志 +func (c *ConsumerManager) LogErrorf(format string, args ...any) { + if c.logger != nil { + c.logger.Errorf(format, args...) + } else { + fmt.Printf(format+"\n", args...) + } +} + +// checkGroupName 检查groupName是否符合规范 +// 必须是以${topicName}_开头,目的是 +// 1. 防止相同相同groupName与多个topic的情况,避免出现消费不符异常的情况 +// 2. 在管理group时,能更清晰地体现出对应的topic +func (c *ConsumerManager) checkGroupName(topicName string, groupName string) bool { + if groupName == "" { + return false + } + return strings.HasPrefix(groupName, topicName+"_") +} diff --git a/consumer_message.go b/consumer_message.go new file mode 100644 index 0000000..27e7831 --- /dev/null +++ b/consumer_message.go @@ -0,0 +1,64 @@ +package mq + +import ( + "github.com/apache/rocketmq-client-go/v2/primitive" + "strconv" + "strings" +) + +// ConsumerMessage 消费者收到的消息 +type ConsumerMessage struct { + MsgId string + Topic string + Body []byte + ReconsumeTimes int32 + CompressedBody []byte + Flag int32 + TransactionId string + Batch bool + Compress bool + Properties map[string]string +} + +// GetKeys 获取消息的key +func (c *ConsumerMessage) GetKeys() []string { + if len(c.Properties) == 0 { + return nil + } + val, isOk := c.Properties[primitive.PropertyKeys] + if !isOk { + return nil + } + return strings.Split(val, primitive.PropertyKeySeparator) +} + +// GetTags 获取消息的tag +func (c *ConsumerMessage) GetTags() string { + if len(c.Properties) == 0 { + return "" + } + // nolint + val, _ := c.Properties[primitive.PropertyTags] + return val +} + +// GetShardingKey 获取消息的分区key +func (c *ConsumerMessage) GetShardingKey() string { + if len(c.Properties) == 0 { + return "" + } + // nolint + val, _ := c.Properties[primitive.PropertyShardingKey] + return val +} + +// GetDelayTimeLevel 获取消息的延迟级别 +func (c *ConsumerMessage) GetDelayTimeLevel() int { + if len(c.Properties) == 0 { + return 0 + } + // nolint + val, _ := c.Properties[primitive.PropertyDelayTimeLevel] + level, _ := strconv.Atoi(val) + return level +} diff --git a/consumer_test.go b/consumer_test.go new file mode 100644 index 0000000..84505df --- /dev/null +++ b/consumer_test.go @@ -0,0 +1,62 @@ +package mq + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" +) + +func TestConsumer_Start(t *testing.T) { + initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_consumer") + + manager := NewConsumerManager(nil) + ctx := context.Background() + consumerConf := &ConsumerConfig{ + TopicName: "test_sdk_sync", + GroupName: "test_sdk_sync_myConsumer", + PerCoroutineCnt: 2, + } + connConf := &ConsumerConnConfig{ + NameServers: "192.168.6.107:9876", + } + i := atomic.Int32{} + + err := manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error { + cnt := i.Add(1) + + fmt.Printf("%d开始消费:%s ,body=%s \n", cnt, message.MsgId, message.Body) + fmt.Println(message.Properties) + // mock 业务耗时 + time.Sleep(1 * time.Second) + + fmt.Printf("%d已完成:%s ,body=%s \n", cnt, message.MsgId, message.Body) + return nil + }) + fmt.Println(err) + consumerConf = &ConsumerConfig{ + TopicName: "test_sdk_async", + GroupName: "test_sdk_async_myConsumer", + PerCoroutineCnt: 2, + } + err = manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error { + cnt := i.Add(1) + + fmt.Printf("%d开始消费:%s ,body=%s \n", cnt, message.MsgId, message.Body) + fmt.Println(message.Properties) + // mock 业务耗时 + time.Sleep(1 * time.Second) + + fmt.Printf("%d已完成:%s ,body=%s \n", cnt, message.MsgId, message.Body) + return nil + }) + + fmt.Println(err) + _ = manager.Start(context.Background()) + + time.Sleep(5 * time.Second) + + // mock 退出 + _ = manager.Stop(context.Background()) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cfd8dfe --- /dev/null +++ b/go.mod @@ -0,0 +1,38 @@ +module gitea.cdlsxd.cn/self-tools/l_rocketmq + +go 1.23.6 + +require ( + github.com/apache/rocketmq-client-go/v2 v2.1.2 + github.com/pkg/errors v0.9.1 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/exporters/jaeger v1.17.0 + go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 +) + +require ( + github.com/emirpasic/gods v1.12.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/mock v1.3.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/patrickmn/go-cache v2.1.0+incompatible // indirect + github.com/sirupsen/logrus v1.4.0 // indirect + github.com/tidwall/gjson v1.13.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.uber.org/atomic v1.5.1 // indirect + golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect + golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect + stathat.com/c/consistent v1.0.0 // indirect +) diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..e0cb62d --- /dev/null +++ b/producer.go @@ -0,0 +1,266 @@ +package mq + +import ( + "context" + "fmt" + "github.com/apache/rocketmq-client-go/v2" + "github.com/apache/rocketmq-client-go/v2/primitive" + "github.com/apache/rocketmq-client-go/v2/producer" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" + "math/rand" + "strings" + "time" +) + +type Producer struct { + ProducerClient rocketmq.Producer + IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已 +} + +// WithProducerCredentials 设置生产者的凭证 +func WithProducerCredentials(accessKey, secretKey, securityToken string) producer.Option { + return producer.WithCredentials(primitive.Credentials{ + AccessKey: accessKey, + SecretKey: secretKey, + SecurityToken: securityToken, + }) + +} + +// NewProducer 创建一个生产者 +// nameServer: 连接地址,多个中间用,号隔开 +// opts: 配置项 +func NewProducer(nameServer string, opts ...producer.Option) (*Producer, error) { + //检查参数 + if nameServer == "" { + return nil, fmt.Errorf("rocketMQ NameServer 不能为空") + } + //创建生产者 + nameServers := strings.Split(nameServer, ",") + group := generateRandomString(10) + opts = append(opts, producer.WithNameServer(nameServers), producer.WithGroupName(group)) + p, err := rocketmq.NewProducer(opts...) + if err != nil { + fmt.Println("创建 rocketMQ producer 失败: ", err) + return nil, err + } + + //此时并没有发起连接,在使用时才会连接 + return &Producer{ProducerClient: p, IsOpenTelemetry: true}, nil +} + +func generateRandomString(length int) string { + // 定义字符集 + const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + var sb strings.Builder + rand.Seed(time.Now().UnixNano()) // 设置种子 + + // 生成随机字符串 + for i := 0; i < length; i++ { + sb.WriteByte(charset[rand.Intn(len(charset))]) // 从字符集随机选取 + } + + return sb.String() +} + +// DisableTelemetry 关闭链路追踪 +func (p *Producer) DisableTelemetry() { + p.IsOpenTelemetry = false +} + +// Start 启动生产者 +func (p *Producer) Start() error { + return p.ProducerClient.Start() +} + +// Shutdown 关闭生产者 +func (p *Producer) Shutdown() error { + return p.ProducerClient.Shutdown() +} + +// SendOption 发送消息选项 +type SendOption func(*primitive.Message) + +// WithSendKeysOption 设置消息的key +func WithSendKeysOption(keys []string) SendOption { + return func(msg *primitive.Message) { + msg.WithKeys(keys) + } +} + +// WithSendShardingKeysOption 设置消息的key +func WithSendShardingKeysOption(key string) SendOption { + return func(msg *primitive.Message) { + msg.WithShardingKey(key) + } +} + +// WithSendTagOption 设置消息的Tag +func WithSendTagOption(tag string) SendOption { + return func(msg *primitive.Message) { + msg.WithTag(tag) + } +} + +// WithSendDelayLevelOption 设置消息的延迟级别 +// reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h +// delay level starts from 1. for example, if we set param level=1, then the delay time is 1s. +func WithSendDelayLevelOption(level int) SendOption { + return func(msg *primitive.Message) { + msg.WithDelayTimeLevel(level) + } +} + +const openTelemetryPropertyName = "traceparent" + +// WithOpenTelemetryOption 设置消息的链接追踪信息 +func WithOpenTelemetryOption(value string) SendOption { + return func(msg *primitive.Message) { + msg.WithProperty(openTelemetryPropertyName, value) + } +} + +// WithSendWithPropertyOption 设置消息的属性 +func WithSendWithPropertyOption(key, value string) SendOption { + return func(msg *primitive.Message) { + msg.WithProperty(key, value) + } +} + +// SendSync 同步发送消息 +// topic: 主题 +// sendOptions: 发送选项,如WithSendTagOption +// bodyList: 支持发送多个 +func (p *Producer) SendSync(ctx context.Context, topic string, body []byte, sendOptions ...SendOption) error { + return p.BatchSendSync(ctx, topic, sendOptions, body) +} + +// SendAsync 异步发送消息 +// topic: 主题 +// sendOptions: 发送选项,如WithSendTagOption +// callbackFn: 回调函数,无论成功与否都会回调,失败时err!=nil +// bodyList: 支持发送多个 +func (p *Producer) SendAsync(ctx context.Context, topic string, body []byte, callbackFn func(error), sendOptions ...SendOption) error { + return p.BatchSendAsync(ctx, topic, callbackFn, sendOptions, body) +} + +// BatchSendSync 同步发送消息 +// topic: 主题 +// sendOptions: 发送选项,如WithSendTagOption +// bodyList: 支持发送多个 +func (p *Producer) BatchSendSync(ctx context.Context, topic string, sendOptions []SendOption, bodyList ...[]byte) error { + if err := p.checkSend(topic); err != nil { + return err + } + msgList := make([]*primitive.Message, len(bodyList)) + for i, body := range bodyList { + msgList[i] = &primitive.Message{ + Topic: topic, + Body: body, + } + for _, option := range sendOptions { + option(msgList[i]) + } + } + + // 链路追踪 + var err error + if p.IsOpenTelemetry { + _, span := p.generateTraceSpan(ctx, topic, msgList) + defer func() { + // 记录错误 + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + }() + } + + _, err = p.ProducerClient.SendSync(context.Background(), msgList...) + if err != nil { + return err + } + return nil +} + +// generateTraceSpan 生成链路追踪的span +var produceTraceContext = propagation.TraceContext{} + +func (p *Producer) generateTraceSpan(ctx context.Context, topic string, msgList []*primitive.Message) (context.Context, trace.Span) { + tracer := otel.GetTracerProvider().Tracer("MQ") + spanName := fmt.Sprintf("%s %s", topic, semconv.MessagingOperationPublish.Value.AsString()) + spanCtx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindProducer)) + span.SetAttributes( + semconv.MessagingSystem("RocketMQ"), + semconv.MessagingDestinationName(topic), + semconv.MessagingBatchMessageCount(len(msgList)), + semconv.MessagingRocketmqMessageKeys(msgList[0].GetKeys()), + semconv.MessagingRocketmqMessageTag(msgList[0].GetTags()), + ) + //将span的trace数据写入 + carrier := propagation.MapCarrier{} + produceTraceContext.Inject(spanCtx, carrier) + traceParent := carrier[openTelemetryPropertyName] + for _, message := range msgList { + message.WithProperty(openTelemetryPropertyName, traceParent) + } + + return spanCtx, span +} + +// BatchSendAsync 异步发送消息 +// topic: 主题 +// callbackFn: 回调方法,无论成功与否都会回调,失败时err!=nil +// sendOptions: 发送选项,如WithSendTagOption +// bodyList: 支持发送多个 +func (p *Producer) BatchSendAsync(ctx context.Context, topic string, callbackFn func(error), sendOptions []SendOption, bodyList ...[]byte) error { + if err := p.checkSend(topic); err != nil { + return err + } + + msgList := make([]*primitive.Message, len(bodyList)) + for i, body := range bodyList { + msgList[i] = &primitive.Message{ + Topic: topic, + Body: body, + } + for _, option := range sendOptions { + option(msgList[i]) + } + } + + var err error + var span trace.Span + if p.IsOpenTelemetry { + _, span = p.generateTraceSpan(ctx, topic, msgList) + } + + err = p.ProducerClient.SendAsync(context.Background(), func(ctxErr context.Context, result *primitive.SendResult, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } + span.End() + + callbackFn(err) + }, msgList...) + return err +} + +// checkTopicName 检查topicName是否符合规范 +// 名字分三部分,由字母和.号组成:${模块名}_${业务名}_${事件},如:job-order-created +// 权限: +func (p *Producer) checkSend(topicName string) error { + arr := strings.Split(topicName, "_") + isOk := len(arr) == 3 && arr[0] != "" && arr[1] != "" && arr[2] != "" + if !isOk { + return fmt.Errorf("topic名称不符合规范") + } + // 检查权限:待完善 + return nil +} diff --git a/producer_test.go b/producer_test.go new file mode 100644 index 0000000..1b5312c --- /dev/null +++ b/producer_test.go @@ -0,0 +1,87 @@ +package mq + +import ( + "context" + "fmt" + "go.opentelemetry.io/otel" + // nolint + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + "os" + "testing" + "time" +) + +func TestProducer_SendSync(t *testing.T) { + initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_producer") + + //http://192.168.6.107:9876/ + p, err := NewProducer("192.168.6.107:9876") + //p, err := NewProducer("192.168.6.107:9876", WithProducerCredentials("", "", "")) + if err != nil { + t.Fatal(err) + } + err = p.Start() + if err != nil { + t.Fatal(err) + } + for i := 0; i < 10; i++ { + err = p.SendSync( + context.Background(), + "test_sdk_sync", + []byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)), + WithSendKeysOption([]string{"ttt"}), + WithSendTagOption("TagA"), + WithSendWithPropertyOption("pk", "pv"), + ) + if err != nil { + fmt.Println("同步发送失败:", err) + } + err = p.SendAsync( + context.Background(), + "test_sdk_async", + []byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)), + func(err error) { + if err != nil { + //出错了 + fmt.Println("异步回调错误", err) + } else { + fmt.Println("异步回调成功") + } + }, + WithSendKeysOption([]string{"ttt"}), + WithSendTagOption("TagA"), + WithSendWithPropertyOption("pk", "pv"), + ) + if err != nil { + fmt.Println("异步发起失败", err) + } + } + _ = p.Shutdown() + time.Sleep(1 * time.Second) // 防止测试时span未上报完成 +} + +func initTracer(url string, sampler float64, name string) { + exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) + if err != nil { + panic(fmt.Errorf("failed to initialize jaeger exporter:%s", url)) + } + hostName, _ := os.Hostname() + tp := trace.NewTracerProvider( + // 将基于父span的采样率设置为100% + trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(sampler))), + // 单元测试使用同步发送 + //trace.WithBatcher(exporter), + trace.WithSyncer(exporter), + // 在资源中记录有关此应用程序的信息 + trace.WithResource(resource.NewSchemaless( + semconv.ServiceNameKey.String(name), + semconv.ServiceVersionKey.String("v1.0.0"), + semconv.ServiceInstanceIDKey.String(hostName), + semconv.GCPGceInstanceHostnameKey.String(hostName), + )), + ) + otel.SetTracerProvider(tp) +} diff --git a/recover.go b/recover.go new file mode 100644 index 0000000..f6de62f --- /dev/null +++ b/recover.go @@ -0,0 +1,40 @@ +package mq + +import ( + "fmt" + "github.com/apache/rocketmq-client-go/v2/primitive" +) + +type Logger interface { + // Debugf logs a formatted debugging message. + Debugf(format string, args ...interface{}) + + // Infof logs a formatted informational message. + Infof(format string, args ...interface{}) + + // Warnf logs a formatted warning message. + Warnf(format string, args ...interface{}) + + // Errorf logs a formatted error message. + Errorf(format string, args ...interface{}) +} + +func init() { + SetRecoverLogger(nil) +} + +// SetRecoverLogger 设置RocketMQ的recover的处理函数日志,不设置会导致协程panic后程序退出 +func SetRecoverLogger(logger Logger) { + SetRecoverHandler(func(err any) { + if logger == nil { + fmt.Printf("rocketmq 发生 panic:%+v\n", err) + return + } + logger.Errorf("rocketmq 发生 panic:%+v", err) + }) +} + +// SetRecoverHandler 设置RocketMQ的recover的处理函数,不设置会导致协程panic后程序退出 +func SetRecoverHandler(fn func(any)) { + primitive.PanicHandler = fn +}