feat: 1. 新增“coze工作流”任务类型及相关配置 2. 增加coze工作流统一流式&非流式任务处理方法

This commit is contained in:
fuzhongyun 2025-12-16 18:25:13 +08:00
parent 3a0ba6eaab
commit 3dccccda9e
4 changed files with 120 additions and 1 deletions

View File

@ -18,6 +18,10 @@ vllm:
timeout: "120s" timeout: "120s"
level: "info" level: "info"
coze:
base_url: "https://api.coze.cn"
api_secret: "pat_guUSPk8KZFvIIbVReuaMlOBVAaIISSdkTEV8MaRgVPNv6UEYPHKTBUXznFcxl04H"
sys: sys:
session_len: 6 session_len: 6

View File

@ -17,12 +17,17 @@ import (
"ai_scheduler/internal/pkg/util" "ai_scheduler/internal/pkg/util"
"ai_scheduler/internal/tools" "ai_scheduler/internal/tools"
"ai_scheduler/internal/tools/public" "ai_scheduler/internal/tools/public"
errorsSpecial "errors"
"io"
"net/http"
"time"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"github.com/coze-dev/coze-go"
"gorm.io/gorm/utils" "gorm.io/gorm/utils"
) )
@ -118,9 +123,10 @@ func (r *Handle) HandleMatch(ctx context.Context, client *gateway.Client, rec *e
return r.handleTask(ctx, rec, pointTask) return r.handleTask(ctx, rec, pointTask)
case constants.TaskTypeKnowle: case constants.TaskTypeKnowle:
return r.handleKnowle(ctx, rec, pointTask) return r.handleKnowle(ctx, rec, pointTask)
case constants.TaskTypeEinoWorkflow: case constants.TaskTypeEinoWorkflow:
return r.handleEinoWorkflow(ctx, rec, pointTask) return r.handleEinoWorkflow(ctx, rec, pointTask)
case constants.TaskTypeCozeWorkflow:
return r.handleCozeWorkflow(ctx, rec, pointTask)
default: default:
return r.handleOtherTask(ctx, requireData) return r.handleOtherTask(ctx, requireData)
} }
@ -302,6 +308,106 @@ func (r *Handle) handleEinoWorkflow(ctx context.Context, rec *entitys.Recognize,
return nil return nil
} }
func (r *Handle) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) {
entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)")
customClient := &http.Client{
Timeout: time.Minute * 30,
}
authCli := coze.NewTokenAuth(r.conf.Coze.ApiSecret)
cozeCli := coze.NewCozeAPI(
authCli,
coze.WithBaseURL(r.conf.Coze.BaseURL),
coze.WithHttpClient(customClient),
)
// 从参数中获取workflowID
type requestParams struct {
Request l_request.Request `json:"request"`
}
var config requestParams
err = json.Unmarshal([]byte(task.Config), &config)
if err != nil {
return err
}
workflowId, ok := config.Request.Json["workflow_id"].(string)
if !ok {
return fmt.Errorf("workflow_id不能为空")
}
// 提取参数
var data map[string]interface{}
err = json.Unmarshal([]byte(rec.Match.Parameters), &data)
req := &coze.RunWorkflowsReq{
WorkflowID: workflowId,
Parameters: data,
// IsAsync: true,
}
stream := config.Request.Json["stream"].(bool)
entitys.ResLog(rec.Ch, task.Index, "工作流执行中...")
if stream {
streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req)
if err != nil {
return err
}
handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index)
} else {
resp, err := cozeCli.Workflows.Runs.Create(ctx, req)
if err != nil {
return err
}
entitys.ResJson(rec.Ch, task.Index, resp.Data)
}
return
}
// handleCozeWorkflowEvents 处理 coze 工作流事件
func handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) {
defer resp.Close()
for {
event, err := resp.Recv()
if errorsSpecial.Is(err, io.EOF) {
fmt.Println("Stream finished")
break
}
if err != nil {
fmt.Println("Error receiving event:", err)
break
}
switch event.Event {
case coze.WorkflowEventTypeMessage:
entitys.ResStream(ch, index, event.Message.Content)
case coze.WorkflowEventTypeError:
entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %s", event.Error))
case coze.WorkflowEventTypeDone:
entitys.ResEnd(ch, index, "工作流执行完成")
case coze.WorkflowEventTypeInterrupt:
resumeReq := &coze.ResumeRunWorkflowsReq{
WorkflowID: workflowID,
EventID: event.Interrupt.InterruptData.EventID,
ResumeData: "your data",
InterruptType: event.Interrupt.InterruptData.Type,
}
newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq)
if err != nil {
entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error()))
return
}
entitys.ResLog(ch, index, "工作流恢复执行中...")
handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index)
}
}
fmt.Printf("done, log:%s\n", resp.Response().LogID())
}
// 权限验证 // 权限验证
func (r *Handle) PermissionAuth(client *gateway.Client, pointTask *model.AiTask) (err error) { func (r *Handle) PermissionAuth(client *gateway.Client, pointTask *model.AiTask) (err error) {
// 授权检查权限 // 授权检查权限

View File

@ -12,6 +12,7 @@ type Config struct {
Server ServerConfig `mapstructure:"server"` Server ServerConfig `mapstructure:"server"`
Ollama OllamaConfig `mapstructure:"ollama"` Ollama OllamaConfig `mapstructure:"ollama"`
Vllm VllmConfig `mapstructure:"vllm"` Vllm VllmConfig `mapstructure:"vllm"`
Coze CozeConfig `mapstructure:"coze"`
Sys SysConfig `mapstructure:"sys"` Sys SysConfig `mapstructure:"sys"`
Tools ToolsConfig `mapstructure:"tools"` Tools ToolsConfig `mapstructure:"tools"`
Logging LoggingConfig `mapstructure:"logging"` Logging LoggingConfig `mapstructure:"logging"`
@ -90,6 +91,13 @@ type VllmConfig struct {
Level string `mapstructure:"level"` Level string `mapstructure:"level"`
} }
// CozeConfig Coze配置
type CozeConfig struct {
BaseURL string `mapstructure:"base_url"`
ApiKey string `mapstructure:"api_key"`
ApiSecret string `mapstructure:"api_secret"`
}
type Redis struct { type Redis struct {
Host string `mapstructure:"host"` Host string `mapstructure:"host"`
Type string `mapstructure:"type"` Type string `mapstructure:"type"`

View File

@ -16,6 +16,7 @@ const (
TaskTypeFunc TaskType = 3 TaskTypeFunc TaskType = 3
TaskTypeBot TaskType = 4 TaskTypeBot TaskType = 4
TaskTypeEinoWorkflow TaskType = 5 // eino 工作流 TaskTypeEinoWorkflow TaskType = 5 // eino 工作流
TaskTypeCozeWorkflow TaskType = 6 // coze 工作流
) )
type UseFul int32 type UseFul int32