feat #1 增加chatbot相关的处理函数

This commit is contained in:
jiangjian 2023-05-25 17:32:05 +08:00
parent 26dea572e8
commit f47c5f2617
7 changed files with 190 additions and 109 deletions

View File

@ -11,41 +11,35 @@ import (
* @Date 2023/3/22 18:30
*/
type IMessageHandler func(c context.Context, data *BotCallbackDataModel) error
type IChatBotMessageHandler func(c context.Context, data *BotCallbackDataModel) ([]byte, error)
type DefaultChatBotFrameHandler struct {
defaultHandler IMessageHandler
defaultHandler IChatBotMessageHandler
}
func NewDefaultChatBotFrameHandler(defaultHandler IMessageHandler) *DefaultChatBotFrameHandler {
func NewDefaultChatBotFrameHandler(defaultHandler IChatBotMessageHandler) *DefaultChatBotFrameHandler {
return &DefaultChatBotFrameHandler{
defaultHandler: defaultHandler,
}
}
func (h *DefaultChatBotFrameHandler) OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
frameResp := &payload.DataFrameResponse{
Code: 200,
Headers: payload.DataFrameHeader{
payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson,
payload.DataFrameHeaderKMessageId: df.GetMessageId(),
},
Message: "ok",
Data: "",
}
msgData := &BotCallbackDataModel{}
err := json.Unmarshal([]byte(df.Data), msgData)
if err != nil {
return nil, err
}
if h.defaultHandler != nil {
err = h.defaultHandler(ctx, msgData)
if h.defaultHandler == nil {
return payload.NewDataFrameResponse(payload.DataFrameResponseStatusCodeKHandlerNotFound), nil
}
data, err := h.defaultHandler(ctx, msgData)
if err != nil {
return nil, err
}
}
frameResp := payload.NewSuccessDataFrameResponse()
frameResp.SetData(string(data))
return frameResp, nil
}

View File

@ -0,0 +1,73 @@
package chatbot
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
type ChatbotReplier struct {
}
func NewChatbotReplier() *ChatbotReplier {
return &ChatbotReplier{}
}
func (r *ChatbotReplier) SimpleReplyText(ctx context.Context, sessionWebhook string, content []byte) error {
requestBody := map[string]interface{}{
"msgtype": "text",
"text": map[string]interface{}{
"content": string(content),
},
}
return r.ReplyMessage(ctx, sessionWebhook, requestBody)
}
func (r *ChatbotReplier) SimpleReplyMarkdown(ctx context.Context, sessionWebhook string, title, content []byte) error {
requestBody := map[string]interface{}{
"msgtype": "markdown",
"markdown": map[string]interface{}{
"title": string(title),
"text": string(content),
},
}
return r.ReplyMessage(ctx, sessionWebhook, requestBody)
}
func (r *ChatbotReplier) ReplyMessage(ctx context.Context, sessionWebhook string, requestBody map[string]interface{}) error {
requestJsonBody, _ := json.Marshal(requestBody)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, sessionWebhook, bytes.NewReader(requestJsonBody))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "*/*")
httpClient := &http.Client{
Transport: http.DefaultTransport,
Timeout: 5 * time.Second,
}
resp, err := httpClient.Do(req)
if err != nil {
return err
}
if resp.StatusCode != 200 {
defer resp.Body.Close()
responseJsonBody, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return fmt.Errorf(string(responseJsonBody))
}
return nil
}

View File

@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/handler"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
@ -146,20 +147,29 @@ func (cli *StreamClient) processDataFrame(rawData []byte) {
return
}
var dataAck *payload.DataFrameResponse
frameHandler, err := cli.GetHandler(dataFrame.Type, dataFrame.GetTopic())
if err != nil {
logger.GetLogger().Errorf("connection processDataFrame unregistered handler: type=[%s] topic=[%s]", dataFrame.Type, dataFrame.GetTopic())
return
if err != nil || frameHandler == nil {
// 没有注册handler返回404
dataAck = payload.NewDataFrameResponse(payload.DataFrameResponseStatusCodeKHandlerNotFound)
} else {
dataAck, err = frameHandler(context.Background(), dataFrame)
if err != nil && dataAck == nil {
dataAck = payload.NewErrorDataFrameResponse(err)
}
dataAck, err := frameHandler(context.Background(), dataFrame)
if dataAck == nil && err != nil {
dataAck = payload.NewErrorDataFrameResponse(dataFrame.GetMessageId(), err)
}
if dataAck == nil {
return
dataAck = payload.NewSuccessDataFrameResponse()
}
if dataAck.GetHeader(payload.DataFrameHeaderKMessageId) == "" {
dataAck.SetHeader(payload.DataFrameHeaderKMessageId, dataFrame.GetMessageId())
}
if dataAck.GetHeader(payload.DataFrameHeaderKContentType) == "" {
dataAck.SetHeader(payload.DataFrameHeaderKContentType, payload.DataFrameContentTypeKJson)
}
errSend := cli.SendDataFrameResponse(context.Background(), dataAck)
@ -362,7 +372,17 @@ func (cli *StreamClient) RegisterCallbackRouter(topic string, frameHandler handl
cli.RegisterRouter(utils.SubscriptionTypeKCallback, topic, frameHandler)
}
// 聊天机器人的注册函数
func (cli *StreamClient) RegisterChatBotCallbackRouter(messageHandler chatbot.IChatBotMessageHandler) {
cli.RegisterRouter(utils.SubscriptionTypeKCallback, payload.BotMessageCallbackTopic, chatbot.NewDefaultChatBotFrameHandler(messageHandler).OnEventReceived)
}
// 事件类型的注册函数
func (cli *StreamClient) RegisterEventRouter(topic string, frameHandler handler.IFrameHandler) {
cli.RegisterRouter(utils.SubscriptionTypeKEvent, topic, frameHandler)
}
// 事件类型的注册函数
func (cli *StreamClient) RegisterAllEventRouter(frameHandler handler.IFrameHandler) {
cli.RegisterRouter(utils.SubscriptionTypeKEvent, "*", frameHandler)
}

View File

@ -1,9 +1,7 @@
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot"
@ -11,8 +9,6 @@ import (
"github.com/open-dingtalk/dingtalk-stream-sdk-go/event"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"net/http"
"time"
)
/**
@ -20,59 +16,19 @@ import (
* @Date 2023/3/22 18:30
*/
func OnChatBotMessageReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
frameResp := &payload.DataFrameResponse{
Code: 200,
Headers: payload.DataFrameHeader{
payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson,
payload.DataFrameHeaderKMessageId: df.GetMessageId(),
},
Message: "ok",
Data: "",
// 简单的应答机器人实现
func OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) {
replyMsg := []byte(fmt.Sprintf("msg received: [%s]", data.Text.Content))
chatbotReplier := chatbot.NewChatbotReplier()
chatbotReplier.SimpleReplyText(ctx, data.SessionWebhook, replyMsg)
chatbotReplier.SimpleReplyMarkdown(ctx, data.SessionWebhook, []byte("Markdown消息"), replyMsg)
return []byte(""), nil
}
// 反序列化机器人回调消息
msgData := &chatbot.BotCallbackDataModel{}
err := json.Unmarshal([]byte(df.Data), msgData)
if err != nil {
// TODO 处理错误:回调消息反序列化出错
return frameResp, nil
}
//处理方式回复文本消息通过SessionWebhook来回复消息
requestBody := map[string]interface{}{
"msgtype": "text",
"text": map[string]interface{}{
"content": fmt.Sprintf("msg received: [%s]", msgData.Text.Content),
},
}
requestJsonBody, _ := json.Marshal(requestBody)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, msgData.SessionWebhook, bytes.NewReader(requestJsonBody))
if err != nil {
// TODO 处理错误
return frameResp, nil
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "*/*")
httpClient := &http.Client{
Transport: http.DefaultTransport,
Timeout: 5 * time.Second, //设置超时包含connection时间、任意重定向时间、读取response body时间
}
_, err = httpClient.Do(req)
if err != nil {
// TODO 处理错误: 回复消息出错
return frameResp, nil
}
return frameResp, nil
}
// 事件处理函数
func OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
// 事件处理
func OnEventReceived(ctx context.Context, df *payload.DataFrame) (frameResp *payload.DataFrameResponse, err error) {
eventHeader := event.NewEventHeaderFromDataFrame(df)
logger.GetLogger().Infof("received event, eventId=[%s] eventBornTime=[%d] eventCorpId=[%s] eventType=[%s] eventUnifiedAppId=[%s] data=[%s]",
@ -83,21 +39,13 @@ func OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataF
eventHeader.EventUnifiedAppId,
df.Data)
resultStr, _ := json.Marshal(event.NewEventProcessResultSuccess())
frameResp = payload.NewSuccessDataFrameResponse()
frameResp.SetJson(event.NewEventProcessResultSuccess())
frameResp := &payload.DataFrameResponse{
Code: payload.DataFrameResponseStatusCodeKOK,
Headers: payload.DataFrameHeader{
payload.DataFrameHeaderKContentType: payload.DataFrameContentTypeKJson,
payload.DataFrameHeaderKMessageId: df.GetMessageId(),
},
Message: "ok",
Data: string(resultStr),
}
return frameResp, nil
return
}
// go run example/*.go --client_id your-client-id --client_secret your-client-secret
func main() {
var clientId, clientSecret string
flag.StringVar(&clientId, "client_id", "", "your-client-id")
@ -107,15 +55,12 @@ func main() {
logger.SetLogger(logger.NewStdTestLogger())
cli := client.NewStreamClient(
client.WithAppCredential(client.NewAppCredentialConfig(clientId, clientSecret)),
client.WithUserAgent(client.NewDingtalkGoSDKUserAgent()),
)
cli := client.NewStreamClient(client.WithAppCredential(client.NewAppCredentialConfig(clientId, clientSecret)))
//注册事件类型的处理函数
cli.RegisterEventRouter("*", OnEventReceived)
cli.RegisterAllEventRouter(OnEventReceived)
//注册callback类型的处理函数
cli.RegisterCallbackRouter(payload.BotMessageCallbackTopic, OnChatBotMessageReceived)
cli.RegisterChatBotCallbackRouter(OnChatBotMessageReceived)
err := cli.Start(context.Background())
if err != nil {

View File

@ -92,6 +92,57 @@ type DataFrameResponse struct {
Data string `json:"data"`
}
func NewDataFrameResponse(code int) *DataFrameResponse {
return &DataFrameResponse{
Code: code,
Headers: DataFrameHeader{},
Message: "",
Data: "",
}
}
func NewSuccessDataFrameResponse() *DataFrameResponse {
return NewDataFrameResponse(DataFrameResponseStatusCodeKOK)
}
func (r *DataFrameResponse) SetHeader(key, value string) {
if r == nil {
return
}
r.Headers.Set(key, value)
}
func (r *DataFrameResponse) GetHeader(key string) string {
if r == nil {
return ""
}
return r.Headers.Get(key)
}
func (r *DataFrameResponse) SetData(data string) {
if r == nil {
return
}
r.Data = data
}
func (r *DataFrameResponse) SetJson(dataModel interface{}) error {
if r == nil {
return nil
}
data, err := json.Marshal(dataModel)
if err != nil {
return err
}
r.Data = string(data)
return nil
}
func (df *DataFrameResponse) Encode() []byte {
if df == nil {
return nil
@ -120,22 +171,19 @@ func NewDataFrameAckPong(messageId string) *DataFrameResponse {
DataFrameHeaderKMessageId: messageId,
},
Message: "ok",
Data: "", //TODO data内容暂留空
Data: "",
}
}
func NewErrorDataFrameResponse(messageId string, err error) *DataFrameResponse {
func NewErrorDataFrameResponse(err error) *DataFrameResponse {
if err == nil {
return nil
}
return &DataFrameResponse{
Code: 400, //TODO errorcode 细化
Headers: DataFrameHeader{
DataFrameHeaderKContentType: DataFrameContentTypeKJson,
DataFrameHeaderKMessageId: messageId,
},
Code: DataFrameResponseStatusCodeKInternalError,
Headers: DataFrameHeader{},
Message: err.Error(),
Data: "", //TODO data内容暂留空
Data: "",
}
}

View File

@ -71,7 +71,7 @@ func TestNewDataFrameAckPong(t *testing.T) {
}
func TestNewErrorDataFrameResponse(t *testing.T) {
errResp := NewErrorDataFrameResponse("messageId", errors.New("error"))
errResp := NewErrorDataFrameResponse(errors.New("error"))
assert.NotNil(t, errResp)
assert.Equal(t, "error", errResp.Message)
}

View File

@ -21,6 +21,7 @@ const (
DataFrameResponseStatusCodeKOK = 200
DataFrameResponseStatusCodeKInternalError = 500
DataFrameResponseStatusCodeKHandlerNotFound = 404
BotMessageCallbackTopic = "/v1.0/im/bot/messages/get" //机器人消息统一回调topic
)