feat: add more logging

This commit is contained in:
h
2026-05-22 22:45:44 +02:00
parent 21dc26810b
commit d05c8dd613
3 changed files with 321 additions and 13 deletions
+83 -2
View File
@@ -19,6 +19,7 @@ from __future__ import annotations
import asyncio
import contextlib
import json
import logging
import os
import tempfile
import uuid
@@ -27,6 +28,8 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal, Self
_log = logging.getLogger("claude_code_api.backend")
from claude_code_api.errors import MessageParseError
from claude_code_api.events import (
AssistantMessage,
@@ -171,31 +174,84 @@ class ClaudeCodeBackend:
async with self._lock:
prior = list(messages[:-1])
fp_prior = hash_history(prior)
_log.info(
"complete: n_msgs=%d prior_fp=%s last_text_len=%d pool_size=%d",
len(messages),
fp_prior[:12],
len(last_text),
len(self._sessions),
)
session: _LiveSession
if prior and fp_prior in self._sessions:
session = self._sessions.pop(fp_prior)
send_text = last_text
_log.info(
"complete: POOL HIT fp=%s -> reusing session_id=%s",
fp_prior[:12],
session.session_id,
)
else:
_log.info(
"complete: POOL MISS fp=%s (prior=%d msgs) -> spawning new session",
fp_prior[:12],
len(prior),
)
session = await self._create_session(prior)
if prior and self._opts.history_injection_mode == "concat_message":
send_text = build_concat_prompt(prior, last_text)
_log.info(
"complete: concat_message mode, send_text_len=%d",
len(send_text),
)
else:
send_text = last_text
events: list[Event] = []
n_assistant = 0
n_user = 0
n_system = 0
try:
_log.info(
"complete: sending user msg to session_id=%s (text_len=%d)",
session.session_id,
len(send_text),
)
async for event in session.tm.send_user_message(send_text):
events.append(event)
if isinstance(event, AssistantMessage):
n_assistant += 1
elif isinstance(event, UserMessage):
n_user += 1
else:
n_system += 1
yield event
except BaseException:
except BaseException as exc:
_log.exception(
"complete: session_id=%s FAILED (events so far: a=%d u=%d sys=%d): %s",
session.session_id,
n_assistant,
n_user,
n_system,
exc,
)
with contextlib.suppress(Exception):
await session.aclose()
raise
synthesized_cycle = synthesize_turn_messages(events)
new_history = [*list(messages), *synthesized_cycle]
self._sessions[hash_history(new_history)] = session
new_fp = hash_history(new_history)
self._sessions[new_fp] = session
_log.info(
"complete: session_id=%s DONE a=%d u=%d sys=%d synth=%d -> repooled under fp=%s",
session.session_id,
n_assistant,
n_user,
n_system,
len(synthesized_cycle),
new_fp[:12],
)
async def aclose(self) -> None:
"""Shut down all live sessions; remove the temp mcp-config file."""
@@ -237,10 +293,24 @@ class ClaudeCodeBackend:
jsonl_path.write_text(seed, encoding="utf-8")
start_offset = jsonl_path.stat().st_size
resume = True
_log.info(
"_create_session: session_id=%s SEED jsonl=%s bytes=%d history_msgs=%d resume=True",
session_id,
jsonl_path,
start_offset,
len(history),
)
else:
jsonl_path = resolve_jsonl_path(cwd, session_id)
start_offset = 0
resume = False
_log.info(
"_create_session: session_id=%s FRESH jsonl=%s history_msgs=%d resume=False mode=%s",
session_id,
jsonl_path,
len(history),
self._opts.history_injection_mode,
)
if self._session_factory is not None:
result = self._session_factory(
@@ -260,6 +330,12 @@ class ClaudeCodeBackend:
async def _spawn_real_session(
self, *, session_id: str, resume: bool, jsonl_path: Path, start_offset: int
) -> _LiveSession:
_log.info(
"_spawn_real_session: session_id=%s resume=%s start_offset=%d",
session_id,
resume,
start_offset,
)
pty_opts = self._build_pty_options(session_id=session_id, resume=resume)
pty = PtyClaudeProcess(pty_opts)
watcher = JsonlWatcher(jsonl_path, start_offset=start_offset)
@@ -274,6 +350,11 @@ class ClaudeCodeBackend:
on_parse_error=self._on_parse_error,
)
await tm.start()
_log.info(
"_spawn_real_session: session_id=%s STARTED pid=%s",
session_id,
getattr(pty, "pid", "?"),
)
return _LiveSession(pty=pty, watcher=watcher, tm=tm)
def _build_pty_options(self, *, session_id: str, resume: bool) -> PtyProcessOptions:
+83 -6
View File
@@ -15,6 +15,7 @@ from __future__ import annotations
import asyncio
import contextlib
import errno
import logging
import os
import select
import signal
@@ -28,6 +29,8 @@ from ptyprocess import PtyProcess
from claude_code_api.errors import CLINotFoundError
_log = logging.getLogger("claude_code_api.pty")
PtyOutputCallback = Callable[[bytes], None]
_PROVIDER_ENV_VARS: tuple[str, ...] = (
@@ -239,6 +242,12 @@ class PtyClaudeProcess:
msg = "PtyClaudeProcess.start() called twice"
raise RuntimeError(msg)
_log.info(
"start: session_id=%s spawning argv=%r cwd=%s",
self._session_id,
self._argv,
self.cwd,
)
try:
self._pty = PtyProcess.spawn(
self._argv,
@@ -248,7 +257,17 @@ class PtyClaudeProcess:
dimensions=self._opts.dimensions,
)
except FileNotFoundError as exc:
_log.exception(
"start: session_id=%s claude CLI not found: %s",
self._session_id,
self._opts.executable,
)
raise CLINotFoundError(executable=self._opts.executable) from exc
_log.info(
"start: session_id=%s spawned pid=%s",
self._session_id,
self._pty.pid,
)
self._drain_stop.clear()
self._drain_thread = threading.Thread(
target=self._drain_loop,
@@ -312,12 +331,37 @@ class PtyClaudeProcess:
msg = f"poll must be positive, got {poll!r}"
raise ValueError(msg)
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
t0 = loop.time()
deadline = t0 + timeout
last_progress = t0
while True:
if marker in self.captured_output():
_log.info(
"wait_for_output: session_id=%s marker %r FOUND after %.2fs (buf=%d bytes)",
self._session_id,
marker,
loop.time() - t0,
len(self.captured_output()),
)
return True
if loop.time() >= deadline:
now = loop.time()
if now >= deadline:
_log.warning(
"wait_for_output: session_id=%s marker %r TIMEOUT after %.2fs (buf=%d bytes)",
self._session_id,
marker,
now - t0,
len(self.captured_output()),
)
return False
if now - last_progress >= 5.0:
_log.info(
"wait_for_output: session_id=%s still waiting %.1fs (buf=%d bytes)",
self._session_id,
now - t0,
len(self.captured_output()),
)
last_progress = now
await asyncio.sleep(poll)
async def wait_for_quiet(
@@ -348,13 +392,29 @@ class PtyClaudeProcess:
msg = f"poll must be positive, got {poll!r}"
raise ValueError(msg)
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
t0 = loop.time()
deadline = t0 + timeout
last_len = len(self.captured_output())
last_change = loop.time()
last_change = t0
while True:
if loop.time() - last_change >= idle:
now = loop.time()
if now - last_change >= idle:
_log.info(
"wait_for_quiet: session_id=%s idle %.2fs reached after %.2fs (buf=%d bytes)",
self._session_id,
idle,
now - t0,
last_len,
)
return True
if loop.time() >= deadline:
if now >= deadline:
_log.warning(
"wait_for_quiet: session_id=%s TIMEOUT after %.2fs (idle reached only %.2fs, buf=%d bytes — TUI still rendering)",
self._session_id,
now - t0,
now - last_change,
len(self.captured_output()),
)
return False
await asyncio.sleep(poll)
cur_len = len(self.captured_output())
@@ -390,13 +450,30 @@ class PtyClaudeProcess:
payload = data.encode("utf-8") if isinstance(data, str) else bytes(data)
pty = self._pty
if not newline:
_log.info(
"write: session_id=%s RAW %d bytes (newline=False)",
self._session_id,
len(payload),
)
return await asyncio.to_thread(pty.write, payload)
if payload.endswith(b"\r"):
payload = payload[:-1]
paste_chunk = b"\x1b[200~" + payload + b"\x1b[201~"
_log.info(
"write: session_id=%s PASTE %d bytes (paste-framed), will follow with Enter in %.0fms",
self._session_id,
len(payload),
_SUBMIT_KEY_DELAY * 1000,
)
n1 = await asyncio.to_thread(pty.write, paste_chunk)
await asyncio.sleep(_SUBMIT_KEY_DELAY)
n2 = await asyncio.to_thread(pty.write, b"\r")
_log.info(
"write: session_id=%s SUBMIT done (paste=%d Enter=%d)",
self._session_id,
n1,
n2,
)
return n1 + n2
async def send_control(self, char: str) -> None:
+155 -5
View File
@@ -26,6 +26,7 @@ from __future__ import annotations
import asyncio
import contextlib
import logging
from collections.abc import AsyncIterator, Callable, Iterable
from typing import TYPE_CHECKING, Any, Self
@@ -43,6 +44,8 @@ from claude_code_api.watcher import JsonlRecord, JsonlWatcher
if TYPE_CHECKING:
from claude_code_api.pty import PtyClaudeProcess
_log = logging.getLogger("claude_code_api.turn")
_TERMINAL_STOP_REASONS: frozenset[str] = frozenset(
{"end_turn", "max_tokens", "stop_sequence", "refusal"}
)
@@ -151,26 +154,68 @@ class TurnManager:
a plain sleep then.
"""
if self._started:
_log.info(
"start: session_id=%s already started, skip",
self._pty.session_id,
)
return
_log.info(
"start: session_id=%s spawning PTY (startup_delay=%.1fs)",
self._pty.session_id,
self._startup_delay,
)
await self._pty.start()
_log.info(
"start: session_id=%s PTY spawned pid=%s, waiting for TUI ready...",
self._pty.session_id,
getattr(self._pty, "pid", "?"),
)
if self._startup_delay > 0:
loop = asyncio.get_running_loop()
deadline = loop.time() + self._startup_delay
t0 = loop.time()
deadline = t0 + self._startup_delay
wait_marker = getattr(self._pty, "wait_for_output", None)
wait_quiet = getattr(self._pty, "wait_for_quiet", None)
if wait_marker is not None:
# Phase 1: bracketed-paste-enable byte tells us terminal
# mode is set up. Necessary but not sufficient.
await wait_marker(_TUI_READY_MARKER, timeout=self._startup_delay)
marker_found = await wait_marker(
_TUI_READY_MARKER, timeout=self._startup_delay
)
t_marker = loop.time() - t0
_log.info(
"start: session_id=%s marker DECSET2004 %s after %.2fs",
self._pty.session_id,
"FOUND" if marker_found else "TIMED_OUT",
t_marker,
)
# Phase 2: wait for the TUI render burst to settle —
# consecutive seconds of no new PTY bytes. Bounded by what's
# left of `startup_delay`.
remaining = max(0.0, deadline - loop.time())
if wait_quiet is not None and remaining > 0:
await wait_quiet(idle=_TUI_QUIET_PERIOD, timeout=remaining)
quiet_ok = await wait_quiet(
idle=_TUI_QUIET_PERIOD, timeout=remaining
)
t_quiet = loop.time() - t0
_log.info(
"start: session_id=%s quiet %s after %.2fs total",
self._pty.session_id,
"REACHED" if quiet_ok else "TIMED_OUT",
t_quiet,
)
else:
_log.info(
"start: session_id=%s no wait_for_output API, plain sleep %.1fs",
self._pty.session_id,
self._startup_delay,
)
await asyncio.sleep(self._startup_delay)
self._started = True
_log.info(
"start: session_id=%s READY",
self._pty.session_id,
)
async def send_user_message(self, text: str) -> AsyncIterator[Event]:
"""Send `text` as a user prompt and stream events until turn-end.
@@ -187,18 +232,50 @@ class TurnManager:
raise RuntimeError(msg)
self._turn_in_progress = True
self._turn_count += 1
sid = self._pty.session_id
text_preview = text[:80].replace("\n", "\\n")
_log.info(
"send_user_message: session_id=%s turn=%d text_len=%d preview=%r",
sid,
self._turn_count,
len(text),
text_preview,
)
try:
try:
await self._pty.write(text)
n_written = await self._pty.write(text)
_log.info(
"send_user_message: session_id=%s wrote %d bytes to PTY",
sid,
n_written,
)
except OSError as exc:
_log.exception(
"send_user_message: session_id=%s PTY write failed: %s", sid, exc
)
raise self._classify_pty_failure(
fallback_message="claude process not accepting input"
) from exc
if not self._watcher.path.exists():
_log.info(
"send_user_message: session_id=%s waiting for JSONL %s (timeout=%.1fs)",
sid,
self._watcher.path,
self._file_wait_timeout or 0.0,
)
try:
await self._watcher.wait_for_file(timeout=self._file_wait_timeout)
_log.info(
"send_user_message: session_id=%s JSONL appeared", sid
)
except TimeoutError as exc:
_log.error(
"send_user_message: session_id=%s JSONL DID NOT APPEAR within %.1fs at %s",
sid,
self._file_wait_timeout or 0.0,
self._watcher.path,
)
raise self._classify_pty_failure(
fallback_cls=SessionError,
fallback_message=(
@@ -206,25 +283,57 @@ class TurnManager:
f"{self._file_wait_timeout}s: {self._watcher.path}"
),
) from exc
else:
_log.info(
"send_user_message: session_id=%s JSONL already exists at %s (resume mode)",
sid,
self._watcher.path,
)
terminal_assistant: AssistantMessage | None = None
terminal_seen_at: float | None = None
poll = self._watcher.poll_interval
loop = asyncio.get_running_loop()
t_loop_start = loop.time()
n_records_seen = 0
n_events_yielded = 0
last_progress_log = t_loop_start
while True:
records = await self._watcher.read_once()
if not records:
now = loop.time()
# Heartbeat log every 10s so a hung wait is visible.
if now - last_progress_log >= 10.0:
_log.info(
"send_user_message: session_id=%s WAITING %.1fs no records yet "
"(seen=%d yielded=%d terminal=%s pty_alive=%s)",
sid,
now - t_loop_start,
n_records_seen,
n_events_yielded,
terminal_assistant is not None,
self._pty_is_alive(),
)
last_progress_log = now
if (
terminal_assistant is not None
and terminal_seen_at is not None
and self._turn_duration_timeout is not None
and loop.time() - terminal_seen_at > self._turn_duration_timeout
and now - terminal_seen_at > self._turn_duration_timeout
):
_log.info(
"send_user_message: session_id=%s turn_duration_timeout after terminal, synthesizing result",
sid,
)
yield self._synthesize_result(terminal_assistant, None)
return
if terminal_assistant is None and not self._pty_is_alive():
_log.error(
"send_user_message: session_id=%s PTY died before terminal assistant",
sid,
)
raise self._classify_pty_failure(
fallback_message=(
"claude process exited before a terminal "
@@ -234,24 +343,60 @@ class TurnManager:
await asyncio.sleep(poll)
continue
n_records_seen += len(records)
for rec in records:
try:
event = normalize(
rec, include_meta_user=self._include_meta_user
)
except MessageParseError as exc:
_log.warning(
"send_user_message: session_id=%s parse error: %s",
sid,
exc,
)
if self._on_parse_error is not None:
with contextlib.suppress(Exception):
self._on_parse_error(exc, rec)
continue
if event is None:
continue
n_events_yielded += 1
if isinstance(event, AssistantMessage):
n_blocks = len(event.content) if event.content else 0
block_types = ",".join(
type(b).__name__ for b in (event.content or [])
)
_log.info(
"send_user_message: session_id=%s yield AssistantMessage stop=%s blocks=%d [%s]",
sid,
event.stop_reason,
n_blocks,
block_types,
)
elif isinstance(event, SystemMessage):
_log.info(
"send_user_message: session_id=%s yield SystemMessage subtype=%s",
sid,
event.subtype,
)
else:
_log.info(
"send_user_message: session_id=%s yield %s",
sid,
type(event).__name__,
)
yield event
if isinstance(event, AssistantMessage):
if event.stop_reason in _TERMINAL_STOP_REASONS:
terminal_assistant = event
terminal_seen_at = loop.time()
_log.info(
"send_user_message: session_id=%s TERMINAL assistant stop=%s",
sid,
event.stop_reason,
)
if not self._wait_for_turn_duration:
yield self._synthesize_result(terminal_assistant, None)
return
@@ -261,6 +406,11 @@ class TurnManager:
and terminal_assistant is not None
):
duration_ms = _extract_duration_ms(event.data)
_log.info(
"send_user_message: session_id=%s turn_duration=%sms, done",
sid,
duration_ms,
)
yield self._synthesize_result(terminal_assistant, duration_ms)
return
finally: