From 4a2e8f8307e8c80d6bc4547350a271be8826ce09 Mon Sep 17 00:00:00 2001 From: ziming Date: Mon, 26 May 2025 09:23:20 +0800 Subject: [PATCH] add api --- internal/server/wechat_notify_consumer.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index d80ad2c..a795ca9 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -58,12 +58,12 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error { client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "") mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag) - w.consumeMessages(mqConsumer) + w.consumeMessages(ctx, mqConsumer) return nil } -func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer) { +func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mq_http_sdk.MQConsumer) { for { endChan := make(chan int) respChan := make(chan mq_http_sdk.ConsumeMessageResponse) @@ -77,7 +77,7 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer for _, v := range resp.Messages { handles = append(handles, v.ReceiptHandle) - w.processMessage(v) + w.processMessage(ctx, v) } // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。 @@ -127,10 +127,9 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer } // 业务逻辑处理 -func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) { +func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) { log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) - ctx := context.Background() if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { log.Errorf("微信回调消费处理失败:%+v", err) }