voucher/internal/server/wechat_notify_consumer.go

127 lines
4.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"context"
"fmt"
mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"strings"
"time"
"voucher/internal/conf"
"voucher/internal/service"
)
var _ transport.Server = (*WechatNotifyConsumer)(nil)
type WechatNotifyConsumer struct {
conf *conf.Bootstrap
voucherService *service.VoucherService
}
func NewWechatNotifyConsumer(
conf *conf.Bootstrap,
voucherService *service.VoucherService,
) *WechatNotifyConsumer {
return &WechatNotifyConsumer{
conf: conf,
voucherService: voucherService,
}
}
// Start 启动消息消费
// https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-normal-messages-3?spm=a2c4g.11186623.0.0.52c216e8nzMenk
func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
if !w.conf.WechatNotifyMQ.IsOpenConsumer {
log.Warnf("wechat notify consumer is not open")
return nil
}
// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
endpoint := w.conf.WechatNotifyMQ.EndPoint
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID阿里云身份验证标识。
accessKeyId := w.conf.WechatNotifyMQ.AccessKeyId
// AccessKey Secret阿里云身份验证密钥。
accessKeySecret := w.conf.WechatNotifyMQ.AccessKeySecret
// 消息所属的Topic在消息队列RocketMQ版控制台创建。
//不同消息类型的Topic不能混用例如普通消息的Topic只能用于收发普通消息不能用于收发其他类型的消息。
topic := w.conf.WechatNotifyMQ.Topic
// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instanceId := w.conf.WechatNotifyMQ.InstanceId
// 您在控制台创建的Group ID。
groupId := w.conf.WechatNotifyMQ.GroupId
client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag)
w.consumeMessages(mqConsumer)
return nil
}
func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mqhttpsdk.MQConsumer) {
for {
respChan := make(chan mqhttpsdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
var handles []string
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
// 模拟业务逻辑处理
if err := w.processMessage(v); err != nil {
log.Errorf("Failed to process message %s: %v", v.MessageId, err)
continue
}
// 确认消息消费成功
if err := mqConsumer.AckMessage([]string{v.ReceiptHandle}); err != nil {
log.Errorf("Ack message %s failed: %v", v.MessageId, err)
}
}
case err := <-errChan:
if strings.Contains(err.Error(), "MessageNotExist") {
fmt.Println("No new messages available.")
} else {
log.Errorf("Error occurred: %v", err)
}
case <-time.After(35 * time.Second):
log.Errorf("Timeout of consumer message.")
}
}()
// 长轮询消费消息,每次最多拉取 3 条消息,超时时间为 30 秒
mqConsumer.ConsumeMessage(respChan, errChan, 3, 30)
// 避免频繁轮询,增加适当的间隔
time.Sleep(2 * time.Second)
}
}
// 模拟业务逻辑处理
func (w *WechatNotifyConsumer) processMessage(msg mqhttpsdk.ConsumeMessageEntry) error {
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
//ctx := context.Background()
//if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
// log.Errorf("微信回调消费处理失败:%+v", err)
//}
return nil
}
// Stop 停止消息消费
func (w *WechatNotifyConsumer) Stop(_ context.Context) error {
fmt.Println("关闭 wechat consumer 中...")
return nil
}