voucher/internal/server/wechat_notify_consumer.go

142 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"context"
"fmt"
mqhttpsdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"github.com/gogap/errors"
"strings"
"time"
"voucher/internal/conf"
"voucher/internal/service"
)
var _ transport.Server = (*WechatNotifyConsumer)(nil)
type WechatNotifyConsumer struct {
conf *conf.Bootstrap
voucherService *service.VoucherService
}
func NewWechatNotifyConsumer(
conf *conf.Bootstrap,
voucherService *service.VoucherService,
) *WechatNotifyConsumer {
return &WechatNotifyConsumer{
conf: conf,
voucherService: voucherService,
}
}
// Start 启动消息消费
// https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-normal-messages-3?spm=a2c4g.11186623.0.0.52c216e8nzMenk
func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
if !w.conf.WechatNotifyMQ.IsOpenConsumer {
return nil
}
// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
endpoint := w.conf.WechatNotifyMQ.EndPoint
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID阿里云身份验证标识。
accessKeyId := w.conf.WechatNotifyMQ.AccessKeyId
// AccessKey Secret阿里云身份验证密钥。
accessKeySecret := w.conf.WechatNotifyMQ.AccessKeySecret
// 消息所属的Topic在消息队列RocketMQ版控制台创建。
//不同消息类型的Topic不能混用例如普通消息的Topic只能用于收发普通消息不能用于收发其他类型的消息。
topic := w.conf.WechatNotifyMQ.Topic
// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instanceId := w.conf.WechatNotifyMQ.InstanceId
// 您在控制台创建的Group ID。
groupId := w.conf.WechatNotifyMQ.GroupId
client := mqhttpsdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
// 为每个 tag 启动一个消费协程
for _, tag := range w.conf.WechatNotifyMQ.Tags {
mqConsumer := client.GetConsumer(instanceId, topic, groupId, tag)
go w.consumeMessages(ctx, mqConsumer, tag)
}
return nil
}
// consumeMessages 消费消息的具体逻辑
func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mqhttpsdk.MQConsumer, tag string) {
for {
endChan := make(chan int)
respChan := make(chan mqhttpsdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// 处理业务逻辑。
var handles []string
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
log.Warnf("微信回调消费接收消息成功 messageTag:%s, message: %s", v.MessageTag, v.MessageBody)
if err := w.voucherService.WechatNotifyConsumer(ctx, v.MessageTag, v.MessageBody); err != nil {
log.Errorf("微信回调消费处理失败err:%+v", err)
}
}
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mqhttpsdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
log.Errorf("ack err:%+v\n", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// Topic中没有消息可消费。
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
log.Errorf("\tTopic中没有消息可消费判定报错:%v\n", err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// 长轮询消费消息网络超时时间默认为35s。
// 长轮询表示如果Topic没有消息则客户端请求会在服务端挂起3s3s内如果有消息可以消费则立即返回响应。
mqConsumer.ConsumeMessage(respChan, errChan,
3, // 一次最多消费3条最多可设置为16条
5, // 长轮询时间3s最多可设置为30s
)
<-endChan
}
}
// Stop 停止消息消费
func (w *WechatNotifyConsumer) Stop(ctx context.Context) error {
fmt.Println("关闭 wechat consumer 中...")
return nil
}