Безопасность MCP серверов
Best practices по безопасности MCP серверов для production
Основные принципы
Заголовок раздела «Основные принципы»- Принцип минимальных привилегий — сервер должен иметь только необходимые права
- Валидация всех входных данных — никогда не доверяйте входным параметрам
- Изоляция — запускайте серверы в sandbox окружении
- Аудит — логируйте все операции
Session Security
Заголовок раздела «Session Security»import secretsimport uuid
def generate_session_id() -> str: """Безопасная генерация session ID""" return str(uuid.uuid4()) + "-" + secrets.token_hex(16)
# Привязка к пользователюsessions = {}
def create_session(user_id: str) -> str: session_id = generate_session_id() sessions[session_id] = { "user_id": user_id, "created_at": datetime.utcnow(), "expires_at": datetime.utcnow() + timedelta(hours=1) } return session_idВалидация входных данных
Заголовок раздела «Валидация входных данных»from pydantic import BaseModel, Field, validatorimport re
class FileReadInput(BaseModel): path: str = Field(..., max_length=255)
@validator('path') def validate_path(cls, v): # Запрет path traversal if '..' in v or v.startswith('/'): raise ValueError('Invalid path')
# Только разрешённые символы if not re.match(r'^[\w\-./]+$', v): raise ValueError('Invalid characters in path')
return v
@mcp.tool()def read_file(input: FileReadInput) -> str: """Безопасное чтение файла""" allowed_dir = "/app/data" full_path = os.path.join(allowed_dir, input.path)
# Проверка, что путь внутри allowed_dir real_path = os.path.realpath(full_path) if not real_path.startswith(os.path.realpath(allowed_dir)): raise McpError("FORBIDDEN", "Access denied")
return open(real_path).read()Sandboxing
Заголовок раздела «Sandboxing»Docker с ограничениями
Заголовок раздела «Docker с ограничениями»services: mcp-server: read_only: true cap_drop: - ALL security_opt: - no-new-privileges:true tmpfs: - /tmp:size=100M ulimits: nofile: soft: 1024 hard: 2048seccomp профиль
Заголовок раздела «seccomp профиль»{ "defaultAction": "SCMP_ACT_ERRNO", "syscalls": [ { "names": ["read", "write", "open", "close", "stat", "fstat"], "action": "SCMP_ACT_ALLOW" } ]}Rate Limiting
Заголовок раздела «Rate Limiting»from collections import defaultdictimport time
class RateLimiter: def __init__(self, max_requests: int, window_seconds: int): self.max_requests = max_requests self.window = window_seconds self.requests = defaultdict(list)
def is_allowed(self, client_id: str) -> bool: now = time.time() window_start = now - self.window
# Очистка старых запросов self.requests[client_id] = [ t for t in self.requests[client_id] if t > window_start ]
if len(self.requests[client_id]) >= self.max_requests: return False
self.requests[client_id].append(now) return True
limiter = RateLimiter(max_requests=100, window_seconds=60)Логирование и аудит
Заголовок раздела «Логирование и аудит»import loggingimport json
audit_logger = logging.getLogger("audit")audit_logger.setLevel(logging.INFO)
def audit_log(event: str, user_id: str, details: dict): audit_logger.info(json.dumps({ "timestamp": datetime.utcnow().isoformat(), "event": event, "user_id": user_id, "details": details }))
@mcp.tool()def sensitive_operation(data: str, ctx: Context) -> str: audit_log("sensitive_operation", ctx.user_id, {"data_length": len(data)}) return process(data)