diff --git a/eino-project/cmd/server/__debug_bin4041733900 b/eino-project/cmd/server/__debug_bin2356973511 similarity index 83% rename from eino-project/cmd/server/__debug_bin4041733900 rename to eino-project/cmd/server/__debug_bin2356973511 index 1970e67..354eb6b 100644 Binary files a/eino-project/cmd/server/__debug_bin4041733900 and b/eino-project/cmd/server/__debug_bin2356973511 differ diff --git a/eino-project/cmd/server/__debug_bin2704070066 b/eino-project/cmd/server/__debug_bin352958662 similarity index 81% rename from eino-project/cmd/server/__debug_bin2704070066 rename to eino-project/cmd/server/__debug_bin352958662 index fdcc57f..25b8ed0 100644 Binary files a/eino-project/cmd/server/__debug_bin2704070066 and b/eino-project/cmd/server/__debug_bin352958662 differ diff --git a/eino-project/cmd/server/wire_gen.go b/eino-project/cmd/server/wire_gen.go index f5d39e9..24f951f 100644 --- a/eino-project/cmd/server/wire_gen.go +++ b/eino-project/cmd/server/wire_gen.go @@ -18,11 +18,10 @@ import ( "eino-project/internal/domain/vector" "eino-project/internal/server" "eino-project/internal/service" + "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/log" -) -import ( _ "go.uber.org/automaxprocs" ) @@ -45,10 +44,12 @@ func wireApp(confServer *conf.Server, confData *conf.Data, bootstrap *conf.Boots customerRepo := repoimpl.NewCustomerRepo(dataData, logger) customerUseCase := biz.NewCustomerUseCase(customerRepo, logger, knowledgeSearcher, documentProcessor, contextManager, monitorMonitor) sessionManager := session.NewMemorySessionManager(logger) - llmLLM := llm.NewLLM(bootstrap) - customerService := service.NewCustomerService(customerUseCase, sessionManager, monitorMonitor, logger, llmLLM) - agentService := service.NewAgentService(logger, llmLLM) - httpServer := server.NewHTTPServer(confServer, customerService, agentService, logger) + llmLLM := llm.NewLLM(bootstrap) + customerService := service.NewCustomerService(customerUseCase, sessionManager, monitorMonitor, logger, llmLLM) + agentService := service.NewAgentService(logger, llmLLM) + workflowService := service.NewWorkflowService(logger, llmLLM) + chatService := service.NewChatService(logger, llmLLM, monitorMonitor) + httpServer := server.NewHTTPServer(confServer, customerService, agentService, workflowService, logger, chatService) app := newApp(logger, httpServer) return app, func() { cleanup() diff --git a/eino-project/internal/domain/agent/intent.go b/eino-project/internal/domain/agent/intent.go index 5d1be90..f34e3e0 100644 --- a/eino-project/internal/domain/agent/intent.go +++ b/eino-project/internal/domain/agent/intent.go @@ -21,7 +21,7 @@ func NewIntentAgent(ctx context.Context, models llm.LLM) adk.Agent { # 你是一个意图识别智能体,根据用户输入识别用户的意图。 - 当用户输入商品相关时,意图为"商品查询"(product) - 当用户输入订单相关时,意图为"订单诊断"(order) - - 当用户输入其他问题时,意图为"其他"(other) + - 当用户输入除商品相关和订单相关的其他问题时,意图为"其他"(other) - 输出结构为: {"intent": "product|order|other"} `, Model: intentModel, diff --git a/eino-project/internal/domain/agent/order.go b/eino-project/internal/domain/agent/order.go new file mode 100644 index 0000000..98e01c0 --- /dev/null +++ b/eino-project/internal/domain/agent/order.go @@ -0,0 +1,43 @@ +package agent + +import ( + "context" + + "eino-project/internal/domain/llm" + "eino-project/internal/domain/tools" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/compose" +) + +// NewOrderChatAgent 订单查询 Agent,具备订单查询与日志查询工具 +func NewOrderChatAgent(ctx context.Context, models llm.LLM) adk.Agent { + chatModel, err := models.Intent() + if err != nil { + return nil + } + toolsCfg := adk.ToolsConfig{ + ToolsNodeConfig: compose.ToolsNodeConfig{ + Tools: []tool.BaseTool{ + tools.NewOrderByIDTool(), + }, + ExecuteSequentially: false, + }, + ReturnDirectly: map[string]bool{ + "get_order_by_id": true, + }, + } + agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ + Name: "订单查询智能体", + Description: "支持按订单ID查询订单详情", + Model: chatModel, + ToolsConfig: toolsCfg, + Instruction: ` + # 你是订单查询智能体 + - 当用户给出订单ID时,优先调用 "get_order_by_id" 返回订单详情 + - 仅返回订单详情json即可,不做多余解释 + `, + }) + return agent +} diff --git a/eino-project/internal/domain/agent/order_log.go b/eino-project/internal/domain/agent/order_log.go new file mode 100644 index 0000000..c1ccdf6 --- /dev/null +++ b/eino-project/internal/domain/agent/order_log.go @@ -0,0 +1,43 @@ +package agent + +import ( + "context" + + "eino-project/internal/domain/llm" + "eino-project/internal/domain/tools" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/compose" +) + +// NewOrderLogAgent 订单日志 Agent,仅绑定日志查询工具 +func NewOrderLogAgent(ctx context.Context, models llm.LLM) adk.Agent { + chatModel, err := models.Intent() + if err != nil { + return nil + } + toolsCfg := adk.ToolsConfig{ + ToolsNodeConfig: compose.ToolsNodeConfig{ + Tools: []tool.BaseTool{ + tools.NewOrderLogQueryTool(), + }, + ExecuteSequentially: false, + }, + ReturnDirectly: map[string]bool{ + "query_order_logs": true, + }, + } + agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ + Name: "订单日志智能体", + Description: "查询并返回订单日志", + Model: chatModel, + ToolsConfig: toolsCfg, + Instruction: ` + # 你是订单日志查询智能体 + - 接收订单ID,调用 "query_order_logs" 按时间倒序获取所有日志列表 + - 输出日志列表 + `, + }) + return agent +} diff --git a/eino-project/internal/domain/monitor/coze_loop.go b/eino-project/internal/domain/monitor/coze_loop.go index b97ac7b..8eb0de4 100644 --- a/eino-project/internal/domain/monitor/coze_loop.go +++ b/eino-project/internal/domain/monitor/coze_loop.go @@ -41,9 +41,14 @@ func NewCozeLoopMonitor(base Monitor, cfg *conf.Monitoring_CozeLoop, logger log. // RecordRequest 记录请求并上报简化 Trace func (m *cozeLoopMonitor) RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error { - if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil { - return err - } + ctx, span := m.client.StartSpan(ctx, "RecordRequest", requestType) + + span.Finish(ctx) + m.client.Close(ctx) + + // if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil { + // return err + // } // 简化:不生成trace return nil } diff --git a/eino-project/internal/domain/tools/order.go b/eino-project/internal/domain/tools/order.go new file mode 100644 index 0000000..add7a8a --- /dev/null +++ b/eino-project/internal/domain/tools/order.go @@ -0,0 +1,45 @@ +package tools + +import ( + "context" + + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/components/tool/utils" +) + +var ordersMock = []*Order{ + {ID: "O001", Status: "failed", Product: "商品 272", Amount: 1, CreateTime: "2025-11-11 14:45:00"}, + {ID: "O004", Status: "created", Product: "手机 A1", Amount: 1, CreateTime: "2025-11-01 10:00:00"}, + {ID: "O002", Status: "paid", Product: "手机 Pro X", Amount: 2, CreateTime: "2025-11-02 11:30:00"}, + {ID: "O003", Status: "shipped", Product: "笔记本 M3", Amount: 1, CreateTime: "2025-11-05 09:15:00"}, + {ID: "O271", Status: "created", Product: "商品 271", Amount: 3, CreateTime: "2025-11-10 12:00:00"}, +} + +type Order struct { + ID string `json:"id"` + Status string `json:"status"` + Product string `json:"product"` + Amount int `json:"amount"` + CreateTime string `json:"create_time"` +} + +type OrderByIDInput struct { + OrderID string `json:"order_id" jsonschema:"description=订单ID"` +} + +func NewOrderByIDTool() tool.InvokableTool { + t, err := utils.InferTool("get_order_by_id", "根据订单ID查询订单详情,返回订单对象。", orderByID) + if err != nil { + panic(err) + } + return t +} + +func orderByID(ctx context.Context, in *OrderByIDInput) (*Order, error) { + for _, it := range ordersMock { + if it.ID == in.OrderID { + return it, nil + } + } + return &Order{}, nil +} diff --git a/eino-project/internal/domain/tools/order_log.go b/eino-project/internal/domain/tools/order_log.go new file mode 100644 index 0000000..2a17bde --- /dev/null +++ b/eino-project/internal/domain/tools/order_log.go @@ -0,0 +1,39 @@ +package tools + +import ( + "context" + "time" + + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/components/tool/utils" +) + +type OrderLog struct { + OrderID string `json:"order_id"` + Timestamp time.Time `json:"timestamp"` + Level string `json:"level"` + Message string `json:"message"` +} + +type OrderLogQueryInput struct { + OrderID string `json:"order_id" jsonschema:"description=订单ID"` +} + +func NewOrderLogQueryTool() tool.InvokableTool { + t, err := utils.InferTool("query_order_logs", "查询订单处理日志,返回日志数组。", orderLogQuery) + if err != nil { + panic(err) + } + return t +} + +func orderLogQuery(ctx context.Context, in *OrderLogQueryInput) ([]*OrderLog, error) { + logs := []*OrderLog{ + {OrderID: in.OrderID, Timestamp: time.Now().Add(-5 * time.Minute), Level: "INFO", Message: "订单已创建"}, + {OrderID: in.OrderID, Timestamp: time.Now().Add(-4 * time.Minute), Level: "INFO", Message: "支付完成"}, + {OrderID: in.OrderID, Timestamp: time.Now().Add(-3 * time.Minute), Level: "INFO", Message: "仓库打包"}, + {OrderID: in.OrderID, Timestamp: time.Now().Add(-2 * time.Minute), Level: "WARN", Message: "快递延迟,重新调度"}, + {OrderID: in.OrderID, Timestamp: time.Now().Add(-1 * time.Minute), Level: "INFO", Message: "发货完成"}, + } + return logs, nil +} diff --git a/eino-project/internal/domain/workflow/chat_workflow.go b/eino-project/internal/domain/workflow/chat_workflow.go index e9fc776..42b74e9 100644 --- a/eino-project/internal/domain/workflow/chat_workflow.go +++ b/eino-project/internal/domain/workflow/chat_workflow.go @@ -25,7 +25,11 @@ type chatWorkflow struct { } func NewChatWorkflow(models llm.LLM, searcher vector.KnowledgeSearcher, ctxMgr contextpkg.ContextManager) ChatWorkflow { - return &chatWorkflow{models: models, searcher: searcher, ctxMgr: ctxMgr} + return &chatWorkflow{ + models: models, + searcher: searcher, + ctxMgr: ctxMgr, + } } func (w *chatWorkflow) Chat(ctx context.Context, message string, sessionID string) (string, error) { diff --git a/eino-project/internal/domain/workflow/zltx_product.go b/eino-project/internal/domain/workflow/zltx_product.go new file mode 100644 index 0000000..785f568 --- /dev/null +++ b/eino-project/internal/domain/workflow/zltx_product.go @@ -0,0 +1,85 @@ +package workflow + +import ( + "context" + "fmt" + "strings" + + "eino-project/internal/domain/agent" + "eino-project/internal/domain/llm" + "eino-project/internal/pkg/adkutil" + + "github.com/cloudwego/eino/compose" +) + +type ZltxProductWorkflow struct { + models llm.LLM +} + +type productItem struct { + ID string `json:"id"` + Name string `json:"name"` + Price int `json:"price"` + Description string `json:"description"` +} + +type productQueryRes struct { + Items []*productItem `json:"items"` + Source string `json:"source"` + Count int `json:"count"` +} + +func NewZltxProductWorkflow(models llm.LLM) *ZltxProductWorkflow { + return &ZltxProductWorkflow{models: models} +} + +func (w *ZltxProductWorkflow) Run(ctx context.Context, message string) (string, error) { + g := compose.NewGraph[map[string]any, string]() + + _ = g.AddLambdaNode("preprocess", compose.InvokableLambda(func(ctx context.Context, in map[string]any) (string, error) { + raw, _ := in["message"].(string) + q := strings.TrimSpace(raw) + return q, nil + })) + + _ = g.AddLambdaNode("agent_call", compose.InvokableLambda(func(ctx context.Context, q string) (productQueryRes, error) { + ag := agent.NewProductChatAgent(ctx, w.models) + out, err := adkutil.QueryJSON[productQueryRes](ctx, ag, q) + if err != nil || out == nil { + return productQueryRes{}, err + } + return *out, nil + })) + + _ = g.AddLambdaNode("describe", compose.InvokableLambda(func(ctx context.Context, res productQueryRes) (string, error) { + if res.Count <= 0 || len(res.Items) == 0 { + return "未找到相关商品", nil + } + var b strings.Builder + fmt.Fprintf(&b, "共%d条,来源%s。", res.Count, res.Source) + for i, it := range res.Items { + if i == 0 { + fmt.Fprintf(&b, "首条:%s(编号%s),价格%d,%s。", it.Name, it.ID, it.Price, it.Description) + continue + } + fmt.Fprintf(&b, ";%s(编号%s),价格%d,%s", it.Name, it.ID, it.Price, it.Description) + } + return b.String(), nil + })) + + _ = g.AddEdge(compose.START, "preprocess") + _ = g.AddEdge("preprocess", "agent_call") + _ = g.AddEdge("agent_call", "describe") + _ = g.AddEdge("describe", compose.END) + + r, err := g.Compile(ctx) + if err != nil { + return "", err + } + in := map[string]any{"message": message} + ret, err := r.Invoke(ctx, in) + if err != nil { + return "", err + } + return ret, nil +} diff --git a/eino-project/internal/pkg/adkutil/adkutil.go b/eino-project/internal/pkg/adkutil/adkutil.go index 0a6f89c..b945400 100644 --- a/eino-project/internal/pkg/adkutil/adkutil.go +++ b/eino-project/internal/pkg/adkutil/adkutil.go @@ -1,120 +1,120 @@ package adkutil import ( - "context" - "encoding/json" - "errors" + "context" + "encoding/json" + "errors" - "github.com/cloudwego/eino/adk" - "github.com/cloudwego/eino/schema" - krlog "github.com/go-kratos/kratos/v2/log" + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" + krlog "github.com/go-kratos/kratos/v2/log" ) type Result struct { - Customized any - Message *schema.Message + Customized any + Message *schema.Message } func QueryJSON[T any](ctx context.Context, agent adk.Agent, query string) (*T, error) { - r, err := Query(ctx, agent, query) - if err != nil { - return nil, err - } - if r.Customized != nil { - if v, ok := r.Customized.(*T); ok { - return v, nil - } - b, _ := json.Marshal(r.Customized) - var out T - if json.Unmarshal(b, &out) == nil { - return &out, nil - } - } - if r.Message != nil && r.Message.Content != "" { - var out T - if json.Unmarshal([]byte(r.Message.Content), &out) == nil { - return &out, nil - } - } - return nil, errors.New("agent output not match target type") + r, err := Query(ctx, agent, query) + if err != nil { + return nil, err + } + if r.Customized != nil { + if v, ok := r.Customized.(*T); ok { + return v, nil + } + b, _ := json.Marshal(r.Customized) + var out T + if json.Unmarshal(b, &out) == nil { + return &out, nil + } + } + if r.Message != nil && r.Message.Content != "" { + var out T + if json.Unmarshal([]byte(r.Message.Content), &out) == nil { + return &out, nil + } + } + return nil, errors.New("agent output not match target type") } func QueryWithLogger(ctx context.Context, agent adk.Agent, query string, logger *krlog.Helper) (Result, error) { - runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent}) - it := runner.Query(ctx, query) - var out Result - var lastErr error - if logger != nil { - logger.Infof("agent query start: %s", query) - } - for { - ev, ok := it.Next() - if !ok || ev == nil { - break - } - if logger != nil { - logger.Infof("agent event received: err=%v", ev.Err) - } - if ev.Err != nil { - lastErr = ev.Err - } - if ev.Output != nil { - if ev.Output.CustomizedOutput != nil { - out.Customized = ev.Output.CustomizedOutput - if logger != nil { - b, _ := json.Marshal(ev.Output.CustomizedOutput) - logger.Infof("agent customized output=%s", string(b)) - } - } - if ev.Output.MessageOutput != nil { - msg, _ := ev.Output.MessageOutput.GetMessage() - if msg != nil { - out.Message = msg - if logger != nil { - logger.Infof("agent message role=%s content=%s", msg.Role, msg.Content) - if len(msg.ToolCalls) > 0 { - for _, tc := range msg.ToolCalls { - if tc.Function.Name != "" { - logger.Infof("agent tool call name=%s args=%s", tc.Function.Name, tc.Function.Arguments) - } - } - } - } - } - } - } - } - if out.Customized != nil || out.Message != nil { - return out, nil - } - if lastErr != nil { - return out, lastErr - } - return out, errors.New("agent no output") + runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent}) + it := runner.Query(ctx, query) + var out Result + var lastErr error + if logger != nil { + logger.Infof("agent query start: %s", query) + } + for { + ev, ok := it.Next() + if !ok || ev == nil { + break + } + if logger != nil { + logger.Infof("agent event received: err=%v", ev.Err) + } + if ev.Err != nil { + lastErr = ev.Err + } + if ev.Output != nil { + if ev.Output.CustomizedOutput != nil { + out.Customized = ev.Output.CustomizedOutput + if logger != nil { + b, _ := json.Marshal(ev.Output.CustomizedOutput) + logger.Infof("agent customized output=%s", string(b)) + } + } + if ev.Output.MessageOutput != nil { + msg, _ := ev.Output.MessageOutput.GetMessage() + if msg != nil { + out.Message = msg + if logger != nil { + logger.Infof("agent message role=%s content=%s", msg.Role, msg.Content) + if len(msg.ToolCalls) > 0 { + for _, tc := range msg.ToolCalls { + if tc.Function.Name != "" { + logger.Infof("agent tool call name=%s args=%s", tc.Function.Name, tc.Function.Arguments) + } + } + } + } + } + } + } + } + if out.Customized != nil || out.Message != nil { + return out, nil + } + if lastErr != nil { + return out, lastErr + } + return out, errors.New("agent no output") } func QueryJSONWithLogger[T any](ctx context.Context, agent adk.Agent, query string, logger *krlog.Helper) (*T, error) { - r, err := QueryWithLogger(ctx, agent, query, logger) - if err != nil { - return nil, err - } - if r.Customized != nil { - if v, ok := r.Customized.(*T); ok { - return v, nil - } - b, _ := json.Marshal(r.Customized) - var out T - if json.Unmarshal(b, &out) == nil { - return &out, nil - } - } - if r.Message != nil && r.Message.Content != "" { - var out T - if json.Unmarshal([]byte(r.Message.Content), &out) == nil { - return &out, nil - } - } - return nil, errors.New("agent output not match target type") + r, err := QueryWithLogger(ctx, agent, query, logger) + if err != nil { + return nil, err + } + if r.Customized != nil { + if v, ok := r.Customized.(*T); ok { + return v, nil + } + b, _ := json.Marshal(r.Customized) + var out T + if json.Unmarshal(b, &out) == nil { + return &out, nil + } + } + if r.Message != nil && r.Message.Content != "" { + var out T + if json.Unmarshal([]byte(r.Message.Content), &out) == nil { + return &out, nil + } + } + return nil, errors.New("agent output not match target type") } // Query 对 Agent 发起一次非流式请求,并提取统一结果 @@ -196,3 +196,44 @@ func Stream(ctx context.Context, agent adk.Agent, query string) (<-chan string, }() return ch, nil } + +// StreamWithLogger:记录事件以判断是否触发模型流式(MessageOutput多次)或仅工具输出(CustomizedOutput) +func StreamWithLogger(ctx context.Context, agent adk.Agent, query string, logger *krlog.Helper) (<-chan string, error) { + runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent, EnableStreaming: true}) + it := runner.Query(ctx, query) + ch := make(chan string, 8) + go func() { + defer close(ch) + var msgCount int + for { + ev, ok := it.Next() + if !ok || ev == nil { + if logger != nil { + logger.Infof("stream finished, message_count=%d", msgCount) + } + break + } + + logger.Infof("stream event: %v", ev) + + if ev.Output != nil { + if ev.Output.CustomizedOutput != nil && logger != nil { + logger.Infof("customized output received") + } + if ev.Output.MessageOutput != nil { + msg, _ := ev.Output.MessageOutput.GetMessage() + if msg != nil { + msgCount++ + if logger != nil { + logger.Infof("message event #%d role=%s len=%d", msgCount, msg.Role, len(msg.Content)) + } + if msg.Content != "" { + ch <- msg.Content + } + } + } + } + } + }() + return ch, nil +} diff --git a/eino-project/internal/pkg/sseutil/sseutil.go b/eino-project/internal/pkg/sseutil/sseutil.go new file mode 100644 index 0000000..1666721 --- /dev/null +++ b/eino-project/internal/pkg/sseutil/sseutil.go @@ -0,0 +1,103 @@ +package sseutil + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + "github.com/gorilla/websocket" +) + +type Content struct { + Chunk string `json:"chunk"` + FullMessage string `json:"full_message"` + IsFinal bool `json:"is_final"` +} + +type Payload struct { + DataType string `json:"data_type"` + Content Content `json:"content"` + Component map[string]interface{} `json:"component,omitempty"` +} + +type Response struct { + SessionID string `json:"session_id"` + Timestamp string `json:"timestamp"` + Type string `json:"type"` + Payload Payload `json:"payload"` +} + +func ts() string { return time.Now().Format("2006-01-02 15:04:05") } + +func write(w http.ResponseWriter, fl http.Flusher, r Response) { + b, _ := json.Marshal(r) + fmt.Fprintf(w, "data: %s\n\n", b) + fl.Flush() +} + +func WriteLog(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}}) +} + +func WriteLoading(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}}) +} + +func WriteProcess(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string, final bool) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: final}}}) +} + +func WriteStreamChunk(w http.ResponseWriter, fl http.Flusher, sessionID string, chunk string) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: chunk, FullMessage: "", IsFinal: false}}}) +} + +func WriteStreamFinal(w http.ResponseWriter, fl http.Flusher, sessionID string, full string) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: "", FullMessage: full, IsFinal: true}}}) +} + +func WriteJson(w http.ResponseWriter, fl http.Flusher, sessionID string, component map[string]interface{}, final bool) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "component_data", Content: Content{Chunk: "", FullMessage: "", IsFinal: final}, Component: component}}) +} + +func WriteError(w http.ResponseWriter, fl http.Flusher, sessionID string, errMsg string) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "error", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: errMsg, IsFinal: true}}}) +} + +func WriteDone(w http.ResponseWriter, fl http.Flusher, sessionID string) { + write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "done", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: "", IsFinal: true}}}) +} + +// 构建(不写出)——用于 WebSocket 统一协议 +func BuildLog(sessionID string, msg string) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}}) + return b +} +func BuildProcess(sessionID string, msg string, final bool) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: final}}}) + return b +} +func BuildStreamChunk(sessionID string, chunk string) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: chunk, FullMessage: "", IsFinal: false}}}) + return b +} +func BuildStreamFinal(sessionID string, full string) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: "", FullMessage: full, IsFinal: true}}}) + return b +} +func BuildJson(sessionID string, component map[string]interface{}, final bool) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "component_data", Content: Content{Chunk: "", FullMessage: "", IsFinal: final}, Component: component}}) + return b +} +func BuildError(sessionID string, errMsg string) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "error", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: errMsg, IsFinal: true}}}) + return b +} +func BuildDone(sessionID string) []byte { + b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "done", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: "", IsFinal: true}}}) + return b +} + +// WebSocket 工具类 +func WSWriteJSON(conn *websocket.Conn, payload []byte) error { + return conn.WriteMessage(websocket.TextMessage, payload) +} diff --git a/eino-project/internal/server/http.go b/eino-project/internal/server/http.go index c1749ad..c10b23b 100644 --- a/eino-project/internal/server/http.go +++ b/eino-project/internal/server/http.go @@ -14,7 +14,14 @@ import ( ) // NewHTTPServer new an HTTP server. -func NewHTTPServer(c *conf.Server, customerService *service.CustomerService, agentService *service.AgentService, logger log.Logger) *http.Server { +func NewHTTPServer( + c *conf.Server, + customerService *service.CustomerService, + agentService *service.AgentService, + workflowService *service.WorkflowService, + logger log.Logger, + chatService *service.ChatService, +) *http.Server { var opts = []http.ServerOption{ http.Middleware( recovery.Recovery(), @@ -34,24 +41,21 @@ func NewHTTPServer(c *conf.Server, customerService *service.CustomerService, age // 注册HTTP路由 v1.RegisterCustomerServiceHTTPServer(srv, customerService) - // 添加SSE流式聊天的自定义路由 - srv.HandleFunc("/api/chat/stream", customerService.HandleStreamChat) - - // WebSocket 聊天路由(/api/chat/ws) - srv.HandleFunc("/api/chat/ws", func(w nethttp.ResponseWriter, r *nethttp.Request) { - upgrader := websocket.Upgrader{CheckOrigin: func(r *nethttp.Request) bool { return true }} - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - return - } - customerService.HandleWebSocketChat(conn) - }) - route := srv.Route("/api") // 商品查询 Agent 路由 route.POST("/agents/product/query", agentService.HandleProductQuery) - // 订单诊断工作流(直接调用流水线输出) - // route.POST("/api/workflow/order/diagnosis", customerService.HandleOrderDiagnosis) + // 产品工作流(Graph 编排) + route.POST("/workflow/product/query", workflowService.HandleProductWorkflow) + // WebSocket 聊天路由(/api/chat/ws) + route.GET("/chat/ws", func(ctx http.Context) error { + upgrader := websocket.Upgrader{CheckOrigin: func(r *nethttp.Request) bool { return true }} + conn, err := upgrader.Upgrade(ctx.Response(), ctx.Request(), nil) + if err != nil { + return err + } + chatService.HandleWebSocketChat(conn) + return nil + }) return srv } diff --git a/eino-project/internal/service/chat.go b/eino-project/internal/service/chat.go new file mode 100644 index 0000000..6b6e2a0 --- /dev/null +++ b/eino-project/internal/service/chat.go @@ -0,0 +1,215 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "eino-project/internal/domain/agent" + "eino-project/internal/domain/llm" + "eino-project/internal/domain/monitor" + "eino-project/internal/domain/workflow" + "eino-project/internal/pkg/adkutil" + "eino-project/internal/pkg/sseutil" + + "github.com/cloudwego/eino/schema" + "github.com/go-kratos/kratos/v2/log" + "github.com/gorilla/websocket" +) + +type ChatService struct { + models llm.LLM + log *log.Helper + monitor monitor.Monitor +} + +func NewChatService(logger log.Logger, models llm.LLM, m monitor.Monitor) *ChatService { + return &ChatService{models: models, log: log.NewHelper(logger), monitor: m} +} + +func (s *ChatService) HandleWebSocketChat(conn *websocket.Conn) { + defer conn.Close() + ctx := context.Background() + // 初始化两轮上下文(不输出,仅用于后续对话) + var dialogCtx []schema.Message = []schema.Message{ + {Role: schema.User, Content: "你好"}, + {Role: schema.Assistant, Content: "您好,请问需要什么帮助?"}, + {Role: schema.User, Content: "查询订单 O271 的进度"}, + {Role: schema.Assistant, Content: "订单 O271 已创建,正在处理"}, + } + for { + mt, msg, err := conn.ReadMessage() + if err != nil { + return + } + if mt != websocket.TextMessage { + continue + } + var req struct { + Message string `json:"message"` + SessionID string `json:"session_id"` + } + if json.Unmarshal(msg, &req) != nil || req.Message == "" { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "invalid input")) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + continue + } + // 追加到上下文并限制近2轮(最多保留4条) + dialogCtx = append(dialogCtx, schema.Message{Role: schema.User, Content: req.Message}) + if len(dialogCtx) > 4 { + dialogCtx = dialogCtx[len(dialogCtx)-4:] + } + // 成本消耗记录(示意:写入DB) + Coze Loop 简化监控 + _ = func() error { + // TODO: 记录成本消耗(token、模型、耗时等) + s.log.Infof("cost-record session=%s ts=%s", req.SessionID, time.Now().Format("2006-01-02 15:04:05")) + if s.monitor != nil { + _ = s.monitor.RecordRequest(ctx, "ws_chat", 0, true) + } + return nil + }() + + _ = sseutil.WSWriteJSON(conn, sseutil.BuildLog(req.SessionID, "意图识别中")) + intentAgent := agent.NewIntentAgent(ctx, s.models) + var intent string + if intentAgent != nil { + r, err := adkutil.Query(ctx, intentAgent, req.Message) + if err == nil && r.Message != nil && r.Message.Content != "" { + var intentJson map[string]interface{} + if json.Unmarshal([]byte(r.Message.Content), &intentJson) == nil { + intent = intentJson["intent"].(string) + } + } + } + _ = sseutil.WSWriteJSON(conn, sseutil.BuildLog(req.SessionID, "意图识别结果:"+intent)) + + switch intent { + case "product": + _ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "进入产品工作流", false)) + wf := workflow.NewZltxProductWorkflow(s.models) + startWF := time.Now() + out, err := wf.Run(ctx, req.Message) + if err != nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error())) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + continue + } + if s.monitor != nil { + _ = s.monitor.RecordAIRequest(ctx, time.Since(startWF), true) + _ = s.monitor.RecordLLMUsage(ctx, &monitor.LLMUsage{Model: "workflow:product", SessionID: req.SessionID, PromptPreview: req.Message, LatencyMS: time.Since(startWF).Milliseconds(), Timestamp: time.Now()}) + } + var comp map[string]interface{} + if json.Unmarshal([]byte(out), &comp) != nil { + comp = map[string]interface{}{"result": out} + } + _ = sseutil.WSWriteJSON(conn, sseutil.BuildJson(req.SessionID, comp, true)) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + case "order": + _ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "获取订单信息", false)) + ordAgent := agent.NewOrderChatAgent(ctx, s.models) + ordr, err := adkutil.QueryWithLogger(ctx, ordAgent, req.Message, s.log) + if err != nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error())) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + continue + } + var orderInfo map[string]interface{} + if ordr.Customized != nil { + b, _ := json.Marshal(ordr.Customized) + _ = json.Unmarshal(b, &orderInfo) + } else if ordr.Message != nil { + _ = json.Unmarshal([]byte(ordr.Message.Content), &orderInfo) + } + if orderInfo["id"] == nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "订单ID不存在")) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + continue + } + _ = sseutil.WSWriteJSON(conn, sseutil.BuildJson(req.SessionID, map[string]interface{}{"order": orderInfo}, false)) + + status := fmt.Sprintf("%v", orderInfo["status"]) + if status != "" && status != "completed" && status != "delivered" { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "订单未完成,生成诊断(Markdown流式)", false)) + logAgent := agent.NewOrderLogAgent(ctx, s.models) + id := fmt.Sprintf("%v", orderInfo["id"]) + res, err := adkutil.Query(ctx, logAgent, fmt.Sprintf("order_id=%s", id)) + if err != nil { + s.log.Infof("order log agent error: %v", err) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "日志查询失败")) + } else { + var logs []map[string]interface{} + if res.Customized != nil { + b, _ := json.Marshal(res.Customized) + _ = json.Unmarshal(b, &logs) + } else if res.Message != nil && res.Message.Content != "" { + _ = json.Unmarshal([]byte(res.Message.Content), &logs) + } + // 仅使用最近两轮上下文 + ctx2 := dialogCtx + if len(ctx2) > 4 { + ctx2 = ctx2[len(ctx2)-4:] + } + ctxTextDialog, _ := json.Marshal(ctx2) + ctxTextOrder, _ := json.Marshal(orderInfo) + ctxTextLogs, _ := json.Marshal(logs) + prompt := fmt.Sprintf("基于最近两轮对话上下文、订单信息与处理日志生成Markdown诊断。\n\n上下文:%s\n\n订单:%s\n\n日志:%s\n\n请给出状态判定、关键日志条目与建议。", string(ctxTextDialog), string(ctxTextOrder), string(ctxTextLogs)) + chatModel, err := s.models.Chat() + if err != nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error())) + } else { + reader, err := chatModel.Stream(ctx, []*schema.Message{{Role: schema.User, Content: prompt}}) + if err != nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "诊断流式生成失败")) + } else { + start := time.Now() + var full string + for { + chunk, rerr := reader.Recv() + if rerr != nil { + break + } + if chunk != nil && chunk.Content != "" { + full += chunk.Content + _ = sseutil.WSWriteJSON(conn, sseutil.BuildStreamChunk(req.SessionID, chunk.Content)) + } + } + if s.monitor != nil { + _ = s.monitor.RecordAIRequest(ctx, time.Since(start), true) + _ = s.monitor.RecordLLMUsage(ctx, &monitor.LLMUsage{Model: "agent:order_diagnosis", SessionID: req.SessionID, PromptPreview: prompt, LatencyMS: time.Since(start).Milliseconds(), Timestamp: time.Now()}) + } + _ = sseutil.WSWriteJSON(conn, sseutil.BuildStreamFinal(req.SessionID, full)) + // 追加助手回复到上下文并限制两轮 + dialogCtx = append(dialogCtx, schema.Message{Role: schema.Assistant, Content: full}) + if len(dialogCtx) > 4 { + dialogCtx = dialogCtx[len(dialogCtx)-4:] + } + } + } + } + } + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + default: + _ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "进入自然对话", false)) + chatModel, err := s.models.Chat() + if err != nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error())) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + continue + } + start := time.Now() + resp, err := chatModel.Generate(ctx, []*schema.Message{{Role: schema.User, Content: req.Message}}) + if err != nil || resp == nil { + _ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "对话失败")) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + continue + } + if s.monitor != nil { + _ = s.monitor.RecordAIRequest(ctx, time.Since(start), true) + _ = s.monitor.RecordLLMUsage(ctx, &monitor.LLMUsage{Model: "chat", SessionID: req.SessionID, PromptPreview: req.Message, LatencyMS: time.Since(start).Milliseconds(), Timestamp: time.Now()}) + } + _ = sseutil.WSWriteJSON(conn, sseutil.BuildStreamFinal(req.SessionID, resp.Content)) + _ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID)) + } + } +} diff --git a/eino-project/internal/service/customer.go b/eino-project/internal/service/customer.go index 299ac5e..f8051f8 100644 --- a/eino-project/internal/service/customer.go +++ b/eino-project/internal/service/customer.go @@ -36,20 +36,20 @@ type CustomerService struct { // NewCustomerService 创建智能客服服务 func NewCustomerService( - customerUseCase *biz.CustomerUseCase, - sessionManager session.SessionManager, - monitor monitor.Monitor, - logger log.Logger, - models llm.LLM, + customerUseCase *biz.CustomerUseCase, + sessionManager session.SessionManager, + monitor monitor.Monitor, + logger log.Logger, + models llm.LLM, ) *CustomerService { - return &CustomerService{ - customerUseCase: customerUseCase, - sessionManager: sessionManager, - monitor: monitor, - log: log.NewHelper(logger), - // 构建商品查询 ChatModelAgent,绑定工具并让模型自动选择调用 - productAgent: agent.NewProductChatAgent(context.Background(), models), - } + return &CustomerService{ + customerUseCase: customerUseCase, + sessionManager: sessionManager, + monitor: monitor, + log: log.NewHelper(logger), + // 构建商品查询 ChatModelAgent,绑定工具并让模型自动选择调用 + productAgent: agent.NewProductChatAgent(context.Background(), models), + } } func (s *CustomerService) SetChatWorkflow(w wf.ChatWorkflow) { s.chatWorkflow = w } diff --git a/eino-project/internal/service/service.go b/eino-project/internal/service/service.go index 97485f8..8c6b0a6 100644 --- a/eino-project/internal/service/service.go +++ b/eino-project/internal/service/service.go @@ -3,4 +3,4 @@ package service import "github.com/google/wire" // ProviderSet is service providers. -var ProviderSet = wire.NewSet(NewCustomerService, NewAgentService) +var ProviderSet = wire.NewSet(NewCustomerService, NewAgentService, NewWorkflowService, NewChatService) diff --git a/eino-project/internal/service/workflow.go b/eino-project/internal/service/workflow.go new file mode 100644 index 0000000..c83c709 --- /dev/null +++ b/eino-project/internal/service/workflow.go @@ -0,0 +1,65 @@ +package service + +import ( + "context" + "encoding/json" + "net/http" + + "eino-project/internal/domain/llm" + "eino-project/internal/domain/workflow" + "eino-project/internal/pkg/sseutil" + + "github.com/go-kratos/kratos/v2/log" + kratoshttp "github.com/go-kratos/kratos/v2/transport/http" +) + +type WorkflowService struct { + models llm.LLM + log *log.Helper +} + +func NewWorkflowService(logger log.Logger, models llm.LLM) *WorkflowService { + return &WorkflowService{ + models: models, + log: log.NewHelper(logger), + } +} + +func (s *WorkflowService) HandleProductWorkflow(ctx kratoshttp.Context) error { + type reqType struct { + Message string `json:"message"` + SessionID string `json:"session_id"` + } + var req reqType + if err := json.NewDecoder(ctx.Request().Body).Decode(&req); err != nil { + return ctx.JSON(http.StatusBadRequest, &CommonResp{Code: http.StatusBadRequest, Msg: "Invalid request body"}) + } + + w := ctx.Response() + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + fl, ok := w.(http.Flusher) + if !ok { + return ctx.JSON(http.StatusInternalServerError, &CommonResp{Code: http.StatusInternalServerError, Msg: "SSE not supported"}) + } + + sseutil.WriteLoading(w, fl, req.SessionID, "开始执行工作流") + sseutil.WriteProcess(w, fl, req.SessionID, "预处理输入", false) + + wf := workflow.NewZltxProductWorkflow(s.models) + final, err := wf.Run(context.Background(), req.Message) + if err != nil { + sseutil.WriteError(w, fl, req.SessionID, err.Error()) + sseutil.WriteDone(w, fl, req.SessionID) + return nil + } + + var comp map[string]interface{} + if json.Unmarshal([]byte(final), &comp) != nil { + comp = map[string]interface{}{"result": final} + } + sseutil.WriteJson(w, fl, req.SessionID, comp, true) + sseutil.WriteDone(w, fl, req.SessionID) + return nil +}