transfer_middleware/until/mq/consumer_manager.go

290 lines
9.2 KiB
Go
Raw Normal View History

2024-06-18 16:34:14 +08:00
package mq
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"go.opentelemetry.io/otel/trace"
"os"
"strings"
"sync/atomic"
"time"
)
// ConsumerManager owns the RocketMQ push consumers created through Subscribe and drives
// their lifecycle (Start / Stop).
//
// It contains sync/atomic typed values, so it must not be copied after first use; always
// pass *ConsumerManager (as NewConsumerManager returns).
type ConsumerManager struct {
	// consumerConfigs holds every config successfully registered through Subscribe, in order.
	consumerConfigs []*ConsumerConfig
	// activeCnt counts callbacks currently running business logic; Stop waits for it to drop
	// to zero so in-flight work can finish before consumers are shut down.
	activeCnt atomic.Int32
	// shutdownFlag is set by Stop; callbacks that observe it stop processing new messages.
	shutdownFlag atomic.Bool
	// logger receives error logs; when nil, LogErrorf falls back to stdout.
	logger Logger
	// IsOpenTelemetry enables tracing (on by default). Even when no exporter endpoint is
	// configured, tracing does not affect business processing; spans are simply not reported.
	IsOpenTelemetry bool
}
// NewConsumerManager returns an empty ConsumerManager with OpenTelemetry tracing enabled.
// logger may be nil, in which case error logs are printed to stdout by LogErrorf.
func NewConsumerManager(logger Logger) *ConsumerManager {
	manager := new(ConsumerManager)
	manager.logger = logger
	manager.IsOpenTelemetry = true
	return manager
}
// DisableOpenTelemetry turns off span creation for messages handled afterwards.
//
// IsOpenTelemetry is a plain bool read by consumer callbacks without synchronization, so
// call this before Start rather than while messages are being consumed.
func (c *ConsumerManager) DisableOpenTelemetry() {
	tracingEnabled := false
	c.IsOpenTelemetry = tracingEnabled
}
// Subscribe registers fn as the handler for consumerConf.TopicName and creates
// consumerConf.ConsumerCnt push consumers for it. The consumers are NOT started here; call
// Start afterwards.
//
// Defaults are written back into consumerConf when a value is unset or invalid:
//   - RetryCnt: 38 when nil or negative;
//   - ConsumerCnt: 1 when <= 0;
//   - PerCoroutineCnt: 20 when <= 0.
//
// consumerConf.GroupName must start with "${TopicName}_" (see checkGroupName).
// connConf.NameServers is a comma-separated list; blanks around each entry are trimmed.
// opts are extra rocketmq consumer options, applied before the manager's own options; the
// caller's opts slice is never modified.
//
// On error nothing is registered: consumerConf.pushConsumers and the manager's config list
// are left as they were.
// NOTE(review): push consumers created before a failure are dropped without Shutdown (they
// were never started); confirm this is acceptable for the rocketmq client in use.
func (c *ConsumerManager) Subscribe(ctx context.Context, connConf *ConsumerConnConfig, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, opts ...consumer.Option) error {
	// Enforce the group naming convention before touching anything else.
	if !c.checkGroupName(consumerConf.TopicName, consumerConf.GroupName) {
		return fmt.Errorf("groupName不符合规范前缀必须是\"${topicName}_\"开头如trade_recharge_dispatcher_pay")
	}

	// Fill defaults back into consumerConf.
	if consumerConf.RetryCnt == nil || *consumerConf.RetryCnt < 0 {
		retryCnt := 38
		consumerConf.RetryCnt = &retryCnt
	}
	if consumerConf.ConsumerCnt <= 0 {
		consumerConf.ConsumerCnt = 1
	}
	if consumerConf.PerCoroutineCnt <= 0 {
		consumerConf.PerCoroutineCnt = 20
	}

	credentials := primitive.Credentials{
		AccessKey:     connConf.AccessKey,
		SecretKey:     connConf.SecretKey,
		SecurityToken: connConf.SecurityToken,
	}

	// Accept "host1:9876, host2:9876" style lists by trimming blanks around each entry.
	nameServers := strings.Split(connConf.NameServers, ",")
	for i := range nameServers {
		nameServers[i] = strings.TrimSpace(nameServers[i])
	}

	// Build a fresh slice: appending to the variadic opts directly could write into the
	// caller's backing array when they pass a slice with spare capacity (`Subscribe(..., s...)`).
	baseOpts := make([]consumer.Option, 0, len(opts)+8)
	baseOpts = append(baseOpts, opts...)
	baseOpts = append(baseOpts,
		consumer.WithNameServer(nameServers),
		consumer.WithCredentials(credentials),
		consumer.WithConsumerModel(consumer.Clustering),
		// Do not enable ordered consumption: with it, returning consumer.ConsumeRetryLater
		// no longer triggers a retry and a retry mechanism would have to be implemented manually.
		//consumer.WithConsumerOrder(true),
		consumer.WithGroupName(consumerConf.GroupName),
		consumer.WithConsumeGoroutineNums(consumerConf.PerCoroutineCnt),
		consumer.WithRetry(*consumerConf.RetryCnt),
		// No batch consumption: in a batch, one failing message fails the whole batch.
		consumer.WithConsumeMessageBatchMaxSize(1),
		//consumer.WithPullThresholdForQueue(50),
	)

	// Tag filter is the same for every consumer instance.
	selector := consumer.MessageSelector{}
	if len(consumerConf.Tags) > 0 {
		selector.Type = consumer.TAG
		selector.Expression = strings.Join(consumerConf.Tags, " || ")
	}

	// Best effort: a failed lookup only makes instance names less descriptive.
	hostName, _ := os.Hostname()
	now := time.Now().Unix()

	// Accumulate locally and publish into consumerConf only after every consumer was created,
	// so a failure does not leave consumerConf half-populated.
	pushConsumers := consumerConf.pushConsumers
	for i := 0; i < consumerConf.ConsumerCnt; i++ {
		currOpts := make([]consumer.Option, 0, len(baseOpts)+1)
		currOpts = append(currOpts, baseOpts...)
		// Unique instance name per consumer: host:group:startUnix:ordinal.
		currOpts = append(currOpts, consumer.WithInstance(fmt.Sprintf("%s:%s:%d:%d", hostName, consumerConf.GroupName, now, i+1)))

		pushConsumer, err := rocketmq.NewPushConsumer(currOpts...)
		if err != nil {
			return fmt.Errorf("create push consumer %d/%d for group %s: %w", i+1, consumerConf.ConsumerCnt, consumerConf.GroupName, err)
		}
		err = pushConsumer.Subscribe(consumerConf.TopicName, selector, func(subCtx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			return c.callbackForReceive(subCtx, consumerConf, fn, ext...)
		})
		if err != nil {
			return fmt.Errorf("subscribe topic %s with consumer %d/%d of group %s: %w", consumerConf.TopicName, i+1, consumerConf.ConsumerCnt, consumerConf.GroupName, err)
		}
		pushConsumers = append(pushConsumers, pushConsumer)
	}
	consumerConf.pushConsumers = pushConsumers
	c.consumerConfigs = append(c.consumerConfigs, consumerConf)
	return nil
}
// consumerPropagator extracts the W3C Trace Context carried in message properties so that
// consumer spans join the producer's trace. The zero value of propagation.TraceContext is
// ready to use.
var consumerPropagator propagation.TraceContext
// callbackForReceive is the push-consumer callback installed by Subscribe. For every message
// in ext it builds a ConsumerMessage, optionally wraps the call in an OpenTelemetry consumer
// span, and invokes the business handler fn.
//
// Results:
//   - consumer.ConsumeSuccess when every handler call succeeded;
//   - consumer.ConsumeRetryLater when any handler returned an error (fnErr is the last such
//     error), when a handler panicked (the panic is recovered, logged and returned as fnErr),
//     or when the manager is shutting down.
//
// Shutdown ordering: activeCnt is incremented BEFORE shutdownFlag is read. Stop sets the flag
// and then waits for activeCnt to reach zero, so either this callback sees the flag and backs
// off, or Stop sees the callback as active and waits for it. Reading the flag first left a
// window in which a message could start processing after Stop had already observed zero.
//
// Each message gets its own context and error: spans of messages in one batch are siblings
// rather than nested in one another, and a span records only its own message's failure.
func (c *ConsumerManager) callbackForReceive(ctx context.Context, consumerConf *ConsumerConfig, fn func(context.Context, *ConsumerMessage) error, ext ...*primitive.MessageExt) (cr consumer.ConsumeResult, fnErr error) {
	c.activeCnt.Add(1)
	if c.shutdownFlag.Load() {
		// A parked callback must not count as active, otherwise Stop would wait on it.
		c.activeCnt.Add(-1)
		cr = consumer.ConsumeRetryLater
		fnErr = fmt.Errorf("正在退出中,延期处理:%s,%s", consumerConf.TopicName, consumerConf.GroupName)
		// Block instead of returning: testing showed that returning RetryLater while consuming
		// a retry topic was sometimes committed anyway, losing the message. The intent is to
		// stop consuming here and simply wait for the process to exit.
		time.Sleep(24 * time.Hour)
		return
	}
	defer func() {
		c.activeCnt.Add(-1)
		// A handler panic (re-raised by the span defer below) becomes a retry, not a crash.
		if v := recover(); v != nil {
			cr = consumer.ConsumeRetryLater
			fnErr = errors.Errorf("处理消息panic, groupName=%s%+v", consumerConf.GroupName, v)
			c.LogErrorf("%+v", fnErr)
		}
	}()

	var tracer trace.Tracer
	if c.IsOpenTelemetry {
		tracer = otel.GetTracerProvider().Tracer("LSXD_Util")
	}

	// ext holds more than one message only when WithConsumeMessageBatchMaxSize is > 1; such a
	// batch succeeds or fails as a whole.
	cr = consumer.ConsumeSuccess
	spanName := fmt.Sprintf("%s %s %s", consumerConf.TopicName, semconv.MessagingOperationProcess.Value.AsString(), consumerConf.GroupName)
	for _, v := range ext {
		// Closure so the span's deferred End runs per message, not at function return.
		func() {
			message := &ConsumerMessage{
				MsgId:          v.MsgId,
				Topic:          v.Topic,
				Body:           v.Body,
				ReconsumeTimes: v.ReconsumeTimes,
				CompressedBody: v.CompressedBody,
				Flag:           v.Flag,
				TransactionId:  v.TransactionId,
				Batch:          v.Batch,
				Compress:       v.Compress,
				Properties:     v.GetProperties(),
			}

			// Per-message context and error (see function doc).
			msgCtx := ctx
			var msgErr error

			if tracer != nil {
				// Continue the producer's trace when the message carries a trace parent.
				if traceParent, ok := message.Properties[openTelemetryPropertyName]; ok {
					carrier := propagation.MapCarrier{openTelemetryPropertyName: traceParent}
					msgCtx = consumerPropagator.Extract(msgCtx, carrier)
				}
				var span trace.Span
				msgCtx, span = tracer.Start(msgCtx, spanName, trace.WithSpanKind(trace.SpanKindConsumer))
				span.SetAttributes(
					semconv.MessagingSystem("RocketMQ"),
					semconv.MessagingSourceName(consumerConf.TopicName),
					semconv.MessagingRocketmqClientGroup(consumerConf.GroupName),
					semconv.MessagingRocketmqMessageKeys(message.GetKeys()...),
					semconv.MessagingRocketmqMessageTag(message.GetTags()),
					semconv.MessagingRocketmqMessageDelayTimeLevel(message.GetDelayTimeLevel()),
					semconv.MessageIDKey.String(message.MsgId),
				)
				defer func() {
					// Record this message's outcome; a panic is recorded, then re-raised so the
					// outer recover still turns it into RetryLater.
					spanErr := msgErr
					panicVal := recover()
					if panicVal != nil {
						spanErr = fmt.Errorf("%s消费者处理方法panic:%s", consumerConf.GroupName, panicVal)
					}
					if spanErr != nil {
						span.RecordError(spanErr)
						span.SetStatus(codes.Error, spanErr.Error())
					}
					span.End()
					if panicVal != nil {
						panic(panicVal)
					}
				}()
			}

			msgErr = fn(msgCtx, message)
			if msgErr != nil {
				cr = consumer.ConsumeRetryLater
				fnErr = msgErr
				c.LogErrorf("%s消费者处理方法返回失败,body=%s%+v", consumerConf.GroupName, string(message.Body), msgErr)
			}
		}()
	}
	return
}
// Start starts every push consumer registered through Subscribe, in registration order.
//
// It stops at the first failure and returns that error wrapped with the consumer's ordinal,
// topic and group (the original error stays reachable via errors.Is / errors.As). Consumers
// started before the failure keep running, so callers should call Stop when Start fails.
func (c *ConsumerManager) Start(_ context.Context) error {
	for _, consumerConf := range c.consumerConfigs {
		for i, pushConsumer := range consumerConf.pushConsumers {
			if err := pushConsumer.Start(); err != nil {
				return fmt.Errorf("start consumer #%d of topic %s group %s: %w", i+1, consumerConf.TopicName, consumerConf.GroupName, err)
			}
		}
	}
	return nil
}
// Stop gracefully shuts down all consumers:
//  1. sets shutdownFlag so callbacks stop processing new messages;
//  2. suspends every push consumer;
//  3. waits for in-flight business callbacks to finish (blockWaitFinish), but no longer
//     than ctx allows;
//  4. shuts down every push consumer.
//
// If ctx is done before in-flight work finishes, the wait is abandoned (and logged) and
// shutdown proceeds anyway. The consume results of still-running handlers may then not be
// acknowledged. The background waiter exits once the active count reaches zero. A nil ctx,
// or one that is never done, waits indefinitely as before.
//
// It returns the last Shutdown error, if any; every Shutdown failure is also logged.
func (c *ConsumerManager) Stop(ctx context.Context) error {
	fmt.Println("开始停止消费者")
	c.shutdownFlag.Store(true)
	for _, consumerConf := range c.consumerConfigs {
		for _, pushConsumer := range consumerConf.pushConsumers {
			pushConsumer.Suspend() // observed to have little effect; kept as a best effort
		}
	}
	fmt.Println("已suspend所有消费者")

	// Let in-flight callbacks finish (and their results commit) before Shutdown.
	if ctx == nil {
		_ = c.blockWaitFinish()
	} else {
		finished := make(chan struct{})
		go func() {
			defer close(finished)
			_ = c.blockWaitFinish()
		}()
		select {
		case <-finished:
		case <-ctx.Done():
			c.LogErrorf("等待消费者处理完成超时,继续shutdown: %v", ctx.Err())
		}
	}

	var err error
	for _, consumerConf := range c.consumerConfigs {
		for _, pushConsumer := range consumerConf.pushConsumers {
			if closeErr := pushConsumer.Shutdown(); closeErr != nil {
				err = closeErr
				c.LogErrorf("消费者shutdown失败, groupName=%s: %v", consumerConf.GroupName, closeErr)
			}
		}
	}
	fmt.Println("已shutdown所有消费者")
	return err
}
// blockWaitFinish blocks until no callback is running business logic (activeCnt == 0).
// It polls once per second and prints the number still running. After reaching zero it
// sleeps one more second as a margin for commits still in progress. It always returns nil.
func (c *ConsumerManager) blockWaitFinish() error {
	for running := c.activeCnt.Load(); running != 0; running = c.activeCnt.Load() {
		fmt.Printf("等待消费者退出,%d 个正在运行\n", running)
		time.Sleep(time.Second)
	}
	// Extra margin so that commits of just-finished messages can complete.
	// nolint
	time.Sleep(time.Second)
	return nil
}
// LogErrorf logs a formatted error message through the configured Logger. When no logger was
// supplied, it prints the message to stdout followed by a newline.
func (c *ConsumerManager) LogErrorf(format string, args ...any) {
	if c.logger == nil {
		fmt.Printf(format+"\n", args...)
		return
	}
	c.logger.Errorf(format, args...)
}
// checkGroupName reports whether groupName follows the naming rule: non-empty and prefixed
// with "${topicName}_". The rule exists to
//  1. prevent one groupName from being shared by several topics, which would cause
//     mismatched consumption;
//  2. make the owning topic obvious when managing consumer groups.
func (c *ConsumerManager) checkGroupName(topicName string, groupName string) bool {
	requiredPrefix := topicName + "_"
	return groupName != "" && strings.HasPrefix(groupName, requiredPrefix)
}