feat: add pty monitoring for claude code

This commit is contained in:
h
2026-05-23 00:37:49 +02:00
parent 9de5647cfa
commit c7b3eddbf4
6 changed files with 386 additions and 3 deletions
@@ -226,6 +226,18 @@ class ClaudeCodeBackendAdapter:
def live_session_count(self) -> int:
return self._backend.live_session_count
@property
def live_sessions(self) -> dict[str, Any]:
"""Live PTY processes keyed by claude session_id.
Pass-through to the underlying ``ClaudeCodeBackend``. The value
type is the claude-code-api ``PtyClaudeProcess``; admin code
consumes ``captured_output()`` / ``add_output_listener`` /
``write`` from it. Typed as ``Any`` to avoid leaking the lower
layer's type into the gateway's public surface.
"""
return self._backend.live_sessions
async def __aenter__(self) -> Self:
await self._backend.__aenter__()
return self
+184 -1
View File
@@ -25,6 +25,8 @@ cookie session.
from __future__ import annotations
import asyncio
import contextlib
import hmac
import json
import logging
@@ -35,7 +37,7 @@ from typing import TYPE_CHECKING, Annotated, Any
from urllib.parse import urlsplit
import itsdangerous
from fastapi import FastAPI, Form, HTTPException, Request, status
from fastapi import FastAPI, Form, HTTPException, Request, WebSocket, status
from fastapi.responses import (
HTMLResponse,
RedirectResponse,
@@ -510,12 +512,193 @@ class AdminFrontend(Frontend):
media_type="text/event-stream",
)
# ---- PTY terminal viewer ----
@app.get("/pty", response_class=HTMLResponse)
async def pty_list(request: Request) -> Response:
session = _require_session(request, signer, url_prefix=url_prefix)
sessions = _collect_pty_sessions(runtime)
return HTMLResponse(
render(
"pty_list.html",
user=session["user"],
csrf=session["csrf"],
sessions=sessions,
)
)
@app.get("/pty/{session_id}", response_class=HTMLResponse)
async def pty_view(session_id: str, request: Request) -> Response:
session = _require_session(request, signer, url_prefix=url_prefix)
return HTMLResponse(
render(
"pty.html",
user=session["user"],
csrf=session["csrf"],
session_id=session_id,
)
)
@app.websocket("/pty/{session_id}/ws")
async def pty_ws(websocket: WebSocket, session_id: str) -> None:
# Session check via cookie — websockets carry cookies, same
# signed payload as the HTTP routes. Reject before accepting
# so the client gets a clean 403 (which manifests as a WS
# connection failure on the JS side).
if _current_user_from_cookie(websocket, signer) is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
pty = _find_pty(runtime, session_id)
if pty is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await _bridge_pty_websocket(websocket, pty)
return app
# ---- helpers ------------------------------------------------------------
def _collect_pty_sessions(runtime: GatewayRuntime) -> list[dict[str, Any]]:
"""Enumerate live PTY sessions across all backends.
A backend qualifies if it exposes a ``live_sessions`` mapping
(currently only ``ClaudeCodeBackendAdapter``). Other backend types
are quietly skipped — the admin terminal viewer only makes sense for
PTY-backed agents.
"""
out: list[dict[str, Any]] = []
for agent_name, backend in runtime.backends.items():
live = getattr(backend, "live_sessions", None)
if not isinstance(live, dict):
continue
for session_id, pty in live.items():
out.append(
{
"agent": agent_name,
"session_id": session_id,
"pid": getattr(pty, "pid", None),
"buffer_size": len(pty.captured_output())
if hasattr(pty, "captured_output")
else 0,
}
)
return out
def _find_pty(runtime: GatewayRuntime, session_id: str) -> Any:
"""Locate a live PTY by its claude session_id, across all backends."""
for backend in runtime.backends.values():
live = getattr(backend, "live_sessions", None)
if not isinstance(live, dict):
continue
pty = live.get(session_id)
if pty is not None:
return pty
return None
def _current_user_from_cookie(
websocket: WebSocket, signer: itsdangerous.URLSafeTimedSerializer
) -> dict[str, Any] | None:
"""Validate the admin session cookie on a WebSocket handshake.
Mirrors :func:`_current_user` but reads from the websocket's
request-style cookies. ``BadSignature`` / ``SignatureExpired`` /
``BadData`` all collapse to None — the caller closes the connection.
"""
raw = websocket.cookies.get(SESSION_COOKIE)
if not raw:
return None
try:
payload = signer.loads(raw, max_age=SESSION_MAX_AGE)
except itsdangerous.BadData:
return None
if not isinstance(payload, dict) or "user" not in payload:
return None
return payload
async def _bridge_pty_websocket(websocket: WebSocket, pty: Any) -> None:
"""Bidirectional bridge between a WebSocket and a live PtyClaudeProcess.
Output direction: the drain thread already runs continuously and
fans incoming bytes to all subscribed listeners. We register a
listener that drops each chunk into an :class:`asyncio.Queue` via
``call_soon_threadsafe`` (the drain runs on a thread, not the loop).
A dedicated sender task drains the queue into ``websocket.send_bytes``.
Input direction: each client text frame is encoded UTF-8 and written
to the PTY raw (``newline=False``) so xterm.js can pass through key
sequences (Enter, arrows, control chars) verbatim.
On either side closing, both tasks unwind and the listener is
removed. The initial ``captured_output()`` snapshot is sent so a
new client sees the current screen state, not a blank terminal.
"""
await websocket.accept()
loop = asyncio.get_running_loop()
out_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=1024)
def listener(chunk: bytes) -> None:
loop.call_soon_threadsafe(out_queue.put_nowait, chunk)
pty.add_output_listener(listener)
try:
# Initial replay so the client doesn't see a blank terminal.
initial = pty.captured_output()
if initial:
await websocket.send_bytes(bytes(initial))
async def writer() -> None:
while True:
chunk = await out_queue.get()
if chunk is None:
return
await websocket.send_bytes(chunk)
async def reader() -> None:
while True:
msg = await websocket.receive()
if msg["type"] == "websocket.disconnect":
return
data = msg.get("text") or msg.get("bytes")
if data is None:
continue
payload = data.encode("utf-8") if isinstance(data, str) else bytes(data)
await pty.write(payload, newline=False)
writer_task = asyncio.create_task(writer(), name="pty-ws-writer")
reader_task = asyncio.create_task(reader(), name="pty-ws-reader")
try:
done, pending = await asyncio.wait(
{writer_task, reader_task}, return_when=asyncio.FIRST_COMPLETED
)
for t in pending:
t.cancel()
with contextlib.suppress(BaseException):
await t
for t in done:
# Surface a real error instead of hiding it in the
# task's stored exception — but never crash the bridge
# on a clean disconnect.
exc = t.exception()
if exc is not None and not isinstance(
exc, asyncio.CancelledError | RuntimeError | OSError
):
_log.warning("pty websocket task failed: %s", exc)
finally:
out_queue.put_nowait(None)
finally:
pty.remove_output_listener(listener)
with contextlib.suppress(BaseException):
await websocket.close()
# ---- helpers (originals) ------------------------------------------------
def _build_template_env() -> Environment:
env = Environment(
loader=PackageLoader("beaver_gateway.frontends.admin", "templates"),
@@ -187,6 +187,7 @@
<nav class="tabs">
<a href="{{ p }}/" class="{% if active == 'dashboard' %}active{% endif %}">Dashboard</a>
<a href="{{ p }}/chat" class="{% if active == 'chat' %}active{% endif %}">Chat</a>
<a href="{{ p }}/pty" class="{% if active == 'pty' %}active{% endif %}">PTY</a>
<a href="{{ p }}/tokens" class="{% if active == 'tokens' %}active{% endif %}">Tokens</a>
<a href="{{ p }}/audit" class="{% if active == 'audit' %}active{% endif %}">Audit</a>
</nav>
@@ -0,0 +1,146 @@
{% extends "_layout.html" %}
{% set active = "pty" %}
{% block title %}PTY {{ session_id }} · beaver-gateway{% endblock %}
{% block content %}
<style>
/* xterm.js styles inlined from the CDN bundle — keep them tight and
constrained to the terminal block so they don't leak into the rest
of the admin UI. */
.term-wrap {
background: #000;
border-radius: 12px;
padding: 12px;
border: 1px solid var(--line);
}
.term-host {
height: 70vh;
}
.term-host .xterm {
height: 100%;
}
.term-toolbar {
display: flex;
gap: 0.5rem;
align-items: center;
margin-bottom: 0.75rem;
flex-wrap: wrap;
}
.term-toolbar .status {
margin-left: auto;
font-size: 0.85em;
color: var(--muted);
}
.term-toolbar .status.connected { color: #027a48; }
.term-toolbar .status.dropped { color: var(--danger); }
</style>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/@xterm/xterm@5.5.0/css/xterm.min.css">
<h2>PTY <code>{{ session_id }}</code></h2>
<p class="muted">
Live view of the claude subprocess TUI. Keystrokes you type here go
straight into its stdin — Enter, arrows, Ctrl-C, paste, all work.
<a href="{{ p }}/pty">← back to list</a>
</p>
<div class="term-toolbar">
<button id="reconnect-btn" type="button">Reconnect</button>
<button id="enter-btn" type="button" title="Send a single \r — handy if a paste is stuck in the input box">Send Enter</button>
<button id="ctrl-c-btn" type="button">Send Ctrl-C</button>
<span id="status" class="status">connecting…</span>
</div>
<div class="term-wrap">
<div id="term" class="term-host"></div>
</div>
<script src="https://cdn.jsdelivr.net/npm/@xterm/xterm@5.5.0/lib/xterm.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/@xterm/addon-fit@0.10.0/lib/addon-fit.min.js"></script>
<script>
(function () {
const sessionId = {{ session_id|tojson }};
const wsBase =
(location.protocol === "https:" ? "wss://" : "ws://") +
location.host +
{{ p|tojson }} +
"/pty/" + encodeURIComponent(sessionId) + "/ws";
const term = new Terminal({
fontFamily: 'ui-monospace, "SF Mono", Menlo, Consolas, monospace',
fontSize: 13,
theme: {
background: "#000000",
foreground: "#d0d0d0",
},
convertEol: false,
cursorBlink: true,
scrollback: 5000,
});
const fit = new FitAddon.FitAddon();
term.loadAddon(fit);
term.open(document.getElementById("term"));
// Defer fit until after layout settles; xterm's measurement reads
// computed CSS that isn't stable until the page paints once.
requestAnimationFrame(() => fit.fit());
window.addEventListener("resize", () => fit.fit());
const statusEl = document.getElementById("status");
function setStatus(text, cls) {
statusEl.textContent = text;
statusEl.className = "status" + (cls ? " " + cls : "");
}
let ws = null;
let manuallyClosed = false;
function connect() {
manuallyClosed = false;
setStatus("connecting…", "");
ws = new WebSocket(wsBase);
ws.binaryType = "arraybuffer";
ws.onopen = () => setStatus("connected", "connected");
ws.onclose = () => {
if (!manuallyClosed) setStatus("disconnected", "dropped");
};
ws.onerror = () => setStatus("error", "dropped");
ws.onmessage = (ev) => {
if (typeof ev.data === "string") {
term.write(ev.data);
} else {
// ArrayBuffer — feed raw bytes to xterm. It expects either
// string or Uint8Array; the latter preserves byte boundaries
// exactly, which matters for ANSI sequences split across frames.
term.write(new Uint8Array(ev.data));
}
};
}
// Keystrokes -> server. xterm.js gives us the exact byte sequence
// the terminal would emit (e.g. Enter -> "\r", arrows -> "\x1b[A"
// etc.), so we just pass it through.
term.onData((data) => {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(data);
}
});
document.getElementById("reconnect-btn").addEventListener("click", () => {
if (ws) {
manuallyClosed = true;
ws.close();
}
connect();
});
document.getElementById("enter-btn").addEventListener("click", () => {
if (ws && ws.readyState === WebSocket.OPEN) ws.send("\r");
term.focus();
});
document.getElementById("ctrl-c-btn").addEventListener("click", () => {
if (ws && ws.readyState === WebSocket.OPEN) ws.send("\x03");
term.focus();
});
connect();
})();
</script>
{% endblock %}
@@ -0,0 +1,41 @@
{% extends "_layout.html" %}
{% set active = "pty" %}
{% block title %}PTY · beaver-gateway · admin{% endblock %}
{% block content %}
<h2>Live PTY sessions</h2>
<p class="muted">
Each row is a running <code>claude</code> subprocess. Open one to see what's
currently rendered on its TUI and (if needed) type into it directly.
</p>
{% if sessions %}
<div class="card" style="padding: 0;">
<table>
<thead>
<tr>
<th>Session ID</th>
<th>Agent</th>
<th>PID</th>
<th>Buffer</th>
<th></th>
</tr>
</thead>
<tbody>
{% for s in sessions %}
<tr>
<td><code>{{ s.session_id }}</code></td>
<td>{{ s.agent }}</td>
<td><span class="pill">{{ s.pid or "?" }}</span></td>
<td>{{ s.buffer_size }} bytes</td>
<td>
<a class="btn primary" href="{{ p }}/pty/{{ s.session_id }}">Open</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div class="banner">No live PTY sessions right now. Start a turn and refresh.</div>
{% endif %}
{% endblock %}
Generated
+2 -2
View File
@@ -287,7 +287,7 @@ local = [
{ name = "raycast-api", version = "0.1.0", source = { editable = "../raycast-api" } },
]
prod = [
{ name = "claude-code-api", version = "0.1.0", source = { git = "https://git.kotikot.com/beaver/claude-code-api.git#339461716338c2e6850c7aa654a429efcaf80bf7" } },
{ name = "claude-code-api", version = "0.1.0", source = { git = "https://git.kotikot.com/beaver/claude-code-api.git#27e6e5a1bfc87d68213d08ef67e269350188c935" } },
{ name = "raycast-api", version = "0.1.0", source = { git = "https://git.kotikot.com/beaver/raycast-api.git#e73894c8e435da5c0709f92f69f11bcd0dab9afe" } },
]
@@ -419,7 +419,7 @@ wheels = [
[[package]]
name = "claude-code-api"
version = "0.1.0"
source = { git = "https://git.kotikot.com/beaver/claude-code-api.git#339461716338c2e6850c7aa654a429efcaf80bf7" }
source = { git = "https://git.kotikot.com/beaver/claude-code-api.git#27e6e5a1bfc87d68213d08ef67e269350188c935" }
resolution-markers = [
"python_full_version >= '3.14'",
"python_full_version < '3.14'",