ai-courseware/eino-project/internal/service/customer.go

619 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/gorilla/websocket"
pb "eino-project/api/customer/v1"
"eino-project/internal/biz"
"eino-project/internal/domain/agent"
"eino-project/internal/domain/llm"
"eino-project/internal/domain/monitor"
"eino-project/internal/domain/session"
wf "eino-project/internal/domain/workflow"
"github.com/cloudwego/eino/adk"
"github.com/go-kratos/kratos/v2/log"
)
// CustomerService 智能客服服务实现
type CustomerService struct {
pb.UnimplementedCustomerServiceServer
customerUseCase *biz.CustomerUseCase
sessionManager session.SessionManager
monitor monitor.Monitor
log *log.Helper
chatWorkflow wf.ChatWorkflow
// productAgent 商品查询AgentEino ADK ChatModelAgent实现绑定工具并自动选择调用
productAgent adk.Agent
}
// NewCustomerService 创建智能客服服务
func NewCustomerService(
customerUseCase *biz.CustomerUseCase,
sessionManager session.SessionManager,
monitor monitor.Monitor,
logger log.Logger,
models llm.LLM,
) *CustomerService {
return &CustomerService{
customerUseCase: customerUseCase,
sessionManager: sessionManager,
monitor: monitor,
log: log.NewHelper(logger),
// 构建商品查询 ChatModelAgent绑定工具并让模型自动选择调用
productAgent: agent.NewProductChatAgent(context.Background(), models),
}
}
func (s *CustomerService) SetChatWorkflow(w wf.ChatWorkflow) { s.chatWorkflow = w }
// SystemStatus 系统状态检查
func (s *CustomerService) SystemStatus(ctx context.Context, req *pb.SystemStatusRequest) (*pb.SystemStatusResponse, error) {
s.log.WithContext(ctx).Info("SystemStatus called")
status, err := s.customerUseCase.GetSystemStatus(ctx)
if err != nil {
return nil, err
}
// 转换业务层数据到 protobuf 格式
services := make(map[string]*pb.ServiceStatus)
for key, service := range status.Services {
services[key] = &pb.ServiceStatus{
Name: service.Name,
Status: service.Status,
Message: service.Message,
}
}
return &pb.SystemStatusResponse{
Status: status.Status,
Services: services,
Version: status.Version,
}, nil
}
// Chat 处理聊天消息
func (s *CustomerService) Chat(ctx context.Context, req *pb.ChatRequest) (*pb.ChatResponse, error) {
startTime := time.Now()
s.log.WithContext(ctx).Infof("Chat called with message: %s", req.Message)
// 记录监控指标
var success bool
if s.monitor != nil {
defer func() {
duration := time.Since(startTime)
s.monitor.RecordRequest(ctx, "chat", duration, success)
}()
}
// 添加用户消息到会话
_, err := s.sessionManager.AddMessage(ctx, req.SessionId, "user", req.Message)
if err != nil {
s.log.Errorf("Failed to add user message: %v", err)
// 如果会话不存在,创建新会话
if req.SessionId == "" {
session, createErr := s.sessionManager.CreateSession(ctx, "default_user", "新对话")
if createErr != nil {
return nil, fmt.Errorf("failed to create session: %w", createErr)
}
req.SessionId = session.ID
}
// 重试添加消息
_, err = s.sessionManager.AddMessage(ctx, req.SessionId, "user", req.Message)
if err != nil {
return nil, fmt.Errorf("failed to add user message: %w", err)
}
}
// 可在工作流内部进行意图识别
var aiResponse string
// 使用 ChatWorkflow 路由处理(意图判断 + 调用相应 Agent/Workflow
if s.chatWorkflow != nil {
aiResponse, err = s.chatWorkflow.Chat(ctx, req.Message, req.SessionId)
} else {
aiResponse = ""
err = fmt.Errorf("chat workflow not configured")
}
if err != nil {
s.log.Errorf("AI service error: %v", err)
// 降级到业务层处理
chatMessage, bizErr := s.customerUseCase.ProcessChat(ctx, req.SessionId, req.Message)
if bizErr != nil {
return nil, fmt.Errorf("both AI and business logic failed: AI error: %v, Biz error: %v", err, bizErr)
}
aiResponse = chatMessage.Message
}
// 添加AI回复到会话
_, err = s.sessionManager.AddMessage(ctx, req.SessionId, "assistant", aiResponse)
if err != nil {
s.log.Errorf("Failed to add AI message: %v", err)
}
success = true
return &pb.ChatResponse{
SessionId: req.SessionId,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
Type: "response",
Payload: &pb.ChatPayload{
DataType: "text",
Content: &pb.ChatContent{
Chunk: aiResponse,
FullMessage: aiResponse,
IsFinal: true,
},
},
}, nil
}
// GetSessions 获取会话列表
func (s *CustomerService) GetSessions(ctx context.Context, req *pb.GetSessionsRequest) (*pb.GetSessionsResponse, error) {
s.log.WithContext(ctx).Info("GetSessions called")
// 使用会话管理器获取会话列表
sessions, err := s.sessionManager.ListSessions(ctx, "default_user") // 暂时使用默认用户ID
if err != nil {
return nil, fmt.Errorf("failed to list sessions: %w", err)
}
// 转换会话管理器数据到 protobuf 格式
pbSessions := make([]*pb.Session, 0, len(sessions))
for _, session := range sessions {
pbSessions = append(pbSessions, &pb.Session{
SessionId: session.ID,
Title: session.Title,
CreatedAt: session.CreatedAt.Format("2006-01-02 15:04:05"),
})
}
return &pb.GetSessionsResponse{
Sessions: pbSessions,
}, nil
}
// CreateSession 创建新会话
func (s *CustomerService) CreateSession(ctx context.Context, req *pb.CreateSessionRequest) (*pb.CreateSessionResponse, error) {
s.log.WithContext(ctx).Infof("CreateSession called with title: %s", req.Title)
// 使用会话管理器创建新会话
session, err := s.sessionManager.CreateSession(ctx, "default_user", req.Title) // 暂时使用默认用户ID
if err != nil {
return nil, fmt.Errorf("failed to create session: %w", err)
}
return &pb.CreateSessionResponse{
SessionId: session.ID,
Title: session.Title,
CreatedAt: session.CreatedAt.Format("2006-01-02 15:04:05"),
}, nil
}
// UploadKnowledge 上传知识库文档
func (s *CustomerService) UploadKnowledge(ctx context.Context, req *pb.UploadKnowledgeRequest) (*pb.UploadKnowledgeResponse, error) {
s.log.WithContext(ctx).Infof("UploadKnowledge called with filename: %s", req.FileName)
// 调用业务层处理文档上传和向量化
err := s.customerUseCase.ProcessKnowledgeUpload(ctx, string(req.Content), req.FileName, req.FileType)
if err != nil {
s.log.Errorf("Failed to process knowledge upload: %v", err)
return &pb.UploadKnowledgeResponse{
Success: false,
FileId: "",
}, err
}
// 生成文件ID
fileID := "file_" + req.FileName
return &pb.UploadKnowledgeResponse{
Success: true,
FileId: fileID,
}, nil
}
// ListKnowledge 获取知识库文档列表
func (s *CustomerService) ListKnowledge(ctx context.Context, req *pb.ListKnowledgeRequest) (*pb.ListKnowledgeResponse, error) {
s.log.WithContext(ctx).Info("ListKnowledge called")
documents, err := s.customerUseCase.ListKnowledge(ctx)
if err != nil {
return nil, err
}
// 转换业务层数据到 protobuf 格式
pbDocuments := make([]*pb.KnowledgeDocument, 0, len(documents))
for _, doc := range documents {
pbDocuments = append(pbDocuments, &pb.KnowledgeDocument{
Id: doc.ID,
FileName: doc.FileName,
FileType: doc.FileType,
ContentPreview: doc.ContentPreview,
UploadTime: doc.UploadTime.Format("2006-01-02 15:04:05"),
Status: doc.Status,
})
}
return &pb.ListKnowledgeResponse{
Documents: pbDocuments,
}, nil
}
// DeleteKnowledge 删除知识库文档
func (s *CustomerService) DeleteKnowledge(ctx context.Context, req *pb.DeleteKnowledgeRequest) (*pb.DeleteKnowledgeResponse, error) {
s.log.WithContext(ctx).Infof("DeleteKnowledge called with id: %s", req.Id)
// 简单实现,实际应该调用业务层处理文档删除
return &pb.DeleteKnowledgeResponse{
Success: true,
}, nil
}
// QueryOrder 查询订单信息
func (s *CustomerService) QueryOrder(ctx context.Context, req *pb.QueryOrderRequest) (*pb.QueryOrderResponse, error) {
s.log.WithContext(ctx).Infof("QueryOrder called with orderID: %s", req.OrderId)
orderDetails, err := s.customerUseCase.QueryOrder(ctx, req.OrderId)
if err != nil {
return nil, err
}
return &pb.QueryOrderResponse{
OrderId: orderDetails.OrderID,
Status: orderDetails.Status,
NeedAi: orderDetails.NeedAI,
Details: &pb.OrderDetails{
Product: orderDetails.Product,
Amount: orderDetails.Amount,
CreateTime: orderDetails.CreateTime.Format("2006-01-02 15:04:05"),
},
}, nil
}
// StreamChat 流式聊天处理 (gRPC流式接口)
func (s *CustomerService) StreamChat(req *pb.StreamChatRequest, stream pb.CustomerService_StreamChatServer) error {
s.log.Infof("StreamChat called with message: %s, session: %s", req.Message, req.SessionId)
ctx := stream.Context()
// 添加用户消息到会话
_, err := s.sessionManager.AddMessage(ctx, req.SessionId, "user", req.Message)
if err != nil {
s.log.Errorf("Failed to add user message: %v", err)
// 如果会话不存在,创建新会话
if req.SessionId == "" {
session, createErr := s.sessionManager.CreateSession(ctx, "default_user", "新对话")
if createErr != nil {
return fmt.Errorf("failed to create session: %w", createErr)
}
req.SessionId = session.ID
}
// 重试添加消息
_, err = s.sessionManager.AddMessage(ctx, req.SessionId, "user", req.Message)
if err != nil {
return fmt.Errorf("failed to add user message: %w", err)
}
}
// 使用工作流进行流式对话
var fullResponse string
var responseChan <-chan string
if s.chatWorkflow != nil {
responseChan, err = s.chatWorkflow.Stream(ctx, req.Message, req.SessionId)
} else {
err = fmt.Errorf("chat workflow not configured")
}
if err != nil {
s.log.Errorf("AI stream service error: %v", err)
return s.fallbackStreamResponse(req, stream)
}
for chunk := range responseChan {
fullResponse += chunk
response := &pb.StreamChatResponse{
SessionId: req.SessionId,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
Type: "chunk",
Content: chunk,
IsFinal: false,
}
if err := stream.Send(response); err != nil {
s.log.Errorf("Failed to send stream response: %v", err)
return err
}
}
// 发送最终响应
response := &pb.StreamChatResponse{
SessionId: req.SessionId,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
Type: "final",
Content: "",
IsFinal: true,
}
if err := stream.Send(response); err != nil {
s.log.Errorf("Failed to send final response: %v", err)
return err
}
// 添加完整的AI回复到会话
if fullResponse != "" {
_, err = s.sessionManager.AddMessage(ctx, req.SessionId, "assistant", fullResponse)
if err != nil {
s.log.Errorf("Failed to add AI message: %v", err)
}
}
return nil
}
// fallbackStreamResponse 降级流式响应
func (s *CustomerService) fallbackStreamResponse(req *pb.StreamChatRequest, stream pb.CustomerService_StreamChatServer) error {
responses := []string{
"正在思考您的问题",
"根据您的描述",
"我建议您可以尝试以下解决方案",
"如果问题仍然存在,请联系技术支持",
}
for i, content := range responses {
select {
case <-stream.Context().Done():
return stream.Context().Err()
default:
}
response := &pb.StreamChatResponse{
SessionId: req.SessionId,
Timestamp: time.Now().Format("2006-01-02 15:04:05"),
Type: "chunk",
Content: content,
IsFinal: i == len(responses)-1,
}
if err := stream.Send(response); err != nil {
s.log.Errorf("Failed to send stream response: %v", err)
return err
}
// 模拟处理延迟
time.Sleep(500 * time.Millisecond)
}
return nil
}
// HandleStreamChat HTTP SSE流式聊天处理器
func (s *CustomerService) HandleStreamChat(w http.ResponseWriter, r *http.Request) {
s.log.Info("HandleStreamChat called")
// 设置SSE响应头
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Cache-Control")
// 解析请求参数
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
Message string `json:"message"`
SessionID string `json:"session_id"`
Model string `json:"model,omitempty"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
// 获取flusher用于实时推送
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
ctx := r.Context()
// 添加用户消息到会话
_, err := s.sessionManager.AddMessage(ctx, req.SessionID, "user", req.Message)
if err != nil {
s.log.Errorf("Failed to add user message: %v", err)
// 如果会话不存在,创建新会话
if req.SessionID == "" {
session, createErr := s.sessionManager.CreateSession(ctx, "default_user", "新对话")
if createErr != nil {
http.Error(w, "Failed to create session", http.StatusInternalServerError)
return
}
req.SessionID = session.ID
}
// 重试添加消息
_, err = s.sessionManager.AddMessage(ctx, req.SessionID, "user", req.Message)
if err != nil {
http.Error(w, "Failed to add user message", http.StatusInternalServerError)
return
}
}
// 使用工作流进行流式对话
var fullResponse string
responseChan, err := s.chatWorkflow.Stream(ctx, req.Message, req.SessionID)
if err != nil {
s.log.Errorf("AI stream service error: %v", err)
// 降级到模拟响应
s.fallbackSSEResponse(w, flusher, req.SessionID, r.Context())
return
}
for chunk := range responseChan {
fullResponse += chunk
// 构造SSE响应数据
response := map[string]interface{}{
"session_id": req.SessionID,
"timestamp": time.Now().Format("2006-01-02 15:04:05"),
"type": "chunk",
"content": chunk,
"is_final": false,
}
jsonData, err := json.Marshal(response)
if err != nil {
s.log.Errorf("Failed to marshal response: %v", err)
continue
}
// 发送SSE数据
fmt.Fprintf(w, "data: %s\n\n", jsonData)
flusher.Flush()
}
// 发送结束标记
endResponse := map[string]interface{}{
"type": "end",
"session_id": req.SessionID,
"timestamp": time.Now().Format("2006-01-02 15:04:05"),
}
jsonData, err := json.Marshal(endResponse)
if err != nil {
s.log.Errorf("Failed to marshal end response: %v", err)
} else {
fmt.Fprintf(w, "data: %s\n\n", jsonData)
flusher.Flush()
}
// 添加完整的AI回复到会话
if fullResponse != "" {
_, err = s.sessionManager.AddMessage(ctx, req.SessionID, "assistant", fullResponse)
if err != nil {
s.log.Errorf("Failed to add AI message: %v", err)
}
}
}
// fallbackSSEResponse 降级SSE响应
func (s *CustomerService) fallbackSSEResponse(w http.ResponseWriter, flusher http.Flusher, sessionID string, ctx context.Context) {
responses := []string{
"正在分析您的问题...",
"根据您的描述,我理解您遇到的情况。",
"让我为您提供一些建议和解决方案:",
"1. 首先,请检查相关设置是否正确",
"2. 如果问题持续,建议重启相关服务",
"3. 必要时可以联系技术支持获得进一步帮助",
"希望这些建议对您有帮助!",
}
for i, content := range responses {
select {
case <-ctx.Done():
return
default:
}
// 构造SSE响应数据
response := map[string]interface{}{
"session_id": sessionID,
"timestamp": time.Now().Format("2006-01-02 15:04:05"),
"type": "chunk",
"content": content,
"is_final": i == len(responses)-1,
}
jsonData, err := json.Marshal(response)
if err != nil {
s.log.Errorf("Failed to marshal response: %v", err)
continue
}
// 发送SSE数据
fmt.Fprintf(w, "data: %s\n\n", jsonData)
flusher.Flush()
// 模拟处理延迟
time.Sleep(800 * time.Millisecond)
}
// 发送结束标记
fmt.Fprintf(w, "data: {\"type\":\"end\",\"session_id\":\"%s\"}\n\n", sessionID)
flusher.Flush()
}
// HandleWebSocketChat WebSocket 聊天处理器
func (s *CustomerService) HandleWebSocketChat(conn *websocket.Conn) {
defer conn.Close()
ctx := context.Background()
for {
mt, msg, err := conn.ReadMessage()
if err != nil {
return
}
if mt != websocket.TextMessage {
continue
}
var req struct {
Message string `json:"message"`
SessionID string `json:"session_id"`
}
if json.Unmarshal(msg, &req) != nil {
continue
}
ch, err := s.chatWorkflow.Stream(ctx, req.Message, req.SessionID)
if err != nil {
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"error"}`))
continue
}
for chunk := range ch {
resp := map[string]interface{}{"type": "chunk", "content": chunk, "session_id": req.SessionID}
b, _ := json.Marshal(resp)
_ = conn.WriteMessage(websocket.TextMessage, b)
}
_ = conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"end"}`))
}
}
// HandleOrderDiagnosis 订单诊断工作流入口
func (s *CustomerService) HandleOrderDiagnosis(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
OrderID string `json:"order_id"`
Message string `json:"message"`
SessionID string `json:"session_id"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
ctx := r.Context()
// 简化:直接调用工作流的 Chat可替换为订单专用流水线
resp, err := s.chatWorkflow.Chat(ctx, fmt.Sprintf("订单诊断: %s\n%s", req.OrderID, req.Message), req.SessionID)
if err != nil {
http.Error(w, "Workflow error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{"order_id": req.OrderID, "result": resp})
}