简介
该项目实现了一个入门的MCP Client和MCP Server交互的功能。
MCP Client中使用了React Agent模式来实现工具的调用。
MCP Server实现了两种启动方式即(stdio和sse),该例子中使用了sse的方式。
创建Agent
使用ReAct模式创建一个Agent,也可以使用langchain等框架实现,我这里没有使用AI框架。
在项目目录下创建‘react_agent.py’文件
import asyncio
import json
import os
import datetime
from openai import AsyncOpenAI
from mcp_client import MCPClient
from pprint import pprint
class ReActAgent:
def __init__(self, mcp_client: MCPClient):
# 从环境变量获取API密钥,如果不存在则使用默认值
# 修改base_url来使用不同的大模型供应商
# React模式一次使用一个工具
api_key = os.environ.get("API_KEY", "your llm api key")
self.llm = AsyncOpenAI(
api_key=api_key,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
self.mcp_client = mcp_client
self.messages = []
self.system_prompt = """
你是一个专业会议室预定AI助手,必须严格遵循以下规则运行:
【核心机制】
采用Thought→Action→Observation循环工作流:
1. 解析用户需求时,必须明确识别以下要素:
- 精确到分钟的时间段(自动补全日期格式)
- 预定目标(查询/预定)
2. 工具调用规范:
只能使用下列工具且每次只能调用1个:
- list_idle_meeting_rooms(必填参数:start_time, end_time)
- book_meeting_room(必填参数:room_id, start_time, end_time)
参数要求:
• 时间参数必须转为"YYYY-MM-DD HH:mm:ss"格式
• room_id必须从list_idle_meeting_rooms返回结果中选取
3. 执行流程强制要求:
(1) 预定操作前必须调用list_idle_meeting_rooms验证时段可用性
【输出规则】
1. 未完成时输出json格式:
{
"thought": "推理过程(必须包含时间转换逻辑)",
"action": {"name": "工具名", "args": {"参数1":"值1"}},
"observation": "工具返回原始结果"
}
2. 未完成标准:
- 未调用工具
- 未返回最终答案
- 明确提示预定失败
3. 完成标准:当且仅当满足以下条件时输出最终答案:
- book_meeting_room返回预定成功提示
- 包含有效的会议室ID
4. 最终答案json格式:
{
"final_answer": "预定成功:{room_id} ({start_time}至{end_time})"
}
【校验机制】
1. 时间参数三重验证:
(1) 格式正则校验:^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$
(2) 时间逻辑:start_time list[types.TextContent]:
return ["会议室1", "会议室2", "会议室3"]
async def book_meeting_room(room_id: str, start_time: str, end_time: str) -> list[types.TextContent]:
# 随机返回预定成功或失败
print(f"预定会议室{room_id},开始时间{start_time},结束时间{end_time}")
if room_id in ["会议室1", "会议室2", "会议室3"]:
return f"{room_id}预定成功" if random.random() list[types.Tool]:
return [
types.Tool(
name="list_idle_meeting_rooms",
description="list idle meeting rooms",
inputSchema={
"type": "object",
"properties": {
"start_time": {
"type": "string",
"description": "开始时间"
},
"end_time": {
"type": "string",
"description": "结束时间"
}
},
"required": ["start_time", "end_time"]
}
),
types.Tool(
name="book_meeting_room",
description="book meeting room",
inputSchema={
"type": "object",
"properties": {
"room_id": {
"type": "string",
"description": "会议室ID"
},
"start_time": {
"type": "string",
"description": "开始时间"
},
"end_time": {
"type": "string",
"description": "结束时间"
}
},
"required": ["room_id", "start_time", "end_time"]
}
)
]
@app.call_tool()
async def call_tool(
name: str,
arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
if name == "list_idle_meeting_rooms":
start_time = arguments["start_time"]
end_time = arguments["end_time"]
result = await list_idle_meeting_rooms(start_time, end_time)
return [types.TextContent(type="text", text=str(result))]
elif name == "book_meeting_room":
print(arguments)
room_id = arguments["room_id"]
start_time = arguments["start_time"]
end_time = arguments["end_time"]
result = await book_meeting_room(room_id, start_time, end_time)
return [types.TextContent(type="text", text=str(result))]
raise ValueError(f"Tool not found: {name}")
if __name__ == "__main__":
# Run the FastAPI app with uvicorn
import sys
# 默认使用标准输入输出传输
transport = "stdio"
port = 8000
# 检查命令行参数
if len(sys.argv) > 1:
if sys.argv[1] == "sse":
transport = "sse"
if len(sys.argv) > 2:
try:
port = int(sys.argv[2])
except ValueError:
pass
if transport == "sse":
# 配置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("weather-mcp")
# 配置 SSE 传输,增加超时和重试参数
sse = SseServerTransport(
"/message/",
)
async def handle_sse(request):
try:
logger.info(f"建立新的SSE连接: {request.client}")
async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
try:
await app.run(streams[0], streams[1], app.create_initialization_options())
except Exception as e:
logger.error(f"处理SSE连接时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
except Exception as e:
logger.error(f"建立SSE连接时出错: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return
starlette_app = Starlette(
debug=True,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/message/", app=sse.handle_post_message),
]
)
print(f"在端口{port}上启动MCP服务器,使用SSE传输")
print(f"连接URL: http://127.0.0.1:{port}/sse")
uvicorn.run(starlette_app, host="0.0.0.0", port=port, log_level="info")
else:
from mcp.server.stdio import stdio_server
import anyio
async def run_stdio():
async with stdio_server() as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
print("使用标准输入输出传输启动MCP服务器")
anyio.run(run_stdio)
创建一个MCP Server管理类
在项目目录创建‘mcp_client.py’文件
from typing import Optional
from contextlib import AsyncExitStack
from mcp.client.sse import sse_client
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
async def connect_to_server(self, server_script_path: str, server_interpreter_path: str):
"""Connect to an MCP server
Args:
server_script_path: Path to the server script (.py or .js)
"""
is_python = server_script_path.endswith(".py")
is_js = server_script_path.endswith(".js")
if not (is_python or is_js):
raise ValueError("Server script must be a .py or .js file")
command = "python" if is_python else "node"
command = server_interpreter_path
server_params = StdioServerParameters(
command=command, args=[server_script_path], env=None
)
stdio_transport = await self.exit_stack.enter_async_context(
stdio_client(server_params)
)
self.stdio, self.write = stdio_transport
self.session = await self.exit_stack.enter_async_context(
ClientSession(self.stdio, self.write)
)
await self.session.initialize()
# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools
print("
Connected to server with tools:", [tool.name for tool in tools])
async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server
Args:
server_script_path: Path to the server script (.py or .js)
"""
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.__aenter__()
self._session_context = ClientSession(*streams)
self.session = await self._session_context.__aenter__()
await self.session.initialize()
# 列出可用的工具
response = await self.session.list_tools()
tools = response.tools
print("
Connected to server with tools:", [tool.name for tool in tools])
启动MCP Client
使用命令启动agent,并从控制台观察输出。
python react_agent.py
控制台输出
第1次循环
调用工具list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果['会议室1', '会议室2', '会议室3']
当前步骤1的reAct详情如下:
thought: 用户想预定明天上午9点到10点半的会议室。首先需要将时间转换为正确的格式:start_time='2025-04-10 09:00:00',end_time='2025-04-10 10:30:00'。然后调用list_idle_meeting_rooms 来查询这个时间段内空闲的会议室。
action: list_idle_meeting_rooms,参数{'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'}
observation: ['会议室1', '会议室2', '会议室3']
第2次循环
调用工具book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'},返回结果会议室1预定成功
当前步骤2的reAct详情如下:
thought: 查询结果显示有三个会议室可用:['会议室1', '会议室2', '会议室3']。用户要求随意选择一个,我将选择'会议室1'进行预定。现在需要调用book_meeting_room工具来完成预定操作。
action: book_meeting_room,参数{'room_id': '会议室1', 'start_time': '2025-04-10 09:00:00', 'end_time': '2025-04-10 10:30:00'}
observation: 会议室1预定成功
第3次循环
最终结果: 预定成功:会议室1 (2025-04-10 09:00:00至2025-04-10 10:30:00)