diff --git a/internal/server/wechat_notify_consumer.go b/internal/server/wechat_notify_consumer.go index a795ca9..a061c36 100644 --- a/internal/server/wechat_notify_consumer.go +++ b/internal/server/wechat_notify_consumer.go @@ -8,6 +8,7 @@ import ( "github.com/go-kratos/kratos/v2/transport" "github.com/gogap/errors" "strings" + "sync/atomic" "time" "voucher/internal/conf" "voucher/internal/service" @@ -18,6 +19,9 @@ var _ transport.Server = (*WechatNotifyConsumer)(nil) type WechatNotifyConsumer struct { conf *conf.Bootstrap voucherService *service.VoucherService + + activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出 + shutdownFlag atomic.Bool // 关闭标记 } func NewWechatNotifyConsumer( @@ -128,6 +132,25 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m // 业务逻辑处理 func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) { + + // 收到消息 + if w.shutdownFlag.Load() { + fmt.Println("正在退出中,延期处理") + // 卡住,不再继续消费,等待退出 + time.Sleep(24 * time.Hour) + return + } + + // 标记活跃状态 + w.activeCnt.Add(1) + defer func() { + w.activeCnt.Add(-1) + if v := recover(); v != nil { + log.Errorf("处理消息panic, ,%+v", v) + return + } + }() + log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody) if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil { @@ -138,5 +161,32 @@ func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_s // Stop 停止消息消费 func (w *WechatNotifyConsumer) Stop(_ context.Context) error { fmt.Println("关闭 wechat consumer 中...") + + w.shutdownFlag.Store(true) + + //shutdown之间,保证正在处理的消费先提交 + _ = w.blockWaitFinish() + + return nil +} + +// blockWaitFinish 阻塞等待业务完成 +func (c *WechatNotifyConsumer) blockWaitFinish() error { + // 每1s检查下业务是否都处理完成 + + for { + cnt := c.activeCnt.Load() + if cnt == 0 { + //无业务处理,正常退 + break + } else { + fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt) + } + time.Sleep(1 * time.Second) + } + + //防止极端情况下commit未完成 + // nolint + time.Sleep(1 * time.Second) return nil }