Перейти к содержимому

Production Checklist

14 пунктов для подготовки MCP сервера к production — безопасность, производительность, мониторинг

Проверьте каждый пункт перед выкладкой в production.

from pydantic import BaseModel, Field, validator
from typing import Annotated
class FileReadInput(BaseModel):
path: Annotated[str, Field(max_length=255)]
@validator('path')
def validate_path(cls, v):
# Запрещаем выход за пределы директории
if '..' in v or v.startswith('/'):
raise ValueError('Invalid path')
return v
@mcp.tool()
def read_file(input: FileReadInput) -> str:
"""Безопасное чтение файла"""
safe_path = ALLOWED_DIR / input.path
return safe_path.read_text()
from pathlib import Path
ALLOWED_DIR = Path("/app/data").resolve()
def safe_path(user_path: str) -> Path:
"""Проверяет, что путь в разрешённой директории"""
requested = (ALLOWED_DIR / user_path).resolve()
if not str(requested).startswith(str(ALLOWED_DIR)):
raise PermissionError(f"Access denied: {user_path}")
return requested
# ❌ ОПАСНО: SQL инъекция
@mcp.tool()
def search_users(query: str) -> str:
cursor.execute(f"SELECT * FROM users WHERE name LIKE '%{query}%'")
# ✅ БЕЗОПАСНО: параметризованные запросы
@mcp.tool()
def search_users(query: str) -> str:
cursor.execute(
"SELECT * FROM users WHERE name LIKE ?",
(f"%{query}%",)
)
import os
# ❌ ОПАСНО
API_KEY = "sk-1234567890abcdef"
# ✅ БЕЗОПАСНО
API_KEY = os.environ.get("API_KEY")
if not API_KEY:
raise ValueError("API_KEY not set")
# Docker
ENV API_KEY=${API_KEY}

import asyncpg
# Глобальный пул соединений
pool: asyncpg.Pool | None = None
async def get_pool() -> asyncpg.Pool:
global pool
if pool is None:
pool = await asyncpg.create_pool(
DATABASE_URL,
min_size=5,
max_size=20,
command_timeout=60
)
return pool
@mcp.tool()
async def query(sql: str) -> str:
pool = await get_pool()
async with pool.acquire() as conn:
result = await conn.fetch(sql)
return str(result)
from functools import lru_cache
from datetime import datetime, timedelta
# Простой кэш в памяти
cache = {}
CACHE_TTL = timedelta(minutes=5)
def cached(key: str):
def decorator(func):
async def wrapper(*args, **kwargs):
now = datetime.now()
if key in cache:
value, expires = cache[key]
if now < expires:
return value
result = await func(*args, **kwargs)
cache[key] = (result, now + CACHE_TTL)
return result
return wrapper
return decorator
@mcp.tool()
@cached("weather")
async def get_weather(city: str) -> str:
"""Погода с кэшированием на 5 минут"""
# Запрос к API...
return weather_data
import asyncio
import aiohttp
@mcp.tool()
async def fetch_url(url: str) -> str:
"""Загрузка URL с таймаутом"""
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
try:
async with session.get(url) as response:
return await response.text()
except asyncio.TimeoutError:
return "Error: Request timeout (30s)"
MAX_RESPONSE_SIZE = 100 * 1024 # 100 KB
@mcp.tool()
def read_file(path: str) -> str:
"""Чтение файла с ограничением размера"""
with open(path, 'r') as f:
content = f.read(MAX_RESPONSE_SIZE)
if len(content) == MAX_RESPONSE_SIZE:
content += "\n\n[... truncated, file too large ...]"
return content

from mcp.server.fastmcp import FastMCP
import logging
logger = logging.getLogger(__name__)
@mcp.tool()
async def risky_operation(data: str) -> str:
"""Операция с обработкой ошибок"""
try:
result = await process(data)
return result
except ConnectionError as e:
logger.error(f"Connection failed: {e}")
return "Error: Service temporarily unavailable"
except ValueError as e:
logger.warning(f"Invalid input: {e}")
return f"Error: Invalid input - {e}"
except Exception as e:
logger.exception("Unexpected error")
return "Error: Internal server error"
import hashlib
@mcp.tool()
def create_file(name: str, content: str) -> str:
"""Идемпотентное создание файла"""
# Используем хэш контента для уникальности
content_hash = hashlib.sha256(content.encode()).hexdigest()[:8]
filename = f"{name}_{content_hash}.txt"
path = ALLOWED_DIR / filename
# Если файл с таким содержимым уже есть — не перезаписываем
if path.exists():
return f"File already exists: {filename}"
path.write_text(content)
return f"Created: {filename}"
from datetime import datetime
start_time = datetime.now()
@mcp.tool()
def health_check() -> str:
"""Проверка здоровья сервера"""
uptime = datetime.now() - start_time
return f"""
Status: OK
Uptime: {uptime}
Version: 1.0.0
Database: {"connected" if db_connected() else "disconnected"}
"""

import json
import sys
from datetime import datetime
def log(level: str, message: str, **kwargs):
"""Структурированный лог в JSON"""
entry = {
"timestamp": datetime.utcnow().isoformat(),
"level": level,
"message": message,
**kwargs
}
print(json.dumps(entry), file=sys.stderr)
@mcp.tool()
def process_data(data: str) -> str:
log("INFO", "Processing started", data_length=len(data))
try:
result = do_processing(data)
log("INFO", "Processing completed", result_length=len(result))
return result
except Exception as e:
log("ERROR", "Processing failed", error=str(e))
raise
import time
from functools import wraps
def timed(func):
"""Декоратор для измерения времени"""
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return await func(*args, **kwargs)
finally:
elapsed = time.perf_counter() - start
log("INFO", "Tool executed",
tool=func.__name__,
duration_ms=round(elapsed * 1000, 2))
return wrapper
@mcp.tool()
@timed
async def slow_operation(data: str) -> str:
"""Операция с замером времени"""
await asyncio.sleep(1)
return "Done"
import aiohttp
ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK")
async def send_alert(message: str, severity: str = "error"):
"""Отправка алерта в Slack/Discord"""
if not ALERT_WEBHOOK:
return
payload = {
"text": f"[{severity.upper()}] MCP Server: {message}"
}
async with aiohttp.ClientSession() as session:
await session.post(ALERT_WEBHOOK, json=payload)
@mcp.tool()
async def critical_operation(data: str) -> str:
try:
return await process(data)
except Exception as e:
await send_alert(f"Critical operation failed: {e}")
raise

FROM python:3.11-slim
# Не запускать от root
RUN useradd -m -u 1000 mcpuser
WORKDIR /app
# Зависимости
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Код
COPY --chown=mcpuser:mcpuser . .
USER mcpuser
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import sys; sys.exit(0)"
CMD ["python", "server.py"]
version: '3.8'
services:
mcp-server:
build: .
restart: unless-stopped
environment:
- DATABASE_URL=${DATABASE_URL}
- API_KEY=${API_KEY}
- LOG_LEVEL=INFO
stdin_open: true
tty: true
deploy:
resources:
limits:
memory: 512M
cpus: '0.5'
logging:
driver: json-file
options:
max-size: "10m"
max-file: "3"

  • Прочитать Быстрый старт
  • Создать первый сервер с 2 tools
  • Протестировать в MCP Inspector
  • Подключить базу данных (SQLite)
  • Создать tool для REST API
  • Обработать ошибки
  • Настроить Claude Desktop
  • Протестировать tools в реальном чате
  • Настроить VS Code или Cursor
  • Изучить Troubleshooting
  • Настроить логирование в stderr
  • Написать unit-тесты
  • Пройти этот чеклист (14 пунктов)
  • Создать Dockerfile
  • Настроить переменные окружения
  • Собрать Docker image
  • Протестировать в production-like окружении
  • Задокументировать API своего сервера

Используйте этот скрипт для проверки сервера:

#!/usr/bin/env python3
"""pre_deploy_check.py - Проверка перед деплоем"""
import subprocess
import json
import sys
def check_server(command: list[str]) -> dict:
"""Проверяет MCP сервер"""
results = {
"initialize": False,
"tools_list": False,
"tool_call": False,
"errors": []
}
# 1. Initialize
init_req = json.dumps({
"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {"name": "check", "version": "1.0"}
}
}) + "\n"
try:
result = subprocess.run(
command, input=init_req, capture_output=True,
text=True, timeout=10
)
response = json.loads(result.stdout.strip())
if "result" in response:
results["initialize"] = True
print("✅ Initialize: OK")
else:
results["errors"].append(f"Initialize failed: {response}")
print("❌ Initialize: FAILED")
except Exception as e:
results["errors"].append(str(e))
print(f"❌ Initialize: {e}")
# 2. Tools list
tools_req = json.dumps({
"jsonrpc": "2.0", "id": 2, "method": "tools/list", "params": {}
}) + "\n"
try:
result = subprocess.run(
command, input=init_req + tools_req, capture_output=True,
text=True, timeout=10
)
lines = result.stdout.strip().split('\n')
if len(lines) >= 2:
tools_response = json.loads(lines[1])
if "result" in tools_response and "tools" in tools_response["result"]:
tools_count = len(tools_response["result"]["tools"])
results["tools_list"] = True
print(f"✅ Tools list: OK ({tools_count} tools)")
else:
print("❌ Tools list: FAILED")
else:
print("❌ Tools list: No response")
except Exception as e:
results["errors"].append(str(e))
print(f"❌ Tools list: {e}")
return results
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python pre_deploy_check.py <command>")
sys.exit(1)
results = check_server(sys.argv[1:])
if results["initialize"] and results["tools_list"]:
print("\n✅ Server is ready for deployment!")
sys.exit(0)
else:
print("\n❌ Server has issues. Fix before deploying.")
for error in results["errors"]:
print(f" - {error}")
sys.exit(1)

MCP Specification

Best Practices — официальные рекомендации

Инструменты

MCP Inspector — отладка и тестирование