This commit is contained in:
fuzhongyun 2025-11-18 10:27:38 +08:00
parent abe8a83c93
commit bb5d32ca70
38 changed files with 649 additions and 669 deletions

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.36.9 // protoc-gen-go v1.36.10
// protoc v3.21.12 // protoc v3.21.12
// source: customer/v1/customer.proto // source: customer/v1/customer.proto

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.36.9 // protoc-gen-go v1.36.10
// protoc v3.21.12 // protoc v3.21.12
// source: helloworld/v1/error_reason.proto // source: helloworld/v1/error_reason.proto

View File

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.36.9 // protoc-gen-go v1.36.10
// protoc v3.21.12 // protoc v3.21.12
// source: helloworld/v1/greeter.proto // source: helloworld/v1/greeter.proto

View File

@ -1,20 +1,19 @@
package main package main
import ( import (
"flag" "flag"
"os" "os"
"eino-project/internal/conf" "eino-project/internal/conf"
"github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file" "github.com/go-kratos/kratos/v2/config/file"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/tracing" "github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/transport/grpc" "github.com/go-kratos/kratos/v2/transport/http"
"github.com/go-kratos/kratos/v2/transport/http"
_ "go.uber.org/automaxprocs" _ "go.uber.org/automaxprocs"
) )
// go build -ldflags "-X main.Version=x.y.z" // go build -ldflags "-X main.Version=x.y.z"
@ -33,18 +32,17 @@ func init() {
flag.StringVar(&flagconf, "conf", "./configs", "config path, eg: -conf config.yaml") flag.StringVar(&flagconf, "conf", "./configs", "config path, eg: -conf config.yaml")
} }
func newApp(logger log.Logger, gs *grpc.Server, hs *http.Server) *kratos.App { func newApp(logger log.Logger, hs *http.Server) *kratos.App {
return kratos.New( return kratos.New(
kratos.ID(id), kratos.ID(id),
kratos.Name(Name), kratos.Name(Name),
kratos.Version(Version), kratos.Version(Version),
kratos.Metadata(map[string]string{}), kratos.Metadata(map[string]string{}),
kratos.Logger(logger), kratos.Logger(logger),
kratos.Server( kratos.Server(
gs, hs,
hs, ),
), )
)
} }
func main() { func main() {

View File

@ -6,17 +6,13 @@
package main package main
import ( import (
"eino-project/internal/ai"
"eino-project/internal/biz" "eino-project/internal/biz"
"eino-project/internal/conf" "eino-project/internal/conf"
contextpkg "eino-project/internal/context"
"eino-project/internal/data" "eino-project/internal/data"
"eino-project/internal/data/repoimpl" "eino-project/internal/data/repoimpl"
"eino-project/internal/monitor" "eino-project/internal/domain"
"eino-project/internal/server" "eino-project/internal/server"
"eino-project/internal/service" "eino-project/internal/service"
"eino-project/internal/session"
"eino-project/internal/vector"
"github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
@ -25,5 +21,13 @@ import (
// wireApp init kratos application. // wireApp init kratos application.
func wireApp(*conf.Server, *conf.Data, *conf.Bootstrap, log.Logger) (*kratos.App, func(), error) { func wireApp(*conf.Server, *conf.Data, *conf.Bootstrap, log.Logger) (*kratos.App, func(), error) {
panic(wire.Build(server.ProviderSet, data.ProviderSet, repoimpl.ProviderSet, biz.ProviderSet, service.ProviderSet, ai.ProviderSet, session.ProviderSet, vector.ProviderSet, monitor.ProviderSet, contextpkg.ProviderSet, newApp)) panic(wire.Build(
server.ProviderSet,
data.ProviderSet,
repoimpl.ProviderSet,
biz.ProviderSet,
service.ProviderSet,
domain.ProviderSet,
newApp,
))
} }

View File

@ -7,17 +7,16 @@
package main package main
import ( import (
"eino-project/internal/ai"
"eino-project/internal/biz" "eino-project/internal/biz"
"eino-project/internal/conf" "eino-project/internal/conf"
"eino-project/internal/context"
"eino-project/internal/data" "eino-project/internal/data"
"eino-project/internal/data/repoimpl" "eino-project/internal/data/repoimpl"
"eino-project/internal/monitor" "eino-project/internal/domain/context"
"eino-project/internal/domain/monitor"
"eino-project/internal/domain/session"
"eino-project/internal/domain/vector"
"eino-project/internal/server" "eino-project/internal/server"
"eino-project/internal/service" "eino-project/internal/service"
"eino-project/internal/session"
"eino-project/internal/vector"
"github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
) )
@ -36,7 +35,7 @@ func wireApp(confServer *conf.Server, confData *conf.Data, bootstrap *conf.Boots
} }
documentProcessor := vector.NewDocumentProcessor(vectorService) documentProcessor := vector.NewDocumentProcessor(vectorService)
knowledgeSearcher := vector.NewKnowledgeSearcher(vectorService) knowledgeSearcher := vector.NewKnowledgeSearcher(vectorService)
contextManager := context.NewContextManagerFromBootstrapConfig(bootstrap, logger) contextManager := context.NewContextManager(logger)
monitorMonitor := monitor.NewMonitorFromBootstrapConfig(bootstrap, logger) monitorMonitor := monitor.NewMonitorFromBootstrapConfig(bootstrap, logger)
dataData, cleanup, err := data.NewData(confData, logger, vectorService, documentProcessor, knowledgeSearcher, contextManager, monitorMonitor) dataData, cleanup, err := data.NewData(confData, logger, vectorService, documentProcessor, knowledgeSearcher, contextManager, monitorMonitor)
if err != nil { if err != nil {
@ -44,16 +43,10 @@ func wireApp(confServer *conf.Server, confData *conf.Data, bootstrap *conf.Boots
} }
customerRepo := repoimpl.NewCustomerRepo(dataData, logger) customerRepo := repoimpl.NewCustomerRepo(dataData, logger)
customerUseCase := biz.NewCustomerUseCase(customerRepo, logger, knowledgeSearcher, documentProcessor, contextManager, monitorMonitor) customerUseCase := biz.NewCustomerUseCase(customerRepo, logger, knowledgeSearcher, documentProcessor, contextManager, monitorMonitor)
aiService, err := ai.NewAIServiceFromConfig(bootstrap, logger, knowledgeSearcher, contextManager, monitorMonitor)
if err != nil {
cleanup()
return nil, nil, err
}
sessionManager := session.NewMemorySessionManager(logger) sessionManager := session.NewMemorySessionManager(logger)
customerService := service.NewCustomerService(customerUseCase, aiService, sessionManager, monitorMonitor, logger) customerService := service.NewCustomerService(customerUseCase, sessionManager, monitorMonitor, logger)
grpcServer := server.NewGRPCServer(confServer, customerService, logger)
httpServer := server.NewHTTPServer(confServer, customerService, logger) httpServer := server.NewHTTPServer(confServer, customerService, logger)
app := newApp(logger, grpcServer, httpServer) app := newApp(logger, httpServer)
return app, func() { return app, func() {
cleanup() cleanup()
}, nil }, nil

View File

@ -46,6 +46,7 @@ require (
github.com/google/uuid v1.6.0 // indirect github.com/google/uuid v1.6.0 // indirect
github.com/goph/emperror v0.17.2 // indirect github.com/goph/emperror v0.17.2 // indirect
github.com/gorilla/mux v1.8.1 // indirect github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/invopop/yaml v0.1.0 // indirect github.com/invopop/yaml v0.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect github.com/json-iterator/go v1.1.12 // indirect

View File

@ -101,6 +101,7 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@ -113,6 +114,8 @@ github.com/gopherjs/gopherjs v1.17.2/go.mod h1:pRRIvn/QzFLrKfvEz3qUuEhtE/zLCWfre
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/invopop/yaml v0.1.0 h1:YW3WGUoJEXYfzWBjn00zIlrw7brGVD0fUKRYDPAPhrc= github.com/invopop/yaml v0.1.0 h1:YW3WGUoJEXYfzWBjn00zIlrw7brGVD0fUKRYDPAPhrc=
github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q= github.com/invopop/yaml v0.1.0/go.mod h1:2XuRLgs/ouIrW3XNzuNj7J3Nvu/Dig5MXvbCEdiBN3Q=
@ -251,6 +254,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=

View File

@ -1,329 +0,0 @@
package ai
import (
	"context"
	"fmt"
	"strings"
	"time"
	"unicode/utf8"

	contextpkg "eino-project/internal/context"
	"eino-project/internal/monitor"
	"eino-project/internal/vector"

	"github.com/cloudwego/eino/components/model"
	"github.com/cloudwego/eino/schema"
	"github.com/go-kratos/kratos/v2/log"
)
// AIService is the AI chat service contract: synchronous chat, streaming
// chat, and intent classification.
type AIService interface {
	// ProcessChat handles one chat message for a session and returns the reply.
	ProcessChat(ctx context.Context, message string, sessionID string) (string, error)
	// StreamChat handles one chat message and returns a channel of reply chunks.
	StreamChat(ctx context.Context, message string, sessionID string) (<-chan string, error)
	// AnalyzeIntent classifies the message with the dedicated intent model
	// (qwen3:8b) and returns exactly one of the fixed intent labels.
	AnalyzeIntent(ctx context.Context, message string) (string, error)
}
// OllamaRequest is the request payload shape of the raw Ollama HTTP API.
// NOTE(review): not referenced anywhere in this file — presumably left over
// from a pre-Eino direct-HTTP implementation; confirm before removing.
type OllamaRequest struct {
	Model  string `json:"model"`
	Prompt string `json:"prompt"`
	Stream bool   `json:"stream"`
}
// OllamaResponse is the response payload shape of the raw Ollama HTTP API.
// NOTE(review): not referenced anywhere in this file — presumably left over
// from a pre-Eino direct-HTTP implementation; confirm before removing.
type OllamaResponse struct {
	Model     string `json:"model"`
	Response  string `json:"response"`
	Done      bool   `json:"done"`
	CreatedAt string `json:"created_at"`
}
// aiService is the default AIService implementation backed by Eino chat
// models, with optional knowledge search, context management and monitoring.
type aiService struct {
	logger            log.Logger
	chatModel         model.BaseChatModel // main conversational model
	intentModel       model.BaseChatModel // dedicated intent-classification model
	knowledgeSearcher vector.KnowledgeSearcher
	contextManager    contextpkg.ContextManager
	// monitor receives lightweight LLM usage records; nil disables reporting.
	monitor interface {
		RecordLLMUsage(ctx context.Context, usage *monitor.LLMUsage) error
	}
	// chatModelName is reported in usage records; set by the config factory
	// after construction (see NewAIServiceFromConfig).
	chatModelName string
}
// NewAIService builds an AIService from the given logger, chat and intent
// models, knowledge searcher and context manager. The monitor and reported
// model name are intentionally left unset; the config factory may populate
// them after construction.
func NewAIService(logger log.Logger, chatModel, intentModel model.BaseChatModel, knowledgeSearcher vector.KnowledgeSearcher, contextManager contextpkg.ContextManager) AIService {
	svc := new(aiService)
	svc.logger = logger
	svc.chatModel = chatModel
	svc.intentModel = intentModel
	svc.knowledgeSearcher = knowledgeSearcher
	svc.contextManager = contextManager
	return svc
}
// ProcessChat handles a single chat message: it appends the message to the
// session context, augments the prompt with knowledge-base hits, calls the
// Eino chat model (falling back to a canned reply on failure), and reports
// lightweight LLM usage metrics.
func (s *aiService) ProcessChat(ctx context.Context, message string, sessionID string) (string, error) {
	log.Context(ctx).Infof("Processing chat message: %s for session: %s", message, sessionID)
	// 1. Record the user message in the session's conversation context.
	if s.contextManager != nil {
		msg := contextpkg.Message{
			Role:      "user",
			Content:   message,
			Timestamp: time.Now(),
		}
		s.contextManager.AddMessage(ctx, sessionID, msg)
	}
	// 2. Search the knowledge base for content related to the message.
	// Search errors are deliberately ignored: knowledge is best-effort.
	var knowledgeContext string
	if s.knowledgeSearcher != nil {
		knowledgeResults, err := s.knowledgeSearcher.SearchKnowledge(ctx, message, 3)
		if err == nil && len(knowledgeResults) > 0 {
			var contextParts []string
			for _, result := range knowledgeResults {
				contextParts = append(contextParts, fmt.Sprintf("相关知识: %s", result.Document.Content))
			}
			knowledgeContext = strings.Join(contextParts, "\n")
		}
	}
	// 3. Build the augmented prompt (knowledge context + user question).
	enhancedMessage := message
	if knowledgeContext != "" {
		enhancedMessage = fmt.Sprintf("基于以下知识库内容回答用户问题:\n%s\n\n用户问题: %s", knowledgeContext, message)
	}
	messages := []*schema.Message{
		{
			Role:    schema.User,
			Content: enhancedMessage,
		},
	}
	// 4. Call the Eino chat model; degrade gracefully to a mock reply
	// rather than surfacing a model error to the caller.
	start := time.Now()
	response, err := s.chatModel.Generate(ctx, messages)
	if err != nil {
		log.Context(ctx).Warnf("Eino chat model call failed: %v, falling back to mock response", err)
		return s.generateMockResponse(message), nil
	}
	if response == nil || response.Content == "" {
		log.Context(ctx).Warn("Empty response from Eino chat model, falling back to mock response")
		return s.generateMockResponse(message), nil
	}
	// Lightweight usage reporting: model, estimated tokens, latency,
	// knowledge hits, detected intent.
	if s.monitor != nil {
		// Eino does not expose token counts here, so they are approximated
		// (ASCII: word count; otherwise runes/2 — see estimateTokens).
		promptTokens := estimateTokens(enhancedMessage)
		completionTokens := estimateTokens(response.Content)
		// Classify intent with the dedicated intent model (no custom rules);
		// on error fall back to the generic label.
		detectedIntent := "general_inquiry"
		if intent, err := s.AnalyzeIntent(ctx, message); err == nil && intent != "" {
			detectedIntent = intent
		}
		usage := &monitor.LLMUsage{
			Model:            s.chatModelName,
			SessionID:        sessionID,
			UserID:           "default_user",
			PromptPreview:    preview(enhancedMessage, 200),
			PromptTokens:     promptTokens,
			CompletionTokens: completionTokens,
			TotalTokens:      promptTokens + completionTokens,
			LatencyMS:        time.Since(start).Milliseconds(),
			AgentThought:     "intent=" + detectedIntent,
			KnowledgeHits:    countLines(knowledgeContext),
			Metadata:         map[string]string{"source": "ai.ProcessChat"},
			Timestamp:        time.Now(),
		}
		// Best-effort: monitoring failures must never affect the chat reply.
		_ = s.monitor.RecordLLMUsage(ctx, usage)
	}
	return response.Content, nil
}
// StreamChat handles a chat message in streaming mode. It returns a channel
// that yields response chunks as they arrive from the Eino streaming model,
// falling back to a simulated stream when the model call fails. After the
// stream ends, lightweight LLM usage metrics are reported.
//
// Fix: the usage-reporting code after the receive loop was previously
// unreachable — the loop exited only via `return`, so RecordLLMUsage was
// never called for real streams. The loop now `break`s on stream end so
// reporting runs.
func (s *aiService) StreamChat(ctx context.Context, message string, sessionID string) (<-chan string, error) {
	log.Context(ctx).Infof("Processing stream chat message: %s for session: %s", message, sessionID)
	// Build the chat messages for the model.
	messages := []*schema.Message{
		{
			Role:    schema.User,
			Content: message,
		},
	}
	// Call the Eino streaming chat model; on failure fall back to a mock stream.
	start := time.Now()
	streamReader, err := s.chatModel.Stream(ctx, messages)
	if err != nil {
		log.Context(ctx).Warnf("Eino stream chat model call failed: %v, falling back to mock stream response", err)
		return s.generateMockStreamResponse(ctx, message), nil
	}
	// Buffered channel so the producer is not lock-stepped with the consumer.
	responseChan := make(chan string, 10)
	// Forward chunks in a goroutine; it owns closing the channel and reader.
	go func() {
		defer close(responseChan)
		defer streamReader.Close()
		for {
			chunk, err := streamReader.Recv()
			if err != nil {
				// Stream finished (or errored): stop reading but fall through
				// so the usage metrics below are still reported.
				break
			}
			if chunk != nil && chunk.Content != "" {
				select {
				case responseChan <- chunk.Content:
				case <-ctx.Done():
					// Caller went away; abort without reporting.
					return
				}
			}
		}
		// Summary usage reporting once the stream has ended. To avoid
		// buffering every chunk, only the input side is token-estimated.
		if s.monitor != nil {
			detectedIntent := "general_inquiry"
			if intent, err := s.AnalyzeIntent(ctx, message); err == nil && intent != "" {
				detectedIntent = intent
			}
			usage := &monitor.LLMUsage{
				Model:         s.chatModelName,
				SessionID:     sessionID,
				UserID:        "default_user",
				PromptPreview: preview(message, 200),
				PromptTokens:  estimateTokens(message),
				// Completion tokens cannot be measured accurately without
				// accumulating chunks, so only the prompt estimate is used.
				TotalTokens:  estimateTokens(message),
				LatencyMS:    time.Since(start).Milliseconds(),
				AgentThought: "intent=" + detectedIntent,
				Metadata:     map[string]string{"source": "ai.StreamChat"},
				Timestamp:    time.Now(),
			}
			// Best-effort: monitoring failures are ignored.
			_ = s.monitor.RecordLLMUsage(ctx, usage)
		}
	}()
	return responseChan, nil
}
// estimateTokens roughly estimates the token count of text. It is not
// accurate, but it is good enough for monitoring trends.
//
// Heuristic: pure-ASCII text counts whitespace-separated words; any text
// containing a non-ASCII rune counts runes/2 (about one token per two
// CJK characters). Empty input yields 0.
func estimateTokens(text string) int {
	if text == "" {
		return 0
	}
	for _, r := range text {
		if r > 127 {
			// Non-ASCII content: approximate as half the rune count.
			return len([]rune(text)) / 2
		}
	}
	// Pure ASCII: word count approximates token count.
	return len(strings.Fields(text))
}
// preview returns text truncated to at most max bytes, for use as a log/
// monitoring preview.
//
// Fix: the previous byte slice text[:max] could split a multi-byte UTF-8
// rune mid-sequence (prompts here are largely Chinese), emitting invalid
// UTF-8 into monitoring payloads. Truncation now backs up to the nearest
// rune boundary, so the result is always valid UTF-8 and never longer
// than max bytes.
func preview(text string, max int) string {
	if len(text) <= max {
		return text
	}
	cut := max
	// Back up past UTF-8 continuation bytes to a rune start.
	for cut > 0 && !utf8.RuneStart(text[cut]) {
		cut--
	}
	return text[:cut]
}
// countLines reports how many newline-separated lines text contains,
// or 0 for the empty string.
func countLines(text string) int {
	if text == "" {
		return 0
	}
	// Equivalent to len(strings.Split(text, "\n")) without allocating
	// the slice: separators + 1.
	return strings.Count(text, "\n") + 1
}
// AnalyzeIntent classifies message with the dedicated intent model
// (qwen3:8b) and returns exactly one of the labels:
// order_inquiry, product_inquiry, technical_support, general_inquiry.
// On any failure or unexpected model output it falls back to
// "general_inquiry".
func (s *aiService) AnalyzeIntent(ctx context.Context, message string) (string, error) {
	const fallback = "general_inquiry"
	if s.intentModel == nil {
		return fallback, fmt.Errorf("intent model not configured")
	}
	prompt := "你是一个意图分类器。仅从以下标签中选择并输出一个且只输出标签本身order_inquiry、product_inquiry、technical_support、general_inquiry。用户消息" + message
	resp, err := s.intentModel.Generate(ctx, []*schema.Message{
		{Role: schema.User, Content: prompt},
	})
	if err != nil || resp == nil || resp.Content == "" {
		return fallback, err
	}
	label := strings.ToLower(strings.TrimSpace(resp.Content))
	for _, known := range []string{"order_inquiry", "product_inquiry", "technical_support", fallback} {
		if label == known {
			return label, nil
		}
	}
	// Model output outside the expected label set: fall back to general.
	return fallback, nil
}
// generateMockStreamResponse simulates a streaming reply: it splits the
// canned mock response into words and emits them onto a buffered channel
// with an artificial delay, honoring context cancellation at every send.
func (s *aiService) generateMockStreamResponse(ctx context.Context, message string) <-chan string {
	responseChan := make(chan string, 10)
	go func() {
		defer close(responseChan)
		// Get the canned reply for this message.
		mockResponse := s.generateMockResponse(message)
		// Split into words to simulate chunked streaming output.
		// NOTE(review): strings.Fields drops whitespace, so a single space is
		// re-inserted between words below; Chinese replies without spaces
		// stream as one chunk — confirm that is acceptable.
		words := strings.Fields(mockResponse)
		for i, word := range words {
			select {
			case responseChan <- word:
				if i < len(words)-1 {
					// Re-insert a space between words (not after the last).
					select {
					case responseChan <- " ":
					case <-ctx.Done():
						return
					}
				}
				// Simulate network latency between chunks.
				time.Sleep(50 * time.Millisecond)
			case <-ctx.Done():
				return
			}
		}
	}()
	return responseChan
}
// generateMockResponse produces a canned reply for message; it is used as a
// fallback when the real chat model is unavailable or returns nothing.
func (s *aiService) generateMockResponse(message string) string {
	// Normalize for simple keyword matching.
	msg := strings.ToLower(strings.TrimSpace(message))
	has := func(sub string) bool { return strings.Contains(msg, sub) }
	if has("你好") || has("hello") {
		return "你好!有什么我可以帮你的吗?😊"
	}
	if has("天气") {
		return "抱歉,我无法获取实时天气信息。建议您查看天气预报应用。"
	}
	if has("时间") {
		return fmt.Sprintf("当前时间是 %s", time.Now().Format("2006-01-02 15:04:05"))
	}
	if has("介绍") {
		return "我是一个AI助手可以帮助您回答问题、提供信息和进行对话。有什么我可以为您做的吗"
	}
	return "感谢您的消息!我正在学习中,暂时无法完全理解您的问题。请尝试用不同的方式表达,或者问一些简单的问题。"
}

View File

@ -1,72 +0,0 @@
package ai
import (
"context"
"fmt"
"time"
"eino-project/internal/conf"
contextpkg "eino-project/internal/context"
"eino-project/internal/monitor"
"eino-project/internal/vector"
"github.com/cloudwego/eino-ext/components/model/ollama"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/wire"
)
// ProviderSet is the wire provider set exposing this package's AI service
// constructor for dependency injection.
var ProviderSet = wire.NewSet(NewAIServiceFromConfig)
// NewAIServiceFromConfig builds the AIService from bootstrap configuration.
// It creates two Ollama-backed Eino chat models — a main chat model and a
// dedicated intent model — then injects the monitor and model name into the
// service. It returns an error when the AI/Ollama config is absent or a
// model cannot be created.
//
// Improvement: the previously duplicated ChatModelConfig construction for
// the two models is deduplicated via a local builder.
func NewAIServiceFromConfig(c *conf.Bootstrap, logger log.Logger, knowledgeSearcher vector.KnowledgeSearcher, contextManager contextpkg.ContextManager, mon monitor.Monitor) (AIService, error) {
	if c.Ai == nil || c.Ai.Ollama == nil {
		return nil, fmt.Errorf("AI configuration is missing")
	}
	// Request timeout: defaults to 60s when not configured.
	timeout := 60 * time.Second
	if c.Ai.Ollama.Timeout != nil {
		timeout = c.Ai.Ollama.Timeout.AsDuration()
	}
	// Default model names, overridable via config:
	// models[0] = chat model, models[1] = intent model.
	chatModelName := "deepseek-v3.1:671b-cloud"
	intentModelName := "qwen3:8b"
	if len(c.Ai.Ollama.Models) > 0 && c.Ai.Ollama.Models[0] != "" {
		chatModelName = c.Ai.Ollama.Models[0]
	}
	if len(c.Ai.Ollama.Models) > 1 && c.Ai.Ollama.Models[1] != "" {
		intentModelName = c.Ai.Ollama.Models[1]
	}
	// Both models share endpoint and timeout; only the model name differs.
	modelConfig := func(name string) *ollama.ChatModelConfig {
		return &ollama.ChatModelConfig{
			BaseURL: c.Ai.Ollama.Endpoint,
			Timeout: timeout,
			Model:   name,
		}
	}
	// Main chat model (satisfies the BaseChatModel interface).
	chatModel, err := ollama.NewChatModel(context.Background(), modelConfig(chatModelName))
	if err != nil {
		return nil, fmt.Errorf("failed to create chat model: %w", err)
	}
	// Intent-classification model (satisfies the BaseChatModel interface).
	intentModel, err := ollama.NewChatModel(context.Background(), modelConfig(intentModelName))
	if err != nil {
		return nil, fmt.Errorf("failed to create intent model: %w", err)
	}
	svc := NewAIService(logger, chatModel, intentModel, knowledgeSearcher, contextManager)
	// Inject lightweight monitoring for LLM usage recording; done via type
	// assertion because NewAIService's signature does not take a monitor.
	if impl, ok := svc.(*aiService); ok {
		impl.monitor = mon
		impl.chatModelName = chatModelName
	}
	return svc, nil
}

View File

@ -6,15 +6,13 @@ import (
"strings" "strings"
"time" "time"
"eino-project/internal/vector" contextpkg "eino-project/internal/domain/context"
contextpkg "eino-project/internal/context" "eino-project/internal/domain/monitor"
"eino-project/internal/monitor" "eino-project/internal/domain/vector"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
) )
// CustomerRepo 智能客服数据仓库接口 // CustomerRepo 智能客服数据仓库接口
type CustomerRepo interface { type CustomerRepo interface {
CheckSystemHealth(ctx context.Context) map[string]ServiceStatus CheckSystemHealth(ctx context.Context) map[string]ServiceStatus
@ -91,8 +89,6 @@ type OrderDetails struct {
NeedAI bool `json:"need_ai"` NeedAI bool `json:"need_ai"`
} }
// GetSystemStatus 获取系统状态 // GetSystemStatus 获取系统状态
func (uc *CustomerUseCase) GetSystemStatus(ctx context.Context) (*SystemStatus, error) { func (uc *CustomerUseCase) GetSystemStatus(ctx context.Context) (*SystemStatus, error) {
uc.log.WithContext(ctx).Info("Getting system status") uc.log.WithContext(ctx).Info("Getting system status")
@ -256,17 +252,17 @@ func generateSessionID() string {
// SearchKnowledge 搜索知识库 // SearchKnowledge 搜索知识库
func (uc *CustomerUseCase) SearchKnowledge(ctx context.Context, query string, limit int) ([]vector.SearchResult, error) { func (uc *CustomerUseCase) SearchKnowledge(ctx context.Context, query string, limit int) ([]vector.SearchResult, error) {
uc.log.WithContext(ctx).Infof("Searching knowledge for query: %s", query) uc.log.WithContext(ctx).Infof("Searching knowledge for query: %s", query)
if limit <= 0 { if limit <= 0 {
limit = 5 limit = 5
} }
results, err := uc.knowledgeSearcher.SearchKnowledge(ctx, query, limit) results, err := uc.knowledgeSearcher.SearchKnowledge(ctx, query, limit)
if err != nil { if err != nil {
uc.log.WithContext(ctx).Errorf("Failed to search knowledge: %v", err) uc.log.WithContext(ctx).Errorf("Failed to search knowledge: %v", err)
return nil, fmt.Errorf("知识库搜索失败: %w", err) return nil, fmt.Errorf("知识库搜索失败: %w", err)
} }
uc.log.WithContext(ctx).Infof("Found %d knowledge results", len(results)) uc.log.WithContext(ctx).Infof("Found %d knowledge results", len(results))
return results, nil return results, nil
} }
@ -274,22 +270,22 @@ func (uc *CustomerUseCase) SearchKnowledge(ctx context.Context, query string, li
// ProcessKnowledgeUpload 处理知识库文档上传 // ProcessKnowledgeUpload 处理知识库文档上传
func (uc *CustomerUseCase) ProcessKnowledgeUpload(ctx context.Context, content, title, category string) error { func (uc *CustomerUseCase) ProcessKnowledgeUpload(ctx context.Context, content, title, category string) error {
uc.log.WithContext(ctx).Infof("Processing knowledge upload: %s", title) uc.log.WithContext(ctx).Infof("Processing knowledge upload: %s", title)
if strings.TrimSpace(content) == "" { if strings.TrimSpace(content) == "" {
return fmt.Errorf("文档内容不能为空") return fmt.Errorf("文档内容不能为空")
} }
if strings.TrimSpace(title) == "" { if strings.TrimSpace(title) == "" {
return fmt.Errorf("文档标题不能为空") return fmt.Errorf("文档标题不能为空")
} }
// 处理文档并存储到向量数据库 // 处理文档并存储到向量数据库
documents, err := uc.docProcessor.ProcessKnowledgeDocument(ctx, content, title, category) documents, err := uc.docProcessor.ProcessKnowledgeDocument(ctx, content, title, category)
if err != nil { if err != nil {
uc.log.WithContext(ctx).Errorf("Failed to process knowledge document: %v", err) uc.log.WithContext(ctx).Errorf("Failed to process knowledge document: %v", err)
return fmt.Errorf("文档处理失败: %w", err) return fmt.Errorf("文档处理失败: %w", err)
} }
uc.log.WithContext(ctx).Infof("Successfully processed knowledge document '%s' into %d chunks", title, len(documents)) uc.log.WithContext(ctx).Infof("Successfully processed knowledge document '%s' into %d chunks", title, len(documents))
return nil return nil
} }
@ -297,13 +293,13 @@ func (uc *CustomerUseCase) ProcessKnowledgeUpload(ctx context.Context, content,
// GetKnowledgeSummary 获取知识摘要 // GetKnowledgeSummary 获取知识摘要
func (uc *CustomerUseCase) GetKnowledgeSummary(ctx context.Context, query string) (string, error) { func (uc *CustomerUseCase) GetKnowledgeSummary(ctx context.Context, query string) (string, error) {
uc.log.WithContext(ctx).Infof("Getting knowledge summary for query: %s", query) uc.log.WithContext(ctx).Infof("Getting knowledge summary for query: %s", query)
summary, err := uc.knowledgeSearcher.GetKnowledgeSummary(ctx, query) summary, err := uc.knowledgeSearcher.GetKnowledgeSummary(ctx, query)
if err != nil { if err != nil {
uc.log.WithContext(ctx).Errorf("Failed to get knowledge summary: %v", err) uc.log.WithContext(ctx).Errorf("Failed to get knowledge summary: %v", err)
return "", fmt.Errorf("获取知识摘要失败: %w", err) return "", fmt.Errorf("获取知识摘要失败: %w", err)
} }
return summary, nil return summary, nil
} }
@ -311,14 +307,14 @@ func (uc *CustomerUseCase) GetKnowledgeSummary(ctx context.Context, query string
// 保留函数以避免外部调用崩溃,但将其标记为 deprecated并直接返回 general_inquiry // 保留函数以避免外部调用崩溃,但将其标记为 deprecated并直接返回 general_inquiry
// Deprecated: use AIService.AnalyzeIntent instead. // Deprecated: use AIService.AnalyzeIntent instead.
func (uc *CustomerUseCase) AnalyzeUserIntent(ctx context.Context, message string) (string, error) { func (uc *CustomerUseCase) AnalyzeUserIntent(ctx context.Context, message string) (string, error) {
uc.log.WithContext(ctx).Warn("AnalyzeUserIntent (rule-based) is deprecated, use AIService.AnalyzeIntent (LLM) instead") uc.log.WithContext(ctx).Warn("AnalyzeUserIntent (rule-based) is deprecated, use AIService.AnalyzeIntent (LLM) instead")
return "general_inquiry", nil return "general_inquiry", nil
} }
// ProcessIntelligentChat 处理智能聊天(结合知识库) // ProcessIntelligentChat 处理智能聊天(结合知识库)
func (uc *CustomerUseCase) ProcessIntelligentChat(ctx context.Context, message, sessionID string) (string, error) { func (uc *CustomerUseCase) ProcessIntelligentChat(ctx context.Context, message, sessionID string) (string, error) {
startTime := time.Now() startTime := time.Now()
// 记录监控指标 // 记录监控指标
defer func() { defer func() {
duration := time.Since(startTime) duration := time.Since(startTime)
@ -385,9 +381,9 @@ func (uc *CustomerUseCase) ProcessIntelligentChat(ctx context.Context, message,
if contextInfo != "" { if contextInfo != "" {
response += contextInfo response += contextInfo
} }
response += "根据您的问题,我建议您:" response += "根据您的问题,我建议您:"
// 基于意图和知识库内容生成回复 // 基于意图和知识库内容生成回复
if strings.Contains(message, "订单") { if strings.Contains(message, "订单") {
response += "\n1. 检查订单状态和物流信息\n2. 如有问题可申请退换货\n3. 联系客服获取详细帮助" response += "\n1. 检查订单状态和物流信息\n2. 如有问题可申请退换货\n3. 联系客服获取详细帮助"
@ -418,11 +414,11 @@ func (uc *CustomerUseCase) handleOrderInquiry(ctx context.Context, message strin
if orderID != "" { if orderID != "" {
order, err := uc.QueryOrder(ctx, orderID) order, err := uc.QueryOrder(ctx, orderID)
if err == nil { if err == nil {
return fmt.Sprintf("您的订单 %s 状态为:%s。产品%s金额%.2f元。", return fmt.Sprintf("您的订单 %s 状态为:%s。产品%s金额%.2f元。",
order.OrderID, order.Status, order.Product, order.Amount) order.OrderID, order.Status, order.Product, order.Amount)
} }
} }
return "请提供您的订单号,我来帮您查询订单状态。订单号通常以 ORD 开头。" return "请提供您的订单号,我来帮您查询订单状态。订单号通常以 ORD 开头。"
} }
@ -433,7 +429,7 @@ func (uc *CustomerUseCase) handleProductInquiry(ctx context.Context, message str
if err != nil || len(results) == 0 { if err != nil || len(results) == 0 {
return "抱歉,我暂时没有找到相关的产品信息。请您详细描述您的问题,我会尽力为您解答。" return "抱歉,我暂时没有找到相关的产品信息。请您详细描述您的问题,我会尽力为您解答。"
} }
// 组合知识库内容 // 组合知识库内容
var knowledgeContent []string var knowledgeContent []string
for _, result := range results { for _, result := range results {
@ -441,18 +437,18 @@ func (uc *CustomerUseCase) handleProductInquiry(ctx context.Context, message str
knowledgeContent = append(knowledgeContent, result.Document.Content) knowledgeContent = append(knowledgeContent, result.Document.Content)
} }
} }
if len(knowledgeContent) == 0 { if len(knowledgeContent) == 0 {
return "抱歉,我没有找到与您问题高度相关的信息。请您提供更多详细信息。" return "抱歉,我没有找到与您问题高度相关的信息。请您提供更多详细信息。"
} }
response := "根据我们的产品知识库,为您找到以下相关信息:\n\n" response := "根据我们的产品知识库,为您找到以下相关信息:\n\n"
response += strings.Join(knowledgeContent, "\n\n") response += strings.Join(knowledgeContent, "\n\n")
if len(response) > 500 { if len(response) > 500 {
response = response[:500] + "...\n\n如需了解更多详细信息请告诉我您具体想了解哪个方面。" response = response[:500] + "...\n\n如需了解更多详细信息请告诉我您具体想了解哪个方面。"
} }
return response return response
} }
@ -463,7 +459,7 @@ func (uc *CustomerUseCase) handleTechnicalSupport(ctx context.Context, message s
if err != nil || len(results) == 0 { if err != nil || len(results) == 0 {
return "我理解您遇到了技术问题。请详细描述您的问题,包括:\n1. 具体的错误信息\n2. 操作步骤\n3. 使用的设备和系统\n我会为您提供针对性的解决方案。" return "我理解您遇到了技术问题。请详细描述您的问题,包括:\n1. 具体的错误信息\n2. 操作步骤\n3. 使用的设备和系统\n我会为您提供针对性的解决方案。"
} }
// 组合技术支持内容 // 组合技术支持内容
var supportContent []string var supportContent []string
for _, result := range results { for _, result := range results {
@ -471,18 +467,18 @@ func (uc *CustomerUseCase) handleTechnicalSupport(ctx context.Context, message s
supportContent = append(supportContent, result.Document.Content) supportContent = append(supportContent, result.Document.Content)
} }
} }
if len(supportContent) == 0 { if len(supportContent) == 0 {
return "请提供更多关于您技术问题的详细信息,我会为您查找相应的解决方案。" return "请提供更多关于您技术问题的详细信息,我会为您查找相应的解决方案。"
} }
response := "根据您的问题,我为您找到以下解决方案:\n\n" response := "根据您的问题,我为您找到以下解决方案:\n\n"
response += strings.Join(supportContent, "\n\n") response += strings.Join(supportContent, "\n\n")
if len(response) > 600 { if len(response) > 600 {
response = response[:600] + "...\n\n如果以上方案无法解决您的问题请联系我们的技术支持团队。" response = response[:600] + "...\n\n如果以上方案无法解决您的问题请联系我们的技术支持团队。"
} }
return response return response
} }
@ -493,12 +489,12 @@ func (uc *CustomerUseCase) handleGeneralInquiry(ctx context.Context, message str
if err != nil || len(results) == 0 { if err != nil || len(results) == 0 {
return "您好!我是智能客服助手,很高兴为您服务。请告诉我您需要什么帮助,我可以协助您:\n• 查询订单状态\n• 产品使用指导\n• 技术问题解答\n• 售后服务咨询" return "您好!我是智能客服助手,很高兴为您服务。请告诉我您需要什么帮助,我可以协助您:\n• 查询订单状态\n• 产品使用指导\n• 技术问题解答\n• 售后服务咨询"
} }
// 使用最相关的知识回答 // 使用最相关的知识回答
if results[0].Score > 0.7 { if results[0].Score > 0.7 {
return "根据您的问题,我为您找到以下信息:\n\n" + results[0].Document.Content return "根据您的问题,我为您找到以下信息:\n\n" + results[0].Document.Content
} }
return "我理解您的问题。请您提供更多详细信息,这样我能为您提供更准确的帮助。" return "我理解您的问题。请您提供更多详细信息,这样我能为您提供更准确的帮助。"
} }

View File

@ -919,11 +919,8 @@ type Monitoring_CozeLoop struct {
Endpoint string `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"` Endpoint string `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
Enable bool `protobuf:"varint,2,opt,name=enable,proto3" json:"enable,omitempty"` Enable bool `protobuf:"varint,2,opt,name=enable,proto3" json:"enable,omitempty"`
MetricsInterval *durationpb.Duration `protobuf:"bytes,3,opt,name=metrics_interval,json=metricsInterval,proto3" json:"metrics_interval,omitempty"` MetricsInterval *durationpb.Duration `protobuf:"bytes,3,opt,name=metrics_interval,json=metricsInterval,proto3" json:"metrics_interval,omitempty"`
// 新增:支持在配置文件中声明 Workspace 与 PAT优先级高于环境变量 unknownFields protoimpl.UnknownFields
WorkspaceId string `protobuf:"bytes,4,opt,name=workspace_id,json=workspaceId,proto3" json:"workspace_id,omitempty"` sizeCache protoimpl.SizeCache
ApiToken string `protobuf:"bytes,5,opt,name=api_token,json=apiToken,proto3" json:"api_token,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
} }
func (x *Monitoring_CozeLoop) Reset() { func (x *Monitoring_CozeLoop) Reset() {
@ -977,20 +974,6 @@ func (x *Monitoring_CozeLoop) GetMetricsInterval() *durationpb.Duration {
return nil return nil
} }
func (x *Monitoring_CozeLoop) GetWorkspaceId() string {
if x != nil {
return x.WorkspaceId
}
return ""
}
func (x *Monitoring_CozeLoop) GetApiToken() string {
if x != nil {
return x.ApiToken
}
return ""
}
var File_conf_conf_proto protoreflect.FileDescriptor var File_conf_conf_proto protoreflect.FileDescriptor
const file_conf_conf_proto_rawDesc = "" + const file_conf_conf_proto_rawDesc = "" +
@ -1049,16 +1032,14 @@ const file_conf_conf_proto_rawDesc = "" +
"\n" + "\n" +
"collection\x18\x02 \x01(\tR\n" + "collection\x18\x02 \x01(\tR\n" +
"collection\x123\n" + "collection\x123\n" +
"\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\"\x91\x02\n" + "\atimeout\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\atimeout\"\xd1\x01\n" +
"\n" + "\n" +
"Monitoring\x12<\n" + "Monitoring\x12<\n" +
"\tcoze_loop\x18\x01 \x01(\v2\x1f.kratos.api.Monitoring.CozeLoopR\bcozeLoop\x1a\xc4\x01\n" + "\tcoze_loop\x18\x01 \x01(\v2\x1f.kratos.api.Monitoring.CozeLoopR\bcozeLoop\x1a\x84\x01\n" +
"\bCozeLoop\x12\x1a\n" + "\bCozeLoop\x12\x1a\n" +
"\bendpoint\x18\x01 \x01(\tR\bendpoint\x12\x16\n" + "\bendpoint\x18\x01 \x01(\tR\bendpoint\x12\x16\n" +
"\x06enable\x18\x02 \x01(\bR\x06enable\x12D\n" + "\x06enable\x18\x02 \x01(\bR\x06enable\x12D\n" +
"\x10metrics_interval\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\x0fmetricsInterval\x12!\n" + "\x10metrics_interval\x18\x03 \x01(\v2\x19.google.protobuf.DurationR\x0fmetricsInterval\"K\n" +
"\fworkspace_id\x18\x04 \x01(\tR\vworkspaceId\x12\x1b\n" +
"\tapi_token\x18\x05 \x01(\tR\bapiToken\"K\n" +
"\x03Log\x12\x14\n" + "\x03Log\x12\x14\n" +
"\x05level\x18\x01 \x01(\tR\x05level\x12\x16\n" + "\x05level\x18\x01 \x01(\tR\x05level\x12\x16\n" +
"\x06format\x18\x02 \x01(\tR\x06format\x12\x16\n" + "\x06format\x18\x02 \x01(\tR\x06format\x12\x16\n" +

View File

@ -1,16 +0,0 @@
package context
import (
"eino-project/internal/conf"
"github.com/go-kratos/kratos/v2/log"
"github.com/google/wire"
)
// ProviderSet is context providers.
var ProviderSet = wire.NewSet(NewContextManagerFromBootstrapConfig)
// NewContextManagerFromBootstrapConfig 从 Bootstrap 配置创建上下文管理器
func NewContextManagerFromBootstrapConfig(c *conf.Bootstrap, logger log.Logger) ContextManager {
return NewContextManager(logger)
}

View File

@ -6,9 +6,9 @@ import (
"time" "time"
"eino-project/internal/conf" "eino-project/internal/conf"
contextpkg "eino-project/internal/context" contextpkg "eino-project/internal/domain/context"
"eino-project/internal/monitor" "eino-project/internal/domain/monitor"
"eino-project/internal/vector" "eino-project/internal/domain/vector"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/redis/go-redis/v9" "github.com/redis/go-redis/v9"
@ -22,14 +22,14 @@ var ProviderSet = wire.NewSet(NewData)
// Data 数据层结构 // Data 数据层结构
type Data struct { type Data struct {
DB *sql.DB DB *sql.DB
RDB *redis.Client RDB *redis.Client
VectorService vector.VectorService VectorService vector.VectorService
DocProcessor vector.DocumentProcessor DocProcessor vector.DocumentProcessor
KnowledgeSearcher vector.KnowledgeSearcher KnowledgeSearcher vector.KnowledgeSearcher
log *log.Helper log *log.Helper
ContextManager contextpkg.ContextManager ContextManager contextpkg.ContextManager
Monitor monitor.Monitor Monitor monitor.Monitor
} }
// NewData 创建数据层 // NewData 创建数据层
@ -56,7 +56,7 @@ func NewData(c *conf.Data, logger log.Logger, vs vector.VectorService, dp vector
// 测试 Redis 连接 // 测试 Redis 连接
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
if err := rdb.Ping(ctx).Err(); err != nil { if err := rdb.Ping(ctx).Err(); err != nil {
helper.Warnf("Redis connection failed: %v", err) helper.Warnf("Redis connection failed: %v", err)
} }
@ -67,14 +67,14 @@ func NewData(c *conf.Data, logger log.Logger, vs vector.VectorService, dp vector
} }
d := &Data{ d := &Data{
DB: db, DB: db,
RDB: rdb, RDB: rdb,
VectorService: vs, VectorService: vs,
DocProcessor: dp, DocProcessor: dp,
KnowledgeSearcher: ks, KnowledgeSearcher: ks,
log: helper, log: helper,
ContextManager: contextManager, ContextManager: contextManager,
Monitor: monitor, Monitor: monitor,
} }
cleanup := func() { cleanup := func() {

View File

@ -0,0 +1,17 @@
package intent
import (
"context"
)
// IntentAgent classifies a user message into an intent label.
type IntentAgent interface {
	Classify(ctx context.Context, message string) (string, error)
}

// passthroughAgent is a no-op classifier: every message is labeled as a
// general inquiry.
type passthroughAgent struct{}

// NewPassthrough returns an IntentAgent that performs no real classification.
func NewPassthrough() IntentAgent {
	return &passthroughAgent{}
}

// Classify always reports "general_inquiry" and never fails.
func (p *passthroughAgent) Classify(_ context.Context, _ string) (string, error) {
	return "general_inquiry", nil
}

View File

@ -0,0 +1,63 @@
package product
import (
"encoding/json"
"net/http"
"strings"
tool "eino-project/internal/tools/product"
)
// Handler serves product-query HTTP requests backed by a product Tool.
type Handler struct {
	tool tool.Tool // lookup backend used for ID and name searches
}

// NewHandler wraps a product Tool in an HTTP handler.
func NewHandler(t tool.Tool) *Handler { return &Handler{tool: t} }
// queryRequest is the JSON body accepted by the product-query endpoint.
// Exactly one of Query.ID or Query.Name is expected; ID takes precedence.
type queryRequest struct {
	Query struct {
		ID   string `json:"id"`   // exact product ID lookup
		Name string `json:"name"` // name substring search
	} `json:"query"`
	// Page and Size are accepted but not currently used by ServeHTTP —
	// presumably reserved for pagination. TODO confirm.
	Page int `json:"page,omitempty"`
	Size int `json:"size,omitempty"`
}

// queryResponse is the JSON body returned by the product-query endpoint.
type queryResponse struct {
	Items  []*tool.Product `json:"items"`  // matching products (may be empty)
	Source string          `json:"source"` // which lookup path was used: "id" or "name"
	Count  int             `json:"count"`  // len(Items)
}
// ServeHTTP handles a POST product query. The body must provide query.id
// (exact match) or query.name (substring search); the matching products —
// possibly none — are returned as JSON along with the lookup path used.
//
// Responds 405 for non-POST, 400 for a malformed body or missing query,
// and 500 when the underlying tool fails.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	var req queryRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}
	var items []*tool.Product
	src := ""
	if id := strings.TrimSpace(req.Query.ID); id != "" {
		// ID lookup takes precedence over name search.
		p, err := h.tool.GetByID(r.Context(), id)
		if err != nil {
			// Previously swallowed with `_`; surface backend failures.
			http.Error(w, "Product lookup failed", http.StatusInternalServerError)
			return
		}
		if p != nil {
			items = append(items, p)
		}
		src = "id"
	} else if name := strings.TrimSpace(req.Query.Name); name != "" {
		res, err := h.tool.SearchByName(r.Context(), name)
		if err != nil {
			http.Error(w, "Product search failed", http.StatusInternalServerError)
			return
		}
		items = res
		src = "name"
	} else {
		http.Error(w, "query.id or query.name required", http.StatusBadRequest)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// Encode errors after headers are sent cannot change the response status.
	_ = json.NewEncoder(w).Encode(&queryResponse{Items: items, Source: src, Count: len(items)})
}

View File

@ -0,0 +1,44 @@
package llm
import (
	"fmt"
	"sync"

	"eino-project/internal/conf"

	"github.com/cloudwego/eino/components/model"
)
// LLM lazily provides the Ollama-backed chat models used by the application.
type LLM interface {
	// Chat returns the conversational chat model, creating it on first use.
	Chat() (model.BaseChatModel, error)
	// Intent returns the intent-classification model, creating it on first use.
	Intent() (model.BaseChatModel, error)
}
// llm is the default LLM implementation: each model is constructed at most
// once (guarded by its sync.Once) and cached for subsequent calls.
type llm struct {
	cfg *conf.Bootstrap // bootstrap configuration used to build the models

	onceChat   sync.Once
	onceIntent sync.Once
	chat       model.BaseChatModel
	intent     model.BaseChatModel
	// The previous `cache map[string]model.BaseChatModel` field was never
	// read or written anywhere in this package and has been removed.
}

// NewLLM builds an LLM provider from the bootstrap configuration.
func NewLLM(cfg *conf.Bootstrap) LLM {
	return &llm{cfg: cfg}
}
// Chat returns the Ollama chat model instance, creating it on first use.
//
// The constructor runs at most once. Previously, if that first attempt
// failed, the error was lost and every later call returned (nil, nil);
// callers now always get an explicit error for a failed initialization.
func (r *llm) Chat() (model.BaseChatModel, error) {
	var err error
	r.onceChat.Do(func() {
		r.chat, err = newOllamaChatModel(r.cfg)
	})
	if err != nil {
		return nil, err
	}
	if r.chat == nil {
		// A prior initialization failed; sync.Once will not retry it.
		return nil, fmt.Errorf("chat model initialization previously failed")
	}
	return r.chat, nil
}
// Intent returns the Ollama intent-recognition model instance, creating it
// on first use.
//
// Mirrors Chat: the constructor runs at most once, and a failed first
// attempt is reported on every later call instead of yielding (nil, nil).
func (r *llm) Intent() (model.BaseChatModel, error) {
	var err error
	r.onceIntent.Do(func() {
		r.intent, err = newOllamaIntentModel(r.cfg)
	})
	if err != nil {
		return nil, err
	}
	if r.intent == nil {
		// A prior initialization failed; sync.Once will not retry it.
		return nil, fmt.Errorf("intent model initialization previously failed")
	}
	return r.intent, nil
}

View File

@ -0,0 +1,37 @@
package llm
import (
"context"
"fmt"
"time"
"eino-project/internal/conf"
"github.com/cloudwego/eino-ext/components/model/ollama"
"github.com/cloudwego/eino/components/model"
)
// newOllamaChatModel builds the Ollama chat model instance from the
// bootstrap configuration.
//
// Defaults: a 60s timeout and model "deepseek-v3.1:671b-cloud", overridden
// by Ai.Ollama.Timeout and Ai.Ollama.Models[0] respectively.
func newOllamaChatModel(c *conf.Bootstrap) (model.BaseChatModel, error) {
	if c == nil || c.Ai == nil || c.Ai.Ollama == nil {
		return nil, fmt.Errorf("AI configuration is missing")
	}
	timeout := 60 * time.Second
	if c.Ai.Ollama.Timeout != nil {
		timeout = c.Ai.Ollama.Timeout.AsDuration()
	}
	modelName := "deepseek-v3.1:671b-cloud"
	if len(c.Ai.Ollama.Models) > 0 && c.Ai.Ollama.Models[0] != "" {
		modelName = c.Ai.Ollama.Models[0]
	}
	// Named cm rather than "model" to avoid shadowing the imported model package.
	cm, err := ollama.NewChatModel(context.Background(), &ollama.ChatModelConfig{
		BaseURL: c.Ai.Ollama.Endpoint,
		Timeout: timeout,
		Model:   modelName,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create chat model: %w", err)
	}
	return cm, nil
}

View File

@ -0,0 +1,36 @@
package llm
import (
"context"
"fmt"
"time"
"eino-project/internal/conf"
"github.com/cloudwego/eino-ext/components/model/ollama"
"github.com/cloudwego/eino/components/model"
)
// newOllamaIntentModel builds the Ollama intent-recognition model instance
// from the bootstrap configuration.
//
// Defaults: a 60s timeout and model "qwen3:8b", overridden by
// Ai.Ollama.Timeout and Ai.Ollama.Models[1] respectively.
func newOllamaIntentModel(c *conf.Bootstrap) (model.BaseChatModel, error) {
	if c == nil || c.Ai == nil || c.Ai.Ollama == nil {
		return nil, fmt.Errorf("AI configuration is missing")
	}
	timeout := 60 * time.Second
	if c.Ai.Ollama.Timeout != nil {
		timeout = c.Ai.Ollama.Timeout.AsDuration()
	}
	modelName := "qwen3:8b"
	if len(c.Ai.Ollama.Models) > 1 && c.Ai.Ollama.Models[1] != "" {
		modelName = c.Ai.Ollama.Models[1]
	}
	// Named im rather than "model" to avoid shadowing the imported model package.
	im, err := ollama.NewChatModel(context.Background(), &ollama.ChatModelConfig{
		BaseURL: c.Ai.Ollama.Endpoint,
		Timeout: timeout,
		Model:   modelName,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create intent model: %w", err)
	}
	return im, nil
}

View File

@ -1,95 +1,92 @@
package monitor package monitor
import ( import (
"context" "context"
"time" "time"
"eino-project/internal/conf" "eino-project/internal/conf"
cozeloop "github.com/coze-dev/cozeloop-go" cozeloop "github.com/coze-dev/cozeloop-go"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
) )
// cozeLoopMonitor 是一个包装器:在调用本地监控逻辑的同时,将关键指标以 Trace 形式上报到 Coze Loop // cozeLoopMonitor 是一个包装器:在调用本地监控逻辑的同时,将关键指标以 Trace 形式上报到 Coze Loop
type cozeLoopMonitor struct { type cozeLoopMonitor struct {
base Monitor base Monitor
client cozeloop.Client client cozeloop.Client
log *log.Helper log *log.Helper
} }
// NewCozeLoopMonitor 根据配置初始化 Coze Loop 客户端并返回包装后的监控器 // NewCozeLoopMonitor 根据配置初始化 Coze Loop 客户端并返回包装后的监控器
func NewCozeLoopMonitor(base Monitor, cfg *conf.Monitoring_CozeLoop, logger log.Logger) Monitor { func NewCozeLoopMonitor(base Monitor, cfg *conf.Monitoring_CozeLoop, logger log.Logger) Monitor {
helper := log.NewHelper(logger) helper := log.NewHelper(logger)
ctx := context.Background() ctx := context.Background()
// 按官方 simple demo 风格,直接从环境变量初始化客户端 // 按官方 simple demo 风格,直接从环境变量初始化客户端
client, err := cozeloop.NewClient() client, err := cozeloop.NewClient()
if err != nil { if err != nil {
helper.WithContext(ctx).Warnf("init CozeLoop client failed: %v, fallback to base monitor", err) helper.WithContext(ctx).Warnf("init CozeLoop client failed: %v, fallback to base monitor", err)
return base return base
} }
m := &cozeLoopMonitor{ m := &cozeLoopMonitor{
base: base, base: base,
client: client, client: client,
log: helper, log: helper,
} }
return m return m
} }
// loopReportMetrics 定期采集本地指标并以 Trace 形式上报
// 已移除周期性指标上报逻辑,保留最小实现
// RecordRequest 记录请求并上报简化 Trace // RecordRequest 记录请求并上报简化 Trace
func (m *cozeLoopMonitor) RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error { func (m *cozeLoopMonitor) RecordRequest(ctx context.Context, requestType string, duration time.Duration, success bool) error {
if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil { if err := m.base.RecordRequest(ctx, requestType, duration, success); err != nil {
return err return err
} }
// 简化不生成trace // 简化不生成trace
return nil return nil
} }
// RecordAIRequest 记录 AI 请求并上报简化 Trace // RecordAIRequest 记录 AI 请求并上报简化 Trace
func (m *cozeLoopMonitor) RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error { func (m *cozeLoopMonitor) RecordAIRequest(ctx context.Context, duration time.Duration, success bool) error {
if err := m.base.RecordAIRequest(ctx, duration, success); err != nil { if err := m.base.RecordAIRequest(ctx, duration, success); err != nil {
return err return err
} }
// 简化不生成trace // 简化不生成trace
return nil return nil
} }
// RecordVectorOperation 记录向量操作并上报简化 Trace // RecordVectorOperation 记录向量操作并上报简化 Trace
func (m *cozeLoopMonitor) RecordVectorOperation(ctx context.Context, operation string, success bool) error { func (m *cozeLoopMonitor) RecordVectorOperation(ctx context.Context, operation string, success bool) error {
if err := m.base.RecordVectorOperation(ctx, operation, success); err != nil { if err := m.base.RecordVectorOperation(ctx, operation, success); err != nil {
return err return err
} }
// 简化不生成trace // 简化不生成trace
return nil return nil
} }
// RecordSessionOperation 记录会话操作并上报简化 Trace // RecordSessionOperation 记录会话操作并上报简化 Trace
func (m *cozeLoopMonitor) RecordSessionOperation(ctx context.Context, operation string) error { func (m *cozeLoopMonitor) RecordSessionOperation(ctx context.Context, operation string) error {
if err := m.base.RecordSessionOperation(ctx, operation); err != nil { if err := m.base.RecordSessionOperation(ctx, operation); err != nil {
return err return err
} }
// 简化不生成trace // 简化不生成trace
return nil return nil
} }
// RecordKnowledgeOperation 记录知识库操作并上报简化 Trace // RecordKnowledgeOperation 记录知识库操作并上报简化 Trace
func (m *cozeLoopMonitor) RecordKnowledgeOperation(ctx context.Context, operation string) error { func (m *cozeLoopMonitor) RecordKnowledgeOperation(ctx context.Context, operation string) error {
if err := m.base.RecordKnowledgeOperation(ctx, operation); err != nil { if err := m.base.RecordKnowledgeOperation(ctx, operation); err != nil {
return err return err
} }
// 简化不生成trace // 简化不生成trace
return nil return nil
} }
// 透传LLM使用事件不生成trace不做周期性上报 // 透传LLM使用事件不生成trace不做周期性上报
func (m *cozeLoopMonitor) RecordLLMUsage(ctx context.Context, usage *LLMUsage) error { func (m *cozeLoopMonitor) RecordLLMUsage(ctx context.Context, usage *LLMUsage) error {
return m.base.RecordLLMUsage(ctx, usage) return m.base.RecordLLMUsage(ctx, usage)
} }
// GetMetrics 透传本地指标(不直接从 Coze Loop 读取) // GetMetrics 透传本地指标(不直接从 Coze Loop 读取)
@ -99,10 +96,10 @@ func (m *cozeLoopMonitor) GetMetrics(ctx context.Context) (*Metrics, error) {
// CreateAlert 创建告警不生成trace保持简化 // CreateAlert 创建告警不生成trace保持简化
func (m *cozeLoopMonitor) CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error { func (m *cozeLoopMonitor) CreateAlert(ctx context.Context, alertType, level, title, message string, metadata map[string]interface{}) error {
if err := m.base.CreateAlert(ctx, alertType, level, title, message, metadata); err != nil { if err := m.base.CreateAlert(ctx, alertType, level, title, message, metadata); err != nil {
return err return err
} }
return nil return nil
} }
// GetAlerts 透传本地告警 // GetAlerts 透传本地告警

View File

@ -0,0 +1,27 @@
package domain
import (
"eino-project/internal/domain/context"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/monitor"
"eino-project/internal/domain/session"
"eino-project/internal/domain/tools"
"eino-project/internal/domain/vector"
"eino-project/internal/domain/workflow"
"github.com/google/wire"
)
// ProviderSet is domain providers.
//
// It collects every domain-layer constructor (context manager, LLM provider,
// monitor, session manager, product tool, chat workflow, and the vector
// services) into a single wire provider set for dependency injection.
var ProviderSet = wire.NewSet(
	context.NewContextManager,
	llm.NewLLM,
	monitor.NewMonitorFromBootstrapConfig,
	session.NewMemorySessionManager,
	tools.NewMemoryProductTool,
	workflow.NewChatWorkflow,
	vector.NewVectorServiceFromBootstrapConfig,
	vector.NewDocumentProcessor,
	vector.NewKnowledgeSearcher,
)

View File

@ -0,0 +1,54 @@
package tools
import (
"context"
"strings"
)
// Product is a catalog item exposed by the product-query tooling.
type Product struct {
	ID          string  `json:"id"`
	Name        string  `json:"name"`
	Price       float64 `json:"price"`
	Description string  `json:"description"`
}

// Tool looks up products by exact ID or by case-insensitive name substring.
type Tool interface {
	GetByID(ctx context.Context, id string) (*Product, error)
	SearchByName(ctx context.Context, name string) ([]*Product, error)
}

// memoryTool is an in-memory Tool backed by a fixed demo catalog.
type memoryTool struct {
	items []*Product
}

// NewMemoryProductTool returns a Tool preloaded with a small demo catalog.
func NewMemoryProductTool() Tool {
	catalog := []*Product{
		{ID: "P001", Name: "手机 A1", Price: 1999, Description: "入门级智能手机"},
		{ID: "P002", Name: "手机 Pro X", Price: 5999, Description: "旗舰级拍照手机"},
		{ID: "P003", Name: "笔记本 M3", Price: 8999, Description: "轻薄高性能笔记本"},
		{ID: "P004", Name: "耳机 Air", Price: 1299, Description: "降噪真无线耳机"},
	}
	return &memoryTool{items: catalog}
}

// GetByID returns the product whose ID matches exactly, or (nil, nil) when
// no such product exists.
func (m *memoryTool) GetByID(_ context.Context, id string) (*Product, error) {
	for _, p := range m.items {
		if p.ID == id {
			return p, nil
		}
	}
	return nil, nil
}

// SearchByName returns every product whose name contains the query,
// ignoring case. An empty query yields no results.
func (m *memoryTool) SearchByName(_ context.Context, name string) ([]*Product, error) {
	if name == "" {
		return nil, nil
	}
	query := strings.ToLower(name)
	var matches []*Product
	for _, p := range m.items {
		if strings.Contains(strings.ToLower(p.Name), query) {
			matches = append(matches, p)
		}
	}
	return matches, nil
}

View File

@ -0,0 +1,104 @@
package workflow
import (
"context"
"fmt"
"strings"
"time"
contextpkg "eino-project/internal/domain/context"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/vector"
"github.com/cloudwego/eino/schema"
)
// ChatWorkflow orchestrates a knowledge-augmented chat: the user message is
// enriched with knowledge-base search results before being sent to the LLM.
type ChatWorkflow interface {
	// Chat returns the complete model response for one user message.
	Chat(ctx context.Context, message string, sessionID string) (string, error)
	// Stream returns the model response incrementally over a channel that is
	// closed when the response ends.
	Stream(ctx context.Context, message string, sessionID string) (<-chan string, error)
}

// chatWorkflow is the default ChatWorkflow implementation.
type chatWorkflow struct {
	models   llm.LLM                   // chat model provider
	searcher vector.KnowledgeSearcher  // knowledge retrieval; nil-checked before use
	ctxMgr   contextpkg.ContextManager // conversation history; nil-checked before use
}

// NewChatWorkflow wires a ChatWorkflow from its collaborators. searcher and
// ctxMgr may be nil; the workflow skips the corresponding steps.
func NewChatWorkflow(models llm.LLM, searcher vector.KnowledgeSearcher, ctxMgr contextpkg.ContextManager) ChatWorkflow {
	return &chatWorkflow{models: models, searcher: searcher, ctxMgr: ctxMgr}
}
// Chat answers a single user message synchronously.
//
// Flow: record the message in conversation history (best effort), retrieve
// up to 3 related knowledge documents, embed them in the prompt, and ask the
// chat model for a complete answer.
func (w *chatWorkflow) Chat(ctx context.Context, message string, sessionID string) (string, error) {
	if w.ctxMgr != nil {
		w.ctxMgr.AddMessage(ctx, sessionID, contextpkg.Message{Role: "user", Content: message, Timestamp: time.Now()})
	}
	// Retrieval failures are deliberately non-fatal: fall back to the raw message.
	var knowledgeContext string
	if w.searcher != nil {
		results, err := w.searcher.SearchKnowledge(ctx, message, 3)
		if err == nil && len(results) > 0 {
			parts := make([]string, 0, len(results))
			for _, r := range results {
				parts = append(parts, fmt.Sprintf("相关知识: %s", r.Document.Content))
			}
			knowledgeContext = strings.Join(parts, "\n")
		}
	}
	enhanced := message
	if knowledgeContext != "" {
		enhanced = fmt.Sprintf("基于以下知识库内容回答用户问题:\n%s\n\n用户问题: %s", knowledgeContext, message)
	}
	msgs := []*schema.Message{{Role: schema.User, Content: enhanced}}
	chatModel, err := w.models.Chat()
	if err != nil {
		return "", err
	}
	resp, err := chatModel.Generate(ctx, msgs)
	if err != nil {
		return "", err
	}
	if resp == nil {
		// Previously a nil response surfaced as ("", nil); report it explicitly.
		return "", fmt.Errorf("chat model returned no response")
	}
	return resp.Content, nil
}
// Stream answers a user message incrementally. It performs the same
// knowledge enrichment as Chat, then forwards model chunks over the returned
// channel, which is closed when the stream ends or the context is canceled.
func (w *chatWorkflow) Stream(ctx context.Context, message string, sessionID string) (<-chan string, error) {
	var knowledgeContext string
	if w.searcher != nil {
		results, err := w.searcher.SearchKnowledge(ctx, message, 3)
		if err == nil && len(results) > 0 {
			parts := make([]string, 0, len(results))
			for _, r := range results {
				parts = append(parts, fmt.Sprintf("相关知识: %s", r.Document.Content))
			}
			knowledgeContext = strings.Join(parts, "\n")
		}
	}
	enhanced := message
	if knowledgeContext != "" {
		enhanced = fmt.Sprintf("基于以下知识库内容回答用户问题:\n%s\n\n用户问题: %s", knowledgeContext, message)
	}
	msgs := []*schema.Message{{Role: schema.User, Content: enhanced}}
	chatModel, err := w.models.Chat()
	if err != nil {
		return nil, err
	}
	reader, err := chatModel.Stream(ctx, msgs)
	if err != nil {
		return nil, err
	}
	ch := make(chan string, 8)
	go func() {
		defer close(ch)
		for {
			chunk, err := reader.Recv()
			if err != nil {
				// Recv errors (including end-of-stream) terminate the feed.
				return
			}
			if chunk == nil || chunk.Content == "" {
				continue
			}
			// Guard the send with ctx.Done so the goroutine cannot leak
			// blocked on ch when the consumer stops reading.
			select {
			case ch <- chunk.Content:
			case <-ctx.Done():
				return
			}
		}
	}()
	return ch, nil
}

View File

@ -1,32 +0,0 @@
package server
import (
v1 "eino-project/api/customer/v1"
"eino-project/internal/conf"
"eino-project/internal/service"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
)
// NewGRPCServer new a gRPC server.
func NewGRPCServer(c *conf.Server, customerService *service.CustomerService, logger log.Logger) *grpc.Server {
var opts = []grpc.ServerOption{
grpc.Middleware(
recovery.Recovery(),
),
}
if c.Grpc.Network != "" {
opts = append(opts, grpc.Network(c.Grpc.Network))
}
if c.Grpc.Addr != "" {
opts = append(opts, grpc.Address(c.Grpc.Addr))
}
if c.Grpc.Timeout != nil {
opts = append(opts, grpc.Timeout(c.Grpc.Timeout.AsDuration()))
}
srv := grpc.NewServer(opts...)
v1.RegisterCustomerServiceServer(srv, customerService)
return srv
}

View File

@ -4,6 +4,9 @@ import (
v1 "eino-project/api/customer/v1" v1 "eino-project/api/customer/v1"
"eino-project/internal/conf" "eino-project/internal/conf"
"eino-project/internal/service" "eino-project/internal/service"
nethttp "net/http"
"github.com/gorilla/websocket"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/recovery" "github.com/go-kratos/kratos/v2/middleware/recovery"
@ -27,12 +30,28 @@ func NewHTTPServer(c *conf.Server, customerService *service.CustomerService, log
opts = append(opts, http.Timeout(c.Http.Timeout.AsDuration())) opts = append(opts, http.Timeout(c.Http.Timeout.AsDuration()))
} }
srv := http.NewServer(opts...) srv := http.NewServer(opts...)
// 注册标准的gRPC-Gateway路由 // 注册HTTP路由
v1.RegisterCustomerServiceHTTPServer(srv, customerService) v1.RegisterCustomerServiceHTTPServer(srv, customerService)
// 添加SSE流式聊天的自定义路由 // 添加SSE流式聊天的自定义路由
srv.HandleFunc("/api/chat/stream", customerService.HandleStreamChat) srv.HandleFunc("/api/chat/stream", customerService.HandleStreamChat)
// 商品查询 Agent 路由
srv.HandleFunc("/api/agents/product/query", customerService.HandleProductQuery)
// WebSocket 聊天路由(/api/chat/ws
srv.HandleFunc("/api/chat/ws", func(w nethttp.ResponseWriter, r *nethttp.Request) {
upgrader := websocket.Upgrader{CheckOrigin: func(r *nethttp.Request) bool { return true }}
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
customerService.HandleWebSocketChat(conn)
})
// 订单诊断工作流(直接调用流水线输出)
srv.HandleFunc("/api/workflow/order/diagnosis", customerService.HandleOrderDiagnosis)
return srv return srv
} }

View File

@ -1,8 +1,8 @@
package server package server
import ( import (
"github.com/google/wire" "github.com/google/wire"
) )
// ProviderSet is server providers. // ProviderSet is server providers.
var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer) var ProviderSet = wire.NewSet(NewHTTPServer)

View File

@ -7,11 +7,13 @@ import (
"net/http" "net/http"
"time" "time"
"github.com/gorilla/websocket"
pb "eino-project/api/customer/v1" pb "eino-project/api/customer/v1"
"eino-project/internal/ai"
"eino-project/internal/biz" "eino-project/internal/biz"
"eino-project/internal/monitor" "eino-project/internal/domain/monitor"
"eino-project/internal/session" "eino-project/internal/domain/session"
wf "eino-project/internal/domain/workflow"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
) )
@ -21,23 +23,24 @@ type CustomerService struct {
pb.UnimplementedCustomerServiceServer pb.UnimplementedCustomerServiceServer
customerUseCase *biz.CustomerUseCase customerUseCase *biz.CustomerUseCase
aiService ai.AIService
sessionManager session.SessionManager sessionManager session.SessionManager
monitor monitor.Monitor monitor monitor.Monitor
log *log.Helper log *log.Helper
chatWorkflow wf.ChatWorkflow
} }
// NewCustomerService 创建智能客服服务 // NewCustomerService 创建智能客服服务
func NewCustomerService(customerUseCase *biz.CustomerUseCase, aiService ai.AIService, sessionManager session.SessionManager, monitor monitor.Monitor, logger log.Logger) *CustomerService { func NewCustomerService(customerUseCase *biz.CustomerUseCase, sessionManager session.SessionManager, monitor monitor.Monitor, logger log.Logger) *CustomerService {
return &CustomerService{ return &CustomerService{
customerUseCase: customerUseCase, customerUseCase: customerUseCase,
aiService: aiService,
sessionManager: sessionManager, sessionManager: sessionManager,
monitor: monitor, monitor: monitor,
log: log.NewHelper(logger), log: log.NewHelper(logger),
} }
} }
func (s *CustomerService) SetChatWorkflow(w wf.ChatWorkflow) { s.chatWorkflow = w }
// SystemStatus 系统状态检查 // SystemStatus 系统状态检查
func (s *CustomerService) SystemStatus(ctx context.Context, req *pb.SystemStatusRequest) (*pb.SystemStatusResponse, error) { func (s *CustomerService) SystemStatus(ctx context.Context, req *pb.SystemStatusRequest) (*pb.SystemStatusResponse, error) {
s.log.WithContext(ctx).Info("SystemStatus called") s.log.WithContext(ctx).Info("SystemStatus called")
@ -97,28 +100,16 @@ func (s *CustomerService) Chat(ctx context.Context, req *pb.ChatRequest) (*pb.Ch
} }
} }
// 分析用户意图(使用 qwen3:8b LLM不使用自定义规则 // 可在工作流内部进行意图识别
intent := "general_inquiry"
if aiIntent, err := s.aiService.AnalyzeIntent(ctx, req.Message); err == nil && aiIntent != "" {
intent = aiIntent
} else if err != nil {
s.log.Errorf("Failed to analyze user intent via LLM: %v", err)
}
var aiResponse string var aiResponse string
// 根据意图选择处理方式 // 使用 ChatWorkflow 路由处理(意图判断 + 调用相应 Agent/Workflow
if intent == "knowledge_inquiry" || intent == "technical_support" { if s.chatWorkflow != nil {
// 使用智能问答(结合知识库) aiResponse, err = s.chatWorkflow.Chat(ctx, req.Message, req.SessionId)
aiResponse, err = s.customerUseCase.ProcessIntelligentChat(ctx, req.Message, req.SessionId)
if err != nil {
s.log.Errorf("Intelligent chat error: %v", err)
// 降级到普通AI服务
aiResponse, err = s.aiService.ProcessChat(ctx, req.Message, req.SessionId)
}
} else { } else {
// 使用普通AI服务 aiResponse = ""
aiResponse, err = s.aiService.ProcessChat(ctx, req.Message, req.SessionId) err = fmt.Errorf("chat workflow not configured")
} }
if err != nil { if err != nil {
@ -301,9 +292,14 @@ func (s *CustomerService) StreamChat(req *pb.StreamChatRequest, stream pb.Custom
} }
} }
// 使用AI服务进行流式对话 // 使用工作流进行流式对话
var fullResponse string var fullResponse string
responseChan, err := s.aiService.StreamChat(ctx, req.Message, req.SessionId) var responseChan <-chan string
if s.chatWorkflow != nil {
responseChan, err = s.chatWorkflow.Stream(ctx, req.Message, req.SessionId)
} else {
err = fmt.Errorf("chat workflow not configured")
}
if err != nil { if err != nil {
s.log.Errorf("AI stream service error: %v", err) s.log.Errorf("AI stream service error: %v", err)
return s.fallbackStreamResponse(req, stream) return s.fallbackStreamResponse(req, stream)
@ -445,9 +441,9 @@ func (s *CustomerService) HandleStreamChat(w http.ResponseWriter, r *http.Reques
} }
} }
// 使用AI服务进行流式对话 // 使用工作流进行流式对话
var fullResponse string var fullResponse string
responseChan, err := s.aiService.StreamChat(ctx, req.Message, req.SessionID) responseChan, err := s.chatWorkflow.Stream(ctx, req.Message, req.SessionID)
if err != nil { if err != nil {
s.log.Errorf("AI stream service error: %v", err) s.log.Errorf("AI stream service error: %v", err)
// 降级到模拟响应 // 降级到模拟响应
@ -548,3 +544,68 @@ func (s *CustomerService) fallbackSSEResponse(w http.ResponseWriter, flusher htt
fmt.Fprintf(w, "data: {\"type\":\"end\",\"session_id\":\"%s\"}\n\n", sessionID) fmt.Fprintf(w, "data: {\"type\":\"end\",\"session_id\":\"%s\"}\n\n", sessionID)
flusher.Flush() flusher.Flush()
} }
// HandleWebSocketChat serves a chat session over an established WebSocket
// connection. Each text frame is a JSON {"message","session_id"} request;
// the reply is streamed back as {"type":"chunk",...} frames terminated by a
// {"type":"end"} frame. The loop exits when the peer disconnects.
func (s *CustomerService) HandleWebSocketChat(conn *websocket.Conn) {
	defer conn.Close()
	// NOTE(review): messages use a background context, so in-flight model
	// calls are not canceled when the socket closes — confirm this is intended.
	ctx := context.Background()
	if s.chatWorkflow == nil {
		// Guard mirrors the Chat RPC; a nil interface call would panic here.
		_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error"}`))
		return
	}
	for {
		mt, msg, err := conn.ReadMessage()
		if err != nil {
			return // peer closed or read failed
		}
		if mt != websocket.TextMessage {
			continue // ignore binary/control payloads
		}
		var req struct {
			Message   string `json:"message"`
			SessionID string `json:"session_id"`
		}
		if json.Unmarshal(msg, &req) != nil {
			continue // best effort: skip malformed frames
		}
		ch, err := s.chatWorkflow.Stream(ctx, req.Message, req.SessionID)
		if err != nil {
			_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error"}`))
			continue
		}
		for chunk := range ch {
			resp := map[string]interface{}{"type": "chunk", "content": chunk, "session_id": req.SessionID}
			b, _ := json.Marshal(resp)
			_ = conn.WriteMessage(websocket.TextMessage, b)
		}
		_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"end"}`))
	}
}
// HandleOrderDiagnosis is the order-diagnosis workflow entry point. It
// accepts a POST body {"order_id","message","session_id"} and returns the
// workflow's textual diagnosis as JSON.
func (s *CustomerService) HandleOrderDiagnosis(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}
	var req struct {
		OrderID   string `json:"order_id"`
		Message   string `json:"message"`
		SessionID string `json:"session_id"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}
	if s.chatWorkflow == nil {
		// Guard mirrors the Chat RPC; a nil interface call would panic below.
		http.Error(w, "Workflow not configured", http.StatusServiceUnavailable)
		return
	}
	ctx := r.Context()
	// Simplified: route through the generic chat workflow; can be replaced
	// with an order-specific pipeline later.
	resp, err := s.chatWorkflow.Chat(ctx, fmt.Sprintf("订单诊断: %s\n%s", req.OrderID, req.Message), req.SessionID)
	if err != nil {
		http.Error(w, "Workflow error", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	// Encode errors after headers are sent cannot change the response status.
	_ = json.NewEncoder(w).Encode(map[string]interface{}{"order_id": req.OrderID, "result": resp})
}
// HandleProductQuery handles product-query requests.
//
// NOTE(review): this is a stub — it always returns an empty JSON object.
// Presumably it should delegate to the product agent handler registered at
// /api/agents/product/query; confirm and wire up the real implementation.
func (s *CustomerService) HandleProductQuery(w http.ResponseWriter, r *http.Request) {
	// Set the Content-Type header before writing the body.
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(map[string]interface{}{})
}

View File

@ -3,4 +3,4 @@ package service
import "github.com/google/wire" import "github.com/google/wire"
// ProviderSet is service providers. // ProviderSet is service providers.
var ProviderSet = wire.NewSet(NewCustomerService) var ProviderSet = wire.NewSet(NewCustomerService)

View File

@ -1,8 +0,0 @@
package session
import (
"github.com/google/wire"
)
// ProviderSet is session providers.
var ProviderSet = wire.NewSet(NewMemorySessionManager)