This commit is contained in:
李子铭 2025-03-07 16:38:55 +08:00
parent 1ee1fd6e5b
commit 83a23f7431
2 changed files with 35 additions and 45 deletions

View File

@ -10,16 +10,24 @@ import (
"strings"
"time"
"voucher/internal/conf"
"voucher/internal/service"
)
var _ transport.Server = (*WechatNotifyConsumer)(nil)
type WechatNotifyConsumer struct {
conf *conf.Bootstrap
conf *conf.Bootstrap
voucherService *service.VoucherService
}
func NewWechatNotifyConsumer(conf *conf.Bootstrap) *WechatNotifyConsumer {
return &WechatNotifyConsumer{conf: conf}
func NewWechatNotifyConsumer(
conf *conf.Bootstrap,
voucherService *service.VoucherService,
) *WechatNotifyConsumer {
return &WechatNotifyConsumer{
conf: conf,
voucherService: voucherService,
}
}
func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
@ -28,55 +36,23 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
return nil
}
//// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
//endpoint := w.conf.WechatNotifyMQ.EndPoint
//// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
//// AccessKey ID阿里云身份验证标识。
//accessKey := w.conf.WechatNotifyMQ.AccessKeyId
//// AccessKey Secret阿里云身份验证密钥。
//secretKey := w.conf.WechatNotifyMQ.AccessKeySecret
//// 消息所属的Topic在消息队列RocketMQ版控制台创建。
////不同消息类型的Topic不能混用例如普通消息的Topic只能用于收发普通消息不能用于收发其他类型的消息。
//topic := w.conf.WechatNotifyMQ.Topic
//// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
//// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
//instanceId := w.conf.WechatNotifyMQ.InstanceId
//// 您在控制台创建的Group ID。
//groupId := w.conf.WechatNotifyMQ.GroupId
//
//tag := w.conf.WechatNotifyMQ.Tag
/**
ACCESS_KEY_ID=LTAI5tPyV7FynQNTfEvbEBuX
ACCESS_KEY_SECRET=tZmTh8cV98xAQgtlRU0soWcb6Tpd4T
END_POINT=http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com
REGION_ID=cn-hangzhou
INSTANCE_ID= MQ_INST_1389288909295870_BYSoMttI
TOPIC=notify
TAG=coupon_usage_notify_market
REGISTER_TAG_URL= https://wpcallbacks.api.1688sup.com/wechatPay/register_tag
GROUP_ID=market_pro
DEBUG=false
*/
// 设置HTTP协议客户端接入点进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
endpoint := "http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com"
endpoint := w.conf.WechatNotifyMQ.EndPoint
// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
// AccessKey ID阿里云身份验证标识。
accessKeyId := "LTAI5tPyV7FynQNTfEvbEBuX"
accessKeyId := w.conf.WechatNotifyMQ.AccessKeyId
// AccessKey Secret阿里云身份验证密钥。
accessKeySecret := "tZmTh8cV98xAQgtlRU0soWcb6Tpd4T"
accessKeySecret := w.conf.WechatNotifyMQ.AccessKeySecret
// 消息所属的Topic在消息队列RocketMQ版控制台创建。
topic := "notify"
//不同消息类型的Topic不能混用例如普通消息的Topic只能用于收发普通消息不能用于收发其他类型的消息。
topic := w.conf.WechatNotifyMQ.Topic
// Topic所属的实例ID在消息队列RocketMQ版控制台创建。
// 若实例有命名空间则实例ID必须传入若实例无命名空间则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
instanceId := "MQ_INST_1389288909295870_BYSoMttI"
instanceId := w.conf.WechatNotifyMQ.InstanceId
// 您在控制台创建的Group ID。
groupId := w.conf.WechatNotifyMQ.GroupId
//groupId := "market_pro"
groupId := "GID_market_pro"
tag := "voucher_notify_dev"
//tag := "voucher_notify_pro"
tag := w.conf.WechatNotifyMQ.Tag
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
@ -94,9 +70,13 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
log.Warnf("接收消息成功 wechat notify messageTag:%s, message: %s", v.MessageTag, v.MessageBody)
fmt.Printf("接收消息成功 wechat notify messageTag:%s, message: %s", v.MessageTag, v.MessageBody)
if err := w.voucherService.WechatNotifyConsumer(ctx, v.MessageTag, v.MessageBody); err != nil {
log.Errorf("wechat notify messageTag:%s, message: %s, err:%+v", v.MessageTag, v.MessageBody, err)
}
}
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。

View File

@ -0,0 +1,10 @@
package service
import (
"context"
)
func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error {
return nil
}