This commit is contained in:
fuzhongyun 2025-11-20 17:50:13 +08:00
parent a838b87d7e
commit 10cd48667c
18 changed files with 835 additions and 142 deletions

View File

@ -18,11 +18,10 @@ import (
"eino-project/internal/domain/vector"
"eino-project/internal/server"
"eino-project/internal/service"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log"
)
import (
_ "go.uber.org/automaxprocs"
)
@ -45,10 +44,12 @@ func wireApp(confServer *conf.Server, confData *conf.Data, bootstrap *conf.Boots
customerRepo := repoimpl.NewCustomerRepo(dataData, logger)
customerUseCase := biz.NewCustomerUseCase(customerRepo, logger, knowledgeSearcher, documentProcessor, contextManager, monitorMonitor)
sessionManager := session.NewMemorySessionManager(logger)
llmLLM := llm.NewLLM(bootstrap)
customerService := service.NewCustomerService(customerUseCase, sessionManager, monitorMonitor, logger, llmLLM)
agentService := service.NewAgentService(logger, llmLLM)
httpServer := server.NewHTTPServer(confServer, customerService, agentService, logger)
llmLLM := llm.NewLLM(bootstrap)
customerService := service.NewCustomerService(customerUseCase, sessionManager, monitorMonitor, logger, llmLLM)
agentService := service.NewAgentService(logger, llmLLM)
workflowService := service.NewWorkflowService(logger, llmLLM)
chatService := service.NewChatService(logger, llmLLM, monitorMonitor)
httpServer := server.NewHTTPServer(confServer, customerService, agentService, workflowService, logger, chatService)
app := newApp(logger, httpServer)
return app, func() {
cleanup()

View File

@ -21,7 +21,7 @@ func NewIntentAgent(ctx context.Context, models llm.LLM) adk.Agent {
# 你是一个意图识别智能体根据用户输入识别用户的意图
- 当用户输入商品相关时意图为"商品查询"(product)
- 当用户输入订单相关时意图为"订单诊断"(order)
- 当用户输入其他问题时意图为"其他"(other)
- 当用户输入除商品相关和订单相关的其他问题时意图为"其他"(other)
- 输出结构为: {"intent": "product|order|other"}
`,
Model: intentModel,

View File

@ -0,0 +1,43 @@
package agent
import (
"context"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/tools"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/compose"
)
// NewOrderChatAgent 订单查询 Agent具备订单查询与日志查询工具
func NewOrderChatAgent(ctx context.Context, models llm.LLM) adk.Agent {
chatModel, err := models.Intent()
if err != nil {
return nil
}
toolsCfg := adk.ToolsConfig{
ToolsNodeConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{
tools.NewOrderByIDTool(),
},
ExecuteSequentially: false,
},
ReturnDirectly: map[string]bool{
"get_order_by_id": true,
},
}
agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "订单查询智能体",
Description: "支持按订单ID查询订单详情",
Model: chatModel,
ToolsConfig: toolsCfg,
Instruction: `
# 你是订单查询智能体
- 当用户给出订单ID时优先调用 "get_order_by_id" 返回订单详情
- 仅返回订单详情json即可不做多余解释
`,
})
return agent
}

View File

@ -0,0 +1,43 @@
package agent
import (
"context"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/tools"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/compose"
)
// NewOrderLogAgent 订单日志 Agent仅绑定日志查询工具
func NewOrderLogAgent(ctx context.Context, models llm.LLM) adk.Agent {
chatModel, err := models.Intent()
if err != nil {
return nil
}
toolsCfg := adk.ToolsConfig{
ToolsNodeConfig: compose.ToolsNodeConfig{
Tools: []tool.BaseTool{
tools.NewOrderLogQueryTool(),
},
ExecuteSequentially: false,
},
ReturnDirectly: map[string]bool{
"query_order_logs": true,
},
}
agent, _ := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: "订单日志智能体",
Description: "查询并返回订单日志",
Model: chatModel,
ToolsConfig: toolsCfg,
Instruction: `
# 你是订单日志查询智能体
- 接收订单ID调用 "query_order_logs" 按时间倒序获取所有日志列表
- 输出日志列表
`,
})
return agent
}

View File

@ -41,9 +41,14 @@ func NewCozeLoopMonitor(base Monitor, cfg *conf.Monitoring_CozeLoop, logger log.
// RecordRequest 记录请求并上报简化 Trace
func (m *cozeLoopMonitor) RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error {
if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil {
return err
}
ctx, span := m.client.StartSpan(ctx, "RecordRequest", requestType)
span.Finish(ctx)
m.client.Close(ctx)
// if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil {
// return err
// }
// 简化不生成trace
return nil
}

View File

@ -0,0 +1,45 @@
package tools
import (
"context"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/components/tool/utils"
)
var ordersMock = []*Order{
{ID: "O001", Status: "failed", Product: "商品 272", Amount: 1, CreateTime: "2025-11-11 14:45:00"},
{ID: "O004", Status: "created", Product: "手机 A1", Amount: 1, CreateTime: "2025-11-01 10:00:00"},
{ID: "O002", Status: "paid", Product: "手机 Pro X", Amount: 2, CreateTime: "2025-11-02 11:30:00"},
{ID: "O003", Status: "shipped", Product: "笔记本 M3", Amount: 1, CreateTime: "2025-11-05 09:15:00"},
{ID: "O271", Status: "created", Product: "商品 271", Amount: 3, CreateTime: "2025-11-10 12:00:00"},
}
type Order struct {
ID string `json:"id"`
Status string `json:"status"`
Product string `json:"product"`
Amount int `json:"amount"`
CreateTime string `json:"create_time"`
}
type OrderByIDInput struct {
OrderID string `json:"order_id" jsonschema:"description=订单ID"`
}
func NewOrderByIDTool() tool.InvokableTool {
t, err := utils.InferTool("get_order_by_id", "根据订单ID查询订单详情返回订单对象。", orderByID)
if err != nil {
panic(err)
}
return t
}
func orderByID(ctx context.Context, in *OrderByIDInput) (*Order, error) {
for _, it := range ordersMock {
if it.ID == in.OrderID {
return it, nil
}
}
return &Order{}, nil
}

View File

@ -0,0 +1,39 @@
package tools
import (
"context"
"time"
"github.com/cloudwego/eino/components/tool"
"github.com/cloudwego/eino/components/tool/utils"
)
type OrderLog struct {
OrderID string `json:"order_id"`
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
}
type OrderLogQueryInput struct {
OrderID string `json:"order_id" jsonschema:"description=订单ID"`
}
func NewOrderLogQueryTool() tool.InvokableTool {
t, err := utils.InferTool("query_order_logs", "查询订单处理日志,返回日志数组。", orderLogQuery)
if err != nil {
panic(err)
}
return t
}
func orderLogQuery(ctx context.Context, in *OrderLogQueryInput) ([]*OrderLog, error) {
logs := []*OrderLog{
{OrderID: in.OrderID, Timestamp: time.Now().Add(-5 * time.Minute), Level: "INFO", Message: "订单已创建"},
{OrderID: in.OrderID, Timestamp: time.Now().Add(-4 * time.Minute), Level: "INFO", Message: "支付完成"},
{OrderID: in.OrderID, Timestamp: time.Now().Add(-3 * time.Minute), Level: "INFO", Message: "仓库打包"},
{OrderID: in.OrderID, Timestamp: time.Now().Add(-2 * time.Minute), Level: "WARN", Message: "快递延迟,重新调度"},
{OrderID: in.OrderID, Timestamp: time.Now().Add(-1 * time.Minute), Level: "INFO", Message: "发货完成"},
}
return logs, nil
}

View File

@ -25,7 +25,11 @@ type chatWorkflow struct {
}
func NewChatWorkflow(models llm.LLM, searcher vector.KnowledgeSearcher, ctxMgr contextpkg.ContextManager) ChatWorkflow {
return &chatWorkflow{models: models, searcher: searcher, ctxMgr: ctxMgr}
return &chatWorkflow{
models: models,
searcher: searcher,
ctxMgr: ctxMgr,
}
}
func (w *chatWorkflow) Chat(ctx context.Context, message string, sessionID string) (string, error) {

View File

@ -0,0 +1,85 @@
package workflow
import (
"context"
"fmt"
"strings"
"eino-project/internal/domain/agent"
"eino-project/internal/domain/llm"
"eino-project/internal/pkg/adkutil"
"github.com/cloudwego/eino/compose"
)
type ZltxProductWorkflow struct {
models llm.LLM
}
type productItem struct {
ID string `json:"id"`
Name string `json:"name"`
Price int `json:"price"`
Description string `json:"description"`
}
type productQueryRes struct {
Items []*productItem `json:"items"`
Source string `json:"source"`
Count int `json:"count"`
}
func NewZltxProductWorkflow(models llm.LLM) *ZltxProductWorkflow {
return &ZltxProductWorkflow{models: models}
}
func (w *ZltxProductWorkflow) Run(ctx context.Context, message string) (string, error) {
g := compose.NewGraph[map[string]any, string]()
_ = g.AddLambdaNode("preprocess", compose.InvokableLambda(func(ctx context.Context, in map[string]any) (string, error) {
raw, _ := in["message"].(string)
q := strings.TrimSpace(raw)
return q, nil
}))
_ = g.AddLambdaNode("agent_call", compose.InvokableLambda(func(ctx context.Context, q string) (productQueryRes, error) {
ag := agent.NewProductChatAgent(ctx, w.models)
out, err := adkutil.QueryJSON[productQueryRes](ctx, ag, q)
if err != nil || out == nil {
return productQueryRes{}, err
}
return *out, nil
}))
_ = g.AddLambdaNode("describe", compose.InvokableLambda(func(ctx context.Context, res productQueryRes) (string, error) {
if res.Count <= 0 || len(res.Items) == 0 {
return "未找到相关商品", nil
}
var b strings.Builder
fmt.Fprintf(&b, "共%d条来源%s。", res.Count, res.Source)
for i, it := range res.Items {
if i == 0 {
fmt.Fprintf(&b, "首条:%s(编号%s),价格%d%s。", it.Name, it.ID, it.Price, it.Description)
continue
}
fmt.Fprintf(&b, "%s(编号%s),价格%d%s", it.Name, it.ID, it.Price, it.Description)
}
return b.String(), nil
}))
_ = g.AddEdge(compose.START, "preprocess")
_ = g.AddEdge("preprocess", "agent_call")
_ = g.AddEdge("agent_call", "describe")
_ = g.AddEdge("describe", compose.END)
r, err := g.Compile(ctx)
if err != nil {
return "", err
}
in := map[string]any{"message": message}
ret, err := r.Invoke(ctx, in)
if err != nil {
return "", err
}
return ret, nil
}

View File

@ -1,120 +1,120 @@
package adkutil
import (
"context"
"encoding/json"
"errors"
"context"
"encoding/json"
"errors"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
krlog "github.com/go-kratos/kratos/v2/log"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
krlog "github.com/go-kratos/kratos/v2/log"
)
type Result struct {
Customized any
Message *schema.Message
Customized any
Message *schema.Message
}
func QueryJSON[T any](ctx context.Context, agent adk.Agent, query string) (*T, error) {
r, err := Query(ctx, agent, query)
if err != nil {
return nil, err
}
if r.Customized != nil {
if v, ok := r.Customized.(*T); ok {
return v, nil
}
b, _ := json.Marshal(r.Customized)
var out T
if json.Unmarshal(b, &out) == nil {
return &out, nil
}
}
if r.Message != nil && r.Message.Content != "" {
var out T
if json.Unmarshal([]byte(r.Message.Content), &out) == nil {
return &out, nil
}
}
return nil, errors.New("agent output not match target type")
r, err := Query(ctx, agent, query)
if err != nil {
return nil, err
}
if r.Customized != nil {
if v, ok := r.Customized.(*T); ok {
return v, nil
}
b, _ := json.Marshal(r.Customized)
var out T
if json.Unmarshal(b, &out) == nil {
return &out, nil
}
}
if r.Message != nil && r.Message.Content != "" {
var out T
if json.Unmarshal([]byte(r.Message.Content), &out) == nil {
return &out, nil
}
}
return nil, errors.New("agent output not match target type")
}
func QueryWithLogger(ctx context.Context, agent adk.Agent, query string, logger *krlog.Helper) (Result, error) {
runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent})
it := runner.Query(ctx, query)
var out Result
var lastErr error
if logger != nil {
logger.Infof("agent query start: %s", query)
}
for {
ev, ok := it.Next()
if !ok || ev == nil {
break
}
if logger != nil {
logger.Infof("agent event received: err=%v", ev.Err)
}
if ev.Err != nil {
lastErr = ev.Err
}
if ev.Output != nil {
if ev.Output.CustomizedOutput != nil {
out.Customized = ev.Output.CustomizedOutput
if logger != nil {
b, _ := json.Marshal(ev.Output.CustomizedOutput)
logger.Infof("agent customized output=%s", string(b))
}
}
if ev.Output.MessageOutput != nil {
msg, _ := ev.Output.MessageOutput.GetMessage()
if msg != nil {
out.Message = msg
if logger != nil {
logger.Infof("agent message role=%s content=%s", msg.Role, msg.Content)
if len(msg.ToolCalls) > 0 {
for _, tc := range msg.ToolCalls {
if tc.Function.Name != "" {
logger.Infof("agent tool call name=%s args=%s", tc.Function.Name, tc.Function.Arguments)
}
}
}
}
}
}
}
}
if out.Customized != nil || out.Message != nil {
return out, nil
}
if lastErr != nil {
return out, lastErr
}
return out, errors.New("agent no output")
runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent})
it := runner.Query(ctx, query)
var out Result
var lastErr error
if logger != nil {
logger.Infof("agent query start: %s", query)
}
for {
ev, ok := it.Next()
if !ok || ev == nil {
break
}
if logger != nil {
logger.Infof("agent event received: err=%v", ev.Err)
}
if ev.Err != nil {
lastErr = ev.Err
}
if ev.Output != nil {
if ev.Output.CustomizedOutput != nil {
out.Customized = ev.Output.CustomizedOutput
if logger != nil {
b, _ := json.Marshal(ev.Output.CustomizedOutput)
logger.Infof("agent customized output=%s", string(b))
}
}
if ev.Output.MessageOutput != nil {
msg, _ := ev.Output.MessageOutput.GetMessage()
if msg != nil {
out.Message = msg
if logger != nil {
logger.Infof("agent message role=%s content=%s", msg.Role, msg.Content)
if len(msg.ToolCalls) > 0 {
for _, tc := range msg.ToolCalls {
if tc.Function.Name != "" {
logger.Infof("agent tool call name=%s args=%s", tc.Function.Name, tc.Function.Arguments)
}
}
}
}
}
}
}
}
if out.Customized != nil || out.Message != nil {
return out, nil
}
if lastErr != nil {
return out, lastErr
}
return out, errors.New("agent no output")
}
func QueryJSONWithLogger[T any](ctx context.Context, agent adk.Agent, query string, logger *krlog.Helper) (*T, error) {
r, err := QueryWithLogger(ctx, agent, query, logger)
if err != nil {
return nil, err
}
if r.Customized != nil {
if v, ok := r.Customized.(*T); ok {
return v, nil
}
b, _ := json.Marshal(r.Customized)
var out T
if json.Unmarshal(b, &out) == nil {
return &out, nil
}
}
if r.Message != nil && r.Message.Content != "" {
var out T
if json.Unmarshal([]byte(r.Message.Content), &out) == nil {
return &out, nil
}
}
return nil, errors.New("agent output not match target type")
r, err := QueryWithLogger(ctx, agent, query, logger)
if err != nil {
return nil, err
}
if r.Customized != nil {
if v, ok := r.Customized.(*T); ok {
return v, nil
}
b, _ := json.Marshal(r.Customized)
var out T
if json.Unmarshal(b, &out) == nil {
return &out, nil
}
}
if r.Message != nil && r.Message.Content != "" {
var out T
if json.Unmarshal([]byte(r.Message.Content), &out) == nil {
return &out, nil
}
}
return nil, errors.New("agent output not match target type")
}
// Query 对 Agent 发起一次非流式请求,并提取统一结果
@ -196,3 +196,44 @@ func Stream(ctx context.Context, agent adk.Agent, query string) (<-chan string,
}()
return ch, nil
}
// StreamWithLogger记录事件以判断是否触发模型流式MessageOutput多次或仅工具输出CustomizedOutput
func StreamWithLogger(ctx context.Context, agent adk.Agent, query string, logger *krlog.Helper) (<-chan string, error) {
runner := adk.NewRunner(ctx, adk.RunnerConfig{Agent: agent, EnableStreaming: true})
it := runner.Query(ctx, query)
ch := make(chan string, 8)
go func() {
defer close(ch)
var msgCount int
for {
ev, ok := it.Next()
if !ok || ev == nil {
if logger != nil {
logger.Infof("stream finished, message_count=%d", msgCount)
}
break
}
logger.Infof("stream event: %v", ev)
if ev.Output != nil {
if ev.Output.CustomizedOutput != nil && logger != nil {
logger.Infof("customized output received")
}
if ev.Output.MessageOutput != nil {
msg, _ := ev.Output.MessageOutput.GetMessage()
if msg != nil {
msgCount++
if logger != nil {
logger.Infof("message event #%d role=%s len=%d", msgCount, msg.Role, len(msg.Content))
}
if msg.Content != "" {
ch <- msg.Content
}
}
}
}
}
}()
return ch, nil
}

View File

@ -0,0 +1,103 @@
package sseutil
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/gorilla/websocket"
)
type Content struct {
Chunk string `json:"chunk"`
FullMessage string `json:"full_message"`
IsFinal bool `json:"is_final"`
}
type Payload struct {
DataType string `json:"data_type"`
Content Content `json:"content"`
Component map[string]interface{} `json:"component,omitempty"`
}
type Response struct {
SessionID string `json:"session_id"`
Timestamp string `json:"timestamp"`
Type string `json:"type"`
Payload Payload `json:"payload"`
}
func ts() string { return time.Now().Format("2006-01-02 15:04:05") }
func write(w http.ResponseWriter, fl http.Flusher, r Response) {
b, _ := json.Marshal(r)
fmt.Fprintf(w, "data: %s\n\n", b)
fl.Flush()
}
func WriteLog(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}})
}
func WriteLoading(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}})
}
func WriteProcess(w http.ResponseWriter, fl http.Flusher, sessionID string, msg string, final bool) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: final}}})
}
func WriteStreamChunk(w http.ResponseWriter, fl http.Flusher, sessionID string, chunk string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: chunk, FullMessage: "", IsFinal: false}}})
}
func WriteStreamFinal(w http.ResponseWriter, fl http.Flusher, sessionID string, full string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: "", FullMessage: full, IsFinal: true}}})
}
func WriteJson(w http.ResponseWriter, fl http.Flusher, sessionID string, component map[string]interface{}, final bool) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "component_data", Content: Content{Chunk: "", FullMessage: "", IsFinal: final}, Component: component}})
}
func WriteError(w http.ResponseWriter, fl http.Flusher, sessionID string, errMsg string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "error", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: errMsg, IsFinal: true}}})
}
func WriteDone(w http.ResponseWriter, fl http.Flusher, sessionID string) {
write(w, fl, Response{SessionID: sessionID, Timestamp: ts(), Type: "done", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: "", IsFinal: true}}})
}
// 构建(不写出)——用于 WebSocket 统一协议
func BuildLog(sessionID string, msg string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: false}}})
return b
}
func BuildProcess(sessionID string, msg string, final bool) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: msg, FullMessage: "", IsFinal: final}}})
return b
}
func BuildStreamChunk(sessionID string, chunk string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: chunk, FullMessage: "", IsFinal: false}}})
return b
}
func BuildStreamFinal(sessionID string, full string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "general_chat_stream", Content: Content{Chunk: "", FullMessage: full, IsFinal: true}}})
return b
}
func BuildJson(sessionID string, component map[string]interface{}, final bool) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "data", Payload: Payload{DataType: "component_data", Content: Content{Chunk: "", FullMessage: "", IsFinal: final}, Component: component}})
return b
}
func BuildError(sessionID string, errMsg string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "error", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: errMsg, IsFinal: true}}})
return b
}
func BuildDone(sessionID string) []byte {
b, _ := json.Marshal(Response{SessionID: sessionID, Timestamp: ts(), Type: "done", Payload: Payload{DataType: "thinking_process", Content: Content{Chunk: "", FullMessage: "", IsFinal: true}}})
return b
}
// WebSocket 工具类
func WSWriteJSON(conn *websocket.Conn, payload []byte) error {
return conn.WriteMessage(websocket.TextMessage, payload)
}

View File

@ -14,7 +14,14 @@ import (
)
// NewHTTPServer new an HTTP server.
func NewHTTPServer(c *conf.Server, customerService *service.CustomerService, agentService *service.AgentService, logger log.Logger) *http.Server {
func NewHTTPServer(
c *conf.Server,
customerService *service.CustomerService,
agentService *service.AgentService,
workflowService *service.WorkflowService,
logger log.Logger,
chatService *service.ChatService,
) *http.Server {
var opts = []http.ServerOption{
http.Middleware(
recovery.Recovery(),
@ -34,24 +41,21 @@ func NewHTTPServer(c *conf.Server, customerService *service.CustomerService, age
// 注册HTTP路由
v1.RegisterCustomerServiceHTTPServer(srv, customerService)
// 添加SSE流式聊天的自定义路由
srv.HandleFunc("/api/chat/stream", customerService.HandleStreamChat)
// WebSocket 聊天路由(/api/chat/ws
srv.HandleFunc("/api/chat/ws", func(w nethttp.ResponseWriter, r *nethttp.Request) {
upgrader := websocket.Upgrader{CheckOrigin: func(r *nethttp.Request) bool { return true }}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
customerService.HandleWebSocketChat(conn)
})
route := srv.Route("/api")
// 商品查询 Agent 路由
route.POST("/agents/product/query", agentService.HandleProductQuery)
// 订单诊断工作流(直接调用流水线输出)
// route.POST("/api/workflow/order/diagnosis", customerService.HandleOrderDiagnosis)
// 产品工作流Graph 编排)
route.POST("/workflow/product/query", workflowService.HandleProductWorkflow)
// WebSocket 聊天路由(/api/chat/ws
route.GET("/chat/ws", func(ctx http.Context) error {
upgrader := websocket.Upgrader{CheckOrigin: func(r *nethttp.Request) bool { return true }}
conn, err := upgrader.Upgrade(ctx.Response(), ctx.Request(), nil)
if err != nil {
return err
}
chatService.HandleWebSocketChat(conn)
return nil
})
return srv
}

View File

@ -0,0 +1,215 @@
package service
import (
"context"
"encoding/json"
"fmt"
"time"
"eino-project/internal/domain/agent"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/monitor"
"eino-project/internal/domain/workflow"
"eino-project/internal/pkg/adkutil"
"eino-project/internal/pkg/sseutil"
"github.com/cloudwego/eino/schema"
"github.com/go-kratos/kratos/v2/log"
"github.com/gorilla/websocket"
)
type ChatService struct {
models llm.LLM
log *log.Helper
monitor monitor.Monitor
}
func NewChatService(logger log.Logger, models llm.LLM, m monitor.Monitor) *ChatService {
return &ChatService{models: models, log: log.NewHelper(logger), monitor: m}
}
func (s *ChatService) HandleWebSocketChat(conn *websocket.Conn) {
defer conn.Close()
ctx := context.Background()
// 初始化两轮上下文(不输出,仅用于后续对话)
var dialogCtx []schema.Message = []schema.Message{
{Role: schema.User, Content: "你好"},
{Role: schema.Assistant, Content: "您好,请问需要什么帮助?"},
{Role: schema.User, Content: "查询订单 O271 的进度"},
{Role: schema.Assistant, Content: "订单 O271 已创建,正在处理"},
}
for {
mt, msg, err := conn.ReadMessage()
if err != nil {
return
}
if mt != websocket.TextMessage {
continue
}
var req struct {
Message string `json:"message"`
SessionID string `json:"session_id"`
}
if json.Unmarshal(msg, &req) != nil || req.Message == "" {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "invalid input"))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
continue
}
// 追加到上下文并限制近2轮最多保留4条
dialogCtx = append(dialogCtx, schema.Message{Role: schema.User, Content: req.Message})
if len(dialogCtx) > 4 {
dialogCtx = dialogCtx[len(dialogCtx)-4:]
}
// 成本消耗记录示意写入DB + Coze Loop 简化监控
_ = func() error {
// TODO: 记录成本消耗token、模型、耗时等
s.log.Infof("cost-record session=%s ts=%s", req.SessionID, time.Now().Format("2006-01-02 15:04:05"))
if s.monitor != nil {
_ = s.monitor.RecordRequest(ctx, "ws_chat", 0, true)
}
return nil
}()
_ = sseutil.WSWriteJSON(conn, sseutil.BuildLog(req.SessionID, "意图识别中"))
intentAgent := agent.NewIntentAgent(ctx, s.models)
var intent string
if intentAgent != nil {
r, err := adkutil.Query(ctx, intentAgent, req.Message)
if err == nil && r.Message != nil && r.Message.Content != "" {
var intentJson map[string]interface{}
if json.Unmarshal([]byte(r.Message.Content), &intentJson) == nil {
intent = intentJson["intent"].(string)
}
}
}
_ = sseutil.WSWriteJSON(conn, sseutil.BuildLog(req.SessionID, "意图识别结果:"+intent))
switch intent {
case "product":
_ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "进入产品工作流", false))
wf := workflow.NewZltxProductWorkflow(s.models)
startWF := time.Now()
out, err := wf.Run(ctx, req.Message)
if err != nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error()))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
continue
}
if s.monitor != nil {
_ = s.monitor.RecordAIRequest(ctx, time.Since(startWF), true)
_ = s.monitor.RecordLLMUsage(ctx, &monitor.LLMUsage{Model: "workflow:product", SessionID: req.SessionID, PromptPreview: req.Message, LatencyMS: time.Since(startWF).Milliseconds(), Timestamp: time.Now()})
}
var comp map[string]interface{}
if json.Unmarshal([]byte(out), &comp) != nil {
comp = map[string]interface{}{"result": out}
}
_ = sseutil.WSWriteJSON(conn, sseutil.BuildJson(req.SessionID, comp, true))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
case "order":
_ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "获取订单信息", false))
ordAgent := agent.NewOrderChatAgent(ctx, s.models)
ordr, err := adkutil.QueryWithLogger(ctx, ordAgent, req.Message, s.log)
if err != nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error()))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
continue
}
var orderInfo map[string]interface{}
if ordr.Customized != nil {
b, _ := json.Marshal(ordr.Customized)
_ = json.Unmarshal(b, &orderInfo)
} else if ordr.Message != nil {
_ = json.Unmarshal([]byte(ordr.Message.Content), &orderInfo)
}
if orderInfo["id"] == nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "订单ID不存在"))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
continue
}
_ = sseutil.WSWriteJSON(conn, sseutil.BuildJson(req.SessionID, map[string]interface{}{"order": orderInfo}, false))
status := fmt.Sprintf("%v", orderInfo["status"])
if status != "" && status != "completed" && status != "delivered" {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "订单未完成生成诊断Markdown流式", false))
logAgent := agent.NewOrderLogAgent(ctx, s.models)
id := fmt.Sprintf("%v", orderInfo["id"])
res, err := adkutil.Query(ctx, logAgent, fmt.Sprintf("order_id=%s", id))
if err != nil {
s.log.Infof("order log agent error: %v", err)
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "日志查询失败"))
} else {
var logs []map[string]interface{}
if res.Customized != nil {
b, _ := json.Marshal(res.Customized)
_ = json.Unmarshal(b, &logs)
} else if res.Message != nil && res.Message.Content != "" {
_ = json.Unmarshal([]byte(res.Message.Content), &logs)
}
// 仅使用最近两轮上下文
ctx2 := dialogCtx
if len(ctx2) > 4 {
ctx2 = ctx2[len(ctx2)-4:]
}
ctxTextDialog, _ := json.Marshal(ctx2)
ctxTextOrder, _ := json.Marshal(orderInfo)
ctxTextLogs, _ := json.Marshal(logs)
prompt := fmt.Sprintf("基于最近两轮对话上下文、订单信息与处理日志生成Markdown诊断。\n\n上下文:%s\n\n订单:%s\n\n日志:%s\n\n请给出状态判定、关键日志条目与建议。", string(ctxTextDialog), string(ctxTextOrder), string(ctxTextLogs))
chatModel, err := s.models.Chat()
if err != nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error()))
} else {
reader, err := chatModel.Stream(ctx, []*schema.Message{{Role: schema.User, Content: prompt}})
if err != nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "诊断流式生成失败"))
} else {
start := time.Now()
var full string
for {
chunk, rerr := reader.Recv()
if rerr != nil {
break
}
if chunk != nil && chunk.Content != "" {
full += chunk.Content
_ = sseutil.WSWriteJSON(conn, sseutil.BuildStreamChunk(req.SessionID, chunk.Content))
}
}
if s.monitor != nil {
_ = s.monitor.RecordAIRequest(ctx, time.Since(start), true)
_ = s.monitor.RecordLLMUsage(ctx, &monitor.LLMUsage{Model: "agent:order_diagnosis", SessionID: req.SessionID, PromptPreview: prompt, LatencyMS: time.Since(start).Milliseconds(), Timestamp: time.Now()})
}
_ = sseutil.WSWriteJSON(conn, sseutil.BuildStreamFinal(req.SessionID, full))
// 追加助手回复到上下文并限制两轮
dialogCtx = append(dialogCtx, schema.Message{Role: schema.Assistant, Content: full})
if len(dialogCtx) > 4 {
dialogCtx = dialogCtx[len(dialogCtx)-4:]
}
}
}
}
}
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
default:
_ = sseutil.WSWriteJSON(conn, sseutil.BuildProcess(req.SessionID, "进入自然对话", false))
chatModel, err := s.models.Chat()
if err != nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, err.Error()))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
continue
}
start := time.Now()
resp, err := chatModel.Generate(ctx, []*schema.Message{{Role: schema.User, Content: req.Message}})
if err != nil || resp == nil {
_ = sseutil.WSWriteJSON(conn, sseutil.BuildError(req.SessionID, "对话失败"))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
continue
}
if s.monitor != nil {
_ = s.monitor.RecordAIRequest(ctx, time.Since(start), true)
_ = s.monitor.RecordLLMUsage(ctx, &monitor.LLMUsage{Model: "chat", SessionID: req.SessionID, PromptPreview: req.Message, LatencyMS: time.Since(start).Milliseconds(), Timestamp: time.Now()})
}
_ = sseutil.WSWriteJSON(conn, sseutil.BuildStreamFinal(req.SessionID, resp.Content))
_ = sseutil.WSWriteJSON(conn, sseutil.BuildDone(req.SessionID))
}
}
}

View File

@ -36,20 +36,20 @@ type CustomerService struct {
// NewCustomerService 创建智能客服服务
func NewCustomerService(
customerUseCase *biz.CustomerUseCase,
sessionManager session.SessionManager,
monitor monitor.Monitor,
logger log.Logger,
models llm.LLM,
customerUseCase *biz.CustomerUseCase,
sessionManager session.SessionManager,
monitor monitor.Monitor,
logger log.Logger,
models llm.LLM,
) *CustomerService {
return &CustomerService{
customerUseCase: customerUseCase,
sessionManager: sessionManager,
monitor: monitor,
log: log.NewHelper(logger),
// 构建商品查询 ChatModelAgent绑定工具并让模型自动选择调用
productAgent: agent.NewProductChatAgent(context.Background(), models),
}
return &CustomerService{
customerUseCase: customerUseCase,
sessionManager: sessionManager,
monitor: monitor,
log: log.NewHelper(logger),
// 构建商品查询 ChatModelAgent绑定工具并让模型自动选择调用
productAgent: agent.NewProductChatAgent(context.Background(), models),
}
}
func (s *CustomerService) SetChatWorkflow(w wf.ChatWorkflow) { s.chatWorkflow = w }

View File

@ -3,4 +3,4 @@ package service
import "github.com/google/wire"
// ProviderSet is service providers.
var ProviderSet = wire.NewSet(NewCustomerService, NewAgentService)
var ProviderSet = wire.NewSet(NewCustomerService, NewAgentService, NewWorkflowService, NewChatService)

View File

@ -0,0 +1,65 @@
package service
import (
"context"
"encoding/json"
"net/http"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/workflow"
"eino-project/internal/pkg/sseutil"
"github.com/go-kratos/kratos/v2/log"
kratoshttp "github.com/go-kratos/kratos/v2/transport/http"
)
type WorkflowService struct {
models llm.LLM
log *log.Helper
}
func NewWorkflowService(logger log.Logger, models llm.LLM) *WorkflowService {
return &WorkflowService{
models: models,
log: log.NewHelper(logger),
}
}
func (s *WorkflowService) HandleProductWorkflow(ctx kratoshttp.Context) error {
type reqType struct {
Message string `json:"message"`
SessionID string `json:"session_id"`
}
var req reqType
if err := json.NewDecoder(ctx.Request().Body).Decode(&req); err != nil {
return ctx.JSON(http.StatusBadRequest, &CommonResp{Code: http.StatusBadRequest, Msg: "Invalid request body"})
}
w := ctx.Response()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
fl, ok := w.(http.Flusher)
if !ok {
return ctx.JSON(http.StatusInternalServerError, &CommonResp{Code: http.StatusInternalServerError, Msg: "SSE not supported"})
}
sseutil.WriteLoading(w, fl, req.SessionID, "开始执行工作流")
sseutil.WriteProcess(w, fl, req.SessionID, "预处理输入", false)
wf := workflow.NewZltxProductWorkflow(s.models)
final, err := wf.Run(context.Background(), req.Message)
if err != nil {
sseutil.WriteError(w, fl, req.SessionID, err.Error())
sseutil.WriteDone(w, fl, req.SessionID)
return nil
}
var comp map[string]interface{}
if json.Unmarshal([]byte(final), &comp) != nil {
comp = map[string]interface{}{"result": final}
}
sseutil.WriteJson(w, fl, req.SessionID, comp, true)
sseutil.WriteDone(w, fl, req.SessionID)
return nil
}