package service

import (
	"context"
	"encoding/json"
	"net/http"

	"eino-project/internal/domain/llm"
	"eino-project/internal/domain/workflow"
	"eino-project/internal/pkg/sseutil"
	"github.com/go-kratos/kratos/v2/log"
	kratoshttp "github.com/go-kratos/kratos/v2/transport/http"
)

// WorkflowService exposes workflow execution over HTTP.
type WorkflowService struct {
	// models is handed to each workflow instance the service creates.
	models llm.LLM
	log    *log.Helper
}

// NewWorkflowService builds a WorkflowService that uses models for its
// workflows and wraps logger in a log.Helper.
func NewWorkflowService(logger log.Logger, models llm.LLM) *WorkflowService {
	return &WorkflowService{
		models: models,
		log:    log.NewHelper(logger),
	}
}

// HandleProductWorkflow decodes a JSON request body of the form
// {"message": ..., "session_id": ...}, runs the product workflow on the
// message, and streams progress and the final result to the client as
// server-sent events.
//
// A malformed body yields a JSON 400 response, and a response writer that
// cannot flush yields a JSON 500 response. Once the stream has started, a
// workflow failure is reported in-band through an error event and the handler
// returns nil.
func (s *WorkflowService) HandleProductWorkflow(ctx kratoshttp.Context) error {
	type reqType struct {
		Message   string `json:"message"`
		SessionID string `json:"session_id"`
	}

	var req reqType
	if err := json.NewDecoder(ctx.Request().Body).Decode(&req); err != nil {
		return ctx.JSON(http.StatusBadRequest, &CommonResp{Code: http.StatusBadRequest, Msg: "Invalid request body"})
	}

	// Verify streaming support before touching any headers, so the JSON
	// fallback response is not sent with SSE-specific headers attached.
	w := ctx.Response()
	fl, ok := w.(http.Flusher)
	if !ok {
		return ctx.JSON(http.StatusInternalServerError, &CommonResp{Code: http.StatusInternalServerError, Msg: "SSE not supported"})
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	sseutil.WriteLoading(w, fl, req.SessionID, "开始执行工作流")
	sseutil.WriteProcess(w, fl, req.SessionID, "预处理输入", false)

	// Tie the workflow to the request's lifetime: if the client disconnects or
	// the request is cancelled, the run context is cancelled too instead of the
	// workflow continuing with nobody left to receive the result.
	runCtx, cancel := context.WithCancel(ctx.Request().Context())
	defer cancel()

	wf := workflow.NewZltxProductWorkflow(s.models)
	final, err := wf.Run(runCtx, req.Message)
	if err != nil {
		// The stream is already open, so the failure is delivered as an SSE
		// error event rather than as a returned error; log it here because
		// this is the only layer that handles it.
		s.log.Errorf("product workflow failed: session_id=%q err=%v", req.SessionID, err)
		sseutil.WriteError(w, fl, req.SessionID, err.Error())
		sseutil.WriteDone(w, fl, req.SessionID)
		return nil
	}

	// The workflow output is forwarded as a JSON object when it parses as one;
	// otherwise the raw text is wrapped under the "result" key.
	var comp map[string]interface{}
	if json.Unmarshal([]byte(final), &comp) != nil {
		comp = map[string]interface{}{"result": final}
	}

	sseutil.WriteJson(w, fl, req.SessionID, comp, true)
	sseutil.WriteDone(w, fl, req.SessionID)
	return nil
}