Перейти к содержимому

HTTP/SSE транспорт

Развёртывание MCP-серверов с использованием HTTP и Server-Sent Events для веб-приложений

HTTP/SSE-транспорт позволяет развёртывать MCP-серверы как веб-сервисы, доступные через HTTP API.

Два канала связи:

  • HTTP POST — запросы от клиента к серверу (JSON-RPC)
  • SSE — события от сервера к клиенту (Server-Sent Events)
server.ts
import express from "express";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/streamableHttp.js";
// Express application; JSON request bodies are parsed before any route runs.
const app = express();
app.use(express.json());

// Name and version under which this MCP server identifies itself.
const serverInfo = { name: "http-server", version: "1.0.0" };
const server = new McpServer(serverInfo);

// Tool registration: `hello` takes no arguments and returns one text item.
server.tool("hello", {}, async () => {
  return {
    content: [{ type: "text", text: "Hello from HTTP!" }],
  };
});
/**
 * Active SSE sessions, keyed by the session ID each transport generates.
 * Other handlers read `sessions.size` to report the number of live sessions.
 */
const sessions = new Map<string, SSEServerTransport>();

/**
 * GET /sse — opens the server-to-client event stream.
 *
 * SSEServerTransport owns the stream: when the server connects to it, the
 * transport writes the event-stream headers and an `endpoint` event telling
 * the client where to POST (including `?sessionId=...`). Nothing is written
 * to `res` by hand, and the session ID is never taken from the client, so a
 * caller cannot overwrite somebody else's session.
 */
app.get("/sse", async (_req, res) => {
  // Kept from the original example; restrict the allowed origins in production.
  res.setHeader("Access-Control-Allow-Origin", "*");

  const transport = new SSEServerTransport("/message", res);
  const { sessionId } = transport;
  sessions.set(sessionId, transport);

  // Registered before connecting so an early disconnect still cleans up.
  res.on("close", () => {
    sessions.delete(sessionId);
  });

  try {
    // NOTE(review): a single McpServer instance is connected to every
    // session's transport; confirm the SDK version in use supports that,
    // otherwise create one server per session.
    await server.connect(transport);
  } catch (error) {
    sessions.delete(sessionId);
    console.error("Failed to start SSE session", error);
    if (!res.headersSent) {
      res.status(500).json({ error: "Internal server error" });
    }
  }
});

/**
 * POST /message?sessionId=... — client-to-server JSON-RPC messages.
 *
 * Responds 404 when the session is unknown or the query parameter is missing
 * or not a single string.
 */
app.post("/message", async (req, res) => {
  const { sessionId } = req.query;
  const transport =
    typeof sessionId === "string" ? sessions.get(sessionId) : undefined;

  if (!transport) {
    res.status(404).json({ error: "Session not found" });
    return;
  }

  try {
    // express.json() has already consumed the request stream, so the parsed
    // body is handed over explicitly as the third argument.
    await transport.handlePostMessage(req, res, req.body);
  } catch (error) {
    console.error("Failed to handle MCP message", error);
    // The transport may already have answered; never send a second response.
    if (!res.headersSent) {
      res.status(500).json({ error: "Internal server error" });
    }
  }
});
// Health check: reports liveness plus the number of currently open sessions.
app.get("/health", (_req, res) => {
  const sessionCount = sessions.size;
  res.json({ status: "ok", sessions: sessionCount });
});
// Start accepting connections on port 3000 and log the address once bound.
const onListening = () => {
  console.log("MCP HTTP server on http://localhost:3000");
};
app.listen(3000, onListening);
client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
import { StreamableHTTPClientTransport } from "@modelcontextprotocol/sdk/client/streamableHttp.js";

// The SSE client transport only needs the stream URL: the server announces
// the POST endpoint (with the session ID) in its first `endpoint` event.
const transport = new SSEClientTransport(new URL("http://localhost:3000/sse"));

const client = new Client({
  name: "http-client",
  version: "1.0.0",
});

await client.connect(transport);

try {
  // Tool call: the SDK expects one request object with `name` and `arguments`.
  const result = await client.callTool({ name: "hello", arguments: {} });
  console.log(result);
} finally {
  // Close the event stream so the process can exit.
  await client.close();
}
server.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from sse_starlette.sse import EventSourceResponse
import asyncio
import uuid
import json
from mcp.server.fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
# MCP server object that tools are registered on.
mcp = FastMCP("http-server")

# ASGI application that exposes the HTTP routes.
app = FastAPI()

# In-memory registry of live sessions: session ID -> transport.
sessions: dict[str, SseServerTransport] = dict()
# Registers `hello` as an MCP tool that returns a fixed greeting.
# The docstring is left verbatim: FastMCP presumably publishes it as the tool
# description, so it is treated as runtime text — TODO confirm.
@mcp.tool()
def hello() -> str:
    """Приветствие"""
    greeting = "Hello from HTTP!"
    return greeting
@app.get("/sse")
async def sse_endpoint(request: Request):
    """Open an SSE stream for a new session.

    A fresh UUID is generated per connection. The first event (`session`)
    carries that ID; every later event (`message`) carries one JSON-encoded
    item produced by the session's transport. The session is removed from the
    registry when the stream ends.
    """
    sid = str(uuid.uuid4())

    async def stream():
        # NOTE(review): SseServerTransport is built without arguments and
        # iterated via `transport.receive()`; confirm both against the
        # installed mcp SDK version.
        transport = SseServerTransport()
        sessions[sid] = transport

        # Tell the client which session ID to send with its POST requests.
        yield dict(event="session", data=json.dumps({"sessionId": sid}))

        try:
            async for outgoing in transport.receive():
                yield dict(event="message", data=json.dumps(outgoing))
        finally:
            del sessions[sid]

    return EventSourceResponse(stream())
@app.post("/message")
async def message_endpoint(request: Request, sessionId: str):
    """Forward a client JSON body to the transport of the given session.

    Args:
        request: Incoming request whose JSON body is passed to the transport.
        sessionId: Query parameter naming the target session.

    Returns:
        Whatever the transport returns for the request, or a 404 JSON response
        with ``{"error": "Session not found"}`` when the session is unknown.
    """
    # Imported locally so this handler does not depend on other edits.
    from fastapi.responses import JSONResponse

    transport = sessions.get(sessionId)
    if transport is None:
        # FastAPI does not interpret a ``(body, status)`` tuple; it would be
        # serialised as a JSON array with status 200. Set the status explicitly.
        return JSONResponse(status_code=404, content={"error": "Session not found"})

    body = await request.json()
    # NOTE(review): confirm the transport class exposes `handle_request` in the
    # installed mcp SDK version.
    response = await transport.handle_request(body)
    return response
@app.get("/health")
async def health():
    """Report liveness together with the number of registered sessions."""
    active = len(sessions)
    return dict(status="ok", sessions=active)
Terminal
uvicorn server:app --host 0.0.0.0 --port 3000
/etc/nginx/sites-available/mcp-server
# Backend pool that the proxy locations forward to: one local process on port 3000.
upstream mcp_server {
    server 127.0.0.1:3000;
    # Keep up to 32 idle upstream connections open for reuse.
    keepalive 32;
}
# TLS virtual host for mcp.example.com; proxies /sse, /message and /health
# to the mcp_server upstream.
server {
    listen 443 ssl http2;
    server_name mcp.example.com;
    ssl_certificate /etc/letsencrypt/live/mcp.example.com/fullchain.pem;
    ssl_certificate_key /etc/letsencrypt/live/mcp.example.com/privkey.pem;

    # SSE endpoint
    location /sse {
        proxy_pass http://mcp_server;
        # HTTP/1.1 with an empty Connection header lets nginx reuse the
        # upstream keepalive connections.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # SSE settings: no buffering or caching, so events are not held back,
        # and 24-hour timeouts so long-lived streams are not cut off.
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;

        # Chunked transfer
        chunked_transfer_encoding on;
    }

    # Message endpoint
    location /message {
        proxy_pass http://mcp_server;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        # Forces the Content-Type sent upstream to application/json.
        proxy_set_header Content-Type application/json;
    }

    # Health check
    location /health {
        proxy_pass http://mcp_server;
    }
}
server.ts
import cors from "cors";
// CORS policy: a single allowed origin, GET/POST only, credentials permitted.
const corsOptions = {
  origin: ["https://allowed-origin.com"],
  methods: ["GET", "POST"],
  allowedHeaders: ["Content-Type", "Authorization"],
  credentials: true,
};

app.use(cors(corsOptions));
middleware/auth.ts
/**
 * Express middleware that authenticates a request by its Bearer token.
 *
 * - Responds 401 `{ error: "Unauthorized" }` when the Authorization header is
 *   missing or does not use the Bearer scheme (matched case-insensitively).
 * - Responds 401 `{ error: "Invalid token" }` when `verifyToken` throws.
 * - Otherwise stores the verified payload on `req.user` and calls `next()`.
 *
 * `next()` is called outside the try/catch so that an exception raised by
 * downstream middleware is not misreported as an invalid token.
 */
const authenticate = (req: Request, res: Response, next: NextFunction) => {
  const authHeader = req.headers.authorization;
  if (typeof authHeader !== "string" || !/^bearer /i.test(authHeader)) {
    res.status(401).json({ error: "Unauthorized" });
    return;
  }

  // Skip the 7-character "Bearer " prefix; tolerate extra separating spaces.
  const token = authHeader.slice(7).trim();

  let payload;
  try {
    payload = verifyToken(token);
  } catch {
    res.status(401).json({ error: "Invalid token" });
    return;
  }

  req.user = payload;
  next();
};
// Require authentication on both transport endpoints, in this order.
for (const protectedPath of ["/sse", "/message"]) {
  app.use(protectedPath, authenticate);
}
middleware/apiKey.ts
/**
 * Express middleware that admits a request only when its `x-api-key` header
 * is present and accepted by `isValidApiKey`; otherwise it responds with
 * 401 `{ error: "Invalid API key" }`.
 */
const apiKeyAuth = (req: Request, res: Response, next: NextFunction) => {
  const providedKey = req.headers["x-api-key"];

  if (providedKey && isValidApiKey(providedKey as string)) {
    next();
    return;
  }

  res.status(401).json({ error: "Invalid API key" });
};
middleware/rateLimit.ts
import rateLimit from "express-rate-limit";
// Rate limit for the message endpoint: 100 requests per one-minute window.
const ONE_MINUTE_MS = 60 * 1000;
const limiterOptions = {
  windowMs: ONE_MINUTE_MS,
  max: 100,
  message: { error: "Too many requests" },
};
const limiter = rateLimit(limiterOptions);

app.use("/message", limiter);
metrics.ts
import { Counter, Histogram, Gauge, register } from "prom-client";
// Counter of handled requests, labelled by HTTP method and response status.
const requestCounter = new Counter({
  name: "mcp_requests_total",
  help: "Total MCP requests",
  labelNames: ["method", "status"],
});
// Histogram of request durations in seconds, labelled by HTTP method.
const requestDuration = new Histogram({
  name: "mcp_request_duration_seconds",
  help: "MCP request duration",
  labelNames: ["method"],
});
// Gauge holding the current number of open sessions.
const activeSessions = new Gauge({
  name: "mcp_active_sessions",
  help: "Number of active sessions",
});
// Middleware: when a response finishes, count the request and record how
// long it took in seconds.
app.use((req, res, next) => {
  const startedAt = Date.now();

  const recordMetrics = () => {
    const elapsedSeconds = (Date.now() - startedAt) / 1000;
    const { method } = req;
    requestCounter.inc({ method, status: res.statusCode });
    requestDuration.observe({ method }, elapsedSeconds);
  };

  res.on("finish", recordMetrics);
  next();
});
// Metrics endpoint: refresh the session gauge, then serve the registry's
// metrics with the registry's own content type.
app.get("/metrics", async (_req, res) => {
  activeSessions.set(sessions.size);
  res.set("Content-Type", register.contentType);
  const exposition = await register.metrics();
  res.end(exposition);
});
/etc/nginx/nginx.conf
# Pool of three local backend instances on ports 3001-3003.
upstream mcp_servers {
    ip_hash;  # Sticky sessions by client IP
    server 127.0.0.1:3001;
    server 127.0.0.1:3002;
    server 127.0.0.1:3003;
}
redis.ts
import Redis from "ioredis";
// Connect to the instance named by REDIS_URL (the variable the compose file
// sets); fall back to the local default address when it is not defined.
const redis = new Redis(process.env.REDIS_URL ?? "redis://127.0.0.1:6379");

// Session entries expire after one hour.
const SESSION_TTL_SECONDS = 3600;

// Store the session as JSON under `session:<id>`.
await redis.set(
  `session:${sessionId}`,
  JSON.stringify(sessionData),
  "EX",
  SESSION_TTL_SECONDS,
);

// Load the session; `data` is null when the key is missing or has expired.
const data = await redis.get(`session:${sessionId}`);
docker-compose.yml
# Compose stack: three MCP server replicas, Redis for shared session data,
# and nginx as the public TLS entry point.
version: '3.8'

services:
  mcp-http:
    build: .
    # Container port only. Publishing a fixed host port ("3000:3000") cannot be
    # combined with several replicas: every replica would try to bind it.
    # Other services reach the replicas through the service name mcp-http:3000.
    expose:
      - "3000"
    environment:
      - NODE_ENV=production
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    deploy:
      replicas: 3

  redis:
    image: redis:7-alpine
    volumes:
      - redis_data:/data

  nginx:
    image: nginx:alpine
    ports:
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf
      - /etc/letsencrypt:/etc/letsencrypt:ro
    depends_on:
      - mcp-http

volumes:
  redis_data: