diff --git a/examples/config.py b/examples/config.py index e6a1442..91b03cc 100644 --- a/examples/config.py +++ b/examples/config.py @@ -13,6 +13,7 @@ from beaver_gateway.agents.claude import ClaudeAgent from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences from beaver_gateway.core.registry import Gateway from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend +from beaver_gateway.frontends.mcp_server import McpServerFrontend from beaver_gateway.mcp.types import McpServer @@ -94,14 +95,41 @@ gateway = Gateway( # Phase 2.1 — bundle of plain Python callables exposed as one # FastMCP namespace. The internal aggregator mounts it under # ``/mcp/time`` on ``127.0.0.1:INTERNAL_MCP_PORT``; Phase 2.2's - # ClaudeCode adapter will forward that URL into - # ``BackendOptions.mcp_servers``. + # ClaudeCode adapter forwards that URL into + # ``BackendOptions.mcp_servers``. Phase 3's ``McpServerFrontend`` + # reverse-proxies the same internal URL out to external clients. McpServer.python_tool(name="time", tools=[current_time]), + # Phase 3 — illustrates the ``lenient`` flag. Real-world stdio MCPs + # sometimes print "Processing..." or other chatter to stdout before + # their actual JSON-RPC frames; the default mcp client forwards + # those parse failures downstream as warnings (visible in + # Cursor/Cline). With ``lenient=True`` we silently drop non-JSON + # lines, so downstream UIs see clean JSON-RPC only. The command + # below is just a placeholder — replace with whatever stdio MCP + # you actually want gateway to ingest (e.g. an obsidian-mcp). + # + # Commented out by default: example users won't have the binary + # installed and an unreachable command makes ``docker compose up`` + # surface a confusing "command not found" line at first request. + # Uncomment after pointing ``command`` at a real stdio MCP. 
+ # + # McpServer.stdio( + # name="obsidian", + # command=["uvx", "mcp-obsidian"], + # env={"OBSIDIAN_API_KEY": "..."}, + # lenient=True, + # ), ], frontends=[ # Phase 1.4 — expose the agents as `model=` on an # Anthropic-compatible Messages endpoint. Auth comes from # `BOOTSTRAP_TOKENS` in the env (`name1:value1,name2:value2`). AnthropicMessagesFrontend(host="0.0.0.0", port=8000), + # Phase 3 — re-exposes every declared `McpServer` outside the + # gateway with bearer auth + audit log. Per-namespace endpoints + # at `/mcp//`; flat bundle at `/mcp/all/`. Discovery page + # (HTML, auth-gated) at `/` with copy-pastable Cursor / Claude + # Desktop snippets. Auth re-uses `BOOTSTRAP_TOKENS`. + McpServerFrontend(host="0.0.0.0", port=8001), ], ) diff --git a/src/beaver_gateway/cli.py b/src/beaver_gateway/cli.py index 868ccbf..ea4146c 100644 --- a/src/beaver_gateway/cli.py +++ b/src/beaver_gateway/cli.py @@ -12,6 +12,10 @@ build the internal MCP aggregator app and run it on ``127.0.0.1:INTERNAL_MCP_PORT`` as another task inside the same TaskGroup. URLs are surfaced through ``GatewayRuntime.mcp_internal_urls`` so Phase 2.2's ClaudeCode adapter can find them. + +Phase 3 — the same aggregator backs the external ``McpServerFrontend``; +``cli`` doesn't have to know that the frontend reverse-proxies into it, +it just keeps the aggregator running for anyone who needs it. """ from __future__ import annotations diff --git a/src/beaver_gateway/frontends/mcp_server.py b/src/beaver_gateway/frontends/mcp_server.py new file mode 100644 index 0000000..bd08488 --- /dev/null +++ b/src/beaver_gateway/frontends/mcp_server.py @@ -0,0 +1,520 @@ +"""External MCP frontend (Phase 3.1). + +A streamable-HTTP gateway in front of the internal MCP aggregator +(``beaver_gateway.mcp.internal_app``). The aggregator already hosts +every declared ``McpServer`` (``python_tool``, stdio proxy, HTTP proxy) +under ``/mcp/`` plus a flat ``/mcp/all`` bundle on +``127.0.0.1:INTERNAL_MCP_PORT``. 
This frontend re-exposes those URLs to +external clients with three additions: + +* **Bearer auth** — ``Authorization: Bearer ``, ``X-Api-Key``, + or ``?token=<…>`` query string. All three forms verify against the + same :class:`TokenStore` as ``AnthropicMessagesFrontend``. +* **Audit log** — one line per request (token name, namespace, + request method/path, response status). The DB-backed audit log lives + in Phase 4; for now we just emit a structured log line. +* **Discovery page** at ``GET /`` (auth-gated) — HTML rendered with a + tiny inline Jinja2 template listing every namespace plus copy-pastable + config snippets for Cursor / claude.ai / Claude Desktop. + +Why a reverse proxy and not a second mount? FastMCP's session managers +are tied to the lifespan they were created in; running the same +aggregator under two uvicorn servers double-initializes state. Building +two parallel aggregators would double upstream connections (two +subprocesses for every stdio MCP, two HTTP clients for every remote). +A loopback proxy keeps one source of truth — the internal aggregator — +and lets us layer policy on the outside. +""" + +from __future__ import annotations + +import asyncio +import logging +from contextlib import asynccontextmanager +from typing import TYPE_CHECKING, Any +from urllib.parse import urlencode + +import aiohttp +from starlette.applications import Starlette +from starlette.responses import HTMLResponse, JSONResponse, StreamingResponse +from starlette.routing import Route + +from beaver_gateway.frontends.base import Frontend +from beaver_gateway.mcp.internal_app import ALL_NAMESPACE + +if TYPE_CHECKING: + from collections.abc import AsyncIterator, Mapping + + from starlette.requests import Request + + from beaver_gateway.frontends.base import GatewayRuntime + + +_log = logging.getLogger("beaver_gateway.frontends.mcp_server") + + +# Hop-by-hop headers that must NOT be forwarded across an HTTP proxy +# (RFC 7230 §6.1). 
Bypassing this filter would break chunked transfer +# encoding when ``Content-Length`` arrives, or upstream-aware proxies +# would refuse the second hop's connection-pool reuse. +_HOP_BY_HOP_HEADERS = frozenset( + { + "connection", + "keep-alive", + "proxy-authenticate", + "proxy-authorization", + "te", + "trailers", + "transfer-encoding", + "upgrade", + "host", + "content-length", + } +) + +# Standard auth-bearing headers we *do not* forward to the internal app — +# the internal app is on loopback with no auth of its own, and forwarding +# the inbound bearer would only confuse it. Each method-specific MCP +# request from the upstream Cursor/etc. carries a fresh ``mcp-session-id`` +# that we *must* forward. +_AUTH_HEADERS = frozenset({"authorization", "x-api-key"}) + + +__all__ = ["McpServerFrontend"] + + +class McpServerFrontend(Frontend): + """Auth + audit + reverse-proxy in front of the internal MCP aggregator.""" + + def __init__(self, *, host: str = "0.0.0.0", port: int = 8001) -> None: # noqa: S104 + self.host = host + self.port = port + self._runtime: GatewayRuntime | None = None + self._app: Starlette | None = None + # Single shared aiohttp session, opened once when uvicorn starts + # the Starlette lifespan. Reused across every proxied request — + # MCP clients (esp. claude.ai) reconnect frequently, and a fresh + # ClientSession per call would cost a TCP handshake every time. 
+        self._http: aiohttp.ClientSession | None = None
+
+    def configure(self, runtime: GatewayRuntime) -> None:
+        """Remember ``runtime`` and build the Starlette app ``serve`` will run."""
+        self._runtime = runtime
+        self._app = self._build_app(runtime)
+
+    async def serve(self) -> None:
+        """Run the configured app under uvicorn on ``self.host``/``self.port``.
+
+        Raises ``RuntimeError`` when ``configure()`` has not been called
+        first (there is no app to serve yet).
+        """
+        import uvicorn
+
+        if self._app is None:
+            msg = "configure() must be called before serve()"
+            raise RuntimeError(msg)
+
+        config = uvicorn.Config(
+            self._app,
+            host=self.host,
+            port=self.port,
+            log_level="info",
+            # uvicorn's access log prints the raw request target, query
+            # string included, so a ``?token=<secret>`` credential would be
+            # written to the logs verbatim. ``_reverse_proxy`` already emits
+            # one audit line per proxied request (path only, no query).
+            access_log=False,
+        )
+        server = uvicorn.Server(config)
+        await server.serve()
+
+    def _build_app(self, runtime: GatewayRuntime) -> Starlette:  # noqa: ARG002
+        """Assemble the Starlette app: routes plus the session-owning lifespan."""
+        # The Starlette lifespan owns the shared aiohttp session: opened
+        # at startup, closed at shutdown so we don't leak sockets when
+        # uvicorn is restarted under the same process. Starlette's
+        # ``lifespan=`` expects a callable that returns an *async* context
+        # manager, so the async generator below is wrapped with
+        # ``@asynccontextmanager``.
+        @asynccontextmanager
+        async def lifespan(_app: Starlette) -> AsyncIterator[None]:
+            self._http = aiohttp.ClientSession(
+                # Long sock_read because MCP tool calls can take a while
+                # (a claude tool over HTTP can easily stretch beyond 30s
+                # on a real tool).
+                timeout=aiohttp.ClientTimeout(total=None, sock_read=600),
+            )
+            try:
+                yield
+            finally:
+                if self._http is not None:
+                    await self._http.close()
+                    self._http = None
+
+        routes = [
+            Route("/", self._discovery, methods=["GET"]),
+            Route("/healthz", self._healthz, methods=["GET"]),
+            # Two routes per namespace so both the trailing-slash and
+            # sub-path forms work (``/mcp/time`` AND ``/mcp/time/foo``).
+            # Starlette doesn't fold them into one route automatically.
+ Route( + "/mcp/{namespace}", + self._proxy_endpoint, + methods=["GET", "POST", "DELETE", "OPTIONS"], + ), + Route( + "/mcp/{namespace}/{path:path}", + self._proxy_endpoint, + methods=["GET", "POST", "DELETE", "OPTIONS"], + ), + ] + return Starlette(routes=routes, lifespan=lifespan) + + async def _healthz(self, _request: Request) -> JSONResponse: + return JSONResponse({"status": "ok"}) + + async def _discovery(self, request: Request) -> HTMLResponse | JSONResponse: + runtime = self._require_runtime() + token_name = _verify_request(request, runtime) + if token_name is None: + return _unauthorized() + # Use the request's own scheme+host so the snippets work behind + # reverse proxies / tunnels. Falls back to the configured + # host:port if the client didn't send Host (curl --raw). + base = _external_base_url(request, self.host, self.port) + html = _render_discovery_page( + base_url=base, + namespaces=list(runtime.mcps), + actor=token_name, + ) + return HTMLResponse(html) + + async def _proxy_endpoint( + self, request: Request + ) -> StreamingResponse | JSONResponse: + runtime = self._require_runtime() + token_name = _verify_request(request, runtime) + if token_name is None: + return _unauthorized() + + namespace = request.path_params["namespace"] + subpath = request.path_params.get("path", "") + upstream_url = self._upstream_url(namespace, subpath) + if upstream_url is None: + _log.info( + "mcp: actor=%s namespace=%s 404 (unknown namespace)", + token_name, + namespace, + ) + return JSONResponse( + {"error": "unknown namespace", "namespace": namespace}, + status_code=404, + ) + + if self._http is None: + # Lifespan hasn't run yet (shouldn't happen with uvicorn). 
+ return JSONResponse( + {"error": "frontend not ready"}, status_code=503 + ) + + return await _reverse_proxy( + client=self._http, + request=request, + upstream_url=upstream_url, + namespace=namespace, + actor=token_name, + ) + + def _upstream_url(self, namespace: str, subpath: str) -> str | None: + runtime = self._require_runtime() + # ``all`` is built by the aggregator unconditionally when at least + # one MCP is configured; the URL map only contains per-domain + # entries (see ``build_internal_app``), so we synthesize ``all``'s + # loopback URL from any per-domain URL's authority. + if namespace == ALL_NAMESPACE: + sample = next(iter(runtime.mcp_internal_urls.values()), None) + if sample is None: + return None + base = sample.rsplit("/mcp/", 1)[0] + return _join_subpath(f"{base}/mcp/{ALL_NAMESPACE}/", subpath) + base = runtime.mcp_internal_urls.get(namespace) + if base is None: + return None + return _join_subpath(base, subpath) + + def _require_runtime(self) -> GatewayRuntime: + if self._runtime is None: + msg = "configure() must be called before serving requests" + raise RuntimeError(msg) + return self._runtime + + +def _verify_request(request: Request, runtime: GatewayRuntime) -> str | None: + """Accept ``Authorization: Bearer``, ``X-Api-Key``, or ``?token=``. + + The third form is the escape hatch for clients that can only put + secrets in the URL (claude.ai's MCP config historically did this). + All three roads end at the same :class:`TokenStore`. 
+ """ + api_key = request.headers.get("x-api-key") + if api_key: + return runtime.token_store.verify(api_key) + auth_header = request.headers.get("authorization") + if auth_header: + return runtime.token_store.verify_bearer(auth_header) + qs_token = request.query_params.get("token") + if qs_token: + return runtime.token_store.verify(qs_token) + return None + + +def _unauthorized() -> JSONResponse: + return JSONResponse( + {"error": "invalid or missing bearer token"}, + status_code=401, + headers={"WWW-Authenticate": "Bearer"}, + ) + + +def _join_subpath(base_url: str, subpath: str) -> str: + """Concatenate the loopback URL with the proxied sub-path. + + ``base_url`` always ends in ``/`` (the aggregator publishes URLs + that way to avoid Starlette's 307 redirect dance); the sub-path is + appended verbatim, with the query string handled by the caller. + """ + if subpath: + return base_url + subpath.lstrip("/") + return base_url + + +def _external_base_url(request: Request, fallback_host: str, fallback_port: int) -> str: + host = request.headers.get("host") + if host: + scheme = request.headers.get("x-forwarded-proto", request.url.scheme) + return f"{scheme}://{host}" + return f"http://{fallback_host}:{fallback_port}" + + +async def _reverse_proxy( + *, + client: aiohttp.ClientSession, + request: Request, + upstream_url: str, + namespace: str, + actor: str, +) -> StreamingResponse | JSONResponse: + """Bidirectionally stream an MCP request between client ↔ internal aggregator. + + Streamable-HTTP MCP responses can be a long-running SSE stream + (tools that emit partial progress) or a one-shot JSON body; we + don't peek — just relay chunks as they arrive in either direction + until both sides close. + """ + qs = request.url.query + if qs: + # Drop ``?token=`` from the forwarded URL — internal app doesn't + # need it, and propagating creds further than necessary widens + # the leak surface (logs, metrics, traces all see query strings). 
+ scrubbed = _scrub_query(qs, drop={"token"}) + if scrubbed: + upstream_url = f"{upstream_url}?{scrubbed}" + + forward_headers = _forward_headers(request) + + body_iter: AsyncIterator[bytes] | None = None + if request.method not in {"GET", "HEAD", "OPTIONS"}: + body_iter = _request_body_iter(request) + + try: + upstream_resp = await client.request( + request.method, + upstream_url, + headers=forward_headers, + data=body_iter, + allow_redirects=False, + ) + except aiohttp.ClientError as exc: + _log.warning( + "mcp: actor=%s namespace=%s upstream connect failed: %s", + actor, + namespace, + exc, + ) + return JSONResponse( + {"error": "upstream MCP unreachable", "detail": str(exc)}, + status_code=502, + ) + + _log.info( + "mcp: actor=%s namespace=%s %s %s -> %d", + actor, + namespace, + request.method, + request.url.path, + upstream_resp.status, + ) + + response_headers = _response_headers(upstream_resp.headers) + + async def relay() -> AsyncIterator[bytes]: + try: + async for chunk in upstream_resp.content.iter_any(): + yield chunk + except (aiohttp.ClientError, asyncio.CancelledError): + # Caller hung up or upstream dropped — just stop relaying. 
+            return
+        finally:
+            upstream_resp.release()
+
+    return StreamingResponse(
+        relay(),
+        status_code=upstream_resp.status,
+        headers=response_headers,
+        media_type=upstream_resp.headers.get("content-type"),
+    )
+
+
+async def _request_body_iter(request: Request) -> AsyncIterator[bytes]:
+    """Yield the inbound request body chunk by chunk, skipping empty chunks."""
+    async for chunk in request.stream():
+        if chunk:
+            yield chunk
+
+
+def _forward_headers(request: Request) -> dict[str, str]:
+    """Copy request headers, minus hop-by-hop and credential headers."""
+    out: dict[str, str] = {}
+    for key, value in request.headers.items():
+        lowered = key.lower()
+        if lowered in _HOP_BY_HOP_HEADERS or lowered in _AUTH_HEADERS:
+            continue
+        out[key] = value
+    return out
+
+
+def _response_headers(headers: Mapping[str, str]) -> dict[str, str]:
+    """Copy upstream response headers, minus hop-by-hop headers."""
+    out: dict[str, str] = {}
+    for key, value in headers.items():
+        if key.lower() in _HOP_BY_HOP_HEADERS:
+            continue
+        out[key] = value
+    return out
+
+
+def _scrub_query(query: str, *, drop: frozenset[str] | set[str]) -> str:
+    """Remove sensitive keys from a URL-encoded query string.
+
+    The query is decoded with ``parse_qsl`` before filtering and re-encoded
+    with ``urlencode`` afterwards. Comparing *decoded* names means a
+    percent-encoded spelling of a dropped key (``%74oken``) is removed too,
+    and re-encoding decoded values avoids escaping them a second time
+    (``a=%20b`` must not become ``a=%2520b``). Keys without ``=`` are kept
+    with an empty value; empty ``&&`` segments are discarded.
+    """
+    from urllib.parse import parse_qsl
+
+    pairs = parse_qsl(query, keep_blank_values=True)
+    return urlencode([(name, value) for name, value in pairs if name not in drop])
+
+
+def _render_discovery_page(
+    *, base_url: str, namespaces: list[Any], actor: str
+) -> str:
+    """Render the auth-gated namespace + config-snippet page.
+
+    Inline HTML (no Jinja file) — keeps Phase 3 free of template-dir
+    plumbing that Phase 4's AdminFrontend will own.
+ """ + name_list = [getattr(ns, "name", str(ns)) for ns in namespaces] + rows = "\n".join( + f""" + {_escape(name)} + {_escape(f"{base_url}/mcp/{name}/")} + """ + for name in name_list + ) or """ No MCP servers configured.""" + cursor_snippet = _CURSOR_SNIPPET.format(base_url=base_url) + claude_desktop_snippet = _CLAUDE_DESKTOP_SNIPPET.format(base_url=base_url) + return _DISCOVERY_TEMPLATE.format( + actor=_escape(actor), + base_url=_escape(base_url), + rows=rows, + all_url=_escape(f"{base_url}/mcp/{ALL_NAMESPACE}/"), + cursor_snippet=_escape(cursor_snippet), + claude_desktop_snippet=_escape(claude_desktop_snippet), + ) + + +def _escape(value: str) -> str: + return ( + value.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + ) + + +_DISCOVERY_TEMPLATE = "\n".join( # noqa: FLY002 — readability beats one-string-blob + [ + "", + '', + "", + ' ', + " beaver-gateway · MCP discovery", + " ", + "", + "", + "

beaver-gateway

", + '

Signed in as {actor}.' + " Base URL: {base_url}

", + "

Namespaces

", + " ", + " ", + " ", + "{rows}", + " ", + "
NameURL
", + '

Bundle endpoint (flat namespace,' + " escape-hatch): {all_url}

", + "

Cursor / Cline

", + "

Add to your MCP config:

", + "
{cursor_snippet}
", + "

Claude Desktop

", + "

Add to " + "~/Library/Application Support/Claude/claude_desktop_config.json" + ":

", + "
{claude_desktop_snippet}
", + "", + "", + "", + ] +) + + +_CURSOR_SNIPPET = """{{ + "mcpServers": {{ + "beaver-time": {{ + "url": "{base_url}/mcp/time/", + "headers": {{ "Authorization": "Bearer " }} + }} + }} +}}""" + + +_CLAUDE_DESKTOP_SNIPPET = """{{ + "mcpServers": {{ + "beaver-time": {{ + "type": "http", + "url": "{base_url}/mcp/time/", + "headers": {{ "Authorization": "Bearer " }} + }} + }} +}}""" diff --git a/src/beaver_gateway/mcp/client_pool.py b/src/beaver_gateway/mcp/client_pool.py index 99174d8..02552a7 100644 --- a/src/beaver_gateway/mcp/client_pool.py +++ b/src/beaver_gateway/mcp/client_pool.py @@ -16,8 +16,11 @@ from fastmcp import Client from fastmcp.client.transports import StdioTransport, StreamableHttpTransport from fastmcp.server import create_proxy +from beaver_gateway.mcp.lenient import LenientStdioTransport + if TYPE_CHECKING: from fastmcp import FastMCP + from fastmcp.client.transports.base import ClientTransport from beaver_gateway.mcp.types import HttpMcp, StdioMcp @@ -27,22 +30,37 @@ def build_stdio_proxy(spec: StdioMcp) -> FastMCP: ``spec.command`` is a non-empty tuple; the first element is the executable and the rest are CLI args. ``StdioTransport`` keeps the - subprocess alive across calls. + subprocess alive across calls. When ``spec.lenient`` is set we + swap in :class:`LenientStdioTransport`, which silently drops + non-JSON-RPC stdout lines instead of forwarding them as exceptions + (lets us ingest MCPs that print ``Processing...`` chatter on stdout + without leaking warnings to downstream UIs). 
""" if not spec.command: msg = f"stdio MCP {spec.name!r} has empty command" raise ValueError(msg) command, *args = spec.command - transport = StdioTransport( - command=command, - args=list(args), - env=spec.env, - cwd=str(spec.cwd) if spec.cwd is not None else None, - ) + cwd = str(spec.cwd) if spec.cwd is not None else None + transport: ClientTransport + if spec.lenient: + transport = LenientStdioTransport( + command=command, args=list(args), env=spec.env, cwd=cwd + ) + else: + transport = StdioTransport( + command=command, args=list(args), env=spec.env, cwd=cwd + ) return create_proxy(Client(transport, name=spec.name)) def build_http_proxy(spec: HttpMcp) -> FastMCP: - """Wrap a remote streamable-HTTP MCP into a mountable ``FastMCPProxy``.""" - transport = StreamableHttpTransport(url=spec.url, auth=spec.auth) + """Wrap a remote streamable-HTTP MCP into a mountable ``FastMCPProxy``. + + ``spec.headers`` are folded into the upstream client; pass per-call + auth (e.g. ``X-Api-Key``) here rather than as ``auth`` when the + upstream rejects Bearer headers. + """ + transport = StreamableHttpTransport( + url=spec.url, auth=spec.auth, headers=spec.headers + ) return create_proxy(Client(transport, name=spec.name)) diff --git a/src/beaver_gateway/mcp/internal_app.py b/src/beaver_gateway/mcp/internal_app.py index e4414b3..294a7f4 100644 --- a/src/beaver_gateway/mcp/internal_app.py +++ b/src/beaver_gateway/mcp/internal_app.py @@ -1,4 +1,4 @@ -"""Internal MCP aggregator — one ASGI app, N FastMCP namespaces. +"""Internal MCP aggregator — one ASGI app, N FastMCP namespaces + ``all``. Each ``McpServer`` declared in the user's config becomes its own ``FastMCP`` instance (regular for ``python_tool``, ``FastMCPProxy`` for @@ -9,9 +9,18 @@ namespace via loopback as a distinct MCP server URL — preserving per-domain framing while costing only one process worth of RAM (PRD §6). 
+Phase 3 adds ``/mcp/all/``: a single FastMCP whose tools are the union +of every namespace's tools, prefixed by FastMCP's ``namespace_`` +convention (e.g. ``time_current_time``). It's the escape-hatch for +clients that can only configure one MCP server — discouraged for tool- +heavy setups (PRD §6 cites the ~95%→~71% tool-selection drop on flat +namespaces) but real and reachable. + The aggregator returns both the app and a ``{name: url}`` map; Phase 2.2's ``ClaudeCodeBackendAdapter`` plugs the map directly into -``BackendOptions.mcp_servers``. +``BackendOptions.mcp_servers``. ``/mcp/all/`` is NOT included in that +map — claude-code-agents always get per-domain framing; only the +external MCP frontend (Phase 3.1) reverse-proxies the flat endpoint. """ from __future__ import annotations @@ -19,6 +28,7 @@ from __future__ import annotations from contextlib import AsyncExitStack, asynccontextmanager from typing import TYPE_CHECKING +from fastmcp import FastMCP from starlette.applications import Starlette from starlette.routing import Mount @@ -29,30 +39,48 @@ from beaver_gateway.mcp.wrap import build_python_tool_server if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterable - from fastmcp import FastMCP - from beaver_gateway.mcp.types import McpServerT +ALL_NAMESPACE = "all" +"""URL segment for the flat-namespace aggregator (``/mcp/all/``).""" + + def build_internal_app( mcps: Iterable[McpServerT], *, host: str, port: int ) -> tuple[Starlette, dict[str, str]]: - """Build the aggregator ``Starlette`` app and the ``{name: url}`` map. + """Build the aggregator ``Starlette`` app and the per-namespace URL map. ``host``/``port`` only flavour the URL strings handed back — actually listening on them is the caller's job (``cli.main`` runs a uvicorn server in a TaskGroup). We accept the address here so callers don't have to format the URLs themselves and risk drifting from the ``/mcp/`` convention. 
+ + Returns a map of ``{namespace: url}`` for the per-domain endpoints + only (claude-code's MCP routing expects per-domain framing). The + ``/mcp/all/`` bundle endpoint exists on the same app but is + intentionally omitted from the URL map — it's only meaningful to + external clients via the MCP frontend, not to claude-code. """ servers: dict[str, FastMCP] = {spec.name: _build_server(spec) for spec in mcps} - child_apps = [s.http_app(transport="http", path="/") for s in servers.values()] + child_apps = { + name: s.http_app(transport="http", path="/") + for name, s in servers.items() + } routes = [ - Mount(f"/mcp/{name}", app=app) - for name, app in zip(servers, child_apps, strict=True) + Mount(f"/mcp/{name}", app=app) for name, app in child_apps.items() ] + # /mcp/all — flat-namespace bundle. Skip when there's nothing to + # bundle so we don't pay for an empty session manager lifecycle. + all_app = None + if servers: + all_server = _build_all_server(servers) + all_app = all_server.http_app(transport="http", path="/") + routes.append(Mount(f"/mcp/{ALL_NAMESPACE}", app=all_app)) + @asynccontextmanager async def lifespan(_parent: Starlette) -> AsyncIterator[None]: # Each FastMCP http_app stores its session manager init in its @@ -61,8 +89,12 @@ def build_internal_app( # children come up together and unwind in reverse order on # shutdown. async with AsyncExitStack() as stack: - for child in child_apps: + for child in child_apps.values(): await stack.enter_async_context(child.router.lifespan_context(child)) + if all_app is not None: + await stack.enter_async_context( + all_app.router.lifespan_context(all_app) + ) yield app = Starlette(routes=routes, lifespan=lifespan) @@ -85,3 +117,22 @@ def _build_server(spec: McpServerT) -> FastMCP: # type-narrowing honest if a new variant lands without updates here. 
msg = f"unsupported McpServer variant: {type(spec).__name__}" raise TypeError(msg) + + +def _build_all_server(children: dict[str, FastMCP]) -> FastMCP: + """Compose a single FastMCP whose tools union every namespace's tools. + + FastMCP's ``mount(namespace=...)`` namespaces every tool as + ``_``, so the flat endpoint becomes + ``time_current_time``, ``calendar_event_create``, etc. + + Mounted children are accessed in-memory via ``FastMCPProvider`` — + requests don't bounce through the child's own ``http_app``, so this + aggregator has its own independent streamable-HTTP session manager + and lifespan, and the per-namespace ``/mcp//`` mounts keep + working unchanged. + """ + parent = FastMCP(name="beaver-gateway-all") + for name, child in children.items(): + parent.mount(child, namespace=name) + return parent diff --git a/src/beaver_gateway/mcp/lenient.py b/src/beaver_gateway/mcp/lenient.py new file mode 100644 index 0000000..e5b1f10 --- /dev/null +++ b/src/beaver_gateway/mcp/lenient.py @@ -0,0 +1,234 @@ +"""Tolerant transports for upstream MCPs that don't strictly speak JSON-RPC. + +Some real-world MCP servers print non-JSON chatter to stdout before / between +their actual JSON-RPC frames (``Processing...``, banners, dependency-load +messages, etc.). The reference ``mcp.client.stdio.stdio_client`` parses every +stdout line as JSON-RPC and ships any parse failure downstream as an +exception, which the MCP ``ClientSession`` then logs as a warning that bleeds +into client UIs (Cursor, Cline) when they connect through us. + +``LenientStdioTransport`` re-implements the stdio-client wiring with one +behavioural change: lines that don't parse as JSON-RPC are *silently +dropped* (one ``DEBUG`` log entry, no exception forwarded). Downstream +consumers see only valid messages. We keep the rest of the contract identical +to the reference client, including the spec-mandated graceful shutdown +sequence (close stdin → wait → SIGTERM → SIGKILL). 
+ +The transport plugs into ``fastmcp.server.create_proxy`` just like +``StdioTransport`` does, so the rest of the aggregator doesn't need to +know which flavour it got. +""" + +from __future__ import annotations + +import contextlib +import logging +import sys +from typing import TYPE_CHECKING + +import anyio +import anyio.lowlevel +from anyio.streams.text import TextReceiveStream +from fastmcp.client.transports.base import ClientTransport +from mcp import ClientSession, types +from mcp.client.stdio import ( + PROCESS_TERMINATION_TIMEOUT, + StdioServerParameters, + _create_platform_compatible_process, + _get_executable_command, + _terminate_process_tree, +) +from mcp.shared.message import SessionMessage + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + from typing import TextIO, Unpack + + from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream + from fastmcp.client.transports.base import SessionKwargs + + +_log = logging.getLogger("beaver_gateway.mcp.lenient") + + +class LenientStdioTransport(ClientTransport): + """Stdio transport that tolerates non-JSON-RPC stdout noise. + + Behaves like :class:`fastmcp.client.transports.StdioTransport` from the + consumer's perspective: one ``ClientSession`` per ``connect_session`` + block, subprocess scoped to that block. We don't replicate the upstream + ``keep_alive`` flag because the only caller (``create_proxy``) opens + the session lazily on first request and keeps it open for the lifetime + of the proxy. + """ + + def __init__( + self, + command: str, + args: list[str], + env: dict[str, str] | None = None, + cwd: str | None = None, + log_file: TextIO | None = None, + ) -> None: + self.command = command + self.args = args + self.env = env + self.cwd = cwd + # TextIO only — pre-open Path callers themselves. The upstream + # ``StdioTransport`` opens Path for you, but we keep this thin so + # the type contract stays narrow and easy to validate. 
+        self.log_file = log_file
+
+    @contextlib.asynccontextmanager
+    async def connect_session(
+        self, **session_kwargs: Unpack[SessionKwargs]
+    ) -> AsyncIterator[ClientSession]:
+        """Spawn the subprocess and yield a ``ClientSession`` bound to it.
+
+        The subprocess's stderr goes to ``self.log_file`` when one was
+        given, otherwise to ``sys.stderr``. ``session_kwargs`` are passed
+        through to ``ClientSession`` unchanged.
+        """
+        errlog: TextIO = self.log_file if self.log_file is not None else sys.stderr
+        async with _lenient_stdio_client(
+            StdioServerParameters(
+                command=self.command,
+                args=self.args,
+                env=self.env,
+                cwd=self.cwd,
+            ),
+            errlog=errlog,
+        ) as (read_stream, write_stream), ClientSession(
+            read_stream, write_stream, **session_kwargs
+        ) as session:
+            yield session
+
+    def __repr__(self) -> str:
+        # Name the class and the spawned command so the transport is
+        # identifiable in logs and debuggers; an empty repr hides it.
+        return (
+            f"<LenientStdioTransport(command={self.command!r}, "
+            f"args={self.args!r})>"
+        )
+
+
+@contextlib.asynccontextmanager
+async def _lenient_stdio_client(  # noqa: PLR0915 — mirrors mcp.client.stdio.stdio_client
+    server: StdioServerParameters, errlog: TextIO = sys.stderr
+) -> AsyncIterator[
+    tuple[
+        MemoryObjectReceiveStream[SessionMessage | Exception],
+        MemoryObjectSendStream[SessionMessage],
+    ]
+]:
+    """Drop-in for ``mcp.client.stdio.stdio_client`` with a tolerant reader.
+
+    All differences from upstream live in ``stdout_reader``: lines that fail
+    ``JSONRPCMessage.model_validate_json`` are logged at DEBUG and skipped,
+    never forwarded as exceptions. This is what makes warning-noisy MCPs
+    quiet from the consumer's point of view.
+ """ + read_stream_writer, read_stream = anyio.create_memory_object_stream[ + SessionMessage | Exception + ](0) + write_stream, write_stream_reader = anyio.create_memory_object_stream[ + SessionMessage + ](0) + + try: + command = _get_executable_command(server.command) + env_default = _default_inherited_env() + process = await _create_platform_compatible_process( + command=command, + args=server.args, + env=( + {**env_default, **server.env} + if server.env is not None + else env_default + ), + errlog=errlog, + cwd=server.cwd, + ) + except OSError: + await read_stream.aclose() + await write_stream.aclose() + await read_stream_writer.aclose() + await write_stream_reader.aclose() + raise + + async def stdout_reader() -> None: + assert process.stdout, "Opened process is missing stdout" # noqa: S101 + try: + async with read_stream_writer: + buffer = "" + async for chunk in TextReceiveStream( + process.stdout, + encoding=server.encoding, + errors=server.encoding_error_handler, + ): + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + for line in lines: + stripped = line.strip() + if not stripped: + continue + try: + message = types.JSONRPCMessage.model_validate_json( + stripped + ) + except Exception: # noqa: BLE001 — by design, see module doc + _log.debug( + "lenient stdio: dropped non-JSON line: %r", + stripped[:200], + ) + continue + await read_stream_writer.send(SessionMessage(message)) + except anyio.ClosedResourceError: + await anyio.lowlevel.checkpoint() + + async def stdin_writer() -> None: + assert process.stdin, "Opened process is missing stdin" # noqa: S101 + try: + async with write_stream_reader: + async for session_message in write_stream_reader: + payload = session_message.message.model_dump_json( + by_alias=True, exclude_none=True + ) + await process.stdin.send( + (payload + "\n").encode( + encoding=server.encoding, + errors=server.encoding_error_handler, + ) + ) + except anyio.ClosedResourceError: + await anyio.lowlevel.checkpoint() + + async 
with ( + anyio.create_task_group() as tg, + process, + ): + tg.start_soon(stdout_reader) + tg.start_soon(stdin_writer) + try: + yield read_stream, write_stream + finally: + # MCP spec stdio shutdown: close stdin → wait → SIGTERM → SIGKILL. + if process.stdin: + with contextlib.suppress(Exception): + await process.stdin.aclose() + try: + with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT): + await process.wait() + except TimeoutError: + await _terminate_process_tree(process) + except ProcessLookupError: + pass + await read_stream.aclose() + await write_stream.aclose() + await read_stream_writer.aclose() + await write_stream_reader.aclose() + + +def _default_inherited_env() -> dict[str, str]: + """Same env shortlist as ``mcp.client.stdio.get_default_environment``. + + Re-exported so we can compose with the user's ``env`` overrides without + forcing a private import in this module's body. + """ + from mcp.client.stdio import get_default_environment + + return get_default_environment() + + +__all__ = ["LenientStdioTransport"] diff --git a/src/beaver_gateway/mcp/types.py b/src/beaver_gateway/mcp/types.py index 898de21..bd7a7f8 100644 --- a/src/beaver_gateway/mcp/types.py +++ b/src/beaver_gateway/mcp/types.py @@ -23,20 +23,36 @@ class _BaseMcp(BaseModel): class StdioMcp(_BaseMcp): - """Subprocess MCP server we spawn and connect to over stdio.""" + """Subprocess MCP server we spawn and connect to over stdio. + + ``lenient`` switches the upstream stdio reader to a tolerant variant + that silently drops non-JSON-RPC lines from the subprocess's stdout + (``Processing...``-style chatter, banners, dependency-load messages). + The reference ``mcp.client.stdio.stdio_client`` forwards those parse + failures as exceptions, which bleed into Cursor/Cline UIs as warnings + when they connect through us. Default ``False`` keeps the strict + contract — flip it on per-namespace for known-noisy upstreams. + """ kind: Literal["stdio"] = "stdio" command: tuple[str, ...] 
env: dict[str, str] | None = None cwd: Path | None = None + lenient: bool = False class HttpMcp(_BaseMcp): - """Remote MCP server reached over streamable HTTP.""" + """Remote MCP server reached over streamable HTTP. + + ``headers`` are forwarded on every request to the upstream MCP — handy + for upstreams that authenticate via custom header rather than a Bearer + token (``auth``). + """ kind: Literal["http"] = "http" url: str auth: str | None = None + headers: dict[str, str] | None = None class PythonToolMcp(_BaseMcp): @@ -62,19 +78,26 @@ class McpServer: command: Iterable[str], env: dict[str, str] | None = None, cwd: Path | str | None = None, + lenient: bool = False, ) -> StdioMcp: return StdioMcp( name=name, command=tuple(command), env=env, cwd=Path(cwd) if isinstance(cwd, str) else cwd, + lenient=lenient, ) @classmethod def http( - cls, *, name: str, url: str, auth: str | None = None + cls, + *, + name: str, + url: str, + auth: str | None = None, + headers: dict[str, str] | None = None, ) -> HttpMcp: - return HttpMcp(name=name, url=url, auth=auth) + return HttpMcp(name=name, url=url, auth=auth, headers=headers) @classmethod def python_tool(