Перейти к содержимому

Примеры кода

25+ готовых примеров MCP серверов — копируйте и адаптируйте под свои задачи

Коллекция готовых примеров для копирования и адаптации. Все примеры протестированы и работают с MCP Inspector.

calculator.py
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("calculator")
@mcp.tool()
def add(a: float, b: float) -> float:
"""Сложение двух чисел"""
return a + b
@mcp.tool()
def subtract(a: float, b: float) -> float:
"""Вычитание: a - b"""
return a - b
@mcp.tool()
def multiply(a: float, b: float) -> float:
"""Умножение двух чисел"""
return a * b
@mcp.tool()
def divide(a: float, b: float) -> float:
"""Деление: a / b"""
if b == 0:
raise ValueError("Деление на ноль невозможно")
return a / b
@mcp.tool()
def power(base: float, exponent: float) -> float:
"""Возведение в степень: base^exponent"""
return base ** exponent
if __name__ == "__main__":
mcp.run(transport="stdio")
file_manager.py
from mcp.server.fastmcp import FastMCP
from pathlib import Path
mcp = FastMCP("file-manager")
# Укажите разрешённую директорию
ALLOWED_DIR = Path("/tmp/mcp-files")
ALLOWED_DIR.mkdir(exist_ok=True)
def safe_path(filename: str) -> Path:
"""Проверка что путь внутри разрешённой директории"""
path = (ALLOWED_DIR / filename).resolve()
if not str(path).startswith(str(ALLOWED_DIR)):
raise ValueError("Доступ запрещён")
return path
@mcp.tool()
def read_file(filename: str) -> str:
"""Прочитать содержимое файла"""
path = safe_path(filename)
if not path.exists():
raise FileNotFoundError(f"Файл не найден: {filename}")
return path.read_text()
@mcp.tool()
def write_file(filename: str, content: str) -> str:
"""Записать содержимое в файл"""
path = safe_path(filename)
path.write_text(content)
return f"Записано {len(content)} символов в {filename}"
@mcp.tool()
def list_files() -> list[str]:
"""Список файлов в директории"""
return [f.name for f in ALLOWED_DIR.iterdir() if f.is_file()]
@mcp.tool()
def delete_file(filename: str) -> str:
"""Удалить файл"""
path = safe_path(filename)
if not path.exists():
raise FileNotFoundError(f"Файл не найден: {filename}")
path.unlink()
return f"Файл {filename} удалён"
if __name__ == "__main__":
mcp.run(transport="stdio")
rest_client.py
from mcp.server.fastmcp import FastMCP
import httpx
mcp = FastMCP("rest-client")
@mcp.tool()
async def http_get(url: str, headers: dict | None = None) -> dict:
"""Выполнить GET запрос к API"""
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(url, headers=headers or {})
return {
"status": response.status_code,
"headers": dict(response.headers),
"body": response.text[:5000] # Ограничение размера
}
@mcp.tool()
async def http_post(url: str, data: dict, headers: dict | None = None) -> dict:
"""Выполнить POST запрос к API"""
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(url, json=data, headers=headers or {})
return {
"status": response.status_code,
"body": response.text[:5000]
}
@mcp.tool()
async def fetch_json(url: str) -> dict:
"""Получить JSON с URL"""
async with httpx.AsyncClient(timeout=30) as client:
response = await client.get(url)
response.raise_for_status()
return response.json()
if __name__ == "__main__":
mcp.run(transport="stdio")
weather.py
from mcp.server.fastmcp import FastMCP
import httpx
mcp = FastMCP("weather")
NWS_API = "https://api.weather.gov"
HEADERS = {"User-Agent": "mcp-weather/1.0"}
@mcp.tool()
async def get_weather(latitude: float, longitude: float) -> dict:
"""Получить погоду по координатам (только США)"""
async with httpx.AsyncClient() as client:
# Получаем grid point
points = await client.get(
f"{NWS_API}/points/{latitude},{longitude}",
headers=HEADERS
)
points_data = points.json()
# Получаем прогноз
forecast_url = points_data["properties"]["forecast"]
forecast = await client.get(forecast_url, headers=HEADERS)
periods = forecast.json()["properties"]["periods"][:3]
return {
"location": points_data["properties"]["relativeLocation"]["properties"],
"forecast": [
{
"name": p["name"],
"temperature": f"{p['temperature']}°{p['temperatureUnit']}",
"description": p["shortForecast"]
}
for p in periods
]
}
@mcp.tool()
async def get_alerts(state: str) -> list[dict]:
"""Получить погодные предупреждения по штату США"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{NWS_API}/alerts/active?area={state.upper()}",
headers=HEADERS
)
features = response.json().get("features", [])
return [
{
"event": f["properties"]["event"],
"headline": f["properties"]["headline"],
"severity": f["properties"]["severity"]
}
for f in features[:5]
]
if __name__ == "__main__":
mcp.run(transport="stdio")
sqlite_server.py
from mcp.server.fastmcp import FastMCP
import sqlite3
from contextlib import contextmanager
mcp = FastMCP("sqlite")
DB_PATH = "data.db"
@contextmanager
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
@mcp.tool()
def init_database() -> str:
"""Инициализировать БД с примером таблицы"""
with get_db() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
email TEXT UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
return "База данных инициализирована"
@mcp.tool()
def query(sql: str) -> list[dict]:
"""Выполнить SELECT запрос (только чтение!)"""
sql_lower = sql.lower().strip()
if not sql_lower.startswith("select"):
raise ValueError("Разрешены только SELECT запросы")
with get_db() as conn:
cursor = conn.execute(sql)
return [dict(row) for row in cursor.fetchall()]
@mcp.tool()
def insert_user(name: str, email: str) -> dict:
"""Добавить пользователя"""
with get_db() as conn:
cursor = conn.execute(
"INSERT INTO users (name, email) VALUES (?, ?)",
(name, email)
)
conn.commit()
return {"id": cursor.lastrowid, "name": name, "email": email}
@mcp.tool()
def list_users() -> list[dict]:
"""Список всех пользователей"""
return query("SELECT * FROM users ORDER BY created_at DESC")
@mcp.resource("db://schema")
def get_schema() -> str:
"""Схема базы данных"""
with get_db() as conn:
cursor = conn.execute(
"SELECT sql FROM sqlite_master WHERE type='table'"
)
return "\n\n".join(row[0] for row in cursor.fetchall() if row[0])
if __name__ == "__main__":
mcp.run(transport="stdio")
postgres_server.py
from mcp.server.fastmcp import FastMCP
import asyncpg
import os
mcp = FastMCP("postgres")
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:pass@localhost/db")
async def get_pool():
return await asyncpg.create_pool(DATABASE_URL, min_size=1, max_size=5)
@mcp.tool()
async def query(sql: str, params: list | None = None) -> list[dict]:
"""Выполнить SQL запрос"""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *(params or []))
return [dict(row) for row in rows]
@mcp.tool()
async def execute(sql: str, params: list | None = None) -> str:
"""Выполнить SQL команду (INSERT/UPDATE/DELETE)"""
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.execute(sql, *(params or []))
return result
@mcp.tool()
async def list_tables() -> list[str]:
"""Список таблиц в БД"""
result = await query("""
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public'
""")
return [row["table_name"] for row in result]
@mcp.tool()
async def describe_table(table_name: str) -> list[dict]:
"""Описание структуры таблицы"""
return await query("""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = $1
ORDER BY ordinal_position
""", [table_name])
if __name__ == "__main__":
mcp.run(transport="stdio")
resources_example.py
from mcp.server.fastmcp import FastMCP
from datetime import datetime
import json
mcp = FastMCP("resources-demo")
# Статический ресурс
@mcp.resource("config://app")
def get_config() -> str:
"""Конфигурация приложения"""
return json.dumps({
"version": "1.0.0",
"environment": "development",
"features": ["tools", "resources", "prompts"]
}, indent=2)
# Динамический ресурс с параметром
@mcp.resource("user://{user_id}")
def get_user(user_id: str) -> str:
"""Информация о пользователе"""
users = {
"1": {"name": "Алиса", "role": "admin"},
"2": {"name": "Борис", "role": "user"},
}
user = users.get(user_id, {"error": "Пользователь не найден"})
return json.dumps(user, ensure_ascii=False)
# Ресурс с текущим временем
@mcp.resource("system://time")
def get_time() -> str:
"""Текущее время сервера"""
return datetime.now().isoformat()
# Ресурс со списком
@mcp.resource("data://products")
def get_products() -> str:
"""Каталог продуктов"""
products = [
{"id": 1, "name": "Ноутбук", "price": 50000},
{"id": 2, "name": "Телефон", "price": 30000},
{"id": 3, "name": "Планшет", "price": 25000},
]
return json.dumps(products, ensure_ascii=False, indent=2)
if __name__ == "__main__":
mcp.run(transport="stdio")
prompts_example.py
from mcp.server.fastmcp import FastMCP
from mcp.types import PromptMessage, TextContent
mcp = FastMCP("prompts-demo")
@mcp.prompt()
def code_review(code: str, language: str = "python") -> list[PromptMessage]:
"""Промпт для code review"""
return [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""Проведи code review следующего кода на {language}.
Обрати внимание на:
1. Читаемость и стиль кода
2. Потенциальные баги
3. Производительность
4. Безопасность
5. Предложения по улучшению
Код:
```{language}
{code}
```"""
)
)
]
@mcp.prompt()
def explain_error(error_message: str, context: str = "") -> list[PromptMessage]:
"""Промпт для объяснения ошибки"""
return [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""Объясни эту ошибку простым языком и предложи решение.
Ошибка:
{error_message}
{"Контекст: " + context if context else ""}
Пожалуйста:
1. Объясни что означает эта ошибка
2. Укажи возможные причины
3. Предложи способы исправления"""
)
)
]
@mcp.prompt()
def generate_tests(function_code: str, framework: str = "pytest") -> list[PromptMessage]:
"""Промпт для генерации тестов"""
return [
PromptMessage(
role="user",
content=TextContent(
type="text",
text=f"""Напиши тесты для этой функции используя {framework}.
Функция:
```python
{function_code}

Требования:

  1. Тесты на основные сценарии
  2. Тесты на граничные случаи
  3. Тесты на ошибки
  4. Понятные названия тестов""" ) ) ]

if name == “main”: mcp.run(transport=“stdio”)

## Продвинутые примеры
### Контекст и логирование
```python title="context_logging.py"
from mcp.server.fastmcp import FastMCP, Context
from datetime import datetime
mcp = FastMCP("context-demo")
@mcp.tool()
async def long_task(steps: int, ctx: Context) -> str:
"""Задача с отчётом о прогрессе"""
for i in range(steps):
# Отчёт о прогрессе
await ctx.report_progress(i + 1, steps)
# Логирование
await ctx.info(f"Выполнен шаг {i + 1} из {steps}")
# Имитация работы
import asyncio
await asyncio.sleep(0.5)
return f"Выполнено {steps} шагов"
@mcp.tool()
async def get_request_info(ctx: Context) -> dict:
"""Информация о текущем запросе"""
return {
"request_id": ctx.request_id,
"client_id": getattr(ctx, 'client_id', 'unknown'),
"timestamp": datetime.now().isoformat()
}
if __name__ == "__main__":
mcp.run(transport="stdio")
lifespan_example.py
from mcp.server.fastmcp import FastMCP
from contextlib import asynccontextmanager
from dataclasses import dataclass
import asyncpg
@dataclass
class AppContext:
db_pool: asyncpg.Pool
@asynccontextmanager
async def app_lifespan(server: FastMCP):
"""Управление жизненным циклом приложения"""
# Startup
pool = await asyncpg.create_pool("postgresql://localhost/mydb")
try:
yield AppContext(db_pool=pool)
finally:
# Shutdown
await pool.close()
mcp = FastMCP("lifespan-demo", lifespan=app_lifespan)
@mcp.tool()
async def db_query(sql: str, ctx: Context) -> list[dict]:
"""Запрос к БД через пул соединений"""
app: AppContext = ctx.request_context.lifespan_context
async with app.db_pool.acquire() as conn:
rows = await conn.fetch(sql)
return [dict(row) for row in rows]
if __name__ == "__main__":
mcp.run(transport="stdio")
error_handling.py
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.exceptions import ToolError
mcp = FastMCP("error-demo")
class ValidationError(ToolError):
"""Ошибка валидации"""
pass
class NotFoundError(ToolError):
"""Ресурс не найден"""
pass
@mcp.tool()
def validate_email(email: str) -> dict:
"""Валидация email"""
if not email:
raise ValidationError("Email не может быть пустым")
if "@" not in email:
raise ValidationError("Email должен содержать @")
parts = email.split("@")
if len(parts) != 2 or not parts[1]:
raise ValidationError("Неверный формат email")
return {"email": email, "valid": True}
@mcp.tool()
def get_user(user_id: int) -> dict:
"""Получить пользователя (с обработкой ошибок)"""
users = {1: "Алиса", 2: "Борис"}
if user_id not in users:
raise NotFoundError(f"Пользователь {user_id} не найден")
return {"id": user_id, "name": users[user_id]}
@mcp.tool()
def divide_safe(a: float, b: float) -> float:
"""Безопасное деление"""
try:
return a / b
except ZeroDivisionError:
raise ToolError("Деление на ноль невозможно")
if __name__ == "__main__":
mcp.run(transport="stdio")
РесурсОписание
Python SDKОфициальный Python SDK
TypeScript SDKОфициальный TypeScript SDK
FastMCP DocsДокументация FastMCP
MCP ExamplesОфициальные примеры серверов
awesome-mcp-serversКоллекция MCP серверов