voucher/internal/server/wechat_notify_consumer.go

195 lines
6.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"context"
"fmt"
mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"github.com/gogap/errors"
"strings"
"sync/atomic"
"time"
"voucher/internal/conf"
"voucher/internal/service"
)
var _ transport.Server = (*WechatNotifyConsumer)(nil)
type WechatNotifyConsumer struct {
conf *conf.Bootstrap
voucherService *service.VoucherService
activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出
shutdownFlag atomic.Bool // 关闭标记
}
func NewWechatNotifyConsumer(
conf *conf.Bootstrap,
voucherService *service.VoucherService,
) *WechatNotifyConsumer {
return &WechatNotifyConsumer{
conf: conf,
voucherService: voucherService,
}
}
// Start 启动消息消费
// https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-subscribe-to-normal-messages-3?spm=a2c4g.11186623.0.0.52c216e8nzMenk
func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
if !w.conf.WechatNotifyMQ.IsOpenConsumer {
log.Warnf("wechat notify consumer is not open")
return nil
}
// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
endpoint := w.conf.WechatNotifyMQ.EndPoint
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID阿里云身份验证标识。
accessKeyId := w.conf.WechatNotifyMQ.AccessKeyId
// AccessKey Secret阿里云身份验证密钥。
accessKeySecret := w.conf.WechatNotifyMQ.AccessKeySecret
// 消息所属的Topic在消息队列RocketMQ版控制台创建。
//不同消息类型的Topic不能混用例如普通消息的Topic只能用于收发普通消息不能用于收发其他类型的消息。
topic := w.conf.WechatNotifyMQ.Topic
// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instanceId := w.conf.WechatNotifyMQ.InstanceId
// 您在控制台创建的Group ID。
groupId := w.conf.WechatNotifyMQ.GroupId
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag)
w.consumeMessages(ctx, mqConsumer)
return nil
}
func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mq_http_sdk.MQConsumer) {
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
var handles []string
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
w.processMessage(ctx, v)
}
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
if err := mqConsumer.AckMessage(handles); err != nil {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
log.Errorf("AckMessage Failed, err:%s\n", err)
if errAckItems, ok := err.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
}
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case err := <-errChan:
{
// Topic中没有消息可消费。
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
//fmt.Println("\nNo new message, continue!")
} else {
log.Errorf("ConsumeMessage Failed, err:%s\n", err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// 长轮询消费消息网络超时时间默认为35s。
// 长轮询表示如果Topic没有消息则客户端请求会在服务端挂起3s3s内如果有消息可以消费则立即返回响应。
mqConsumer.ConsumeMessage(respChan, errChan,
3, // 一次最多消费3条最多可设置为16条
10, // 长轮询时间3s最多可设置为30s
)
<-endChan
}
}
// 业务逻辑处理
func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) {
// 收到消息
if w.shutdownFlag.Load() {
fmt.Println("正在退出中,延期处理")
// 卡住,不再继续消费,等待退出
time.Sleep(24 * time.Hour)
return
}
// 标记活跃状态
w.activeCnt.Add(1)
defer func() {
w.activeCnt.Add(-1)
if v := recover(); v != nil {
log.Errorf("处理消息panic, %+v", v)
return
}
}()
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
log.Errorf("微信回调消费处理失败:%+v", err)
}
}
// Stop 停止消息消费
func (w *WechatNotifyConsumer) Stop(_ context.Context) error {
fmt.Println("关闭 wechat consumer 中...")
w.shutdownFlag.Store(true)
//shutdown之间保证正在处理的消费先提交
_ = w.blockWaitFinish()
fmt.Println("关闭 wechat consumer 完成")
return nil
}
// blockWaitFinish 阻塞等待业务完成
func (c *WechatNotifyConsumer) blockWaitFinish() error {
// 每1s检查下业务是否都处理完成
for {
cnt := c.activeCnt.Load()
if cnt == 0 {
//无业务处理,正常退
break
} else {
fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt)
}
time.Sleep(1 * time.Second)
}
//防止极端情况下commit未完成
// nolint
time.Sleep(1 * time.Second)
return nil
}