voucher/internal/server/wechat_notify_consume.go

156 lines
6.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package server
import (
"context"
"fmt"
mq_http_sdk "github.com/aliyunmq/mq-http-go-sdk"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/transport"
"github.com/gogap/errors"
"strings"
"time"
"voucher/internal/conf"
)
var _ transport.Server = (*WechatNotifyConsumer)(nil)
type WechatNotifyConsumer struct {
conf *conf.Bootstrap
}
func NewWechatNotifyConsumer(conf *conf.Bootstrap) *WechatNotifyConsumer {
return &WechatNotifyConsumer{conf: conf}
}
func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
if !w.conf.WechatNotifyMQ.IsOpenConsumer {
log.Warnf("wechat notify MQ is not open")
return nil
}
//// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
//endpoint := w.conf.WechatNotifyMQ.EndPoint
//// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//// AccessKey ID阿里云身份验证标识。
//accessKey := w.conf.WechatNotifyMQ.AccessKeyId
//// AccessKey Secret阿里云身份验证密钥。
//secretKey := w.conf.WechatNotifyMQ.AccessKeySecret
//// 消息所属的Topic在消息队列RocketMQ版控制台创建。
////不同消息类型的Topic不能混用例如普通消息的Topic只能用于收发普通消息不能用于收发其他类型的消息。
//topic := w.conf.WechatNotifyMQ.Topic
//// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
//// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
//instanceId := w.conf.WechatNotifyMQ.InstanceId
//// 您在控制台创建的Group ID。
//groupId := w.conf.WechatNotifyMQ.GroupId
//
//tag := w.conf.WechatNotifyMQ.Tag
/**
ACCESS_KEY_ID=LTAI5tPyV7FynQNTfEvbEBuX
ACCESS_KEY_SECRET=tZmTh8cV98xAQgtlRU0soWcb6Tpd4T
END_POINT=http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com
REGION_ID=cn-hangzhou
INSTANCE_ID= MQ_INST_1389288909295870_BYSoMttI
TOPIC=notify
TAG=coupon_usage_notify_market
REGISTER_TAG_URL= https://wpcallbacks.api.1688sup.com/wechatPay/register_tag
GROUP_ID=market_pro
DEBUG=false
*/
// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
endpoint := "http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com"
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID阿里云身份验证标识。
accessKeyId := "LTAI5tPyV7FynQNTfEvbEBuX"
// AccessKey Secret阿里云身份验证密钥。
accessKeySecret := "tZmTh8cV98xAQgtlRU0soWcb6Tpd4T"
// 消息所属的Topic在消息队列RocketMQ版控制台创建。
topic := "notify"
// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instanceId := "MQ_INST_1389288909295870_BYSoMttI"
//groupId := "market_pro"
groupId := "GID_market_pro"
tag := "voucher_notify_dev"
//tag := "voucher_notify_pro"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, tag)
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// 处理业务逻辑。
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
log.Warnf("接收消息成功 wechat notify messageTag:%s, message: %s", v.MessageTag, v.MessageBody)
fmt.Printf("接收消息成功 wechat notify messageTag:%s, message: %s", v.MessageTag, v.MessageBody)
}
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。
// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
log.Errorf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
log.Errorf("ackerr:%+v\n", ackerr)
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// Topic中没有消息可消费。
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// 长轮询消费消息网络超时时间默认为35s。
// 长轮询表示如果Topic没有消息则客户端请求会在服务端挂起3s3s内如果有消息可以消费则立即返回响应。
mqConsumer.ConsumeMessage(respChan, errChan,
3, // 一次最多消费3条最多可设置为16条
10, // 长轮询时间3s最多可设置为30s
)
<-endChan
}
}
func (w *WechatNotifyConsumer) Stop(ctx context.Context) error {
fmt.Println("关闭 wechat consumer 中...")
return nil
}