diff --git a/src/beaver_gateway/backends/claude_code.py b/src/beaver_gateway/backends/claude_code.py index 173122b..bf033c7 100644 --- a/src/beaver_gateway/backends/claude_code.py +++ b/src/beaver_gateway/backends/claude_code.py @@ -226,6 +226,18 @@ class ClaudeCodeBackendAdapter: def live_session_count(self) -> int: return self._backend.live_session_count + @property + def live_sessions(self) -> dict[str, Any]: + """Live PTY processes keyed by claude session_id. + + Pass-through to the underlying ``ClaudeCodeBackend``. The value + type is the claude-code-api ``PtyClaudeProcess``; admin code + consumes ``captured_output()`` / ``add_output_listener`` / + ``write`` from it. Typed as ``Any`` to avoid leaking the lower + layer's type into the gateway's public surface. + """ + return self._backend.live_sessions + async def __aenter__(self) -> Self: await self._backend.__aenter__() return self diff --git a/src/beaver_gateway/frontends/admin/frontend.py b/src/beaver_gateway/frontends/admin/frontend.py index ccfdd0f..bc6c0f6 100644 --- a/src/beaver_gateway/frontends/admin/frontend.py +++ b/src/beaver_gateway/frontends/admin/frontend.py @@ -25,6 +25,8 @@ cookie session. from __future__ import annotations +import asyncio +import contextlib import hmac import json import logging @@ -35,7 +37,7 @@ from typing import TYPE_CHECKING, Annotated, Any from urllib.parse import urlsplit import itsdangerous -from fastapi import FastAPI, Form, HTTPException, Request, status +from fastapi import FastAPI, Form, HTTPException, Request, WebSocket, status from fastapi.responses import ( HTMLResponse, RedirectResponse, @@ -510,12 +512,193 @@ class AdminFrontend(Frontend): media_type="text/event-stream", ) + # ---- PTY terminal viewer ---- + + @app.get("/pty", response_class=HTMLResponse) + async def pty_list(request: Request) -> Response: + session = _require_session(request, signer, url_prefix=url_prefix) + sessions = _collect_pty_sessions(runtime) + return HTMLResponse( + render( + "pty_list.html", + user=session["user"], + csrf=session["csrf"], + sessions=sessions, + ) + ) + + @app.get("/pty/{session_id}", response_class=HTMLResponse) + async def pty_view(session_id: str, request: Request) -> Response: + session = _require_session(request, signer, url_prefix=url_prefix) + return HTMLResponse( + render( + "pty.html", + user=session["user"], + csrf=session["csrf"], + session_id=session_id, + ) + ) + + @app.websocket("/pty/{session_id}/ws") + async def pty_ws(websocket: WebSocket, session_id: str) -> None: + # Session check via cookie — websockets carry cookies, same + # signed payload as the HTTP routes. Reject before accepting + # so the client gets a clean 403 (which manifests as a WS + # connection failure on the JS side). + if _current_user_from_cookie(websocket, signer) is None: + await websocket.close(code=status.WS_1008_POLICY_VIOLATION) + return + pty = _find_pty(runtime, session_id) + if pty is None: + await websocket.close(code=status.WS_1008_POLICY_VIOLATION) + return + await _bridge_pty_websocket(websocket, pty) + return app # ---- helpers ------------------------------------------------------------ +def _collect_pty_sessions(runtime: GatewayRuntime) -> list[dict[str, Any]]: + """Enumerate live PTY sessions across all backends. + + A backend qualifies if it exposes a ``live_sessions`` mapping + (currently only ``ClaudeCodeBackendAdapter``). Other backend types + are quietly skipped — the admin terminal viewer only makes sense for + PTY-backed agents. + """ + out: list[dict[str, Any]] = [] + for agent_name, backend in runtime.backends.items(): + live = getattr(backend, "live_sessions", None) + if not isinstance(live, dict): + continue + for session_id, pty in live.items(): + out.append( + { + "agent": agent_name, + "session_id": session_id, + "pid": getattr(pty, "pid", None), + "buffer_size": len(pty.captured_output()) + if hasattr(pty, "captured_output") + else 0, + } + ) + return out + + +def _find_pty(runtime: GatewayRuntime, session_id: str) -> Any: + """Locate a live PTY by its claude session_id, across all backends.""" + for backend in runtime.backends.values(): + live = getattr(backend, "live_sessions", None) + if not isinstance(live, dict): + continue + pty = live.get(session_id) + if pty is not None: + return pty + return None + + +def _current_user_from_cookie( + websocket: WebSocket, signer: itsdangerous.URLSafeTimedSerializer +) -> dict[str, Any] | None: + """Validate the admin session cookie on a WebSocket handshake. + + Mirrors :func:`_current_user` but reads from the websocket's + request-style cookies. ``BadSignature`` / ``SignatureExpired`` / + ``BadData`` all collapse to None — the caller closes the connection. + """ + raw = websocket.cookies.get(SESSION_COOKIE) + if not raw: + return None + try: + payload = signer.loads(raw, max_age=SESSION_MAX_AGE) + except itsdangerous.BadData: + return None + if not isinstance(payload, dict) or "user" not in payload: + return None + return payload + + +async def _bridge_pty_websocket(websocket: WebSocket, pty: Any) -> None: + """Bidirectional bridge between a WebSocket and a live PtyClaudeProcess. + + Output direction: the drain thread already runs continuously and + fans incoming bytes to all subscribed listeners. We register a + listener that drops each chunk into an :class:`asyncio.Queue` via + ``call_soon_threadsafe`` (the drain runs on a thread, not the loop). + A dedicated sender task drains the queue into ``websocket.send_bytes``. + + Input direction: each client text frame is encoded UTF-8 and written + to the PTY raw (``newline=False``) so xterm.js can pass through key + sequences (Enter, arrows, control chars) verbatim. + + On either side closing, both tasks unwind and the listener is + removed. The initial ``captured_output()`` snapshot is sent so a + new client sees the current screen state, not a blank terminal. + """ + await websocket.accept() + loop = asyncio.get_running_loop() + out_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=1024) + + def listener(chunk: bytes) -> None: + loop.call_soon_threadsafe(out_queue.put_nowait, chunk) + + pty.add_output_listener(listener) + try: + # Initial replay so the client doesn't see a blank terminal. + initial = pty.captured_output() + if initial: + await websocket.send_bytes(bytes(initial)) + + async def writer() -> None: + while True: + chunk = await out_queue.get() + if chunk is None: + return + await websocket.send_bytes(chunk) + + async def reader() -> None: + while True: + msg = await websocket.receive() + if msg["type"] == "websocket.disconnect": + return + data = msg.get("text") or msg.get("bytes") + if data is None: + continue + payload = data.encode("utf-8") if isinstance(data, str) else bytes(data) + await pty.write(payload, newline=False) + + writer_task = asyncio.create_task(writer(), name="pty-ws-writer") + reader_task = asyncio.create_task(reader(), name="pty-ws-reader") + try: + done, pending = await asyncio.wait( + {writer_task, reader_task}, return_when=asyncio.FIRST_COMPLETED + ) + for t in pending: + t.cancel() + with contextlib.suppress(BaseException): + await t + for t in done: + # Surface a real error instead of hiding it in the + # task's stored exception — but never crash the bridge + # on a clean disconnect. + exc = t.exception() + if exc is not None and not isinstance( + exc, asyncio.CancelledError | RuntimeError | OSError + ): + _log.warning("pty websocket task failed: %s", exc) + finally: + out_queue.put_nowait(None) + finally: + pty.remove_output_listener(listener) + with contextlib.suppress(BaseException): + await websocket.close() + + +# ---- helpers (originals) ------------------------------------------------ + + def _build_template_env() -> Environment: env = Environment( loader=PackageLoader("beaver_gateway.frontends.admin", "templates"), diff --git a/src/beaver_gateway/frontends/admin/templates/_layout.html b/src/beaver_gateway/frontends/admin/templates/_layout.html index 5084416..ede592b 100644 --- a/src/beaver_gateway/frontends/admin/templates/_layout.html +++ b/src/beaver_gateway/frontends/admin/templates/_layout.html @@ -187,6 +187,7 @@ diff --git a/src/beaver_gateway/frontends/admin/templates/pty.html b/src/beaver_gateway/frontends/admin/templates/pty.html new file mode 100644 index 0000000..c8bbd52 --- /dev/null +++ b/src/beaver_gateway/frontends/admin/templates/pty.html @@ -0,0 +1,146 @@ +{% extends "_layout.html" %} +{% set active = "pty" %} +{% block title %}PTY {{ session_id }} · beaver-gateway{% endblock %} +{% block content %} + + + + +
{{ session_id }}+ Live view of the claude subprocess TUI. Keystrokes you type here go + straight into its stdin — Enter, arrows, Ctrl-C, paste, all work. + ← back to list +
+ + + +
+ Each row is a running claude subprocess. Open one to see what's
+ currently rendered on its TUI and (if needed) type into it directly.
+
| Session ID | +Agent | +PID | +Buffer | ++ |
|---|---|---|---|---|
{{ s.session_id }} |
+ {{ s.agent }} | +{{ s.pid or "?" }} | +{{ s.buffer_size }} bytes | ++ Open + | +