diff --git a/Dockerfile b/Dockerfile index 2bd6192..8b02af6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ ## 使用官方Go镜像作为构建环境 -FROM golang:1.24.1-alpine AS builder +FROM golang:1.24.7-alpine AS builder # 设置工作目录 WORKDIR /app diff --git a/config/config_env.yaml b/config/config_env.yaml index d2cd2a4..188e5f9 100644 --- a/config/config_env.yaml +++ b/config/config_env.yaml @@ -18,6 +18,10 @@ vllm: timeout: "120s" level: "info" +coze: + base_url: "https://api.coze.cn" + api_secret: "pat_guUSPk8KZFvIIbVReuaMlOBVAaIISSdkTEV8MaRgVPNv6UEYPHKTBUXznFcxl04H" + sys: session_len: 6 diff --git a/config/config_test.yaml b/config/config_test.yaml index cbeb64f..dc20fd7 100644 --- a/config/config_test.yaml +++ b/config/config_test.yaml @@ -5,7 +5,7 @@ server: ollama: - base_url: "http://127.0.0.1:11434" + base_url: "http://host.docker.internal:11434" model: "qwen3-coder:480b-cloud" generate_model: "qwen3-coder:480b-cloud" vl_model: "gemini-3-pro-preview" @@ -105,7 +105,7 @@ permissionConfig: llm: providers: ollama: - endpoint: http://127.0.0.1:11434 + endpoint: http://host.docker.internal:11434 timeout: 60s models: - id: qwen3-coder:480b-cloud diff --git a/internal/biz/do/handle.go b/internal/biz/do/handle.go index a09eb48..3a4a12b 100644 --- a/internal/biz/do/handle.go +++ b/internal/biz/do/handle.go @@ -17,12 +17,17 @@ import ( "ai_scheduler/internal/pkg/util" "ai_scheduler/internal/tools" "ai_scheduler/internal/tools/public" + errorsSpecial "errors" + "io" + "net/http" + "time" "context" "encoding/json" "fmt" "strings" + "github.com/coze-dev/coze-go" "gorm.io/gorm/utils" ) @@ -118,9 +123,10 @@ func (r *Handle) HandleMatch(ctx context.Context, client *gateway.Client, rec *e return r.handleTask(ctx, rec, pointTask) case constants.TaskTypeKnowle: return r.handleKnowle(ctx, rec, pointTask) - case constants.TaskTypeEinoWorkflow: return r.handleEinoWorkflow(ctx, rec, pointTask) + case constants.TaskTypeCozeWorkflow: + return r.handleCozeWorkflow(ctx, rec, pointTask) default: return r.handleOtherTask(ctx, requireData) } @@ -302,6 +308,106 @@ func (r *Handle) handleEinoWorkflow(ctx context.Context, rec *entitys.Recognize, return nil } +func (r *Handle) handleCozeWorkflow(ctx context.Context, rec *entitys.Recognize, task *model.AiTask) (err error) { + entitys.ResLoading(rec.Ch, task.Index, "正在执行工作流(coze)") + + customClient := &http.Client{ + Timeout: time.Minute * 30, + } + + authCli := coze.NewTokenAuth(r.conf.Coze.ApiSecret) + cozeCli := coze.NewCozeAPI( + authCli, + coze.WithBaseURL(r.conf.Coze.BaseURL), + coze.WithHttpClient(customClient), + ) + + // 从参数中获取workflowID + type requestParams struct { + Request l_request.Request `json:"request"` + } + var config requestParams + err = json.Unmarshal([]byte(task.Config), &config) + if err != nil { + return err + } + workflowId, ok := config.Request.Json["workflow_id"].(string) + if !ok { + return fmt.Errorf("workflow_id不能为空") + } + // 提取参数 + var data map[string]interface{} + err = json.Unmarshal([]byte(rec.Match.Parameters), &data) + + req := &coze.RunWorkflowsReq{ + WorkflowID: workflowId, + Parameters: data, + // IsAsync: true, + } + + stream := config.Request.Json["stream"].(bool) + + entitys.ResLog(rec.Ch, task.Index, "工作流执行中...") + + if stream { + streamResp, err := cozeCli.Workflows.Runs.Stream(ctx, req) + if err != nil { + return err + } + + handleCozeWorkflowEvents(ctx, streamResp, cozeCli, workflowId, rec.Ch, task.Index) + } else { + resp, err := cozeCli.Workflows.Runs.Create(ctx, req) + if err != nil { + return err + } + + entitys.ResJson(rec.Ch, task.Index, resp.Data) + } + + return +} + +// handleCozeWorkflowEvents 处理 coze 工作流事件 +func handleCozeWorkflowEvents(ctx context.Context, resp coze.Stream[coze.WorkflowEvent], cozeCli coze.CozeAPI, workflowID string, ch chan entitys.Response, index string) { + defer resp.Close() + for { + event, err := resp.Recv() + if errorsSpecial.Is(err, io.EOF) { + fmt.Println("Stream finished") + break + } + if err != nil { + fmt.Println("Error receiving event:", err) + break + } + + switch event.Event { + case coze.WorkflowEventTypeMessage: + entitys.ResStream(ch, index, event.Message.Content) + case coze.WorkflowEventTypeError: + entitys.ResError(ch, index, fmt.Sprintf("工作流执行错误: %s", event.Error)) + case coze.WorkflowEventTypeDone: + entitys.ResEnd(ch, index, "工作流执行完成") + case coze.WorkflowEventTypeInterrupt: + resumeReq := &coze.ResumeRunWorkflowsReq{ + WorkflowID: workflowID, + EventID: event.Interrupt.InterruptData.EventID, + ResumeData: "your data", + InterruptType: event.Interrupt.InterruptData.Type, + } + newResp, err := cozeCli.Workflows.Runs.Resume(ctx, resumeReq) + if err != nil { + entitys.ResError(ch, index, fmt.Sprintf("工作流恢复执行错误: %s", err.Error())) + return + } + entitys.ResLog(ch, index, "工作流恢复执行中...") + handleCozeWorkflowEvents(ctx, newResp, cozeCli, workflowID, ch, index) + } + } + fmt.Printf("done, log:%s\n", resp.Response().LogID()) +} + // 权限验证 func (r *Handle) PermissionAuth(client *gateway.Client, pointTask *model.AiTask) (err error) { // 授权检查权限 diff --git a/internal/config/config.go b/internal/config/config.go index 91115cb..ee0c1a5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -12,6 +12,7 @@ type Config struct { Server ServerConfig `mapstructure:"server"` Ollama OllamaConfig `mapstructure:"ollama"` Vllm VllmConfig `mapstructure:"vllm"` + Coze CozeConfig `mapstructure:"coze"` Sys SysConfig `mapstructure:"sys"` Tools ToolsConfig `mapstructure:"tools"` Logging LoggingConfig `mapstructure:"logging"` @@ -90,6 +91,13 @@ type VllmConfig struct { Level string `mapstructure:"level"` } +// CozeConfig Coze配置 +type CozeConfig struct { + BaseURL string `mapstructure:"base_url"` + ApiKey string `mapstructure:"api_key"` + ApiSecret string `mapstructure:"api_secret"` +} + type Redis struct { Host string `mapstructure:"host"` Type string `mapstructure:"type"` diff --git a/internal/data/constants/const.go b/internal/data/constants/const.go index 604848e..b3c6ef0 100644 --- a/internal/data/constants/const.go +++ b/internal/data/constants/const.go @@ -16,6 +16,7 @@ const ( TaskTypeFunc TaskType = 3 TaskTypeBot TaskType = 4 TaskTypeEinoWorkflow TaskType = 5 // eino 工作流 + TaskTypeCozeWorkflow TaskType = 6 // coze 工作流 ) type UseFul int32