Перейти к содержимому

Безопасность MCP серверов

Best practices по безопасности MCP серверов для production

  1. Принцип минимальных привилегий — сервер должен иметь только необходимые права
  2. Валидация всех входных данных — никогда не доверяйте входным параметрам
  3. Изоляция — запускайте серверы в sandbox окружении
  4. Аудит — логируйте все операции
session.py
import secrets
import uuid
def generate_session_id() -> str:
"""Безопасная генерация session ID"""
return str(uuid.uuid4()) + "-" + secrets.token_hex(16)
# Привязка к пользователю
sessions = {}
def create_session(user_id: str) -> str:
session_id = generate_session_id()
sessions[session_id] = {
"user_id": user_id,
"created_at": datetime.utcnow(),
"expires_at": datetime.utcnow() + timedelta(hours=1)
}
return session_id
validation.py
from pydantic import BaseModel, Field, validator
import re
class FileReadInput(BaseModel):
path: str = Field(..., max_length=255)
@validator('path')
def validate_path(cls, v):
# Запрет path traversal
if '..' in v or v.startswith('/'):
raise ValueError('Invalid path')
# Только разрешённые символы
if not re.match(r'^[\w\-./]+$', v):
raise ValueError('Invalid characters in path')
return v
@mcp.tool()
def read_file(input: FileReadInput) -> str:
"""Безопасное чтение файла"""
allowed_dir = "/app/data"
full_path = os.path.join(allowed_dir, input.path)
# Проверка, что путь внутри allowed_dir
real_path = os.path.realpath(full_path)
if not real_path.startswith(os.path.realpath(allowed_dir)):
raise McpError("FORBIDDEN", "Access denied")
return open(real_path).read()
docker-compose.yml
services:
mcp-server:
read_only: true
cap_drop:
- ALL
security_opt:
- no-new-privileges:true
tmpfs:
- /tmp:size=100M
ulimits:
nofile:
soft: 1024
hard: 2048
seccomp.json
{
"defaultAction": "SCMP_ACT_ERRNO",
"syscalls": [
{
"names": ["read", "write", "open", "close", "stat", "fstat"],
"action": "SCMP_ACT_ALLOW"
}
]
}
rate_limiter.py
from collections import defaultdict
import time
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window = window_seconds
self.requests = defaultdict(list)
def is_allowed(self, client_id: str) -> bool:
now = time.time()
window_start = now - self.window
# Очистка старых запросов
self.requests[client_id] = [
t for t in self.requests[client_id] if t > window_start
]
if len(self.requests[client_id]) >= self.max_requests:
return False
self.requests[client_id].append(now)
return True
limiter = RateLimiter(max_requests=100, window_seconds=60)
audit.py
import logging
import json
audit_logger = logging.getLogger("audit")
audit_logger.setLevel(logging.INFO)
def audit_log(event: str, user_id: str, details: dict):
audit_logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"event": event,
"user_id": user_id,
"details": details
}))
@mcp.tool()
def sensitive_operation(data: str, ctx: Context) -> str:
audit_log("sensitive_operation", ctx.user_id, {"data_length": len(data)})
return process(data)