检测服务关闭~
This commit is contained in:
parent
4a2e8f8307
commit
f59bf4fa15
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"github.com/go-kratos/kratos/v2/transport"
|
"github.com/go-kratos/kratos/v2/transport"
|
||||||
"github.com/gogap/errors"
|
"github.com/gogap/errors"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
"voucher/internal/conf"
|
"voucher/internal/conf"
|
||||||
"voucher/internal/service"
|
"voucher/internal/service"
|
||||||
|
|
@ -18,6 +19,9 @@ var _ transport.Server = (*WechatNotifyConsumer)(nil)
|
||||||
type WechatNotifyConsumer struct {
|
type WechatNotifyConsumer struct {
|
||||||
conf *conf.Bootstrap
|
conf *conf.Bootstrap
|
||||||
voucherService *service.VoucherService
|
voucherService *service.VoucherService
|
||||||
|
|
||||||
|
activeCnt atomic.Int32 //当前正在处理业务的个数,用于退出时业务平滑的退出
|
||||||
|
shutdownFlag atomic.Bool // 关闭标记
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWechatNotifyConsumer(
|
func NewWechatNotifyConsumer(
|
||||||
|
|
@ -128,6 +132,25 @@ func (w *WechatNotifyConsumer) consumeMessages(ctx context.Context, mqConsumer m
|
||||||
|
|
||||||
// 业务逻辑处理
|
// 业务逻辑处理
|
||||||
func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) {
|
func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_sdk.ConsumeMessageEntry) {
|
||||||
|
|
||||||
|
// 收到消息
|
||||||
|
if w.shutdownFlag.Load() {
|
||||||
|
fmt.Println("正在退出中,延期处理")
|
||||||
|
// 卡住,不再继续消费,等待退出
|
||||||
|
time.Sleep(24 * time.Hour)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 标记活跃状态
|
||||||
|
w.activeCnt.Add(1)
|
||||||
|
defer func() {
|
||||||
|
w.activeCnt.Add(-1)
|
||||||
|
if v := recover(); v != nil {
|
||||||
|
log.Errorf("处理消息panic, ,%+v", v)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
|
log.Warnf("微信回调消费接收消息成功 messageId:%s, messageTag:%s, message: %s", msg.MessageId, msg.MessageTag, msg.MessageBody)
|
||||||
|
|
||||||
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
|
if err := w.voucherService.WechatUseNotifyConsumer(ctx, msg.MessageTag, msg.MessageBody); err != nil {
|
||||||
|
|
@ -138,5 +161,32 @@ func (w *WechatNotifyConsumer) processMessage(ctx context.Context, msg mq_http_s
|
||||||
// Stop 停止消息消费
|
// Stop 停止消息消费
|
||||||
func (w *WechatNotifyConsumer) Stop(_ context.Context) error {
|
func (w *WechatNotifyConsumer) Stop(_ context.Context) error {
|
||||||
fmt.Println("关闭 wechat consumer 中...")
|
fmt.Println("关闭 wechat consumer 中...")
|
||||||
|
|
||||||
|
w.shutdownFlag.Store(true)
|
||||||
|
|
||||||
|
//shutdown之间,保证正在处理的消费先提交
|
||||||
|
_ = w.blockWaitFinish()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// blockWaitFinish 阻塞等待业务完成
|
||||||
|
func (c *WechatNotifyConsumer) blockWaitFinish() error {
|
||||||
|
// 每1s检查下业务是否都处理完成
|
||||||
|
|
||||||
|
for {
|
||||||
|
cnt := c.activeCnt.Load()
|
||||||
|
if cnt == 0 {
|
||||||
|
//无业务处理,正常退
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
fmt.Printf("等待消费者退出,%d 个正在运行\n", cnt)
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
//防止极端情况下commit未完成
|
||||||
|
// nolint
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue