Feature/keep alive (#25)

* feat(#keepAlive) : keepAlive

* feat(#keepAlive) : keepAlive

---------

Co-authored-by: mike.wq <mike.wq@alibaba-inc.com>
This commit is contained in:
qwangseu 2025-04-18 12:11:46 +08:00 committed by GitHub
parent 7a8ceac0e8
commit d1cc841e60
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 82 additions and 21 deletions

View File

@ -41,10 +41,13 @@ type StreamClient struct {
extras map[string]string
openApiHost string
proxy string
keepAliveIdle time.Duration
}
func NewStreamClient(options ...ClientOption) *StreamClient {
cli := &StreamClient{}
cli := &StreamClient{
keepAliveIdle: 120 * time.Second,
}
defaultOptions := []ClientOption{
WithSubscription(utils.SubscriptionTypeKSystem, "disconnect", cli.OnDisconnect),
@ -133,21 +136,68 @@ func (cli *StreamClient) processLoop() {
}
}()
for {
if cli.conn == nil {
logger.GetLogger().Errorf("connection process connect nil, maybe disconnected.")
return
}
readChan := make(chan []byte)
pongChan := make(chan struct{})
closeChan := make(chan struct{})
defer func() { close(closeChan) }()
defer func() { close(pongChan) }()
defer func() { close(readChan) }()
cli.conn.SetPongHandler(func(appData string) error {
pongChan <- struct{}{}
return nil
})
//开始启动协程读数据
go func() {
for {
messageType, message, err := cli.conn.ReadMessage()
if err != nil {
logger.GetLogger().Errorf("connection process read message error: messageType=[%d] message=[%s] error=[%s]", messageType, string(message), err)
closeChan <- struct{}{}
return
}
if messageType == websocket.TextMessage {
readChan <- message
}
}
}()
logger.GetLogger().Debugf("[wire] [websocket] remote => local: \n%s", string(message))
go cli.processDataFrame(message)
//循环处理事件
for {
timer := time.NewTimer(cli.keepAliveIdle)
select {
case msg, ok := <-readChan:
timer.Stop()
if ok {
go cli.processDataFrame(msg)
} else {
logger.GetLogger().Errorf("connection process is closed")
return
}
case <-timer.C:
e := cli.conn.WriteMessage(websocket.PingMessage, nil)
if e != nil {
logger.GetLogger().Errorf("connection write ping message error: error=[%s]", e)
return
}
go func() {
select {
case <-pongChan:
return
case <-time.After(5 * time.Second):
logger.GetLogger().Errorf("ping time out, connection is closing")
closeChan <- struct{}{}
return
}
}()
case <-closeChan:
return
}
}
}

View File

@ -41,7 +41,7 @@ type UserAgentConfig struct {
func NewDingtalkGoSDKUserAgent() *UserAgentConfig {
return &UserAgentConfig{
UserAgent: "dingtalk-sdk-go/v0.9.0",
UserAgent: "dingtalk-sdk-go/v0.9.1",
}
}

View File

@ -1,6 +1,9 @@
package client
import "github.com/open-dingtalk/dingtalk-stream-sdk-go/handler"
import (
"github.com/open-dingtalk/dingtalk-stream-sdk-go/handler"
"time"
)
/**
* @Author linya.jj
@ -27,6 +30,14 @@ func WithSubscription(stype, stopic string, frameHandler handler.IFrameHandler)
}
}
func WithKeepAlive(keepAliveIdle time.Duration) ClientOption {
return func(client *StreamClient) {
if keepAliveIdle >= 3*time.Second {
client.keepAliveIdle = keepAliveIdle
}
}
}
func WithUserAgent(ua *UserAgentConfig) ClientOption {
return func(c *StreamClient) {
if ua.Valid() != nil {