feat: implement mcp frontend

This commit is contained in:
h
2026-05-20 09:59:47 +02:00
parent 99a30f256d
commit 7970d4be9b
7 changed files with 902 additions and 24 deletions
+30 -2
View File
@@ -13,6 +13,7 @@ from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences
from beaver_gateway.core.registry import Gateway
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
from beaver_gateway.frontends.mcp_server import McpServerFrontend
from beaver_gateway.mcp.types import McpServer
@@ -94,14 +95,41 @@ gateway = Gateway(
# Phase 2.1 — bundle of plain Python callables exposed as one
# FastMCP namespace. The internal aggregator mounts it under
# ``/mcp/time`` on ``127.0.0.1:INTERNAL_MCP_PORT``; Phase 2.2's
# ClaudeCode adapter will forward that URL into
# ``BackendOptions.mcp_servers``.
# ClaudeCode adapter forwards that URL into
# ``BackendOptions.mcp_servers``. Phase 3's ``McpServerFrontend``
# reverse-proxies the same internal URL out to external clients.
McpServer.python_tool(name="time", tools=[current_time]),
# Phase 3 — illustrates the ``lenient`` flag. Real-world stdio MCPs
# sometimes print "Processing..." or other chatter to stdout before
# their actual JSON-RPC frames; the default mcp client forwards
# those parse failures downstream as warnings (visible in
# Cursor/Cline). With ``lenient=True`` we silently drop non-JSON
# lines, so downstream UIs see clean JSON-RPC only. The command
# below is just a placeholder — replace with whatever stdio MCP
# you actually want gateway to ingest (e.g. an obsidian-mcp).
#
# Commented out by default: example users won't have the binary
# installed and an unreachable command makes ``docker compose up``
# surface a confusing "command not found" line at first request.
# Uncomment after pointing ``command`` at a real stdio MCP.
#
# McpServer.stdio(
# name="obsidian",
# command=["uvx", "mcp-obsidian"],
# env={"OBSIDIAN_API_KEY": "..."},
# lenient=True,
# ),
],
frontends=[
# Phase 1.4 — expose the agents as `model=<name>` on an
# Anthropic-compatible Messages endpoint. Auth comes from
# `BOOTSTRAP_TOKENS` in the env (`name1:value1,name2:value2`).
AnthropicMessagesFrontend(host="0.0.0.0", port=8000),
# Phase 3 — re-exposes every declared `McpServer` outside the
# gateway with bearer auth + audit log. Per-namespace endpoints
# at `/mcp/<name>/`; flat bundle at `/mcp/all/`. Discovery page
# (HTML, auth-gated) at `/` with copy-pastable Cursor / Claude
# Desktop snippets. Auth re-uses `BOOTSTRAP_TOKENS`.
McpServerFrontend(host="0.0.0.0", port=8001),
],
)
+4
View File
@@ -12,6 +12,10 @@ build the internal MCP aggregator app and run it on
``127.0.0.1:INTERNAL_MCP_PORT`` as another task inside the same
TaskGroup. URLs are surfaced through ``GatewayRuntime.mcp_internal_urls``
so Phase 2.2's ClaudeCode adapter can find them.
Phase 3 — the same aggregator backs the external ``McpServerFrontend``;
``cli`` doesn't have to know that the frontend reverse-proxies into it,
it just keeps the aggregator running for anyone who needs it.
"""
from __future__ import annotations
+520
View File
@@ -0,0 +1,520 @@
"""External MCP frontend (Phase 3.1).
A streamable-HTTP gateway in front of the internal MCP aggregator
(``beaver_gateway.mcp.internal_app``). The aggregator already hosts
every declared ``McpServer`` (``python_tool``, stdio proxy, HTTP proxy)
under ``/mcp/<name>`` plus a flat ``/mcp/all`` bundle on
``127.0.0.1:INTERNAL_MCP_PORT``. This frontend re-exposes those URLs to
external clients with three additions:
* **Bearer auth** — ``Authorization: Bearer <token>``, ``X-Api-Key``,
or ``?token=<…>`` query string. All three forms verify against the
same :class:`TokenStore` as ``AnthropicMessagesFrontend``.
* **Audit log** — one line per request (token name, namespace,
request method/path, response status). The DB-backed audit log lives
in Phase 4; for now we just emit a structured log line.
* **Discovery page** at ``GET /`` (auth-gated) — HTML rendered with a
tiny inline Jinja2 template listing every namespace plus copy-pastable
config snippets for Cursor / claude.ai / Claude Desktop.
Why a reverse proxy and not a second mount? FastMCP's session managers
are tied to the lifespan they were created in; running the same
aggregator under two uvicorn servers double-initializes state. Building
two parallel aggregators would double upstream connections (two
subprocesses for every stdio MCP, two HTTP clients for every remote).
A loopback proxy keeps one source of truth — the internal aggregator —
and lets us layer policy on the outside.
"""
from __future__ import annotations
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any
from urllib.parse import urlencode
import aiohttp
from starlette.applications import Starlette
from starlette.responses import HTMLResponse, JSONResponse, StreamingResponse
from starlette.routing import Route
from beaver_gateway.frontends.base import Frontend
from beaver_gateway.mcp.internal_app import ALL_NAMESPACE
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Mapping
from starlette.requests import Request
from beaver_gateway.frontends.base import GatewayRuntime
_log = logging.getLogger("beaver_gateway.frontends.mcp_server")
# Hop-by-hop headers that must NOT be forwarded across an HTTP proxy
# (RFC 7230 §6.1). Bypassing this filter would break chunked transfer
# encoding when ``Content-Length`` arrives, or upstream-aware proxies
# would refuse the second hop's connection-pool reuse.
_HOP_BY_HOP_HEADERS = frozenset(
{
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailers",
"transfer-encoding",
"upgrade",
"host",
"content-length",
}
)
# Standard auth-bearing headers we *do not* forward to the internal app —
# the internal app is on loopback with no auth of its own, and forwarding
# the inbound bearer would only confuse it. Each method-specific MCP
# request from the upstream Cursor/etc. carries a fresh ``mcp-session-id``
# that we *must* forward.
_AUTH_HEADERS = frozenset({"authorization", "x-api-key"})
__all__ = ["McpServerFrontend"]
class McpServerFrontend(Frontend):
"""Auth + audit + reverse-proxy in front of the internal MCP aggregator."""
def __init__(self, *, host: str = "0.0.0.0", port: int = 8001) -> None: # noqa: S104
self.host = host
self.port = port
self._runtime: GatewayRuntime | None = None
self._app: Starlette | None = None
# Single shared aiohttp session, opened once when uvicorn starts
# the Starlette lifespan. Reused across every proxied request —
# MCP clients (esp. claude.ai) reconnect frequently, and a fresh
# ClientSession per call would cost a TCP handshake every time.
self._http: aiohttp.ClientSession | None = None
def configure(self, runtime: GatewayRuntime) -> None:
self._runtime = runtime
self._app = self._build_app(runtime)
async def serve(self) -> None:
import uvicorn
if self._app is None:
msg = "configure() must be called before serve()"
raise RuntimeError(msg)
config = uvicorn.Config(
self._app, host=self.host, port=self.port, log_level="info"
)
server = uvicorn.Server(config)
await server.serve()
def _build_app(self, runtime: GatewayRuntime) -> Starlette: # noqa: ARG002
# The Starlette lifespan owns the shared aiohttp session: opened
# at startup, closed at shutdown so we don't leak sockets when
# uvicorn is restarted under the same process. Starlette wants a
# plain context manager, not an async-generator function — we
# decorate to match.
@asynccontextmanager
async def lifespan(_app: Starlette) -> AsyncIterator[None]:
self._http = aiohttp.ClientSession(
# Long sock_read because MCP tool calls can take a while
# (a claude tool over HTTP can easily stretch beyond 30s
# on a real tool).
timeout=aiohttp.ClientTimeout(total=None, sock_read=600),
)
try:
yield
finally:
if self._http is not None:
await self._http.close()
self._http = None
routes = [
Route("/", self._discovery, methods=["GET"]),
Route("/healthz", self._healthz, methods=["GET"]),
# Two routes per namespace so both the trailing-slash and
# sub-path forms work (``/mcp/time`` AND ``/mcp/time/foo``).
# Starlette doesn't fold them into one route automatically.
Route(
"/mcp/{namespace}",
self._proxy_endpoint,
methods=["GET", "POST", "DELETE", "OPTIONS"],
),
Route(
"/mcp/{namespace}/{path:path}",
self._proxy_endpoint,
methods=["GET", "POST", "DELETE", "OPTIONS"],
),
]
return Starlette(routes=routes, lifespan=lifespan)
async def _healthz(self, _request: Request) -> JSONResponse:
return JSONResponse({"status": "ok"})
async def _discovery(self, request: Request) -> HTMLResponse | JSONResponse:
runtime = self._require_runtime()
token_name = _verify_request(request, runtime)
if token_name is None:
return _unauthorized()
# Use the request's own scheme+host so the snippets work behind
# reverse proxies / tunnels. Falls back to the configured
# host:port if the client didn't send Host (curl --raw).
base = _external_base_url(request, self.host, self.port)
html = _render_discovery_page(
base_url=base,
namespaces=list(runtime.mcps),
actor=token_name,
)
return HTMLResponse(html)
async def _proxy_endpoint(
self, request: Request
) -> StreamingResponse | JSONResponse:
runtime = self._require_runtime()
token_name = _verify_request(request, runtime)
if token_name is None:
return _unauthorized()
namespace = request.path_params["namespace"]
subpath = request.path_params.get("path", "")
upstream_url = self._upstream_url(namespace, subpath)
if upstream_url is None:
_log.info(
"mcp: actor=%s namespace=%s 404 (unknown namespace)",
token_name,
namespace,
)
return JSONResponse(
{"error": "unknown namespace", "namespace": namespace},
status_code=404,
)
if self._http is None:
# Lifespan hasn't run yet (shouldn't happen with uvicorn).
return JSONResponse(
{"error": "frontend not ready"}, status_code=503
)
return await _reverse_proxy(
client=self._http,
request=request,
upstream_url=upstream_url,
namespace=namespace,
actor=token_name,
)
def _upstream_url(self, namespace: str, subpath: str) -> str | None:
runtime = self._require_runtime()
# ``all`` is built by the aggregator unconditionally when at least
# one MCP is configured; the URL map only contains per-domain
# entries (see ``build_internal_app``), so we synthesize ``all``'s
# loopback URL from any per-domain URL's authority.
if namespace == ALL_NAMESPACE:
sample = next(iter(runtime.mcp_internal_urls.values()), None)
if sample is None:
return None
base = sample.rsplit("/mcp/", 1)[0]
return _join_subpath(f"{base}/mcp/{ALL_NAMESPACE}/", subpath)
base = runtime.mcp_internal_urls.get(namespace)
if base is None:
return None
return _join_subpath(base, subpath)
def _require_runtime(self) -> GatewayRuntime:
if self._runtime is None:
msg = "configure() must be called before serving requests"
raise RuntimeError(msg)
return self._runtime
def _verify_request(request: Request, runtime: GatewayRuntime) -> str | None:
"""Accept ``Authorization: Bearer``, ``X-Api-Key``, or ``?token=``.
The third form is the escape hatch for clients that can only put
secrets in the URL (claude.ai's MCP config historically did this).
All three roads end at the same :class:`TokenStore`.
"""
api_key = request.headers.get("x-api-key")
if api_key:
return runtime.token_store.verify(api_key)
auth_header = request.headers.get("authorization")
if auth_header:
return runtime.token_store.verify_bearer(auth_header)
qs_token = request.query_params.get("token")
if qs_token:
return runtime.token_store.verify(qs_token)
return None
def _unauthorized() -> JSONResponse:
return JSONResponse(
{"error": "invalid or missing bearer token"},
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
def _join_subpath(base_url: str, subpath: str) -> str:
"""Concatenate the loopback URL with the proxied sub-path.
``base_url`` always ends in ``/`` (the aggregator publishes URLs
that way to avoid Starlette's 307 redirect dance); the sub-path is
appended verbatim, with the query string handled by the caller.
"""
if subpath:
return base_url + subpath.lstrip("/")
return base_url
def _external_base_url(request: Request, fallback_host: str, fallback_port: int) -> str:
host = request.headers.get("host")
if host:
scheme = request.headers.get("x-forwarded-proto", request.url.scheme)
return f"{scheme}://{host}"
return f"http://{fallback_host}:{fallback_port}"
async def _reverse_proxy(
*,
client: aiohttp.ClientSession,
request: Request,
upstream_url: str,
namespace: str,
actor: str,
) -> StreamingResponse | JSONResponse:
"""Bidirectionally stream an MCP request between client ↔ internal aggregator.
Streamable-HTTP MCP responses can be a long-running SSE stream
(tools that emit partial progress) or a one-shot JSON body; we
don't peek — just relay chunks as they arrive in either direction
until both sides close.
"""
qs = request.url.query
if qs:
# Drop ``?token=`` from the forwarded URL — internal app doesn't
# need it, and propagating creds further than necessary widens
# the leak surface (logs, metrics, traces all see query strings).
scrubbed = _scrub_query(qs, drop={"token"})
if scrubbed:
upstream_url = f"{upstream_url}?{scrubbed}"
forward_headers = _forward_headers(request)
body_iter: AsyncIterator[bytes] | None = None
if request.method not in {"GET", "HEAD", "OPTIONS"}:
body_iter = _request_body_iter(request)
try:
upstream_resp = await client.request(
request.method,
upstream_url,
headers=forward_headers,
data=body_iter,
allow_redirects=False,
)
except aiohttp.ClientError as exc:
_log.warning(
"mcp: actor=%s namespace=%s upstream connect failed: %s",
actor,
namespace,
exc,
)
return JSONResponse(
{"error": "upstream MCP unreachable", "detail": str(exc)},
status_code=502,
)
_log.info(
"mcp: actor=%s namespace=%s %s %s -> %d",
actor,
namespace,
request.method,
request.url.path,
upstream_resp.status,
)
response_headers = _response_headers(upstream_resp.headers)
async def relay() -> AsyncIterator[bytes]:
try:
async for chunk in upstream_resp.content.iter_any():
yield chunk
except (aiohttp.ClientError, asyncio.CancelledError):
# Caller hung up or upstream dropped — just stop relaying.
return
finally:
upstream_resp.release()
return StreamingResponse(
relay(),
status_code=upstream_resp.status,
headers=response_headers,
media_type=upstream_resp.headers.get("content-type"),
)
async def _request_body_iter(request: Request) -> AsyncIterator[bytes]:
async for chunk in request.stream():
if chunk:
yield chunk
def _forward_headers(request: Request) -> dict[str, str]:
out: dict[str, str] = {}
for key, value in request.headers.items():
lowered = key.lower()
if lowered in _HOP_BY_HOP_HEADERS or lowered in _AUTH_HEADERS:
continue
out[key] = value
return out
def _response_headers(headers: Mapping[str, str]) -> dict[str, str]:
out: dict[str, str] = {}
for key, value in headers.items():
if key.lower() in _HOP_BY_HOP_HEADERS:
continue
out[key] = value
return out
def _scrub_query(query: str, *, drop: frozenset[str] | set[str]) -> str:
"""Remove sensitive keys from a URL-encoded query string."""
kept: list[tuple[str, str]] = []
for entry in query.split("&"):
if not entry:
continue
name, sep, value = entry.partition("=")
if name in drop:
continue
kept.append((name, value if sep else ""))
return urlencode(kept)
def _render_discovery_page(
*, base_url: str, namespaces: list[Any], actor: str
) -> str:
"""Render the auth-gated namespace + config-snippet page.
Inline HTML (no Jinja file) — keeps Phase 3 free of template-dir
plumbing that Phase 4's AdminFrontend will own.
"""
name_list = [getattr(ns, "name", str(ns)) for ns in namespaces]
rows = "\n".join(
f""" <tr>
<td><code>{_escape(name)}</code></td>
<td><code>{_escape(f"{base_url}/mcp/{name}/")}</code></td>
</tr>"""
for name in name_list
) or """ <tr><td colspan="2"><em>No MCP servers configured.</em></td></tr>"""
cursor_snippet = _CURSOR_SNIPPET.format(base_url=base_url)
claude_desktop_snippet = _CLAUDE_DESKTOP_SNIPPET.format(base_url=base_url)
return _DISCOVERY_TEMPLATE.format(
actor=_escape(actor),
base_url=_escape(base_url),
rows=rows,
all_url=_escape(f"{base_url}/mcp/{ALL_NAMESPACE}/"),
cursor_snippet=_escape(cursor_snippet),
claude_desktop_snippet=_escape(claude_desktop_snippet),
)
def _escape(value: str) -> str:
return (
value.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
)
_DISCOVERY_TEMPLATE = "\n".join( # noqa: FLY002 — readability beats one-string-blob
[
"<!doctype html>",
'<html lang="en">',
"<head>",
' <meta charset="utf-8">',
" <title>beaver-gateway · MCP discovery</title>",
" <style>",
" body {{",
" font-family: -apple-system, 'SF Pro Display', system-ui,"
" sans-serif;",
" max-width: 880px;",
" margin: 3rem auto;",
" padding: 0 1.25rem;",
" color: #1d1d1f;",
" line-height: 1.55;",
" }}",
" h1 {{ font-weight: 600; letter-spacing: -0.02em; }}",
" code, pre {{",
" font-family: ui-monospace, 'SF Mono', Menlo, monospace;",
" font-size: 0.92em;",
" }}",
" pre {{",
" background: #f5f5f7;",
" padding: 1rem 1.25rem;",
" border-radius: 10px;",
" overflow-x: auto;",
" }}",
" table {{ width: 100%; border-collapse: collapse; }}",
" th, td {{",
" text-align: left;",
" padding: 0.55rem 0.85rem;",
" border-bottom: 1px solid #e5e5ea;",
" }}",
" .muted {{ color: #6e6e73; }}",
" </style>",
"</head>",
"<body>",
" <h1>beaver-gateway</h1>",
' <p class="muted">Signed in as <strong>{actor}</strong>.'
" Base URL: <code>{base_url}</code></p>",
" <h2>Namespaces</h2>",
" <table>",
" <thead><tr><th>Name</th><th>URL</th></tr></thead>",
" <tbody>",
"{rows}",
" </tbody>",
" </table>",
' <p class="muted">Bundle endpoint (flat namespace,'
" escape-hatch): <code>{all_url}</code></p>",
" <h2>Cursor / Cline</h2>",
" <p>Add to your MCP config:</p>",
" <pre>{cursor_snippet}</pre>",
" <h2>Claude Desktop</h2>",
" <p>Add to <code>"
"~/Library/Application Support/Claude/claude_desktop_config.json"
"</code>:</p>",
" <pre>{claude_desktop_snippet}</pre>",
"</body>",
"</html>",
"",
]
)
_CURSOR_SNIPPET = """{{
"mcpServers": {{
"beaver-time": {{
"url": "{base_url}/mcp/time/",
"headers": {{ "Authorization": "Bearer <YOUR_TOKEN>" }}
}}
}}
}}"""
_CLAUDE_DESKTOP_SNIPPET = """{{
"mcpServers": {{
"beaver-time": {{
"type": "http",
"url": "{base_url}/mcp/time/",
"headers": {{ "Authorization": "Bearer <YOUR_TOKEN>" }}
}}
}}
}}"""
+27 -9
View File
@@ -16,8 +16,11 @@ from fastmcp import Client
from fastmcp.client.transports import StdioTransport, StreamableHttpTransport
from fastmcp.server import create_proxy
from beaver_gateway.mcp.lenient import LenientStdioTransport
if TYPE_CHECKING:
from fastmcp import FastMCP
from fastmcp.client.transports.base import ClientTransport
from beaver_gateway.mcp.types import HttpMcp, StdioMcp
@@ -27,22 +30,37 @@ def build_stdio_proxy(spec: StdioMcp) -> FastMCP:
``spec.command`` is a non-empty tuple; the first element is the
executable and the rest are CLI args. ``StdioTransport`` keeps the
subprocess alive across calls.
subprocess alive across calls. When ``spec.lenient`` is set we
swap in :class:`LenientStdioTransport`, which silently drops
non-JSON-RPC stdout lines instead of forwarding them as exceptions
(lets us ingest MCPs that print ``Processing...`` chatter on stdout
without leaking warnings to downstream UIs).
"""
if not spec.command:
msg = f"stdio MCP {spec.name!r} has empty command"
raise ValueError(msg)
command, *args = spec.command
transport = StdioTransport(
command=command,
args=list(args),
env=spec.env,
cwd=str(spec.cwd) if spec.cwd is not None else None,
)
cwd = str(spec.cwd) if spec.cwd is not None else None
transport: ClientTransport
if spec.lenient:
transport = LenientStdioTransport(
command=command, args=list(args), env=spec.env, cwd=cwd
)
else:
transport = StdioTransport(
command=command, args=list(args), env=spec.env, cwd=cwd
)
return create_proxy(Client(transport, name=spec.name))
def build_http_proxy(spec: HttpMcp) -> FastMCP:
"""Wrap a remote streamable-HTTP MCP into a mountable ``FastMCPProxy``."""
transport = StreamableHttpTransport(url=spec.url, auth=spec.auth)
"""Wrap a remote streamable-HTTP MCP into a mountable ``FastMCPProxy``.
``spec.headers`` are folded into the upstream client; pass per-call
auth (e.g. ``X-Api-Key``) here rather than as ``auth`` when the
upstream rejects Bearer headers.
"""
transport = StreamableHttpTransport(
url=spec.url, auth=spec.auth, headers=spec.headers
)
return create_proxy(Client(transport, name=spec.name))
+60 -9
View File
@@ -1,4 +1,4 @@
"""Internal MCP aggregator — one ASGI app, N FastMCP namespaces.
"""Internal MCP aggregator — one ASGI app, N FastMCP namespaces + ``all``.
Each ``McpServer`` declared in the user's config becomes its own
``FastMCP`` instance (regular for ``python_tool``, ``FastMCPProxy`` for
@@ -9,9 +9,18 @@ namespace via loopback as a distinct MCP server URL — preserving
per-domain framing while costing only one process worth of RAM
(PRD §6).
Phase 3 adds ``/mcp/all/``: a single FastMCP whose tools are the union
of every namespace's tools, prefixed by FastMCP's ``namespace_<tool>``
convention (e.g. ``time_current_time``). It's the escape-hatch for
clients that can only configure one MCP server — discouraged for tool-
heavy setups (PRD §6 cites the ~95%→~71% tool-selection drop on flat
namespaces) but real and reachable.
The aggregator returns both the app and a ``{name: url}`` map; Phase
2.2's ``ClaudeCodeBackendAdapter`` plugs the map directly into
``BackendOptions.mcp_servers``.
``BackendOptions.mcp_servers``. ``/mcp/all/`` is NOT included in that
map — claude-code-agents always get per-domain framing; only the
external MCP frontend (Phase 3.1) reverse-proxies the flat endpoint.
"""
from __future__ import annotations
@@ -19,6 +28,7 @@ from __future__ import annotations
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount
@@ -29,30 +39,48 @@ from beaver_gateway.mcp.wrap import build_python_tool_server
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from fastmcp import FastMCP
from beaver_gateway.mcp.types import McpServerT
ALL_NAMESPACE = "all"
"""URL segment for the flat-namespace aggregator (``/mcp/all/``)."""
def build_internal_app(
mcps: Iterable[McpServerT], *, host: str, port: int
) -> tuple[Starlette, dict[str, str]]:
"""Build the aggregator ``Starlette`` app and the ``{name: url}`` map.
"""Build the aggregator ``Starlette`` app and the per-namespace URL map.
``host``/``port`` only flavour the URL strings handed back — actually
listening on them is the caller's job (``cli.main`` runs a uvicorn
server in a TaskGroup). We accept the address here so callers don't
have to format the URLs themselves and risk drifting from the
``/mcp/<name>`` convention.
Returns a map of ``{namespace: url}`` for the per-domain endpoints
only (claude-code's MCP routing expects per-domain framing). The
``/mcp/all/`` bundle endpoint exists on the same app but is
intentionally omitted from the URL map — it's only meaningful to
external clients via the MCP frontend, not to claude-code.
"""
servers: dict[str, FastMCP] = {spec.name: _build_server(spec) for spec in mcps}
child_apps = [s.http_app(transport="http", path="/") for s in servers.values()]
child_apps = {
name: s.http_app(transport="http", path="/")
for name, s in servers.items()
}
routes = [
Mount(f"/mcp/{name}", app=app)
for name, app in zip(servers, child_apps, strict=True)
Mount(f"/mcp/{name}", app=app) for name, app in child_apps.items()
]
# /mcp/all — flat-namespace bundle. Skip when there's nothing to
# bundle so we don't pay for an empty session manager lifecycle.
all_app = None
if servers:
all_server = _build_all_server(servers)
all_app = all_server.http_app(transport="http", path="/")
routes.append(Mount(f"/mcp/{ALL_NAMESPACE}", app=all_app))
@asynccontextmanager
async def lifespan(_parent: Starlette) -> AsyncIterator[None]:
# Each FastMCP http_app stores its session manager init in its
@@ -61,8 +89,12 @@ def build_internal_app(
# children come up together and unwind in reverse order on
# shutdown.
async with AsyncExitStack() as stack:
for child in child_apps:
for child in child_apps.values():
await stack.enter_async_context(child.router.lifespan_context(child))
if all_app is not None:
await stack.enter_async_context(
all_app.router.lifespan_context(all_app)
)
yield
app = Starlette(routes=routes, lifespan=lifespan)
@@ -85,3 +117,22 @@ def _build_server(spec: McpServerT) -> FastMCP:
# type-narrowing honest if a new variant lands without updates here.
msg = f"unsupported McpServer variant: {type(spec).__name__}"
raise TypeError(msg)
def _build_all_server(children: dict[str, FastMCP]) -> FastMCP:
"""Compose a single FastMCP whose tools union every namespace's tools.
FastMCP's ``mount(namespace=...)`` namespaces every tool as
``<namespace>_<original_name>``, so the flat endpoint becomes
``time_current_time``, ``calendar_event_create``, etc.
Mounted children are accessed in-memory via ``FastMCPProvider`` —
requests don't bounce through the child's own ``http_app``, so this
aggregator has its own independent streamable-HTTP session manager
and lifespan, and the per-namespace ``/mcp/<name>/`` mounts keep
working unchanged.
"""
parent = FastMCP(name="beaver-gateway-all")
for name, child in children.items():
parent.mount(child, namespace=name)
return parent
+234
View File
@@ -0,0 +1,234 @@
"""Tolerant transports for upstream MCPs that don't strictly speak JSON-RPC.
Some real-world MCP servers print non-JSON chatter to stdout before / between
their actual JSON-RPC frames (``Processing...``, banners, dependency-load
messages, etc.). The reference ``mcp.client.stdio.stdio_client`` parses every
stdout line as JSON-RPC and ships any parse failure downstream as an
exception, which the MCP ``ClientSession`` then logs as a warning that bleeds
into client UIs (Cursor, Cline) when they connect through us.
``LenientStdioTransport`` re-implements the stdio-client wiring with one
behavioural change: lines that don't parse as JSON-RPC are *silently
dropped* (one ``DEBUG`` log entry, no exception forwarded). Downstream
consumers see only valid messages. We keep the rest of the contract identical
to the reference client, including the spec-mandated graceful shutdown
sequence (close stdin → wait → SIGTERM → SIGKILL).
The transport plugs into ``fastmcp.server.create_proxy`` just like
``StdioTransport`` does, so the rest of the aggregator doesn't need to
know which flavour it got.
"""
from __future__ import annotations
import contextlib
import logging
import sys
from typing import TYPE_CHECKING
import anyio
import anyio.lowlevel
from anyio.streams.text import TextReceiveStream
from fastmcp.client.transports.base import ClientTransport
from mcp import ClientSession, types
from mcp.client.stdio import (
PROCESS_TERMINATION_TIMEOUT,
StdioServerParameters,
_create_platform_compatible_process,
_get_executable_command,
_terminate_process_tree,
)
from mcp.shared.message import SessionMessage
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from typing import TextIO, Unpack
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from fastmcp.client.transports.base import SessionKwargs
_log = logging.getLogger("beaver_gateway.mcp.lenient")
class LenientStdioTransport(ClientTransport):
"""Stdio transport that tolerates non-JSON-RPC stdout noise.
Behaves like :class:`fastmcp.client.transports.StdioTransport` from the
consumer's perspective: one ``ClientSession`` per ``connect_session``
block, subprocess scoped to that block. We don't replicate the upstream
``keep_alive`` flag because the only caller (``create_proxy``) opens
the session lazily on first request and keeps it open for the lifetime
of the proxy.
"""
def __init__(
self,
command: str,
args: list[str],
env: dict[str, str] | None = None,
cwd: str | None = None,
log_file: TextIO | None = None,
) -> None:
self.command = command
self.args = args
self.env = env
self.cwd = cwd
# TextIO only — pre-open Path callers themselves. The upstream
# ``StdioTransport`` opens Path for you, but we keep this thin so
# the type contract stays narrow and easy to validate.
self.log_file = log_file
@contextlib.asynccontextmanager
async def connect_session(
self, **session_kwargs: Unpack[SessionKwargs]
) -> AsyncIterator[ClientSession]:
errlog: TextIO = self.log_file if self.log_file is not None else sys.stderr
async with _lenient_stdio_client(
StdioServerParameters(
command=self.command,
args=self.args,
env=self.env,
cwd=self.cwd,
),
errlog=errlog,
) as (read_stream, write_stream), ClientSession(
read_stream, write_stream, **session_kwargs
) as session:
yield session
def __repr__(self) -> str:
return (
f"<LenientStdioTransport(command={self.command!r}, args={self.args!r})>"
)
@contextlib.asynccontextmanager
async def _lenient_stdio_client( # noqa: PLR0915 — mirrors mcp.client.stdio.stdio_client
server: StdioServerParameters, errlog: TextIO = sys.stderr
) -> AsyncIterator[
tuple[
MemoryObjectReceiveStream[SessionMessage | Exception],
MemoryObjectSendStream[SessionMessage],
]
]:
"""Drop-in for ``mcp.client.stdio.stdio_client`` with a tolerant reader.
All differences from upstream live in ``stdout_reader``: lines that fail
``JSONRPCMessage.model_validate_json`` are logged at DEBUG and skipped,
never forwarded as exceptions. This is what makes warning-noisy MCPs
quiet from the consumer's point of view.
"""
read_stream_writer, read_stream = anyio.create_memory_object_stream[
SessionMessage | Exception
](0)
write_stream, write_stream_reader = anyio.create_memory_object_stream[
SessionMessage
](0)
try:
command = _get_executable_command(server.command)
env_default = _default_inherited_env()
process = await _create_platform_compatible_process(
command=command,
args=server.args,
env=(
{**env_default, **server.env}
if server.env is not None
else env_default
),
errlog=errlog,
cwd=server.cwd,
)
except OSError:
await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()
raise
async def stdout_reader() -> None:
assert process.stdout, "Opened process is missing stdout" # noqa: S101
try:
async with read_stream_writer:
buffer = ""
async for chunk in TextReceiveStream(
process.stdout,
encoding=server.encoding,
errors=server.encoding_error_handler,
):
lines = (buffer + chunk).split("\n")
buffer = lines.pop()
for line in lines:
stripped = line.strip()
if not stripped:
continue
try:
message = types.JSONRPCMessage.model_validate_json(
stripped
)
except Exception: # noqa: BLE001 — by design, see module doc
_log.debug(
"lenient stdio: dropped non-JSON line: %r",
stripped[:200],
)
continue
await read_stream_writer.send(SessionMessage(message))
except anyio.ClosedResourceError:
await anyio.lowlevel.checkpoint()
async def stdin_writer() -> None:
assert process.stdin, "Opened process is missing stdin" # noqa: S101
try:
async with write_stream_reader:
async for session_message in write_stream_reader:
payload = session_message.message.model_dump_json(
by_alias=True, exclude_none=True
)
await process.stdin.send(
(payload + "\n").encode(
encoding=server.encoding,
errors=server.encoding_error_handler,
)
)
except anyio.ClosedResourceError:
await anyio.lowlevel.checkpoint()
async with (
anyio.create_task_group() as tg,
process,
):
tg.start_soon(stdout_reader)
tg.start_soon(stdin_writer)
try:
yield read_stream, write_stream
finally:
# MCP spec stdio shutdown: close stdin → wait → SIGTERM → SIGKILL.
if process.stdin:
with contextlib.suppress(Exception):
await process.stdin.aclose()
try:
with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT):
await process.wait()
except TimeoutError:
await _terminate_process_tree(process)
except ProcessLookupError:
pass
await read_stream.aclose()
await write_stream.aclose()
await read_stream_writer.aclose()
await write_stream_reader.aclose()
def _default_inherited_env() -> dict[str, str]:
"""Same env shortlist as ``mcp.client.stdio.get_default_environment``.
Re-exported so we can compose with the user's ``env`` overrides without
forcing a private import in this module's body.
"""
from mcp.client.stdio import get_default_environment
return get_default_environment()
__all__ = ["LenientStdioTransport"]
+27 -4
View File
@@ -23,20 +23,36 @@ class _BaseMcp(BaseModel):
class StdioMcp(_BaseMcp):
"""Subprocess MCP server we spawn and connect to over stdio."""
"""Subprocess MCP server we spawn and connect to over stdio.
``lenient`` switches the upstream stdio reader to a tolerant variant
that silently drops non-JSON-RPC lines from the subprocess's stdout
(``Processing...``-style chatter, banners, dependency-load messages).
The reference ``mcp.client.stdio.stdio_client`` forwards those parse
failures as exceptions, which bleed into Cursor/Cline UIs as warnings
when they connect through us. Default ``False`` keeps the strict
contract — flip it on per-namespace for known-noisy upstreams.
"""
kind: Literal["stdio"] = "stdio"
command: tuple[str, ...]
env: dict[str, str] | None = None
cwd: Path | None = None
lenient: bool = False
class HttpMcp(_BaseMcp):
"""Remote MCP server reached over streamable HTTP."""
"""Remote MCP server reached over streamable HTTP.
``headers`` are forwarded on every request to the upstream MCP — handy
for upstreams that authenticate via custom header rather than a Bearer
token (``auth``).
"""
kind: Literal["http"] = "http"
url: str
auth: str | None = None
headers: dict[str, str] | None = None
class PythonToolMcp(_BaseMcp):
@@ -62,19 +78,26 @@ class McpServer:
command: Iterable[str],
env: dict[str, str] | None = None,
cwd: Path | str | None = None,
lenient: bool = False,
) -> StdioMcp:
return StdioMcp(
name=name,
command=tuple(command),
env=env,
cwd=Path(cwd) if isinstance(cwd, str) else cwd,
lenient=lenient,
)
@classmethod
def http(
cls, *, name: str, url: str, auth: str | None = None
cls,
*,
name: str,
url: str,
auth: str | None = None,
headers: dict[str, str] | None = None,
) -> HttpMcp:
return HttpMcp(name=name, url=url, auth=auth)
return HttpMcp(name=name, url=url, auth=auth, headers=headers)
@classmethod
def python_tool(