This commit is contained in:
李子铭 2025-03-19 15:46:27 +08:00
parent d856e002e3
commit 81cc61348a
1 changed files with 52 additions and 35 deletions

View File

@ -3,9 +3,10 @@ package server
import (
"context"
"fmt"
mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk"
mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"github.com/gogap/errors"
"strings"
"time"
"voucher/internal/conf"
@ -54,7 +55,7 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
// 您在控制台创建的Group ID。
groupId := w.conf.WechatNotifyMQ.GroupId
client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag)
w.consumeMessages(mqConsumer)
@ -62,61 +63,77 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
return nil
}
func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mqhttpsdk.MQConsumer) {
func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer) {
for {
respChan := make(chan mqhttpsdk.ConsumeMessageResponse)
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
var handles []string
{
var handles []string
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
// 模拟业务逻辑处理
if err := w.processMessage(v); err != nil {
log.Errorf("Failed to process message %s: %v", v.MessageId, err)
continue
w.processMessage(v)
}
// 确认消息消费成功
if err := mqConsumer.AckMessage([]string{v.ReceiptHandle}); err != nil {
log.Errorf("Ack message %s failed: %v", v.MessageId, err)
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
if err := mqConsumer.AckMessage(handles); err != nil {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
log.Errorf("AckMessage Failed, err:%s\n", err)
if errAckItems, ok := err.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
}
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case err := <-errChan:
if strings.Contains(err.Error(), "MessageNotExist") {
fmt.Println("No new messages available.")
} else {
log.Errorf("Error occurred: %v", err)
{
// Topic中没有消息可消费。
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
log.Errorf("ConsumeMessage Failed, err:%s\n", err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
log.Errorf("Timeout of consumer message.")
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// 长轮询消费消息,每次最多拉取 3 条消息,超时时间为 30 秒
mqConsumer.ConsumeMessage(respChan, errChan, 3, 30)
// 避免频繁轮询,增加适当的间隔
time.Sleep(2 * time.Second)
// 长轮询消费消息网络超时时间默认为35s。
// 长轮询表示如果Topic没有消息则客户端请求会在服务端挂起3s3s内如果有消息可以消费则立即返回响应。
mqConsumer.ConsumeMessage(respChan, errChan,
5, // 一次最多消费3条最多可设置为16条
15, // 长轮询时间3s最多可设置为30s
)
<-endChan
}
}
// 模拟业务逻辑处理
func (w *WechatNotifyConsumer) processMessage(msg mqhttpsdk.ConsumeMessageEntry) error {
// 业务逻辑处理
func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) {
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
//ctx := context.Background()
//if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
// log.Errorf("微信回调消费处理失败:%+v", err)
//}
return nil
ctx := context.Background()
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
log.Errorf("微信回调消费处理失败:%+v", err)
}
}
// Stop 停止消息消费