A simple MCP server that enables meeting room booking through an AI assistant, supporting room availability checks and booking operations with React Agent pattern for tool calling.
该项目实现了一个入门的MCP Client和MCP Server交互的功能。 MCP Client中使用了React Agent模式来实现工具的调用。 MCP Server实现了两种启动方式即(stdio和sse),该例子中使用了sse的方式。
使用ReAct模式创建一个Agent,也可以使用langchain等框架实现,我这里没有使用AI框架。 在项目目录下创建‘react_agent.py’文件
import asyncio import json import os import datetime from openai import AsyncOpenAI from mcp_client import MCPClient from pprint import pprint class ReActAgent: def __init__(self, mcp_client: MCPClient): # 从环境变量获取API密钥,如果不存在则使用默认值 # 修改base_url来使用不同的大模型供应商 # React模式一次使用一个工具 api_key = os.environ.get("API_KEY", "your llm api key") self.llm = AsyncOpenAI( api_key=api_key, base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) self.mcp_client = mcp_client self.messages = [] self.system_prompt = """ 你是一个专业会议室预定AI助手,必须严格遵循以下规则运行: 【核心机制】 采用Thought→Action→Observation循环工作流: 1. 解析用户需求时,必须明确识别以下要素: - 精确到分钟的时间段(自动补全日期格式) - 预定目标(查询/预定) 2. 工具调用规范: 只能使用下列工具且每次只能调用1个: - list_idle_meeting_rooms(必填参数:start_time, end_time) - book_meeting_room(必填参数:room_id, start_time, end_time) 参数要求: • 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式 • room_id必须从list_idle_meeting_rooms返回结果中选取 3. 执行流程强制要求: (1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性 【输出规则】 1. 未完成时输出json格式: { "thought": "推理过程(必须包含时间转换逻辑)", "action": {"name": "工具名", "args": {"参数1":"值1"}}, "observation": "工具返回原始结果" } 2. 未完成标准: - 未调用工具 - 未返回最终答案 - 明确提示预定失败 3. 完成标准:当且仅当满足以下条件时输出最终答案: - book_meeting_room返回预定成功提示 - 包含有效的会议室ID 4. 最终答案json格式: { "final_answer": "预定成功:{room_id} ({start_time}至{end_time})" } 【校验机制】 1. 时间参数三重验证: (1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$ (2) 时间逻辑:start_time list[types.TextContent]: return ["会议室1", "会议室2", "会议室3"] async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> list[types.TextContent]: # 随机返回预定成功或失败 print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}") if room_id in ["会议室1", "会议室2", "会议室3"]: return f"{room_id}预定成功" if random.random() list[types.Tool]: return [ types.Tool( name="list_idle_meeting_rooms", description="list idle meeting rooms", inputSchema={ "type": "object", "properties": { "start_time": { "type": "string", "description": "开始时间" }, "end_time": { "type": "string", "description": "结束时间" } }, "required": ["start_time", "end_time"] } ), types.Tool( name="book_meeting_room", description="book meeting room", inputSchema={ "type": "object", "properties": { "room_id": { "type": "string", "description": "会议室ID" }, "start_time": { "type": "string", "description": "开始时间" }, "end_time": { "type": "string", "description": "结束时间" } }, "required": ["room_id", "start_time", "end_time"] } ) ] @app.call_tool() async def call_tool( name: str, arguments: dict ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: if name == "list_idle_meeting_rooms": start_time = arguments["start_time"] end_time = arguments["end_time"] result = await list_idle_meeting_rooms(start_time, end_time) return [types.TextContent(type="text", text=str(result))] elif name == "book_meeting_room": print(arguments) room_id = arguments["room_id"] start_time = arguments["start_time"] end_time = arguments["end_time"] result = await book_meeting_room(room_id, start_time, end_time) return [types.TextContent(type="text", text=str(result))] raise ValueError(f"Tool not found: {name}") if __name__ == "__main__": # Run the FastAPI app with uvicorn import sys # 默认使用标准输入输出传输 transport = "stdio" port = 8000 # 检查命令行参数 if len(sys.argv) > 1: if sys.argv[1] == "sse": transport = "sse" if len(sys.argv) > 2: try: port = int(sys.argv[2]) except ValueError: pass if transport == "sse": # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("weather-mcp") # 配置 SSE 传输,增加超时和重试参数 sse = SseServerTransport( "/message/", ) async def handle_sse(request): try: logger.info(f"建立新的SSE连接: {request.client}") async with sse.connect_sse(request.scope, request.receive, request._send) as streams: try: await app.run(streams[0], streams[1], app.create_initialization_options()) except Exception as e: logger.error(f"处理SSE连接时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) except Exception as e: logger.error(f"建立SSE连接时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) return starlette_app = Starlette( debug=True, routes=[ Route("/sse", endpoint=handle_sse), Mount("/message/", app=sse.handle_post_message), ] ) print(f"在端口{port}上启动MCP服务器,使用SSE传输") print(f"连接URL: http://127.0.0.1:{port}/sse") uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info") else: from mcp.server.stdio import stdio_server import anyio async def run_stdio(): async with stdio_server() as streams: await app.run(streams[0], streams[1], app.create_initialization_options()) print("使用标准输入输出传输启动MCP服务器") anyio.run(run_stdio)
在项目目录创建‘mcp_client.py’文件
from typing import Optional from contextlib import AsyncExitStack from mcp.client.sse import sse_client from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() async def connect_to_server(self, server_script_path: str, server_interpreter_path: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ is_python = server_script_path.endswith(".py") is_js = server_script_path.endswith(".js") if not (is_python or is_js): raise ValueError("Server script must be a .py or .js file") command = "python" if is_python else "node" command = server_interpreter_path server_params = StdioServerParameters( command=command, args=[server_script_path], env=None ) stdio_transport = await self.exit_stack.enter_async_context( stdio_client(server_params) ) self.stdio, self.write = stdio_transport self.session = await self.exit_stack.enter_async_context( ClientSession(self.stdio, self.write) ) await self.session.initialize() # 列出可用的工具 response = await self.session.list_tools() tools = response.tools print(" Connected to server with tools:", [tool.name for tool in tools]) async def connect_to_sse_server(self, server_url: str): """Connect to an MCP server Args: server_script_path: Path to the server script (.py or .js) """ self._streams_context = sse_client(url=server_url) streams = await self._streams_context.__aenter__() self._session_context = ClientSession(*streams) self.session = await self._session_context.__aenter__() await self.session.initialize() # 列出可用的工具 response = await self.session.list_tools() tools = response.tools print(" Connected to server with tools:", [tool.name for tool in tools])
使用命令启动agent,并从控制台观察输出。
python react_agent.py
控制台输出
第1次循环
调用工具list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果['会议室1', '会议室2', '会议室3']
当前步骤1的reAct详情如下:
thought: 用户想预定明天上午9点到10点半的会议室。首先需要将时间转换为正确的格式:start_time='2025-04-10 09:00:00',end_time='2025-04-10 10:30:00'。然后调用list_idle_meeting_rooms 来查询这个时间段内空闲的会议室。
action: list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'}
observation: ['会议室1', '会议室2', '会议室3']
第2次循环
调用工具book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果会议室1预定成功
当前步骤2的reAct详情如下:
thought: 查询结果显示有三个会议室可用:['会议室1', '会议室2', '会议室3']。用户要求随意选择一个,我将选择'会议室1'进行预定。现在需要调用book_meeting_room工具来完成预定操作。
action: book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'}
observation: 会议室1预定成功
第3次循环
最终结果: 预定成功:会议室1 (2025-04-10 09:00:00至2025-04-10 10:30:00)
Discover shared experiences
Shared threads will appear here, showcasing real-world applications and insights from the community. Check back soon for updates!