This commit is contained in:
李子铭 2025-03-10 13:43:11 +08:00
parent f6e758777f
commit 3e876ed49a
1 changed files with 7 additions and 5 deletions

View File

@ -3,7 +3,7 @@ package server
import (
"context"
"fmt"
mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk"
mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"github.com/gogap/errors"
@ -31,7 +31,9 @@ func NewWechatNotifyConsumer(
}
// Start 启动消息消费
// https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-normal-messages-3?spm=a2c4g.11186623.0.0.52c216e8nzMenk
func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
if !w.conf.WechatNotifyMQ.IsOpenConsumer {
return nil
}
@ -52,7 +54,7 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
// 您在控制台创建的Group ID。
groupId := w.conf.WechatNotifyMQ.GroupId
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
// 为每个 tag 启动一个消费协程
for _, tag := range w.conf.WechatNotifyMQ.Tags {
@ -64,10 +66,10 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
}
// consumeMessages 消费消息的具体逻辑
func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mq_http_sdk.MQConsumer, tag string) {
func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mqhttpsdk.MQConsumer, tag string) {
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
respChan := make(chan mqhttpsdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
@ -89,7 +91,7 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)