first push
This commit is contained in:
commit
2549804954
|
@ -0,0 +1,76 @@
|
||||||
|
# 事件总线
|
||||||
|
统一的事件总线,低层使用RocketMQ,进一步封装:约束命名规范、权限验证、链路追踪等
|
||||||
|
|
||||||
|
## 发送
|
||||||
|
[参考:](/event/producer_test.go)
|
||||||
|
1. 生成发送实例
|
||||||
|
```go
|
||||||
|
producer, err := event.NewProducer("192.168.6.107:9876")
|
||||||
|
|
||||||
|
// 带有验证凭证的示例
|
||||||
|
producer, err := NewProducer("192.168.6.107:9876", producer.WithProducerCredentials("accessKey", "secretKey", "securityToken"))
|
||||||
|
```
|
||||||
|
生成时可选的配置项有:
|
||||||
|
* event.WithProducerCredentials 设置生产者的凭证
|
||||||
|
2. 在Data层中注入生成的实例
|
||||||
|
|
||||||
|
3. 在biz中声明event的interface
|
||||||
|
|
||||||
|
4. 在data层中实现biz声明的interface,同repository类似
|
||||||
|
|
||||||
|
5. 使用的方法:
|
||||||
|
* SendSync 同步发送
|
||||||
|
* SendAsync 异步发送
|
||||||
|
* BatchSendSync 批量同步发送
|
||||||
|
* BatchSendAsync 批量异步发送
|
||||||
|
|
||||||
|
6. 默认支持链路追踪,如要关闭,请调用DisableTraceTelemetry方法
|
||||||
|
|
||||||
|
## 消费
|
||||||
|
[参考:](/event/producer_test.go)
|
||||||
|
### 一、随着Kratos服务一起启动
|
||||||
|
1. 在server中新增consumer.go
|
||||||
|
```go
|
||||||
|
// ConsumerServer 消费者Server
|
||||||
|
type ConsumerServer struct {
|
||||||
|
manager *event.ConsumerManager
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConsumerServer 工厂方法
|
||||||
|
func NewConsumerServer(
|
||||||
|
//注入一些依赖的对象(包括biz,同http、grpc的Sever类似)
|
||||||
|
) *ConsumerServer {
|
||||||
|
manager := event.NewConsumerManager()
|
||||||
|
|
||||||
|
// 添加一些订阅方法,示例:
|
||||||
|
_ = manager.Subscribe(ctx, connConf, consumerConf, func(message *ConsumerMessage) error {
|
||||||
|
// mock 业务耗时
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
// 返回nil才会commit,否则会重试
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return &ConsumerServer{manager:manager}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConsumerServer) Start(ctx context.Context) error {
|
||||||
|
return c.manager.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ConsumerServer) Stop(ctx context.Context) error {
|
||||||
|
return c.manager.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
2. 添加进provider_set.go
|
||||||
|
3. 打开main.go
|
||||||
|
```go
|
||||||
|
// 在newApp方法中注入ConsumerServer,并添加到
|
||||||
|
serverOption := kratos.Server(
|
||||||
|
//在这里面添加ConsumerServer的实例
|
||||||
|
)
|
||||||
|
```
|
||||||
|
### 二、使用cli模式启动
|
||||||
|
参考上面的启动方式,顺序为:subscribe -> start -> stop
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import "github.com/apache/rocketmq-client-go/v2"
|
||||||
|
|
||||||
|
// ConsumerConfig 消费者配置
|
||||||
|
type ConsumerConfig struct {
|
||||||
|
TopicName string // 必填,主题名称
|
||||||
|
|
||||||
|
// GroupName 必填,消费者分组名称,填写消费的业务名称,例如:CreatePay-创建支付单
|
||||||
|
// 实际注册时,会自动在前面加上TopicName
|
||||||
|
GroupName string
|
||||||
|
|
||||||
|
// ConsumerCnt,消费者个数,默认为1,不能超过MQ的消费Queue个数
|
||||||
|
// 需要保证顺序消费的场景才推荐使用,否则推荐配置PerCoroutineCnt参数来开启并发消费即可
|
||||||
|
ConsumerCnt int
|
||||||
|
|
||||||
|
// PerCoroutineCnt 每个consumer的协程个数,默认为20,它与ConsumerCnt都能实现并发消费,区别在于此参数是一个消费者,多个协程并发消费,性能更高
|
||||||
|
// 由于是单消费者的并发处理,可能存在后拉取的比先拉取的先处理完,即无法保证严格的顺序消费要求(但大部分场景都没有顺序要求)
|
||||||
|
// SDK在收到消息时会启动一个新协程运行回调函数,最大运行协程数不超过此配置
|
||||||
|
// 示例:2,则实际的并发数是4,正常消费2个,重试2个协程
|
||||||
|
PerCoroutineCnt int
|
||||||
|
|
||||||
|
// RetryCnt 消费最大重试次数,间隔时间查看https://rocketmq.apache.org/zh/docs/featureBehavior/10consumerretrypolicy
|
||||||
|
// 我们默认为38次,目的是给异常业务方保留最多48小时去修复,超过之后进入死信队列,3天后会被自动删除
|
||||||
|
RetryCnt *int
|
||||||
|
|
||||||
|
// 指定消费的tag,为空则消费所有tag
|
||||||
|
Tags []string
|
||||||
|
|
||||||
|
//消费者实例
|
||||||
|
pushConsumers []rocketmq.PushConsumer
|
||||||
|
}
|
|
@ -0,0 +1,9 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
// ConsumerConnConfig 消费者配置
|
||||||
|
type ConsumerConnConfig struct {
|
||||||
|
NameServers string // 必填,多个用,号隔开
|
||||||
|
AccessKey string // 连接rocketMQ的accessKey
|
||||||
|
SecretKey string // 连接rocketMQ的secretKey
|
||||||
|
SecurityToken string // 连接rocketMQ的securityToken
|
||||||
|
}
|
|
@ -0,0 +1,289 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2/consumer"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/propagation"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConsumerManager 消费者管理器
|
||||||
|
type ConsumerManager struct {
|
||||||
|
consumerConfigs []*ConsumerConfig
|
||||||
|
activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出
|
||||||
|
shutdownFlag atomic.Bool // 关闭标记
|
||||||
|
|
||||||
|
logger Logger
|
||||||
|
IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewConsumerManager 创建一个消费者管理器
|
||||||
|
func NewConsumerManager(logger Logger) *ConsumerManager {
|
||||||
|
return &ConsumerManager{logger: logger, IsOpenTelemetry: true}
|
||||||
|
}
|
||||||
|
func (c *ConsumerManager) DisableOpenTelemetry() {
|
||||||
|
c.IsOpenTelemetry = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Subscribe 订阅一个主题
|
||||||
|
func (c *ConsumerManager) Subscribe(ctx context.Context, connConf *ConsumerConnConfig, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, opts ...consumer.Option) error {
|
||||||
|
// 检查参数规范
|
||||||
|
if !c.checkGroupName(consumerConf.TopicName, consumerConf.GroupName) {
|
||||||
|
return fmt.Errorf("groupName不符合规范,前缀必须是\"${topicName}_\"开头,如trade_recharge_dispatcher_pay")
|
||||||
|
}
|
||||||
|
if consumerConf.RetryCnt == nil || *consumerConf.RetryCnt < 0 {
|
||||||
|
retryCnt := 38
|
||||||
|
consumerConf.RetryCnt = &retryCnt
|
||||||
|
}
|
||||||
|
if consumerConf.ConsumerCnt <= 0 {
|
||||||
|
consumerConf.ConsumerCnt = 1
|
||||||
|
}
|
||||||
|
if consumerConf.PerCoroutineCnt <= 0 {
|
||||||
|
consumerConf.PerCoroutineCnt = 20
|
||||||
|
}
|
||||||
|
|
||||||
|
credentials := primitive.Credentials{
|
||||||
|
AccessKey: connConf.AccessKey,
|
||||||
|
SecretKey: connConf.SecretKey,
|
||||||
|
SecurityToken: connConf.SecurityToken,
|
||||||
|
}
|
||||||
|
//限制groupName名称必须与topic捆绑
|
||||||
|
nameServers := strings.Split(connConf.NameServers, ",")
|
||||||
|
opts = append(opts,
|
||||||
|
consumer.WithNameServer(nameServers),
|
||||||
|
consumer.WithCredentials(credentials),
|
||||||
|
consumer.WithConsumerModel(consumer.Clustering),
|
||||||
|
// 不要开启此参数,开启顺序消费,通过返回consumer.ConsumeRetryLater将会失效,需要自己实现重试机制
|
||||||
|
//key.WithConsumerOrder(true),
|
||||||
|
consumer.WithGroupName(consumerConf.GroupName),
|
||||||
|
consumer.WithConsumeGoroutineNums(consumerConf.PerCoroutineCnt),
|
||||||
|
consumer.WithRetry(*consumerConf.RetryCnt),
|
||||||
|
// 不启用批消费,多个一起消费的特点是则其中一个失败则整体都会失败
|
||||||
|
consumer.WithConsumeMessageBatchMaxSize(1),
|
||||||
|
//key.WithPullThresholdForQueue(50),
|
||||||
|
)
|
||||||
|
// 启动指定个数的consumer
|
||||||
|
hostName, _ := os.Hostname()
|
||||||
|
now := time.Now().Unix()
|
||||||
|
for i := 0; i < consumerConf.ConsumerCnt; i++ {
|
||||||
|
currOpts := make([]consumer.Option, len(opts))
|
||||||
|
copy(currOpts, opts)
|
||||||
|
currOpts = append(currOpts, consumer.WithInstance(fmt.Sprintf("%s:%s:%d:%d", hostName, consumerConf.GroupName, now, i+1)))
|
||||||
|
pushConsumer, err := rocketmq.NewPushConsumer(currOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
selector := consumer.MessageSelector{}
|
||||||
|
// 过滤tag
|
||||||
|
if len(consumerConf.Tags) > 0 {
|
||||||
|
selector.Type = consumer.TAG
|
||||||
|
selector.Expression = strings.Join(consumerConf.Tags, " || ")
|
||||||
|
}
|
||||||
|
|
||||||
|
err = pushConsumer.Subscribe(consumerConf.TopicName, selector, func(subCtx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
|
||||||
|
return c.callbackForReceive(subCtx, consumerConf, fn, ext...)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
consumerConf.pushConsumers = append(consumerConf.pushConsumers, pushConsumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.consumerConfigs = append(c.consumerConfigs, consumerConf)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var consumerPropagator = propagation.TraceContext{}
|
||||||
|
|
||||||
|
// callbackForReceive 收到消息的回调
|
||||||
|
func (c *ConsumerManager) callbackForReceive(ctx context.Context, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, ext ...*primitive.MessageExt) (cr consumer.ConsumeResult, fnErr error) {
|
||||||
|
// 收到消息
|
||||||
|
if c.shutdownFlag.Load() {
|
||||||
|
cr = consumer.ConsumeRetryLater
|
||||||
|
fnErr = fmt.Errorf("正在退出中,延期处理:%s,%s", consumerConf.TopicName, consumerConf.GroupName)
|
||||||
|
// 卡住,不再继续消费,等待退出
|
||||||
|
// 测试发现在重试主题消费时,返回retryLater有时会被commit掉,导致消息丢失
|
||||||
|
time.Sleep(24 * time.Hour)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 标记活跃状态
|
||||||
|
c.activeCnt.Add(1)
|
||||||
|
defer func() {
|
||||||
|
c.activeCnt.Add(-1)
|
||||||
|
if v := recover(); v != nil {
|
||||||
|
cr = consumer.ConsumeRetryLater
|
||||||
|
fnErr = errors.Errorf("处理消息panic, groupName=%s,%+v", consumerConf.GroupName, v)
|
||||||
|
c.LogErrorf("%+v", fnErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var tracer trace.Tracer
|
||||||
|
if c.IsOpenTelemetry {
|
||||||
|
tracer = otel.GetTracerProvider().Tracer("MQ")
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithConsumeMessageBatchMaxSize 配置大于1时,ext才会存在多个,它的特点是要么全成功或全失败
|
||||||
|
cr = consumer.ConsumeSuccess
|
||||||
|
spanName := fmt.Sprintf("%s %s %s", consumerConf.TopicName, semconv.MessagingOperationProcess.Value.AsString(), consumerConf.GroupName)
|
||||||
|
for _, v := range ext {
|
||||||
|
func() {
|
||||||
|
message := &ConsumerMessage{
|
||||||
|
MsgId: v.MsgId,
|
||||||
|
Topic: v.Topic,
|
||||||
|
Body: v.Body,
|
||||||
|
ReconsumeTimes: v.ReconsumeTimes,
|
||||||
|
CompressedBody: v.CompressedBody,
|
||||||
|
Flag: v.Flag,
|
||||||
|
TransactionId: v.TransactionId,
|
||||||
|
Batch: v.Batch,
|
||||||
|
Compress: v.Compress,
|
||||||
|
Properties: v.GetProperties(),
|
||||||
|
}
|
||||||
|
// 链路追踪
|
||||||
|
var span trace.Span
|
||||||
|
if tracer != nil {
|
||||||
|
if traceParent, ok := message.Properties[openTelemetryPropertyName]; ok {
|
||||||
|
var mapCarrier propagation.MapCarrier = map[string]string{
|
||||||
|
openTelemetryPropertyName: traceParent,
|
||||||
|
}
|
||||||
|
ctx = consumerPropagator.Extract(ctx, mapCarrier)
|
||||||
|
}
|
||||||
|
ctx, span = tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindConsumer))
|
||||||
|
span.SetAttributes(
|
||||||
|
semconv.MessagingSystem("RocketMQ"),
|
||||||
|
semconv.MessagingSourceName(consumerConf.TopicName),
|
||||||
|
semconv.MessagingRocketmqClientGroup(consumerConf.GroupName),
|
||||||
|
semconv.MessagingRocketmqMessageKeys(message.GetKeys()...),
|
||||||
|
semconv.MessagingRocketmqMessageTag(message.GetTags()),
|
||||||
|
semconv.MessagingRocketmqMessageDelayTimeLevel(message.GetDelayTimeLevel()),
|
||||||
|
semconv.MessageIDKey.String(message.MsgId),
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
// 记录追踪信息
|
||||||
|
spanErr := fnErr
|
||||||
|
var panicVal any
|
||||||
|
if panicVal = recover(); panicVal != nil {
|
||||||
|
spanErr = fmt.Errorf("%s消费者处理方法panic:%s", consumerConf.GroupName, panicVal)
|
||||||
|
}
|
||||||
|
if spanErr != nil {
|
||||||
|
span.RecordError(spanErr)
|
||||||
|
span.SetStatus(codes.Error, spanErr.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
span.End()
|
||||||
|
|
||||||
|
if panicVal != nil {
|
||||||
|
panic(panicVal)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// 回调业务函数
|
||||||
|
currErr := fn(ctx, message)
|
||||||
|
|
||||||
|
if currErr != nil {
|
||||||
|
cr = consumer.ConsumeRetryLater
|
||||||
|
fnErr = currErr
|
||||||
|
c.LogErrorf("%s消费者处理方法返回失败,body=%s:%+v", consumerConf.GroupName, string(message.Body), currErr)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start 启动所有消费者
|
||||||
|
func (c *ConsumerManager) Start(_ context.Context) error {
|
||||||
|
for _, consumerConf := range c.consumerConfigs {
|
||||||
|
for _, pushConsumer := range consumerConf.pushConsumers {
|
||||||
|
err := pushConsumer.Start()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop 停止所有消费者
|
||||||
|
func (c *ConsumerManager) Stop(_ context.Context) error {
|
||||||
|
fmt.Println("开始停止消费者")
|
||||||
|
c.shutdownFlag.Store(true)
|
||||||
|
|
||||||
|
for _, consumerConf := range c.consumerConfigs {
|
||||||
|
for _, pushConsumer := range consumerConf.pushConsumers {
|
||||||
|
pushConsumer.Suspend() // 似乎没起使用
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fmt.Println("已suspend所有消费者")
|
||||||
|
|
||||||
|
//shutdown之间,保证正在处理的消费先提交
|
||||||
|
_ = c.blockWaitFinish()
|
||||||
|
|
||||||
|
var err error = nil
|
||||||
|
for _, consumerConf := range c.consumerConfigs {
|
||||||
|
for _, pushConsumer := range consumerConf.pushConsumers {
|
||||||
|
if closeErr := pushConsumer.Shutdown(); closeErr != nil {
|
||||||
|
err = closeErr
|
||||||
|
fmt.Println("消费者shutdown失败", closeErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Println("已shutdown所有消费者")
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// blockWaitFinish 阻塞等待业务完成
|
||||||
|
func (c *ConsumerManager) blockWaitFinish() error {
|
||||||
|
// 每1s检查下业务是否都处理完成
|
||||||
|
|
||||||
|
for {
|
||||||
|
cnt := c.activeCnt.Load()
|
||||||
|
if cnt == 0 {
|
||||||
|
//无业务处理,正常退
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt)
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
//防止极端情况下commit未完成
|
||||||
|
// nolint
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LogErrorf 记录错误日志
|
||||||
|
func (c *ConsumerManager) LogErrorf(format string, args ...any) {
|
||||||
|
if c.logger != nil {
|
||||||
|
c.logger.Errorf(format, args...)
|
||||||
|
} else {
|
||||||
|
fmt.Printf(format+"\n", args...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkGroupName 检查groupName是否符合规范
|
||||||
|
// 必须是以${topicName}_开头,目的是
|
||||||
|
// 1. 防止相同相同groupName与多个topic的情况,避免出现消费不符异常的情况
|
||||||
|
// 2. 在管理group时,能更清晰地体现出对应的topic
|
||||||
|
func (c *ConsumerManager) checkGroupName(topicName string, groupName string) bool {
|
||||||
|
if groupName == "" {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return strings.HasPrefix(groupName, topicName+"_")
|
||||||
|
}
|
|
@ -0,0 +1,64 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConsumerMessage 消费者收到的消息
|
||||||
|
type ConsumerMessage struct {
|
||||||
|
MsgId string
|
||||||
|
Topic string
|
||||||
|
Body []byte
|
||||||
|
ReconsumeTimes int32
|
||||||
|
CompressedBody []byte
|
||||||
|
Flag int32
|
||||||
|
TransactionId string
|
||||||
|
Batch bool
|
||||||
|
Compress bool
|
||||||
|
Properties map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetKeys 获取消息的key
|
||||||
|
func (c *ConsumerMessage) GetKeys() []string {
|
||||||
|
if len(c.Properties) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
val, isOk := c.Properties[primitive.PropertyKeys]
|
||||||
|
if !isOk {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return strings.Split(val, primitive.PropertyKeySeparator)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetTags 获取消息的tag
|
||||||
|
func (c *ConsumerMessage) GetTags() string {
|
||||||
|
if len(c.Properties) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
// nolint
|
||||||
|
val, _ := c.Properties[primitive.PropertyTags]
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetShardingKey 获取消息的分区key
|
||||||
|
func (c *ConsumerMessage) GetShardingKey() string {
|
||||||
|
if len(c.Properties) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
// nolint
|
||||||
|
val, _ := c.Properties[primitive.PropertyShardingKey]
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetDelayTimeLevel 获取消息的延迟级别
|
||||||
|
func (c *ConsumerMessage) GetDelayTimeLevel() int {
|
||||||
|
if len(c.Properties) == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
// nolint
|
||||||
|
val, _ := c.Properties[primitive.PropertyDelayTimeLevel]
|
||||||
|
level, _ := strconv.Atoi(val)
|
||||||
|
return level
|
||||||
|
}
|
|
@ -0,0 +1,62 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestConsumer_Start(t *testing.T) {
|
||||||
|
initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_consumer")
|
||||||
|
|
||||||
|
manager := NewConsumerManager(nil)
|
||||||
|
ctx := context.Background()
|
||||||
|
consumerConf := &ConsumerConfig{
|
||||||
|
TopicName: "test_sdk_sync",
|
||||||
|
GroupName: "test_sdk_sync_myConsumer",
|
||||||
|
PerCoroutineCnt: 2,
|
||||||
|
}
|
||||||
|
connConf := &ConsumerConnConfig{
|
||||||
|
NameServers: "192.168.6.107:9876",
|
||||||
|
}
|
||||||
|
i := atomic.Int32{}
|
||||||
|
|
||||||
|
err := manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error {
|
||||||
|
cnt := i.Add(1)
|
||||||
|
|
||||||
|
fmt.Printf("%d开始消费:%s ,body=%s \n", cnt, message.MsgId, message.Body)
|
||||||
|
fmt.Println(message.Properties)
|
||||||
|
// mock 业务耗时
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
fmt.Printf("%d已完成:%s ,body=%s \n", cnt, message.MsgId, message.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
fmt.Println(err)
|
||||||
|
consumerConf = &ConsumerConfig{
|
||||||
|
TopicName: "test_sdk_async",
|
||||||
|
GroupName: "test_sdk_async_myConsumer",
|
||||||
|
PerCoroutineCnt: 2,
|
||||||
|
}
|
||||||
|
err = manager.Subscribe(ctx, connConf, consumerConf, func(ctx context.Context, message *ConsumerMessage) error {
|
||||||
|
cnt := i.Add(1)
|
||||||
|
|
||||||
|
fmt.Printf("%d开始消费:%s ,body=%s \n", cnt, message.MsgId, message.Body)
|
||||||
|
fmt.Println(message.Properties)
|
||||||
|
// mock 业务耗时
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
fmt.Printf("%d已完成:%s ,body=%s \n", cnt, message.MsgId, message.Body)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println(err)
|
||||||
|
_ = manager.Start(context.Background())
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
// mock 退出
|
||||||
|
_ = manager.Stop(context.Background())
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
module gitea.cdlsxd.cn/self-tools/l_rocketmq
|
||||||
|
|
||||||
|
go 1.23.6
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/apache/rocketmq-client-go/v2 v2.1.2
|
||||||
|
github.com/pkg/errors v0.9.1
|
||||||
|
go.opentelemetry.io/otel v1.34.0
|
||||||
|
go.opentelemetry.io/otel/exporters/jaeger v1.17.0
|
||||||
|
go.opentelemetry.io/otel/sdk v1.34.0
|
||||||
|
go.opentelemetry.io/otel/trace v1.34.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/emirpasic/gods v1.12.0 // indirect
|
||||||
|
github.com/go-logr/logr v1.4.2 // indirect
|
||||||
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
|
github.com/golang/mock v1.3.1 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
github.com/json-iterator/go v1.1.12 // indirect
|
||||||
|
github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
|
||||||
|
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
|
||||||
|
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||||
|
github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
|
||||||
|
github.com/sirupsen/logrus v1.4.0 // indirect
|
||||||
|
github.com/tidwall/gjson v1.13.0 // indirect
|
||||||
|
github.com/tidwall/match v1.1.1 // indirect
|
||||||
|
github.com/tidwall/pretty v1.2.0 // indirect
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||||
|
go.opentelemetry.io/otel/metric v1.34.0 // indirect
|
||||||
|
go.uber.org/atomic v1.5.1 // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
|
||||||
|
golang.org/x/lint v0.0.0-20190930215403-16217165b5de // indirect
|
||||||
|
golang.org/x/sys v0.29.0 // indirect
|
||||||
|
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e // indirect
|
||||||
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
|
||||||
|
stathat.com/c/consistent v1.0.0 // indirect
|
||||||
|
)
|
|
@ -0,0 +1,266 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2/producer"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"go.opentelemetry.io/otel/propagation"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
|
||||||
|
"go.opentelemetry.io/otel/trace"
|
||||||
|
"math/rand"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Producer struct {
|
||||||
|
ProducerClient rocketmq.Producer
|
||||||
|
IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithProducerCredentials 设置生产者的凭证
|
||||||
|
func WithProducerCredentials(accessKey, secretKey, securityToken string) producer.Option {
|
||||||
|
return producer.WithCredentials(primitive.Credentials{
|
||||||
|
AccessKey: accessKey,
|
||||||
|
SecretKey: secretKey,
|
||||||
|
SecurityToken: securityToken,
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewProducer 创建一个生产者
|
||||||
|
// nameServer: 连接地址,多个中间用,号隔开
|
||||||
|
// opts: 配置项
|
||||||
|
func NewProducer(nameServer string, opts ...producer.Option) (*Producer, error) {
|
||||||
|
//检查参数
|
||||||
|
if nameServer == "" {
|
||||||
|
return nil, fmt.Errorf("rocketMQ NameServer 不能为空")
|
||||||
|
}
|
||||||
|
//创建生产者
|
||||||
|
nameServers := strings.Split(nameServer, ",")
|
||||||
|
group := generateRandomString(10)
|
||||||
|
opts = append(opts, producer.WithNameServer(nameServers), producer.WithGroupName(group))
|
||||||
|
p, err := rocketmq.NewProducer(opts...)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("创建 rocketMQ producer 失败: ", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
//此时并没有发起连接,在使用时才会连接
|
||||||
|
return &Producer{ProducerClient: p, IsOpenTelemetry: true}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateRandomString(length int) string {
|
||||||
|
// 定义字符集
|
||||||
|
const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||||
|
var sb strings.Builder
|
||||||
|
rand.Seed(time.Now().UnixNano()) // 设置种子
|
||||||
|
|
||||||
|
// 生成随机字符串
|
||||||
|
for i := 0; i < length; i++ {
|
||||||
|
sb.WriteByte(charset[rand.Intn(len(charset))]) // 从字符集随机选取
|
||||||
|
}
|
||||||
|
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// DisableTelemetry 关闭链路追踪
|
||||||
|
func (p *Producer) DisableTelemetry() {
|
||||||
|
p.IsOpenTelemetry = false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start 启动生产者
|
||||||
|
func (p *Producer) Start() error {
|
||||||
|
return p.ProducerClient.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Shutdown 关闭生产者
|
||||||
|
func (p *Producer) Shutdown() error {
|
||||||
|
return p.ProducerClient.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendOption 发送消息选项
|
||||||
|
type SendOption func(*primitive.Message)
|
||||||
|
|
||||||
|
// WithSendKeysOption 设置消息的key
|
||||||
|
func WithSendKeysOption(keys []string) SendOption {
|
||||||
|
return func(msg *primitive.Message) {
|
||||||
|
msg.WithKeys(keys)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSendShardingKeysOption 设置消息的key
|
||||||
|
func WithSendShardingKeysOption(key string) SendOption {
|
||||||
|
return func(msg *primitive.Message) {
|
||||||
|
msg.WithShardingKey(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSendTagOption 设置消息的Tag
|
||||||
|
func WithSendTagOption(tag string) SendOption {
|
||||||
|
return func(msg *primitive.Message) {
|
||||||
|
msg.WithTag(tag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSendDelayLevelOption 设置消息的延迟级别
|
||||||
|
// reference delay level definition: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
|
||||||
|
// delay level starts from 1. for example, if we set param level=1, then the delay time is 1s.
|
||||||
|
func WithSendDelayLevelOption(level int) SendOption {
|
||||||
|
return func(msg *primitive.Message) {
|
||||||
|
msg.WithDelayTimeLevel(level)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const openTelemetryPropertyName = "traceparent"
|
||||||
|
|
||||||
|
// WithOpenTelemetryOption 设置消息的链接追踪信息
|
||||||
|
func WithOpenTelemetryOption(value string) SendOption {
|
||||||
|
return func(msg *primitive.Message) {
|
||||||
|
msg.WithProperty(openTelemetryPropertyName, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithSendWithPropertyOption 设置消息的属性
|
||||||
|
func WithSendWithPropertyOption(key, value string) SendOption {
|
||||||
|
return func(msg *primitive.Message) {
|
||||||
|
msg.WithProperty(key, value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendSync 同步发送消息
|
||||||
|
// topic: 主题
|
||||||
|
// sendOptions: 发送选项,如WithSendTagOption
|
||||||
|
// bodyList: 支持发送多个
|
||||||
|
func (p *Producer) SendSync(ctx context.Context, topic string, body []byte, sendOptions ...SendOption) error {
|
||||||
|
return p.BatchSendSync(ctx, topic, sendOptions, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendAsync 异步发送消息
|
||||||
|
// topic: 主题
|
||||||
|
// sendOptions: 发送选项,如WithSendTagOption
|
||||||
|
// callbackFn: 回调函数,无论成功与否都会回调,失败时err!=nil
|
||||||
|
// bodyList: 支持发送多个
|
||||||
|
func (p *Producer) SendAsync(ctx context.Context, topic string, body []byte, callbackFn func(error), sendOptions ...SendOption) error {
|
||||||
|
return p.BatchSendAsync(ctx, topic, callbackFn, sendOptions, body)
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchSendSync 同步发送消息
|
||||||
|
// topic: 主题
|
||||||
|
// sendOptions: 发送选项,如WithSendTagOption
|
||||||
|
// bodyList: 支持发送多个
|
||||||
|
func (p *Producer) BatchSendSync(ctx context.Context, topic string, sendOptions []SendOption, bodyList ...[]byte) error {
|
||||||
|
if err := p.checkSend(topic); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
msgList := make([]*primitive.Message, len(bodyList))
|
||||||
|
for i, body := range bodyList {
|
||||||
|
msgList[i] = &primitive.Message{
|
||||||
|
Topic: topic,
|
||||||
|
Body: body,
|
||||||
|
}
|
||||||
|
for _, option := range sendOptions {
|
||||||
|
option(msgList[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 链路追踪
|
||||||
|
var err error
|
||||||
|
if p.IsOpenTelemetry {
|
||||||
|
_, span := p.generateTraceSpan(ctx, topic, msgList)
|
||||||
|
defer func() {
|
||||||
|
// 记录错误
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
}
|
||||||
|
span.End()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = p.ProducerClient.SendSync(context.Background(), msgList...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// generateTraceSpan 生成链路追踪的span
|
||||||
|
var produceTraceContext = propagation.TraceContext{}
|
||||||
|
|
||||||
|
func (p *Producer) generateTraceSpan(ctx context.Context, topic string, msgList []*primitive.Message) (context.Context, trace.Span) {
|
||||||
|
tracer := otel.GetTracerProvider().Tracer("MQ")
|
||||||
|
spanName := fmt.Sprintf("%s %s", topic, semconv.MessagingOperationPublish.Value.AsString())
|
||||||
|
spanCtx, span := tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindProducer))
|
||||||
|
span.SetAttributes(
|
||||||
|
semconv.MessagingSystem("RocketMQ"),
|
||||||
|
semconv.MessagingDestinationName(topic),
|
||||||
|
semconv.MessagingBatchMessageCount(len(msgList)),
|
||||||
|
semconv.MessagingRocketmqMessageKeys(msgList[0].GetKeys()),
|
||||||
|
semconv.MessagingRocketmqMessageTag(msgList[0].GetTags()),
|
||||||
|
)
|
||||||
|
//将span的trace数据写入
|
||||||
|
carrier := propagation.MapCarrier{}
|
||||||
|
produceTraceContext.Inject(spanCtx, carrier)
|
||||||
|
traceParent := carrier[openTelemetryPropertyName]
|
||||||
|
for _, message := range msgList {
|
||||||
|
message.WithProperty(openTelemetryPropertyName, traceParent)
|
||||||
|
}
|
||||||
|
|
||||||
|
return spanCtx, span
|
||||||
|
}
|
||||||
|
|
||||||
|
// BatchSendAsync 异步发送消息
|
||||||
|
// topic: 主题
|
||||||
|
// callbackFn: 回调方法,无论成功与否都会回调,失败时err!=nil
|
||||||
|
// sendOptions: 发送选项,如WithSendTagOption
|
||||||
|
// bodyList: 支持发送多个
|
||||||
|
func (p *Producer) BatchSendAsync(ctx context.Context, topic string, callbackFn func(error), sendOptions []SendOption, bodyList ...[]byte) error {
|
||||||
|
if err := p.checkSend(topic); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
msgList := make([]*primitive.Message, len(bodyList))
|
||||||
|
for i, body := range bodyList {
|
||||||
|
msgList[i] = &primitive.Message{
|
||||||
|
Topic: topic,
|
||||||
|
Body: body,
|
||||||
|
}
|
||||||
|
for _, option := range sendOptions {
|
||||||
|
option(msgList[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var err error
|
||||||
|
var span trace.Span
|
||||||
|
if p.IsOpenTelemetry {
|
||||||
|
_, span = p.generateTraceSpan(ctx, topic, msgList)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = p.ProducerClient.SendAsync(context.Background(), func(ctxErr context.Context, result *primitive.SendResult, err error) {
|
||||||
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
}
|
||||||
|
span.End()
|
||||||
|
|
||||||
|
callbackFn(err)
|
||||||
|
}, msgList...)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkTopicName 检查topicName是否符合规范
|
||||||
|
// 名字分三部分,由字母和.号组成:${模块名}_${业务名}_${事件},如:job-order-created
|
||||||
|
// 权限:
|
||||||
|
func (p *Producer) checkSend(topicName string) error {
|
||||||
|
arr := strings.Split(topicName, "_")
|
||||||
|
isOk := len(arr) == 3 && arr[0] != "" && arr[1] != "" && arr[2] != ""
|
||||||
|
if !isOk {
|
||||||
|
return fmt.Errorf("topic名称不符合规范")
|
||||||
|
}
|
||||||
|
// 检查权限:待完善
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -0,0 +1,87 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
// nolint
|
||||||
|
"go.opentelemetry.io/otel/exporters/jaeger"
|
||||||
|
"go.opentelemetry.io/otel/sdk/resource"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace"
|
||||||
|
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProducer_SendSync(t *testing.T) {
|
||||||
|
initTracer("http://192.168.6.194:14268/api/traces", 1, "go_unit_test_for_util_producer")
|
||||||
|
|
||||||
|
//http://192.168.6.107:9876/
|
||||||
|
p, err := NewProducer("192.168.6.107:9876")
|
||||||
|
//p, err := NewProducer("192.168.6.107:9876", WithProducerCredentials("", "", ""))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = p.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
err = p.SendSync(
|
||||||
|
context.Background(),
|
||||||
|
"test_sdk_sync",
|
||||||
|
[]byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)),
|
||||||
|
WithSendKeysOption([]string{"ttt"}),
|
||||||
|
WithSendTagOption("TagA"),
|
||||||
|
WithSendWithPropertyOption("pk", "pv"),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("同步发送失败:", err)
|
||||||
|
}
|
||||||
|
err = p.SendAsync(
|
||||||
|
context.Background(),
|
||||||
|
"test_sdk_async",
|
||||||
|
[]byte("hello world"+time.Now().Format(time.DateTime)+fmt.Sprintf("%d", i)),
|
||||||
|
func(err error) {
|
||||||
|
if err != nil {
|
||||||
|
//出错了
|
||||||
|
fmt.Println("异步回调错误", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("异步回调成功")
|
||||||
|
}
|
||||||
|
},
|
||||||
|
WithSendKeysOption([]string{"ttt"}),
|
||||||
|
WithSendTagOption("TagA"),
|
||||||
|
WithSendWithPropertyOption("pk", "pv"),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("异步发起失败", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = p.Shutdown()
|
||||||
|
time.Sleep(1 * time.Second) // 防止测试时span未上报完成
|
||||||
|
}
|
||||||
|
|
||||||
|
func initTracer(url string, sampler float64, name string) {
|
||||||
|
exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("failed to initialize jaeger exporter:%s", url))
|
||||||
|
}
|
||||||
|
hostName, _ := os.Hostname()
|
||||||
|
tp := trace.NewTracerProvider(
|
||||||
|
// 将基于父span的采样率设置为100%
|
||||||
|
trace.WithSampler(trace.ParentBased(trace.TraceIDRatioBased(sampler))),
|
||||||
|
// 单元测试使用同步发送
|
||||||
|
//trace.WithBatcher(exporter),
|
||||||
|
trace.WithSyncer(exporter),
|
||||||
|
// 在资源中记录有关此应用程序的信息
|
||||||
|
trace.WithResource(resource.NewSchemaless(
|
||||||
|
semconv.ServiceNameKey.String(name),
|
||||||
|
semconv.ServiceVersionKey.String("v1.0.0"),
|
||||||
|
semconv.ServiceInstanceIDKey.String(hostName),
|
||||||
|
semconv.GCPGceInstanceHostnameKey.String(hostName),
|
||||||
|
)),
|
||||||
|
)
|
||||||
|
otel.SetTracerProvider(tp)
|
||||||
|
}
|
|
@ -0,0 +1,40 @@
|
||||||
|
package mq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/apache/rocketmq-client-go/v2/primitive"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Logger interface {
|
||||||
|
// Debugf logs a formatted debugging message.
|
||||||
|
Debugf(format string, args ...interface{})
|
||||||
|
|
||||||
|
// Infof logs a formatted informational message.
|
||||||
|
Infof(format string, args ...interface{})
|
||||||
|
|
||||||
|
// Warnf logs a formatted warning message.
|
||||||
|
Warnf(format string, args ...interface{})
|
||||||
|
|
||||||
|
// Errorf logs a formatted error message.
|
||||||
|
Errorf(format string, args ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
SetRecoverLogger(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRecoverLogger 设置RocketMQ的recover的处理函数日志,不设置会导致协程panic后程序退出
|
||||||
|
func SetRecoverLogger(logger Logger) {
|
||||||
|
SetRecoverHandler(func(err any) {
|
||||||
|
if logger == nil {
|
||||||
|
fmt.Printf("rocketmq 发生 panic:%+v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.Errorf("rocketmq 发生 panic:%+v", err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetRecoverHandler 设置RocketMQ的recover的处理函数,不设置会导致协程panic后程序退出
|
||||||
|
func SetRecoverHandler(fn func(any)) {
|
||||||
|
primitive.PanicHandler = fn
|
||||||
|
}
|
Loading…
Reference in New Issue