feat: implement claude code backend

This commit is contained in:
h
2026-05-19 23:11:07 +02:00
parent 757065f21c
commit 99a30f256d
8 changed files with 797 additions and 7 deletions
+55 -3
View File
@@ -4,20 +4,65 @@
# ClaudeAgent, RaycastAgent, McpServer, ExposedMcp, Gateway already
# bound — so importing them is optional. We import explicitly here so
# IDEs and type-checkers see real symbols instead of free variables.
import tempfile
from datetime import date
from pathlib import Path
from beaver_gateway.agents.base import ExposedMcp
from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences
from beaver_gateway.core.registry import Gateway
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
from beaver_gateway.mcp.types import McpServer
def current_time() -> str:
"""Return the current local time as an ISO-8601 string.
Trivial demo tool for the Phase 2.1 internal MCP aggregator —
confirms a ``python_tool`` namespace is reachable on
``http://127.0.0.1:<INTERNAL_MCP_PORT>/mcp/time``.
"""
from datetime import datetime
return datetime.now().astimezone().isoformat()
gateway = Gateway(
agents=[
# Phase 2.2 — ClaudeCodeBackendAdapter routes this agent's
# ``/v1/messages`` calls through ``claude-code-api``. The
# ``time`` MCP gets exposed as ``mcp__time__current_time`` to
# the subscription claude session via
# ``BackendOptions.mcp_servers`` pointing at the internal
# aggregator on ``127.0.0.1:INTERNAL_MCP_PORT/mcp/time/``.
#
# Fresh empty tempdir (not a hardcoded ``/tmp``) for two
# reasons: claude-code-api derives the JSONL project-key from
# ``cwd``, but claude itself writes the JSONL using the cwd's
# realpath — on macOS ``/tmp`` and ``/var/folders/...`` are
# both ``/private/...`` symlinks, so unresolved cwds make
# ``JsonlWatcher`` time out waiting on the wrong path. The
# explicit ``.resolve()`` collapses the symlink before claude
# ever sees the dir, and ``mkdtemp`` guarantees the directory
# is empty so claude does not pick up leftover files.
ClaudeAgent(
name="stub",
model="claude-sonnet-4-6",
system_prompt="You are a stub agent used to validate the Phase 0 skeleton.",
cwd="/tmp",
# ``system_prompt`` is appended to claude's built-in agent
# prompt (via ``--append-system-prompt``) — so it adds the
# agent's identity on top of claude-code's baseline tool
# knowledge, rather than replacing it. Same shape as the
# RaycastAgent's ``system_prompt → additional_system_instructions``
# mapping. For full ``BackendOptions`` knobs (timeouts,
# extra_args, history mode, etc.) import ``ClaudeCodeOptions``
# and pass ``options=ClaudeCodeOptions(...)``.
system_prompt=(
"You are a stub agent used to validate the Phase 0 skeleton.\n"
"If asked the current time, call the `current_time`"
" MCP tool instead of guessing."
),
cwd=Path(tempfile.mkdtemp(prefix="beaver-stub-cwd-")).resolve(),
expose_mcps=(ExposedMcp(name="time"),),
),
# Phase 1.2 — a RaycastAgent the AnthropicMessagesFrontend will
# route via RaycastBackend. Phase 1.5 added the per-agent knobs
@@ -45,7 +90,14 @@ gateway = Gateway(
),
),
],
mcps=[],
mcps=[
# Phase 2.1 — bundle of plain Python callables exposed as one
# FastMCP namespace. The internal aggregator mounts it under
# ``/mcp/time`` on ``127.0.0.1:INTERNAL_MCP_PORT``; Phase 2.2's
# ClaudeCode adapter will forward that URL into
# ``BackendOptions.mcp_servers``.
McpServer.python_tool(name="time", tools=[current_time]),
],
frontends=[
# Phase 1.4 — expose the agents as `model=<name>` on an
# Anthropic-compatible Messages endpoint. Auth comes from
+159 -1
View File
@@ -3,14 +3,169 @@
Notice the absence of a ``streaming`` field — claude-code does not emit
token-level deltas, and that fact is encoded in the type, not in a
runtime branch.
``BaseAgent.system_prompt`` maps onto the claude CLI's
``--system-prompt`` — i.e. it really *is* the agent's system prompt,
not "added on top of claude-code's giant built-in". The additive slot
``--append-system-prompt`` is exposed via
:attr:`ClaudeCodeOptions.append_system_prompt` for the rare case
when the user wants claude-code's planning conventions / dynamic
sections *and* a delta on top. Difference, measured empirically:
* tools (names, JSON schemas, embedded guidance like Bash's "prefer
Read over cat/head/tail") survive both flags — they ride the
Anthropic API ``tools=[]`` field, not the system prompt text;
* ``--system-prompt`` drops ~8.6k tokens of claude-code's *textual*
baseline — agent persona, multi-step-work conventions ("use
TaskCreate proactively", etc.), and the dynamic per-machine sections
(cwd, env info, git status, memory paths — see the
``--exclude-dynamic-system-prompt-sections`` flag note in
``claude --help``: "Only applies with the default system prompt
(ignored with --system-prompt)").
We pick override as the default because it preserves the principle of
least surprise: the ``system_prompt=...`` you wrote on the agent is
what claude actually receives. If you need claude-code's full
planning/dynamic-context behaviour on top of your prompt, opt in via
``options=ClaudeCodeOptions(append_system_prompt=...)`` and leave
``system_prompt`` for your agent's identity (or vice versa — set
``system_prompt=""``-ish and put everything in ``append_*``).
Per-agent passthrough of ``claude_code_api.BackendOptions`` lives in
:class:`ClaudeCodeOptions`, attached to :class:`ClaudeAgent` as
``options``. Every tunable knob ``BackendOptions`` supports — except
the ones derived from the agent's own primary surface
(``cwd`` / ``model`` / ``system_prompt`` / ``available_native_tools`` →
``allowed_tools`` / ``expose_mcps`` → ``mcp_servers``) — is exposed
there. Defaults match upstream except where correctness demands
otherwise (``wait_for_turn_duration=True``,
``dangerously_skip_permissions=True``).
"""
from __future__ import annotations
from pathlib import Path # noqa: TC003 — runtime use by pydantic
from collections.abc import Mapping # noqa: TC003 — pydantic runtime
from pathlib import Path # noqa: TC003 — pydantic runtime
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
from beaver_gateway.agents.base import BaseAgent
HistoryInjectionMode = Literal["native_jsonl", "concat_message"]
"""Mirrors ``claude_code_api.HistoryInjectionMode`` to avoid an import
cycle on the user-facing path (configs may be loaded without
``claude-code-api`` installed, e.g. ``--extra prod`` minus claude)."""
class ClaudeCodeOptions(BaseModel):
"""Per-agent passthrough for ``claude_code_api.BackendOptions``.
Mirrors every field of ``BackendOptions`` except those that come
from the agent's primary surface
(``cwd`` / ``model`` / ``system_prompt`` /
``available_native_tools`` / ``expose_mcps``). Defaults match
upstream, with two exceptions for correctness:
* ``wait_for_turn_duration=True`` — without it, extended-thinking
turns drop the text response because ``TurnManager`` returns on
the first terminal assistant record (the thinking snapshot) and
never reads the second one (the actual text). Phase 2.2 PROGRESS
describes the incident in detail.
* ``dangerously_skip_permissions=True`` — Beaver Gateway is a
single-user trusted-config product; the user already wrote the
``config.py`` that spawns claude. Permission prompts on a
headless backend mean stuck turns.
Any field on :class:`claude_code_api.BackendOptions` we forward
here lives on this model with the same name and type, so a future
``BackendOptions`` field addition is a one-liner here plus a
one-liner in :mod:`backends.claude_code`.
"""
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
append_system_prompt: str | None = None
"""Maps to claude CLI's ``--append-system-prompt``. Opting in
re-attaches claude-code's full built-in prompt (persona, planning
conventions like "use TaskCreate proactively", and the dynamic
per-machine sections — cwd, env info, git status, memory paths)
plus this string on top. By default we ship only
:attr:`BaseAgent.system_prompt` via ``--system-prompt`` (~8.6k
tokens lighter); use this when the agent should behave like a
real claude-code coding session and your text is a delta on top
of those built-ins. Tool schemas survive either way — they ride
the Anthropic ``tools=[]`` channel, not the prompt text."""
disallowed_tools: tuple[str, ...] = ()
"""Tool names that claude must refuse to call. Combines with the
agent's ``available_native_tools`` allowlist — disallow wins."""
permission_mode: str = "bypassPermissions"
"""``claude --permission-mode`` value. Only meaningful when
``dangerously_skip_permissions=False``; otherwise the CLI bypasses
the prompt path entirely."""
dangerously_skip_permissions: bool = True
"""Pass ``--dangerously-skip-permissions`` to claude. ``True`` by
default — see the class docstring for why."""
effort: str | None = None
"""``claude --effort`` value (typically ``low`` / ``medium`` /
``high``). Tunes reasoning effort budget for newer models."""
add_dir: tuple[str, ...] = ()
"""Extra directories claude is allowed to read/edit beyond
``cwd``. Maps to repeated ``--add-dir`` flags."""
settings: str | None = None
"""Path to a ``--settings`` JSON file claude should load (hooks,
MCP servers, etc. that don't belong in our internal aggregator)."""
extra_args: tuple[str, ...] = ()
"""Raw extra argv to append to the claude command. Escape hatch
for flags we haven't surfaced as first-class options."""
extra_env: Mapping[str, str] = Field(default_factory=dict)
"""Additional environment variables for the spawned claude
process. Layered on top of the gateway's own env after
``preserve_provider_env`` is applied."""
preserve_provider_env: bool = False
"""When ``False`` (default), the spawn env strips
``ANTHROPIC_API_KEY`` / ``ANTHROPIC_AUTH_TOKEN`` /
``ANTHROPIC_BASE_URL`` so claude uses subscription auth instead of
leaking through whatever the gateway process inherited."""
history_injection_mode: HistoryInjectionMode = "native_jsonl"
"""How prior turns are seeded into a fresh session when no live
session matches: ``native_jsonl`` writes a hand-crafted transcript
and ``--resume``s; ``concat_message`` instead folds history into
the first user prompt. ``native_jsonl`` is more faithful."""
wait_for_turn_duration: bool = True
"""Keep reading JSONL until the ``turn_duration`` heartbeat
arrives, instead of returning on the first terminal assistant
record. ``True`` by default — see class docstring."""
include_meta_user: bool = False
"""Surface claude's ``isMeta=True`` user records (local-command
caveats) as ``UserMessage`` events. Off by default — they're not
part of the real conversation."""
startup_delay: float = 1.0
"""Seconds to wait after spawning the PTY before the first
write — claude's TUI takes a beat to settle."""
file_wait_timeout: float = 30.0
"""How long to wait for the session JSONL to appear after spawn.
Failure here usually means a CLI auth / config problem."""
turn_duration_timeout: float = 5.0
"""How long to wait for the ``turn_duration`` heartbeat once a
terminal assistant has been seen. Bound on extra latency when
``wait_for_turn_duration=True``."""
class ClaudeAgent(BaseAgent):
"""Agent backed by ``claude-code-api``.
@@ -25,3 +180,6 @@ class ClaudeAgent(BaseAgent):
cwd: Path
available_native_tools: tuple[str, ...] = ()
options: ClaudeCodeOptions = Field(default_factory=ClaudeCodeOptions)
"""Per-agent passthrough for the underlying claude-code-api
``BackendOptions``. See :class:`ClaudeCodeOptions`."""
+327
View File
@@ -0,0 +1,327 @@
"""Claude Code backend adapter.
One :class:`ClaudeCodeBackendAdapter` per :class:`ClaudeAgent`. The
underlying :class:`claude_code_api.ClaudeCodeBackend` bakes ``cwd`` /
``model`` / ``system_prompt`` / MCP wiring into a single
:class:`~claude_code_api.BackendOptions` at construction time, so a
single backend instance is conceptually bound to one agent (different
agents would mean different cwds / system prompts / exposed MCPs and
thus different live-session pools).
Per :meth:`complete` we:
* hand the full Anthropic-style ``messages`` list to
``ClaudeCodeBackend.complete`` — it does its own fingerprint-based
session lookup, so we never need to track sessions ourselves;
* re-emit each ``AssistantMessage`` as ``content_block_start`` +
one delta + ``content_block_stop`` per content block, with
monotonically increasing indices spanning the entire turn (one
``message_start`` … ``message_stop`` envelope per ``complete`` call);
* close the envelope on the synthesized ``ResultMessage``.
The per-request ``system`` parameter is intentionally **ignored** —
``BackendOptions.system_prompt`` is fixed at session-spawn time, and the
agent's ``system_prompt`` is the canonical identity of the agent.
"""
from __future__ import annotations
import json
import uuid
from typing import TYPE_CHECKING, Any, Self
from claude_code_api import (
AssistantMessage,
BackendOptions,
ClaudeCodeBackend,
ResultMessage,
TextBlock,
ThinkingBlock,
ToolUseBlock,
)
from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.core.events import (
StopReason,
build_content_block_stop,
build_input_json_delta,
build_message_delta,
build_message_start,
build_message_stop,
build_signature_delta,
build_text_block_start,
build_text_delta,
build_thinking_block_start,
build_thinking_delta,
build_tool_use_block_start,
)
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable, Mapping
from anthropic.types import MessageParam
from beaver_gateway.agents.base import BaseAgent
from beaver_gateway.core.events import MessageStreamEvent
__all__ = ["ClaudeCodeBackendAdapter"]
_CLAUDE_TO_ANTHROPIC_STOP: dict[str, StopReason] = {
"end_turn": "end_turn",
"tool_use": "tool_use",
"max_tokens": "max_tokens",
"stop_sequence": "stop_sequence",
"refusal": "refusal",
}
def _map_stop_reason(raw: str | None) -> StopReason:
"""Map claude-code's stop reason into Anthropic's vocabulary.
Unknown / missing values collapse to ``end_turn`` so the client sees
a clean finish rather than a wire-format error.
"""
if raw is None:
return "end_turn"
return _CLAUDE_TO_ANTHROPIC_STOP.get(raw, "end_turn")
def _build_mcp_servers(
agent: ClaudeAgent, mcp_internal_urls: Mapping[str, str]
) -> dict[str, dict[str, Any]] | None:
"""Render ``agent.expose_mcps`` into ``BackendOptions.mcp_servers``.
Each exposed MCP is a streamable-HTTP pointer at the gateway's
internal aggregator (built by :mod:`beaver_gateway.mcp.internal_app`).
``None`` keeps claude-code from materializing an ``--mcp-config``
file when the agent exposes nothing.
"""
if not agent.expose_mcps:
return None
servers: dict[str, dict[str, Any]] = {}
for em in agent.expose_mcps:
url = mcp_internal_urls.get(em.name)
if url is None:
msg = (
f"agent {agent.name!r} exposes MCP {em.name!r} "
"but no internal URL is registered for it"
)
raise ValueError(msg)
servers[em.name] = {"type": "http", "url": url}
return servers
def _build_backend_options(
agent: ClaudeAgent, mcp_internal_urls: Mapping[str, str]
) -> BackendOptions:
"""Compose the per-agent :class:`BackendOptions`.
Agent-primary fields:
* ``cwd`` / ``model`` come from the agent directly;
* ``system_prompt`` carries :attr:`BaseAgent.system_prompt`
verbatim — i.e. wire-level ``--system-prompt`` (~8.6k tokens
lighter than ``--append-system-prompt`` because claude-code's
persona/planning conventions and dynamic sections drop out;
tool schemas survive via the API ``tools=[]`` channel);
* ``append_system_prompt`` carries
:attr:`ClaudeCodeOptions.append_system_prompt`, normally
``None``. Setting it re-attaches claude-code's built-in prompt
*and* this delta — opt-in for "claude as a real coding session";
* ``allowed_tools`` follows the PLAN: when the user lists native
tools we restrict to those *plus* a per-MCP wildcard so MCP tools
stay reachable; when no native list is declared we leave
``allowed_tools`` empty (= all tools allowed by claude-code's
default);
* ``mcp_servers`` comes from :func:`_build_mcp_servers`.
Every other tunable knob is passed through from
:attr:`ClaudeAgent.options`. Our default overrides
(``wait_for_turn_duration=True``,
``dangerously_skip_permissions=True``) live on
:class:`ClaudeCodeOptions`, not here, so a user who builds
``ClaudeCodeOptions(...)`` explicitly inherits the same defaults
instead of getting whatever claude-code-api ships.
"""
allowed_tools: tuple[str, ...] = ()
if agent.available_native_tools:
mcp_wildcards = tuple(f"mcp__{em.name}" for em in agent.expose_mcps)
allowed_tools = tuple(agent.available_native_tools) + mcp_wildcards
opt = agent.options
return BackendOptions(
cwd=agent.cwd,
model=agent.model or None,
system_prompt=agent.system_prompt,
append_system_prompt=opt.append_system_prompt,
allowed_tools=allowed_tools,
mcp_servers=_build_mcp_servers(agent, mcp_internal_urls),
disallowed_tools=opt.disallowed_tools,
permission_mode=opt.permission_mode,
dangerously_skip_permissions=opt.dangerously_skip_permissions,
effort=opt.effort,
add_dir=opt.add_dir,
settings=opt.settings,
extra_args=opt.extra_args,
extra_env=opt.extra_env,
preserve_provider_env=opt.preserve_provider_env,
history_injection_mode=opt.history_injection_mode,
wait_for_turn_duration=opt.wait_for_turn_duration,
include_meta_user=opt.include_meta_user,
startup_delay=opt.startup_delay,
file_wait_timeout=opt.file_wait_timeout,
turn_duration_timeout=opt.turn_duration_timeout,
)
class ClaudeCodeBackendAdapter:
"""One ``claude-code-api`` backend bound to a single :class:`ClaudeAgent`.
Owns the underlying :class:`ClaudeCodeBackend`'s lifecycle through
the async-context-manager protocol so :mod:`beaver_gateway.cli` can
park it in its ``AsyncExitStack``.
"""
def __init__(
self,
*,
agent: ClaudeAgent,
mcp_internal_urls: Mapping[str, str],
) -> None:
self._agent = agent
self._backend = ClaudeCodeBackend(
_build_backend_options(agent, mcp_internal_urls)
)
@property
def agent(self) -> ClaudeAgent:
return self._agent
@property
def live_session_count(self) -> int:
return self._backend.live_session_count
async def __aenter__(self) -> Self:
await self._backend.__aenter__()
return self
async def __aexit__(
self, exc_type: object, exc: object, tb: object
) -> None:
await self._backend.__aexit__(exc_type, exc, tb)
async def aclose(self) -> None:
await self._backend.aclose()
async def complete(
self,
*,
agent: BaseAgent,
messages: Iterable[MessageParam],
system: str | None = None, # noqa: ARG002 — see module docstring
**options: Any, # noqa: ARG002 — no per-request knobs for claude-code yet
) -> AsyncIterator[MessageStreamEvent]:
if not isinstance(agent, ClaudeAgent):
msg = (
"ClaudeCodeBackendAdapter requires ClaudeAgent, "
f"got {type(agent).__name__}"
)
raise TypeError(msg)
if agent.name != self._agent.name:
# Adapter is per-agent; routing a different agent through it
# would mean a different cwd / system_prompt / MCP set than
# the live-session pool was spawned with.
msg = (
f"ClaudeCodeBackendAdapter bound to {self._agent.name!r} "
f"got request for {agent.name!r}"
)
raise ValueError(msg)
message_id = f"msg_{uuid.uuid4().hex}"
yield build_message_start(message_id=message_id, model=agent.model)
next_index = 0
stop_reason: str | None = None
usage: Mapping[str, Any] | None = None
async for event in self._backend.complete(list(messages)):
if isinstance(event, AssistantMessage):
for block in event.content:
for ev in _emit_block(block, next_index):
yield ev
next_index += 1
elif isinstance(event, ResultMessage):
stop_reason = event.stop_reason
usage = event.usage
# ResultMessage is always last (TurnManager synthesizes
# it as the terminal event), so we break after emitting
# the envelope close.
break
# UserMessage (tool_result records) and SystemMessage
# (turn_duration heartbeats) carry no content for the
# /v1/messages caller — skip silently.
yield build_message_delta(
stop_reason=_map_stop_reason(stop_reason),
usage=_normalize_usage(usage),
)
yield build_message_stop()
def _emit_block(
block: TextBlock | ThinkingBlock | ToolUseBlock | Any, index: int
) -> Iterable[MessageStreamEvent]:
"""Render one ``claude-code`` content block as Anthropic stream events.
``ToolResultBlock`` would arrive only on user-role records — we
don't emit it here because :meth:`complete` skips ``UserMessage``.
"""
if isinstance(block, TextBlock):
return (
build_text_block_start(index),
build_text_delta(index, block.text),
build_content_block_stop(index),
)
if isinstance(block, ThinkingBlock):
return (
build_thinking_block_start(index),
build_thinking_delta(index, block.thinking),
build_signature_delta(index, block.signature),
build_content_block_stop(index),
)
if isinstance(block, ToolUseBlock):
partial = json.dumps(
block.input, separators=(",", ":"), ensure_ascii=False
)
return (
build_tool_use_block_start(index, tool_use_id=block.id, name=block.name),
build_input_json_delta(index, partial),
build_content_block_stop(index),
)
return ()
def _normalize_usage(usage: Mapping[str, Any] | None) -> dict[str, int] | None:
"""Coerce claude-code's ``usage`` dict to Anthropic ``MessageDeltaUsage`` shape.
claude-code copies whatever the JSONL ``usage`` record carried —
fields can be missing, strings, or ints. We pass through only the
fields ``MessageDeltaUsage`` knows about and discard the rest so an
odd ``cache_creation`` object structure doesn't fail pydantic
validation downstream.
"""
if not usage:
return None
out: dict[str, int] = {}
for key in (
"input_tokens",
"output_tokens",
"cache_creation_input_tokens",
"cache_read_input_tokens",
):
value = usage.get(key)
if isinstance(value, int):
out[key] = value
return out or None
+86 -2
View File
@@ -6,6 +6,12 @@ each frontend with a ``GatewayRuntime``, and run all
``frontend.serve()`` coroutines concurrently. Without any frontends we
still print the Phase 0 DoD line and exit cleanly so the bare skeleton
keeps working.
Phase 2.1 — when the user declares any ``McpServer``, we additionally
build the internal MCP aggregator app and run it on
``127.0.0.1:INTERNAL_MCP_PORT`` as another task inside the same
TaskGroup. URLs are surfaced through ``GatewayRuntime.mcp_internal_urls``
so Phase 2.2's ClaudeCode adapter can find them.
"""
from __future__ import annotations
@@ -15,20 +21,27 @@ import logging
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING
import uvicorn
import uvloop
from raycast_api import Client as RaycastClient
from raycast_api.config import Config as RaycastConfig
from beaver_gateway import config_loader
from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.agents.raycast import RaycastAgent
from beaver_gateway.backends.claude_code import ClaudeCodeBackendAdapter
from beaver_gateway.backends.raycast import RaycastBackend
from beaver_gateway.core.auth import TokenStore
from beaver_gateway.core.registry import AgentRegistry, McpRegistry
from beaver_gateway.frontends.base import GatewayRuntime
from beaver_gateway.mcp.internal_app import build_internal_app
from beaver_gateway.settings import Settings
if TYPE_CHECKING:
from starlette.applications import Starlette
from beaver_gateway.backends.base import Backend
from beaver_gateway.mcp.types import McpServerT
_log = logging.getLogger("beaver_gateway.cli")
@@ -51,8 +64,18 @@ async def _async_main() -> None:
token_store = TokenStore.from_env(settings.bootstrap_tokens)
async with AsyncExitStack() as stack:
# Internal MCP URLs must exist before we construct any
# ClaudeCodeBackendAdapter — adapters bake the URLs into their
# ``BackendOptions.mcp_servers`` at construction time.
internal_app, internal_urls = _build_internal_mcp(
gateway.mcps, settings=settings
)
backends: dict[str, Backend] = await _build_backends(
settings=settings, agents=agents, stack=stack
settings=settings,
agents=agents,
stack=stack,
mcp_internal_urls=internal_urls,
)
runtime = GatewayRuntime(
@@ -60,6 +83,7 @@ async def _async_main() -> None:
mcps=mcps,
backends=backends,
token_store=token_store,
mcp_internal_urls=internal_urls,
)
for fe in gateway.frontends:
@@ -80,18 +104,66 @@ async def _async_main() -> None:
)
if not gateway.frontends:
# No external listeners → nothing to serve. The internal
# MCP app has no consumer on its own, so we skip running
# it in this path and exit cleanly (Phase 0 DoD).
return
async with asyncio.TaskGroup() as tg:
if internal_app is not None:
tg.create_task(
_serve_internal_mcp(internal_app, settings=settings)
)
for fe in gateway.frontends:
tg.create_task(fe.serve())
def _build_internal_mcp(
mcps: list[McpServerT], *, settings: Settings
) -> tuple[Starlette | None, dict[str, str]]:
"""Build the aggregator app + URL map, or return ``(None, {})``.
The URL map is always handed out (frontends may still introspect
``runtime.mcp_internal_urls`` even if nothing is configured); the
app is ``None`` when there are no MCPs to mount, so the caller
skips the uvicorn task entirely.
"""
if not mcps:
return None, {}
app, urls = build_internal_app(
mcps, host="127.0.0.1", port=settings.internal_mcp_port
)
return app, urls
async def _serve_internal_mcp(app: Starlette, *, settings: Settings) -> None:
"""Run the internal MCP aggregator on loopback.
Bound to ``127.0.0.1`` (never EXPOSE'd) — only the in-process
ClaudeCode subprocess reaches it. Logged at ``warning`` level so
we don't drown the gateway's own logs in per-request noise.
"""
config = uvicorn.Config(
app,
host="127.0.0.1",
port=settings.internal_mcp_port,
log_level="warning",
loop="uvloop",
)
server = uvicorn.Server(config)
_log.info(
"internal MCP aggregator on http://127.0.0.1:%d/mcp/<name>",
settings.internal_mcp_port,
)
await server.serve()
async def _build_backends(
*,
settings: Settings,
agents: AgentRegistry,
stack: AsyncExitStack,
mcp_internal_urls: dict[str, str],
) -> dict[str, Backend]:
"""Construct one backend per agent name.
@@ -99,6 +171,12 @@ async def _build_backends(
(bearer + device-id are process-wide), so we open it lazily — only
when at least one ``RaycastAgent`` is present — and close it via
the caller's exit stack.
Each :class:`ClaudeAgent` gets its own
:class:`ClaudeCodeBackendAdapter`: ``BackendOptions`` pins
``cwd`` / ``model`` / ``system_prompt`` / ``mcp_servers`` for the
lifetime of the underlying ``ClaudeCodeBackend``, so different
agents can't share one.
"""
backends: dict[str, Backend] = {}
@@ -110,7 +188,13 @@ async def _build_backends(
for a in raycast_agents:
backends[a.name] = raycast_backend
# Phase 2: ClaudeAgent → ClaudeCodeBackendAdapter goes here.
for a in agents:
if isinstance(a, ClaudeAgent):
adapter = ClaudeCodeBackendAdapter(
agent=a, mcp_internal_urls=mcp_internal_urls
)
await stack.enter_async_context(adapter)
backends[a.name] = adapter
return backends
+9 -1
View File
@@ -11,10 +11,12 @@ and hands it to each frontend's ``configure``.
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Mapping
from beaver_gateway.backends.base import Backend
from beaver_gateway.core.auth import TokenStore
from beaver_gateway.core.registry import AgentRegistry, McpRegistry
@@ -28,12 +30,18 @@ class GatewayRuntime:
instance can serve many ``RaycastAgent`` instances, but the lookup
site (an inbound request with ``model=<agent.name>``) already has
the name in hand, so the indirection lives one step earlier.
``mcp_internal_urls`` is filled in Phase 2.1: one loopback URL per
declared ``McpServer`` so ``ClaudeCodeBackendAdapter`` (Phase 2.2)
can pass them to ``BackendOptions.mcp_servers`` without re-running
discovery.
"""
agents: AgentRegistry
mcps: McpRegistry
backends: dict[str, Backend]
token_store: TokenStore
mcp_internal_urls: Mapping[str, str] = field(default_factory=dict)
class Frontend(ABC):
+48
View File
@@ -0,0 +1,48 @@
"""Proxy ``FastMCP`` servers for user-declared external MCPs (``stdio``/``http``).
Both flavours end up as a ``FastMCPProxy`` instance, built via
``fastmcp.server.create_proxy``. The proxy lazily opens the underlying
client transport when the first MCP request arrives, so we don't pay
for connections that nothing routes to. From the aggregator app's
point of view a proxy is indistinguishable from a regular ``FastMCP``
namespace — same ``http_app`` surface, same mount semantics.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastmcp import Client
from fastmcp.client.transports import StdioTransport, StreamableHttpTransport
from fastmcp.server import create_proxy
if TYPE_CHECKING:
from fastmcp import FastMCP
from beaver_gateway.mcp.types import HttpMcp, StdioMcp
def build_stdio_proxy(spec: StdioMcp) -> FastMCP:
"""Wrap a stdio subprocess MCP into a mountable ``FastMCPProxy``.
``spec.command`` is a non-empty tuple; the first element is the
executable and the rest are CLI args. ``StdioTransport`` keeps the
subprocess alive across calls.
"""
if not spec.command:
msg = f"stdio MCP {spec.name!r} has empty command"
raise ValueError(msg)
command, *args = spec.command
transport = StdioTransport(
command=command,
args=list(args),
env=spec.env,
cwd=str(spec.cwd) if spec.cwd is not None else None,
)
return create_proxy(Client(transport, name=spec.name))
def build_http_proxy(spec: HttpMcp) -> FastMCP:
"""Wrap a remote streamable-HTTP MCP into a mountable ``FastMCPProxy``."""
transport = StreamableHttpTransport(url=spec.url, auth=spec.auth)
return create_proxy(Client(transport, name=spec.name))
+87
View File
@@ -0,0 +1,87 @@
"""Internal MCP aggregator — one ASGI app, N FastMCP namespaces.
Each ``McpServer`` declared in the user's config becomes its own
``FastMCP`` instance (regular for ``python_tool``, ``FastMCPProxy`` for
``stdio``/``http``) and is mounted under ``/mcp/<name>`` on a single
Starlette app. This app runs on ``127.0.0.1:INTERNAL_MCP_PORT`` (not
EXPOSE'd in Docker) so the ClaudeCode subprocess can reach each
namespace via loopback as a distinct MCP server URL — preserving
per-domain framing while costing only one process worth of RAM
(PRD §6).
The aggregator returns both the app and a ``{name: url}`` map; Phase
2.2's ``ClaudeCodeBackendAdapter`` plugs the map directly into
``BackendOptions.mcp_servers``.
"""
from __future__ import annotations
from contextlib import AsyncExitStack, asynccontextmanager
from typing import TYPE_CHECKING
from starlette.applications import Starlette
from starlette.routing import Mount
from beaver_gateway.mcp.client_pool import build_http_proxy, build_stdio_proxy
from beaver_gateway.mcp.types import HttpMcp, PythonToolMcp, StdioMcp
from beaver_gateway.mcp.wrap import build_python_tool_server
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from fastmcp import FastMCP
from beaver_gateway.mcp.types import McpServerT
def build_internal_app(
mcps: Iterable[McpServerT], *, host: str, port: int
) -> tuple[Starlette, dict[str, str]]:
"""Build the aggregator ``Starlette`` app and the ``{name: url}`` map.
``host``/``port`` only flavour the URL strings handed back — actually
listening on them is the caller's job (``cli.main`` runs a uvicorn
server in a TaskGroup). We accept the address here so callers don't
have to format the URLs themselves and risk drifting from the
``/mcp/<name>`` convention.
"""
servers: dict[str, FastMCP] = {spec.name: _build_server(spec) for spec in mcps}
child_apps = [s.http_app(transport="http", path="/") for s in servers.values()]
routes = [
Mount(f"/mcp/{name}", app=app)
for name, app in zip(servers, child_apps, strict=True)
]
@asynccontextmanager
async def lifespan(_parent: Starlette) -> AsyncIterator[None]:
# Each FastMCP http_app stores its session manager init in its
# own lifespan. Without entering them the streamable-HTTP layer
# 500s on every request. AsyncExitStack composes them so all
# children come up together and unwind in reverse order on
# shutdown.
async with AsyncExitStack() as stack:
for child in child_apps:
await stack.enter_async_context(child.router.lifespan_context(child))
yield
app = Starlette(routes=routes, lifespan=lifespan)
# Trailing slash on the published URL skips Starlette's
# 307 redirect from ``/mcp/<name>`` to ``/mcp/<name>/`` that
# ``Mount`` produces when a child route lives at ``/``.
urls = {name: f"http://{host}:{port}/mcp/{name}/" for name in servers}
return app, urls
def _build_server(spec: McpServerT) -> FastMCP:
"""Dispatch on the discriminated union to the matching builder."""
if isinstance(spec, PythonToolMcp):
return build_python_tool_server(spec)
if isinstance(spec, StdioMcp):
return build_stdio_proxy(spec)
if isinstance(spec, HttpMcp):
return build_http_proxy(spec)
# `McpServerT` is a closed union; this is unreachable but keeps
# type-narrowing honest if a new variant lands without updates here.
msg = f"unsupported McpServer variant: {type(spec).__name__}"
raise TypeError(msg)
+26
View File
@@ -0,0 +1,26 @@
"""Wrap a ``PythonToolMcp`` spec into a mountable ``FastMCP`` instance.
Each ``python_tool`` McpServer in the user's config becomes a separate
``FastMCP`` namespace — one domain, one server URL — so models keep the
per-domain framing they were trained on (see PRD §6).
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastmcp import FastMCP
if TYPE_CHECKING:
from beaver_gateway.mcp.types import PythonToolMcp
def build_python_tool_server(spec: PythonToolMcp) -> FastMCP:
"""Construct a ``FastMCP`` namespace from a ``PythonToolMcp``.
The callables in ``spec.tools`` are registered as MCP tools using
FastMCP's introspection — names, docstrings, and type hints turn
into the tool schema. No decorator wiring is needed: the
``tools=`` constructor argument accepts plain callables.
"""
return FastMCP(name=spec.name, tools=list(spec.tools))