From d1cc841e6013c3f6513a5bb01dfe3219b9c37d17 Mon Sep 17 00:00:00 2001 From: qwangseu <1195974065@qq.com> Date: Fri, 18 Apr 2025 12:11:46 +0800 Subject: [PATCH] Feature/keep alive (#25) * feat(#keepAlive) : keepAlive * feat(#keepAlive) : keepAlive --------- Co-authored-by: mike.wq --- client/client.go | 88 +++++++++++++++++++++++++++++++++++++----------- client/config.go | 2 +- client/option.go | 13 ++++++- 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/client/client.go b/client/client.go index f46503c..c544c48 100644 --- a/client/client.go +++ b/client/client.go @@ -35,16 +35,19 @@ type StreamClient struct { subscriptions map[string]map[string]handler.IFrameHandler - conn *websocket.Conn - sessionId string - mutex sync.Mutex - extras map[string]string - openApiHost string - proxy string + conn *websocket.Conn + sessionId string + mutex sync.Mutex + extras map[string]string + openApiHost string + proxy string + keepAliveIdle time.Duration } func NewStreamClient(options ...ClientOption) *StreamClient { - cli := &StreamClient{} + cli := &StreamClient{ + keepAliveIdle: 120 * time.Second, + } defaultOptions := []ClientOption{ WithSubscription(utils.SubscriptionTypeKSystem, "disconnect", cli.OnDisconnect), @@ -133,21 +136,68 @@ func (cli *StreamClient) processLoop() { } }() + if cli.conn == nil { + logger.GetLogger().Errorf("connection process connect nil, maybe disconnected.") + return + } + + readChan := make(chan []byte) + pongChan := make(chan struct{}) + closeChan := make(chan struct{}) + defer func() { close(closeChan) }() + defer func() { close(pongChan) }() + defer func() { close(readChan) }() + + cli.conn.SetPongHandler(func(appData string) error { + pongChan <- struct{}{} + return nil + }) + //开始启动协程读数据 + go func() { + for { + messageType, message, err := cli.conn.ReadMessage() + if err != nil { + logger.GetLogger().Errorf("connection process read message error: messageType=[%d] message=[%s] error=[%s]", messageType, string(message), err) + closeChan <- struct{}{} + return + } + if messageType == websocket.TextMessage { + readChan <- message + } + } + }() + + //循环处理事件 for { - if cli.conn == nil { - logger.GetLogger().Errorf("connection process connect nil, maybe disconnected.") + timer := time.NewTimer(cli.keepAliveIdle) + select { + case msg, ok := <-readChan: + timer.Stop() + if ok { + go cli.processDataFrame(msg) + } else { + logger.GetLogger().Errorf("connection process is closed") + return + } + case <-timer.C: + e := cli.conn.WriteMessage(websocket.PingMessage, nil) + if e != nil { + logger.GetLogger().Errorf("connection write ping message error: error=[%s]", e) + return + } + go func() { + select { + case <-pongChan: + return + case <-time.After(5 * time.Second): + logger.GetLogger().Errorf("ping time out, connection is closing") + closeChan <- struct{}{} + return + } + }() + case <-closeChan: return } - - messageType, message, err := cli.conn.ReadMessage() - if err != nil { - logger.GetLogger().Errorf("connection process read message error: messageType=[%d] message=[%s] error=[%s]", messageType, string(message), err) - return - } - - logger.GetLogger().Debugf("[wire] [websocket] remote => local: \n%s", string(message)) - - go cli.processDataFrame(message) } } diff --git a/client/config.go b/client/config.go index 4469a4e..d0124bf 100644 --- a/client/config.go +++ b/client/config.go @@ -41,7 +41,7 @@ type UserAgentConfig struct { func NewDingtalkGoSDKUserAgent() *UserAgentConfig { return &UserAgentConfig{ - UserAgent: "dingtalk-sdk-go/v0.9.0", + UserAgent: "dingtalk-sdk-go/v0.9.1", } } diff --git a/client/option.go b/client/option.go index 87e5fa4..30af59d 100644 --- a/client/option.go +++ b/client/option.go @@ -1,6 +1,9 @@ package client -import "github.com/open-dingtalk/dingtalk-stream-sdk-go/handler" +import ( + "github.com/open-dingtalk/dingtalk-stream-sdk-go/handler" + "time" +) /** * @Author linya.jj @@ -27,6 +30,14 @@ func WithSubscription(stype, stopic string, frameHandler handler.IFrameHandler) } } +func WithKeepAlive(keepAliveIdle time.Duration) ClientOption { + return func(client *StreamClient) { + if keepAliveIdle >= 3*time.Second { + client.keepAliveIdle = keepAliveIdle + } + } +} + func WithUserAgent(ua *UserAgentConfig) ClientOption { return func(c *StreamClient) { if ua.Valid() != nil {