#!/usr/bin/env python3 """ LangGraph 工作流测试 """ import asyncio import sys import os # 添加项目根目录到 Python 路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from src.workflows.chat_workflow import get_chat_workflow, create_chat_session from src.models.chat_models import ChatMessage, MessageRole from datetime import datetime async def test_workflow_simple(): """测试工作流简单模式""" print("🧪 测试 LangGraph 工作流 (简单模式)") print("=" * 50) try: # 获取工作流 workflow = get_chat_workflow() # 测试用例 test_cases = [ "你好,我是新用户", "查询订单 ORD12345", "我的订单 ORD12346 有什么问题吗?", "今天天气怎么样?", "谢谢你的帮助" ] for i, user_input in enumerate(test_cases, 1): print(f"\n📝 测试 {i}: {user_input}") # 使用简单模式 result = await workflow.run_simple(user_input) print(f"🤖 AI: {result['response']}") print(f"✅ 会话ID: {result['session_id'][:8]}...") print(f"📊 处理步骤: {len(result['results'])} 个") print("\n🎉 工作流简单模式测试完成!") except Exception as e: print(f"❌ 测试失败: {e}") raise async def test_workflow_streaming(): """测试工作流流式模式""" print("\n🧪 测试 LangGraph 工作流 (流式模式)") print("=" * 50) try: # 获取工作流 workflow = get_chat_workflow() session_id = await create_chat_session() # 测试订单查询 print(f"\n📝 测试订单查询 (会话: {session_id[:8]}...)") print("用户: 帮我查询订单 ORD12345 的状态") print("🤖 AI: ", end="", flush=True) async for result in workflow.run_streaming("帮我查询订单 ORD12345 的状态", session_id): result_type = result.get("type", "") content = result.get("content", "") if result_type == "workflow_start": print(f"\n🔄 {content}") elif result_type == "intent_analysis": print(f"\n🎯 意图分析完成") elif result_type == "route_decision": print(f"\n🚦 {content}") elif result_type == "order_info": print(f"\n📦 订单信息已获取") elif result_type == "thinking_start": print(f"\n🤔 {content}") elif result_type == "thinking": print(content, end="", flush=True) elif result_type == "diagnosis_result": print(f"\n🔍 诊断完成") elif result_type == "final_response": print(f"\n💬 最终回复: {content[:100]}...") elif result_type == "error": print(f"\n❌ {content}") print("\n✅ 订单查询测试完成") # 测试自然对话 print(f"\n📝 测试自然对话") print("用户: 今天天气真不错") print("🤖 AI: ", end="", flush=True) async for result in workflow.run_streaming("今天天气真不错", session_id): result_type = result.get("type", "") content = result.get("content", "") if result_type == "chat_chunk": print(content, end="", flush=True) elif result_type == "chat_complete": if not content: # 如果没有通过 chunk 获取到内容 print(content, end="", flush=True) print("\n✅ 自然对话测试完成") print("\n🎉 工作流流式模式测试完成!") except Exception as e: print(f"❌ 测试失败: {e}") raise async def test_intent_analysis(): """测试意图分析""" print("\n🧪 测试意图分析") print("=" * 50) try: from src.workflows.nodes.intent_analysis import intent_analysis_node from src.models.chat_models import WorkflowState # 测试用例 test_cases = [ ("查询订单 ORD12345", "订单查询"), ("我的订单什么时候到?", "订单查询"), ("今天天气怎么样?", "一般对话"), ("你好,很高兴认识你", "一般对话"), ("订单 ORD12346 有问题", "订单查询") ] for user_input, expected_type in test_cases: print(f"\n📝 测试: {user_input}") # 创建工作流状态 state = WorkflowState( session_id="test_intent", user_input=user_input, chat_history=[], timestamp=datetime.now(), current_node="intent_analysis" ) # 运行意图分析 result = await intent_analysis_node(state) intent_result = result.get("intent_result", {}) intent_type = intent_result.get("intent_type", "unknown") confidence = intent_result.get("confidence", 0.0) print(f"🎯 预期: {expected_type}") print(f"🎯 结果: {intent_type} (置信度: {confidence:.2f})") print("\n🎉 意图分析测试完成!") except Exception as e: print(f"❌ 测试失败: {e}") raise async def main(): """主测试函数""" print("🚀 开始测试 LangGraph 工作流") try: await test_workflow_simple() await test_workflow_streaming() await test_intent_analysis() print("\n🎊 所有工作流测试通过!") except Exception as e: print(f"\n💥 测试过程中出现错误: {e}") sys.exit(1) if __name__ == "__main__": asyncio.run(main())