to #feat 初始化提交

This commit is contained in:
蒋剑 2023-05-12 13:57:29 +08:00
parent 16f426cfa2
commit ae3f631591
34 changed files with 1582 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/.idea

24
Makefile Normal file
View File

@ -0,0 +1,24 @@
# Makefile for nbartookt
# Go parameters
# GOCMD=GOOS=linux GOARCH=amd64 go
# GOBUILD=CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo
ifndef GOCMD
GOCMD=go
endif
ifndef GOBUILD
GOBUILD=$(GOCMD) build
endif
GOCLEAN=$(GOCMD) clean
GOINSTALL=$(GOCMD) install
GOTEST=$(GOCMD) test
GODEP=$(GOTEST) -i
GOFMT=gofmt -w
BINARY_NAME=wukong-eos
all: test
test:
$(GOTEST) -v -cover=true -coverprofile=./sdk.cover ./...
go tool cover -html=./sdk.cover -o ./sdk.html
fmt:
find ./ -name "*.go" | xargs gofmt -w

345
client/client.go Normal file
View File

@ -0,0 +1,345 @@
package client
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/handler"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/utils"
"io"
"net/http"
"sync"
"time"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:23
*/
type StreamClient struct {
AppCredential *AppCredentialConfig
UserAgent *UserAgentConfig
AutoReconnect bool
subscriptions map[string]map[string]handler.IFrameHandler
conn *websocket.Conn
sessionId string
mutex sync.Mutex
}
func NewStreamClient(options ...ClientOption) *StreamClient {
cli := &StreamClient{}
defaultOptions := []ClientOption{
WithSubscription(utils.SubscriptionTypeKSystem, "disconnect", cli.OnDisconnect),
WithSubscription(utils.SubscriptionTypeKSystem, "ping", cli.OnPing),
WithUserAgent(NewDingtalkGoSDKUserAgent()),
WithAutoReconnect(true),
}
for _, option := range defaultOptions {
option(cli)
}
for _, option := range options {
if option == nil {
continue
}
option(cli)
}
return cli
}
func (cli *StreamClient) Start(ctx context.Context) error {
if cli.conn != nil {
return nil
}
cli.mutex.Lock()
defer cli.mutex.Unlock()
if cli.conn != nil {
return nil
}
endpoint, err := cli.GetConnectionEndpoint(ctx)
if err != nil {
return err
}
wssUrl := fmt.Sprintf("%s?ticket=%s", endpoint.Endpoint, endpoint.Ticket)
header := make(http.Header)
conn, resp, err := websocket.DefaultDialer.Dial(wssUrl, header)
if err != nil {
return err
}
// 建连失败
if resp.StatusCode >= http.StatusBadRequest {
return utils.ErrorFromHttpResponseBody(resp)
}
cli.conn = conn
cli.sessionId = endpoint.Ticket
logger.GetLogger().Infof("connect success, sessionId=[%s]", cli.sessionId)
go cli.processLoop()
return nil
}
func (cli *StreamClient) processLoop() {
defer func() {
if err := recover(); err != nil {
logger.GetLogger().Errorf("connection process panic due to unknown reason, error=[%s]", err)
}
if cli.AutoReconnect {
go cli.reconnect()
}
}()
for {
if cli.conn == nil {
logger.GetLogger().Errorf("connection process connect nil, maybe disconnected.")
return
}
messageType, message, err := cli.conn.ReadMessage()
if err != nil {
logger.GetLogger().Errorf("connection process read message error: messageType=[%d] message=[%s] error=[%s]", messageType, string(message), err)
return
}
logger.GetLogger().Debugf("ReadRawMessage : messageType=[%d] message=[%s]", messageType, string(message))
go cli.processDataFrame(message)
}
}
func (cli *StreamClient) processDataFrame(rawData []byte) {
defer func() {
if err := recover(); err != nil {
logger.GetLogger().Errorf("connection processDataFrame panic, error=[%s]", err)
}
}()
dataFrame, err := payload.DecodeDataFrame(rawData)
if err != nil {
logger.GetLogger().Errorf("connection process decode data frame error: length=[%d] error=[%s]", len(rawData), err)
return
}
if dataFrame == nil || dataFrame.Headers == nil {
logger.GetLogger().Errorf("connection processDataFrame dataFrame nil.")
return
}
frameHandler, err := cli.GetHandler(dataFrame.Type, dataFrame.GetTopic())
if err != nil {
logger.GetLogger().Errorf("connection processDataFrame unregistered handler: type=[%s] topic=[%s]", dataFrame.Type, dataFrame.GetTopic())
return
}
dataAck, err := frameHandler(context.Background(), dataFrame)
if dataAck == nil && err != nil {
dataAck = payload.NewErrorDataFrameResponse(dataFrame.GetMessageId(), err)
}
if dataAck == nil {
return
}
errSend := cli.SendDataFrameResponse(context.Background(), dataAck)
logger.GetLogger().Debugf("SendFrameAck dataAck=[%v", dataAck)
if errSend != nil {
logger.GetLogger().Errorf("connection processDataFrame send response error: error=[%s]", errSend)
}
}
func (cli *StreamClient) Close() {
if cli.conn == nil {
return
}
cli.mutex.Lock()
defer cli.mutex.Unlock()
if cli.conn == nil {
return
}
if err := cli.conn.Close(); err != nil {
logger.GetLogger().Errorf("StreamClient close. error=[%s]", err)
}
cli.conn = nil
cli.sessionId = ""
}
func (cli *StreamClient) reconnect() {
defer func() {
if err := recover(); err != nil {
logger.GetLogger().Errorf("reconect panic due to unknown reason. error=[%s]", err)
}
}()
cli.Close()
for {
err := cli.Start(context.Background())
if err != nil {
logger.GetLogger().Errorf("StreamClient reconnect error. error=[%s]", err)
time.Sleep(time.Second * 3)
} else {
logger.GetLogger().Infof("StreamClient reconnect success")
return
}
}
}
func (cli *StreamClient) GetHandler(stype, stopic string) (handler.IFrameHandler, error) {
subs := cli.subscriptions[stype]
if subs == nil || subs[stopic] == nil {
return nil, errors.New("HandlerNotRegistedForTypeTopic_" + stype + "_" + stopic)
}
return subs[stopic], nil
}
func (cli *StreamClient) CheckConfigValid() error {
if err := cli.AppCredential.Valid(); err != nil {
return err
}
if err := cli.UserAgent.Valid(); err != nil {
return err
}
if cli.subscriptions == nil {
return errors.New("subscriptionsNil")
}
for ttype, subs := range cli.subscriptions {
if _, ok := utils.SubscriptionTypeSet[ttype]; !ok {
return errors.New("UnKnownSubscriptionType_" + ttype)
}
if len(subs) <= 0 {
return errors.New("NoHandlersRegistedForType_" + ttype)
}
for ttopic, h := range subs {
if h == nil {
return errors.New("HandlerNilForTypeTopic_" + ttype + "_" + ttopic)
}
}
}
return nil
}
func (cli *StreamClient) GetConnectionEndpoint(ctx context.Context) (*payload.ConnectionEndpointResponse, error) {
if err := cli.CheckConfigValid(); err != nil {
return nil, err
}
requestModel := payload.ConnectionEndpointRequest{
ClientId: cli.AppCredential.ClientId,
ClientSecret: cli.AppCredential.ClientSecret,
UserAgent: cli.UserAgent.UserAgent,
Subscriptions: make([]*payload.SubscriptionModel, 0),
}
for ttype, subs := range cli.subscriptions {
for ttopic, _ := range subs {
requestModel.Subscriptions = append(requestModel.Subscriptions, &payload.SubscriptionModel{
Type: ttype,
Topic: ttopic,
})
}
}
requestJsonBody, _ := json.Marshal(requestModel)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, utils.GetConnectionEndpointAPIUrl, bytes.NewReader(requestJsonBody))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
httpClient := &http.Client{
Transport: http.DefaultTransport,
Timeout: 5 * time.Second, //设置超时包含connection时间、任意重定向时间、读取response body时间
}
resp, err := httpClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, utils.ErrorFromHttpResponseBody(resp)
}
defer resp.Body.Close()
responseJsonBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
endpoint := &payload.ConnectionEndpointResponse{}
if err := json.Unmarshal(responseJsonBody, endpoint); err != nil {
return nil, err
}
if err := endpoint.Valid(); err != nil {
return nil, err
}
return endpoint, nil
}
func (cli *StreamClient) OnDisconnect(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
logger.GetLogger().Debugf("StreamClient.OnDisconnect")
cli.Close()
return nil, nil
}
func (cli *StreamClient) OnPing(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
dfPong := payload.NewDataFrameAckPong(df.GetMessageId())
dfPong.Data = df.Data
return dfPong, nil
}
// 返回正常数据包
func (cli *StreamClient) SendDataFrameResponse(ctx context.Context, resp *payload.DataFrameResponse) error {
if resp == nil {
return errors.New("SendDataFrameResponseError_ResponseNil")
}
if cli.conn == nil {
logger.GetLogger().Errorf("SendDataFrameResponse error, conn nil, maybe disconnected.")
return errors.New("disconnected")
}
return cli.conn.WriteJSON(resp)
}

49
client/client_test.go Normal file
View File

@ -0,0 +1,49 @@
package client
import (
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:23
*/
func TestNewDingtalkOpenStreamClient(t *testing.T) {
}
func TestDingtalkOpenStreamClient_Start(t *testing.T) {
}
func TestDingtalkOpenStreamClient_processDataFrame(t *testing.T) {
}
func TestDingtalkOpenStreamClient_Close(t *testing.T) {
}
func TestDingtalkOpenStreamClient_reconnect(t *testing.T) {
}
func TestDingtalkOpenStreamClient_GetHandler(t *testing.T) {
}
func TestDingtalkOpenStreamClient_CheckConfigValid(t *testing.T) {
}
func TestDingtalkOpenStreamClient_GetConnectionEndpoint(t *testing.T) {
}
func TestDingtalkOpenStreamClient_OnDisconnect(t *testing.T) {
}
func TestDingtalkOpenStreamClient_OnPing(t *testing.T) {
}
func TestDingtalkOpenStreamClient_SendDataFrameResponse(t *testing.T) {
}
func TestDingtalkOpenStreamClient_SendErrorResponse(t *testing.T) {
}

58
client/config.go Normal file
View File

@ -0,0 +1,58 @@
package client
import (
"errors"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:50
*/
// 应用秘钥信息
type AppCredentialConfig struct {
ClientId string `json:"clientKey" yaml:"clientKey"` //自建应用appKey; 三方应用suiteKey
ClientSecret string `json:"clientSecret" yaml:"clientSecret"` //自建应用appSecret; 三方应用suiteSecret
}
func NewAppCredentialConfig(clientId, clientSecret string) *AppCredentialConfig {
return &AppCredentialConfig{
ClientId: clientId,
ClientSecret: clientSecret,
}
}
func (c *AppCredentialConfig) Valid() error {
if c == nil {
return errors.New("AppCredentialConfigNil")
}
if c.ClientId == "" || c.ClientSecret == "" {
return errors.New("AppCredentialConfigEmpty")
}
return nil
}
// UA信息
type UserAgentConfig struct {
UserAgent string `json:"user_agent"`
}
func NewDingtalkGoSDKUserAgent() *UserAgentConfig {
return &UserAgentConfig{
UserAgent: "dingtalk-sdk-go/0.1.0",
}
}
func (c *UserAgentConfig) Valid() error {
if c == nil {
return errors.New("UserAgentConfigNil")
}
if c.UserAgent == "" {
return errors.New("UserAgentConfigEmpty")
}
return nil
}

33
client/config_test.go Normal file
View File

@ -0,0 +1,33 @@
package client
import (
"github.com/stretchr/testify/assert"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:50
*/
func TestAppCredentialConfig_Valid(t *testing.T) {
conf := NewAppCredentialConfig("clientId", "clientSecret")
assert.Nil(t, conf.Valid())
conf.ClientId = ""
assert.NotNil(t, conf.Valid())
conf = nil
assert.NotNil(t, conf.Valid())
}
func TestDingtalkGoSDKUserAgent_Valid(t *testing.T) {
conf := NewDingtalkGoSDKUserAgent()
assert.Nil(t, conf.Valid())
conf.UserAgent = ""
assert.NotNil(t, conf.Valid())
conf = nil
assert.NotNil(t, conf.Valid())
}

46
client/option.go Normal file
View File

@ -0,0 +1,46 @@
package client
import "github.com/open-dingtalk/dingtalk-stream-sdk-go/handler"
/**
* @Author linya.jj
* @Date 2023/3/22 14:48
*/
type ClientOption func(*StreamClient)
func WithAutoReconnect(autoReconnect bool) ClientOption {
return func(c *StreamClient) {
c.AutoReconnect = autoReconnect
}
}
func WithAppCredential(cred *AppCredentialConfig) ClientOption {
return func(c *StreamClient) {
c.AppCredential = cred
}
}
func WithSubscription(stype, stopic string, frameHandler handler.IFrameHandler) ClientOption {
return func(c *StreamClient) {
if c.subscriptions == nil {
c.subscriptions = make(map[string]map[string]handler.IFrameHandler)
}
if _, ok := c.subscriptions[stype]; !ok {
c.subscriptions[stype] = make(map[string]handler.IFrameHandler)
}
c.subscriptions[stype][stopic] = frameHandler
}
}
func WithUserAgent(ua *UserAgentConfig) ClientOption {
return func(c *StreamClient) {
if ua.Valid() != nil {
ua = NewDingtalkGoSDKUserAgent()
}
c.UserAgent = ua
}
}

38
client/option_test.go Normal file
View File

@ -0,0 +1,38 @@
package client
import (
"context"
"github.com/stretchr/testify/assert"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:48
*/
func TestWithAppCredential(t *testing.T) {
op := WithAppCredential(NewAppCredentialConfig("clientId", "clientSecret"))
c := NewStreamClient(op)
assert.Equal(t, "clientId", c.AppCredential.ClientId)
assert.Equal(t, "clientSecret", c.AppCredential.ClientSecret)
}
func TestWithSubscription(t *testing.T) {
op := WithSubscription("stype", "stopic", func(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
return nil, nil
})
c := NewStreamClient(op)
h, err := c.GetHandler("stype", "stopic")
assert.Nil(t, err)
assert.NotNil(t, h)
}
func TestWithUserAgent(t *testing.T) {
op := WithUserAgent(NewDingtalkGoSDKUserAgent())
c := NewStreamClient(op)
assert.NotNil(t, c.UserAgent)
}

73
event/event.go Normal file
View File

@ -0,0 +1,73 @@
package event
import (
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"strconv"
)
/**
* @Author linya.jj
* @Date 2023/4/26 17:15
*/
const (
DataFrameHeaderKEventId = "eventId"
DataFrameHeaderKEventBornTime = "eventBornTime"
DataFrameHeaderKEventCorpId = "eventCorpId"
DataFrameHeaderKEventType = "eventType"
DataFrameHeaderKEventUnifiedAppId = "eventUnifiedAppId"
)
type EventHeader struct {
EventId string `json:"eventId"`
EventBornTime int64 `json:"eventBornTime"`
EventCorpId string `json:"eventCorpId"`
EventType string `json:"eventType"`
EventUnifiedAppId string `json:"eventUnifiedAppId"`
}
func NewEventHeaderFromDataFrame(df *payload.DataFrame) *EventHeader {
if df == nil {
return &EventHeader{}
}
eventHeader := &EventHeader{
EventId: df.GetHeader(DataFrameHeaderKEventId),
EventBornTime: 0,
EventCorpId: df.GetHeader(DataFrameHeaderKEventCorpId),
EventType: df.GetHeader(DataFrameHeaderKEventType),
EventUnifiedAppId: df.GetHeader(DataFrameHeaderKEventUnifiedAppId),
}
if ts, err := strconv.ParseInt(df.GetHeader(DataFrameHeaderKEventBornTime), 10, 64); err == nil {
eventHeader.EventBornTime = ts
}
return eventHeader
}
type EventProcessStatusType string
var (
EventProcessStatusKSuccess EventProcessStatusType = "SUCCESS"
EventProcessStatusKLater EventProcessStatusType = "LATER"
)
type EventProcessResult struct {
Status EventProcessStatusType `json:"status"`
Message string `json:"message"`
}
func NewEventProcessResultSuccess() *EventProcessResult {
return &EventProcessResult{
Status: EventProcessStatusKSuccess,
Message: "success",
}
}
func NewEventProcessResultLater() *EventProcessResult {
return &EventProcessResult{
Status: EventProcessStatusKLater,
Message: "later",
}
}

26
event/event_handler.go Normal file
View File

@ -0,0 +1,26 @@
package event
import (
"context"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
)
/**
* @Author linya.jj
* @Date 2023/4/27 09:25
*/
type IEventHandler func(c context.Context, header *EventHeader, rawData []byte) (EventProcessStatusType, error)
func EventHandlerDoNothing(c context.Context, header *EventHeader, rawData []byte) (EventProcessStatusType, error) {
logger.GetLogger().Debugf("EventHandlerDoNothing header=[%s], rawData=[%s]",
header, rawData)
return EventProcessStatusKSuccess, nil
}
func EventHandlerSaveToRDS(c context.Context, header *EventHeader, rawData []byte) (EventProcessStatusType, error) {
// TODO save data to rds here
return EventProcessStatusKSuccess, nil
}

View File

@ -0,0 +1,24 @@
package event
import (
"context"
"github.com/stretchr/testify/assert"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/4/27 09:25
*/
func TestEventHandlerDoNothing(t *testing.T) {
status, err := EventHandlerDoNothing(context.Background(), nil, []byte(""))
assert.Nil(t, err)
assert.Equal(t, EventProcessStatusKSuccess, status)
}
func TestEventHandlerSaveToRDS(t *testing.T) {
status, err := EventHandlerSaveToRDS(context.Background(), nil, []byte(""))
assert.Nil(t, err)
assert.Equal(t, EventProcessStatusKSuccess, status)
}

47
event/event_test.go Normal file
View File

@ -0,0 +1,47 @@
package event
import (
"github.com/stretchr/testify/assert"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/utils"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/4/26 17:15
*/
func TestNewEventHeaderFromDataFrame(t *testing.T) {
assert.NotNil(t, NewEventHeaderFromDataFrame(nil))
df := &payload.DataFrame{
SpecVersion: "version",
Type: utils.SubscriptionTypeKEvent,
Time: 12345678,
Headers: payload.DataFrameHeader{
DataFrameHeaderKEventId: "eventId",
DataFrameHeaderKEventBornTime: "1234567890",
DataFrameHeaderKEventCorpId: "eventCorpId",
DataFrameHeaderKEventType: "eventType",
DataFrameHeaderKEventUnifiedAppId: "eventUnifiedAppId",
},
Data: "",
}
eh := NewEventHeaderFromDataFrame(df)
assert.NotNil(t, eh)
assert.Equal(t, "eventId", eh.EventId)
assert.Equal(t, int64(1234567890), eh.EventBornTime)
assert.Equal(t, "eventCorpId", eh.EventCorpId)
assert.Equal(t, "eventType", eh.EventType)
assert.Equal(t, "eventUnifiedAppId", eh.EventUnifiedAppId)
}
func TestNewEventProcessResultSuccess(t *testing.T) {
assert.NotNil(t, NewEventProcessResultSuccess())
}
func TestNewEventProcessResultLater(t *testing.T) {
assert.NotNil(t, NewEventProcessResultLater())
}

63
event/frame_handler.go Normal file
View File

@ -0,0 +1,63 @@
package event
import (
"context"
"encoding/json"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
)
/**
* @Author linya.jj
* @Date 2023/4/26 17:15
*/
type DefaultEventFrameHandler struct {
defaultHandler IEventHandler
}
func NewDefaultEventFrameHandler(defaultHandler IEventHandler) *DefaultEventFrameHandler {
return &DefaultEventFrameHandler{
defaultHandler: defaultHandler,
}
}
func (h *DefaultEventFrameHandler) OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
eventHeader := NewEventHeaderFromDataFrame(df)
if h.defaultHandler == nil {
logger.GetLogger().Warningf("No event handler found, drop this event. eventType=[%s], eventId=[%s], eventCorpId=[%s]",
eventHeader.EventType, eventHeader.EventId, eventHeader.EventCorpId)
return nil, nil
}
ret, err := h.defaultHandler(ctx, eventHeader, []byte(df.Data))
if err != nil {
logger.GetLogger().Errorf("Event handler process error. eventType=[%s], eventId=[%s], eventCorpId=[%s] err=[%s]",
eventHeader.EventType, eventHeader.EventId, eventHeader.EventCorpId, err)
ret = EventProcessStatusKLater
}
result := NewEventProcessResultSuccess()
code := payload.DataFrameResponseStatusCodeKOK
if ret != EventProcessStatusKSuccess {
code = payload.DataFrameResponseStatusCodeKInternalError
result = NewEventProcessResultLater()
}
resultStr, _ := json.Marshal(result)
frameResp := &payload.DataFrameResponse{
Code: code,
Headers: payload.DataFrameHeader{
payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson,
payload.DataFrameHeaderKMessageId: df.GetMessageId(),
},
Message: "ok",
Data: string(resultStr),
}
return frameResp, nil
}

View File

@ -0,0 +1,50 @@
package event
import (
"context"
"errors"
"github.com/stretchr/testify/assert"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/4/26 17:15
*/
func EventHandlerSuccess(c context.Context, header *EventHeader, rawData []byte) (EventProcessStatusType, error) {
return EventProcessStatusKSuccess, nil
}
func EventHandlerLater(c context.Context, header *EventHeader, rawData []byte) (EventProcessStatusType, error) {
return EventProcessStatusKLater, nil
}
func EventHandlerLaterError(c context.Context, header *EventHeader, rawData []byte) (EventProcessStatusType, error) {
return EventProcessStatusKLater, errors.New("error")
}
func TestDefaultEventFrameHandler_OnEventReceived(t *testing.T) {
defh := NewDefaultEventFrameHandler(nil)
ret, err := defh.OnEventReceived(context.Background(), nil)
assert.Nil(t, ret)
assert.Nil(t, err)
df := &payload.DataFrame{}
defh = NewDefaultEventFrameHandler(EventHandlerSuccess)
ret, err = defh.OnEventReceived(context.Background(), df)
assert.Equal(t, payload.DataFrameResponseStatusCodeKOK, ret.Code)
assert.Nil(t, err)
defh = NewDefaultEventFrameHandler(EventHandlerLater)
ret, err = defh.OnEventReceived(context.Background(), df)
assert.Equal(t, payload.DataFrameResponseStatusCodeKInternalError, ret.Code)
assert.Nil(t, err)
defh = NewDefaultEventFrameHandler(EventHandlerLaterError)
ret, err = defh.OnEventReceived(context.Background(), df)
assert.Equal(t, payload.DataFrameResponseStatusCodeKInternalError, ret.Code)
assert.Nil(t, err)
}

47
example/example_bot.go Normal file
View File

@ -0,0 +1,47 @@
package main
import (
"context"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/utils"
)
/**
* @Author linya.jj
* @Date 2023/3/22 18:30
*/
func OnBotCallback(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
frameResp := &payload.DataFrameResponse{
Code: 200,
Headers: payload.DataFrameHeader{
payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson,
payload.DataFrameHeaderKMessageId: df.GetMessageId(),
},
Message: "ok",
Data: "",
}
return frameResp, nil
}
func RunBotListener() {
logger.SetLogger(logger.NewStdTestLogger())
cli := client.NewStreamClient(
client.WithAppCredential(client.NewAppCredentialConfig("your-client-id", "your-client-secret")),
client.WithUserAgent(client.NewDingtalkGoSDKUserAgent()),
client.WithSubscription(utils.SubscriptionTypeKCallback, payload.BotMessageCallbackTopic, OnBotCallback),
)
err := cli.Start(context.Background())
if err != nil {
panic(err)
}
defer cli.Close()
select {}
}

35
example/example_event.go Normal file
View File

@ -0,0 +1,35 @@
package main
import (
"context"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/client"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/event"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/utils"
)
/**
* @Author linya.jj
* @Date 2023/3/22 18:30
*/
func RunEventListener() {
logger.SetLogger(logger.NewStdTestLogger())
eventHandler := event.NewDefaultEventFrameHandler(event.EventHandlerDoNothing)
cli := client.NewStreamClient(
client.WithAppCredential(client.NewAppCredentialConfig("your-client-id", "your-client-secret")),
client.WithUserAgent(client.NewDingtalkGoSDKUserAgent()),
client.WithSubscription(utils.SubscriptionTypeKEvent, "*", eventHandler.OnEventReceived),
)
err := cli.Start(context.Background())
if err != nil {
panic(err)
}
defer cli.Close()
select {}
}

12
example/example_main.go Normal file
View File

@ -0,0 +1,12 @@
package main
/**
* @Author linya.jj
* @Date 2023/3/22 18:30
*/
func main() {
RunBotListener()
select {}
}

15
go.mod Normal file
View File

@ -0,0 +1,15 @@
module github.com/open-dingtalk/dingtalk-stream-sdk-go
go 1.18
require (
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/stretchr/testify v1.8.2
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

21
go.sum Normal file
View File

@ -0,0 +1,21 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

13
handler/ihandler.go Normal file
View File

@ -0,0 +1,13 @@
package handler
import (
"context"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:27
*/
type IFrameHandler func(c context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error)

13
handler/ihandler_test.go Normal file
View File

@ -0,0 +1,13 @@
package handler
import (
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:27
*/
func TestOnEventReceived(t *testing.T) {
}

56
logger/logger.go Normal file
View File

@ -0,0 +1,56 @@
package logger
/**
* @Author linya.jj
* @Date 2023/3/22 14:30
*/
type ILogger interface {
Debugf(format string, args ...interface{})
Infof(format string, args ...interface{})
Warningf(format string, args ...interface{})
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
}
var (
sdkLogger ILogger
)
func SetLogger(customLogger ILogger) {
sdkLogger = customLogger
}
func GetLogger() ILogger {
if sdkLogger == nil {
sdkLogger = &doNothingLogger{}
}
return sdkLogger
}
type doNothingLogger struct {
}
func (l *doNothingLogger) Debugf(format string, args ...interface{}) {
}
func (l *doNothingLogger) Infof(format string, args ...interface{}) {
}
func (l *doNothingLogger) Warningf(format string, args ...interface{}) {
}
func (l *doNothingLogger) Errorf(format string, args ...interface{}) {
}
func (l *doNothingLogger) Fatalf(format string, args ...interface{}) {
}

19
logger/logger_test.go Normal file
View File

@ -0,0 +1,19 @@
package logger
import (
"github.com/stretchr/testify/assert"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:30
*/
func TestSetGetSDKLogger(t *testing.T) {
assert.NotNil(t, GetLogger())
stdLogger := NewStdTestLogger()
SetLogger(stdLogger)
assert.Equal(t, stdLogger, GetLogger())
}

49
logger/std_logger.go Normal file
View File

@ -0,0 +1,49 @@
package logger
import (
"fmt"
"time"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:32
*/
// This logger is only for debug. Do not use it online.
type StdTestLogger struct {
}
func NewStdTestLogger() *StdTestLogger {
return &StdTestLogger{}
}
func (l *StdTestLogger) Debugf(format string, args ...interface{}) {
fmt.Printf("%s [Debug] ", time.Now().String())
fmt.Printf(format, args...)
fmt.Print("\n")
}
func (l *StdTestLogger) Infof(format string, args ...interface{}) {
fmt.Printf("%s [INFO] ", time.Now().String())
fmt.Printf(format, args...)
fmt.Print("\n")
}
func (l *StdTestLogger) Warningf(format string, args ...interface{}) {
fmt.Printf("%s [WARNING] ", time.Now().String())
fmt.Printf(format, args...)
fmt.Print("\n")
}
func (l *StdTestLogger) Errorf(format string, args ...interface{}) {
fmt.Printf("%s [ERROR] ", time.Now().String())
fmt.Printf(format, args...)
fmt.Print("\n")
}
func (l *StdTestLogger) Fatalf(format string, args ...interface{}) {
fmt.Printf("%s [FATAL] ", time.Now().String())
fmt.Printf(format, args...)
fmt.Print("\n")
}

20
logger/std_logger_test.go Normal file
View File

@ -0,0 +1,20 @@
package logger
import (
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 14:32
*/
func TestStdLogger_Output(t *testing.T) {
stdLogger := NewStdTestLogger()
stdLogger.Debugf("logger level: %s", "debug")
stdLogger.Infof("logger level: %s", "info")
stdLogger.Warningf("logger level: %s", "warning")
stdLogger.Errorf("logger level: %s", "error")
stdLogger.Fatalf("logger level: %s", "fatal")
}

39
payload/connection.go Normal file
View File

@ -0,0 +1,39 @@
package payload
import "errors"
/**
* @Author linya.jj
* @Date 2023/3/22 18:22
*/
type SubscriptionModel struct {
Type string `json:"type"`
Topic string `json:"topic"`
}
// 长连接接入点请求
type ConnectionEndpointRequest struct {
ClientId string `json:"clientId"` //自建应用appKey; 三方应用suiteKey
ClientSecret string `json:"clientSecret"` //自建应用appSecret; 三方应用suiteSecret
Subscriptions []*SubscriptionModel `json:"subscriptions"`
UserAgent string `json:"ua"`
}
// 长连接接入点参数
type ConnectionEndpointResponse struct {
Endpoint string `json:"endpoint"`
Ticket string `json:"ticket"`
}
func (r *ConnectionEndpointResponse) Valid() error {
if r == nil {
return errors.New("ConnectionEndpointResponseNil")
}
if r.Endpoint == "" || r.Ticket == "" {
return errors.New("ConnectionEndpointResponseContentEmpty")
}
return nil
}

View File

@ -0,0 +1,26 @@
package payload
import (
"github.com/stretchr/testify/assert"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/22 18:22
*/
func TestConnectionEndpointResponse_Valid(t *testing.T) {
resp := &ConnectionEndpointResponse{
Endpoint: "ep",
Ticket: "ti",
}
assert.Nil(t, resp.Valid())
resp.Endpoint = ""
assert.NotNil(t, resp.Valid())
resp = nil
assert.NotNil(t, resp.Valid())
}

141
payload/data_frame.go Normal file
View File

@ -0,0 +1,141 @@
package payload
import (
"encoding/json"
"strconv"
)
/**
* @Author linya.jj
* @Date 2023/3/31 09:57
*/
type DataFrameHeader map[string]string
func (h DataFrameHeader) Get(key string) string {
return h[key]
}
func (h DataFrameHeader) Set(key, value string) {
h[key] = value
}
type DataFrame struct {
SpecVersion string `json:"specVersion"`
Type string `json:"type"`
Time int64 `json:"time"`
Headers DataFrameHeader `json:"headers""`
Data string `json:"data"`
}
func (df *DataFrame) Encode() []byte {
if df == nil {
return nil
}
data, _ := json.Marshal(df)
return data
}
func (df *DataFrame) GetTopic() string {
if df == nil {
return ""
}
return df.Headers.Get(DataFrameHeaderKTopic)
}
func (df *DataFrame) GetMessageId() string {
if df == nil {
return ""
}
return df.Headers.Get(DataFrameHeaderKMessageId)
}
func (df *DataFrame) GetTimestamp() int64 {
if df == nil {
return 0
}
strTs := df.Headers.Get(DataFrameHeaderKTime)
ts, err := strconv.ParseInt(strTs, 10, 64)
if err != nil {
return 0
}
return ts
}
func (df *DataFrame) GetHeader(header string) string {
if df == nil {
return ""
}
return df.Headers.Get(header)
}
func DecodeDataFrame(rawData []byte) (*DataFrame, error) {
df := &DataFrame{}
err := json.Unmarshal(rawData, df)
if err != nil {
return nil, err
}
return df, nil
}
type DataFrameResponse struct {
Code int `json:"code"`
Headers DataFrameHeader `json:"headers"`
Message string `json:"message"`
Data string `json:"data"`
}
func (df *DataFrameResponse) Encode() []byte {
if df == nil {
return nil
}
data, _ := json.Marshal(df)
return data
}
func DecodeDataFrameResponse(rawData []byte) (*DataFrameResponse, error) {
resp := &DataFrameResponse{}
err := json.Unmarshal(rawData, resp)
if err != nil {
return nil, err
}
return resp, nil
}
func NewDataFrameAckPong(messageId string) *DataFrameResponse {
return &DataFrameResponse{
Code: DataFrameResponseStatusCodeKOK,
Headers: DataFrameHeader{
DataFrameHeaderKContentType: DataFrameContentTypeKJson,
DataFrameHeaderKMessageId: messageId,
},
Message: "ok",
Data: "", //TODO data内容暂留空
}
}
func NewErrorDataFrameResponse(messageId string, err error) *DataFrameResponse {
if err == nil {
return nil
}
return &DataFrameResponse{
Code: 400, //TODO errorcode 细化
Headers: DataFrameHeader{
DataFrameHeaderKContentType: DataFrameContentTypeKJson,
DataFrameHeaderKMessageId: messageId,
},
Message: err.Error(),
Data: "", //TODO data内容暂留空
}
}

View File

@ -0,0 +1,77 @@
package payload
import (
"errors"
"github.com/stretchr/testify/assert"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/31 09:57
*/
func TestDataFrameHeader_GetSet(t *testing.T) {
h := &DataFrameHeader{}
h.Set("k1", "v1")
h.Set("k2", "v2")
assert.Equal(t, "v1", h.Get("k1"))
assert.Equal(t, "v2", h.Get("k2"))
assert.Equal(t, "", h.Get("k_notexist"))
h.Set("k1", "v11")
assert.Equal(t, "v11", h.Get("k1"))
}
func TestDataFrame_EncodeDecode(t *testing.T) {
df := &DataFrame{
SpecVersion: "sv",
Type: "t",
Time: 123456,
Headers: DataFrameHeader{"k1": "v1"},
Data: "data",
}
ss := df.Encode()
assert.NotEqual(t, "", string(ss))
df0, err := DecodeDataFrame(ss)
assert.Nil(t, err)
assert.EqualValues(t, df, df0)
ss = []byte(`{"time":"time"}`)
_, err = DecodeDataFrame([]byte(ss))
assert.NotNil(t, err)
}
func TestDataFrameResponse_EncodeDecode(t *testing.T) {
resp := &DataFrameResponse{
Code: 200,
Headers: DataFrameHeader{"k1": "v1"},
Message: "msg",
Data: "data",
}
ss := resp.Encode()
assert.NotEqual(t, "", string(ss))
resp0, err := DecodeDataFrameResponse(ss)
assert.Nil(t, err)
assert.EqualValues(t, resp, resp0)
ss = []byte(`{"code":"code"}`)
_, err = DecodeDataFrameResponse(ss)
assert.NotNil(t, err)
}
func TestNewDataFrameAckPong(t *testing.T) {
pong := NewDataFrameAckPong("messageId")
assert.NotNil(t, pong)
}
func TestNewErrorDataFrameResponse(t *testing.T) {
errResp := NewErrorDataFrameResponse("messageId", errors.New("error"))
assert.NotNil(t, errResp)
assert.Equal(t, "error", errResp.Message)
}

30
payload/utils.go Normal file
View File

@ -0,0 +1,30 @@
package payload
import (
"fmt"
"github.com/google/uuid"
)
/**
* @Author linya.jj
* @Date 2023/4/7 15:13
*/
const (
DataFrameHeaderKTopic = "topic"
DataFrameHeaderKContentType = "contentType"
DataFrameHeaderKMessageId = "messageId"
DataFrameHeaderKTime = "time"
DataFrameContentTypeKJson = "application/json"
DataFrameContentTypeKBase64 = "base64String"
DataFrameResponseStatusCodeKOK = 200
DataFrameResponseStatusCodeKInternalError = 500
BotMessageCallbackTopic = "bot_got_msg" //机器人消息统一回调topic
)
func GenerateMessageId(prefix string) string {
return fmt.Sprintf("%s-%s", prefix, uuid.New().String())
}

17
payload/utils_test.go Normal file
View File

@ -0,0 +1,17 @@
package payload
import (
"github.com/stretchr/testify/assert"
"strings"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/4/7 15:13
*/
func TestGenerateMessageId(t *testing.T) {
assert.NotEqual(t, "", GenerateMessageId("prefix-"))
assert.True(t, strings.HasPrefix(GenerateMessageId("prefix-"), "prefix-"))
}

28
utils/errors.go Normal file
View File

@ -0,0 +1,28 @@
package utils
import (
"errors"
"io"
"net/http"
)
/**
* @Author linya.jj
* @Date 2023/3/31 09:51
*/
// 把http response的内容转换成error对象
func ErrorFromHttpResponseBody(resp *http.Response) error {
if resp == nil {
return errors.New("HttpResponseNil")
}
defer resp.Body.Close()
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return errors.New(string(responseBody))
}

23
utils/errors_test.go Normal file
View File

@ -0,0 +1,23 @@
package utils
import (
"github.com/stretchr/testify/assert"
"io"
"net/http"
"strings"
"testing"
)
/**
* @Author linya.jj
* @Date 2023/3/31 09:51
*/
func TestErrorFromHttpResponse(t *testing.T) {
assert.NotNil(t, ErrorFromHttpResponseBody(nil))
resp := &http.Response{
Body: io.NopCloser(strings.NewReader("error")),
}
assert.NotNil(t, ErrorFromHttpResponseBody(resp))
}

24
utils/utils.go Normal file
View File

@ -0,0 +1,24 @@
package utils
/**
* @Author linya.jj
* @Date 2023/3/22 15:23
*/
const (
GetConnectionEndpointAPIUrl = "https://api.dingtalk.com/v1.0/gateway/connections/open"
)
const (
SubscriptionTypeKSystem = "SYSTEM" //系统请求
SubscriptionTypeKEvent = "EVENT" //事件
SubscriptionTypeKCallback = "CALLBACK" //回调
)
var (
SubscriptionTypeSet = map[string]bool{
SubscriptionTypeKSystem: true,
SubscriptionTypeKEvent: true,
SubscriptionTypeKCallback: true,
}
)