diff --git a/app/api/routes.py b/app/api/routes.py
index cc4a59d..43735b7 100644
--- a/app/api/routes.py
+++ b/app/api/routes.py
@@ -34,6 +34,7 @@ class QueryRequest(BaseModel):
     top_k: int = 5
     stream: bool = False
     think: bool = False
+    only_rag: bool = False  # Whether to return only RAG retrieval results, without falling back to the LLM

 class IngestResponse(BaseModel):
     filename: str
@@ -121,16 +122,41 @@ async def query_knowledge_base(
         enable_rerank=settings.RERANK_ENABLED
     )

-    # Handle streaming output (SSE protocol)
+    # Handle streaming output (SSE protocol, OpenAI-compatible format)
     if request.stream:
+        import time
         async def stream_generator():
-            # SSE formatting helper
-            def sse_pack(event: str, text: str) -> str:
-                # Wrap the data payload in JSON so newlines and special characters are escaped correctly
-                data = json.dumps({"text": text}, ensure_ascii=False)
-                return f"event: {event}\ndata: {data}\n\n"
+            chat_id = f"chatcmpl-{secrets.token_hex(12)}"
+            created_time = int(time.time())
+            model_name = settings.LLM_MODEL

-            yield sse_pack("thinking", "1. 上下文检索中...\n")
+            # Helper: build an OpenAI-compatible chunk
+            def openai_chunk(content=None, reasoning_content=None, finish_reason=None, extra_delta=None):
+                delta = {}
+                if content:
+                    delta["content"] = content
+                if reasoning_content:
+                    delta["reasoning_content"] = reasoning_content
+                if extra_delta:
+                    delta.update(extra_delta)
+
+                chunk = {
+                    "id": chat_id,
+                    "object": "chat.completion.chunk",
+                    "created": created_time,
+                    "model": model_name,
+                    "choices": [
+                        {
+                            "index": 0,
+                            "delta": delta,
+                            "finish_reason": finish_reason
+                        }
+                    ]
+                }
+                return f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
+
+            # 1. Emit retrieval status (as part of the reasoning stream)
+            yield openai_chunk(reasoning_content="1. 上下文检索中...\n")

             context_param = QueryParam(
                 mode=request.mode,
@@ -139,9 +165,8 @@ async def query_knowledge_base(
                 enable_rerank=settings.RERANK_ENABLED
             )

-            # Retrieve context (this step is slow; it includes graph traversal)
+            # Retrieve context
             context_resp = await rag.aquery(request.query, param=context_param)
-            logging.info(f"Context Response: {context_resp}")

             # Determine retrieval status
@@ -153,17 +178,28 @@ async def query_knowledge_base(
             think = request.think
             if has_context:
-                yield sse_pack("system", "retrieved")  # System event: information retrieved
-                yield sse_pack("thinking", f"2. 上下文已检索 (长度: {len(context_resp)} 字符).\n")
+                yield openai_chunk(
+                    reasoning_content=f"2. 上下文已检索 (长度: {len(context_resp)} 字符).\n",
+                    extra_delta={"x_rag_status": "hit"}
+                )
             else:
-                yield sse_pack("system", "missed")  # System event: nothing retrieved
-                yield sse_pack("thinking", "2. 未找到相关上下文,将依赖 LLM 自身知识\n")
+                yield openai_chunk(
+                    reasoning_content="2. 未找到相关上下文\n",
+                    extra_delta={"x_rag_status": "miss"}
+                )
+
+                # In RAG-only mode with no context found, end the stream immediately
+                if request.only_rag:
+                    yield openai_chunk(content="未找到相关知识库内容。", finish_reason="stop")
+                    yield "data: [DONE]\n\n"
+                    return
+
+                yield openai_chunk(reasoning_content=" (将依赖 LLM 自身知识)\n")
                 think = False

-            yield sse_pack("thinking", "3. 答案生成中...\n")
+            yield openai_chunk(reasoning_content="3. 答案生成中...\n")

             # 2. Generate the answer
-            # Manually build the system prompt
             sys_prompt = CUSTOM_RAG_RESPONSE_PROMPT.format(
                 context_data=context_resp,
                 response_type="Multiple Paragraphs",
@@ -179,23 +215,18 @@
                 hashing_kv=rag.llm_response_cache
             )

-            thinkState = 0  # think state: 0 = not started, 1 = started, 2 = finished
             async for chunk in stream_resp:
                 if isinstance(chunk, dict):
                     if chunk.get("type") == "thinking":
-                        if thinkState == 0:
-                            yield sse_pack("thinking", "\n思考:\n")
-                            thinkState = 1
-
-                        yield sse_pack("thinking", chunk["content"])
+                        yield openai_chunk(reasoning_content=chunk["content"])
                     elif chunk.get("type") == "content":
-                        if thinkState == 1:
-                            yield sse_pack("none", "\n\n\n")
-                            thinkState = 2
-
-                        yield sse_pack("answer", chunk["content"])
+                        yield openai_chunk(content=chunk["content"])
                     elif chunk:
-                        yield sse_pack("answer", chunk)
+                        yield openai_chunk(content=chunk)
+
+            # Send the end-of-stream marker
+            yield openai_chunk(finish_reason="stop")
+            yield "data: [DONE]\n\n"

         # Use the text/event-stream Content-Type
         return StreamingResponse(stream_generator(), media_type="text/event-stream")
diff --git a/app/static/admin.html b/app/static/admin.html
index 9607935..fb17267 100644
--- a/app/static/admin.html
+++ b/app/static/admin.html
@@ -103,8 +103,9 @@
-
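For reference, a minimal client-side sketch of how the OpenAI-compatible stream produced above might be consumed. The URL, port, and route path are assumptions (the actual route decorator is not shown in this diff); the request fields mirror the QueryRequest model, and the parsing follows the chunk format built by openai_chunk, including the nonstandard x_rag_status delta field, which a stock OpenAI SDK would simply ignore.

# Hypothetical consumer sketch; endpoint URL is assumed, adjust to the real route.
import json
import requests

resp = requests.post(
    "http://localhost:8000/api/query",  # assumed path for query_knowledge_base
    json={"query": "What is a knowledge graph?", "stream": True, "only_rag": False},
    stream=True,
)

for raw in resp.iter_lines(decode_unicode=True):
    # SSE frames arrive as "data: {...}" lines separated by blank lines
    if not raw or not raw.startswith("data: "):
        continue
    payload = raw[len("data: "):]
    if payload == "[DONE]":
        break  # end-of-stream marker emitted by the endpoint
    delta = json.loads(payload)["choices"][0]["delta"]
    if "x_rag_status" in delta:
        print(f"[rag status: {delta['x_rag_status']}]")
    if delta.get("reasoning_content"):
        print(delta["reasoning_content"], end="")  # retrieval status / thinking tokens
    if delta.get("content"):
        print(delta["content"], end="")  # answer tokens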