From f47c5f261773256466588f22ec4e75b0fce61b68 Mon Sep 17 00:00:00 2001 From: jiangjian Date: Thu, 25 May 2023 17:32:05 +0800 Subject: [PATCH] =?UTF-8?q?feat=20#1=20=E5=A2=9E=E5=8A=A0chatbot=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E7=9A=84=E5=A4=84=E7=90=86=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- chatbot/chatbot_handler.go | 30 ++++++------- chatbot/chatbot_replier.go | 73 ++++++++++++++++++++++++++++++++ client/client.go | 38 +++++++++++++---- example/example.go | 87 +++++++------------------------------- payload/data_frame.go | 64 ++++++++++++++++++++++++---- payload/data_frame_test.go | 2 +- payload/utils.go | 5 ++- 7 files changed, 190 insertions(+), 109 deletions(-) create mode 100644 chatbot/chatbot_replier.go diff --git a/chatbot/chatbot_handler.go b/chatbot/chatbot_handler.go index bf671a9..b3be28f 100644 --- a/chatbot/chatbot_handler.go +++ b/chatbot/chatbot_handler.go @@ -11,41 +11,35 @@ import ( * @Date 2023/3/22 18:30 */ -type IMessageHandler func(c context.Context, data *BotCallbackDataModel) error +type IChatBotMessageHandler func(c context.Context, data *BotCallbackDataModel) ([]byte, error) type DefaultChatBotFrameHandler struct { - defaultHandler IMessageHandler + defaultHandler IChatBotMessageHandler } -func NewDefaultChatBotFrameHandler(defaultHandler IMessageHandler) *DefaultChatBotFrameHandler { +func NewDefaultChatBotFrameHandler(defaultHandler IChatBotMessageHandler) *DefaultChatBotFrameHandler { return &DefaultChatBotFrameHandler{ defaultHandler: defaultHandler, } } func (h *DefaultChatBotFrameHandler) OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) { - frameResp := &payload.DataFrameResponse{ - Code: 200, - Headers: payload.DataFrameHeader{ - payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson, - payload.DataFrameHeaderKMessageId: df.GetMessageId(), - }, - Message: "ok", - Data: "", - } - msgData := &BotCallbackDataModel{} err := json.Unmarshal([]byte(df.Data), msgData) if err != nil { return nil, err } - if h.defaultHandler != nil { - err = h.defaultHandler(ctx, msgData) - if err != nil { - return nil, err - } + if h.defaultHandler == nil { + return payload.NewDataFrameResponse(payload.DataFrameResponseStatusCodeKHandlerNotFound), nil } + data, err := h.defaultHandler(ctx, msgData) + if err != nil { + return nil, err + } + + frameResp := payload.NewSuccessDataFrameResponse() + frameResp.SetData(string(data)) return frameResp, nil } diff --git a/chatbot/chatbot_replier.go b/chatbot/chatbot_replier.go new file mode 100644 index 0000000..a3ad0d4 --- /dev/null +++ b/chatbot/chatbot_replier.go @@ -0,0 +1,73 @@ +package chatbot + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +type ChatbotReplier struct { +} + +func NewChatbotReplier() *ChatbotReplier { + return &ChatbotReplier{} +} + +func (r *ChatbotReplier) SimpleReplyText(ctx context.Context, sessionWebhook string, content []byte) error { + requestBody := map[string]interface{}{ + "msgtype": "text", + "text": map[string]interface{}{ + "content": string(content), + }, + } + return r.ReplyMessage(ctx, sessionWebhook, requestBody) +} + +func (r *ChatbotReplier) SimpleReplyMarkdown(ctx context.Context, sessionWebhook string, title, content []byte) error { + requestBody := map[string]interface{}{ + "msgtype": "markdown", + "markdown": map[string]interface{}{ + "title": string(title), + "text": string(content), + }, + } + return r.ReplyMessage(ctx, sessionWebhook, requestBody) +} + +func (r *ChatbotReplier) ReplyMessage(ctx context.Context, sessionWebhook string, requestBody map[string]interface{}) error { + requestJsonBody, _ := json.Marshal(requestBody) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, sessionWebhook, bytes.NewReader(requestJsonBody)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "*/*") + + httpClient := &http.Client{ + Transport: http.DefaultTransport, + Timeout: 5 * time.Second, + } + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != 200 { + defer resp.Body.Close() + + responseJsonBody, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + return fmt.Errorf(string(responseJsonBody)) + } + + return nil +} diff --git a/client/client.go b/client/client.go index 86251dc..d3f93e0 100644 --- a/client/client.go +++ b/client/client.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "github.com/gorilla/websocket" + "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" "github.com/open-dingtalk/dingtalk-stream-sdk-go/handler" "github.com/open-dingtalk/dingtalk-stream-sdk-go/logger" "github.com/open-dingtalk/dingtalk-stream-sdk-go/payload" @@ -146,20 +147,29 @@ func (cli *StreamClient) processDataFrame(rawData []byte) { return } + var dataAck *payload.DataFrameResponse frameHandler, err := cli.GetHandler(dataFrame.Type, dataFrame.GetTopic()) - if err != nil { - logger.GetLogger().Errorf("connection processDataFrame unregistered handler: type=[%s] topic=[%s]", dataFrame.Type, dataFrame.GetTopic()) - return - } + if err != nil || frameHandler == nil { + // 没有注册handler,返回404 + dataAck = payload.NewDataFrameResponse(payload.DataFrameResponseStatusCodeKHandlerNotFound) + } else { + dataAck, err = frameHandler(context.Background(), dataFrame) - dataAck, err := frameHandler(context.Background(), dataFrame) - - if dataAck == nil && err != nil { - dataAck = payload.NewErrorDataFrameResponse(dataFrame.GetMessageId(), err) + if err != nil && dataAck == nil { + dataAck = payload.NewErrorDataFrameResponse(err) + } } if dataAck == nil { - return + dataAck = payload.NewSuccessDataFrameResponse() + } + + if dataAck.GetHeader(payload.DataFrameHeaderKMessageId) == "" { + dataAck.SetHeader(payload.DataFrameHeaderKMessageId, dataFrame.GetMessageId()) + } + + if dataAck.GetHeader(payload.DataFrameHeaderKContentType) == "" { + dataAck.SetHeader(payload.DataFrameHeaderKContentType, payload.DataFrameContentTypeKJson) } errSend := cli.SendDataFrameResponse(context.Background(), dataAck) @@ -362,7 +372,17 @@ func (cli *StreamClient) RegisterCallbackRouter(topic string, frameHandler handl cli.RegisterRouter(utils.SubscriptionTypeKCallback, topic, frameHandler) } +// 聊天机器人的注册函数 +func (cli *StreamClient) RegisterChatBotCallbackRouter(messageHandler chatbot.IChatBotMessageHandler) { + cli.RegisterRouter(utils.SubscriptionTypeKCallback, payload.BotMessageCallbackTopic, chatbot.NewDefaultChatBotFrameHandler(messageHandler).OnEventReceived) +} + // 事件类型的注册函数 func (cli *StreamClient) RegisterEventRouter(topic string, frameHandler handler.IFrameHandler) { cli.RegisterRouter(utils.SubscriptionTypeKEvent, topic, frameHandler) } + +// 事件类型的注册函数 +func (cli *StreamClient) RegisterAllEventRouter(frameHandler handler.IFrameHandler) { + cli.RegisterRouter(utils.SubscriptionTypeKEvent, "*", frameHandler) +} diff --git a/example/example.go b/example/example.go index f6e0ea3..437aa27 100644 --- a/example/example.go +++ b/example/example.go @@ -1,9 +1,7 @@ package main import ( - "bytes" "context" - "encoding/json" "flag" "fmt" "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" @@ -11,8 +9,6 @@ import ( "github.com/open-dingtalk/dingtalk-stream-sdk-go/event" "github.com/open-dingtalk/dingtalk-stream-sdk-go/logger" "github.com/open-dingtalk/dingtalk-stream-sdk-go/payload" - "net/http" - "time" ) /** @@ -20,59 +16,19 @@ import ( * @Date 2023/3/22 18:30 */ -func OnChatBotMessageReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) { - frameResp := &payload.DataFrameResponse{ - Code: 200, - Headers: payload.DataFrameHeader{ - payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson, - payload.DataFrameHeaderKMessageId: df.GetMessageId(), - }, - Message: "ok", - Data: "", - } +// 简单的应答机器人实现 +func OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { + replyMsg := []byte(fmt.Sprintf("msg received: [%s]", data.Text.Content)) - // 反序列化机器人回调消息 - msgData := &chatbot.BotCallbackDataModel{} - err := json.Unmarshal([]byte(df.Data), msgData) - if err != nil { - // TODO 处理错误:回调消息反序列化出错 - return frameResp, nil - } + chatbotReplier := chatbot.NewChatbotReplier() + chatbotReplier.SimpleReplyText(ctx, data.SessionWebhook, replyMsg) + chatbotReplier.SimpleReplyMarkdown(ctx, data.SessionWebhook, []byte("Markdown消息"), replyMsg) - //处理方式:回复文本消息,通过SessionWebhook来回复消息 - requestBody := map[string]interface{}{ - "msgtype": "text", - "text": map[string]interface{}{ - "content": fmt.Sprintf("msg received: [%s]", msgData.Text.Content), - }, - } - - requestJsonBody, _ := json.Marshal(requestBody) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, msgData.SessionWebhook, bytes.NewReader(requestJsonBody)) - if err != nil { - // TODO 处理错误 - return frameResp, nil - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Accept", "*/*") - - httpClient := &http.Client{ - Transport: http.DefaultTransport, - Timeout: 5 * time.Second, //设置超时,包含connection时间、任意重定向时间、读取response body时间 - } - - _, err = httpClient.Do(req) - if err != nil { - // TODO 处理错误: 回复消息出错 - return frameResp, nil - } - - return frameResp, nil + return []byte(""), nil } -// 事件处理函数 -func OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) { +// 事件处理 +func OnEventReceived(ctx context.Context, df *payload.DataFrame) (frameResp *payload.DataFrameResponse, err error) { eventHeader := event.NewEventHeaderFromDataFrame(df) logger.GetLogger().Infof("received event, eventId=[%s] eventBornTime=[%d] eventCorpId=[%s] eventType=[%s] eventUnifiedAppId=[%s] data=[%s]", @@ -83,21 +39,13 @@ func OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataF eventHeader.EventUnifiedAppId, df.Data) - resultStr, _ := json.Marshal(event.NewEventProcessResultSuccess()) + frameResp = payload.NewSuccessDataFrameResponse() + frameResp.SetJson(event.NewEventProcessResultSuccess()) - frameResp := &payload.DataFrameResponse{ - Code: payload.DataFrameResponseStatusCodeKOK, - Headers: payload.DataFrameHeader{ - payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson, - payload.DataFrameHeaderKMessageId: df.GetMessageId(), - }, - Message: "ok", - Data: string(resultStr), - } - - return frameResp, nil + return } +// go run example/*.go --client_id your-client-id --client_secret your-client-secret func main() { var clientId, clientSecret string flag.StringVar(&clientId, "client_id", "", "your-client-id") @@ -107,15 +55,12 @@ func main() { logger.SetLogger(logger.NewStdTestLogger()) - cli := client.NewStreamClient( - client.WithAppCredential(client.NewAppCredentialConfig(clientId, clientSecret)), - client.WithUserAgent(client.NewDingtalkGoSDKUserAgent()), - ) + cli := client.NewStreamClient(client.WithAppCredential(client.NewAppCredentialConfig(clientId, clientSecret))) //注册事件类型的处理函数 - cli.RegisterEventRouter("*", OnEventReceived) + cli.RegisterAllEventRouter(OnEventReceived) //注册callback类型的处理函数 - cli.RegisterCallbackRouter(payload.BotMessageCallbackTopic, OnChatBotMessageReceived) + cli.RegisterChatBotCallbackRouter(OnChatBotMessageReceived) err := cli.Start(context.Background()) if err != nil { diff --git a/payload/data_frame.go b/payload/data_frame.go index 0230caf..e0ff040 100644 --- a/payload/data_frame.go +++ b/payload/data_frame.go @@ -92,6 +92,57 @@ type DataFrameResponse struct { Data string `json:"data"` } +func NewDataFrameResponse(code int) *DataFrameResponse { + return &DataFrameResponse{ + Code: code, + Headers: DataFrameHeader{}, + Message: "", + Data: "", + } +} + +func NewSuccessDataFrameResponse() *DataFrameResponse { + return NewDataFrameResponse(DataFrameResponseStatusCodeKOK) +} + +func (r *DataFrameResponse) SetHeader(key, value string) { + if r == nil { + return + } + + r.Headers.Set(key, value) +} + +func (r *DataFrameResponse) GetHeader(key string) string { + if r == nil { + return "" + } + + return r.Headers.Get(key) +} + +func (r *DataFrameResponse) SetData(data string) { + if r == nil { + return + } + + r.Data = data +} + +func (r *DataFrameResponse) SetJson(dataModel interface{}) error { + if r == nil { + return nil + } + + data, err := json.Marshal(dataModel) + if err != nil { + return err + } + + r.Data = string(data) + return nil +} + func (df *DataFrameResponse) Encode() []byte { if df == nil { return nil @@ -120,22 +171,19 @@ func NewDataFrameAckPong(messageId string) *DataFrameResponse { DataFrameHeaderKMessageId: messageId, }, Message: "ok", - Data: "", //TODO data内容暂留空 + Data: "", } } -func NewErrorDataFrameResponse(messageId string, err error) *DataFrameResponse { +func NewErrorDataFrameResponse(err error) *DataFrameResponse { if err == nil { return nil } return &DataFrameResponse{ - Code: 400, //TODO errorcode 细化 - Headers: DataFrameHeader{ - DataFrameHeaderKContentType: DataFrameContentTypeKJson, - DataFrameHeaderKMessageId: messageId, - }, + Code: DataFrameResponseStatusCodeKInternalError, + Headers: DataFrameHeader{}, Message: err.Error(), - Data: "", //TODO data内容暂留空 + Data: "", } } diff --git a/payload/data_frame_test.go b/payload/data_frame_test.go index 1bb4208..7066200 100644 --- a/payload/data_frame_test.go +++ b/payload/data_frame_test.go @@ -71,7 +71,7 @@ func TestNewDataFrameAckPong(t *testing.T) { } func TestNewErrorDataFrameResponse(t *testing.T) { - errResp := NewErrorDataFrameResponse("messageId", errors.New("error")) + errResp := NewErrorDataFrameResponse(errors.New("error")) assert.NotNil(t, errResp) assert.Equal(t, "error", errResp.Message) } diff --git a/payload/utils.go b/payload/utils.go index 11a43e4..8a48fc2 100644 --- a/payload/utils.go +++ b/payload/utils.go @@ -19,8 +19,9 @@ const ( DataFrameContentTypeKJson = "application/json" DataFrameContentTypeKBase64 = "base64String" - DataFrameResponseStatusCodeKOK = 200 - DataFrameResponseStatusCodeKInternalError = 500 + DataFrameResponseStatusCodeKOK = 200 + DataFrameResponseStatusCodeKInternalError = 500 + DataFrameResponseStatusCodeKHandlerNotFound = 404 BotMessageCallbackTopic = "/v1.0/im/bot/messages/get" //机器人消息统一回调topic )