diff --git a/src/claude_code_api/backend.py b/src/claude_code_api/backend.py index f5e0b20..878e454 100644 --- a/src/claude_code_api/backend.py +++ b/src/claude_code_api/backend.py @@ -149,6 +149,18 @@ class ClaudeCodeBackend: def live_session_count(self) -> int: return len(self._sessions) + @property + def live_sessions(self) -> dict[str, PtyClaudeProcess]: + """Snapshot of live PTY processes keyed by ``session_id``. + + Returned dict is a copy — caller may iterate freely without + worrying about concurrent ``complete()`` calls reshuffling + ``_sessions`` (which is keyed by history fingerprint, not + ``session_id``). Intended for debug surfaces (e.g. admin + terminal viewer) that need to look up a session by id. + """ + return {s.session_id: s.pty for s in self._sessions.values()} + async def complete(self, messages: list[Mapping[str, Any]]) -> AsyncIterator[Event]: """Run one turn against the matching session (or spawn one). diff --git a/src/claude_code_api/pty.py b/src/claude_code_api/pty.py index 92c24b8..d6cbed8 100644 --- a/src/claude_code_api/pty.py +++ b/src/claude_code_api/pty.py @@ -214,8 +214,14 @@ class PtyClaudeProcess: output_buffer_cap: int = _DEFAULT_OUTPUT_BUFFER_CAP, ) -> None: self._opts = options - self._on_output = on_pty_output self._output_buffer_cap = output_buffer_cap + # Listener list — drain thread fans out each chunk to every + # subscriber. Single-callback ``on_pty_output`` ctor arg is kept + # for compat; it just becomes the first listener. + self._output_listeners: list[PtyOutputCallback] = [] + self._listeners_lock = threading.Lock() + if on_pty_output is not None: + self._output_listeners.append(on_pty_output) if options.resume_session_id is not None: self._session_id = options.resume_session_id @@ -259,6 +265,22 @@ class PtyClaudeProcess: with self._output_lock: return bytes(self._output_buffer) + def add_output_listener(self, listener: PtyOutputCallback) -> None: + """Subscribe a callback to every chunk the drain thread reads. + + Callbacks run on the drain thread; do as little as possible in + them (e.g. push to a queue) and never block. Exceptions are + swallowed per-listener to keep one bad subscriber from poisoning + the drain loop. + """ + with self._listeners_lock: + self._output_listeners.append(listener) + + def remove_output_listener(self, listener: PtyOutputCallback) -> None: + """Unsubscribe a previously added listener. Silent on missing.""" + with self._listeners_lock, contextlib.suppress(ValueError): + self._output_listeners.remove(listener) + async def start(self) -> None: """Spawn the child synchronously on the main thread. @@ -333,8 +355,9 @@ class PtyClaudeProcess: continue if not data: break - cb = self._on_output - if cb is not None: + with self._listeners_lock: + listeners = list(self._output_listeners) + for cb in listeners: with contextlib.suppress(Exception): cb(data) with self._output_lock: