feat: add pty monitoring
This commit is contained in:
@@ -149,6 +149,18 @@ class ClaudeCodeBackend:
|
|||||||
def live_session_count(self) -> int:
|
def live_session_count(self) -> int:
|
||||||
return len(self._sessions)
|
return len(self._sessions)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def live_sessions(self) -> dict[str, PtyClaudeProcess]:
|
||||||
|
"""Snapshot of live PTY processes keyed by ``session_id``.
|
||||||
|
|
||||||
|
Returned dict is a copy — caller may iterate freely without
|
||||||
|
worrying about concurrent ``complete()`` calls reshuffling
|
||||||
|
``_sessions`` (which is keyed by history fingerprint, not
|
||||||
|
``session_id``). Intended for debug surfaces (e.g. admin
|
||||||
|
terminal viewer) that need to look up a session by id.
|
||||||
|
"""
|
||||||
|
return {s.session_id: s.pty for s in self._sessions.values()}
|
||||||
|
|
||||||
async def complete(self, messages: list[Mapping[str, Any]]) -> AsyncIterator[Event]:
|
async def complete(self, messages: list[Mapping[str, Any]]) -> AsyncIterator[Event]:
|
||||||
"""Run one turn against the matching session (or spawn one).
|
"""Run one turn against the matching session (or spawn one).
|
||||||
|
|
||||||
|
|||||||
@@ -214,8 +214,14 @@ class PtyClaudeProcess:
|
|||||||
output_buffer_cap: int = _DEFAULT_OUTPUT_BUFFER_CAP,
|
output_buffer_cap: int = _DEFAULT_OUTPUT_BUFFER_CAP,
|
||||||
) -> None:
|
) -> None:
|
||||||
self._opts = options
|
self._opts = options
|
||||||
self._on_output = on_pty_output
|
|
||||||
self._output_buffer_cap = output_buffer_cap
|
self._output_buffer_cap = output_buffer_cap
|
||||||
|
# Listener list — drain thread fans out each chunk to every
|
||||||
|
# subscriber. Single-callback ``on_pty_output`` ctor arg is kept
|
||||||
|
# for compat; it just becomes the first listener.
|
||||||
|
self._output_listeners: list[PtyOutputCallback] = []
|
||||||
|
self._listeners_lock = threading.Lock()
|
||||||
|
if on_pty_output is not None:
|
||||||
|
self._output_listeners.append(on_pty_output)
|
||||||
|
|
||||||
if options.resume_session_id is not None:
|
if options.resume_session_id is not None:
|
||||||
self._session_id = options.resume_session_id
|
self._session_id = options.resume_session_id
|
||||||
@@ -259,6 +265,22 @@ class PtyClaudeProcess:
|
|||||||
with self._output_lock:
|
with self._output_lock:
|
||||||
return bytes(self._output_buffer)
|
return bytes(self._output_buffer)
|
||||||
|
|
||||||
|
def add_output_listener(self, listener: PtyOutputCallback) -> None:
|
||||||
|
"""Subscribe a callback to every chunk the drain thread reads.
|
||||||
|
|
||||||
|
Callbacks run on the drain thread; do as little as possible in
|
||||||
|
them (e.g. push to a queue) and never block. Exceptions are
|
||||||
|
swallowed per-listener to keep one bad subscriber from poisoning
|
||||||
|
the drain loop.
|
||||||
|
"""
|
||||||
|
with self._listeners_lock:
|
||||||
|
self._output_listeners.append(listener)
|
||||||
|
|
||||||
|
def remove_output_listener(self, listener: PtyOutputCallback) -> None:
|
||||||
|
"""Unsubscribe a previously added listener. Silent on missing."""
|
||||||
|
with self._listeners_lock, contextlib.suppress(ValueError):
|
||||||
|
self._output_listeners.remove(listener)
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Spawn the child synchronously on the main thread.
|
"""Spawn the child synchronously on the main thread.
|
||||||
|
|
||||||
@@ -333,8 +355,9 @@ class PtyClaudeProcess:
|
|||||||
continue
|
continue
|
||||||
if not data:
|
if not data:
|
||||||
break
|
break
|
||||||
cb = self._on_output
|
with self._listeners_lock:
|
||||||
if cb is not None:
|
listeners = list(self._output_listeners)
|
||||||
|
for cb in listeners:
|
||||||
with contextlib.suppress(Exception):
|
with contextlib.suppress(Exception):
|
||||||
cb(data)
|
cb(data)
|
||||||
with self._output_lock:
|
with self._output_lock:
|
||||||
|
|||||||
Reference in New Issue
Block a user