diff --git a/src/claude_code_api/backend.py b/src/claude_code_api/backend.py index d0f5de6..f5e0b20 100644 --- a/src/claude_code_api/backend.py +++ b/src/claude_code_api/backend.py @@ -19,6 +19,7 @@ from __future__ import annotations import asyncio import contextlib import json +import logging import os import tempfile import uuid @@ -27,6 +28,8 @@ from dataclasses import dataclass, field from pathlib import Path from typing import Any, Literal, Self +_log = logging.getLogger("claude_code_api.backend") + from claude_code_api.errors import MessageParseError from claude_code_api.events import ( AssistantMessage, @@ -171,31 +174,84 @@ class ClaudeCodeBackend: async with self._lock: prior = list(messages[:-1]) fp_prior = hash_history(prior) + _log.info( + "complete: n_msgs=%d prior_fp=%s last_text_len=%d pool_size=%d", + len(messages), + fp_prior[:12], + len(last_text), + len(self._sessions), + ) session: _LiveSession if prior and fp_prior in self._sessions: session = self._sessions.pop(fp_prior) send_text = last_text + _log.info( + "complete: POOL HIT fp=%s -> reusing session_id=%s", + fp_prior[:12], + session.session_id, + ) else: + _log.info( + "complete: POOL MISS fp=%s (prior=%d msgs) -> spawning new session", + fp_prior[:12], + len(prior), + ) session = await self._create_session(prior) if prior and self._opts.history_injection_mode == "concat_message": send_text = build_concat_prompt(prior, last_text) + _log.info( + "complete: concat_message mode, send_text_len=%d", + len(send_text), + ) else: send_text = last_text events: list[Event] = [] + n_assistant = 0 + n_user = 0 + n_system = 0 try: + _log.info( + "complete: sending user msg to session_id=%s (text_len=%d)", + session.session_id, + len(send_text), + ) async for event in session.tm.send_user_message(send_text): events.append(event) + if isinstance(event, AssistantMessage): + n_assistant += 1 + elif isinstance(event, UserMessage): + n_user += 1 + else: + n_system += 1 yield event - except BaseException: + except BaseException as exc: + _log.exception( + "complete: session_id=%s FAILED (events so far: a=%d u=%d sys=%d): %s", + session.session_id, + n_assistant, + n_user, + n_system, + exc, + ) with contextlib.suppress(Exception): await session.aclose() raise synthesized_cycle = synthesize_turn_messages(events) new_history = [*list(messages), *synthesized_cycle] - self._sessions[hash_history(new_history)] = session + new_fp = hash_history(new_history) + self._sessions[new_fp] = session + _log.info( + "complete: session_id=%s DONE a=%d u=%d sys=%d synth=%d -> repooled under fp=%s", + session.session_id, + n_assistant, + n_user, + n_system, + len(synthesized_cycle), + new_fp[:12], + ) async def aclose(self) -> None: """Shut down all live sessions; remove the temp mcp-config file.""" @@ -237,10 +293,24 @@ class ClaudeCodeBackend: jsonl_path.write_text(seed, encoding="utf-8") start_offset = jsonl_path.stat().st_size resume = True + _log.info( + "_create_session: session_id=%s SEED jsonl=%s bytes=%d history_msgs=%d resume=True", + session_id, + jsonl_path, + start_offset, + len(history), + ) else: jsonl_path = resolve_jsonl_path(cwd, session_id) start_offset = 0 resume = False + _log.info( + "_create_session: session_id=%s FRESH jsonl=%s history_msgs=%d resume=False mode=%s", + session_id, + jsonl_path, + len(history), + self._opts.history_injection_mode, + ) if self._session_factory is not None: result = self._session_factory( @@ -260,6 +330,12 @@ class ClaudeCodeBackend: async def _spawn_real_session( self, *, session_id: str, resume: bool, jsonl_path: Path, start_offset: int ) -> _LiveSession: + _log.info( + "_spawn_real_session: session_id=%s resume=%s start_offset=%d", + session_id, + resume, + start_offset, + ) pty_opts = self._build_pty_options(session_id=session_id, resume=resume) pty = PtyClaudeProcess(pty_opts) watcher = JsonlWatcher(jsonl_path, start_offset=start_offset) @@ -274,6 +350,11 @@ class ClaudeCodeBackend: on_parse_error=self._on_parse_error, ) await tm.start() + _log.info( + "_spawn_real_session: session_id=%s STARTED pid=%s", + session_id, + getattr(pty, "pid", "?"), + ) return _LiveSession(pty=pty, watcher=watcher, tm=tm) def _build_pty_options(self, *, session_id: str, resume: bool) -> PtyProcessOptions: diff --git a/src/claude_code_api/pty.py b/src/claude_code_api/pty.py index a4969f4..caa3e25 100644 --- a/src/claude_code_api/pty.py +++ b/src/claude_code_api/pty.py @@ -15,6 +15,7 @@ from __future__ import annotations import asyncio import contextlib import errno +import logging import os import select import signal @@ -28,6 +29,8 @@ from ptyprocess import PtyProcess from claude_code_api.errors import CLINotFoundError +_log = logging.getLogger("claude_code_api.pty") + PtyOutputCallback = Callable[[bytes], None] _PROVIDER_ENV_VARS: tuple[str, ...] = ( @@ -239,6 +242,12 @@ class PtyClaudeProcess: msg = "PtyClaudeProcess.start() called twice" raise RuntimeError(msg) + _log.info( + "start: session_id=%s spawning argv=%r cwd=%s", + self._session_id, + self._argv, + self.cwd, + ) try: self._pty = PtyProcess.spawn( self._argv, @@ -248,7 +257,17 @@ class PtyClaudeProcess: dimensions=self._opts.dimensions, ) except FileNotFoundError as exc: + _log.exception( + "start: session_id=%s claude CLI not found: %s", + self._session_id, + self._opts.executable, + ) raise CLINotFoundError(executable=self._opts.executable) from exc + _log.info( + "start: session_id=%s spawned pid=%s", + self._session_id, + self._pty.pid, + ) self._drain_stop.clear() self._drain_thread = threading.Thread( target=self._drain_loop, @@ -312,12 +331,37 @@ class PtyClaudeProcess: msg = f"poll must be positive, got {poll!r}" raise ValueError(msg) loop = asyncio.get_running_loop() - deadline = loop.time() + timeout + t0 = loop.time() + deadline = t0 + timeout + last_progress = t0 while True: if marker in self.captured_output(): + _log.info( + "wait_for_output: session_id=%s marker %r FOUND after %.2fs (buf=%d bytes)", + self._session_id, + marker, + loop.time() - t0, + len(self.captured_output()), + ) return True - if loop.time() >= deadline: + now = loop.time() + if now >= deadline: + _log.warning( + "wait_for_output: session_id=%s marker %r TIMEOUT after %.2fs (buf=%d bytes)", + self._session_id, + marker, + now - t0, + len(self.captured_output()), + ) return False + if now - last_progress >= 5.0: + _log.info( + "wait_for_output: session_id=%s still waiting %.1fs (buf=%d bytes)", + self._session_id, + now - t0, + len(self.captured_output()), + ) + last_progress = now await asyncio.sleep(poll) async def wait_for_quiet( @@ -348,13 +392,29 @@ class PtyClaudeProcess: msg = f"poll must be positive, got {poll!r}" raise ValueError(msg) loop = asyncio.get_running_loop() - deadline = loop.time() + timeout + t0 = loop.time() + deadline = t0 + timeout last_len = len(self.captured_output()) - last_change = loop.time() + last_change = t0 while True: - if loop.time() - last_change >= idle: + now = loop.time() + if now - last_change >= idle: + _log.info( + "wait_for_quiet: session_id=%s idle %.2fs reached after %.2fs (buf=%d bytes)", + self._session_id, + idle, + now - t0, + last_len, + ) return True - if loop.time() >= deadline: + if now >= deadline: + _log.warning( + "wait_for_quiet: session_id=%s TIMEOUT after %.2fs (idle reached only %.2fs, buf=%d bytes — TUI still rendering)", + self._session_id, + now - t0, + now - last_change, + len(self.captured_output()), + ) return False await asyncio.sleep(poll) cur_len = len(self.captured_output()) @@ -390,13 +450,30 @@ class PtyClaudeProcess: payload = data.encode("utf-8") if isinstance(data, str) else bytes(data) pty = self._pty if not newline: + _log.info( + "write: session_id=%s RAW %d bytes (newline=False)", + self._session_id, + len(payload), + ) return await asyncio.to_thread(pty.write, payload) if payload.endswith(b"\r"): payload = payload[:-1] paste_chunk = b"\x1b[200~" + payload + b"\x1b[201~" + _log.info( + "write: session_id=%s PASTE %d bytes (paste-framed), will follow with Enter in %.0fms", + self._session_id, + len(payload), + _SUBMIT_KEY_DELAY * 1000, + ) n1 = await asyncio.to_thread(pty.write, paste_chunk) await asyncio.sleep(_SUBMIT_KEY_DELAY) n2 = await asyncio.to_thread(pty.write, b"\r") + _log.info( + "write: session_id=%s SUBMIT done (paste=%d Enter=%d)", + self._session_id, + n1, + n2, + ) return n1 + n2 async def send_control(self, char: str) -> None: diff --git a/src/claude_code_api/turn.py b/src/claude_code_api/turn.py index a737c6f..b071255 100644 --- a/src/claude_code_api/turn.py +++ b/src/claude_code_api/turn.py @@ -26,6 +26,7 @@ from __future__ import annotations import asyncio import contextlib +import logging from collections.abc import AsyncIterator, Callable, Iterable from typing import TYPE_CHECKING, Any, Self @@ -43,6 +44,8 @@ from claude_code_api.watcher import JsonlRecord, JsonlWatcher if TYPE_CHECKING: from claude_code_api.pty import PtyClaudeProcess +_log = logging.getLogger("claude_code_api.turn") + _TERMINAL_STOP_REASONS: frozenset[str] = frozenset( {"end_turn", "max_tokens", "stop_sequence", "refusal"} ) @@ -151,26 +154,68 @@ class TurnManager: a plain sleep then. """ if self._started: + _log.info( + "start: session_id=%s already started, skip", + self._pty.session_id, + ) return + _log.info( + "start: session_id=%s spawning PTY (startup_delay=%.1fs)", + self._pty.session_id, + self._startup_delay, + ) await self._pty.start() + _log.info( + "start: session_id=%s PTY spawned pid=%s, waiting for TUI ready...", + self._pty.session_id, + getattr(self._pty, "pid", "?"), + ) if self._startup_delay > 0: loop = asyncio.get_running_loop() - deadline = loop.time() + self._startup_delay + t0 = loop.time() + deadline = t0 + self._startup_delay wait_marker = getattr(self._pty, "wait_for_output", None) wait_quiet = getattr(self._pty, "wait_for_quiet", None) if wait_marker is not None: # Phase 1: bracketed-paste-enable byte tells us terminal # mode is set up. Necessary but not sufficient. - await wait_marker(_TUI_READY_MARKER, timeout=self._startup_delay) + marker_found = await wait_marker( + _TUI_READY_MARKER, timeout=self._startup_delay + ) + t_marker = loop.time() - t0 + _log.info( + "start: session_id=%s marker DECSET2004 %s after %.2fs", + self._pty.session_id, + "FOUND" if marker_found else "TIMED_OUT", + t_marker, + ) # Phase 2: wait for the TUI render burst to settle — # consecutive seconds of no new PTY bytes. Bounded by what's # left of `startup_delay`. remaining = max(0.0, deadline - loop.time()) if wait_quiet is not None and remaining > 0: - await wait_quiet(idle=_TUI_QUIET_PERIOD, timeout=remaining) + quiet_ok = await wait_quiet( + idle=_TUI_QUIET_PERIOD, timeout=remaining + ) + t_quiet = loop.time() - t0 + _log.info( + "start: session_id=%s quiet %s after %.2fs total", + self._pty.session_id, + "REACHED" if quiet_ok else "TIMED_OUT", + t_quiet, + ) else: + _log.info( + "start: session_id=%s no wait_for_output API, plain sleep %.1fs", + self._pty.session_id, + self._startup_delay, + ) await asyncio.sleep(self._startup_delay) self._started = True + _log.info( + "start: session_id=%s READY", + self._pty.session_id, + ) async def send_user_message(self, text: str) -> AsyncIterator[Event]: """Send `text` as a user prompt and stream events until turn-end. @@ -187,18 +232,50 @@ class TurnManager: raise RuntimeError(msg) self._turn_in_progress = True self._turn_count += 1 + sid = self._pty.session_id + text_preview = text[:80].replace("\n", "\\n") + _log.info( + "send_user_message: session_id=%s turn=%d text_len=%d preview=%r", + sid, + self._turn_count, + len(text), + text_preview, + ) try: try: - await self._pty.write(text) + n_written = await self._pty.write(text) + _log.info( + "send_user_message: session_id=%s wrote %d bytes to PTY", + sid, + n_written, + ) except OSError as exc: + _log.exception( + "send_user_message: session_id=%s PTY write failed: %s", sid, exc + ) raise self._classify_pty_failure( fallback_message="claude process not accepting input" ) from exc if not self._watcher.path.exists(): + _log.info( + "send_user_message: session_id=%s waiting for JSONL %s (timeout=%.1fs)", + sid, + self._watcher.path, + self._file_wait_timeout or 0.0, + ) try: await self._watcher.wait_for_file(timeout=self._file_wait_timeout) + _log.info( + "send_user_message: session_id=%s JSONL appeared", sid + ) except TimeoutError as exc: + _log.error( + "send_user_message: session_id=%s JSONL DID NOT APPEAR within %.1fs at %s", + sid, + self._file_wait_timeout or 0.0, + self._watcher.path, + ) raise self._classify_pty_failure( fallback_cls=SessionError, fallback_message=( @@ -206,25 +283,57 @@ class TurnManager: f"{self._file_wait_timeout}s: {self._watcher.path}" ), ) from exc + else: + _log.info( + "send_user_message: session_id=%s JSONL already exists at %s (resume mode)", + sid, + self._watcher.path, + ) terminal_assistant: AssistantMessage | None = None terminal_seen_at: float | None = None poll = self._watcher.poll_interval loop = asyncio.get_running_loop() + t_loop_start = loop.time() + n_records_seen = 0 + n_events_yielded = 0 + last_progress_log = t_loop_start while True: records = await self._watcher.read_once() if not records: + now = loop.time() + # Heartbeat log every 10s so a hung wait is visible. + if now - last_progress_log >= 10.0: + _log.info( + "send_user_message: session_id=%s WAITING %.1fs no records yet " + "(seen=%d yielded=%d terminal=%s pty_alive=%s)", + sid, + now - t_loop_start, + n_records_seen, + n_events_yielded, + terminal_assistant is not None, + self._pty_is_alive(), + ) + last_progress_log = now if ( terminal_assistant is not None and terminal_seen_at is not None and self._turn_duration_timeout is not None - and loop.time() - terminal_seen_at > self._turn_duration_timeout + and now - terminal_seen_at > self._turn_duration_timeout ): + _log.info( + "send_user_message: session_id=%s turn_duration_timeout after terminal, synthesizing result", + sid, + ) yield self._synthesize_result(terminal_assistant, None) return if terminal_assistant is None and not self._pty_is_alive(): + _log.error( + "send_user_message: session_id=%s PTY died before terminal assistant", + sid, + ) raise self._classify_pty_failure( fallback_message=( "claude process exited before a terminal " @@ -234,24 +343,60 @@ class TurnManager: await asyncio.sleep(poll) continue + n_records_seen += len(records) for rec in records: try: event = normalize( rec, include_meta_user=self._include_meta_user ) except MessageParseError as exc: + _log.warning( + "send_user_message: session_id=%s parse error: %s", + sid, + exc, + ) if self._on_parse_error is not None: with contextlib.suppress(Exception): self._on_parse_error(exc, rec) continue if event is None: continue + n_events_yielded += 1 + if isinstance(event, AssistantMessage): + n_blocks = len(event.content) if event.content else 0 + block_types = ",".join( + type(b).__name__ for b in (event.content or []) + ) + _log.info( + "send_user_message: session_id=%s yield AssistantMessage stop=%s blocks=%d [%s]", + sid, + event.stop_reason, + n_blocks, + block_types, + ) + elif isinstance(event, SystemMessage): + _log.info( + "send_user_message: session_id=%s yield SystemMessage subtype=%s", + sid, + event.subtype, + ) + else: + _log.info( + "send_user_message: session_id=%s yield %s", + sid, + type(event).__name__, + ) yield event if isinstance(event, AssistantMessage): if event.stop_reason in _TERMINAL_STOP_REASONS: terminal_assistant = event terminal_seen_at = loop.time() + _log.info( + "send_user_message: session_id=%s TERMINAL assistant stop=%s", + sid, + event.stop_reason, + ) if not self._wait_for_turn_duration: yield self._synthesize_result(terminal_assistant, None) return @@ -261,6 +406,11 @@ class TurnManager: and terminal_assistant is not None ): duration_ms = _extract_duration_ms(event.data) + _log.info( + "send_user_message: session_id=%s turn_duration=%sms, done", + sid, + duration_ms, + ) yield self._synthesize_result(terminal_assistant, duration_ms) return finally: