From 83a23f74310c27cf3e2f819031127f939eb423a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AD=90=E9=93=AD?= Date: Fri, 7 Mar 2025 16:38:55 +0800 Subject: [PATCH] cmb --- internal/server/wechat_notify_consume.go | 70 ++++++++--------------- internal/service/wechat_notify_consume.go | 10 ++++ 2 files changed, 35 insertions(+), 45 deletions(-) create mode 100644 internal/service/wechat_notify_consume.go diff --git a/internal/server/wechat_notify_consume.go b/internal/server/wechat_notify_consume.go index 61d7b37..a9d6dcb 100644 --- a/internal/server/wechat_notify_consume.go +++ b/internal/server/wechat_notify_consume.go @@ -10,16 +10,24 @@ import ( "strings" "time" "voucher/internal/conf" + "voucher/internal/service" ) var _ transport.Server = (*WechatNotifyConsumer)(nil) type WechatNotifyConsumer struct { - conf *conf.Bootstrap + conf *conf.Bootstrap + voucherService *service.VoucherService } -func NewWechatNotifyConsumer(conf *conf.Bootstrap) *WechatNotifyConsumer { - return &WechatNotifyConsumer{conf: conf} +func NewWechatNotifyConsumer( + conf *conf.Bootstrap, + voucherService *service.VoucherService, +) *WechatNotifyConsumer { + return &WechatNotifyConsumer{ + conf: conf, + voucherService: voucherService, + } } func (w *WechatNotifyConsumer) Start(ctx context.Context) error { @@ -28,55 +36,23 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { return nil } - //// 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 - //endpoint := w.conf.WechatNotifyMQ.EndPoint - //// 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 - //// AccessKey ID,阿里云身份验证标识。 - //accessKey := w.conf.WechatNotifyMQ.AccessKeyId - //// AccessKey Secret,阿里云身份验证密钥。 - //secretKey := w.conf.WechatNotifyMQ.AccessKeySecret - //// 消息所属的Topic,在消息队列RocketMQ版控制台创建。 - ////不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。 - //topic := w.conf.WechatNotifyMQ.Topic - //// Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 - //// 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 - //instanceId := w.conf.WechatNotifyMQ.InstanceId - //// 您在控制台创建的Group ID。 - //groupId := w.conf.WechatNotifyMQ.GroupId - // - //tag := w.conf.WechatNotifyMQ.Tag - - /** - ACCESS_KEY_ID=LTAI5tPyV7FynQNTfEvbEBuX - ACCESS_KEY_SECRET=tZmTh8cV98xAQgtlRU0soWcb6Tpd4T - END_POINT=http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com - REGION_ID=cn-hangzhou - INSTANCE_ID= MQ_INST_1389288909295870_BYSoMttI - TOPIC=notify - TAG=coupon_usage_notify_market - REGISTER_TAG_URL= https://wpcallbacks.api.1688sup.com/wechatPay/register_tag - GROUP_ID=market_pro - DEBUG=false - */ - // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。 - endpoint := "http://1389288909295870.mqrest.cn-hangzhou.aliyuncs.com" + endpoint := w.conf.WechatNotifyMQ.EndPoint // 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。 // AccessKey ID,阿里云身份验证标识。 - accessKeyId := "LTAI5tPyV7FynQNTfEvbEBuX" + accessKeyId := w.conf.WechatNotifyMQ.AccessKeyId // AccessKey Secret,阿里云身份验证密钥。 - accessKeySecret := "tZmTh8cV98xAQgtlRU0soWcb6Tpd4T" + accessKeySecret := w.conf.WechatNotifyMQ.AccessKeySecret // 消息所属的Topic,在消息队列RocketMQ版控制台创建。 - topic := "notify" + //不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。 + topic := w.conf.WechatNotifyMQ.Topic // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。 // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。 - instanceId := "MQ_INST_1389288909295870_BYSoMttI" + instanceId := w.conf.WechatNotifyMQ.InstanceId + // 您在控制台创建的Group ID。 + groupId := w.conf.WechatNotifyMQ.GroupId - //groupId := "market_pro" - groupId := "GID_market_pro" - - tag := "voucher_notify_dev" - //tag := "voucher_notify_pro" + tag := w.conf.WechatNotifyMQ.Tag client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") @@ -94,9 +70,13 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { var handles []string fmt.Printf("Consume %d messages---->\n", len(resp.Messages)) for _, v := range resp.Messages { + handles = append(handles, v.ReceiptHandle) log.Warnf("接收消息成功 wechat notify messageTag:%s, message: %s", v.MessageTag, v.MessageBody) - fmt.Printf("接收消息成功 wechat notify messageTag:%s, message: %s", v.MessageTag, v.MessageBody) + + if err := w.voucherService.WechatNotifyConsumer(ctx, v.MessageTag, v.MessageBody); err != nil { + log.Errorf("wechat notify messageTag:%s, message: %s, err:%+v", v.MessageTag, v.MessageBody, err) + } } // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 diff --git a/internal/service/wechat_notify_consume.go b/internal/service/wechat_notify_consume.go new file mode 100644 index 0000000..9b01db6 --- /dev/null +++ b/internal/service/wechat_notify_consume.go @@ -0,0 +1,10 @@ +package service + +import ( + "context" +) + +func (j *VoucherService) WechatNotifyConsumer(ctx context.Context, tag, msg string) error { + + return nil +}