transfer_middleware/until/mq/consumer_manager.go

290 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package mq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"os"
"strings"
"sync/atomic"
"time"
)
// ConsumerManager 消费者管理器
type ConsumerManager struct {
consumerConfigs []*ConsumerConfig
activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出
shutdownFlag atomic.Bool // 关闭标记
logger Logger
IsOpenTelemetry bool // 默认开启链路追踪,及时未配置上报地址也不影响业务,只是实际不会上报上去而已
}
// NewConsumerManager 创建一个消费者管理器
func NewConsumerManager(logger Logger) *ConsumerManager {
return &ConsumerManager{logger: logger, IsOpenTelemetry: true}
}
func (c *ConsumerManager) DisableOpenTelemetry() {
c.IsOpenTelemetry = false
}
// Subscribe 订阅一个主题
func (c *ConsumerManager) Subscribe(ctx context.Context, connConf *ConsumerConnConfig, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, opts ...consumer.Option) error {
// 检查参数规范
if !c.checkGroupName(consumerConf.TopicName, consumerConf.GroupName) {
return fmt.Errorf("groupName不符合规范前缀必须是\"${topicName}_\"开头如trade_recharge_dispatcher_pay")
}
if consumerConf.RetryCnt == nil || *consumerConf.RetryCnt < 0 {
retryCnt := 38
consumerConf.RetryCnt = &retryCnt
}
if consumerConf.ConsumerCnt <= 0 {
consumerConf.ConsumerCnt = 1
}
if consumerConf.PerCoroutineCnt <= 0 {
consumerConf.PerCoroutineCnt = 20
}
credentials := primitive.Credentials{
AccessKey: connConf.AccessKey,
SecretKey: connConf.SecretKey,
SecurityToken: connConf.SecurityToken,
}
//限制groupName名称必须与topic捆绑
nameServers := strings.Split(connConf.NameServers, ",")
opts = append(opts,
consumer.WithNameServer(nameServers),
consumer.WithCredentials(credentials),
consumer.WithConsumerModel(consumer.Clustering),
// 不要开启此参数开启顺序消费通过返回consumer.ConsumeRetryLater将会失效需要自己实现重试机制
//consumer.WithConsumerOrder(true),
consumer.WithGroupName(consumerConf.GroupName),
consumer.WithConsumeGoroutineNums(consumerConf.PerCoroutineCnt),
consumer.WithRetry(*consumerConf.RetryCnt),
// 不启用批消费,多个一起消费的特点是则其中一个失败则整体都会失败
consumer.WithConsumeMessageBatchMaxSize(1),
//consumer.WithPullThresholdForQueue(50),
)
// 启动指定个数的consumer
hostName, _ := os.Hostname()
now := time.Now().Unix()
for i := 0; i < consumerConf.ConsumerCnt; i++ {
currOpts := make([]consumer.Option, len(opts))
copy(currOpts, opts)
currOpts = append(currOpts, consumer.WithInstance(fmt.Sprintf("%s:%s:%d:%d", hostName, consumerConf.GroupName, now, i+1)))
pushConsumer, err := rocketmq.NewPushConsumer(currOpts...)
if err != nil {
return err
}
selector := consumer.MessageSelector{}
// 过滤tag
if len(consumerConf.Tags) > 0 {
selector.Type = consumer.TAG
selector.Expression = strings.Join(consumerConf.Tags, " || ")
}
err = pushConsumer.Subscribe(consumerConf.TopicName, selector, func(subCtx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
return c.callbackForReceive(subCtx, consumerConf, fn, ext...)
})
if err != nil {
return err
}
consumerConf.pushConsumers = append(consumerConf.pushConsumers, pushConsumer)
}
c.consumerConfigs = append(c.consumerConfigs, consumerConf)
return nil
}
var consumerPropagator = propagation.TraceContext{}
// callbackForReceive 收到消息的回调
func (c *ConsumerManager) callbackForReceive(ctx context.Context, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, ext ...*primitive.MessageExt) (cr consumer.ConsumeResult, fnErr error) {
// 收到消息
if c.shutdownFlag.Load() {
cr = consumer.ConsumeRetryLater
fnErr = fmt.Errorf("正在退出中,延期处理:%s,%s", consumerConf.TopicName, consumerConf.GroupName)
// 卡住,不再继续消费,等待退出
// 测试发现在重试主题消费时返回retryLater有时会被commit掉导致消息丢失
time.Sleep(24 * time.Hour)
return
}
// 标记活跃状态
c.activeCnt.Add(1)
defer func() {
c.activeCnt.Add(-1)
if v := recover(); v != nil {
cr = consumer.ConsumeRetryLater
fnErr = errors.Errorf("处理消息panic, groupName=%s%+v", consumerConf.GroupName, v)
c.LogErrorf("%+v", fnErr)
return
}
}()
var tracer trace.Tracer
if c.IsOpenTelemetry {
tracer = otel.GetTracerProvider().Tracer("LSXD_Util")
}
// WithConsumeMessageBatchMaxSize 配置大于1时ext才会存在多个它的特点是要么全成功或全失败
cr = consumer.ConsumeSuccess
spanName := fmt.Sprintf("%s %s %s", consumerConf.TopicName, semconv.MessagingOperationProcess.Value.AsString(), consumerConf.GroupName)
for _, v := range ext {
func() {
message := &ConsumerMessage{
MsgId: v.MsgId,
Topic: v.Topic,
Body: v.Body,
ReconsumeTimes: v.ReconsumeTimes,
CompressedBody: v.CompressedBody,
Flag: v.Flag,
TransactionId: v.TransactionId,
Batch: v.Batch,
Compress: v.Compress,
Properties: v.GetProperties(),
}
// 链路追踪
var span trace.Span
if tracer != nil {
if traceParent, ok := message.Properties[openTelemetryPropertyName]; ok {
var mapCarrier propagation.MapCarrier = map[string]string{
openTelemetryPropertyName: traceParent,
}
ctx = consumerPropagator.Extract(ctx, mapCarrier)
}
ctx, span = tracer.Start(ctx, spanName, trace.WithSpanKind(trace.SpanKindConsumer))
span.SetAttributes(
semconv.MessagingSystem("RocketMQ"),
semconv.MessagingSourceName(consumerConf.TopicName),
semconv.MessagingRocketmqClientGroup(consumerConf.GroupName),
semconv.MessagingRocketmqMessageKeys(message.GetKeys()...),
semconv.MessagingRocketmqMessageTag(message.GetTags()),
semconv.MessagingRocketmqMessageDelayTimeLevel(message.GetDelayTimeLevel()),
semconv.MessageIDKey.String(message.MsgId),
)
defer func() {
// 记录追踪信息
spanErr := fnErr
var panicVal any
if panicVal = recover(); panicVal != nil {
spanErr = fmt.Errorf("%s消费者处理方法panic:%s", consumerConf.GroupName, panicVal)
}
if spanErr != nil {
span.RecordError(spanErr)
span.SetStatus(codes.Error, spanErr.Error())
}
span.End()
if panicVal != nil {
panic(panicVal)
}
}()
}
// 回调业务函数
currErr := fn(ctx, message)
if currErr != nil {
cr = consumer.ConsumeRetryLater
fnErr = currErr
c.LogErrorf("%s消费者处理方法返回失败,body=%s%+v", consumerConf.GroupName, string(message.Body), currErr)
}
}()
}
return
}
// Start 启动所有消费者
func (c *ConsumerManager) Start(_ context.Context) error {
for _, consumerConf := range c.consumerConfigs {
for _, pushConsumer := range consumerConf.pushConsumers {
err := pushConsumer.Start()
if err != nil {
return err
}
}
}
return nil
}
// Stop 停止所有消费者
func (c *ConsumerManager) Stop(_ context.Context) error {
fmt.Println("开始停止消费者")
c.shutdownFlag.Store(true)
for _, consumerConf := range c.consumerConfigs {
for _, pushConsumer := range consumerConf.pushConsumers {
pushConsumer.Suspend() // 似乎没起使用
}
}
fmt.Println("已suspend所有消费者")
//shutdown之间保证正在处理的消费先提交
_ = c.blockWaitFinish()
var err error = nil
for _, consumerConf := range c.consumerConfigs {
for _, pushConsumer := range consumerConf.pushConsumers {
if closeErr := pushConsumer.Shutdown(); closeErr != nil {
err = closeErr
fmt.Println("消费者shutdown失败", closeErr)
}
}
}
fmt.Println("已shutdown所有消费者")
return err
}
// blockWaitFinish 阻塞等待业务完成
func (c *ConsumerManager) blockWaitFinish() error {
// 每1s检查下业务是否都处理完成
for {
cnt := c.activeCnt.Load()
if cnt == 0 {
//无业务处理,正常退
break
} else {
fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt)
}
time.Sleep(1 * time.Second)
}
//防止极端情况下commit未完成
// nolint
time.Sleep(1 * time.Second)
return nil
}
// LogErrorf 记录错误日志
func (c *ConsumerManager) LogErrorf(format string, args ...any) {
if c.logger != nil {
c.logger.Errorf(format, args...)
} else {
fmt.Printf(format+"\n", args...)
}
}
// checkGroupName 检查groupName是否符合规范
// 必须是以${topicName}_开头目的是
// 1. 防止相同相同groupName与多个topic的情况避免出现消费不符异常的情况
// 2. 在管理group时能更清晰地体现出对应的topic
func (c *ConsumerManager) checkGroupName(topicName string, groupName string) bool {
if groupName == "" {
return false
}
return strings.HasPrefix(groupName, topicName+"_")
}