From 3e876ed49af752918fd805ed6712f2fbfeb74f72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Mon, 10 Mar 2025 13:43:11 +0800 Subject: [PATCH] cmb --- internal/server/wechat_notify_consumer.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index 7337b34..b261b18 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -3,7 +3,7 @@ package server import ( "context" "fmt" - mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk" + mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport" "github.com/gogap/errors" @@ -31,7 +31,9 @@ func NewWechatNotifyConsumer( } // Start 启动消息消费 +// https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-normal-messages-3?spm=a2c4g.11186623.0.0.52c216e8nzMenk func (w *WechatNotifyConsumer) Start(ctx context.Context) error { + if !w.conf.WechatNotifyMQ.IsOpenConsumer { return nil } @@ -52,7 +54,7 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { // 您在控制台创建的Group ID。 groupId := w.conf.WechatNotifyMQ.GroupId - client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") + client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") // 为每个 tag 启动一个消费协程 for _, tag := range w.conf.WechatNotifyMQ.Tags { @@ -64,10 +66,10 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { } // consumeMessages 消费消息的具体逻辑 -func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mq_http_sdk.MQConsumer, tag string) { +func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mqhttpsdk.MQConsumer, tag string) { for { endChan := make(chan int) - respChan := make(chan mq_http_sdk.ConsumeMessageResponse) + respChan := make(chan mqhttpsdk.ConsumeMessageResponse) errChan := make(chan error) go func() { select { @@ -89,7 +91,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m ackerr := mqConsumer.AckMessage(handles) if ackerr != nil { // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 - if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok { + if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok { for _, errAckItem := range errAckItems { log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)