diff --git a/eino-project/api/customer/v1/customer.pb.go b/eino-project/api/customer/v1/customer.pb.go index a250e0b..11a534e 100644 --- a/eino-project/api/customer/v1/customer.pb.go +++ b/eino-project/api/customer/v1/customer.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 +// protoc-gen-go v1.36.10 // protoc v3.21.12 // source: customer/v1/customer.proto diff --git a/eino-project/api/helloworld/v1/error_reason.pb.go b/eino-project/api/helloworld/v1/error_reason.pb.go index bf82ba0..bf8ecaa 100644 --- a/eino-project/api/helloworld/v1/error_reason.pb.go +++ b/eino-project/api/helloworld/v1/error_reason.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 +// protoc-gen-go v1.36.10 // protoc v3.21.12 // source: helloworld/v1/error_reason.proto diff --git a/eino-project/api/helloworld/v1/greeter.pb.go b/eino-project/api/helloworld/v1/greeter.pb.go index ce8456a..16c27de 100644 --- a/eino-project/api/helloworld/v1/greeter.pb.go +++ b/eino-project/api/helloworld/v1/greeter.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.9 +// protoc-gen-go v1.36.10 // protoc v3.21.12 // source: helloworld/v1/greeter.proto diff --git a/eino-project/cmd/server/__debug_bin274909578 b/eino-project/cmd/server/__debug_bin274909578 deleted file mode 100644 index ed4a149..0000000 Binary files a/eino-project/cmd/server/__debug_bin274909578 and /dev/null differ diff --git a/eino-project/cmd/server/main.go b/eino-project/cmd/server/main.go index fcdb998..36cacb5 100644 --- a/eino-project/cmd/server/main.go +++ b/eino-project/cmd/server/main.go @@ -1,20 +1,19 @@ package main import ( - "flag" - "os" + "flag" + "os" - "eino-project/internal/conf" + "eino-project/internal/conf" - "github.com/go-kratos/kratos/v2" - "github.com/go-kratos/kratos/v2/config" - "github.com/go-kratos/kratos/v2/config/file" - "github.com/go-kratos/kratos/v2/log" - "github.com/go-kratos/kratos/v2/middleware/tracing" - "github.com/go-kratos/kratos/v2/transport/grpc" - "github.com/go-kratos/kratos/v2/transport/http" + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/config" + "github.com/go-kratos/kratos/v2/config/file" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/middleware/tracing" + "github.com/go-kratos/kratos/v2/transport/http" - _ "go.uber.org/automaxprocs" + _ "go.uber.org/automaxprocs" ) // go build -ldflags "-X main.Version=x.y.z" @@ -33,18 +32,17 @@ func init() { flag.StringVar(&flagconf, "conf", "./configs", "config path, eg: -conf config.yaml") } -func newApp(logger log.Logger, gs *grpc.Server, hs *http.Server) *kratos.App { - return kratos.New( - kratos.ID(id), - kratos.Name(Name), - kratos.Version(Version), - kratos.Metadata(map[string]string{}), - kratos.Logger(logger), - kratos.Server( - gs, - hs, - ), - ) +func newApp(logger log.Logger, hs *http.Server) *kratos.App { + return kratos.New( + kratos.ID(id), + kratos.Name(Name), + kratos.Version(Version), + kratos.Metadata(map[string]string{}), + kratos.Logger(logger), + kratos.Server( + hs, + ), + ) } func main() { diff --git a/eino-project/cmd/server/wire.go b/eino-project/cmd/server/wire.go index fb9a90e..3f394cf 100644 --- a/eino-project/cmd/server/wire.go +++ b/eino-project/cmd/server/wire.go @@ -6,17 +6,13 @@ package main import ( - "eino-project/internal/ai" "eino-project/internal/biz" "eino-project/internal/conf" - contextpkg "eino-project/internal/context" 
"eino-project/internal/data" "eino-project/internal/data/repoimpl" - "eino-project/internal/monitor" + "eino-project/internal/domain" "eino-project/internal/server" "eino-project/internal/service" - "eino-project/internal/session" - "eino-project/internal/vector" "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/log" @@ -25,5 +21,13 @@ import ( // wireApp init kratos application. func wireApp(*conf.Server, *conf.Data, *conf.Bootstrap, log.Logger) (*kratos.App, func(), error) { - panic(wire.Build(server.ProviderSet, data.ProviderSet, repoimpl.ProviderSet, biz.ProviderSet, service.ProviderSet, ai.ProviderSet, session.ProviderSet, vector.ProviderSet, monitor.ProviderSet, contextpkg.ProviderSet, newApp)) + panic(wire.Build( + server.ProviderSet, + data.ProviderSet, + repoimpl.ProviderSet, + biz.ProviderSet, + service.ProviderSet, + domain.ProviderSet, + newApp, + )) } diff --git a/eino-project/cmd/server/wire_gen.go b/eino-project/cmd/server/wire_gen.go index 53e66b8..fb811b9 100644 --- a/eino-project/cmd/server/wire_gen.go +++ b/eino-project/cmd/server/wire_gen.go @@ -7,17 +7,16 @@ package main import ( - "eino-project/internal/ai" "eino-project/internal/biz" "eino-project/internal/conf" - "eino-project/internal/context" "eino-project/internal/data" "eino-project/internal/data/repoimpl" - "eino-project/internal/monitor" + "eino-project/internal/domain/context" + "eino-project/internal/domain/monitor" + "eino-project/internal/domain/session" + "eino-project/internal/domain/vector" "eino-project/internal/server" "eino-project/internal/service" - "eino-project/internal/session" - "eino-project/internal/vector" "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/log" ) @@ -36,7 +35,7 @@ func wireApp(confServer *conf.Server, confData *conf.Data, bootstrap *conf.Boots } documentProcessor := vector.NewDocumentProcessor(vectorService) knowledgeSearcher := vector.NewKnowledgeSearcher(vectorService) - contextManager := context.NewContextManagerFromBootstrapConfig(bootstrap, logger) + contextManager := context.NewContextManager(logger) monitorMonitor := monitor.NewMonitorFromBootstrapConfig(bootstrap, logger) dataData, cleanup, err := data.NewData(confData, logger, vectorService, documentProcessor, knowledgeSearcher, contextManager, monitorMonitor) if err != nil { @@ -44,16 +43,10 @@ func wireApp(confServer *conf.Server, confData *conf.Data, bootstrap *conf.Boots } customerRepo := repoimpl.NewCustomerRepo(dataData, logger) customerUseCase := biz.NewCustomerUseCase(customerRepo, logger, knowledgeSearcher, documentProcessor, contextManager, monitorMonitor) - aiService, err := ai.NewAIServiceFromConfig(bootstrap, logger, knowledgeSearcher, contextManager, monitorMonitor) - if err != nil { - cleanup() - return nil, nil, err - } sessionManager := session.NewMemorySessionManager(logger) - customerService := service.NewCustomerService(customerUseCase, aiService, sessionManager, monitorMonitor, logger) - grpcServer := server.NewGRPCServer(confServer, customerService, logger) + customerService := service.NewCustomerService(customerUseCase, sessionManager, monitorMonitor, logger) httpServer := server.NewHTTPServer(confServer, customerService, logger) - app := newApp(logger, grpcServer, httpServer) + app := newApp(logger, httpServer) return app, func() { cleanup() }, nil diff --git a/eino-project/go.mod b/eino-project/go.mod index 6d18014..565af73 100644 --- a/eino-project/go.mod +++ b/eino-project/go.mod @@ -46,6 +46,7 @@ require ( github.com/google/uuid v1.6.0 // indirect 
github.com/goph/emperror v0.17.2 // indirect github.com/gorilla/mux v1.8.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/invopop/yaml v0.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect diff --git a/eino-project/go.sum b/eino-project/go.sum index abc4d74..2599002 100644 --- a/eino-project/go.sum +++ b/eino-project/go.sum @@ -101,6 +101,7 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -113,6 +114,8 @@ github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfre github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/invopop/yaml v0.1.0 h1:YW3WGUoJEXYfzWBjn00zIlrw7brGVD0fUKRYDPAPhrc= github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= @@ -251,6 +254,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= +golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= diff --git a/eino-project/internal/ai/ai.go b/eino-project/internal/ai/ai.go deleted file mode 100644 index 5818322..0000000 --- a/eino-project/internal/ai/ai.go +++ /dev/null @@ -1,329 +0,0 @@ -package ai - -import ( - "context" - "fmt" - "strings" - "time" - - contextpkg "eino-project/internal/context" - "eino-project/internal/monitor" - "eino-project/internal/vector" - - "github.com/cloudwego/eino/components/model" - "github.com/cloudwego/eino/schema" - "github.com/go-kratos/kratos/v2/log" -) - -// AIService AI服务接口 -type AIService interface { - ProcessChat(ctx context.Context, message string, sessionID string) (string, error) - StreamChat(ctx context.Context, message string, sessionID string) (<-chan string, error) - // AnalyzeIntent 使用意图模型(qwen3:8b)做意图识别,仅返回固定标签之一 - AnalyzeIntent(ctx 
context.Context, message string) (string, error) -} - -// OllamaRequest Ollama API 请求结构 -type OllamaRequest struct { - Model string `json:"model"` - Prompt string `json:"prompt"` - Stream bool `json:"stream"` -} - -// OllamaResponse Ollama API 响应结构 -type OllamaResponse struct { - Model string `json:"model"` - Response string `json:"response"` - Done bool `json:"done"` - CreatedAt string `json:"created_at"` -} - -// aiService AI服务实现 -type aiService struct { - logger log.Logger - chatModel model.BaseChatModel - intentModel model.BaseChatModel - knowledgeSearcher vector.KnowledgeSearcher - contextManager contextpkg.ContextManager - monitor interface { - RecordLLMUsage(ctx context.Context, usage *monitor.LLMUsage) error - } - chatModelName string -} - -// NewAIService 创建AI服务实例 -func NewAIService(logger log.Logger, chatModel, intentModel model.BaseChatModel, knowledgeSearcher vector.KnowledgeSearcher, contextManager contextpkg.ContextManager) AIService { - return &aiService{ - logger: logger, - chatModel: chatModel, - intentModel: intentModel, - knowledgeSearcher: knowledgeSearcher, - contextManager: contextManager, - } -} - -// ProcessChat 处理聊天消息 -func (s *aiService) ProcessChat(ctx context.Context, message string, sessionID string) (string, error) { - log.Context(ctx).Infof("Processing chat message: %s for session: %s", message, sessionID) - - // 1. 添加消息到上下文管理器 - if s.contextManager != nil { - msg := contextpkg.Message{ - Role: "user", - Content: message, - Timestamp: time.Now(), - } - s.contextManager.AddMessage(ctx, sessionID, msg) - } - - // 2. 搜索相关知识库内容 - var knowledgeContext string - if s.knowledgeSearcher != nil { - knowledgeResults, err := s.knowledgeSearcher.SearchKnowledge(ctx, message, 3) - if err == nil && len(knowledgeResults) > 0 { - var contextParts []string - for _, result := range knowledgeResults { - contextParts = append(contextParts, fmt.Sprintf("相关知识: %s", result.Document.Content)) - } - knowledgeContext = strings.Join(contextParts, "\n") - } - } - - // 3. 构建增强的聊天消息 - enhancedMessage := message - if knowledgeContext != "" { - enhancedMessage = fmt.Sprintf("基于以下知识库内容回答用户问题:\n%s\n\n用户问题: %s", knowledgeContext, message) - } - - messages := []*schema.Message{ - { - Role: schema.User, - Content: enhancedMessage, - }, - } - - // 4. 
调用 Eino 聊天模型 - start := time.Now() - response, err := s.chatModel.Generate(ctx, messages) - if err != nil { - log.Context(ctx).Warnf("Eino chat model call failed: %v, falling back to mock response", err) - // 如果 Eino 调用失败,返回模拟响应 - return s.generateMockResponse(message), nil - } - - if response == nil || response.Content == "" { - log.Context(ctx).Warn("Empty response from Eino chat model, falling back to mock response") - return s.generateMockResponse(message), nil - } - - // 轻量化上报LLM使用:模型、token估算、延迟、知识命中等 - if s.monitor != nil { - // eino 不直接暴露 tokens,这里用近似估算:中文字数/2,英文按空格词数*1 - promptTokens := estimateTokens(enhancedMessage) - completionTokens := estimateTokens(response.Content) - // 使用专用意图模型识别意图(不使用自定义规则) - detectedIntent := "general_inquiry" - if intent, err := s.AnalyzeIntent(ctx, message); err == nil && intent != "" { - detectedIntent = intent - } - usage := &monitor.LLMUsage{ - Model: s.chatModelName, - SessionID: sessionID, - UserID: "default_user", - PromptPreview: preview(enhancedMessage, 200), - PromptTokens: promptTokens, - CompletionTokens: completionTokens, - TotalTokens: promptTokens + completionTokens, - LatencyMS: time.Since(start).Milliseconds(), - AgentThought: "intent=" + detectedIntent, - KnowledgeHits: countLines(knowledgeContext), - Metadata: map[string]string{"source": "ai.ProcessChat"}, - Timestamp: time.Now(), - } - _ = s.monitor.RecordLLMUsage(ctx, usage) - } - - return response.Content, nil -} - -// StreamChat 流式处理聊天消息 -func (s *aiService) StreamChat(ctx context.Context, message string, sessionID string) (<-chan string, error) { - log.Context(ctx).Infof("Processing stream chat message: %s for session: %s", message, sessionID) - - // 构建聊天消息 - messages := []*schema.Message{ - { - Role: schema.User, - Content: message, - }, - } - - // 调用 Eino 流式聊天模型 - start := time.Now() - streamReader, err := s.chatModel.Stream(ctx, messages) - if err != nil { - log.Context(ctx).Warnf("Eino stream chat model call failed: %v, falling back to mock stream response", err) - // 如果 Eino 流式调用失败,返回模拟流式响应 - return s.generateMockStreamResponse(ctx, message), nil - } - - // 创建响应通道 - responseChan := make(chan string, 10) - - // 启动 goroutine 处理流式响应 - go func() { - defer close(responseChan) - defer streamReader.Close() - - for { - chunk, err := streamReader.Recv() - if err != nil { - // 流结束或出错 - return - } - if chunk != nil && chunk.Content != "" { - select { - case responseChan <- chunk.Content: - case <-ctx.Done(): - return - } - } - } - - // 汇总轻量化上报:以流结束后估算总tokens与延迟 - if s.monitor != nil { - // 注意:为了避免阻塞,这里不去累计所有chunk;仅使用输入估算 + 总延迟 - detectedIntent := "general_inquiry" - if intent, err := s.AnalyzeIntent(ctx, message); err == nil && intent != "" { - detectedIntent = intent - } - usage := &monitor.LLMUsage{ - Model: s.chatModelName, - SessionID: sessionID, - UserID: "default_user", - PromptPreview: preview(message, 200), - PromptTokens: estimateTokens(message), - // completionTokens 无法准确统计,留空或基于时间估算可选 - TotalTokens: estimateTokens(message), - LatencyMS: time.Since(start).Milliseconds(), - AgentThought: "intent=" + detectedIntent, - Metadata: map[string]string{"source": "ai.StreamChat"}, - Timestamp: time.Now(), - } - _ = s.monitor.RecordLLMUsage(ctx, usage) - } - }() - - return responseChan, nil -} - -// estimateTokens 简易估算 tokens 数(不准确,但用于监控趋势足够) -func estimateTokens(text string) int { - if text == "" { - return 0 - } - // 粗略策略:英文按空格分词;中文按 rune 数/2 - ascii := true - for _, r := range text { - if r > 127 { - ascii = false - break - } - } - if ascii { - // 英文:词数近似为token数 - return 
len(strings.Fields(text)) - } - // 中文:每2个汉字约1个token - return len([]rune(text)) / 2 -} - -func preview(text string, max int) string { - if len(text) <= max { - return text - } - return text[:max] -} - -func countLines(text string) int { - if text == "" { - return 0 - } - return len(strings.Split(text, "\n")) -} - -// AnalyzeIntent 使用意图模型(qwen3:8b)进行意图识别,输出以下标签之一: -// order_inquiry、product_inquiry、technical_support、general_inquiry -func (s *aiService) AnalyzeIntent(ctx context.Context, message string) (string, error) { - if s.intentModel == nil { - return "general_inquiry", fmt.Errorf("intent model not configured") - } - instruction := "你是一个意图分类器。仅从以下标签中选择并输出一个,且只输出标签本身:order_inquiry、product_inquiry、technical_support、general_inquiry。用户消息:" + message - messages := []*schema.Message{ - {Role: schema.User, Content: instruction}, - } - resp, err := s.intentModel.Generate(ctx, messages) - if err != nil || resp == nil || resp.Content == "" { - return "general_inquiry", err - } - intent := strings.TrimSpace(strings.ToLower(resp.Content)) - switch intent { - case "order_inquiry", "product_inquiry", "technical_support", "general_inquiry": - return intent, nil - } - // 输出不在预期集合,回退为general - return "general_inquiry", nil -} - -// generateMockStreamResponse 生成模拟流式响应 -func (s *aiService) generateMockStreamResponse(ctx context.Context, message string) <-chan string { - responseChan := make(chan string, 10) - - go func() { - defer close(responseChan) - - // 获取模拟响应 - mockResponse := s.generateMockResponse(message) - - // 将响应分割成小块,模拟流式输出 - words := strings.Fields(mockResponse) - for i, word := range words { - select { - case responseChan <- word: - if i < len(words)-1 { - // 在单词之间添加空格,除了最后一个单词 - select { - case responseChan <- " ": - case <-ctx.Done(): - return - } - } - // 模拟网络延迟 - time.Sleep(50 * time.Millisecond) - case <-ctx.Done(): - return - } - } - }() - - return responseChan -} - -// generateMockResponse 生成模拟响应 -func (s *aiService) generateMockResponse(message string) string { - // 简单的模拟响应逻辑 - message = strings.ToLower(strings.TrimSpace(message)) - - switch { - case strings.Contains(message, "你好") || strings.Contains(message, "hello"): - return "你好!有什么我可以帮你的吗?😊" - case strings.Contains(message, "天气"): - return "抱歉,我无法获取实时天气信息。建议您查看天气预报应用。" - case strings.Contains(message, "时间"): - return fmt.Sprintf("当前时间是 %s", time.Now().Format("2006-01-02 15:04:05")) - case strings.Contains(message, "介绍"): - return "我是一个AI助手,可以帮助您回答问题、提供信息和进行对话。有什么我可以为您做的吗?" - default: - return "感谢您的消息!我正在学习中,暂时无法完全理解您的问题。请尝试用不同的方式表达,或者问一些简单的问题。" - } -} diff --git a/eino-project/internal/ai/provider.go b/eino-project/internal/ai/provider.go deleted file mode 100644 index 138f8b4..0000000 --- a/eino-project/internal/ai/provider.go +++ /dev/null @@ -1,72 +0,0 @@ -package ai - -import ( - "context" - "fmt" - "time" - - "eino-project/internal/conf" - contextpkg "eino-project/internal/context" - "eino-project/internal/monitor" - "eino-project/internal/vector" - - "github.com/cloudwego/eino-ext/components/model/ollama" - "github.com/go-kratos/kratos/v2/log" - "github.com/google/wire" -) - -// ProviderSet is ai providers. 
-var ProviderSet = wire.NewSet(NewAIServiceFromConfig) - -// NewAIServiceFromConfig 从配置创建AI服务 -func NewAIServiceFromConfig(c *conf.Bootstrap, logger log.Logger, knowledgeSearcher vector.KnowledgeSearcher, contextManager contextpkg.ContextManager, mon monitor.Monitor) (AIService, error) { - if c.Ai == nil || c.Ai.Ollama == nil { - return nil, fmt.Errorf("AI configuration is missing") - } - - // 获取超时配置,默认60秒 - timeout := 60 * time.Second - if c.Ai.Ollama.Timeout != nil { - timeout = c.Ai.Ollama.Timeout.AsDuration() - } - - // 设置默认模型 - chatModelName := "deepseek-v3.1:671b-cloud" - intentModelName := "qwen3:8b" - - // 从配置中获取模型 - if len(c.Ai.Ollama.Models) > 0 && c.Ai.Ollama.Models[0] != "" { - chatModelName = c.Ai.Ollama.Models[0] - } - if len(c.Ai.Ollama.Models) > 1 && c.Ai.Ollama.Models[1] != "" { - intentModelName = c.Ai.Ollama.Models[1] - } - - // 创建聊天模型(满足 BaseChatModel 接口) - chatModel, err := ollama.NewChatModel(context.Background(), &ollama.ChatModelConfig{ - BaseURL: c.Ai.Ollama.Endpoint, - Timeout: timeout, - Model: chatModelName, - }) - if err != nil { - return nil, fmt.Errorf("failed to create chat model: %w", err) - } - - // 创建意图识别模型(满足 BaseChatModel 接口) - intentModel, err := ollama.NewChatModel(context.Background(), &ollama.ChatModelConfig{ - BaseURL: c.Ai.Ollama.Endpoint, - Timeout: timeout, - Model: intentModelName, - }) - if err != nil { - return nil, fmt.Errorf("failed to create intent model: %w", err) - } - - svc := NewAIService(logger, chatModel, intentModel, knowledgeSearcher, contextManager) - // 注入轻量监控,用于记录 LLM 使用 - if m, ok := svc.(*aiService); ok { - m.monitor = mon - m.chatModelName = chatModelName - } - return svc, nil -} diff --git a/eino-project/internal/biz/customer.go b/eino-project/internal/biz/customer.go index 24c12ff..8045cef 100644 --- a/eino-project/internal/biz/customer.go +++ b/eino-project/internal/biz/customer.go @@ -6,15 +6,13 @@ import ( "strings" "time" - "eino-project/internal/vector" - contextpkg "eino-project/internal/context" - "eino-project/internal/monitor" + contextpkg "eino-project/internal/domain/context" + "eino-project/internal/domain/monitor" + "eino-project/internal/domain/vector" "github.com/go-kratos/kratos/v2/log" ) - - // CustomerRepo 智能客服数据仓库接口 type CustomerRepo interface { CheckSystemHealth(ctx context.Context) map[string]ServiceStatus @@ -91,8 +89,6 @@ type OrderDetails struct { NeedAI bool `json:"need_ai"` } - - // GetSystemStatus 获取系统状态 func (uc *CustomerUseCase) GetSystemStatus(ctx context.Context) (*SystemStatus, error) { uc.log.WithContext(ctx).Info("Getting system status") @@ -256,17 +252,17 @@ func generateSessionID() string { // SearchKnowledge 搜索知识库 func (uc *CustomerUseCase) SearchKnowledge(ctx context.Context, query string, limit int) ([]vector.SearchResult, error) { uc.log.WithContext(ctx).Infof("Searching knowledge for query: %s", query) - + if limit <= 0 { limit = 5 } - + results, err := uc.knowledgeSearcher.SearchKnowledge(ctx, query, limit) if err != nil { uc.log.WithContext(ctx).Errorf("Failed to search knowledge: %v", err) return nil, fmt.Errorf("知识库搜索失败: %w", err) } - + uc.log.WithContext(ctx).Infof("Found %d knowledge results", len(results)) return results, nil } @@ -274,22 +270,22 @@ func (uc *CustomerUseCase) SearchKnowledge(ctx context.Context, query string, li // ProcessKnowledgeUpload 处理知识库文档上传 func (uc *CustomerUseCase) ProcessKnowledgeUpload(ctx context.Context, content, title, category string) error { uc.log.WithContext(ctx).Infof("Processing knowledge upload: %s", title) - + if 
strings.TrimSpace(content) == "" { return fmt.Errorf("文档内容不能为空") } - + if strings.TrimSpace(title) == "" { return fmt.Errorf("文档标题不能为空") } - + // 处理文档并存储到向量数据库 documents, err := uc.docProcessor.ProcessKnowledgeDocument(ctx, content, title, category) if err != nil { uc.log.WithContext(ctx).Errorf("Failed to process knowledge document: %v", err) return fmt.Errorf("文档处理失败: %w", err) } - + uc.log.WithContext(ctx).Infof("Successfully processed knowledge document '%s' into %d chunks", title, len(documents)) return nil } @@ -297,13 +293,13 @@ func (uc *CustomerUseCase) ProcessKnowledgeUpload(ctx context.Context, content, // GetKnowledgeSummary 获取知识摘要 func (uc *CustomerUseCase) GetKnowledgeSummary(ctx context.Context, query string) (string, error) { uc.log.WithContext(ctx).Infof("Getting knowledge summary for query: %s", query) - + summary, err := uc.knowledgeSearcher.GetKnowledgeSummary(ctx, query) if err != nil { uc.log.WithContext(ctx).Errorf("Failed to get knowledge summary: %v", err) return "", fmt.Errorf("获取知识摘要失败: %w", err) } - + return summary, nil } @@ -311,14 +307,14 @@ func (uc *CustomerUseCase) GetKnowledgeSummary(ctx context.Context, query string // 保留函数以避免外部调用崩溃,但将其标记为 deprecated,并直接返回 general_inquiry // Deprecated: use AIService.AnalyzeIntent instead. func (uc *CustomerUseCase) AnalyzeUserIntent(ctx context.Context, message string) (string, error) { - uc.log.WithContext(ctx).Warn("AnalyzeUserIntent (rule-based) is deprecated, use AIService.AnalyzeIntent (LLM) instead") - return "general_inquiry", nil + uc.log.WithContext(ctx).Warn("AnalyzeUserIntent (rule-based) is deprecated, use AIService.AnalyzeIntent (LLM) instead") + return "general_inquiry", nil } // ProcessIntelligentChat 处理智能聊天(结合知识库) func (uc *CustomerUseCase) ProcessIntelligentChat(ctx context.Context, message, sessionID string) (string, error) { startTime := time.Now() - + // 记录监控指标 defer func() { duration := time.Since(startTime) @@ -385,9 +381,9 @@ func (uc *CustomerUseCase) ProcessIntelligentChat(ctx context.Context, message, if contextInfo != "" { response += contextInfo } - + response += "根据您的问题,我建议您:" - + // 基于意图和知识库内容生成回复 if strings.Contains(message, "订单") { response += "\n1. 检查订单状态和物流信息\n2. 如有问题可申请退换货\n3. 联系客服获取详细帮助" @@ -418,11 +414,11 @@ func (uc *CustomerUseCase) handleOrderInquiry(ctx context.Context, message strin if orderID != "" { order, err := uc.QueryOrder(ctx, orderID) if err == nil { - return fmt.Sprintf("您的订单 %s 状态为:%s。产品:%s,金额:%.2f元。", + return fmt.Sprintf("您的订单 %s 状态为:%s。产品:%s,金额:%.2f元。", order.OrderID, order.Status, order.Product, order.Amount) } } - + return "请提供您的订单号,我来帮您查询订单状态。订单号通常以 ORD 开头。" } @@ -433,7 +429,7 @@ func (uc *CustomerUseCase) handleProductInquiry(ctx context.Context, message str if err != nil || len(results) == 0 { return "抱歉,我暂时没有找到相关的产品信息。请您详细描述您的问题,我会尽力为您解答。" } - + // 组合知识库内容 var knowledgeContent []string for _, result := range results { @@ -441,18 +437,18 @@ func (uc *CustomerUseCase) handleProductInquiry(ctx context.Context, message str knowledgeContent = append(knowledgeContent, result.Document.Content) } } - + if len(knowledgeContent) == 0 { return "抱歉,我没有找到与您问题高度相关的信息。请您提供更多详细信息。" } - + response := "根据我们的产品知识库,为您找到以下相关信息:\n\n" response += strings.Join(knowledgeContent, "\n\n") - + if len(response) > 500 { response = response[:500] + "...\n\n如需了解更多详细信息,请告诉我您具体想了解哪个方面。" } - + return response } @@ -463,7 +459,7 @@ func (uc *CustomerUseCase) handleTechnicalSupport(ctx context.Context, message s if err != nil || len(results) == 0 { return "我理解您遇到了技术问题。请详细描述您的问题,包括:\n1. 
具体的错误信息\n2. 操作步骤\n3. 使用的设备和系统\n我会为您提供针对性的解决方案。" } - + // 组合技术支持内容 var supportContent []string for _, result := range results { @@ -471,18 +467,18 @@ func (uc *CustomerUseCase) handleTechnicalSupport(ctx context.Context, message s supportContent = append(supportContent, result.Document.Content) } } - + if len(supportContent) == 0 { return "请提供更多关于您技术问题的详细信息,我会为您查找相应的解决方案。" } - + response := "根据您的问题,我为您找到以下解决方案:\n\n" response += strings.Join(supportContent, "\n\n") - + if len(response) > 600 { response = response[:600] + "...\n\n如果以上方案无法解决您的问题,请联系我们的技术支持团队。" } - + return response } @@ -493,12 +489,12 @@ func (uc *CustomerUseCase) handleGeneralInquiry(ctx context.Context, message str if err != nil || len(results) == 0 { return "您好!我是智能客服助手,很高兴为您服务。请告诉我您需要什么帮助,我可以协助您:\n• 查询订单状态\n• 产品使用指导\n• 技术问题解答\n• 售后服务咨询" } - + // 使用最相关的知识回答 if results[0].Score > 0.7 { return "根据您的问题,我为您找到以下信息:\n\n" + results[0].Document.Content } - + return "我理解您的问题。请您提供更多详细信息,这样我能为您提供更准确的帮助。" } diff --git a/eino-project/internal/conf/conf.pb.go b/eino-project/internal/conf/conf.pb.go index c247894..73441b2 100644 --- a/eino-project/internal/conf/conf.pb.go +++ b/eino-project/internal/conf/conf.pb.go @@ -919,11 +919,8 @@ type Monitoring_CozeLoop struct { Endpoint string `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` Enable bool `protobuf:"varint,2,opt,name=enable,proto3" json:"enable,omitempty"` MetricsInterval *durationpb.Duration `protobuf:"bytes,3,opt,name=metrics_interval,json=metricsInterval,proto3" json:"metrics_interval,omitempty"` - // 新增:支持在配置文件中声明 Workspace 与 PAT(优先级高于环境变量) - WorkspaceId string `protobuf:"bytes,4,opt,name=workspace_id,json=workspaceId,proto3" json:"workspace_id,omitempty"` - ApiToken string `protobuf:"bytes,5,opt,name=api_token,json=apiToken,proto3" json:"api_token,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *Monitoring_CozeLoop) Reset() { @@ -977,20 +974,6 @@ func (x *Monitoring_CozeLoop) GetMetricsInterval() *durationpb.Duration { return nil } -func (x *Monitoring_CozeLoop) GetWorkspaceId() string { - if x != nil { - return x.WorkspaceId - } - return "" -} - -func (x *Monitoring_CozeLoop) GetApiToken() string { - if x != nil { - return x.ApiToken - } - return "" -} - var File_conf_conf_proto protoreflect.FileDescriptor const file_conf_conf_proto_rawDesc = "" + @@ -1049,16 +1032,14 @@ const file_conf_conf_proto_rawDesc = "" + "\n" + "collection\x18\x02 \x01(\tR\n" + "collection\x123\n" + - "\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\"\x91\x02\n" + + "\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\"\xd1\x01\n" + "\n" + "Monitoring\x12<\n" + - "\tcoze_loop\x18\x01 \x01(\v2\x1f.kratos.api.Monitoring.CozeLoopR\bcozeLoop\x1a\xc4\x01\n" + + "\tcoze_loop\x18\x01 \x01(\v2\x1f.kratos.api.Monitoring.CozeLoopR\bcozeLoop\x1a\x84\x01\n" + "\bCozeLoop\x12\x1a\n" + "\bendpoint\x18\x01 \x01(\tR\bendpoint\x12\x16\n" + "\x06enable\x18\x02 \x01(\bR\x06enable\x12D\n" + - "\x10metrics_interval\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\x0fmetricsInterval\x12!\n" + - "\fworkspace_id\x18\x04 \x01(\tR\vworkspaceId\x12\x1b\n" + - "\tapi_token\x18\x05 \x01(\tR\bapiToken\"K\n" + + "\x10metrics_interval\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\x0fmetricsInterval\"K\n" + "\x03Log\x12\x14\n" + "\x05level\x18\x01 \x01(\tR\x05level\x12\x16\n" + "\x06format\x18\x02 \x01(\tR\x06format\x12\x16\n" + diff --git 
a/eino-project/internal/context/provider.go b/eino-project/internal/context/provider.go deleted file mode 100644 index 1496d6f..0000000 --- a/eino-project/internal/context/provider.go +++ /dev/null @@ -1,16 +0,0 @@ -package context - -import ( - "eino-project/internal/conf" - - "github.com/go-kratos/kratos/v2/log" - "github.com/google/wire" -) - -// ProviderSet is context providers. -var ProviderSet = wire.NewSet(NewContextManagerFromBootstrapConfig) - -// NewContextManagerFromBootstrapConfig 从 Bootstrap 配置创建上下文管理器 -func NewContextManagerFromBootstrapConfig(c *conf.Bootstrap, logger log.Logger) ContextManager { - return NewContextManager(logger) -} \ No newline at end of file diff --git a/eino-project/internal/data/data.go b/eino-project/internal/data/data.go index b148a78..10c2d1a 100644 --- a/eino-project/internal/data/data.go +++ b/eino-project/internal/data/data.go @@ -6,9 +6,9 @@ import ( "time" "eino-project/internal/conf" - contextpkg "eino-project/internal/context" - "eino-project/internal/monitor" - "eino-project/internal/vector" + contextpkg "eino-project/internal/domain/context" + "eino-project/internal/domain/monitor" + "eino-project/internal/domain/vector" "github.com/go-kratos/kratos/v2/log" "github.com/redis/go-redis/v9" @@ -22,14 +22,14 @@ var ProviderSet = wire.NewSet(NewData) // Data 数据层结构 type Data struct { - DB *sql.DB - RDB *redis.Client - VectorService vector.VectorService - DocProcessor vector.DocumentProcessor + DB *sql.DB + RDB *redis.Client + VectorService vector.VectorService + DocProcessor vector.DocumentProcessor KnowledgeSearcher vector.KnowledgeSearcher - log *log.Helper - ContextManager contextpkg.ContextManager - Monitor monitor.Monitor + log *log.Helper + ContextManager contextpkg.ContextManager + Monitor monitor.Monitor } // NewData 创建数据层 @@ -56,7 +56,7 @@ func NewData(c *conf.Data, logger log.Logger, vs vector.VectorService, dp vector // 测试 Redis 连接 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - + if err := rdb.Ping(ctx).Err(); err != nil { helper.Warnf("Redis connection failed: %v", err) } @@ -67,14 +67,14 @@ func NewData(c *conf.Data, logger log.Logger, vs vector.VectorService, dp vector } d := &Data{ - DB: db, - RDB: rdb, - VectorService: vs, - DocProcessor: dp, + DB: db, + RDB: rdb, + VectorService: vs, + DocProcessor: dp, KnowledgeSearcher: ks, - log: helper, - ContextManager: contextManager, - Monitor: monitor, + log: helper, + ContextManager: contextManager, + Monitor: monitor, } cleanup := func() { diff --git a/eino-project/internal/domain/agent/intent/intent.go b/eino-project/internal/domain/agent/intent/intent.go new file mode 100644 index 0000000..4125e5a --- /dev/null +++ b/eino-project/internal/domain/agent/intent/intent.go @@ -0,0 +1,17 @@ +package intent + +import ( + "context" +) + +type IntentAgent interface { + Classify(ctx context.Context, message string) (string, error) +} + +type passthroughAgent struct{} + +func NewPassthrough() IntentAgent { return &passthroughAgent{} } + +func (p *passthroughAgent) Classify(ctx context.Context, message string) (string, error) { + return "general_inquiry", nil +} \ No newline at end of file diff --git a/eino-project/internal/domain/agent/product/handler.go b/eino-project/internal/domain/agent/product/handler.go new file mode 100644 index 0000000..8681ed8 --- /dev/null +++ b/eino-project/internal/domain/agent/product/handler.go @@ -0,0 +1,63 @@ +package product + +import ( + "encoding/json" + "net/http" + "strings" + + tool 
"eino-project/internal/tools/product" +) + +type Handler struct { + tool tool.Tool +} + +func NewHandler(t tool.Tool) *Handler { return &Handler{tool: t} } + +type queryRequest struct { + Query struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"query"` + Page int `json:"page,omitempty"` + Size int `json:"size,omitempty"` +} + +type queryResponse struct { + Items []*tool.Product `json:"items"` + Source string `json:"source"` + Count int `json:"count"` +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + var req queryRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + var items []*tool.Product + src := "" + + if id := strings.TrimSpace(req.Query.ID); id != "" { + if p, _ := h.tool.GetByID(r.Context(), id); p != nil { + items = append(items, p) + } + src = "id" + } else if name := strings.TrimSpace(req.Query.Name); name != "" { + res, _ := h.tool.SearchByName(r.Context(), name) + items = res + src = "name" + } else { + http.Error(w, "query.id or query.name required", http.StatusBadRequest) + return + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(&queryResponse{Items: items, Source: src, Count: len(items)}) +} \ No newline at end of file diff --git a/eino-project/internal/context/context.go b/eino-project/internal/domain/context/context.go similarity index 100% rename from eino-project/internal/context/context.go rename to eino-project/internal/domain/context/context.go diff --git a/eino-project/internal/domain/llm/llm.go b/eino-project/internal/domain/llm/llm.go new file mode 100644 index 0000000..36fa9fb --- /dev/null +++ b/eino-project/internal/domain/llm/llm.go @@ -0,0 +1,44 @@ +package llm + +import ( + "eino-project/internal/conf" + "sync" + + "github.com/cloudwego/eino/components/model" +) + +type LLM interface { + Chat() (model.BaseChatModel, error) + Intent() (model.BaseChatModel, error) +} + +type llm struct { + cfg *conf.Bootstrap + onceChat sync.Once + onceIntent sync.Once + chat model.BaseChatModel + intent model.BaseChatModel + cache map[string]model.BaseChatModel +} + +func NewLLM(cfg *conf.Bootstrap) LLM { + return &llm{cfg: cfg, cache: make(map[string]model.BaseChatModel)} +} + +// 获取Ollama聊天模型实例 +func (r *llm) Chat() (model.BaseChatModel, error) { + var err error + r.onceChat.Do(func() { + r.chat, err = newOllamaChatModel(r.cfg) + }) + return r.chat, err +} + +// 获取Ollama意图识别模型实例 +func (r *llm) Intent() (model.BaseChatModel, error) { + var err error + r.onceIntent.Do(func() { + r.intent, err = newOllamaIntentModel(r.cfg) + }) + return r.intent, err +} diff --git a/eino-project/internal/domain/llm/ollama_chat.go b/eino-project/internal/domain/llm/ollama_chat.go new file mode 100644 index 0000000..a6b2e02 --- /dev/null +++ b/eino-project/internal/domain/llm/ollama_chat.go @@ -0,0 +1,37 @@ +package llm + +import ( + "context" + "fmt" + "time" + + "eino-project/internal/conf" + + "github.com/cloudwego/eino-ext/components/model/ollama" + "github.com/cloudwego/eino/components/model" +) + +// newOllamaChatModel Ollama聊天模型实例 +func newOllamaChatModel(c *conf.Bootstrap) (model.BaseChatModel, error) { + if c == nil || c.Ai == nil || c.Ai.Ollama == nil { + return nil, fmt.Errorf("AI configuration is missing") + } + timeout := 60 * time.Second + if c.Ai.Ollama.Timeout != nil { + timeout = 
c.Ai.Ollama.Timeout.AsDuration() + } + modelName := "deepseek-v3.1:671b-cloud" + if len(c.Ai.Ollama.Models) > 0 && c.Ai.Ollama.Models[0] != "" { + modelName = c.Ai.Ollama.Models[0] + } + model, err := ollama.NewChatModel(context.Background(), &ollama.ChatModelConfig{ + BaseURL: c.Ai.Ollama.Endpoint, + Timeout: timeout, + Model: modelName, + }) + if err != nil { + return nil, fmt.Errorf("failed to create chat model: %w", err) + } + + return model, nil +} diff --git a/eino-project/internal/domain/llm/ollama_intent.go b/eino-project/internal/domain/llm/ollama_intent.go new file mode 100644 index 0000000..382d92c --- /dev/null +++ b/eino-project/internal/domain/llm/ollama_intent.go @@ -0,0 +1,36 @@ +package llm + +import ( + "context" + "fmt" + "time" + + "eino-project/internal/conf" + + "github.com/cloudwego/eino-ext/components/model/ollama" + "github.com/cloudwego/eino/components/model" +) + +// newOllamaIntentModel Ollama意图识别模型实例 +func newOllamaIntentModel(c *conf.Bootstrap) (model.BaseChatModel, error) { + if c == nil || c.Ai == nil || c.Ai.Ollama == nil { + return nil, fmt.Errorf("AI configuration is missing") + } + timeout := 60 * time.Second + if c.Ai.Ollama.Timeout != nil { + timeout = c.Ai.Ollama.Timeout.AsDuration() + } + modelName := "qwen3:8b" + if len(c.Ai.Ollama.Models) > 1 && c.Ai.Ollama.Models[1] != "" { + modelName = c.Ai.Ollama.Models[1] + } + model, err := ollama.NewChatModel(context.Background(), &ollama.ChatModelConfig{ + BaseURL: c.Ai.Ollama.Endpoint, + Timeout: timeout, + Model: modelName, + }) + if err != nil { + return nil, fmt.Errorf("failed to create intent model: %w", err) + } + return model, nil +} diff --git a/eino-project/internal/monitor/coze_loop.go b/eino-project/internal/domain/monitor/coze_loop.go similarity index 58% rename from eino-project/internal/monitor/coze_loop.go rename to eino-project/internal/domain/monitor/coze_loop.go index 536e146..b97ac7b 100644 --- a/eino-project/internal/monitor/coze_loop.go +++ b/eino-project/internal/domain/monitor/coze_loop.go @@ -1,95 +1,92 @@ package monitor import ( - "context" - "time" + "context" + "time" - "eino-project/internal/conf" + "eino-project/internal/conf" - cozeloop "github.com/coze-dev/cozeloop-go" - "github.com/go-kratos/kratos/v2/log" + cozeloop "github.com/coze-dev/cozeloop-go" + "github.com/go-kratos/kratos/v2/log" ) // cozeLoopMonitor 是一个包装器:在调用本地监控逻辑的同时,将关键指标以 Trace 形式上报到 Coze Loop type cozeLoopMonitor struct { - base Monitor - client cozeloop.Client - log *log.Helper + base Monitor + client cozeloop.Client + log *log.Helper } // NewCozeLoopMonitor 根据配置初始化 Coze Loop 客户端并返回包装后的监控器 func NewCozeLoopMonitor(base Monitor, cfg *conf.Monitoring_CozeLoop, logger log.Logger) Monitor { - helper := log.NewHelper(logger) + helper := log.NewHelper(logger) - ctx := context.Background() + ctx := context.Background() - // 按官方 simple demo 风格,直接从环境变量初始化客户端 - client, err := cozeloop.NewClient() - if err != nil { - helper.WithContext(ctx).Warnf("init CozeLoop client failed: %v, fallback to base monitor", err) - return base - } + // 按官方 simple demo 风格,直接从环境变量初始化客户端 + client, err := cozeloop.NewClient() + if err != nil { + helper.WithContext(ctx).Warnf("init CozeLoop client failed: %v, fallback to base monitor", err) + return base + } - m := &cozeLoopMonitor{ - base: base, - client: client, - log: helper, - } + m := &cozeLoopMonitor{ + base: base, + client: client, + log: helper, + } return m } -// loopReportMetrics 定期采集本地指标并以 Trace 形式上报 -// 已移除周期性指标上报逻辑,保留最小实现 - // RecordRequest 记录请求并上报简化 Trace func (m *cozeLoopMonitor) 
RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error { - if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil { - return err - } - // 简化:不生成trace - return nil + if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil { + return err + } + // 简化:不生成trace + return nil } // RecordAIRequest 记录 AI 请求并上报简化 Trace func (m *cozeLoopMonitor) RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error { - if err := m.base.RecordAIRequest(ctx, duration, success); err != nil { - return err - } - // 简化:不生成trace - return nil + if err := m.base.RecordAIRequest(ctx, duration, success); err != nil { + return err + } + // 简化:不生成trace + return nil } // RecordVectorOperation 记录向量操作并上报简化 Trace func (m *cozeLoopMonitor) RecordVectorOperation(ctx context.Context, operation string, success bool) error { - if err := m.base.RecordVectorOperation(ctx, operation, success); err != nil { - return err - } - // 简化:不生成trace - return nil + if err := m.base.RecordVectorOperation(ctx, operation, success); err != nil { + return err + } + // 简化:不生成trace + return nil } // RecordSessionOperation 记录会话操作并上报简化 Trace func (m *cozeLoopMonitor) RecordSessionOperation(ctx context.Context, operation string) error { - if err := m.base.RecordSessionOperation(ctx, operation); err != nil { - return err - } - // 简化:不生成trace - return nil + if err := m.base.RecordSessionOperation(ctx, operation); err != nil { + return err + } + // 简化:不生成trace + return nil } // RecordKnowledgeOperation 记录知识库操作并上报简化 Trace func (m *cozeLoopMonitor) RecordKnowledgeOperation(ctx context.Context, operation string) error { - if err := m.base.RecordKnowledgeOperation(ctx, operation); err != nil { - return err - } - // 简化:不生成trace - return nil + if err := m.base.RecordKnowledgeOperation(ctx, operation); err != nil { + return err + } + // 简化:不生成trace + return nil } // 透传LLM使用事件(不生成trace,不做周期性上报) func (m *cozeLoopMonitor) RecordLLMUsage(ctx context.Context, usage *LLMUsage) error { - return m.base.RecordLLMUsage(ctx, usage) + return m.base.RecordLLMUsage(ctx, usage) } // GetMetrics 透传本地指标(不直接从 Coze Loop 读取) @@ -99,10 +96,10 @@ func (m *cozeLoopMonitor) GetMetrics(ctx context.Context) (*Metrics, error) { // CreateAlert 创建告警(不生成trace,保持简化) func (m *cozeLoopMonitor) CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error { - if err := m.base.CreateAlert(ctx, alertType, level, title, message, metadata); err != nil { - return err - } - return nil + if err := m.base.CreateAlert(ctx, alertType, level, title, message, metadata); err != nil { + return err + } + return nil } // GetAlerts 透传本地告警 diff --git a/eino-project/internal/monitor/monitor.go b/eino-project/internal/domain/monitor/monitor.go similarity index 100% rename from eino-project/internal/monitor/monitor.go rename to eino-project/internal/domain/monitor/monitor.go diff --git a/eino-project/internal/monitor/provider.go b/eino-project/internal/domain/monitor/provider.go similarity index 100% rename from eino-project/internal/monitor/provider.go rename to eino-project/internal/domain/monitor/provider.go diff --git a/eino-project/internal/domain/provier_set.go b/eino-project/internal/domain/provier_set.go new file mode 100644 index 0000000..8e481f6 --- /dev/null +++ b/eino-project/internal/domain/provier_set.go @@ -0,0 +1,27 @@ +package domain + +import ( + "eino-project/internal/domain/context" + "eino-project/internal/domain/llm" + 
"eino-project/internal/domain/monitor" + "eino-project/internal/domain/session" + "eino-project/internal/domain/tools" + "eino-project/internal/domain/vector" + "eino-project/internal/domain/workflow" + + "github.com/google/wire" +) + +// ProviderSet is domain providers. +var ProviderSet = wire.NewSet( + context.NewContextManager, + llm.NewLLM, + monitor.NewMonitorFromBootstrapConfig, + session.NewMemorySessionManager, + tools.NewMemoryProductTool, + workflow.NewChatWorkflow, + + vector.NewVectorServiceFromBootstrapConfig, + vector.NewDocumentProcessor, + vector.NewKnowledgeSearcher, +) diff --git a/eino-project/internal/session/session.go b/eino-project/internal/domain/session/session.go similarity index 100% rename from eino-project/internal/session/session.go rename to eino-project/internal/domain/session/session.go diff --git a/eino-project/internal/domain/tools/product.go b/eino-project/internal/domain/tools/product.go new file mode 100644 index 0000000..b66729f --- /dev/null +++ b/eino-project/internal/domain/tools/product.go @@ -0,0 +1,54 @@ +package tools + +import ( + "context" + "strings" +) + +type Product struct { + ID string `json:"id"` + Name string `json:"name"` + Price float64 `json:"price"` + Description string `json:"description"` +} + +type Tool interface { + GetByID(ctx context.Context, id string) (*Product, error) + SearchByName(ctx context.Context, name string) ([]*Product, error) +} + +type memoryTool struct { + items []*Product +} + +func NewMemoryProductTool() Tool { + return &memoryTool{items: []*Product{ + {ID: "P001", Name: "手机 A1", Price: 1999, Description: "入门级智能手机"}, + {ID: "P002", Name: "手机 Pro X", Price: 5999, Description: "旗舰级拍照手机"}, + {ID: "P003", Name: "笔记本 M3", Price: 8999, Description: "轻薄高性能笔记本"}, + {ID: "P004", Name: "耳机 Air", Price: 1299, Description: "降噪真无线耳机"}, + }} +} + +func (m *memoryTool) GetByID(ctx context.Context, id string) (*Product, error) { + for _, it := range m.items { + if it.ID == id { + return it, nil + } + } + return nil, nil +} + +func (m *memoryTool) SearchByName(ctx context.Context, name string) ([]*Product, error) { + if name == "" { + return nil, nil + } + q := strings.ToLower(name) + var out []*Product + for _, it := range m.items { + if strings.Contains(strings.ToLower(it.Name), q) { + out = append(out, it) + } + } + return out, nil +} diff --git a/eino-project/internal/vector/chromadb.go b/eino-project/internal/domain/vector/chromadb.go similarity index 100% rename from eino-project/internal/vector/chromadb.go rename to eino-project/internal/domain/vector/chromadb.go diff --git a/eino-project/internal/vector/document.go b/eino-project/internal/domain/vector/document.go similarity index 100% rename from eino-project/internal/vector/document.go rename to eino-project/internal/domain/vector/document.go diff --git a/eino-project/internal/vector/provider.go b/eino-project/internal/domain/vector/provider.go similarity index 100% rename from eino-project/internal/vector/provider.go rename to eino-project/internal/domain/vector/provider.go diff --git a/eino-project/internal/vector/vector.go b/eino-project/internal/domain/vector/vector.go similarity index 100% rename from eino-project/internal/vector/vector.go rename to eino-project/internal/domain/vector/vector.go diff --git a/eino-project/internal/domain/workflow/chat_workflow.go b/eino-project/internal/domain/workflow/chat_workflow.go new file mode 100644 index 0000000..e9fc776 --- /dev/null +++ b/eino-project/internal/domain/workflow/chat_workflow.go @@ -0,0 +1,104 @@ +package 
workflow + +import ( + "context" + "fmt" + "strings" + "time" + + contextpkg "eino-project/internal/domain/context" + "eino-project/internal/domain/llm" + "eino-project/internal/domain/vector" + + "github.com/cloudwego/eino/schema" +) + +type ChatWorkflow interface { + Chat(ctx context.Context, message string, sessionID string) (string, error) + Stream(ctx context.Context, message string, sessionID string) (<-chan string, error) +} + +type chatWorkflow struct { + models llm.LLM + searcher vector.KnowledgeSearcher + ctxMgr contextpkg.ContextManager +} + +func NewChatWorkflow(models llm.LLM, searcher vector.KnowledgeSearcher, ctxMgr contextpkg.ContextManager) ChatWorkflow { + return &chatWorkflow{models: models, searcher: searcher, ctxMgr: ctxMgr} +} + +func (w *chatWorkflow) Chat(ctx context.Context, message string, sessionID string) (string, error) { + if w.ctxMgr != nil { + w.ctxMgr.AddMessage(ctx, sessionID, contextpkg.Message{Role: "user", Content: message, Timestamp: time.Now()}) + } + + var knowledgeContext string + if w.searcher != nil { + results, err := w.searcher.SearchKnowledge(ctx, message, 3) + if err == nil && len(results) > 0 { + var parts []string + for _, r := range results { + parts = append(parts, fmt.Sprintf("相关知识: %s", r.Document.Content)) + } + knowledgeContext = strings.Join(parts, "\n") + } + } + + enhanced := message + if knowledgeContext != "" { + enhanced = fmt.Sprintf("基于以下知识库内容回答用户问题:\n%s\n\n用户问题: %s", knowledgeContext, message) + } + + msgs := []*schema.Message{{Role: schema.User, Content: enhanced}} + chatModel, err := w.models.Chat() + if err != nil { + return "", err + } + resp, err := chatModel.Generate(ctx, msgs) + if err != nil || resp == nil { + return "", err + } + return resp.Content, nil +} + +func (w *chatWorkflow) Stream(ctx context.Context, message string, sessionID string) (<-chan string, error) { + var knowledgeContext string + if w.searcher != nil { + results, err := w.searcher.SearchKnowledge(ctx, message, 3) + if err == nil && len(results) > 0 { + var parts []string + for _, r := range results { + parts = append(parts, fmt.Sprintf("相关知识: %s", r.Document.Content)) + } + knowledgeContext = strings.Join(parts, "\n") + } + } + enhanced := message + if knowledgeContext != "" { + enhanced = fmt.Sprintf("基于以下知识库内容回答用户问题:\n%s\n\n用户问题: %s", knowledgeContext, message) + } + msgs := []*schema.Message{{Role: schema.User, Content: enhanced}} + chatModel, err := w.models.Chat() + if err != nil { + return nil, err + } + reader, err := chatModel.Stream(ctx, msgs) + if err != nil { + return nil, err + } + ch := make(chan string, 8) + go func() { + defer close(ch) + for { + chunk, err := reader.Recv() + if err != nil { + return + } + if chunk != nil && chunk.Content != "" { + ch <- chunk.Content + } + } + }() + return ch, nil +} diff --git a/eino-project/internal/server/grpc.go b/eino-project/internal/server/grpc.go deleted file mode 100644 index d97d847..0000000 --- a/eino-project/internal/server/grpc.go +++ /dev/null @@ -1,32 +0,0 @@ -package server - -import ( - v1 "eino-project/api/customer/v1" - "eino-project/internal/conf" - "eino-project/internal/service" - - "github.com/go-kratos/kratos/v2/log" - "github.com/go-kratos/kratos/v2/middleware/recovery" - "github.com/go-kratos/kratos/v2/transport/grpc" -) - -// NewGRPCServer new a gRPC server. 
-func NewGRPCServer(c *conf.Server, customerService *service.CustomerService, logger log.Logger) *grpc.Server { - var opts = []grpc.ServerOption{ - grpc.Middleware( - recovery.Recovery(), - ), - } - if c.Grpc.Network != "" { - opts = append(opts, grpc.Network(c.Grpc.Network)) - } - if c.Grpc.Addr != "" { - opts = append(opts, grpc.Address(c.Grpc.Addr)) - } - if c.Grpc.Timeout != nil { - opts = append(opts, grpc.Timeout(c.Grpc.Timeout.AsDuration())) - } - srv := grpc.NewServer(opts...) - v1.RegisterCustomerServiceServer(srv, customerService) - return srv -} diff --git a/eino-project/internal/server/http.go b/eino-project/internal/server/http.go index 2ae8cde..171b739 100644 --- a/eino-project/internal/server/http.go +++ b/eino-project/internal/server/http.go @@ -4,6 +4,9 @@ import ( v1 "eino-project/api/customer/v1" "eino-project/internal/conf" "eino-project/internal/service" + nethttp "net/http" + + "github.com/gorilla/websocket" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/middleware/recovery" @@ -27,12 +30,28 @@ func NewHTTPServer(c *conf.Server, customerService *service.CustomerService, log opts = append(opts, http.Timeout(c.Http.Timeout.AsDuration())) } srv := http.NewServer(opts...) - - // 注册标准的gRPC-Gateway路由 + + // 注册HTTP路由 v1.RegisterCustomerServiceHTTPServer(srv, customerService) - + // 添加SSE流式聊天的自定义路由 srv.HandleFunc("/api/chat/stream", customerService.HandleStreamChat) - + + // 商品查询 Agent 路由 + srv.HandleFunc("/api/agents/product/query", customerService.HandleProductQuery) + + // WebSocket 聊天路由(/api/chat/ws) + srv.HandleFunc("/api/chat/ws", func(w nethttp.ResponseWriter, r *nethttp.Request) { + upgrader := websocket.Upgrader{CheckOrigin: func(r *nethttp.Request) bool { return true }} + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + customerService.HandleWebSocketChat(conn) + }) + + // 订单诊断工作流(直接调用流水线输出) + srv.HandleFunc("/api/workflow/order/diagnosis", customerService.HandleOrderDiagnosis) + return srv } diff --git a/eino-project/internal/server/server.go b/eino-project/internal/server/server.go index f389425..de3a8ce 100644 --- a/eino-project/internal/server/server.go +++ b/eino-project/internal/server/server.go @@ -1,8 +1,8 @@ package server import ( - "github.com/google/wire" + "github.com/google/wire" ) // ProviderSet is server providers. 
-var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer) +var ProviderSet = wire.NewSet(NewHTTPServer) diff --git a/eino-project/internal/service/customer.go b/eino-project/internal/service/customer.go index ba5f546..f140439 100644 --- a/eino-project/internal/service/customer.go +++ b/eino-project/internal/service/customer.go @@ -7,11 +7,13 @@ import ( "net/http" "time" + "github.com/gorilla/websocket" + pb "eino-project/api/customer/v1" - "eino-project/internal/ai" "eino-project/internal/biz" - "eino-project/internal/monitor" - "eino-project/internal/session" + "eino-project/internal/domain/monitor" + "eino-project/internal/domain/session" + wf "eino-project/internal/domain/workflow" "github.com/go-kratos/kratos/v2/log" ) @@ -21,23 +23,24 @@ type CustomerService struct { pb.UnimplementedCustomerServiceServer customerUseCase *biz.CustomerUseCase - aiService ai.AIService sessionManager session.SessionManager monitor monitor.Monitor log *log.Helper + chatWorkflow wf.ChatWorkflow } // NewCustomerService 创建智能客服服务 -func NewCustomerService(customerUseCase *biz.CustomerUseCase, aiService ai.AIService, sessionManager session.SessionManager, monitor monitor.Monitor, logger log.Logger) *CustomerService { +func NewCustomerService(customerUseCase *biz.CustomerUseCase, sessionManager session.SessionManager, monitor monitor.Monitor, logger log.Logger) *CustomerService { return &CustomerService{ customerUseCase: customerUseCase, - aiService: aiService, sessionManager: sessionManager, monitor: monitor, log: log.NewHelper(logger), } } +func (s *CustomerService) SetChatWorkflow(w wf.ChatWorkflow) { s.chatWorkflow = w } + // SystemStatus 系统状态检查 func (s *CustomerService) SystemStatus(ctx context.Context, req *pb.SystemStatusRequest) (*pb.SystemStatusResponse, error) { s.log.WithContext(ctx).Info("SystemStatus called") @@ -97,28 +100,16 @@ func (s *CustomerService) Chat(ctx context.Context, req *pb.ChatRequest) (*pb.Ch } } - // 分析用户意图(使用 qwen3:8b LLM,不使用自定义规则) - intent := "general_inquiry" - if aiIntent, err := s.aiService.AnalyzeIntent(ctx, req.Message); err == nil && aiIntent != "" { - intent = aiIntent - } else if err != nil { - s.log.Errorf("Failed to analyze user intent via LLM: %v", err) - } + // 可在工作流内部进行意图识别 var aiResponse string - // 根据意图选择处理方式 - if intent == "knowledge_inquiry" || intent == "technical_support" { - // 使用智能问答(结合知识库) - aiResponse, err = s.customerUseCase.ProcessIntelligentChat(ctx, req.Message, req.SessionId) - if err != nil { - s.log.Errorf("Intelligent chat error: %v", err) - // 降级到普通AI服务 - aiResponse, err = s.aiService.ProcessChat(ctx, req.Message, req.SessionId) - } + // 使用 ChatWorkflow 路由处理(意图判断 + 调用相应 Agent/Workflow) + if s.chatWorkflow != nil { + aiResponse, err = s.chatWorkflow.Chat(ctx, req.Message, req.SessionId) } else { - // 使用普通AI服务 - aiResponse, err = s.aiService.ProcessChat(ctx, req.Message, req.SessionId) + aiResponse = "" + err = fmt.Errorf("chat workflow not configured") } if err != nil { @@ -301,9 +292,14 @@ func (s *CustomerService) StreamChat(req *pb.StreamChatRequest, stream pb.Custom } } - // 使用AI服务进行流式对话 + // 使用工作流进行流式对话 var fullResponse string - responseChan, err := s.aiService.StreamChat(ctx, req.Message, req.SessionId) + var responseChan <-chan string + if s.chatWorkflow != nil { + responseChan, err = s.chatWorkflow.Stream(ctx, req.Message, req.SessionId) + } else { + err = fmt.Errorf("chat workflow not configured") + } if err != nil { s.log.Errorf("AI stream service error: %v", err) return s.fallbackStreamResponse(req, stream) @@ -445,9 +441,9 @@ func (s 
*CustomerService) HandleStreamChat(w http.ResponseWriter, r *http.Reques } } - // 使用AI服务进行流式对话 + // 使用工作流进行流式对话 var fullResponse string - responseChan, err := s.aiService.StreamChat(ctx, req.Message, req.SessionID) + responseChan, err := s.chatWorkflow.Stream(ctx, req.Message, req.SessionID) if err != nil { s.log.Errorf("AI stream service error: %v", err) // 降级到模拟响应 @@ -548,3 +544,68 @@ func (s *CustomerService) fallbackSSEResponse(w http.ResponseWriter, flusher htt fmt.Fprintf(w, "data: {\"type\":\"end\",\"session_id\":\"%s\"}\n\n", sessionID) flusher.Flush() } + +// HandleWebSocketChat WebSocket 聊天处理器 +func (s *CustomerService) HandleWebSocketChat(conn *websocket.Conn) { + defer conn.Close() + ctx := context.Background() + for { + mt, msg, err := conn.ReadMessage() + if err != nil { + return + } + if mt != websocket.TextMessage { + continue + } + var req struct { + Message string `json:"message"` + SessionID string `json:"session_id"` + } + if json.Unmarshal(msg, &req) != nil { + continue + } + ch, err := s.chatWorkflow.Stream(ctx, req.Message, req.SessionID) + if err != nil { + _ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error"}`)) + continue + } + for chunk := range ch { + resp := map[string]interface{}{"type": "chunk", "content": chunk, "session_id": req.SessionID} + b, _ := json.Marshal(resp) + _ = conn.WriteMessage(websocket.TextMessage, b) + } + _ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"end"}`)) + } +} + +// HandleOrderDiagnosis 订单诊断工作流入口 +func (s *CustomerService) HandleOrderDiagnosis(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + var req struct { + OrderID string `json:"order_id"` + Message string `json:"message"` + SessionID string `json:"session_id"` + } + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + ctx := r.Context() + // 简化:直接调用工作流的 Chat(可替换为订单专用流水线) + resp, err := s.chatWorkflow.Chat(ctx, fmt.Sprintf("订单诊断: %s\n%s", req.OrderID, req.Message), req.SessionID) + if err != nil { + http.Error(w, "Workflow error", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(map[string]interface{}{"order_id": req.OrderID, "result": resp}) +} + +// HandleProductQuery 商品查询处理 +func (s *CustomerService) HandleProductQuery(w http.ResponseWriter, r *http.Request) { + + json.NewEncoder(w).Encode(map[string]interface{}{}) +} diff --git a/eino-project/internal/service/service.go b/eino-project/internal/service/service.go index 6b27720..56c7d2c 100644 --- a/eino-project/internal/service/service.go +++ b/eino-project/internal/service/service.go @@ -3,4 +3,4 @@ package service import "github.com/google/wire" // ProviderSet is service providers. -var ProviderSet = wire.NewSet(NewCustomerService) \ No newline at end of file +var ProviderSet = wire.NewSet(NewCustomerService) diff --git a/eino-project/internal/session/provider.go b/eino-project/internal/session/provider.go deleted file mode 100644 index 6f7b006..0000000 --- a/eino-project/internal/session/provider.go +++ /dev/null @@ -1,8 +0,0 @@ -package session - -import ( - "github.com/google/wire" -) - -// ProviderSet is session providers. -var ProviderSet = wire.NewSet(NewMemorySessionManager) \ No newline at end of file
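Beyond the SSE route, the diff above adds a WebSocket chat route at /api/chat/ws (internal/server/http.go), served by CustomerService.HandleWebSocketChat and streamed through ChatWorkflow.Stream, with gorilla/websocket added to go.mod. The following is a minimal client sketch, not part of the change, showing how that route could be exercised; the address localhost:8000 is a placeholder assumption and must match the HTTP server config.

package main

import (
	"encoding/json"
	"fmt"
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	// Dial the WebSocket chat route registered in internal/server/http.go.
	// The host/port here is an assumption; use the address from conf.Server.Http.
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8000/api/chat/ws", nil)
	if err != nil {
		log.Fatalf("dial: %v", err)
	}
	defer conn.Close()

	// HandleWebSocketChat expects a JSON text message with "message" and "session_id".
	req := map[string]string{"message": "你好", "session_id": "demo-session"}
	payload, _ := json.Marshal(req)
	if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil {
		log.Fatalf("write: %v", err)
	}

	// The handler streams {"type":"chunk","content":...} messages and closes the
	// turn with {"type":"end"} (or {"type":"error"} if the workflow call fails).
	for {
		_, data, err := conn.ReadMessage()
		if err != nil {
			log.Fatalf("read: %v", err)
		}
		var msg struct {
			Type    string `json:"type"`
			Content string `json:"content"`
		}
		if err := json.Unmarshal(data, &msg); err != nil {
			continue
		}
		if msg.Type == "end" || msg.Type == "error" {
			break
		}
		fmt.Print(msg.Content)
	}
}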