From 0128191ac30ea3558605d3e39fd8384883fc78df Mon Sep 17 00:00:00 2001 From: h Date: Wed, 20 May 2026 13:00:08 +0200 Subject: [PATCH] feat: add admin panel --- .gitignore | 1 + examples/config.py | 39 +- examples/docker-compose.yml | 5 + pyproject.toml | 2 + pyrightconfig.json | 10 + src/beaver_gateway/cli.py | 27 +- src/beaver_gateway/core/audit.py | 93 +++ src/beaver_gateway/core/auth.py | 341 ++++++++- .../frontends/admin/__init__.py | 11 + .../frontends/admin/frontend.py | 707 ++++++++++++++++++ .../frontends/admin/templates/_layout.html | 205 +++++ .../admin/templates/_token_created.html | 12 + .../admin/templates/_token_error.html | 1 + .../frontends/admin/templates/_token_row.html | 25 + .../frontends/admin/templates/audit.html | 37 + .../frontends/admin/templates/chat.html | 354 +++++++++ .../frontends/admin/templates/dashboard.html | 430 +++++++++++ .../frontends/admin/templates/login.html | 83 ++ .../frontends/admin/templates/tokens.html | 64 ++ src/beaver_gateway/frontends/anthropic.py | 68 +- src/beaver_gateway/frontends/base.py | 22 +- src/beaver_gateway/frontends/mcp_server.py | 193 +++-- src/beaver_gateway/storage/__init__.py | 40 + src/beaver_gateway/storage/db.py | 237 ++++++ src/beaver_gateway/storage/models.py | 80 ++ uv.lock | 13 + 26 files changed, 2985 insertions(+), 115 deletions(-) create mode 100644 pyrightconfig.json create mode 100644 src/beaver_gateway/core/audit.py create mode 100644 src/beaver_gateway/frontends/admin/__init__.py create mode 100644 src/beaver_gateway/frontends/admin/frontend.py create mode 100644 src/beaver_gateway/frontends/admin/templates/_layout.html create mode 100644 src/beaver_gateway/frontends/admin/templates/_token_created.html create mode 100644 src/beaver_gateway/frontends/admin/templates/_token_error.html create mode 100644 src/beaver_gateway/frontends/admin/templates/_token_row.html create mode 100644 src/beaver_gateway/frontends/admin/templates/audit.html create mode 100644 src/beaver_gateway/frontends/admin/templates/chat.html create mode 100644 src/beaver_gateway/frontends/admin/templates/dashboard.html create mode 100644 src/beaver_gateway/frontends/admin/templates/login.html create mode 100644 src/beaver_gateway/frontends/admin/templates/tokens.html create mode 100644 src/beaver_gateway/storage/__init__.py create mode 100644 src/beaver_gateway/storage/db.py create mode 100644 src/beaver_gateway/storage/models.py diff --git a/.gitignore b/.gitignore index 41f981d..05ce672 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ docs/ # Local env .env +db.db diff --git a/examples/config.py b/examples/config.py index 91b03cc..b70d14b 100644 --- a/examples/config.py +++ b/examples/config.py @@ -12,6 +12,7 @@ from beaver_gateway.agents.base import ExposedMcp from beaver_gateway.agents.claude import ClaudeAgent from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences from beaver_gateway.core.registry import Gateway +from beaver_gateway.frontends.admin import AdminFrontend from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend from beaver_gateway.frontends.mcp_server import McpServerFrontend from beaver_gateway.mcp.types import McpServer @@ -28,6 +29,7 @@ def current_time() -> str: return datetime.now().astimezone().isoformat() + gateway = Gateway( agents=[ # Phase 2.2 — ClaudeCodeBackendAdapter routes this agent's @@ -98,7 +100,7 @@ gateway = Gateway( # ClaudeCode adapter forwards that URL into # ``BackendOptions.mcp_servers``. Phase 3's ``McpServerFrontend`` # reverse-proxies the same internal URL out to external clients. - McpServer.python_tool(name="time", tools=[current_time]), + McpServer.python_tool(name="time", tools=[current_time]) # Phase 3 — illustrates the ``lenient`` flag. Real-world stdio MCPs # sometimes print "Processing..." or other chatter to stdout before # their actual JSON-RPC frames; the default mcp client forwards @@ -124,12 +126,41 @@ gateway = Gateway( # Phase 1.4 — expose the agents as `model=` on an # Anthropic-compatible Messages endpoint. Auth comes from # `BOOTSTRAP_TOKENS` in the env (`name1:value1,name2:value2`). + # + # Behind a reverse proxy (Caddy / nginx / Cloudflare) pass + # `public_base_url=` so the admin dashboard advertises the + # outside URL instead of `host:port`. Caddy strips its own + # prefix and the frontend's internal paths (`/v1/messages`, + # `/v1/models`) get appended: + # Caddy: handle_path /ai/* { reverse_proxy localhost:8000 } + # Config: AnthropicMessagesFrontend( + # port=8000, + # public_base_url="https://domain.com/ai") + # Result: https://domain.com/ai/v1/messages AnthropicMessagesFrontend(host="0.0.0.0", port=8000), # Phase 3 — re-exposes every declared `McpServer` outside the - # gateway with bearer auth + audit log. Per-namespace endpoints - # at `/mcp//`; flat bundle at `/mcp/all/`. Discovery page - # (HTML, auth-gated) at `/` with copy-pastable Cursor / Claude + # gateway with bearer auth + audit log. Each namespace lives + # at `//` on this port (the port itself disambiguates + # MCP traffic — no extra `/mcp` segment in the route); a flat + # bundle is published at `/all/`. Discovery page (HTML, + # auth-gated) at `/` with copy-pastable Cursor / Claude # Desktop snippets. Auth re-uses `BOOTSTRAP_TOKENS`. + # + # Same `public_base_url=` knob as above. Caddy strips its + # prefix; the frontend's `//` segment gets appended: + # Caddy: handle_path /mcp/* { reverse_proxy localhost:8001 } + # Config: McpServerFrontend( + # port=8001, + # public_base_url="https://domain.com/mcp") + # Result: https://domain.com/mcp// (and /mcp/all/) McpServerFrontend(host="0.0.0.0", port=8001), + # Phase 4.3 — browser admin UI. Creds come from + # `ADMIN_USER`/`ADMIN_PASS`; the session cookie is signed with + # `SESSION_SECRET`. Use it to mint tokens (Argon2-hashed in + # the DB), revoke them, and watch the audit log. Scope is + # enforced on the bearer frontends: tokens minted with scope + # `messages` only work on `/v1/messages`; `mcp` only on + # `/mcp/`; `*` works everywhere. + AdminFrontend(host="0.0.0.0", port=8002), ], ) diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index 681ad8d..2eebeef 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -24,6 +24,11 @@ services: # config.py declares one, so set these (or remove the agent) # before exposing port 8000. ports: + # /v1/messages frontend - "8000:8000" + # MCP server frontend + - "8001:8001" + # Admin UI (Phase 4.3) — change ADMIN_USER/ADMIN_PASS/SESSION_SECRET + - "8002:8002" volumes: - ./config.py:/config/config.py:ro diff --git a/pyproject.toml b/pyproject.toml index 1b0b23e..f31f91b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,11 +8,13 @@ authors = [ requires-python = ">=3.13" dependencies = [ "aiohttp>=3.13.5", + "aiosqlite>=0.22.1", "anthropic>=0.103.0", "anyio>=4.13.0", "argon2-cffi>=25.1.0", "fastapi>=0.136.1", "fastmcp>=3.3.1", + "greenlet>=3.5.0", "itsdangerous>=2.2.0", "jinja2>=3.1.6", "psycopg[binary]>=3.3.4", diff --git a/pyrightconfig.json b/pyrightconfig.json new file mode 100644 index 0000000..42567c2 --- /dev/null +++ b/pyrightconfig.json @@ -0,0 +1,10 @@ +{ + "venvPath": ".", + "venv": ".venv", + "pythonVersion": "3.13", + "include": ["src"], + "exclude": [".venv", "**/__pycache__", ".pytest_cache"], + "extraPaths": ["."], + "reportMissingImports": "error", + "reportMissingTypeStubs": "none" +} diff --git a/src/beaver_gateway/cli.py b/src/beaver_gateway/cli.py index ea4146c..d1cf2e7 100644 --- a/src/beaver_gateway/cli.py +++ b/src/beaver_gateway/cli.py @@ -40,6 +40,7 @@ from beaver_gateway.core.registry import AgentRegistry, McpRegistry from beaver_gateway.frontends.base import GatewayRuntime from beaver_gateway.mcp.internal_app import build_internal_app from beaver_gateway.settings import Settings +from beaver_gateway.storage import Database if TYPE_CHECKING: from starlette.applications import Starlette @@ -65,9 +66,24 @@ async def _async_main() -> None: agents = AgentRegistry(gateway.agents) mcps = McpRegistry(gateway.mcps) - token_store = TokenStore.from_env(settings.bootstrap_tokens) + + # Phase 4.1 — open the async DB and run create_all once. Engine + # pool is process-wide; ``dispose()`` after the TaskGroup unwinds. + db = Database(settings.database_url) + await db.create_all() + + # Phase 4.2 — TokenStore now reads from the DB (in-memory cache + # primed at start, TTL-refreshed, last_used_at flushed by a + # background task). BOOTSTRAP_TOKENS layers on top so first-run / + # examples still work without DB writes. + token_store = TokenStore( + db, bootstrap=TokenStore.parse_bootstrap(settings.bootstrap_tokens) + ) async with AsyncExitStack() as stack: + stack.push_async_callback(db.dispose) + await token_store.start() + stack.push_async_callback(token_store.stop) # Internal MCP URLs must exist before we construct any # ClaudeCodeBackendAdapter — adapters bake the URLs into their # ``BackendOptions.mcp_servers`` at construction time. @@ -87,7 +103,12 @@ async def _async_main() -> None: mcps=mcps, backends=backends, token_store=token_store, + db=db, mcp_internal_urls=internal_urls, + admin_user=settings.admin_user, + admin_pass=settings.admin_pass, + session_secret=settings.session_secret, + frontends=tuple(gateway.frontends), ) for fe in gateway.frontends: @@ -115,9 +136,7 @@ async def _async_main() -> None: async with asyncio.TaskGroup() as tg: if internal_app is not None: - tg.create_task( - _serve_internal_mcp(internal_app, settings=settings) - ) + tg.create_task(_serve_internal_mcp(internal_app, settings=settings)) for fe in gateway.frontends: tg.create_task(fe.serve()) diff --git a/src/beaver_gateway/core/audit.py b/src/beaver_gateway/core/audit.py new file mode 100644 index 0000000..23bf26e --- /dev/null +++ b/src/beaver_gateway/core/audit.py @@ -0,0 +1,93 @@ +"""Single entry point for writing :class:`AuditLog` rows. + +Every frontend ends up needing the same three-line pattern — open a DB +session, append a row, swallow failures so the user-visible request +still succeeds. Phase 4.3 inlined that pattern in the admin frontend +under a private ``_audit()`` helper; Phase 4.4 lifts it here so the +Messages and MCP frontends can call the same function and so the +swallow-and-log policy lives in one place. + +The contract: + +* ``log(runtime, actor=..., kind=...)`` is fire-and-forget. It awaits + the DB write (so callers can ``await`` it before responding and get + ordering), but never raises — if the audit insert fails, the function + emits an ``exception`` log line and returns. +* ``actor`` is a free-form string. By convention: ``"token:"`` + for bearer-authenticated traffic, ``"admin:"`` for admin-UI + actions, ``"anon"`` for failed-auth paths we still want to record. +* ``kind`` is a short tag — see :data:`KNOWN_KINDS` for the set the + current frontends emit; new tags don't need a code change here, the + column is free-form. +* ``**detail`` is JSON-serialised by :func:`append_audit`. Keep it + small: paths, methods, status codes — not request bodies. Anything + passed here lands in ``AuditLog.detail_json`` verbatim. + +Why a thin wrapper rather than ``append_audit`` directly: callers want +"write if you can, otherwise carry on", and pulling the try/except into +every frontend was already starting to drift (admin had it, bearer +frontends would have copy-pasted). One module, one policy. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +from beaver_gateway.storage import append_audit + +if TYPE_CHECKING: + from beaver_gateway.frontends.base import GatewayRuntime + + +_log = logging.getLogger("beaver_gateway.audit") + + +# Tags currently emitted by the gateway. The set is informational — +# ``AuditLog.kind`` is free-form so new code can introduce new tags +# without touching this list — but listing them here gives the admin UI +# and any downstream log consumers one canonical reference. +KNOWN_KINDS: frozenset[str] = frozenset( + { + "messages", # POST /v1/messages accepted + "mcp_call", # /mcp//... proxied + "login_ok", + "login_failed", + "logout", + "token_create", + "token_revoke", + } +) + + +async def log( + runtime: GatewayRuntime, + *, + actor: str, + kind: str, + agent_name: str | None = None, + **detail: Any, +) -> None: + """Best-effort audit insert. Never raises. + + Opens its own short-lived :class:`AsyncSession` so callers don't + have to thread one through. If the DB hiccups (table missing, + disk full, connection drop), we log and move on — the audit trail + is observability, not a hard precondition for serving the request. + """ + try: + async with runtime.db.session() as session: + await append_audit( + session, + actor=actor, + kind=kind, + agent_name=agent_name, + detail=detail or None, + ) + except Exception: # noqa: BLE001 + _log.exception( + "audit write failed: actor=%s kind=%s agent=%s", actor, kind, agent_name + ) + + +__all__ = ["KNOWN_KINDS", "log"] diff --git a/src/beaver_gateway/core/auth.py b/src/beaver_gateway/core/auth.py index b2c8f5c..f3f9908 100644 --- a/src/beaver_gateway/core/auth.py +++ b/src/beaver_gateway/core/auth.py @@ -1,63 +1,195 @@ -"""Bearer-token verification (Phase 1.3 — in-memory only). +"""Bearer-token verification (Phase 4.2 — DB-backed with in-memory cache). -Phase 4 will replace this with a DB-backed store (PRD §8 ``tokens`` -table, Argon2 hashes, scopes, ``last_used_at`` batching). Until then, -frontends authenticate callers against an in-memory ``{value: name}`` -dict seeded from the ``BOOTSTRAP_TOKENS`` env var. +The store is fed by two sources: -Format:: +1. **DB** (``Token`` table from Phase 4.1) — the primary source. Rows + carry Argon2id hashes; the admin UI (Phase 4.3) will be the only + writer at steady state. +2. **`BOOTSTRAP_TOKENS`** env — a name→plaintext map kept around for + first-run, disaster-recovery, and ``examples/`` smoke tests. These + entries live alongside DB rows in the cache and are never persisted. - BOOTSTRAP_TOKENS=cursor:s3cret,laptop:hunter2 +Hot path is in-memory: at :meth:`start` we pull every non-revoked DB +row and stash it in a list; subsequent :meth:`verify` calls re-load +when the cache is older than ``ttl_seconds``. ``last_used_at`` updates +are coalesced into a small dict and flushed by a background task every +``flush_interval`` seconds — one transaction per flush rather than one +per request. -The *name* side is for audit lines — :py:meth:`TokenStore.verify` -returns it on hit so callers can attribute the request without -exposing the raw secret. ``None`` means "no such token"; callers -turn that into 401. +We can't index DB rows by a derived plaintext key because Argon2 salts +are random — so verify does a linear scan over candidates, calling +``argon2.PasswordHasher.verify`` on each. N is small by design (single +operator, ~10 tokens at most); the cost is irrelevant. The scan runs +through ``asyncio.to_thread`` to keep the event loop free of the ~50ms +KDF block. -This module deliberately knows nothing about HTTP frameworks — it -takes a raw token (or the verbatim ``Authorization`` header value) -and returns a name-or-None. Frontends own the response shape. +The module knows nothing about HTTP frameworks. It takes a raw token +(or a verbatim ``Authorization`` header value) and returns a +:class:`TokenIdentity` (name + scope + db-id), or ``None`` for a miss. +Frontends own the 401 response shape. """ from __future__ import annotations +import asyncio +import contextlib +import hmac +import logging +import time +from dataclasses import dataclass +from datetime import UTC, datetime from typing import TYPE_CHECKING +from argon2 import PasswordHasher +from argon2.exceptions import InvalidHashError, VerifyMismatchError + +from beaver_gateway.storage import list_active_tokens, touch_token + if TYPE_CHECKING: from collections.abc import Mapping + from beaver_gateway.storage import Database + + +_log = logging.getLogger("beaver_gateway.auth") + +_BOOTSTRAP_SCOPE = "*" + class TokenStoreError(ValueError): - """Malformed ``BOOTSTRAP_TOKENS`` value.""" + """Malformed ``BOOTSTRAP_TOKENS`` value or duplicate token.""" + + +VALID_SCOPES: frozenset[str] = frozenset({"*", "messages", "mcp", "admin"}) +"""The scopes a ``Token.scope`` may hold (Phase 4.3 admin UI enforces). + +* ``*`` — wildcard, may use any frontend +* ``messages`` — Anthropic Messages frontend only +* ``mcp`` — MCP server frontend only +* ``admin`` — reserved for programmatic admin access; the AdminFrontend + itself authenticates via session cookies, not bearer tokens, so this + scope is unused today and kept for forward compatibility. +""" + + +@dataclass(frozen=True, slots=True) +class TokenIdentity: + """What :meth:`TokenStore.verify` resolves to on success. + + ``token_id`` is the DB row id for persisted tokens, or ``None`` for + an env-bootstrap match (those have no DB row to touch). ``scope`` + gates which frontend the token may hit (see :data:`VALID_SCOPES`); + bootstrap tokens implicitly get ``"*"``. + """ + + name: str + scope: str + token_id: int | None + + def allows(self, required: str) -> bool: + """``True`` when this identity may access a route gated by ``required``. + + ``"*"`` is the wildcard; an exact match satisfies a single scope. + Unknown ``required`` values intentionally fall through to a + strict equality check — callers should pass one of + :data:`VALID_SCOPES`. + """ + return self.scope in ("*", required) + + +@dataclass(frozen=True, slots=True) +class _CachedToken: + """One non-revoked row, copied out of the DB into the hot-path cache.""" + + id: int + name: str + scope: str + hashed_value: str + + +# Default Argon2id parameters from ``argon2-cffi`` are fine for our scope. +# They target ~50ms on a modern CPU — enough to make a stolen-hash brute +# force expensive, cheap enough to verify a handful per request. +_HASHER = PasswordHasher() + + +def hash_token(plaintext: str) -> str: + """Return an Argon2id hash for ``plaintext`` (admin / seed-only path). + + Phase 4.3 will call this when the admin creates a token; Phase 4.2 + exposes it so smoke scripts can seed the DB without re-implementing + the same line. + """ + return _HASHER.hash(plaintext) class TokenStore: - """Constant-time-ish bearer verification over a static name→value map. + """DB-backed verifier with in-memory cache + TTL + batched touches. - The store is keyed by value internally (``verify`` lookup is O(1)) - but constructed from a name→value mapping because that's how the - user thinks about it — one human-readable label per caller. + Construct in ``cli.main`` after :class:`Database` is up, then + ``await store.start()`` to prime the cache and spin up the flusher + task. ``await store.stop()`` on shutdown drains the touch queue. + + Bootstrap entries (from ``BOOTSTRAP_TOKENS``) sit alongside DB rows + in the same lookup path; we check them first, in constant time, so + they remain usable even if the DB is unreachable. They never appear + in ``last_used_at`` flushes because they have no DB row. """ - __slots__ = ("_by_value",) + __slots__ = ( + "_bootstrap_by_value", + "_bootstrap_scopes", + "_cache", + "_db", + "_flush_interval", + "_flusher_task", + "_loaded_at", + "_lock", + "_touch_queue", + "_ttl", + ) - def __init__(self, tokens: Mapping[str, str]) -> None: + def __init__( + self, + db: Database | None = None, + *, + bootstrap: Mapping[str, str] | None = None, + ttl_seconds: float = 30.0, + flush_interval: float = 5.0, + ) -> None: + # Bootstrap is keyed by value internally so verify is O(1) over + # plaintext. Each value also keeps its name for audit lines. by_value: dict[str, str] = {} - for name, value in tokens.items(): + for name, value in (bootstrap or {}).items(): if not name or not value: - msg = f"empty name or value in token map (name={name!r})" + msg = f"empty name or value in bootstrap map (name={name!r})" raise TokenStoreError(msg) if value in by_value: msg = ( - f"duplicate token value for names " + f"duplicate bootstrap token value for names " f"{by_value[value]!r} and {name!r}" ) raise TokenStoreError(msg) by_value[value] = name - self._by_value = by_value + self._bootstrap_by_value: dict[str, str] = by_value + self._bootstrap_scopes: dict[str, str] = dict.fromkeys( + by_value.values(), _BOOTSTRAP_SCOPE + ) - @classmethod - def from_env(cls, raw: str) -> TokenStore: + self._db = db + self._ttl = ttl_seconds + self._flush_interval = flush_interval + + self._cache: list[_CachedToken] = [] + self._loaded_at: float = 0.0 + self._lock = asyncio.Lock() + self._touch_queue: dict[int, datetime] = {} + self._flusher_task: asyncio.Task[None] | None = None + + # ---- bootstrap parsing (kept for `cli` / tests) --------------------- + + @staticmethod + def parse_bootstrap(raw: str) -> dict[str, str]: """Parse ``name1:value1,name2:value2`` (the ``BOOTSTRAP_TOKENS`` form).""" tokens: dict[str, str] = {} for chunk in raw.split(","): @@ -73,15 +205,82 @@ class TokenStore: msg = f"duplicate token name: {name!r}" raise TokenStoreError(msg) tokens[name] = value - return cls(tokens) + return tokens - def verify(self, token: str | None) -> str | None: - """Return the token's name if known, else ``None``.""" + @classmethod + def from_env(cls, raw: str, db: Database | None = None) -> TokenStore: + """Legacy entrypoint: bootstrap-only (or bootstrap + db). + + Phase 1.3 call sites still expect a one-liner; we keep the + classmethod so they don't have to learn the new constructor. + """ + return cls(db, bootstrap=cls.parse_bootstrap(raw)) + + # ---- lifecycle ------------------------------------------------------ + + async def start(self) -> None: + """Prime the cache and (if a DB is attached) start the flusher loop.""" + await self._refresh() + if self._db is not None: + self._flusher_task = asyncio.create_task( + self._flusher_loop(), name="beaver-gateway.token-flusher" + ) + + async def stop(self) -> None: + """Cancel the flusher and run one final drain.""" + if self._flusher_task is not None: + self._flusher_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._flusher_task + self._flusher_task = None + await self._flush_now() + + async def invalidate(self) -> None: + """Force the next verify to re-read from DB (Phase 4.3 admin hook).""" + self._loaded_at = 0.0 + + # ---- verify path ---------------------------------------------------- + + async def verify(self, token: str | None) -> TokenIdentity | None: + """Return the matching identity, or ``None`` for unknown/empty tokens.""" if not token: return None - return self._by_value.get(token) - def verify_bearer(self, authorization: str | None) -> str | None: + # Bootstrap first: constant-time compare per entry, never hits DB. + # `compare_digest` is overkill for a name→value lookup but cheap + # and removes one timing variable for free. + for value, name in self._bootstrap_by_value.items(): + if hmac.compare_digest(token, value): + return TokenIdentity( + name=name, + scope=self._bootstrap_scopes.get(name, _BOOTSTRAP_SCOPE), + token_id=None, + ) + + if self._db is None: + return None + + await self._ensure_fresh() + + # Snapshot the cache reference so a refresh mid-scan doesn't + # surprise us. List itself is immutable per refresh (we swap, + # not mutate). + cache = self._cache + for entry in cache: + try: + await asyncio.to_thread(_HASHER.verify, entry.hashed_value, token) + except VerifyMismatchError: + continue + except InvalidHashError: + _log.warning( + "token row %d has an unparseable hash — skipping", entry.id + ) + continue + self._touch_queue[entry.id] = datetime.now(UTC) + return TokenIdentity(name=entry.name, scope=entry.scope, token_id=entry.id) + return None + + async def verify_bearer(self, authorization: str | None) -> TokenIdentity | None: """Strip the ``Bearer`` prefix (case-insensitive) then verify. Accepts a bare token too — Cursor's MCP transport sometimes @@ -93,13 +292,83 @@ class TokenStore: return None head, sep, rest = authorization.partition(" ") token = rest.strip() if sep and head.lower() == "bearer" else authorization - return self.verify(token) + return await self.verify(token) def __len__(self) -> int: - return len(self._by_value) + return len(self._cache) + len(self._bootstrap_by_value) def __bool__(self) -> bool: - return bool(self._by_value) + return bool(self._cache) or bool(self._bootstrap_by_value) + + # ---- internals ------------------------------------------------------ + + async def _ensure_fresh(self) -> None: + if self._db is None: + return + now = time.monotonic() + if now - self._loaded_at <= self._ttl: + return + async with self._lock: + # Re-check under the lock — first arrival reloaded, others + # should fall through. + now = time.monotonic() + if now - self._loaded_at <= self._ttl: + return + await self._refresh() + + async def _refresh(self) -> None: + if self._db is None: + self._loaded_at = time.monotonic() + return + async with self._db.session() as session: + rows = await list_active_tokens(session) + next_cache: list[_CachedToken] = [] + for row in rows: + if row.id is None: + # Defensive: SQLModel will assign an id on insert; a + # None here would mean someone handed us an unsaved row. + continue + next_cache.append( + _CachedToken( + id=row.id, + name=row.name, + scope=row.scope, + hashed_value=row.hashed_value, + ) + ) + self._cache = next_cache + self._loaded_at = time.monotonic() + _log.debug("token cache refreshed: %d active row(s)", len(next_cache)) + + async def _flusher_loop(self) -> None: + try: + while True: + await asyncio.sleep(self._flush_interval) + await self._flush_now() + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 — never let the flusher die silently + _log.exception("token flusher crashed; touches will stop") + + async def _flush_now(self) -> None: + if self._db is None or not self._touch_queue: + return + # Detach the queue so concurrent verify() writes don't bleed + # into the in-flight transaction. + pending, self._touch_queue = self._touch_queue, {} + async with self._db.session() as session: + for token_id in pending: + # We don't pass the timestamp through — `touch_token` + # stamps `now` itself, and we'd rather have one source + # of truth than reconcile clocks. + await touch_token(session, token_id=token_id) + _log.debug("flushed %d token touch(es)", len(pending)) -__all__ = ["TokenStore", "TokenStoreError"] +__all__ = [ + "VALID_SCOPES", + "TokenIdentity", + "TokenStore", + "TokenStoreError", + "hash_token", +] diff --git a/src/beaver_gateway/frontends/admin/__init__.py b/src/beaver_gateway/frontends/admin/__init__.py new file mode 100644 index 0000000..94a050f --- /dev/null +++ b/src/beaver_gateway/frontends/admin/__init__.py @@ -0,0 +1,11 @@ +"""Admin UI (Phase 4.3). + +Browser-facing console: login, dashboard, token CRUD, audit viewer. +Templates live in ``./templates``; the package loader picks them up via +``importlib.resources`` so the layout works editable and inside the +Docker image without copy hacks. +""" + +from beaver_gateway.frontends.admin.frontend import AdminFrontend + +__all__ = ["AdminFrontend"] diff --git a/src/beaver_gateway/frontends/admin/frontend.py b/src/beaver_gateway/frontends/admin/frontend.py new file mode 100644 index 0000000..44407c5 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/frontend.py @@ -0,0 +1,707 @@ +"""Single-operator admin console. + +Jinja2 + HTMX, no SPA framework. Session cookies signed with +``SESSION_SECRET`` via ``itsdangerous`` (8h TTL). CSRF is double-submit: +the signed session payload carries a random token that must echo back +on every state-changing request as a ``csrf_token`` form field. + +What lives in here: + +* ``GET /login`` / ``POST /login`` — env-cred check (``ADMIN_USER`` / + ``ADMIN_PASS`` compared with :func:`hmac.compare_digest`). +* ``POST /logout`` — clears the cookie. +* ``GET /`` — dashboard: declared agents + MCP namespaces + the last + audit slice. +* ``GET /tokens`` + ``POST /tokens`` + ``POST /tokens/{id}/revoke`` — + bearer-token CRUD. Plaintext is rendered exactly once at creation; the + DB only ever holds the Argon2 hash. HTMX fragments swap rows in place. +* ``GET /audit`` — paginated audit list (id-cursor). + +Things the admin frontend is *not* responsible for: HTTP-token verify +on ``/v1/messages`` and ``/mcp`` — that lives on those frontends and +uses :class:`TokenStore`. Admin's only authentication path is the +cookie session. +""" + +from __future__ import annotations + +import hmac +import json +import logging +import secrets +import time +from collections import deque +from typing import TYPE_CHECKING, Annotated, Any + +import itsdangerous +from fastapi import FastAPI, Form, HTTPException, Request, status +from fastapi.responses import ( + HTMLResponse, + RedirectResponse, + Response, + StreamingResponse, +) +from jinja2 import Environment, PackageLoader, select_autoescape + +from beaver_gateway.core import audit +from beaver_gateway.core.auth import VALID_SCOPES, hash_token +from beaver_gateway.frontends.base import Frontend +from beaver_gateway.storage import ( + create_token, + list_audit_records, + list_tokens, + revoke_token, +) + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + from datetime import datetime + + from beaver_gateway.core.events import MessageStreamEvent + from beaver_gateway.frontends.base import GatewayRuntime + + +_log = logging.getLogger("beaver_gateway.frontends.admin") + +# Cookie name is namespaced so a user can run multiple gateway instances +# behind one host header without sessions colliding. ``_v1`` lets us +# bump the payload schema later by invalidating old cookies via salt. +SESSION_COOKIE = "beaver_admin_session" +SESSION_MAX_AGE = 8 * 3600 +SESSION_SALT = "beaver-gateway.admin.session.v1" + +# Scopes the admin is allowed to mint. ``admin`` is reserved (no bearer +# route consumes it yet, but listing it keeps future programmatic +# admin access first-class) — see ``core.auth.VALID_SCOPES``. +CREATABLE_SCOPES = ("*", "messages", "mcp", "admin") + +# Audit-log page size — also used by the dashboard's "recent activity" +# panel. Small enough to keep the dashboard cheap, big enough to be +# useful at a glance. +AUDIT_PAGE_SIZE = 50 + +# /login brute-force ceiling: 5 failed attempts per source IP within +# LOGIN_WINDOW_SECONDS shuts the door with a 429 until the oldest +# failure ages out. Single-operator admin, so a legit user only ever +# burns one IP — a forgiving window beats a tight one with a lockout. +LOGIN_MAX_ATTEMPTS = 5 +LOGIN_WINDOW_SECONDS = 300.0 + + +__all__ = ["AdminFrontend"] + + +class AdminFrontend(Frontend): + """FastAPI app behind ``/login`` / ``/tokens`` / ``/audit`` / ``/``.""" + + def __init__(self, *, host: str = "0.0.0.0", port: int = 8002) -> None: # noqa: S104 + self.host = host + self.port = port + self._runtime: GatewayRuntime | None = None + self._app: FastAPI | None = None + + def configure(self, runtime: GatewayRuntime) -> None: + # Refuse to wire up if the env didn't carry the bits we need — + # an admin UI with empty creds is a footgun, and an unsigned + # session cookie is no session at all. + if not runtime.session_secret: + msg = "AdminFrontend requires SESSION_SECRET in env" + raise RuntimeError(msg) + if not runtime.admin_user or not runtime.admin_pass: + msg = "AdminFrontend requires ADMIN_USER and ADMIN_PASS in env" + raise RuntimeError(msg) + self._runtime = runtime + self._app = self._build_app(runtime) + + async def serve(self) -> None: + import uvicorn + + if self._app is None: + msg = "configure() must be called before serve()" + raise RuntimeError(msg) + config = uvicorn.Config( + self._app, host=self.host, port=self.port, log_level="info" + ) + await uvicorn.Server(config).serve() + + # ---- app construction ---------------------------------------------- + + def _build_app(self, runtime: GatewayRuntime) -> FastAPI: # noqa: PLR0915 + # Routes are declared inline so they close over ``runtime`` / + # ``signer`` / ``templates`` instead of threading them through + # self-state. Splitting the function would just trade size for + # bookkeeping — same total surface, harder to follow. + templates = _build_template_env() + signer = itsdangerous.URLSafeTimedSerializer( + runtime.session_secret, salt=SESSION_SALT + ) + login_limit = _LoginRateLimit( + max_attempts=LOGIN_MAX_ATTEMPTS, window=LOGIN_WINDOW_SECONDS + ) + app = FastAPI(title="beaver-gateway / admin", docs_url=None, redoc_url=None) + + def render(name: str, **ctx: Any) -> str: + return templates.get_template(name).render(**ctx) + + # ---- login ---- + + @app.get("/login", response_class=HTMLResponse) + async def login_page(request: Request) -> Response: + # Already-signed-in users skip the form. Login flash is + # carried in a one-shot query param so a failed POST can + # redirect back here without leaking creds in the URL. + if _current_user(request, signer): + return RedirectResponse("/", status_code=status.HTTP_303_SEE_OTHER) + error = request.query_params.get("error") + return HTMLResponse(render("login.html", error=error)) + + @app.post("/login", response_class=HTMLResponse) + async def login_submit( + request: Request, + username: Annotated[str, Form(...)], + password: Annotated[str, Form(...)], + ) -> Response: + ip = _client_ip(request) + if not login_limit.check(ip): + _log.warning("admin login rate-limited: ip=%s user=%r", ip, username) + await audit.log( + runtime, + actor=f"admin:{username}", + kind="login_failed", + reason="rate_limited", + ip=ip, + ) + # 429 carries the rendered form back inline so the user + # sees the same page with an error banner — no redirect + # roundtrip, and an explicit Retry-After for any well- + # behaved client (or Caddy doing its own bookkeeping). + return HTMLResponse( + render( + "login.html", + error="too many attempts; try again in a few minutes", + ), + status_code=status.HTTP_429_TOO_MANY_REQUESTS, + headers={"Retry-After": str(int(LOGIN_WINDOW_SECONDS))}, + ) + ok = hmac.compare_digest( + username, runtime.admin_user + ) and hmac.compare_digest(password, runtime.admin_pass) + if not ok: + login_limit.record_failure(ip) + _log.info("admin login failed: user=%r ip=%s", username, ip) + await audit.log( + runtime, + actor=f"admin:{username}", + kind="login_failed", + reason="bad_credentials", + ip=ip, + ) + # 303-redirect after POST so the browser doesn't replay + # the form on refresh. Status-303 forces GET on the + # follow-up regardless of the original method. + return RedirectResponse( + "/login?error=invalid+credentials", + status_code=status.HTTP_303_SEE_OTHER, + ) + login_limit.clear(ip) + csrf = secrets.token_urlsafe(24) + cookie = signer.dumps({"user": username, "csrf": csrf}) + response = RedirectResponse("/", status_code=status.HTTP_303_SEE_OTHER) + _set_session_cookie(response, cookie) + _log.info("admin login ok: user=%s ip=%s", username, ip) + await audit.log(runtime, actor=f"admin:{username}", kind="login_ok", ip=ip) + return response + + @app.post("/logout") + async def logout(request: Request) -> Response: + session = _require_session(request, signer) + await _require_csrf(request, session) + response = RedirectResponse("/login", status_code=status.HTTP_303_SEE_OTHER) + response.delete_cookie(SESSION_COOKIE) + await audit.log(runtime, actor=f"admin:{session['user']}", kind="logout") + return response + + # ---- dashboard ---- + + @app.get("/", response_class=HTMLResponse) + async def dashboard(request: Request) -> Response: + session = _require_session(request, signer) + async with runtime.db.session() as db_session: + audit = await list_audit_records(db_session, limit=AUDIT_PAGE_SIZE) + tokens = await list_tokens(db_session, include_revoked=False) + endpoints = _build_endpoint_catalog(request, runtime) + return HTMLResponse( + render( + "dashboard.html", + user=session["user"], + csrf=session["csrf"], + agents=list(runtime.agents), + mcps=list(runtime.mcps), + audit=audit, + tokens=tokens, + endpoints=endpoints, + ) + ) + + # ---- tokens ---- + + @app.get("/tokens", response_class=HTMLResponse) + async def tokens_page(request: Request) -> Response: + session = _require_session(request, signer) + include_revoked = request.query_params.get("include_revoked") == "1" + async with runtime.db.session() as db_session: + tokens = await list_tokens(db_session, include_revoked=include_revoked) + return HTMLResponse( + render( + "tokens.html", + user=session["user"], + csrf=session["csrf"], + tokens=tokens, + scopes=CREATABLE_SCOPES, + include_revoked=include_revoked, + ) + ) + + @app.post("/tokens", response_class=HTMLResponse) + async def tokens_create( + request: Request, + name: Annotated[str, Form(...)], + scope: Annotated[str, Form(...)], + ) -> Response: + session = _require_session(request, signer) + await _require_csrf(request, session) + name = name.strip() + if not name: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, "token name is required" + ) + if scope not in VALID_SCOPES: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, f"invalid scope: {scope!r}" + ) + # Generate the plaintext server-side — clients never pick + # their own. URL-safe so it copies cleanly into ``.env``. + plaintext = secrets.token_urlsafe(32) + hashed = hash_token(plaintext) + async with runtime.db.session() as db_session: + try: + row = await create_token( + db_session, name=name, scope=scope, hashed_value=hashed + ) + except Exception as exc: # noqa: BLE001 + # Likely UNIQUE(name) violation; the column is + # indexed so collisions are common. We surface a + # readable HTMX-friendly response rather than 500. + _log.warning("token create failed: %s", exc) + return HTMLResponse( + render( + "_token_error.html", + message=f"could not create token {name!r}: {exc}", + ), + status_code=status.HTTP_409_CONFLICT, + ) + await runtime.token_store.invalidate() + await audit.log( + runtime, + actor=f"admin:{session['user']}", + kind="token_create", + name=name, + scope=scope, + token_id=row.id, + ) + return HTMLResponse( + render( + "_token_created.html", + token=row, + plaintext=plaintext, + csrf=session["csrf"], + ) + ) + + @app.post("/tokens/{token_id}/revoke", response_class=HTMLResponse) + async def tokens_revoke(request: Request, token_id: int) -> Response: + session = _require_session(request, signer) + await _require_csrf(request, session) + async with runtime.db.session() as db_session: + ok = await revoke_token(db_session, token_id=token_id) + if not ok: + raise HTTPException( + status.HTTP_404_NOT_FOUND, f"no active token with id {token_id}" + ) + await runtime.token_store.invalidate() + await audit.log( + runtime, + actor=f"admin:{session['user']}", + kind="token_revoke", + token_id=token_id, + ) + # Re-fetch so the row reflects the just-stamped revoked_at. + async with runtime.db.session() as db_session: + rows = await list_tokens(db_session, include_revoked=True) + row = next((r for r in rows if r.id == token_id), None) + if row is None: + # Shouldn't happen — revoke_token said ok — but stay + # honest: respond empty so the row disappears from the + # table. + return HTMLResponse("") + return HTMLResponse( + render("_token_row.html", token=row, csrf=session["csrf"]) + ) + + # ---- audit ---- + + @app.get("/audit", response_class=HTMLResponse) + async def audit_page(request: Request) -> Response: + session = _require_session(request, signer) + before_raw = request.query_params.get("before") + before_id: int | None = None + if before_raw and before_raw.isdigit(): + before_id = int(before_raw) + async with runtime.db.session() as db_session: + rows = await list_audit_records( + db_session, limit=AUDIT_PAGE_SIZE, before_id=before_id + ) + # Cursor for the "next page" link — the smallest id on this + # page; ``None`` if we've run out of rows. + next_before = rows[-1].id if rows and len(rows) == AUDIT_PAGE_SIZE else None + return HTMLResponse( + render( + "audit.html", + user=session["user"], + csrf=session["csrf"], + audit=rows, + next_before=next_before, + ) + ) + + # ---- chat ---- + + # In-process playground: lets the operator drive any configured + # agent without minting a bearer token. Auth comes from the + # admin cookie; CSRF rides an ``X-CSRF-Token`` header because + # the body is JSON (no form to read). We call the backend + # directly — no HTTP hop to ``/v1/messages`` — so a chat turn + # bypasses the token store but is still audited as + # ``kind="messages"`` with ``actor="admin:"`` and + # ``source="admin_chat"`` in the detail. + @app.get("/chat", response_class=HTMLResponse) + async def chat_page(request: Request) -> Response: + session = _require_session(request, signer) + available = [ + a for a in runtime.agents if runtime.backends.get(a.name) is not None + ] + return HTMLResponse( + render( + "chat.html", + user=session["user"], + csrf=session["csrf"], + agents=available, + ) + ) + + @app.post("/chat/send") + async def chat_send(request: Request) -> Response: + session = _require_session(request, signer) + submitted = request.headers.get("x-csrf-token") + if not isinstance(submitted, str) or not hmac.compare_digest( + submitted, session["csrf"] + ): + raise HTTPException(status.HTTP_403_FORBIDDEN, "csrf check failed") + try: + body = await request.json() + except json.JSONDecodeError as exc: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, f"invalid JSON: {exc}" + ) from exc + model = body.get("model") + if not isinstance(model, str): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, "missing or non-string `model`" + ) + agent = runtime.agents.get(model) + if agent is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, f"unknown agent: {model!r}" + ) + backend = runtime.backends.get(agent.name) + if backend is None: + raise HTTPException( + status.HTTP_503_SERVICE_UNAVAILABLE, + f"no backend configured for agent {agent.name!r}", + ) + messages = body.get("messages") or [] + if not isinstance(messages, list): + raise HTTPException( + status.HTTP_400_BAD_REQUEST, "`messages` must be a list" + ) + system = body.get("system") + await audit.log( + runtime, + actor=f"admin:{session['user']}", + kind="messages", + agent_name=agent.name, + source="admin_chat", + msgs=len(messages), + ) + events = backend.complete( + agent=agent, + messages=messages, + system=system if isinstance(system, str) else None, + ) + return StreamingResponse( + _sse_events(events), media_type="text/event-stream" + ) + + return app + + +# ---- helpers ------------------------------------------------------------ + + +def _build_template_env() -> Environment: + env = Environment( + loader=PackageLoader("beaver_gateway.frontends.admin", "templates"), + autoescape=select_autoescape(["html"]), + trim_blocks=True, + lstrip_blocks=True, + ) + env.filters["fmt_dt"] = _fmt_dt + env.filters["fmt_detail"] = _fmt_detail + return env + + +def _fmt_dt(value: datetime | None) -> str: + if value is None: + return "—" + # ISO without microseconds, with explicit ``Z`` when UTC — easier to + # scan than the default ``+00:00``. + s = value.replace(microsecond=0).isoformat() + return s.replace("+00:00", "Z") + + +def _fmt_detail(raw: str) -> str: + """Pretty-print the audit detail blob; leave invalid JSON alone.""" + if not raw or raw == "{}": + return "" + try: + return json.dumps(json.loads(raw), separators=(", ", ": ")) + except json.JSONDecodeError: + return raw + + +def _build_endpoint_catalog( + request: Request, runtime: GatewayRuntime +) -> dict[str, Any]: + """Collect copy-pastable URLs for sibling bearer frontends. + + Precedence per frontend: + + 1. ``frontend.public_base_url`` if set — the operator's explicit + statement of "this is the URL my reverse proxy (Caddy / nginx / + Cloudflare / …) puts in front of me". Used verbatim with the + internal path (``/v1/messages``, ``//`` for MCP) appended. + 2. ``{scheme}://{request_hostname}:{frontend_port}`` — derived from + the browser's own request so dev / no-proxy setups Just Work. + Scheme honours ``X-Forwarded-Proto`` so a TLS terminator in + front of the admin gets the right protocol. + + Imports happen inside the function to avoid a hard dep from + ``admin.frontend`` on the other frontend modules — they're optional + and may have non-trivial transitive deps (aiohttp etc.). + """ + from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend + from beaver_gateway.frontends.mcp_server import McpServerFrontend + + scheme = request.headers.get("x-forwarded-proto") or request.url.scheme + hostname = request.url.hostname or "localhost" + + def _base_for(fe: AnthropicMessagesFrontend | McpServerFrontend) -> str: + if fe.public_base_url: + return fe.public_base_url + return f"{scheme}://{hostname}:{fe.port}" + + anthropic_base: str | None = None + mcp_base: str | None = None + for fe in runtime.frontends: + if isinstance(fe, AnthropicMessagesFrontend) and anthropic_base is None: + anthropic_base = _base_for(fe) + elif isinstance(fe, McpServerFrontend) and mcp_base is None: + mcp_base = _base_for(fe) + + agent_rows: list[dict[str, Any]] = [] + if anthropic_base is not None: + messages_url = f"{anthropic_base}/v1/messages" + agent_rows.extend( + { + "agent": a.name, + "model": a.model, + "agent_type": a.__class__.__name__, + "url": messages_url, + } + for a in runtime.agents + ) + + mcp_rows: list[dict[str, Any]] = [] + if mcp_base is not None: + mcp_rows.extend( + {"namespace": m.name, "kind": m.kind, "url": f"{mcp_base}/{m.name}/"} + for m in runtime.mcps + ) + # ``all`` is synthesised by the aggregator whenever at least + # one MCP is configured — see McpServerFrontend._upstream_url. + if runtime.mcps: + mcp_rows.append( + {"namespace": "all", "kind": "bundle", "url": f"{mcp_base}/all/"} + ) + + return { + "anthropic_base": anthropic_base, + "mcp_base": mcp_base, + "agents": agent_rows, + "mcps": mcp_rows, + } + + +async def _sse_events( + events: AsyncIterator[MessageStreamEvent], +) -> AsyncIterator[bytes]: + r"""Serialize a backend stream into Anthropic's ``text/event-stream`` form. + + Same wire shape as :mod:`beaver_gateway.frontends.anthropic` — + duplicated rather than imported so the admin frontend stays + independent of that module's private helpers, and so a mid-stream + failure surfaces as an in-band ``error`` event the chat UI can + render rather than a dangling connection. + """ + try: + async for ev in events: + payload = ev.model_dump_json() + yield f"event: {ev.type}\ndata: {payload}\n\n".encode() + except Exception as exc: # noqa: BLE001 + _log.exception("admin chat backend stream failed") + err = json.dumps( + {"type": "error", "error": {"type": "api_error", "message": str(exc)}} + ) + yield f"event: error\ndata: {err}\n\n".encode() + + +def _set_session_cookie(response: Response, value: str) -> None: + # ``samesite=lax`` keeps the cookie out of cross-site POSTs but + # follows top-level navigation; ``httponly`` keeps it out of JS; + # ``secure`` is gated on the deployment scheme — toggled by reverse + # proxies in front. Skip the secure flag here so localhost dev + # works over plain HTTP; production deployments behind a TLS + # terminator should set ``Secure`` via the proxy. + response.set_cookie( + SESSION_COOKIE, + value, + max_age=SESSION_MAX_AGE, + httponly=True, + samesite="lax", + path="/", + ) + + +def _current_user( + request: Request, signer: itsdangerous.URLSafeTimedSerializer +) -> dict[str, Any] | None: + raw = request.cookies.get(SESSION_COOKIE) + if not raw: + return None + try: + payload = signer.loads(raw, max_age=SESSION_MAX_AGE) + except itsdangerous.BadSignature: + return None + if not isinstance(payload, dict): + return None + user = payload.get("user") + csrf = payload.get("csrf") + if not isinstance(user, str) or not isinstance(csrf, str): + return None + return {"user": user, "csrf": csrf} + + +def _require_session( + request: Request, signer: itsdangerous.URLSafeTimedSerializer +) -> dict[str, Any]: + session = _current_user(request, signer) + if session is None: + # GET endpoints want a redirect (so the browser walks the user + # to the login form), not a JSON 401. Mutating endpoints will + # still trip CSRF below, so the redirect is harmless for those. + raise HTTPException(status.HTTP_303_SEE_OTHER, headers={"Location": "/login"}) + return session + + +def _client_ip(request: Request) -> str: + """Best-effort source IP, in precedence order. + + 1. ``Cf-Connecting-IP`` — Cloudflare's edge writes this and strips + anything inbound, so when it's present it's authoritative. + 2. ``X-Forwarded-For`` leftmost entry — what Caddy / nginx set when + they're the only proxy. We trust this because the deploy plan + puts Caddy directly in front; if the chain ever grows untrusted + hops, this header becomes spoofable from the public side. + 3. Socket peer — direct Tailscale / localhost hits. + """ + cf = request.headers.get("cf-connecting-ip") + if cf: + return cf.strip() + xff = request.headers.get("x-forwarded-for") + if xff: + first = xff.split(",", 1)[0].strip() + if first: + return first + return request.client.host if request.client else "unknown" + + +class _LoginRateLimit: + """In-memory sliding-window failure counter for ``POST /login``. + + Keyed by source IP (see :func:`_client_ip`). Each failure appends a + monotonic timestamp; :meth:`check` drops timestamps older than + ``window`` and refuses once ``max_attempts`` remain in the bucket. + A successful login calls :meth:`clear` to wipe the IP's bucket. + + No external store: a single-operator admin only needs to survive + process lifetime. Lives entirely on the event loop thread, so no + lock is needed — every method is synchronous and doesn't ``await``. + """ + + __slots__ = ("_failures", "_max_attempts", "_window") + + def __init__(self, *, max_attempts: int, window: float) -> None: + self._failures: dict[str, deque[float]] = {} + self._max_attempts = max_attempts + self._window = window + + def check(self, ip: str) -> bool: + bucket = self._failures.get(ip) + if bucket is None: + return True + cutoff = time.monotonic() - self._window + while bucket and bucket[0] < cutoff: + bucket.popleft() + if not bucket: + self._failures.pop(ip, None) + return True + return len(bucket) < self._max_attempts + + def record_failure(self, ip: str) -> None: + bucket = self._failures.setdefault(ip, deque()) + cutoff = time.monotonic() - self._window + while bucket and bucket[0] < cutoff: + bucket.popleft() + bucket.append(time.monotonic()) + + def clear(self, ip: str) -> None: + self._failures.pop(ip, None) + + +async def _require_csrf(request: Request, session: dict[str, Any]) -> None: + form = await request.form() + submitted = form.get("csrf_token") + if not isinstance(submitted, str) or not hmac.compare_digest( + submitted, session["csrf"] + ): + raise HTTPException(status.HTTP_403_FORBIDDEN, "csrf check failed") diff --git a/src/beaver_gateway/frontends/admin/templates/_layout.html b/src/beaver_gateway/frontends/admin/templates/_layout.html new file mode 100644 index 0000000..ca1e248 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/_layout.html @@ -0,0 +1,205 @@ + + + + + + {% block title %}beaver-gateway · admin{% endblock %} + + + + + {% block header %} +
+
+

beaver-gateway

+ + Signed in as {{ user }} +
+ + +
+
+
+ {% endblock %} +
+ {% block content %}{% endblock %} +
+ + diff --git a/src/beaver_gateway/frontends/admin/templates/_token_created.html b/src/beaver_gateway/frontends/admin/templates/_token_created.html new file mode 100644 index 0000000..f2728a2 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/_token_created.html @@ -0,0 +1,12 @@ +{# Renders into #token-create-result (hx-target on the form). The + two OOB swaps reach the rest of the page: prepend the new row, + and erase the empty-state placeholder if it's still in the DOM. #} + + + {% include "_token_row.html" %} + + diff --git a/src/beaver_gateway/frontends/admin/templates/_token_error.html b/src/beaver_gateway/frontends/admin/templates/_token_error.html new file mode 100644 index 0000000..75c2e61 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/_token_error.html @@ -0,0 +1 @@ + diff --git a/src/beaver_gateway/frontends/admin/templates/_token_row.html b/src/beaver_gateway/frontends/admin/templates/_token_row.html new file mode 100644 index 0000000..de1b2c7 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/_token_row.html @@ -0,0 +1,25 @@ + + {{ token.name }} + + {{ token.scope }} + + {{ token.created_at | fmt_dt }} + {{ token.last_used_at | fmt_dt }} + {{ token.revoked_at | fmt_dt }} + + {% if not token.revoked_at %} +
+ + +
+ {% else %} + revoked + {% endif %} + + diff --git a/src/beaver_gateway/frontends/admin/templates/audit.html b/src/beaver_gateway/frontends/admin/templates/audit.html new file mode 100644 index 0000000..66f9bb4 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/audit.html @@ -0,0 +1,37 @@ +{% extends "_layout.html" %} +{% set active = "audit" %} +{% block title %}beaver-gateway · audit{% endblock %} +{% block content %} +

Audit log

+
+ {% if audit %} + + + + + + + + {% for row in audit %} + + + + + + + + {% endfor %} + +
TimeActorKindAgentDetail
{{ row.ts | fmt_dt }}{{ row.actor }}{{ row.kind }}{{ row.agent_name or "—" }}{{ row.detail_json | fmt_detail }}
+

+ {% if next_before %} + Older entries → + {% else %} + End of log. + {% endif %} +

+ {% else %} +

Nothing logged yet.

+ {% endif %} +
+{% endblock %} diff --git a/src/beaver_gateway/frontends/admin/templates/chat.html b/src/beaver_gateway/frontends/admin/templates/chat.html new file mode 100644 index 0000000..f53f800 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/chat.html @@ -0,0 +1,354 @@ +{% extends "_layout.html" %} +{% set active = "chat" %} +{% block title %}beaver-gateway · chat{% endblock %} +{% block content %} +
+
+ +
+ +
+
+
+ + +
+
+ + + + +{% endblock %} diff --git a/src/beaver_gateway/frontends/admin/templates/dashboard.html b/src/beaver_gateway/frontends/admin/templates/dashboard.html new file mode 100644 index 0000000..09dfdf5 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/dashboard.html @@ -0,0 +1,430 @@ +{% extends "_layout.html" %} +{% set active = "dashboard" %} +{% block title %}beaver-gateway · dashboard{% endblock %} +{% block content %} +

Agents

+
+ {% if agents %} + + + + {% for a in agents %} + + + + + + + {% endfor %} + +
NameTypeModelExposed MCPs
{{ a.name }}{{ a.__class__.__name__ }}{{ a.model }} + {% for em in a.expose_mcps %}{{ em.name }}{% if not loop.last %}, {% endif %}{% else %}{% endfor %} +
+ {% else %} +

No agents configured.

+ {% endif %} +
+ +

MCP namespaces

+
+ {% if mcps %} + + + + {% for m in mcps %} + + + + + {% endfor %} + +
NameKind
{{ m.name }}{{ m.kind }}
+ {% else %} +

No MCP servers configured.

+ {% endif %} +
+ +

Endpoints

+
+ {% if endpoints.agents or endpoints.mcps %} +
+ + + +
+

+ Beaver stores only an Argon2 hash of each token, so the plaintext can't be reconstructed. + Paste the value you saved at creation; if you've lost it, mint a new one. + Below: pick a token to see which endpoints its scope covers, paste the secret to fill it + into URL / curl, then click Copy. +

+ + {% if endpoints.agents %} +

Agents — POST /v1/messages

+ + + + {% for ep in endpoints.agents %} + + + + + + + + {% endfor %} + +
AgentModelURL
+ {{ ep.agent }} + {{ ep.agent_type }} + + {{ ep.model }}{{ ep.url }} + + + +
+ {% elif endpoints.anthropic_base is none and agents %} +

+ Agents are declared but no AnthropicMessagesFrontend is configured — + add one to Gateway(frontends=[...]) to expose them over HTTP. +

+ {% endif %} + + {% if endpoints.mcps %} +

MCP — streamable HTTP

+ + + + {% for ep in endpoints.mcps %} + + + + + + + + {% endfor %} + +
NamespaceKindURL
+ {{ ep.namespace }} + + {{ ep.kind }}{{ ep.url }} + + + + +
+ {% elif endpoints.mcp_base is none and mcps %} +

+ MCP servers are declared but no McpServerFrontend is configured — + add one to Gateway(frontends=[...]) to expose them over HTTP. +

+ {% endif %} + {% else %} +

+ Nothing exposed yet — declare agents / MCPs and the matching frontends + (AnthropicMessagesFrontend, McpServerFrontend) in your config. +

+ {% endif %} +
+ +

Recent activity

+
+ {% if audit %} + + + + {% for row in audit %} + + + + + + + + {% endfor %} + +
TimeActorKindAgentDetail
{{ row.ts | fmt_dt }}{{ row.actor }}{{ row.kind }}{{ row.agent_name or "—" }}{{ row.detail_json | fmt_detail }}
+

Full log →

+ {% else %} +

Nothing logged yet.

+ {% endif %} +
+ + + + +{% endblock %} diff --git a/src/beaver_gateway/frontends/admin/templates/login.html b/src/beaver_gateway/frontends/admin/templates/login.html new file mode 100644 index 0000000..ef679ae --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/login.html @@ -0,0 +1,83 @@ + + + + + + beaver-gateway · log in + + + +
+

beaver-gateway

+

Sign in to manage tokens and view audit logs.

+ {% if error %} +
{{ error }}
+ {% endif %} +
+ + + + + +
+
+ + diff --git a/src/beaver_gateway/frontends/admin/templates/tokens.html b/src/beaver_gateway/frontends/admin/templates/tokens.html new file mode 100644 index 0000000..f427cc7 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/tokens.html @@ -0,0 +1,64 @@ +{% extends "_layout.html" %} +{% set active = "tokens" %} +{% block title %}beaver-gateway · tokens{% endblock %} +{% block content %} +

Create token

+
+

Plaintext is shown once, immediately after creation. Copy it before you navigate away — the database only ever holds the Argon2 hash.

+
+
+ +
+
+ + +
+
+ + +
+
+ +
+
+
+
+ +

+ Tokens + + {% if include_revoked %} + Hide revoked + {% else %} + Show revoked + {% endif %} + +

+
+ + + + + + + {# Render the tbody unconditionally so the HTMX OOB swap on + create has a target even when the table starts empty. #} + + {% for token in tokens %} + {% include "_token_row.html" %} + {% else %} + + {% endfor %} + +
NameScopeCreatedLast usedRevoked
No tokens yet. Create one above.
+
+{% endblock %} diff --git a/src/beaver_gateway/frontends/anthropic.py b/src/beaver_gateway/frontends/anthropic.py index fa5a629..a533411 100644 --- a/src/beaver_gateway/frontends/anthropic.py +++ b/src/beaver_gateway/frontends/anthropic.py @@ -40,6 +40,7 @@ from anthropic.types import ( from fastapi import FastAPI, HTTPException, Request, status from fastapi.responses import JSONResponse, StreamingResponse +from beaver_gateway.core import audit from beaver_gateway.frontends.base import Frontend if TYPE_CHECKING: @@ -58,9 +59,24 @@ __all__ = ["AnthropicMessagesFrontend"] class AnthropicMessagesFrontend(Frontend): """FastAPI app behind ``POST /v1/messages`` + ``GET /v1/models``.""" - def __init__(self, *, host: str = "0.0.0.0", port: int = 8000) -> None: # noqa: S104 + def __init__( + self, + *, + host: str = "0.0.0.0", # noqa: S104 + port: int = 8000, + public_base_url: str | None = None, + ) -> None: self.host = host self.port = port + # External URL prefix the reverse proxy (Caddy/nginx/Cloudflare/…) + # uses to reach this frontend, e.g. ``https://api.example.com/ai``. + # The frontend's internal paths (``/v1/messages``, ``/v1/models``) + # are appended to it when the admin dashboard renders copy-pastable + # URLs. Trailing slash is stripped so the concatenation is + # idempotent. ``None`` means "advertise raw ``host:port``" (dev / + # no proxy) — the dashboard then derives the base from the + # browser's own request hostname. + self.public_base_url = public_base_url.rstrip("/") if public_base_url else None self._runtime: GatewayRuntime | None = None self._app: FastAPI | None = None @@ -93,7 +109,7 @@ class AnthropicMessagesFrontend(Frontend): @app.get("/v1/models") async def list_models(request: Request) -> dict[str, Any]: - _require_token(request, runtime) + await _require_token(request, runtime, scope="messages") data = [ { "type": "model", @@ -107,7 +123,7 @@ class AnthropicMessagesFrontend(Frontend): @app.post("/v1/messages") async def create_message(request: Request) -> Any: - token_name = _require_token(request, runtime) + token_name = await _require_token(request, runtime, scope="messages") try: body = await request.json() except json.JSONDecodeError as exc: @@ -148,6 +164,18 @@ class AnthropicMessagesFrontend(Frontend): stream_flag, len(messages), ) + # Record at request acceptance, not at stream completion: + # a long streaming response can be aborted mid-flight by + # the client, and we still want the row in the audit trail. + # Detail stays small — no message bodies, no system prompt. + await audit.log( + runtime, + actor=f"token:{token_name}", + kind="messages", + agent_name=agent.name, + stream=stream_flag, + msgs=len(messages), + ) # Forward per-request knobs the Anthropic body may carry — # backend adapters layer these over per-agent defaults. Only @@ -166,37 +194,45 @@ class AnthropicMessagesFrontend(Frontend): ) if stream_flag: - return StreamingResponse( - _sse(events), media_type="text/event-stream" - ) + return StreamingResponse(_sse(events), media_type="text/event-stream") message = await _accumulate(events, model=model) return JSONResponse(content=message.model_dump(mode="json")) return app -def _require_token(request: Request, runtime: GatewayRuntime) -> str: - """Verify the request's bearer and return the token's audit name. +async def _require_token( + request: Request, runtime: GatewayRuntime, *, scope: str +) -> str: + """Verify the request's bearer + scope, return the token's audit name. Accepts both ``X-Api-Key: `` (what the official Anthropic SDK sends — LibreChat, the CLI, third-party clients) and - ``Authorization: Bearer `` (curl, Cursor). Raises 401 on - miss. ``TokenStore`` doesn't know about HTTP, so response shape - is owned here. + ``Authorization: Bearer `` (curl, Cursor). 401 on missing / + unknown token; 403 on a known token whose scope doesn't cover + ``scope`` (Phase 4.3 — bootstrap tokens get ``"*"`` and pass + everything). """ api_key = request.headers.get("x-api-key") - name = ( - runtime.token_store.verify(api_key) + identity = ( + await runtime.token_store.verify(api_key) if api_key - else runtime.token_store.verify_bearer(request.headers.get("authorization")) + else await runtime.token_store.verify_bearer( + request.headers.get("authorization") + ) ) - if name is None: + if identity is None: raise HTTPException( status.HTTP_401_UNAUTHORIZED, "invalid or missing bearer token", headers={"WWW-Authenticate": "Bearer"}, ) - return name + if not identity.allows(scope): + raise HTTPException( + status.HTTP_403_FORBIDDEN, + f"token scope {identity.scope!r} does not cover {scope!r}", + ) + return identity.name async def _sse(events: AsyncIterator[MessageStreamEvent]) -> AsyncIterator[bytes]: diff --git a/src/beaver_gateway/frontends/base.py b/src/beaver_gateway/frontends/base.py index 1ad586a..bfce91e 100644 --- a/src/beaver_gateway/frontends/base.py +++ b/src/beaver_gateway/frontends/base.py @@ -15,11 +15,12 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING if TYPE_CHECKING: - from collections.abc import Mapping + from collections.abc import Mapping, Sequence from beaver_gateway.backends.base import Backend from beaver_gateway.core.auth import TokenStore from beaver_gateway.core.registry import AgentRegistry, McpRegistry + from beaver_gateway.storage import Database @dataclass(frozen=True, slots=True) @@ -35,13 +36,32 @@ class GatewayRuntime: declared ``McpServer`` so ``ClaudeCodeBackendAdapter`` (Phase 2.2) can pass them to ``BackendOptions.mcp_servers`` without re-running discovery. + + ``db`` (Phase 4.1) is the shared :class:`Database` handle. Phase 4.2 + will switch ``TokenStore`` to read from it; Phase 4.3 admin/audit + write through it. Phase 4.1 only attaches it — existing frontends + ignore it. """ agents: AgentRegistry mcps: McpRegistry backends: dict[str, Backend] token_store: TokenStore + db: Database mcp_internal_urls: Mapping[str, str] = field(default_factory=dict) + # Phase 4.3 — AdminFrontend reads creds + cookie-signing key from + # the runtime so the user's ``config.py`` doesn't have to know + # anything about env wiring. Defaulted to empty so existing tests / + # call sites that don't touch the admin path keep building; the + # admin frontend ``configure()`` itself rejects empty values. + admin_user: str = "" + admin_pass: str = "" + session_secret: str = "" + # The full sibling-frontends list, in declaration order. AdminFrontend + # uses it to advertise concrete bearer-endpoint URLs (host/port) on + # the dashboard so the operator can copy ready-to-use links / curl + # snippets. Other frontends ignore it. + frontends: Sequence[Frontend] = field(default_factory=tuple) class Frontend(ABC): diff --git a/src/beaver_gateway/frontends/mcp_server.py b/src/beaver_gateway/frontends/mcp_server.py index bd08488..111baba 100644 --- a/src/beaver_gateway/frontends/mcp_server.py +++ b/src/beaver_gateway/frontends/mcp_server.py @@ -1,11 +1,18 @@ """External MCP frontend (Phase 3.1). A streamable-HTTP gateway in front of the internal MCP aggregator -(``beaver_gateway.mcp.internal_app``). The aggregator already hosts -every declared ``McpServer`` (``python_tool``, stdio proxy, HTTP proxy) +(``beaver_gateway.mcp.internal_app``). The aggregator hosts every +declared ``McpServer`` (``python_tool``, stdio proxy, HTTP proxy) under ``/mcp/`` plus a flat ``/mcp/all`` bundle on -``127.0.0.1:INTERNAL_MCP_PORT``. This frontend re-exposes those URLs to -external clients with three additions: +``127.0.0.1:INTERNAL_MCP_PORT`` — that's the *internal* shape. + +This frontend re-exposes those namespaces on its own port directly at +``//`` (no ``/mcp/`` prefix in the external routes — the port +itself already disambiguates). Caddy / nginx / Cloudflare in front +typically strips a prefix back on: ``domain.com/mcp/* → :8001/*``, +controlled by the operator's reverse-proxy config and surfaced to the +admin dashboard via ``public_base_url``. Three additions on top of the +raw aggregator: * **Bearer auth** — ``Authorization: Bearer ``, ``X-Api-Key``, or ``?token=<…>`` query string. All three forms verify against the @@ -39,6 +46,7 @@ from starlette.applications import Starlette from starlette.responses import HTMLResponse, JSONResponse, StreamingResponse from starlette.routing import Route +from beaver_gateway.core import audit from beaver_gateway.frontends.base import Frontend from beaver_gateway.mcp.internal_app import ALL_NAMESPACE @@ -86,9 +94,30 @@ __all__ = ["McpServerFrontend"] class McpServerFrontend(Frontend): """Auth + audit + reverse-proxy in front of the internal MCP aggregator.""" - def __init__(self, *, host: str = "0.0.0.0", port: int = 8001) -> None: # noqa: S104 + def __init__( + self, + *, + host: str = "0.0.0.0", # noqa: S104 + port: int = 8001, + public_base_url: str | None = None, + ) -> None: self.host = host self.port = port + # External URL prefix the reverse proxy uses to reach this + # frontend. Internal routes mount namespaces at the port root + # (``//`` and ``/all/``) — Caddy / nginx / Cloudflare in + # front decides what external prefix they sit under. Typical + # symmetric setup: + # + # Caddy: handle_path /mcp/* { reverse_proxy localhost:8001 } + # config: public_base_url -> https://api.example.com/mcp + # dashboard advertises: https://api.example.com/mcp// + # + # The frontend's ``//`` segment gets appended verbatim. Set + # it to whatever matches your proxy. ``None`` means "advertise + # raw ``host:port`` derived from the inbound request" (dev / + # no proxy). + self.public_base_url = public_base_url.rstrip("/") if public_base_url else None self._runtime: GatewayRuntime | None = None self._app: Starlette | None = None # Single shared aiohttp session, opened once when uvicorn starts @@ -126,7 +155,7 @@ class McpServerFrontend(Frontend): # Long sock_read because MCP tool calls can take a while # (a claude tool over HTTP can easily stretch beyond 30s # on a real tool). - timeout=aiohttp.ClientTimeout(total=None, sock_read=600), + timeout=aiohttp.ClientTimeout(total=None, sock_read=600) ) try: yield @@ -138,16 +167,21 @@ class McpServerFrontend(Frontend): routes = [ Route("/", self._discovery, methods=["GET"]), Route("/healthz", self._healthz, methods=["GET"]), - # Two routes per namespace so both the trailing-slash and - # sub-path forms work (``/mcp/time`` AND ``/mcp/time/foo``). - # Starlette doesn't fold them into one route automatically. + # Namespaces mount at the root of this port — the port + # itself already disambiguates this from any other gateway + # surface. Two routes per namespace so both the + # trailing-slash and sub-path forms work (``/time`` AND + # ``/time/foo``); Starlette doesn't fold them into one + # route automatically. The literal routes above (``/``, + # ``/healthz``) are listed first and win the match, so + # they're not eaten by ``/{namespace}``. Route( - "/mcp/{namespace}", + "/{namespace}", self._proxy_endpoint, methods=["GET", "POST", "DELETE", "OPTIONS"], ), Route( - "/mcp/{namespace}/{path:path}", + "/{namespace}/{path:path}", self._proxy_endpoint, methods=["GET", "POST", "DELETE", "OPTIONS"], ), @@ -159,17 +193,19 @@ class McpServerFrontend(Frontend): async def _discovery(self, request: Request) -> HTMLResponse | JSONResponse: runtime = self._require_runtime() - token_name = _verify_request(request, runtime) - if token_name is None: - return _unauthorized() - # Use the request's own scheme+host so the snippets work behind - # reverse proxies / tunnels. Falls back to the configured - # host:port if the client didn't send Host (curl --raw). - base = _external_base_url(request, self.host, self.port) + token_name, err = await _verify_request(request, runtime) + if err is not None: + return err + assert token_name is not None # noqa: S101 — narrow for ty + # ``public_base_url`` wins if configured — it's the operator's + # explicit statement of "this is the URL my reverse proxy puts + # in front of me". Otherwise: use the request's own scheme+host + # so snippets work behind generic reverse proxies / tunnels; + # and fall back to the configured host:port if the client + # didn't send Host (curl --raw). + base = self.public_base_url or _external_base_url(request, self.host, self.port) html = _render_discovery_page( - base_url=base, - namespaces=list(runtime.mcps), - actor=token_name, + base_url=base, namespaces=list(runtime.mcps), actor=token_name ) return HTMLResponse(html) @@ -177,9 +213,10 @@ class McpServerFrontend(Frontend): self, request: Request ) -> StreamingResponse | JSONResponse: runtime = self._require_runtime() - token_name = _verify_request(request, runtime) - if token_name is None: - return _unauthorized() + token_name, err = await _verify_request(request, runtime) + if err is not None: + return err + assert token_name is not None # noqa: S101 — narrow for ty namespace = request.path_params["namespace"] subpath = request.path_params.get("path", "") @@ -190,16 +227,21 @@ class McpServerFrontend(Frontend): token_name, namespace, ) + await audit.log( + runtime, + actor=f"token:{token_name}", + kind="mcp_call", + namespace=namespace, + method=request.method, + status=404, + ) return JSONResponse( - {"error": "unknown namespace", "namespace": namespace}, - status_code=404, + {"error": "unknown namespace", "namespace": namespace}, status_code=404 ) if self._http is None: # Lifespan hasn't run yet (shouldn't happen with uvicorn). - return JSONResponse( - {"error": "frontend not ready"}, status_code=503 - ) + return JSONResponse({"error": "frontend not ready"}, status_code=503) return await _reverse_proxy( client=self._http, @@ -207,6 +249,7 @@ class McpServerFrontend(Frontend): upstream_url=upstream_url, namespace=namespace, actor=token_name, + runtime=runtime, ) def _upstream_url(self, namespace: str, subpath: str) -> str | None: @@ -233,23 +276,38 @@ class McpServerFrontend(Frontend): return self._runtime -def _verify_request(request: Request, runtime: GatewayRuntime) -> str | None: +_MCP_SCOPE = "mcp" + + +async def _verify_request( + request: Request, runtime: GatewayRuntime +) -> tuple[str | None, JSONResponse | None]: """Accept ``Authorization: Bearer``, ``X-Api-Key``, or ``?token=``. The third form is the escape hatch for clients that can only put secrets in the URL (claude.ai's MCP config historically did this). - All three roads end at the same :class:`TokenStore`. + All three roads end at the same :class:`TokenStore`. Returns + ``(actor_name, None)`` on success, ``(None, 401|403)`` otherwise + — the caller forwards the response as-is. Splitting auth vs scope + failures matters: 401 says "send me a token", 403 says "this token + is real but not for this endpoint". """ api_key = request.headers.get("x-api-key") if api_key: - return runtime.token_store.verify(api_key) - auth_header = request.headers.get("authorization") - if auth_header: - return runtime.token_store.verify_bearer(auth_header) - qs_token = request.query_params.get("token") - if qs_token: - return runtime.token_store.verify(qs_token) - return None + identity = await runtime.token_store.verify(api_key) + else: + auth_header = request.headers.get("authorization") + if auth_header: + identity = await runtime.token_store.verify_bearer(auth_header) + else: + qs_token = request.query_params.get("token") + identity = await runtime.token_store.verify(qs_token) if qs_token else None + + if identity is None: + return None, _unauthorized() + if not identity.allows(_MCP_SCOPE): + return None, _forbidden(identity.scope, _MCP_SCOPE) + return identity.name, None def _unauthorized() -> JSONResponse: @@ -260,6 +318,13 @@ def _unauthorized() -> JSONResponse: ) +def _forbidden(scope: str, required: str) -> JSONResponse: + return JSONResponse( + {"error": "insufficient scope", "scope": scope, "required": required}, + status_code=403, + ) + + def _join_subpath(base_url: str, subpath: str) -> str: """Concatenate the loopback URL with the proxied sub-path. @@ -287,6 +352,7 @@ async def _reverse_proxy( upstream_url: str, namespace: str, actor: str, + runtime: GatewayRuntime, ) -> StreamingResponse | JSONResponse: """Bidirectionally stream an MCP request between client ↔ internal aggregator. @@ -325,9 +391,16 @@ async def _reverse_proxy( namespace, exc, ) + await audit.log( + runtime, + actor=f"token:{actor}", + kind="mcp_call", + namespace=namespace, + method=request.method, + status=502, + ) return JSONResponse( - {"error": "upstream MCP unreachable", "detail": str(exc)}, - status_code=502, + {"error": "upstream MCP unreachable", "detail": str(exc)}, status_code=502 ) _log.info( @@ -338,6 +411,17 @@ async def _reverse_proxy( request.url.path, upstream_resp.status, ) + # Audit at upstream-response time: status reflects the MCP call's + # outcome (200 / tool-error / 4xx). Streaming relay below may be + # cut short by the client, but the row is already in by then. + await audit.log( + runtime, + actor=f"token:{actor}", + kind="mcp_call", + namespace=namespace, + method=request.method, + status=upstream_resp.status, + ) response_headers = _response_headers(upstream_resp.headers) @@ -397,29 +481,31 @@ def _scrub_query(query: str, *, drop: frozenset[str] | set[str]) -> str: return urlencode(kept) -def _render_discovery_page( - *, base_url: str, namespaces: list[Any], actor: str -) -> str: +def _render_discovery_page(*, base_url: str, namespaces: list[Any], actor: str) -> str: """Render the auth-gated namespace + config-snippet page. Inline HTML (no Jinja file) — keeps Phase 3 free of template-dir plumbing that Phase 4's AdminFrontend will own. """ name_list = [getattr(ns, "name", str(ns)) for ns in namespaces] - rows = "\n".join( - f""" + rows = ( + "\n".join( + f""" {_escape(name)} - {_escape(f"{base_url}/mcp/{name}/")} + {_escape(f"{base_url}/{name}/")} """ - for name in name_list - ) or """ No MCP servers configured.""" + for name in name_list + ) + or """ \ +No MCP servers configured.""" + ) cursor_snippet = _CURSOR_SNIPPET.format(base_url=base_url) claude_desktop_snippet = _CLAUDE_DESKTOP_SNIPPET.format(base_url=base_url) return _DISCOVERY_TEMPLATE.format( actor=_escape(actor), base_url=_escape(base_url), rows=rows, - all_url=_escape(f"{base_url}/mcp/{ALL_NAMESPACE}/"), + all_url=_escape(f"{base_url}/{ALL_NAMESPACE}/"), cursor_snippet=_escape(cursor_snippet), claude_desktop_snippet=_escape(claude_desktop_snippet), ) @@ -443,8 +529,7 @@ _DISCOVERY_TEMPLATE = "\n".join( # noqa: FLY002 — readability beats one-strin " beaver-gateway · MCP discovery", "