diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index 0f1669a..fc8ce97 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -3,9 +3,10 @@ package server import ( "context" "fmt" - mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk" + mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/transport" + "github.com/gogap/errors" "strings" "time" "voucher/internal/conf" @@ -54,7 +55,7 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { // 您在控制台创建的Group ID。 groupId := w.conf.WechatNotifyMQ.GroupId - client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") + client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag) w.consumeMessages(mqConsumer) @@ -62,61 +63,77 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { return nil } -func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mqhttpsdk.MQConsumer) { +func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer) { for { - respChan := make(chan mqhttpsdk.ConsumeMessageResponse) + endChan := make(chan int) + respChan := make(chan mq_http_sdk.ConsumeMessageResponse) errChan := make(chan error) - go func() { select { case resp := <-respChan: - var handles []string + { + var handles []string - for _, v := range resp.Messages { - handles = append(handles, v.ReceiptHandle) + for _, v := range resp.Messages { + handles = append(handles, v.ReceiptHandle) - // 模拟业务逻辑处理 - if err := w.processMessage(v); err != nil { - log.Errorf("Failed to process message %s: %v", v.MessageId, err) - continue + w.processMessage(v) } - // 确认消息消费成功 - if err := mqConsumer.AckMessage([]string{v.ReceiptHandle}); err != nil { - log.Errorf("Ack message %s failed: %v", v.MessageId, err) + // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 + // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。 + if err := mqConsumer.AckMessage(handles); err != nil { + // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。 + log.Errorf("AckMessage Failed, err:%s\n", err) + + if errAckItems, ok := err.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok { + for _, errAckItem := range errAckItems { + log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg) + } + } + + time.Sleep(time.Duration(3) * time.Second) } + + endChan <- 1 } - case err := <-errChan: - if strings.Contains(err.Error(), "MessageNotExist") { - fmt.Println("No new messages available.") - } else { - log.Errorf("Error occurred: %v", err) + { + // Topic中没有消息可消费。 + if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") { + fmt.Println("\nNo new message, continue!") + } else { + log.Errorf("ConsumeMessage Failed, err:%s\n", err) + time.Sleep(time.Duration(3) * time.Second) + } + endChan <- 1 } - case <-time.After(35 * time.Second): - log.Errorf("Timeout of consumer message.") + { + fmt.Println("Timeout of consumer message ??") + endChan <- 1 + } } }() - // 长轮询消费消息,每次最多拉取 3 条消息,超时时间为 30 秒 - mqConsumer.ConsumeMessage(respChan, errChan, 3, 30) - - // 避免频繁轮询,增加适当的间隔 - time.Sleep(2 * time.Second) + // 长轮询消费消息,网络超时时间默认为35s。 + // 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回响应。 + mqConsumer.ConsumeMessage(respChan, errChan, + 5, // 一次最多消费3条(最多可设置为16条)。 + 15, // 长轮询时间3s(最多可设置为30s)。 + ) + <-endChan } } -// 模拟业务逻辑处理 -func (w *WechatNotifyConsumer) processMessage(msg mqhttpsdk.ConsumeMessageEntry) error { +// 业务逻辑处理 +func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) { log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) - //ctx := context.Background() - //if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { - // log.Errorf("微信回调消费处理失败:%+v", err) - //} - - return nil + ctx := context.Background() + if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { + log.Errorf("微信回调消费处理失败:%+v", err) + } } // Stop 停止消息消费