ai_scheduler/internal/biz/do/handle.go

853 lines
26 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package do
import (
"ai_scheduler/internal/biz/llm_service"
"ai_scheduler/internal/config"
"ai_scheduler/internal/data/constants"
errorcode "ai_scheduler/internal/data/error"
errors "ai_scheduler/internal/data/error"
"ai_scheduler/internal/data/impl"
"ai_scheduler/internal/data/model"
"ai_scheduler/internal/domain/tools/common/knowledge_base"
"ai_scheduler/internal/domain/workflow/runtime"
"ai_scheduler/internal/entitys"
"ai_scheduler/internal/gateway"
"ai_scheduler/internal/pkg/dingtalk"
"ai_scheduler/internal/pkg/l_request"
"ai_scheduler/internal/pkg/mapstructure"
"ai_scheduler/internal/pkg/rec_extra"
"ai_scheduler/internal/pkg/util"
"ai_scheduler/internal/tools"
"bufio"
errorsSpecial "errors"
"io"
"net/http"
"time"
"context"
"encoding/json"
"fmt"
"strings"
"github.com/coze-dev/coze-go"
"github.com/gofiber/fiber/v2/log"
"github.com/ollama/ollama/api"
"gorm.io/gorm/utils"
)
type Handle struct {
Ollama *llm_service.OllamaService
toolManager *tools.Manager
conf *config.Config
sessionImpl *impl.SessionImpl
workflowManager *runtime.Registry
dingtalkOldClient *dingtalk.OldClient
dingtalkContactClient *dingtalk.ContactClient
dingtalkNotableClient *dingtalk.NotableClient
}
func NewHandle(
Ollama *llm_service.OllamaService,
toolManager *tools.Manager,
conf *config.Config,
sessionImpl *impl.SessionImpl,
workflowManager *runtime.Registry,
dingtalkOldClient *dingtalk.OldClient,
dingtalkContactClient *dingtalk.ContactClient,
dingtalkNotableClient *dingtalk.NotableClient,
) *Handle {
return &Handle{
Ollama: Ollama,
toolManager: toolManager,
conf: conf,
sessionImpl: sessionImpl,
workflowManager: workflowManager,
dingtalkOldClient: dingtalkOldClient,
dingtalkContactClient: dingtalkContactClient,
dingtalkNotableClient: dingtalkNotableClient,
}
}
func (r *Handle) Recognize(ctx context.Context, rec *entitys.Recognize, promptProcessor PromptOption) (err error) {
entitys.ResLog(rec.Ch, "recognize_start", "准备意图识别")
prompt, err := promptProcessor.CreatePrompt(ctx, rec)
//意图识别
recognizeMsg, err := r.Ollama.IntentRecognize(ctx, &entitys.ToolSelect{
Prompt: prompt,
Tools: rec.Tasks,
})
if err != nil {
return
}
entitys.ResLog(rec.Ch, "recognize", recognizeMsg)
entitys.ResLog(rec.Ch, "recognize_end", "意图识别结束")
var match entitys.Match
if err = json.Unmarshal([]byte(recognizeMsg), &match); err != nil {
err = errors.SysErrf("数据结构错误:%v", err.Error())
return
}
rec.Match = &match
// 意图输入到日志
log.Infof("recognize: %s", recognizeMsg)
return
}
// RewriteQuery 改写查询词,支持多轮对话
func (r *Handle) RewriteQuery(ctx context.Context, history []model.AiBotChatHi, currentQuery string) (string, error) {
if len(history) == 0 {
return currentQuery, nil
}
histStr := strings.Builder{}
for _, h := range history {
if h.Role == "user" {
histStr.WriteString(fmt.Sprintf("%s:%s\n", h.CreateAt, h.Content))
}
}
systemPrompt := `你是一个搜索查询改写专家。请结合用户的历史对话上下文,将用户当前的输入改写为一个独立的、语义完整的、适合知识库检索的中文查询词。
要求:
1. 当前输入最能反映用户的意图,权重按照时间逆序依次减弱,改写后的查询词应与当前输入的语义相关。
2. 保持原意,补全指代(如“它”、“刚才那个问题”)。
3. 只返回改写后的查询词,不要有任何解释。
4. 如果当前输入已经很完整,直接返回原句。`
userPrompt := fmt.Sprintf("### 历史对话:\n%s\n### 当前输入:\n%s\n### 改写后的查询词:", histStr.String(), currentQuery)
messages := []api.Message{
{Role: "system", Content: systemPrompt},
{Role: "user", Content: userPrompt},
}
return r.Ollama.Chat(ctx, messages)
}
type IssueClassification struct {
SysName string `json:"sys_name"`
IssueTypeName string `json:"issue_type_name"`
Summary string `json:"summary"`
Reason string `json:"reason"`
}
// ClassifyIssueSys 问题系统分析
func (r *Handle) ClassifyIssueSystem(ctx context.Context, systems []string, userInput string, userHist []model.AiBotChatHi) (*IssueClassification, error) {
systemPrompt := fmt.Sprintf(`## 角色
你是一个系统类型判定专家。你的唯一任务是基于多轮对话识别用户当前讨论的系统sys_name。不需要输出问题类型。输出必须严格遵守 JSON 格式。
## 推理规则
1. 系统判定逻辑:
- 当前输入明确提到系统 → 直接覆盖历史系统
- 当前输入未提系统,但历史对话有 → 继承最近历史系统
- 当前输入和历史均未出现 → "全局"
- 询问公司、企业、制度层面的问题 → "全局"
2. 特殊规则:
- 如果当前输入仅包含系统名称如“CRM”视为系统上下文补充仅更新 sys_name不做其他推断
## 背景数据
可用系统列表:[%s]
## 输出格式
{
"sys_name": "系统名称",
"reason": "说明系统来源:当前输入 / 历史继承 / 默认"
}
`, strings.Join(systems, ", "))
historyStr := strings.Builder{}
historyStr.WriteString("### 历史对话:\n")
for _, h := range userHist {
if h.Role == "user" {
historyStr.WriteString(fmt.Sprintf("%s:%s\n", h.CreateAt, h.Content))
}
}
messages := []api.Message{
{Role: "system", Content: systemPrompt},
{Role: "assistant", Content: historyStr.String()},
{Role: "user", Content: userInput},
}
resp, err := r.Ollama.Chat(ctx, messages)
if err != nil {
return nil, err
}
// 尝试清理 JSON 内容(有时模型会返回 markdown 块)
resp = strings.TrimPrefix(resp, "```json")
resp = strings.TrimSuffix(resp, "```")
resp = strings.TrimSpace(resp)
var result IssueClassification
if err := json.Unmarshal([]byte(resp), &result); err != nil {
return nil, fmt.Errorf("解析分类结果失败: %w, 原文: %s", err, resp)
}
return &result, nil
}
// ClassifyIssueType 问题分类分析
func (r *Handle) ClassifyIssueType(ctx context.Context, issueTypes []string, systems []string, userInput string, userHist []model.AiBotChatHi) (*IssueClassification, error) {
systemPrompt := fmt.Sprintf(`## 角色
你是一个业务问题类型分析专家。你的任务是基于多轮对话识别用户讨论的**问题类型issue_type_name**,问题类型必须严格来自“背景数据-可用问题类型列表”。
你不负责系统名称判断。输出必须严格遵守 JSON 格式。
## 推理规则
1. 构建完整问题意图
- 将当前输入与历史对话合并理解为完整问题演进
- 当前输入可能是补充条件、追问、修正或只给模块名/报错片段
- 不要只看当前一句
- 忽略历史中的系统名称相关
2. 问题类型判定逻辑
- 当前输入明确匹配列表中某个类型 → 使用该类型
- 当前输入未明确,但历史已有 → 继承历史类型
- 当前输入未匹配,历史也没有 → 选择最接近的列表类型(尽量匹配意图)
- 除非是闲聊(如“你好”“在吗”),禁止返回空值
- 除非明确是需求,否则禁止返回“开发需求”类型,疑问句式一定不能返回“开发需求”类型
3. 特殊规则
- 当前输入只包含系统名/模块名/参数名 → 视为问题补充,继承历史 issue_type_name
- 输出必须严格匹配列表中的类型,不允许生成列表外的自造类型
## 背景数据
可用问题类型列表:[%s]
系统名称列表参考:[%s]
## 输出格式
{
"issue_type_name": "问题类型名称",
"summary": "15字内问题标题",
"reason": "说明问题类型是基于哪句话判断,或说明继承自历史,继承自哪条历史"
}`, strings.Join(issueTypes, ", "), strings.Join(systems, ", "))
historyStr := strings.Builder{}
historyStr.WriteString("### 历史对话:\n")
for _, h := range userHist {
if h.Role == "user" {
historyStr.WriteString(fmt.Sprintf("%s:%s\n", h.CreateAt, h.Content))
}
}
messages := []api.Message{
{Role: "system", Content: systemPrompt},
{Role: "assistant", Content: historyStr.String()},
{Role: "user", Content: userInput},
}
resp, err := r.Ollama.Chat(ctx, messages)
if err != nil {
return nil, err
}
// 尝试清理 JSON 内容(有时模型会返回 markdown 块)
resp = strings.TrimPrefix(resp, "```json")
resp = strings.TrimSuffix(resp, "```")
resp = strings.TrimSpace(resp)
var result IssueClassification
if err := json.Unmarshal([]byte(resp), &result); err != nil {
return nil, fmt.Errorf("解析分类结果失败: %w, 原文: %s", err, resp)
}
return &result, nil
}
type IsAnswerRelevant struct {
Relevance string `json:"relevance"`
Reason string `json:"reason"`
}
// 判断答案是否回答了问题
func (r *Handle) IsAnswerRelevant(ctx context.Context, question string, answer string) (bool, error) {
prompt := `## 角色
你是一个答案评估专家,你的任务是判断给定的答案是否真正回答了用户的问题。你必须严格分析语义、意图和信息覆盖情况,避免只看关键词。
## 输入
- question: %s
- answer: %s
## 判断逻辑
1. **直接回答**:答案明确提供了解决方案、步骤、结论或可执行信息 → 输出 True
2. **未回答**:答案仅泛泛提示、缺少关键步骤或信息,或者只是提供背景、登录信息等无关内容 → 输出 False
3. **部分回答**:答案提供了一部分可用信息,但未完全解决问题 → 输出 “Partial”
## 输出要求
输出严格 JSON 格式,只包含以下字段:
{
"relevance": "True / False / Partial",
"reason": "简要说明为什么答案被认为回答或未回答问题"
}`
resp, err := r.Ollama.Generation(ctx, fmt.Sprintf(prompt, question, answer))
if err != nil {
return false, err
}
// 尝试清理 JSON 内容(有时模型会返回 markdown 块)
resp = strings.TrimPrefix(resp, "```json")
resp = strings.TrimSuffix(resp, "```")
resp = strings.TrimSpace(resp)
var result IsAnswerRelevant
if err := json.Unmarshal([]byte(resp), &result); err != nil {
return false, fmt.Errorf("解析分类结果失败: %w, 原文: %s", err, resp)
}
log.Debug("分析结果:%s原因%s", result.Relevance, result.Reason)
if result.Relevance == "True" {
return true, nil
}
return false, nil
}
func (r *Handle) handleOtherTask(ctx context.Context, requireData *entitys.RequireData) (err error) {
entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning)
return
}
func (r *Handle) HandleMatch(ctx context.Context, client *gateway.Client, rec *entitys.Recognize, requireData *entitys.RequireData) (err error) {
if !rec.Match.IsMatch {
if len(rec.Match.Chat) != 0 {
entitys.ResText(rec.Ch, "", rec.Match.Chat)
} else {
entitys.ResText(rec.Ch, "", rec.Match.Reasoning)
}
return
}
var pointTask *model.AiTask
for _, task := range requireData.Tasks {
if task.Index == rec.Match.Index {
pointTask = &task
break
}
}
if pointTask == nil || pointTask.Index == "other" {
return r.OtherTask(ctx, rec)
}
// 校验用户权限
if err = r.PermissionAuth(client, pointTask); err != nil {
log.Errorf("权限验证失败: %s", err.Error())
return
}
switch constants.TaskType(pointTask.Type) {
case constants.TaskTypeApi:
return r.handleApiTask(ctx, rec, pointTask)
case constants.TaskTypeKnowle:
return r.handleKnowleV2(ctx, rec, pointTask)
case constants.TaskTypeFunc:
return r.handleTask(ctx, rec, pointTask)
case constants.TaskTypeBot:
return r.HandleBot(ctx, rec, &entitys.Task{
Index: pointTask.Index,
})
case constants.TaskTypeEinoWorkflow:
return r.handleEinoWorkflow(ctx, rec, pointTask)
case constants.TaskTypeCozeWorkflow:
return r.handleCozeWorkflow(ctx, rec, pointTask)
default:
return r.handleOtherTask(ctx, requireData)
}
}
func (r *Handle) OtherTask(ctx context.Context, requireData *entitys.Recognize) (err error) {
entitys.ResText(requireData.Ch, "", requireData.Match.Reasoning)
return
}
func (r *Handle) handleTask(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
var configData entitys.ConfigDataTool
err = json.Unmarshal([]byte(task.Config), &configData)
if err != nil {
return
}
err = r.toolManager.ExecuteTool(ctx, configData.Tool, rec)
if err != nil {
return
}
return
}
// 知识库
// func (r *Handle) handleKnowle(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
// var (
// configData entitys.ConfigDataTool
// sessionIdKnowledge string
// query string
// host string
// )
// err = json.Unmarshal([]byte(task.Config), &configData)
// if err != nil {
// return
// }
// ext, err := rec_extra.GetTaskRecExt(rec)
// if err != nil {
// return
// }
// // 通过session 找到知识库session
// var has bool
// if len(ext.Session) == 0 {
// return errors.SessionNotFound
// }
// ext.SessionInfo, has, err = r.sessionImpl.FindOne(r.sessionImpl.WithSessionId(ext.Session))
// if err != nil {
// return
// } else if !has {
// return errors.SessionNotFound
// }
// // 找到知识库的host
// {
// tool, exists := r.toolManager.GetTool(configData.Tool)
// if !exists {
// return fmt.Errorf("tool not found: %s", configData.Tool)
// }
// if knowledgeTool, ok := tool.(*public.KnowledgeBaseTool); !ok {
// return fmt.Errorf("未找到知识库Tool: %s", configData.Tool)
// } else {
// host = knowledgeTool.GetConfig().BaseURL
// }
// }
// // 知识库的session为空请求知识库获取, 并绑定
// if ext.SessionInfo.KnowlegeSessionID == "" {
// // 请求知识库
// if sessionIdKnowledge, err = public.GetKnowledgeBaseSession(host, ext.Sys.KnowlegeBaseID, ext.Sys.KnowlegeTenantKey); err != nil {
// return
// }
// // 绑定知识库session下次可以使用
// ext.SessionInfo.KnowlegeSessionID = sessionIdKnowledge
// if err = r.sessionImpl.Update(&ext.SessionInfo, r.sessionImpl.WithSessionId(ext.SessionInfo.SessionID)); err != nil {
// return
// }
// }
// // 用户输入解析
// var ok bool
// input := make(map[string]string)
// if err = json.Unmarshal([]byte(rec.Match.Parameters), &input); err != nil {
// return
// }
// if query, ok = input["query"]; !ok {
// return fmt.Errorf("query不能为空")
// }
// ext.KnowledgeConf = entitys.KnowledgeBaseRequest{
// Session: ext.SessionInfo.KnowlegeSessionID,
// ApiKey: ext.Sys.KnowlegeTenantKey,
// Query: query,
// }
// rec.Ext = pkg.JsonByteIgonErr(ext)
// // 执行工具
// err = r.toolManager.ExecuteTool(ctx, configData.Tool, rec)
// if err != nil {
// return
// }
// return
// }
// 知识库V2 - lightRAG自建
func (r *Handle) handleKnowleV2(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
// 获取用户session信息
ext, err := rec_extra.GetTaskRecExt(rec)
if err != nil {
return
}
// 获取租户ID 形式为 {biz-user} 比如 "zltx-platform"
tenantID := ext.Sys.KnowlegeTenantKey
// 请求知识库工具
knowledgeBase := knowledge_base.New(r.conf.KnowledgeConfig)
knowledgeResp, err := knowledgeBase.Query(&knowledge_base.QueryRequest{
TenantID: tenantID, // 后续动态接参
Query: rec.UserContent.Text,
Mode: constants.KnowledgeModeMix,
Stream: true,
Think: false,
OnlyRAG: true,
})
if err != nil {
return fmt.Errorf("请求知识库工具失败err: %v", err)
}
// 读取知识库SSE数据
err = r.readKnowledgeSSE(knowledgeResp, rec.Ch, false)
if err != nil {
return
}
return
}
// 读取知识库 SSE 数据
func (r *Handle) readKnowledgeSSE(resp io.ReadCloser, channel chan entitys.Response, useParagraphMode bool) (err error) {
scanner := bufio.NewScanner(resp)
var buffer strings.Builder
var taskIndex string = "knowledgeBase"
for scanner.Scan() {
line := scanner.Text()
delta, done, err := knowledge_base.ParseOpenAIStreamData(line)
if err != nil {
return fmt.Errorf("解析SSE数据失败: %w", err)
}
if done {
break
}
if delta == nil {
continue
}
// 推理内容
if delta.ReasoningContent != "" {
entitys.ResStream(channel, taskIndex, delta.ReasoningContent)
continue
}
// 输出内容 - 段落
if delta.Content != "" && useParagraphMode {
// 存入缓冲区
buffer.WriteString(delta.Content)
content := buffer.String()
// 检查是否有换行符,按段落输出
if idx := strings.LastIndex(content, "\n"); idx != -1 {
// 发送直到最后一个换行符的内容
toSend := content[:idx+1]
entitys.ResStream(channel, taskIndex, toSend)
// 重置缓冲区,保留剩余部分
remaining := content[idx+1:]
buffer.Reset()
buffer.WriteString(remaining)
}
}
// 输出内容 - 逐字
if delta.Content != "" && !useParagraphMode {
entitys.ResStream(channel, taskIndex, delta.Content)
}
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("读取SSE流中断: %w", err)
}
// 发送缓冲区剩余内容(仅在段落模式下需要)
if useParagraphMode && buffer.Len() > 0 {
entitys.ResStream(channel, taskIndex, buffer.String())
}
return nil
}
// bot 临时实现,后续转到 eino 工作流
func (r *Handle) HandleBot(ctx context.Context, rec *entitys.Recognize, task *entitys.Task) (err error) {
if task.Index == "bug_optimization_submit" {
var unionId string
entitys.ResLoading(rec.Ch, task.Index, "需求记录中...\n")
// 获取dingtalk accessToken
accessToken, _ := r.dingtalkOldClient.GetAccessToken()
// Ext 中获取 sessionId
taskExt := rec.GetTaskExt()
if taskExt == nil {
return errorcode.ParamErr("taskExt参数错误")
}
if len(taskExt.Session) > 0 {
// 获取创建者 dingtalk unionId
unionId = r.getUserDingtalkUnionId(ctx, accessToken, taskExt.Session)
} else if len(taskExt.UserName) > 0 {
unionId = r.getUserDingtalkUnionIdWithUserName(ctx, accessToken, taskExt.UserName)
} else {
return errorcode.ParamErr("taskExt参数错误,重要参数缺失")
}
// 附件url
var attachmentUrl string
for _, file := range rec.UserContent.File {
attachmentUrl = file.FileUrl
break
}
req := &dingtalk.InsertRecordReq{
BaseId: r.conf.Dingtalk.TableDemand.BaseId,
SheetIdOrName: r.conf.Dingtalk.TableDemand.SheetIdOrName,
// OperatorId: tool_callback.BotBugOptimizationSubmitAdminUnionId,
OperatorId: unionId,
CreatorUnionId: unionId,
Content: rec.UserContent.Text,
AttachmentUrl: attachmentUrl,
}
recordId, err := r.dingtalkNotableClient.InsertRecord(accessToken, req)
if err != nil {
errCode := r.dingtalkNotableClient.GetHTTPStatus(err)
// 权限不足
if errCode == 403 {
return errorcode.ForbiddenErr("您当前没有AI需求表编辑权限请联系管理员添加权限")
}
return err
}
if recordId == "" {
return errors.NewBusinessErr(422, "创建记录失败,请联系管理员")
}
var detailPage string
entitys.ResLog(rec.Ch, task.Index, "需求记录完成")
switch rec.OutPutScene {
case entitys.OutPutSceneDingTalk:
// 构建跳转链接
detailPage = "[去查看](" + r.conf.Dingtalk.TableDemand.Url + ")"
default:
// 构建跳转链接
detailPage = util.BuildJumpLink(r.conf.Dingtalk.TableDemand.Url, "去查看")
}
entitys.ResText(rec.Ch, task.Index, fmt.Sprintf("需求已记录,正在分配相关人员处理,请您耐心等待处理结果。点击查看工单进度:%s", detailPage))
return nil
}
return errors.NewBusinessErr(422, "bot 任务未实现")
}
// getUserDingtalkUnionId 获取用户的 dingtalk unionId
func (r *Handle) getUserDingtalkUnionId(ctx context.Context, accessToken, sessionID string) (unionId string) {
if len(sessionID) == 0 {
// 查询用户名
return ""
}
session, has, err := r.sessionImpl.FindOne(r.sessionImpl.WithSessionId(sessionID))
if err != nil || !has {
log.Warnf("session not found: %s", sessionID)
return
}
return r.getUserDingtalkUnionIdWithUserName(ctx, accessToken, session.UserName)
}
func (r *Handle) getUserDingtalkUnionIdWithUserName(ctx context.Context, accessToken, userName string) (unionId string) {
// 获取创建者uid 用户名 -> dingtalk uid
creatorId, err := r.dingtalkContactClient.SearchUserOne(dingtalk.AppKey{AccessToken: accessToken}, userName)
if err != nil {
log.Warnf("search dingtalk user one failed: %v", err)
return
}
// 获取用户详情 dingtalk uid -> dingtalk unionId
userDetails, err := r.dingtalkOldClient.QueryUserDetails(ctx, creatorId)
if err != nil {
log.Warnf("query user dingtalk details failed: %v", err)
return
}
if userDetails == nil {
log.Warnf("user details not found: %s", creatorId)
return
}
unionId = userDetails.UnionID
return
}
func (r *Handle) handleApiTask(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
var (
request l_request.Request
requestParam map[string]interface{}
)
ext, err := rec_extra.GetTaskRecExt(rec)
if err != nil {
return
}
err = json.Unmarshal([]byte(rec.Match.Parameters), &requestParam)
if err != nil {
return
}
// request.Url = strings.ReplaceAll(task.Config, "${authorization}", requireData.Auth)
task.Config = strings.ReplaceAll(task.Config, "${authorization}", ext.Auth)
for k, v := range requestParam {
if vStr, ok := v.(string); ok {
task.Config = strings.ReplaceAll(task.Config, "${"+k+"}", vStr)
} else {
var jsonStr []byte
jsonStr, err = json.Marshal(v)
if err != nil {
return errors.NewBusinessErr(422, "请求参数解析失败")
}
task.Config = strings.ReplaceAll(task.Config, "\"${"+k+"}\"", string(jsonStr))
}
}
var configData entitys.ConfigDataHttp
err = json.Unmarshal([]byte(task.Config), &configData)
if err != nil {
return
}
err = mapstructure.Decode(configData.Request, &request)
if err != nil {
return
}
if len(request.Url) == 0 {
err = errors.NewBusinessErr(422, "api地址获取失败")
return
}
entitys.ResLoading(rec.Ch, task.Index, "正在请求数据")
res, err := request.Send()
if err != nil {
return
}
entitys.ResJson(rec.Ch, task.Index, res.Text)
return
}
// eino 工作流
func (r *Handle) handleEinoWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
// token 写入ctx
ext, err := rec_extra.GetTaskRecExt(rec)
if err != nil {
return
}
ctx = util.SetTokenToContext(ctx, ext.Auth)
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流")
// 工作流内部输出
workflowId := task.Index
_, err = r.workflowManager.Invoke(ctx, workflowId, &runtime.WorkflowArgs{Recognize: rec})
if err != nil {
return err
}
return nil
}
func (r *Handle) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)")
customClient := &http.Client{
Timeout: time.Minute * 30,
}
authCli := coze.NewTokenAuth(r.conf.Coze.ApiSecret)
cozeCli := coze.NewCozeAPI(
authCli,
coze.WithBaseURL(r.conf.Coze.BaseURL),
coze.WithHttpClient(customClient),
)
// 从参数中获取workflowID
type requestParams struct {
Request l_request.Request `json:"request"`
}
var config requestParams
err = json.Unmarshal([]byte(task.Config), &config)
if err != nil {
return err
}
workflowId, ok := config.Request.Json["workflow_id"].(string)
if !ok {
return fmt.Errorf("workflow_id不能为空")
}
// 提取参数
var data map[string]interface{}
err = json.Unmarshal([]byte(rec.Match.Parameters), &data)
req := &coze.RunWorkflowsReq{
WorkflowID: workflowId,
Parameters: data,
// IsAsync: true,
}
stream := config.Request.Json["stream"].(bool)
entitys.ResLog(rec.Ch, task.Index, "工作流执行中...")
if stream {
streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req)
if err != nil {
return err
}
handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index)
} else {
resp, err := cozeCli.Workflows.Runs.Create(ctx, req)
if err != nil {
return err
}
entitys.ResJson(rec.Ch, task.Index, resp.Data)
}
return
}
// handleCozeWorkflowEvents 处理 coze 工作流事件
func handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) {
defer resp.Close()
for {
event, err := resp.Recv()
if errorsSpecial.Is(err, io.EOF) {
fmt.Println("Stream finished")
break
}
if err != nil {
fmt.Println("Error receiving event:", err)
break
}
switch event.Event {
case coze.WorkflowEventTypeMessage:
entitys.ResStream(ch, index, event.Message.Content)
case coze.WorkflowEventTypeError:
entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %s", event.Error))
case coze.WorkflowEventTypeDone:
entitys.ResEnd(ch, index, "工作流执行完成")
case coze.WorkflowEventTypeInterrupt:
resumeReq := &coze.ResumeRunWorkflowsReq{
WorkflowID: workflowID,
EventID: event.Interrupt.InterruptData.EventID,
ResumeData: "your data",
InterruptType: event.Interrupt.InterruptData.Type,
}
newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq)
if err != nil {
entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error()))
return
}
entitys.ResLog(ch, index, "工作流恢复执行中...")
handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index)
}
}
fmt.Printf("done, log:%s\n", resp.Response().LogID())
}
// 权限验证
func (r *Handle) PermissionAuth(client *gateway.Client, pointTask *model.AiTask) (err error) {
// 授权检查权限
if !utils.Contains(client.GetCodes(), pointTask.Index) {
return fmt.Errorf("用户权限不足: %s", pointTask.Name)
}
return nil
}