diff --git a/go.mod b/go.mod index 521dbdb..76bed20 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module ai_scheduler go 1.24.7 require ( + gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go v0.9.3 gitea.cdlsxd.cn/self-tools/l_request v1.0.8 github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.12 github.com/alibabacloud-go/dingtalk v1.6.96 @@ -11,6 +12,7 @@ require ( github.com/cloudwego/eino v0.7.7 github.com/cloudwego/eino-ext/components/model/ollama v0.1.6 github.com/cloudwego/eino-ext/components/model/openai v0.1.5 + github.com/coze-dev/coze-go v0.0.0-20251029161603-312b7fd62d20 github.com/emirpasic/gods v1.18.1 github.com/faabiosr/cachego v0.26.0 github.com/fastwego/dingding v1.0.0-beta.4 @@ -24,10 +26,10 @@ require ( github.com/google/uuid v1.6.0 github.com/google/wire v0.7.0 github.com/ollama/ollama v0.12.7 - github.com/open-dingtalk/dingtalk-stream-sdk-go v0.9.1 github.com/redis/go-redis/v9 v9.16.0 github.com/spf13/viper v1.17.0 github.com/tmc/langchaingo v0.1.13 + golang.org/x/sync v0.15.0 google.golang.org/grpc v1.64.0 gorm.io/driver/mysql v1.6.0 gorm.io/gorm v1.31.0 @@ -52,7 +54,6 @@ require ( github.com/clbanning/mxj/v2 v2.5.5 // indirect github.com/cloudwego/base64x v0.1.6 // indirect github.com/cloudwego/eino-ext/libs/acl/openai v0.1.2 // indirect - github.com/coze-dev/coze-go v0.0.0-20251029161603-312b7fd62d20 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dlclark/regexp2 v1.11.4 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/go.sum b/go.sum index 6a18537..a6e9a9c 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,8 @@ cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3f dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go v0.9.3 h1:qaSPxVz5kHCs2AWvShnOG8mUgrUP9Gc3uUB4ZX1BF5A= +gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go v0.9.3/go.mod h1:5mCPTjBxOk69LRJPHWJRNTkfxcffqlQSOBMD4M5JVnE= gitea.cdlsxd.cn/self-tools/l_request v1.0.8 h1:FaKRql9mCVcSoaGqPeBOAruZ52slzRngQ6VRTYKNSsA= gitea.cdlsxd.cn/self-tools/l_request v1.0.8/go.mod h1:Qf4hVXm2Eu5vOvwXk8D7U0q/aekMCkZ4Fg9wnRKlasQ= gitea.com/xorm/sqlfiddle v0.0.0-20180821085327-62ce714f951a h1:lSA0F4e9A2NcQSqGqTOXqu2aRi/XEQxDCBwM8yJtE6s= @@ -275,8 +277,6 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v1.17.2 h1:fQnZVsXk8uxXIStYb0N4bGk7jeyTalG/wsZjQ25dO0g= github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfreZ6J5gM2i+k= -github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= -github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -354,8 +354,6 @@ github.com/onsi/ginkgo v1.13.0/go.mod h1:+REjRxOmWfHCjfv9TTWB1jD1Frx4XydAD3zm1ls github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/open-dingtalk/dingtalk-stream-sdk-go v0.9.1 h1:Lb/Uzkiw2Ugt2Xf03J5wmv81PdkYOiWbI8CNBi1boC8= -github.com/open-dingtalk/dingtalk-stream-sdk-go v0.9.1/go.mod h1:ln3IqPYYocZbYvl9TAOrG/cxGR9xcn4pnZRLdCTEGEU= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/internal/biz/ding_talk_bot.go b/internal/biz/ding_talk_bot.go index d799796..b8976df 100644 --- a/internal/biz/ding_talk_bot.go +++ b/internal/biz/ding_talk_bot.go @@ -8,9 +8,15 @@ import ( "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" "ai_scheduler/internal/entitys" + "ai_scheduler/internal/pkg/l_request" "ai_scheduler/internal/tools" + "ai_scheduler/tmpl/dataTemp" + "io" + "net/http" "strconv" + "time" + "ai_scheduler/internal/config" "context" "database/sql" "encoding/json" @@ -18,9 +24,9 @@ import ( "fmt" "strings" + "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" + "github.com/coze-dev/coze-go" "github.com/gofiber/fiber/v2/log" - "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" - "xorm.io/builder" ) @@ -36,6 +42,8 @@ type DingTalkBotBiz struct { botGroupImpl *impl.BotGroupImpl toolManager *tools.Manager chatHis *impl.BotChatHisImpl + conf *config.Config + cardSend *dingtalk.SendCardClient } // NewDingTalkBotBiz @@ -48,6 +56,8 @@ func NewDingTalkBotBiz( tools *tools_regis.ToolRegis, chatHis *impl.BotChatHisImpl, toolManager *tools.Manager, + conf *config.Config, + cardSend *dingtalk.SendCardClient, ) *DingTalkBotBiz { return &DingTalkBotBiz{ do: do, @@ -59,6 +69,8 @@ func NewDingTalkBotBiz( botGroupImpl: botGroupImpl, toolManager: toolManager, chatHis: chatHis, + conf: conf, + cardSend: cardSend, } } @@ -75,7 +87,7 @@ func (d *DingTalkBotBiz) GetDingTalkBotCfgList() (dingBotList []entitys.DingTalk if err != nil { d.log.Info("初始化“%s”失败:%s", v.BotName, err.Error()) } - config.BotIndex = v.BotIndex + config.BotIndex = v.RobotCode dingBotList = append(dingBotList, config) } return @@ -92,8 +104,8 @@ func (d *DingTalkBotBiz) InitRequire(ctx context.Context, data *chatbot.BotCallb } func (d *DingTalkBotBiz) Do(ctx context.Context, requireData *entitys.RequireDataDingTalkBot) (err error) { - entitys.ResLoading(requireData.Ch, "", "收到消息,正在处理中,请稍等") - defer close(requireData.Ch) + //entitys.ResLoading(requireData.Ch, "", "收到消息,正在处理中,请稍等") + //defer close(requireData.Ch) switch constants.ConversationType(requireData.Req.ConversationType) { case constants.ConversationTypeSingle: err = d.handleSingleChat(ctx, requireData) @@ -124,7 +136,7 @@ func (d *DingTalkBotBiz) handleSingleChat(ctx context.Context, requireData *enti } func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entitys.RequireDataDingTalkBot) (err error) { - group, err := d.initGroup(ctx, requireData.Req.ConversationId, requireData.Req.ConversationTitle) + group, err := d.initGroup(ctx, requireData.Req.ConversationId, requireData.Req.ConversationTitle, requireData.Req.RobotCode) if err != nil { return @@ -142,8 +154,8 @@ func (d *DingTalkBotBiz) handleGroupChat(ctx context.Context, requireData *entit return d.handleMatch(ctx, rec) } -func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, conversationTitle string) (group *model.AiBotGroup, err error) { - group, err = d.botGroupImpl.GetByConversationId(conversationId) +func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, conversationTitle string, robotCode string) (group *model.AiBotGroup, err error) { + group, err = d.botGroupImpl.GetByConversationIdAndRobotCode(conversationId, robotCode) if err != nil { if !errors.Is(err, sql.ErrNoRows) { @@ -155,10 +167,11 @@ func (d *DingTalkBotBiz) initGroup(ctx context.Context, conversationId string, c group = &model.AiBotGroup{ ConversationID: conversationId, Title: conversationTitle, + RobotCode: robotCode, ToolList: "", } //如果不存在则创建 - d.botGroupImpl.Add(group) + _, err = d.botGroupImpl.Add(group) } return } @@ -199,13 +212,19 @@ func (d *DingTalkBotBiz) recognize(ctx context.Context, requireData *entitys.Req userContent, err := d.getUserContent(requireData.Req.Msgtype, requireData.Req.Text.Content) if err != nil { - return nil, err + return } rec = &entitys.Recognize{ Ch: requireData.Ch, SystemPrompt: d.defaultPrompt(), UserContent: userContent, } + //历史记录 + rec.ChatHis, err = d.getHis(ctx, constants.ConversationType(requireData.Req.ConversationType), requireData.ID) + if err != nil { + return + } + //工具注册 if len(tools) > 0 { rec.Tasks = make([]entitys.RegistrationTask, 0, len(tools)) for _, task := range tools { @@ -226,6 +245,36 @@ func (d *DingTalkBotBiz) recognize(ctx context.Context, requireData *entitys.Req return } +func (d *DingTalkBotBiz) getHis(ctx context.Context, conversationType constants.ConversationType, Id int32) (content entitys.ChatHis, err error) { + + var ( + his []model.AiBotChatHi + ) + cond := builder.NewCond() + cond = cond.And(builder.Eq{"his_type": conversationType}) + cond = cond.And(builder.Eq{"id": Id}) + _, err = d.chatHis.GetListToStruct(&cond, &dataTemp.ReqPageBo{Limit: d.conf.Sys.SessionLen}, &his, "his_id desc") + if err != nil { + return + } + messages := make([]entitys.HisMessage, 0, len(his)) + for _, v := range his { + messages = append(messages, entitys.HisMessage{ + Role: constants.Caller(v.Role), // 用户角色 + Content: v.Content, // 用户输入内容 + Timestamp: v.CreateAt.Format(time.DateTime), + }) + } + return entitys.ChatHis{ + SessionId: fmt.Sprintf("%s_%d", conversationType, Id), + Messages: messages, + Context: entitys.HisContext{ + UserLanguage: constants.LangZhCN, // 默认中文 + SystemMode: constants.SystemModeTechnicalSupport, // 默认技术支持模式 + }, + }, nil +} + func (d *DingTalkBotBiz) getUserContent(msgType string, msgContent interface{}) (content *entitys.RecognizeUserContent, err error) { switch constants.BotMsgType(msgType) { case constants.BotMsgTypeText: @@ -261,16 +310,116 @@ func (d *DingTalkBotBiz) handleMatch(ctx context.Context, rec *entitys.Recognize return d.otherTask(ctx, rec) } switch constants.TaskType(pointTask.Type) { - //case constants.TaskTypeApi: - //return d.handleApiTask(ctx, requireData, pointTask) case constants.TaskTypeFunc: return d.handleTask(ctx, rec, pointTask) + case constants.TaskTypeCozeWorkflow: + return d.handleCozeWorkflow(ctx, rec, pointTask) default: return d.otherTask(ctx, rec) } return } +func (d *DingTalkBotBiz) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { + entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)\n") + + customClient := &http.Client{ + Timeout: time.Minute * 30, + } + + authCli := coze.NewTokenAuth(d.conf.Coze.ApiSecret) + cozeCli := coze.NewCozeAPI( + authCli, + coze.WithBaseURL(d.conf.Coze.BaseURL), + coze.WithHttpClient(customClient), + ) + + // 从参数中获取workflowID + type requestParams struct { + Request l_request.Request `json:"request"` + } + var config requestParams + err = json.Unmarshal([]byte(task.Config), &config) + if err != nil { + return err + } + workflowId, ok := config.Request.Json["workflow_id"].(string) + if !ok { + return fmt.Errorf("workflow_id不能为空") + } + // 提取参数 + var data map[string]interface{} + err = json.Unmarshal([]byte(rec.Match.Parameters), &data) + + req := &coze.RunWorkflowsReq{ + WorkflowID: workflowId, + Parameters: data, + // IsAsync: true, + } + + stream := config.Request.Json["stream"].(bool) + + entitys.ResLog(rec.Ch, task.Index, "工作流执行中...") + + if stream { + streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req) + if err != nil { + return err + } + + handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index) + } else { + resp, err := cozeCli.Workflows.Runs.Create(ctx, req) + if err != nil { + return err + } + + entitys.ResJson(rec.Ch, task.Index, resp.Data) + } + + return +} + +// handleCozeWorkflowEvents 处理 coze 工作流事件 +func handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) { + defer resp.Close() + for { + event, err := resp.Recv() + if errors.Is(err, io.EOF) { + fmt.Println("Stream finished") + break + } + if err != nil { + fmt.Println("Error receiving event:", err) + break + } + + switch event.Event { + case coze.WorkflowEventTypeMessage: + entitys.ResStream(ch, index, event.Message.Content) + case coze.WorkflowEventTypeError: + entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %s", event.Error)) + case coze.WorkflowEventTypeDone: + entitys.ResEnd(ch, index, "工作流执行完成") + case coze.WorkflowEventTypeInterrupt: + resumeReq := &coze.ResumeRunWorkflowsReq{ + WorkflowID: workflowID, + EventID: event.Interrupt.InterruptData.EventID, + ResumeData: "your data", + InterruptType: event.Interrupt.InterruptData.Type, + } + newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq) + if err != nil { + entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error())) + return + } + entitys.ResLog(ch, index, "工作流恢复执行中...") + handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index) + } + } + fmt.Printf("done, log:%s\n", resp.Response().LogID()) +} + func (d *DingTalkBotBiz) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiBotTool) (err error) { var configData entitys.ConfigDataTool err = json.Unmarshal([]byte(task.Config), &configData) @@ -290,56 +439,42 @@ func (d *DingTalkBotBiz) otherTask(ctx context.Context, rec *entitys.Recognize) entitys.ResText(rec.Ch, "", rec.Match.Reasoning) return } -func (d *DingTalkBotBiz) HandleRes(ctx context.Context, data *chatbot.BotCallbackDataModel, resp entitys.Response) error { - switch resp.Type { - case entitys.ResponseText: - return d.replyText(ctx, data.SessionWebhook, resp.Content) - case entitys.ResponseStream: - return d.replySteam(ctx, data.SessionWebhook, resp.Content) - case entitys.ResponseImg: - return d.replyImg(ctx, data.SessionWebhook, resp.Content) - case entitys.ResponseFile: - return d.replyFile(ctx, data.SessionWebhook, resp.Content) - case entitys.ResponseMarkdown: - return d.replyMarkdown(ctx, data.SessionWebhook, resp.Content) - case entitys.ResponseActionCard: - return d.replyActionCard(ctx, data.SessionWebhook, resp.Content) - default: - return nil - } + +//func (d *DingTalkBotBiz) HandleRes(ctx context.Context, data *chatbot.BotCallbackDataModel, resp entitys.Response, ch chan string) error { +// switch resp.Type { +// case entitys.ResponseText: +// return d.replyText(ctx, data.SessionWebhook, resp.Content) +// case entitys.ResponseStream: +// +// return d.replySteam(ctx, data, ch) +// case entitys.ResponseImg: +// return d.replyImg(ctx, data.SessionWebhook, resp.Content) +// case entitys.ResponseFile: +// return d.replyFile(ctx, data.SessionWebhook, resp.Content) +// case entitys.ResponseMarkdown: +// return d.replyMarkdown(ctx, data.SessionWebhook, resp.Content) +// case entitys.ResponseActionCard: +// return d.replyActionCard(ctx, data.SessionWebhook, resp.Content) +// default: +// return nil +// } +//} + +func (d *DingTalkBotBiz) HandleStreamRes(ctx context.Context, data *chatbot.BotCallbackDataModel, content chan string) (err error) { + err = d.cardSend.NewCard(ctx, &dingtalk.CardSend{ + RobotCode: data.RobotCode, + ConversationType: constants.ConversationType(data.ConversationType), + Template: constants.CardTempDefault, + ContentChannel: content, // 指定内容通道 + ConversationId: data.ConversationId, + SenderStaffId: data.SenderStaffId, + Title: data.Text.Content, + }) + + return } -func (d *DingTalkBotBiz) SaveHis(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, chat []string) (err error) { - if len(chat) == 0 { - return - } - his := []*model.AiBotChatHi{ - { - HisType: requireData.Req.ConversationType, - ID: requireData.ID, - Role: "user", - Content: requireData.Req.Text.Content, - }, - { - HisType: requireData.Req.ConversationType, - ID: requireData.ID, - Role: "system", - Content: strings.Join(chat, "\n"), - }, - } - _, err = d.chatHis.Add(his) - return err -} - -func (d *DingTalkBotBiz) replySteam(ctx context.Context, SessionWebhook string, content string, arg ...string) error { - msg := content - if len(arg) > 0 { - msg = fmt.Sprintf(content, arg) - } - return d.replier.SimpleReplyText(ctx, SessionWebhook, []byte(msg)) -} - -func (d *DingTalkBotBiz) replyText(ctx context.Context, SessionWebhook string, content string, arg ...string) error { +func (d *DingTalkBotBiz) ReplyText(ctx context.Context, SessionWebhook string, content string, arg ...string) error { msg := content if len(arg) > 0 { msg = fmt.Sprintf(content, arg) @@ -379,6 +514,28 @@ func (d *DingTalkBotBiz) replyActionCard(ctx context.Context, SessionWebhook str return d.replier.SimpleReplyText(ctx, SessionWebhook, []byte(msg)) } +func (d *DingTalkBotBiz) SaveHis(ctx context.Context, requireData *entitys.RequireDataDingTalkBot, chat []string) (err error) { + if len(chat) == 0 { + return + } + his := []*model.AiBotChatHi{ + { + HisType: requireData.Req.ConversationType, + ID: requireData.ID, + Role: "user", + Content: requireData.Req.Text.Content, + }, + { + HisType: requireData.Req.ConversationType, + ID: requireData.ID, + Role: "system", + Content: strings.Join(chat, "\n"), + }, + } + _, err = d.chatHis.Add(his) + return err +} + func (d *DingTalkBotBiz) defaultPrompt() string { return `[system] 你是一个智能路由系统,核心职责是 **精准解析用户意图并路由至对应任务模块**。请严格遵循以下规则: diff --git a/internal/biz/handle/dingtalk/auth.go b/internal/biz/handle/dingtalk/auth.go index 2b359a9..143ab45 100644 --- a/internal/biz/handle/dingtalk/auth.go +++ b/internal/biz/handle/dingtalk/auth.go @@ -6,6 +6,7 @@ import ( "ai_scheduler/internal/data/impl" "ai_scheduler/internal/data/model" "ai_scheduler/internal/entitys" + "ai_scheduler/internal/pkg" "ai_scheduler/internal/pkg/l_request" "ai_scheduler/utils" "context" @@ -38,21 +39,26 @@ func (a *Auth) GetAccessToken(ctx context.Context, clientId string, clientSecret return nil, errors.New("clientId is empty") } accessToken := a.redis.Get(ctx, a.getKey(clientId)).Val() + var expire time.Duration if accessToken == "" { dingTalkAuthRes, _err := a.getNewAccessToken(ctx, clientId, clientSecret) if _err != nil { return nil, _err } - err = a.redis.SetEx(ctx, a.getKey(clientId), dingTalkAuthRes.AccessToken, time.Duration(dingTalkAuthRes.ExpireIn-3600)*time.Second).Err() + expire = time.Duration(dingTalkAuthRes.ExpireIn-3600) * time.Second + err = a.redis.SetEx(ctx, a.getKey(clientId), dingTalkAuthRes.AccessToken, expire).Err() if err != nil { return } accessToken = dingTalkAuthRes.AccessToken + } else { + expire, _ = a.redis.TTL(ctx, a.getKey(clientId)).Result() } return &AuthInfo{ ClientId: clientId, ClientSecret: clientSecret, AccessToken: accessToken, + Expire: expire, }, nil } @@ -60,6 +66,10 @@ func (a *Auth) getKey(clientId string) string { return a.cfg.Redis.Key + ":" + constants.DingTalkAuthBaseKeyPrefix + ":" + clientId } +func (a *Auth) getKeyBot(botCode string) string { + return a.cfg.Redis.Key + ":" + constants.DingTalkAuthBaseKeyBotPrefix + ":" + botCode +} + func (a *Auth) getNewAccessToken(ctx context.Context, clientId string, clientSecret string) (auth DingTalkAuthIRes, err error) { if clientId == "" || clientSecret == "" { err = errors.New("clientId or clientSecret is empty") @@ -89,30 +99,61 @@ func (a *Auth) GetTokenFromBotOption(ctx context.Context, botOption ...BotOption option(botInfo) } - if botInfo.id == 0 && botInfo.botConfig == nil { + if botInfo.Id == 0 && botInfo.BotConfig == nil && botInfo.BotCode == "" { err = errors.New("botInfo is nil") return } - if botInfo.botConfig == nil { - var botConfigDo model.AiBotConfig - cond := builder.NewCond() - cond = cond.And(builder.Eq{"bot_id": botInfo.id}) - err = a.botConfigImpl.GetOneBySearchToStrut(&cond, &botConfigDo) + + if botInfo.BotConfig == nil { + err = a.GetBotConfigFromModel(botInfo) if err != nil { return } - if botConfigDo.BotID == 0 { - err = errors.New("未找到机器人服务配置") + } + + authInfo := a.redis.Get(ctx, a.getKeyBot(botInfo.BotConfig.RobotCode)).Val() + if authInfo == "" { + var botConfig entitys.DingTalkBot + err = json.Unmarshal([]byte(botInfo.BotConfig.BotConfig), &botConfig) + if err != nil { + log.Infof("初始化“%s”失败:%s", botInfo.BotConfig.BotName, err.Error()) return } - botInfo.botConfig = &botConfigDo + token, err = a.GetAccessToken(ctx, botConfig.ClientId, botConfig.ClientSecret) + if err != nil { + return + } + err = a.redis.SetEx(ctx, a.getKeyBot(botInfo.BotConfig.RobotCode), pkg.JsonStringIgonErr(token), token.Expire).Err() + if err != nil { + return + } + } else { + var tokenData AuthInfo + err = json.Unmarshal([]byte(authInfo), &tokenData) + token = &tokenData } - var botConfig entitys.DingTalkBot - err = json.Unmarshal([]byte(botInfo.botConfig.BotConfig), &botConfig) + return +} + +func (a *Auth) GetBotConfigFromModel(botInfo *Bot) (err error) { + var ( + botConfigDo model.AiBotConfig + ) + cond := builder.NewCond() + if botInfo.Id > 0 { + cond = cond.And(builder.Eq{"bot_id": botInfo.Id}) + } + if botInfo.BotCode != "" { + cond = cond.And(builder.Eq{"robot_code": botInfo.BotCode}) + } + err = a.botConfigImpl.GetOneBySearchToStrut(&cond, &botConfigDo) if err != nil { - log.Infof("初始化“%s”失败:%s", botInfo.botConfig.BotName, err.Error()) return } - return a.GetAccessToken(ctx, botConfig.ClientId, botConfig.ClientSecret) - + if botConfigDo.BotID == 0 { + err = errors.New("未找到机器人服务配置") + return + } + botInfo.BotConfig = &botConfigDo + return nil } diff --git a/internal/biz/handle/dingtalk/option.go b/internal/biz/handle/dingtalk/option.go index fb473c7..3b72795 100644 --- a/internal/biz/handle/dingtalk/option.go +++ b/internal/biz/handle/dingtalk/option.go @@ -3,19 +3,34 @@ package dingtalk import "ai_scheduler/internal/data/model" type Bot struct { - id int - botConfig *model.AiBotConfig + Id int + BotCode string + BotConfig *model.AiBotConfig } type BotOption func(*Bot) func WithId(id int) BotOption { return func(b *Bot) { - b.id = id + b.Id = id } } -func WithBootConfig(BotConfig *model.AiBotConfig) BotOption { +func WithBotConfig(BotConfig *model.AiBotConfig) BotOption { return func(bot *Bot) { - bot.botConfig = BotConfig + bot.BotConfig = BotConfig + } +} + +func WithBotCode(BotCode string) BotOption { + return func(bot *Bot) { + bot.BotCode = BotCode + } +} + +func WithBot(botSelf *Bot) BotOption { + return func(bot *Bot) { + bot.BotCode = botSelf.BotCode + bot.Id = botSelf.Id + bot.BotConfig = botSelf.BotConfig } } diff --git a/internal/biz/handle/dingtalk/provider_set.go b/internal/biz/handle/dingtalk/provider_set.go index 579d464..70f31ff 100644 --- a/internal/biz/handle/dingtalk/provider_set.go +++ b/internal/biz/handle/dingtalk/provider_set.go @@ -8,4 +8,5 @@ var ProviderSetDingTalk = wire.NewSet( NewUser, NewAuth, NewDept, + NewSendCardClient, ) diff --git a/internal/biz/handle/dingtalk/send_card.go b/internal/biz/handle/dingtalk/send_card.go new file mode 100644 index 0000000..c2063da --- /dev/null +++ b/internal/biz/handle/dingtalk/send_card.go @@ -0,0 +1,287 @@ +package dingtalk + +import ( + "ai_scheduler/internal/data/constants" + "ai_scheduler/internal/pkg" + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client" + dingtalkim_1_0 "github.com/alibabacloud-go/dingtalk/im_1_0" + util "github.com/alibabacloud-go/tea-utils/v2/service" + "github.com/alibabacloud-go/tea/tea" + "github.com/gofiber/fiber/v2/log" + "github.com/google/uuid" +) + +const DefaultInterval = 100 * time.Millisecond +const HeardBeatX = 50 + +type SendCardClient struct { + Auth *Auth + CardClient *sync.Map + mu sync.RWMutex // 保护 CardClient 的并发访问 + logger log.AllLogger // 日志记录 + botOption *Bot +} + +func NewSendCardClient(auth *Auth, logger log.AllLogger) *SendCardClient { + return &SendCardClient{ + Auth: auth, + CardClient: &sync.Map{}, + logger: logger, + botOption: &Bot{}, + } +} + +// initClient 初始化或复用 DingTalk 客户端 +func (s *SendCardClient) initClient(robotCode string) (*dingtalkim_1_0.Client, error) { + if client, ok := s.CardClient.Load(robotCode); ok { + return client.(*dingtalkim_1_0.Client), nil + } + s.botOption.BotCode = robotCode + config := &openapi.Config{ + Protocol: tea.String("https"), + RegionId: tea.String("central"), + } + client, err := dingtalkim_1_0.NewClient(config) + if err != nil { + s.logger.Error("failed to init DingTalk client") + return nil, fmt.Errorf("init client failed: %w", err) + } + + s.CardClient.Store(robotCode, client) + return client, nil +} + +func (s *SendCardClient) NewCard(ctx context.Context, cardSend *CardSend) error { + // 参数校验 + if (len(cardSend.ContentSlice) == 0 || cardSend.ContentSlice == nil) && cardSend.ContentChannel == nil { + return errors.New("卡片内容不能为空") + } + if cardSend.UpdateInterval == 0 { + cardSend.UpdateInterval = DefaultInterval // 默认更新间隔 + } + if cardSend.Title == "" { + cardSend.Title = "钉钉卡片" + } + //替换标题 + replace, err := pkg.SafeReplace(string(cardSend.Template), "${title}", cardSend.Title) + if err != nil { + return err + } + cardSend.Template = constants.CardTemp(replace) + // 初始化客户端 + client, err := s.initClient(cardSend.RobotCode) + if err != nil { + return fmt.Errorf("初始化client失败: %w", err) + } + + // 生成卡片实例ID + cardInstanceId, err := uuid.NewUUID() + if err != nil { + return fmt.Errorf("创建uuid失败: %w", err) + } + + // 构建初始请求 + request, err := s.buildBaseRequest(cardSend, cardInstanceId.String()) + if err != nil { + return fmt.Errorf("请求失败: %w", err) + } + + // 发送初始卡片 + if _, err := s.SendInteractiveCard(ctx, request, cardSend.RobotCode, client); err != nil { + return fmt.Errorf("发送初始卡片失败: %w", err) + } + + // 处理切片内容(同步) + if len(cardSend.ContentSlice) > 0 { + if err := s.processContentSlice(ctx, cardSend, cardInstanceId.String(), client); err != nil { + return fmt.Errorf("内容同步失败: %w", err) + } + } + + // 处理通道内容(异步) + if cardSend.ContentChannel != nil { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + s.processContentChannel(ctx, cardSend, cardInstanceId.String(), client) + }() + wg.Wait() + } + + return nil +} + +// buildBaseRequest 构建基础请求 +func (s *SendCardClient) buildBaseRequest(cardSend *CardSend, cardInstanceId string) (*dingtalkim_1_0.SendRobotInteractiveCardRequest, error) { + cardData := fmt.Sprintf(string(cardSend.Template), "") // 初始空内容 + request := &dingtalkim_1_0.SendRobotInteractiveCardRequest{ + CardTemplateId: tea.String("StandardCard"), + CardBizId: tea.String(cardInstanceId), + CardData: tea.String(cardData), + RobotCode: tea.String(cardSend.RobotCode), + SendOptions: &dingtalkim_1_0.SendRobotInteractiveCardRequestSendOptions{}, + PullStrategy: tea.Bool(false), + } + + switch cardSend.ConversationType { + case constants.ConversationTypeGroup: + request.SetOpenConversationId(cardSend.ConversationId) + case constants.ConversationTypeSingle: + receiver, err := json.Marshal(map[string]string{"userId": cardSend.SenderStaffId}) + if err != nil { + return nil, fmt.Errorf("数据整理失败: %w", err) + } + request.SetSingleChatReceiver(string(receiver)) + default: + return nil, errors.New("未知的聊天场景") + } + + return request, nil +} + +// processContentChannel 处理通道内容(异步更新) +func (s *SendCardClient) processContentChannel(ctx context.Context, cardSend *CardSend, cardInstanceId string, client *dingtalkim_1_0.Client) { + defer func() { + if r := recover(); r != nil { + s.logger.Error("panic in processContentChannel") + } + }() + + ticker := time.NewTicker(cardSend.UpdateInterval) + defer ticker.Stop() + heartbeatTicker := time.NewTicker(time.Duration(HeardBeatX) * DefaultInterval) + defer heartbeatTicker.Stop() + + var ( + contentBuilder strings.Builder + lastUpdate time.Time + ) + for { + + select { + case content, ok := <-cardSend.ContentChannel: + if !ok { + // 通道关闭,发送最终内容 + if contentBuilder.Len() > 0 { + if err := s.updateCardContent(ctx, cardSend, cardInstanceId, contentBuilder.String(), client); err != nil { + s.logger.Errorf("更新卡片失败1:%s", err.Error()) + } + } + return + } + contentBuilder.WriteString(content) + if contentBuilder.Len() > 0 { + if err := s.updateCardContent(ctx, cardSend, cardInstanceId, contentBuilder.String(), client); err != nil { + s.logger.Errorf("更新卡片失败2:%s", err.Error()) + } + } + lastUpdate = time.Now() + + case <-heartbeatTicker.C: + if time.Now().Unix()-lastUpdate.Unix() >= HeardBeatX { + return + } + + case <-ctx.Done(): + s.logger.Info("context canceled, stop channel processing") + return + } + } + +} + +// processContentSlice 处理切片内容(同步更新) +func (s *SendCardClient) processContentSlice(ctx context.Context, cardSend *CardSend, cardInstanceId string, client *dingtalkim_1_0.Client) error { + var contentBuilder strings.Builder + for _, content := range cardSend.ContentSlice { + + contentBuilder.WriteString(content) + err := s.updateCardRequest(ctx, &UpdateCardRequest{ + Template: string(cardSend.Template), + Content: contentBuilder.String(), + Client: client, + RobotCode: cardSend.RobotCode, + CardInstanceId: cardInstanceId, + }) + if err != nil { + return fmt.Errorf("更新卡片失败: %w", err) + } + time.Sleep(cardSend.UpdateInterval) // 控制更新频率 + } + return nil +} + +// updateCardContent 封装卡片更新逻辑 +func (s *SendCardClient) updateCardContent(ctx context.Context, cardSend *CardSend, cardInstanceId, content string, client *dingtalkim_1_0.Client) error { + err := s.updateCardRequest(ctx, &UpdateCardRequest{ + Template: string(cardSend.Template), + Content: content, + Client: client, + RobotCode: cardSend.RobotCode, + CardInstanceId: cardInstanceId, + }) + + return err +} + +func (s *SendCardClient) updateCardRequest(ctx context.Context, updateCardRequest *UpdateCardRequest) error { + content, err := pkg.SafeReplace(updateCardRequest.Template, "%s", updateCardRequest.Content) + if err != nil { + return err + } + updateRequest := &dingtalkim_1_0.UpdateRobotInteractiveCardRequest{ + CardBizId: tea.String(updateCardRequest.CardInstanceId), + CardData: tea.String(content), + } + _, err = s.UpdateInteractiveCard(ctx, updateRequest, updateCardRequest.RobotCode, updateCardRequest.Client) + return err +} + +// UpdateInteractiveCard 更新交互卡片(封装错误处理) +func (s *SendCardClient) UpdateInteractiveCard(ctx context.Context, request *dingtalkim_1_0.UpdateRobotInteractiveCardRequest, robotCode string, client *dingtalkim_1_0.Client) (*dingtalkim_1_0.UpdateRobotInteractiveCardResponse, error) { + authInfo, err := s.Auth.GetTokenFromBotOption(ctx, WithBot(s.botOption)) + if err != nil { + return nil, fmt.Errorf("get token failed: %w", err) + } + + headers := &dingtalkim_1_0.UpdateRobotInteractiveCardHeaders{ + XAcsDingtalkAccessToken: tea.String(authInfo.AccessToken), + } + + response, err := client.UpdateRobotInteractiveCardWithOptions(request, headers, &util.RuntimeOptions{}) + if err != nil { + return nil, fmt.Errorf("API call failed: %w,request:%v", err, request.String()) + } + return response, nil +} + +// SendInteractiveCard 发送交互卡片(封装错误处理) +func (s *SendCardClient) SendInteractiveCard(ctx context.Context, request *dingtalkim_1_0.SendRobotInteractiveCardRequest, robotCode string, client *dingtalkim_1_0.Client) (res *dingtalkim_1_0.SendRobotInteractiveCardResponse, err error) { + err = s.Auth.GetBotConfigFromModel(s.botOption) + if err != nil { + return nil, fmt.Errorf("初始化bot失败: %w", err) + } + authInfo, err := s.Auth.GetTokenFromBotOption(ctx, WithBot(s.botOption)) + if err != nil { + return nil, fmt.Errorf("get token failed: %w", err) + } + + headers := &dingtalkim_1_0.SendRobotInteractiveCardHeaders{ + XAcsDingtalkAccessToken: tea.String(authInfo.AccessToken), + } + + response, err := client.SendRobotInteractiveCardWithOptions(request, headers, &util.RuntimeOptions{}) + if err != nil { + return nil, fmt.Errorf("API call failed: %w", err) + } + return response, nil +} diff --git a/internal/biz/handle/dingtalk/send_card.go.bak1 b/internal/biz/handle/dingtalk/send_card.go.bak1 new file mode 100644 index 0000000..9fb1e8d --- /dev/null +++ b/internal/biz/handle/dingtalk/send_card.go.bak1 @@ -0,0 +1,280 @@ +package dingtalk + +import ( + "ai_scheduler/internal/data/constants" + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "time" + + openapi "github.com/alibabacloud-go/darabonba-openapi/v2/client" + dingtalkcard_1_0 "github.com/alibabacloud-go/dingtalk/card_1_0" + dingtalkim_1_0 "github.com/alibabacloud-go/dingtalk/im_1_0" + util "github.com/alibabacloud-go/tea-utils/v2/service" + "github.com/alibabacloud-go/tea/tea" + "github.com/gofiber/fiber/v2/log" + "github.com/google/uuid" +) + +const DefaultInterval = 100 * time.Millisecond +const HeardBeatX = 50 + +type SendCardClient struct { + Auth *Auth + CardClient *sync.Map + mu sync.RWMutex // 保护 CardClient 的并发访问 + logger log.AllLogger // 日志记录 + botOption *Bot +} + +func NewSendCardClient(auth *Auth, logger log.AllLogger) *SendCardClient { + return &SendCardClient{ + Auth: auth, + CardClient: &sync.Map{}, + logger: logger, + botOption: &Bot{}, + } +} + +// initClient 初始化或复用 DingTalk 客户端 +func (s *SendCardClient) initClient(robotCode string) (*dingtalkcard_1_0.Client, error) { + if client, ok := s.CardClient.Load(robotCode); ok { + return client.(*dingtalkcard_1_0.Client), nil + } + s.botOption.BotCode = robotCode + config := &openapi.Config{ + Protocol: tea.String("https"), + RegionId: tea.String("central"), + } + client, err := dingtalkcard_1_0.NewClient(config) + if err != nil { + s.logger.Error("failed to init DingTalk client") + return nil, fmt.Errorf("init client failed: %w", err) + } + + s.CardClient.Store(robotCode, client) + return client, nil +} + +func (s *SendCardClient) NewCard(ctx context.Context, cardSend *CardSend) error { + // 参数校验 + if (len(cardSend.ContentSlice) == 0 || cardSend.ContentSlice == nil) && cardSend.ContentChannel == nil { + return errors.New("卡片内容不能为空") + } + if cardSend.UpdateInterval == 0 { + cardSend.UpdateInterval = DefaultInterval // 默认更新间隔 + } + if cardSend.Title == "" { + cardSend.Title = "钉钉卡片" + } + //替换标题 + cardSend.Template = constants.CardTemp(strings.Replace(string(cardSend.Template), "${title}", cardSend.Title, 1)) + // 初始化客户端 + client, err := s.initClient(cardSend.RobotCode) + if err != nil { + return fmt.Errorf("初始化client失败: %w", err) + } + + // 生成卡片实例ID + cardInstanceId, err := uuid.NewUUID() + if err != nil { + return fmt.Errorf("创建uuid失败: %w", err) + } + + // 构建初始请求 + request, err := s.buildBaseRequest(cardSend, cardInstanceId.String()) + if err != nil { + return fmt.Errorf("请求失败: %w", err) + } + + // 发送初始卡片 + if _, err := s.SendInteractiveCard(ctx, request, cardSend.RobotCode, client); err != nil { + return fmt.Errorf("发送初始卡片失败: %w", err) + } + + // 处理切片内容(同步) + if len(cardSend.ContentSlice) > 0 { + if err := s.processContentSlice(ctx, cardSend, cardInstanceId.String(), client); err != nil { + return fmt.Errorf("内容同步失败: %w", err) + } + } + + // 处理通道内容(异步) + if cardSend.ContentChannel != nil { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + s.processContentChannel(ctx, cardSend, cardInstanceId.String(), client) + }() + wg.Wait() + } + + return nil +} + +// buildBaseRequest 构建基础请求 +func (s *SendCardClient) buildBaseRequest(cardSend *CardSend, cardInstanceId string) (*dingtalkcard_1_0.StreamingUpdateRequest, error) { + cardData := fmt.Sprintf(string(cardSend.Template), "") // 初始空内容 + request := &dingtalkcard_1_0.StreamingUpdateRequest{ + OutTrackId: tea.String("your-out-track-id"), + Guid: tea.String("0F714542-0AFC-2B0E-CF14-E2D39F5BFFE8"), + Key: tea.String("your-ai-param"), + Content: tea.String("test"), + IsFull: tea.Bool(false), + IsFinalize: tea.Bool(false), + IsError: tea.Bool(false), + } + + switch cardSend.ConversationType { + case constants.ConversationTypeGroup: + request.SetOpenConversationId(cardSend.ConversationId) + case constants.ConversationTypeSingle: + receiver, err := json.Marshal(map[string]string{"userId": cardSend.SenderStaffId}) + if err != nil { + return nil, fmt.Errorf("数据整理失败: %w", err) + } + request.SetSingleChatReceiver(string(receiver)) + default: + return nil, errors.New("未知的聊天场景") + } + + return request, nil +} + +// processContentChannel 处理通道内容(异步更新) +func (s *SendCardClient) processContentChannel(ctx context.Context, cardSend *CardSend, cardInstanceId string, client *dingtalkim_1_0.Client) { + defer func() { + if r := recover(); r != nil { + s.logger.Error("panic in processContentChannel") + } + }() + + ticker := time.NewTicker(cardSend.UpdateInterval) + defer ticker.Stop() + heartbeatTicker := time.NewTicker(time.Duration(HeardBeatX) * DefaultInterval) + defer heartbeatTicker.Stop() + + var ( + contentBuilder strings.Builder + lastUpdate time.Time + ) + for { + + select { + case content, ok := <-cardSend.ContentChannel: + if !ok { + // 通道关闭,发送最终内容 + if contentBuilder.Len() > 0 { + if err := s.updateCardContent(ctx, cardSend, cardInstanceId, contentBuilder.String(), client); err != nil { + s.logger.Errorf("更新卡片失败1:%s", err.Error()) + } + } + return + } + contentBuilder.WriteString(content) + if contentBuilder.Len() > 0 { + if err := s.updateCardContent(ctx, cardSend, cardInstanceId, contentBuilder.String(), client); err != nil { + s.logger.Errorf("更新卡片失败2:%s", err.Error()) + } + } + lastUpdate = time.Now() + + case <-heartbeatTicker.C: + if time.Now().Unix()-lastUpdate.Unix() >= HeardBeatX { + return + } + + case <-ctx.Done(): + s.logger.Info("context canceled, stop channel processing") + return + } + } + +} + +// processContentSlice 处理切片内容(同步更新) +func (s *SendCardClient) processContentSlice(ctx context.Context, cardSend *CardSend, cardInstanceId string, client *dingtalkim_1_0.Client) error { + var contentBuilder strings.Builder + for _, content := range cardSend.ContentSlice { + contentBuilder.WriteString(content) + err := s.updateCardRequest(ctx, &UpdateCardRequest{ + Template: string(cardSend.Template), + Content: contentBuilder.String(), + Client: client, + RobotCode: cardSend.RobotCode, + CardInstanceId: cardInstanceId, + }) + if err != nil { + return fmt.Errorf("更新卡片失败: %w", err) + } + time.Sleep(cardSend.UpdateInterval) // 控制更新频率 + } + return nil +} + +// updateCardContent 封装卡片更新逻辑 +func (s *SendCardClient) updateCardContent(ctx context.Context, cardSend *CardSend, cardInstanceId, content string, client *dingtalkim_1_0.Client) error { + err := s.updateCardRequest(ctx, &UpdateCardRequest{ + Template: string(cardSend.Template), + Content: content, + Client: client, + RobotCode: cardSend.RobotCode, + CardInstanceId: cardInstanceId, + }) + + return err +} + +func (s *SendCardClient) updateCardRequest(ctx context.Context, updateCardRequest *UpdateCardRequest) error { + + updateRequest := &dingtalkim_1_0.UpdateRobotInteractiveCardRequest{ + CardBizId: tea.String(updateCardRequest.CardInstanceId), + CardData: tea.String(fmt.Sprintf(updateCardRequest.Template, updateCardRequest.Content)), + } + _, err := s.UpdateInteractiveCard(ctx, updateRequest, updateCardRequest.RobotCode, updateCardRequest.Client) + return err +} + +// UpdateInteractiveCard 更新交互卡片(封装错误处理) +func (s *SendCardClient) UpdateInteractiveCard(ctx context.Context, request *dingtalkim_1_0.UpdateRobotInteractiveCardRequest, robotCode string, client *dingtalkim_1_0.Client) (*dingtalkim_1_0.UpdateRobotInteractiveCardResponse, error) { + authInfo, err := s.Auth.GetTokenFromBotOption(ctx, WithBot(s.botOption)) + if err != nil { + return nil, fmt.Errorf("get token failed: %w", err) + } + + headers := &dingtalkim_1_0.UpdateRobotInteractiveCardHeaders{ + XAcsDingtalkAccessToken: tea.String(authInfo.AccessToken), + } + + response, err := client.UpdateRobotInteractiveCardWithOptions(request, headers, &util.RuntimeOptions{}) + if err != nil { + return nil, fmt.Errorf("API call failed: %w,request:%v", err, request.String()) + } + return response, nil +} + +// SendInteractiveCard 发送交互卡片(封装错误处理) +func (s *SendCardClient) SendInteractiveCard(ctx context.Context, request *dingtalkim_1_0.SendRobotInteractiveCardRequest, robotCode string, client *dingtalkim_1_0.Client) (res *dingtalkim_1_0.SendRobotInteractiveCardResponse, err error) { + err = s.Auth.GetBotConfigFromModel(s.botOption) + if err != nil { + return nil, fmt.Errorf("初始化bot失败: %w", err) + } + authInfo, err := s.Auth.GetTokenFromBotOption(ctx, WithBot(s.botOption)) + if err != nil { + return nil, fmt.Errorf("get token failed: %w", err) + } + + headers := &dingtalkim_1_0.SendRobotInteractiveCardHeaders{ + XAcsDingtalkAccessToken: tea.String(authInfo.AccessToken), + } + + response, err := client.SendRobotInteractiveCardWithOptions(request, headers, &util.RuntimeOptions{}) + if err != nil { + return nil, fmt.Errorf("API call failed: %w", err) + } + return response, nil +} diff --git a/internal/biz/handle/dingtalk/types.go b/internal/biz/handle/dingtalk/types.go index a36ea76..5baa770 100644 --- a/internal/biz/handle/dingtalk/types.go +++ b/internal/biz/handle/dingtalk/types.go @@ -1,6 +1,11 @@ package dingtalk -import "time" +import ( + "ai_scheduler/internal/data/constants" + "time" + + dingtalkim_1_0 "github.com/alibabacloud-go/dingtalk/im_1_0" +) type DingTalkAuthIRes struct { AccessToken string `json:"accessToken"` @@ -78,7 +83,28 @@ type DeptResResult struct { } type AuthInfo struct { - ClientId string `json:"clientId"` - ClientSecret string `json:"clientSecret"` - AccessToken string `json:"accessToken"` + ClientId string `json:"clientId"` + ClientSecret string `json:"clientSecret"` + AccessToken string `json:"accessToken"` + Expire time.Duration `json:"expireIn"` +} + +type CardSend struct { + RobotCode string + ConversationType constants.ConversationType + ConversationId string + Template constants.CardTemp + SenderStaffId string + Title string + ContentSlice []string + ContentChannel chan string + UpdateInterval time.Duration // 控制通道更新的频率 +} + +type UpdateCardRequest struct { + Template string + Content string + Client *dingtalkim_1_0.Client + RobotCode string + CardInstanceId string } diff --git a/internal/data/constants/bot.go b/internal/data/constants/bot.go index 78a46f1..d0ca85c 100644 --- a/internal/data/constants/bot.go +++ b/internal/data/constants/bot.go @@ -34,6 +34,8 @@ const ( const DingTalkAuthBaseKeyPrefix = "dingTalk_auth" +const DingTalkAuthBaseKeyBotPrefix = "dingTalk_auth_bot" + // PermissionType 工具使用权限 type PermissionType int32 diff --git a/internal/data/constants/dingtalk.go b/internal/data/constants/dingtalk.go index c6d55b0..fbbc7b8 100644 --- a/internal/data/constants/dingtalk.go +++ b/internal/data/constants/dingtalk.go @@ -49,3 +49,32 @@ type BotMsgType string const ( BotMsgTypeText BotMsgType = "text" ) + +type CardTemp string + +const ( + CardTempDefault CardTemp = `{ + "config": { + "autoLayout": true, + "enableForward": true + }, + "header": { + "title": { + "type": "text", + "text": "${title}", + }, + "logo": "@lALPDfJ6V_FPDmvNAfTNAfQ" + }, + "contents": [ + { + "type": "divider", + "id": "divider_1765952728523" + }, + { + "type": "markdown", + "text": "%s", + "id": "markdown_1765970168635" + } + ] +}` +) diff --git a/internal/data/impl/bot_group.go b/internal/data/impl/bot_group.go index 4382d82..e0593c4 100644 --- a/internal/data/impl/bot_group.go +++ b/internal/data/impl/bot_group.go @@ -17,9 +17,9 @@ func NewBotGroupImpl(db *utils.Db) *BotGroupImpl { } } -func (k BotGroupImpl) GetByConversationId(staffId string) (*model.AiBotGroup, error) { +func (k BotGroupImpl) GetByConversationIdAndRobotCode(staffId string, robotCode string) (*model.AiBotGroup, error) { var data model.AiBotGroup - err := k.Db.Model(k.Model).Where("conversation_id = ?", staffId).Find(&data).Error + err := k.Db.Model(k.Model).Where("conversation_id = ? and robot_code = ?", staffId, robotCode).Find(&data).Error if data.GroupID == 0 { err = sql.ErrNoRows } diff --git a/internal/data/model/ai_bot_config.gen.go b/internal/data/model/ai_bot_config.gen.go index 6885e81..e6142f7 100644 --- a/internal/data/model/ai_bot_config.gen.go +++ b/internal/data/model/ai_bot_config.gen.go @@ -13,11 +13,11 @@ const TableNameAiBotConfig = "ai_bot_config" // AiBotConfig mapped from table type AiBotConfig struct { BotID int32 `gorm:"column:bot_id;primaryKey;autoIncrement:true" json:"bot_id"` - SysID int32 `gorm:"column:sys_id;not null" json:"sys_id"` BotType int32 `gorm:"column:bot_type;not null;default:1;comment:类型,1为钉钉机器人" json:"bot_type"` // 类型,1为钉钉机器人 - BotName string `gorm:"column:bot_name;not null;comment:名字" json:"bot_name"` // 名字 - BotConfig string `gorm:"column:bot_config;not null;comment:配置" json:"bot_config"` // 配置 - BotIndex string `gorm:"column:bot_index;not null;comment:索引" json:"bot_index"` // 索引 + SysPrompt string `gorm:"column:sys_prompt" json:"sys_prompt"` + BotName string `gorm:"column:bot_name;not null;comment:名字" json:"bot_name"` // 名字 + BotConfig string `gorm:"column:bot_config;not null;comment:配置" json:"bot_config"` // 配置 + RobotCode string `gorm:"column:robot_code;not null;comment:索引" json:"robot_code"` // 索引 CreateAt time.Time `gorm:"column:create_at;default:CURRENT_TIMESTAMP" json:"create_at"` UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP" json:"updated_at"` Status int32 `gorm:"column:status;not null" json:"status"` diff --git a/internal/data/model/ai_bot_group.gen.go b/internal/data/model/ai_bot_group.gen.go index d0ff93a..80c50d1 100644 --- a/internal/data/model/ai_bot_group.gen.go +++ b/internal/data/model/ai_bot_group.gen.go @@ -14,6 +14,7 @@ const TableNameAiBotGroup = "ai_bot_group" type AiBotGroup struct { GroupID int32 `gorm:"column:group_id;primaryKey;autoIncrement:true" json:"group_id"` ConversationID string `gorm:"column:conversation_id;not null;comment:会话ID" json:"conversation_id"` // 会话ID + RobotCode string `gorm:"column:robot_code;not null;comment:绑定机器人code" json:"robot_code"` // 绑定机器人code Title string `gorm:"column:title;not null;comment:群名称" json:"title"` // 群名称 ToolList string `gorm:"column:tool_list;not null;comment:开通工具列表" json:"tool_list"` // 开通工具列表 Status int32 `gorm:"column:status;not null;default:1" json:"status"` diff --git a/internal/entitys/bot.go b/internal/entitys/bot.go index 46661ab..ed0902c 100644 --- a/internal/entitys/bot.go +++ b/internal/entitys/bot.go @@ -3,7 +3,7 @@ package entitys import ( "ai_scheduler/internal/data/model" - "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" + "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" ) type RequireDataDingTalkBot struct { diff --git a/internal/pkg/func.go b/internal/pkg/func.go index 57321bd..32c404b 100644 --- a/internal/pkg/func.go +++ b/internal/pkg/func.go @@ -133,3 +133,35 @@ func SliceIntToString(slice []int) []string { } return strSlice } + +// SafeReplace 替换字符串中的 %s,并自动转义特殊字符(如 ") +/** + * SafeReplace 函数用于安全地替换模板字符串中的占位符 + * @param template 原始模板字符串 + * @param replaceTag 要被替换的占位符(如 "%s") + * @param replacements 可变参数,用于替换占位符的字符串 + * @return 返回替换后的字符串和可能的错误 + */ +func SafeReplace(template string, replaceTag string, replacements ...string) (string, error) { + // 如果没有提供替换参数,直接返回原始模板 + if len(replacements) == 0 { + return template, nil + } + + // 检查模板中 %s 的数量是否匹配替换参数 + expectedReplacements := strings.Count(template, replaceTag) + if expectedReplacements != len(replacements) { + return "", fmt.Errorf("模板需要 %d 个替换参数,但提供了 %d 个", expectedReplacements, len(replacements)) + } + + // 逐个替换 %s,并转义特殊字符 + for _, rep := range replacements { + // 转义特殊字符(如 ", \, \n 等) + escaped := strconv.Quote(rep) + // 去掉 strconv.Quote 添加的额外引号 + escaped = escaped[1 : len(escaped)-1] + template = strings.Replace(template, replaceTag, escaped, 1) + } + + return template, nil +} diff --git a/internal/server/ding_talk_bot.go b/internal/server/ding_talk_bot.go index f68f543..2eb31c6 100644 --- a/internal/server/ding_talk_bot.go +++ b/internal/server/ding_talk_bot.go @@ -4,10 +4,12 @@ import ( "ai_scheduler/internal/entitys" "ai_scheduler/internal/services" "context" + "fmt" + "sync" + "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" + "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/client" "github.com/go-kratos/kratos/v2/log" - "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" - "github.com/open-dingtalk/dingtalk-stream-sdk-go/client" ) type DingBotServiceInterface interface { @@ -54,18 +56,48 @@ func ProvideAllDingBotServices( } func (d *DingTalkBotServer) Run(ctx context.Context, botIndex string) { - for name, cli := range d.Clients { - if botIndex != "All" { - if name != botIndex { - continue + if botIndex == "" { + log.Info("未指定机器人索引,跳过启动") + return + } + + var targets []string + switch { + case botIndex == "All": + targets = make([]string, 0, len(d.Clients)) + for name := range d.Clients { + targets = append(targets, name) + } + default: + if _, exists := d.Clients[botIndex]; exists { + targets = []string{botIndex} + } else { + log.Infof("未找到索引为 %s 的机器人", botIndex) + return + } + } + + var wg sync.WaitGroup + errors := make([]error, 0, len(targets)) + + for _, name := range targets { + wg.Add(1) + go func(name string) { + defer wg.Done() + err := d.Clients[name].Start(ctx) + if err != nil { + log.Errorf("%s 启动失败: %v", name, err) + errors = append(errors, fmt.Errorf("%s: %w", name, err)) + } else { + log.Infof("%s 启动成功", name) } - } - err := cli.Start(ctx) - if err != nil { - log.Infof("%s启动失败", name) - continue - } - log.Infof("%s启动成功", name) + }(name) + } + + wg.Wait() + if len(errors) > 0 { + log.Errorf("部分机器人启动失败,总数: %d, 成功: %d, 失败: %d", + len(targets), len(targets)-len(errors), len(errors)) } } func DingBotServerInit(clientId string, clientSecret string, service DingBotServiceInterface) (cli *client.StreamClient) { diff --git a/internal/services/dtalk_bot.go b/internal/services/dtalk_bot.go index 5c9c92b..5b9a8e6 100644 --- a/internal/services/dtalk_bot.go +++ b/internal/services/dtalk_bot.go @@ -2,15 +2,15 @@ package services import ( "ai_scheduler/internal/biz" - "log" - "time" - "ai_scheduler/internal/config" - "ai_scheduler/internal/entitys" "context" + "log" + "sync" + "time" - "github.com/open-dingtalk/dingtalk-stream-sdk-go/chatbot" + "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" + "golang.org/x/sync/errgroup" ) type DingBotService struct { @@ -18,65 +18,115 @@ type DingBotService struct { dingTalkBotBiz *biz.DingTalkBotBiz } -func NewDingBotService(config *config.Config, DingTalkBotBiz *biz.DingTalkBotBiz) *DingBotService { - return &DingBotService{config: config, dingTalkBotBiz: DingTalkBotBiz} +func NewDingBotService(config *config.Config, dingTalkBotBiz *biz.DingTalkBotBiz) *DingBotService { + return &DingBotService{ + config: config, + dingTalkBotBiz: dingTalkBotBiz, + } } func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) { return d.dingTalkBotBiz.GetDingTalkBotCfgList() } -func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) (content []byte, err error) { - var ( - lastErr error - chat []string - ) +func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data) if err != nil { - return + return nil, err } - // 使用 ctx.Done() 通知 Do 方法提前终止 - subCtx, cancel := context.WithCancel(ctx) - defer func() { - cancel() - _ = d.dingTalkBotBiz.SaveHis(ctx, requireData, chat) - }() - // 异步执行 Do 方法 - done := make(chan error, 1) + // 启动后台任务(独立生命周期,带超时控制) go func() { - done <- d.dingTalkBotBiz.Do(subCtx, requireData) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + if err := d.runBackgroundTasks(ctx, data, requireData); err != nil { + log.Printf("后台任务执行失败: %v", err) + } }() - for { - select { - case <-ctx.Done(): - lastErr = ctx.Err() - goto cleanup - case resp, ok := <-requireData.Ch: - if !ok { - return []byte("success"), nil - } - if resp.Type == entitys.ResponseLog { - continue - } - if resp.Type == entitys.ResponseText || resp.Type == entitys.ResponseStream || resp.Type == entitys.ResponseJson { - chat = append(chat, resp.Content) - } - if err := d.dingTalkBotBiz.HandleRes(ctx, data, resp); err != nil { - log.Printf("HandleRes 失败: %v", err) + return []byte("success"), nil +} + +func (d *DingBotService) runBackgroundTasks(ctx context.Context, data *chatbot.BotCallbackDataModel, requireData *entitys.RequireDataDingTalkBot) error { + g, ctx := errgroup.WithContext(ctx) + var ( + chat []string + chatMu sync.Mutex + resChan = make(chan string, 10) + ) + + // 1. 流式处理协程 + g.Go(func() error { + defer func() { + // 确保通道最终关闭 + close(resChan) + }() + return d.dingTalkBotBiz.HandleStreamRes(ctx, data, resChan) + }) + + // 2. 业务处理协程(负责关闭requireData.Ch) + g.Go(func() error { + // 在完成时关闭通道 + defer close(requireData.Ch) + return d.dingTalkBotBiz.Do(ctx, requireData) + }) + + // 3. 结果收集协程(修改后的版本) + resultDone := make(chan struct{}) + g.Go(func() error { + // 使用defer确保通道关闭 + defer close(resultDone) + + // 处理通道中的数据 + for { + select { + case resp, ok := <-requireData.Ch: + if !ok { + return nil // 通道已关闭,正常退出 + } + if resp.Type != entitys.ResponseLog { + chatMu.Lock() + chat = append(chat, resp.Content) + chatMu.Unlock() + + select { + case resChan <- resp.Content: + case <-ctx.Done(): + return ctx.Err() + } + } + case <-ctx.Done(): + return ctx.Err() // 上下文取消,提前退出 } } - } -cleanup: - select { - case _err := <-done: - if _err != nil { - panic(_err) + }) + + // 4. 统一关闭通道的协程(只关闭resChan) + g.Go(func() error { + <-resultDone + // resChan已在流式处理协程关闭 + return nil + }) + + // 5. 历史记录保存协程 + g.Go(func() error { + <-resultDone + chatMu.Lock() + savedChat := make([]string, len(chat)) + copy(savedChat, chat) + chatMu.Unlock() + + if err := d.dingTalkBotBiz.SaveHis(ctx, requireData, savedChat); err != nil { + log.Printf("保存历史记录失败: %v", err) + return err } - case <-time.After(1 * time.Second): - log.Println("警告:等待 Do 方法超时,可能发生 goroutine 泄漏") + return nil + }) + + // 阻塞直到所有协程完成或出错 + if err := g.Wait(); err != nil { + return err } - return nil, lastErr + return nil } diff --git a/internal/services/dtalk_bot.go.bak b/internal/services/dtalk_bot.go.bak new file mode 100644 index 0000000..75c2c7f --- /dev/null +++ b/internal/services/dtalk_bot.go.bak @@ -0,0 +1,130 @@ +package services + +import ( + "ai_scheduler/internal/biz" + "log" + "sync" + "time" + + "ai_scheduler/internal/config" + "ai_scheduler/internal/entitys" + "context" + + "gitea.cdlsxd.cn/self-tools/l-dingtalk-stream-sdk-go/chatbot" +) + +type DingBotService struct { + config *config.Config + dingTalkBotBiz *biz.DingTalkBotBiz +} + +func NewDingBotService(config *config.Config, DingTalkBotBiz *biz.DingTalkBotBiz) *DingBotService { + return &DingBotService{config: config, dingTalkBotBiz: DingTalkBotBiz} +} + +func (d *DingBotService) GetServiceCfg() ([]entitys.DingTalkBot, error) { + return d.dingTalkBotBiz.GetDingTalkBotCfgList() +} + +func (d *DingBotService) OnChatBotMessageReceived(ctx context.Context, data *chatbot.BotCallbackDataModel) ([]byte, error) { + var ( + lastErr error + chat []string + streamWG sync.WaitGroup + resChan = make(chan string, 100) // 缓冲通道防止阻塞 + ) + + // 初始化请求 + requireData, err := d.dingTalkBotBiz.InitRequire(ctx, data) + if err != nil { + return nil, err + } + + // 创建子上下文用于控制goroutine生命周期 + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // 启动流式处理goroutine + streamWG.Add(1) + go func() { + defer streamWG.Done() + err = d.dingTalkBotBiz.HandleStreamRes(subCtx, data, resChan) + if err != nil { + return + } + }() + + // 启动业务处理goroutine + done := make(chan error, 1) + go func() { + done <- d.dingTalkBotBiz.Do(subCtx, requireData) + }() + + // 主处理循环 + for { + select { + case <-ctx.Done(): + lastErr = ctx.Err() + goto cleanup + + case resp, ok := <-requireData.Ch: + if !ok { + goto cleanup + } + + // 处理不同类型响应 + switch resp.Type { + case entitys.ResponseLog: + // 忽略日志类型 + continue + + //case entitys.ResponseText, entitys.ResponseJson: + // chat = append(chat, resp.Content) + // if err := d.dingTalkBotBiz.ReplyText(ctx, data.SessionWebhook, resp.Content); err != nil { + // log.Printf("处理非流响应失败: %v", err) + // lastErr = err + // } + + default: + chat = append(chat, resp.Content) + select { + case resChan <- resp.Content: + case <-ctx.Done(): + lastErr = ctx.Err() + goto cleanup + } + } + } + } + +cleanup: + streamWG.Wait() + // 关闭流式通道 + close(resChan) + + // 保存历史记录 + if saveErr := d.dingTalkBotBiz.SaveHis(ctx, requireData, chat); saveErr != nil { + log.Printf("保存历史记录失败: %v", saveErr) + if lastErr == nil { + lastErr = saveErr + } + } + + // 等待业务处理完成(带超时) + select { + case err := <-done: + if err != nil { + log.Printf("业务处理失败: %v", err) + if lastErr == nil { + lastErr = err + } + } + case <-time.After(3 * time.Second): // 增加超时时间 + log.Println("警告:等待业务处理超时,可能发生goroutine泄漏") + } + + if lastErr != nil { + return nil, lastErr + } + return []byte("success"), nil +}