This commit is contained in:
ziming 2025-05-26 09:23:20 +08:00
parent c267632672
commit 4a2e8f8307
1 changed files with 4 additions and 5 deletions

View File

@ -58,12 +58,12 @@ func (w *WechatNotifyConsumer) Start(ctx context.Context) error {
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKeyId, accessKeySecret, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, w.conf.WechatNotifyMQ.Tag)
w.consumeMessages(mqConsumer)
w.consumeMessages(ctx, mqConsumer)
return nil
}
func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer) {
func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer mq_http_sdk.MQConsumer) {
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
@ -77,7 +77,7 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
w.processMessage(v)
w.processMessage(ctx, v)
}
// NextConsumeTime前若不确认消息消费成功则消息会被重复消费。
@ -127,10 +127,9 @@ func (w *WechatNotifyConsumer) consumeMessages(mqConsumer mq_http_sdk.MQConsumer
}
// 业务逻辑处理
func (w *WechatNotifyConsumer) processMessage(msg mq_http_sdk.ConsumeMessageEntry) {
func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) {
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
ctx := context.Background()
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
log.Errorf("微信回调消费处理失败:%+v", err)
}