增加stream插件消息处理逻辑和Demo

This commit is contained in:
xinning.wwm 2023-08-09 15:03:13 +08:00
parent 24b714f7ed
commit 42205fc514
6 changed files with 123 additions and 1 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/open-dingtalk/dingtalk-stream-sdk-go/handler"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/plugin"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/utils"
"io"
"net/http"
@ -380,6 +381,11 @@ func (cli *StreamClient) RegisterChatBotCallbackRouter(messageHandler chatbot.IC
cli.RegisterRouter(utils.SubscriptionTypeKCallback, payload.BotMessageCallbackTopic, chatbot.NewDefaultChatBotFrameHandler(messageHandler).OnEventReceived)
}
// AI插件的注册函数
func (cli *StreamClient) RegisterPluginCallbackRouter(messageHandler plugin.IDingTalkPluginHandler) {
cli.RegisterRouter(utils.SubscriptionTypeKCallback, payload.BotPluginCallbackTopic, plugin.NewDefaultDingTalkPluginFrameHandler(messageHandler).OnEventReceived)
}
// 事件类型的注册函数
func (cli *StreamClient) RegisterEventRouter(topic string, frameHandler handler.IFrameHandler) {
cli.RegisterRouter(utils.SubscriptionTypeKEvent, topic, frameHandler)

14
example/echo_service.go Normal file
View File

@ -0,0 +1,14 @@
package main
type EchoRequest struct {
Message string
}
type EchoResponse struct {
EchoMessage string
}
func Echo(echoRequest *EchoRequest) *EchoResponse {
return &EchoResponse{
EchoMessage: "You said: " + echoRequest.Message,
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/open-dingtalk/dingtalk-stream-sdk-go/event"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/logger"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/plugin"
)
/**
@ -27,6 +28,20 @@ func OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackData
return []byte(""), nil
}
// 简单的插件处理实现
func OnPluginRequestReceived(ctx context.Context, message *plugin.DingTalkPluginMessage) (interface{}, error) {
if message.AbilityKey == "echo" {
echoRequest := &EchoRequest{}
err := message.ParseData(echoRequest)
if err != nil {
return nil, err
}
echoResponse := Echo(echoRequest)
return echoResponse, nil
}
return nil, nil
}
// 事件处理
func OnEventReceived(ctx context.Context, df *payload.DataFrame) (frameResp *payload.DataFrameResponse, err error) {
eventHeader := event.NewEventHeaderFromDataFrame(df)
@ -62,6 +77,7 @@ func main() {
//注册callback类型的处理函数
cli.RegisterChatBotCallbackRouter(OnChatBotMessageReceived)
cli.RegisterPluginCallbackRouter(OnPluginRequestReceived)
err := cli.Start(context.Background())
if err != nil {
panic(err)

View File

@ -23,7 +23,8 @@ const (
DataFrameResponseStatusCodeKInternalError = 500
DataFrameResponseStatusCodeKHandlerNotFound = 404
BotMessageCallbackTopic = "/v1.0/im/bot/messages/get" //机器人消息统一回调topic
BotMessageCallbackTopic = "/v1.0/im/bot/messages/get" //机器人消息统一回调topic
BotPluginCallbackTopic = "/v1.0/agi/plugins/callback" //AI插件消息统一回调topic
)
func GenerateMessageId(prefix string) string {

40
plugin/model.go Normal file
View File

@ -0,0 +1,40 @@
package plugin
import (
"errors"
"fmt"
"reflect"
)
type DingTalkPluginMessage struct {
PluginId string `json:"pluginId"`
PluginVersion string `json:"pluginVersion"`
AbilityKey string `json:"abilityKey"`
Data interface{} `json:"data"`
RequestId string `json:"requestId"`
}
func (req *DingTalkPluginMessage) ParseData(model interface{}) error {
//TO DO 处理异常
defer func() {
recover()
}()
m, ok := req.Data.(map[string]interface{})
if !ok {
return errors.New(fmt.Sprintf("invalid data: %v", req.Data))
}
stValue := reflect.ValueOf(model).Elem()
sType := stValue.Type()
for i := 0; i < sType.NumField(); i++ {
field := sType.Field(i)
if value, ok := m[field.Name]; ok {
stValue.Field(i).Set(reflect.ValueOf(value))
}
}
return nil
}
type DingTalkPluginResponse struct {
Result interface{} `json:"result"`
RequestId string `json:"requestId"`
}

45
plugin/plugin_handler.go Normal file
View File

@ -0,0 +1,45 @@
package plugin
import (
"context"
"encoding/json"
"github.com/open-dingtalk/dingtalk-stream-sdk-go/payload"
)
type CallbackResponse struct {
Response interface{} `json:"response"`
}
type IDingTalkPluginHandler func(c context.Context, data *DingTalkPluginMessage) (interface{}, error)
type DefaultDingTalkPluginFrameHandler struct {
defaultHandler IDingTalkPluginHandler
}
func NewDefaultDingTalkPluginFrameHandler(defaultHandler IDingTalkPluginHandler) *DefaultDingTalkPluginFrameHandler {
return &DefaultDingTalkPluginFrameHandler{
defaultHandler: defaultHandler,
}
}
func (h *DefaultDingTalkPluginFrameHandler) OnEventReceived(ctx context.Context, df *payload.DataFrame) (*payload.DataFrameResponse, error) {
msgData := &DingTalkPluginMessage{}
err := json.Unmarshal([]byte(df.Data), msgData)
if err != nil {
return nil, err
}
if h.defaultHandler == nil {
return payload.NewDataFrameResponse(payload.DataFrameResponseStatusCodeKHandlerNotFound), nil
}
result, err := h.defaultHandler(ctx, msgData)
if err != nil {
return nil, err
}
dingTalkPluginResponse := &DingTalkPluginResponse{RequestId: msgData.RequestId, Result: result}
callbackResponse := &CallbackResponse{Response: dingTalkPluginResponse}
frameResp := payload.NewSuccessDataFrameResponse()
frameResp.SetJson(callbackResponse)
return frameResp, nil
}