diff --git a/src/claude_code_api/injection.py b/src/claude_code_api/injection.py index f6f0242..20a6d4d 100644 --- a/src/claude_code_api/injection.py +++ b/src/claude_code_api/injection.py @@ -28,8 +28,10 @@ import uuid from collections.abc import Iterable, Mapping from typing import Any -_DEFAULT_CLAUDE_VERSION = "2.1.143" +_DEFAULT_CLAUDE_VERSION = "2.1.147" _DEFAULT_MODEL = "claude-opus-4-7" +_DEFAULT_PERMISSION_MODE = "bypassPermissions" +_DEFAULT_GIT_BRANCH = "HEAD" def hash_history(messages: Iterable[Mapping[str, Any]]) -> str: @@ -95,26 +97,37 @@ def build_seed_jsonl( cwd: str, claude_version: str = _DEFAULT_CLAUDE_VERSION, model: str = _DEFAULT_MODEL, + permission_mode: str = _DEFAULT_PERMISSION_MODE, + git_branch: str = _DEFAULT_GIT_BRANCH, now_iso: str | None = None, ) -> str: """Render a message list as a native claude JSONL transcript. - Output is a newline-terminated string of one JSON object per line. The - schema mirrors what claude itself writes (minus the snapshot records). + Output is a newline-terminated string of one JSON object per line. + The schema mirrors what claude 2.1.147 itself writes: - The caller writes the result to - `~/.claude/projects//.jsonl` and spawns - `claude --resume `. Claude appends its own - `file-history-snapshot` / `last-prompt` / `permission-mode` records on - resume — we don't need to. + - One `permission-mode` record at the top (session-level). + - Before each new user prompt: a `file-history-snapshot` record whose + `messageId` equals the user record's `uuid`. + - User prompt records carry `permissionMode` and a per-turn `promptId`. + - Assistant messages are split into one record per content block, all + sharing the same `msg_id` and `requestId` (this is how the live + claude streams them — thinking, tool_use, text each get their own + line). `parentUuid` chains record-to-record. 
+ - `user` records carrying `tool_result` blocks are split one record + per tool_result, each parented on the corresponding assistant + tool_use record (so `parentUuid` graphs match what claude expects). - Empty history is permitted; the returned string is empty in that case. + Empty history returns an empty string (no records, no + permission-mode — claude will write its own on first turn). """ + msg_list = list(messages) + if not msg_list: + return "" + if now_iso is None: now_iso = _now_iso() - lines: list[str] = [] - parent_uuid: str | None = None common = { "isSidechain": False, "userType": "external", @@ -122,61 +135,141 @@ def build_seed_jsonl( "cwd": cwd, "sessionId": session_id, "version": claude_version, - "gitBranch": "", + "gitBranch": git_branch, } - for m in messages: + + records: list[dict[str, Any]] = [ + { + "type": "permission-mode", + "permissionMode": permission_mode, + "sessionId": session_id, + } + ] + + parent_uuid: str | None = None + current_prompt_id: str | None = None + tool_use_to_assistant_uuid: dict[str, str] = {} + + for m in msg_list: role = m.get("role") + content = m.get("content") if role == "user": - user_uuid = str(uuid.uuid4()) - record = { - "parentUuid": parent_uuid, - "promptId": str(uuid.uuid4()), - "type": "user", - "message": { - "role": "user", - "content": _content_for_seed(m.get("content"), role="user"), - }, - "isMeta": False, - "uuid": user_uuid, - "timestamp": now_iso, - **common, - } - parent_uuid = user_uuid + tool_result_blocks = _extract_tool_result_blocks(content) + if tool_result_blocks: + for block in tool_result_blocks: + tu_id = str(block.get("tool_use_id", "")) + source_uuid = tool_use_to_assistant_uuid.get(tu_id, parent_uuid) + rec_uuid = str(uuid.uuid4()) + records.append( + { + "parentUuid": source_uuid, + "promptId": current_prompt_id, + "type": "user", + "message": { + "role": "user", + "content": [dict(block)], + }, + "uuid": rec_uuid, + "timestamp": now_iso, + "toolUseResult": block.get("content"), 
+ "sourceToolAssistantUUID": source_uuid, + **common, + } + ) + parent_uuid = rec_uuid + else: + current_prompt_id = str(uuid.uuid4()) + user_uuid = str(uuid.uuid4()) + records.append( + { + "type": "file-history-snapshot", + "messageId": user_uuid, + "snapshot": { + "messageId": user_uuid, + "trackedFileBackups": {}, + "timestamp": now_iso, + }, + "isSnapshotUpdate": False, + } + ) + records.append( + { + "parentUuid": parent_uuid, + "promptId": current_prompt_id, + "type": "user", + "message": { + "role": "user", + "content": _content_for_seed(content, role="user"), + }, + "uuid": user_uuid, + "timestamp": now_iso, + "permissionMode": permission_mode, + **common, + } + ) + parent_uuid = user_uuid elif role == "assistant": - assistant_uuid = str(uuid.uuid4()) - record = { - "parentUuid": parent_uuid, - "message": { - "model": model, - "id": f"msg_{uuid.uuid4().hex[:24]}", - "type": "message", - "role": "assistant", - "content": _content_for_seed(m.get("content"), role="assistant"), - "stop_reason": "end_turn", - "stop_sequence": None, - "stop_details": None, - "usage": { - "input_tokens": 0, - "output_tokens": 0, - "cache_creation_input_tokens": 0, - "cache_read_input_tokens": 0, - "service_tier": "standard", - }, - }, - "requestId": f"req_{uuid.uuid4().hex[:24]}", - "type": "assistant", - "uuid": assistant_uuid, - "timestamp": now_iso, - **common, - } - parent_uuid = assistant_uuid + blocks = _content_for_seed(content, role="assistant") + if not isinstance(blocks, list) or not blocks: + msg = "assistant content must be a non-empty list of blocks" + raise ValueError(msg) + msg_id = f"msg_{uuid.uuid4().hex[:24]}" + request_id = f"req_{uuid.uuid4().hex[:24]}" + has_tool_use = any( + isinstance(b, Mapping) and b.get("type") == "tool_use" for b in blocks + ) + stop_reason = "tool_use" if has_tool_use else "end_turn" + for block in blocks: + rec_uuid = str(uuid.uuid4()) + records.append( + { + "parentUuid": parent_uuid, + "message": { + "model": model, + "id": msg_id, 
+ "type": "message", + "role": "assistant", + "content": [dict(block)], + "stop_reason": stop_reason, + "stop_sequence": None, + "stop_details": None, + "usage": _default_usage(), + "diagnostics": None, + }, + "requestId": request_id, + "type": "assistant", + "uuid": rec_uuid, + "timestamp": now_iso, + **common, + } + ) + if ( + isinstance(block, Mapping) + and block.get("type") == "tool_use" + and block.get("id") + ): + tool_use_to_assistant_uuid[str(block["id"])] = rec_uuid + parent_uuid = rec_uuid else: msg = f"message role must be 'user' or 'assistant', got {role!r}" raise ValueError(msg) - lines.append(json.dumps(record)) - if not lines: - return "" - return "\n".join(lines) + "\n" + + return "\n".join(json.dumps(r) for r in records) + "\n" + + +def _extract_tool_result_blocks(content: Any) -> list[Mapping[str, Any]]: + """Return the tool_result blocks inside a user message, or [] if none. + + A user message is treated as a tool_result-carrying continuation if + ANY of its blocks is a `tool_result`. We don't expect mixed content + here — the canonical store separates prompts from tool results — but + if a mix shows up we still emit just the tool_result blocks. + """ + if not isinstance(content, list): + return [] + return [ + b for b in content if isinstance(b, Mapping) and b.get("type") == "tool_result" + ] def _content_for_seed(content: Any, *, role: str) -> Any: @@ -196,6 +289,32 @@ def _content_for_seed(content: Any, *, role: str) -> Any: raise ValueError(msg) +def _default_usage() -> dict[str, Any]: + """Mimic the `usage` shape claude 2.1.147 writes for replayed turns. + + Zero-valued because the seed represents a historical turn whose token + accounting is no longer interesting; claude only reads structure. 
+ """ + return { + "input_tokens": 0, + "output_tokens": 0, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "service_tier": "standard", + "server_tool_use": { + "web_search_requests": 0, + "web_fetch_requests": 0, + }, + "cache_creation": { + "ephemeral_1h_input_tokens": 0, + "ephemeral_5m_input_tokens": 0, + }, + "inference_geo": "", + "iterations": [], + "speed": "standard", + } + + _CONCAT_PREAMBLE = "Previous conversation context:" _CONCAT_DIVIDER = "Continue from here. New user message:" diff --git a/src/claude_code_api/pty.py b/src/claude_code_api/pty.py index caa3e25..c8e5744 100644 --- a/src/claude_code_api/pty.py +++ b/src/claude_code_api/pty.py @@ -17,6 +17,7 @@ import contextlib import errno import logging import os +import pathlib import select import signal import threading @@ -46,6 +47,15 @@ _VALID_PERMISSION_MODES: frozenset[str] = frozenset( _DEFAULT_DRAIN_CHUNK = 65536 _DEFAULT_OUTPUT_BUFFER_CAP = 1_000_000 +# When `CLAUDE_PTY_SNAPSHOT_DIR` is set, each live PTY periodically writes +# its captured output buffer to ``$CLAUDE_PTY_SNAPSHOT_DIR/{session_id}.bin`` so an operator +# can ``cat`` / ``less -R`` the file and see what's currently on claude's +# TUI screen. Useful for diagnosing apparent hangs that are actually long +# thinking phases — the JSONL stays silent while thinking but the TUI +# usually shows a spinner / partial output / a status message. +_SNAPSHOT_ENV = "CLAUDE_PTY_SNAPSHOT_DIR" +_SNAPSHOT_INTERVAL = 10.0 + # Gap between the bracketed-paste closing marker and the Enter keystroke # in `PtyClaudeProcess.write()`. 
Claude's Ink-based TUI reads stdin in # chunks; if `\r` is glued to `ESC [ 201 ~` it gets absorbed by the @@ -202,6 +212,7 @@ class PtyClaudeProcess: self._drain_stop = threading.Event() self._output_lock = threading.Lock() self._output_buffer = bytearray() + self._snapshot_task: asyncio.Task[None] | None = None @property def session_id(self) -> str: @@ -276,6 +287,13 @@ class PtyClaudeProcess: ) self._drain_thread.start() + snapshot_dir = os.environ.get(_SNAPSHOT_ENV) + if snapshot_dir: + self._snapshot_task = asyncio.create_task( + self._snapshot_loop(pathlib.Path(snapshot_dir)), + name=f"pty-snapshot-{self._session_id[:8]}", + ) + def _drain_loop(self) -> None: pty = self._pty if pty is None: @@ -308,6 +326,51 @@ class PtyClaudeProcess: if overflow > 0: del self._output_buffer[:overflow] + async def _snapshot_loop(self, snapshot_dir: pathlib.Path) -> None: + """Periodically dump the captured PTY buffer to a file. + + Atomic via write-to-tmp + rename so a reader never sees a + half-written file. On task cancellation (i.e. PTY shutdown), + writes one final snapshot before returning. 
+ """ + target = snapshot_dir / f"{self._session_id}.bin" + tmp = snapshot_dir / f"{self._session_id}.bin.tmp" + try: + snapshot_dir.mkdir(parents=True, exist_ok=True) # noqa: ASYNC240 — one-shot at startup, fast + except OSError as exc: + _log.warning( + "snapshot_loop: session_id=%s mkdir(%s) failed: %s — disabling", + self._session_id, + snapshot_dir, + exc, + ) + return + _log.info( + "snapshot_loop: session_id=%s writing to %s every %.1fs", + self._session_id, + target, + _SNAPSHOT_INTERVAL, + ) + try: + while True: + await asyncio.sleep(_SNAPSHOT_INTERVAL) + self._write_snapshot(tmp, target) + except asyncio.CancelledError: + self._write_snapshot(tmp, target) + raise + + def _write_snapshot(self, tmp: pathlib.Path, target: pathlib.Path) -> None: + buf = self.captured_output() + try: + tmp.write_bytes(buf) + tmp.replace(target) + except OSError as exc: + _log.warning( + "snapshot_loop: session_id=%s write failed: %s", + self._session_id, + exc, + ) + async def wait_for_output( self, marker: bytes, @@ -526,6 +589,12 @@ class PtyClaudeProcess: thread = self._drain_thread if thread is not None and thread.is_alive(): await asyncio.to_thread(thread.join, 1.0) + snapshot_task = self._snapshot_task + if snapshot_task is not None and not snapshot_task.done(): + snapshot_task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await snapshot_task + self._snapshot_task = None with contextlib.suppress(OSError): pty.close(force=True) return exit_status diff --git a/tests/test_backend.py b/tests/test_backend.py index 2aaa88e..89073c7 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -320,10 +320,17 @@ async def test_unmatched_history_spawns_new_session_via_native_jsonl( seed_lines = [ json.loads(line) for line in seed_bytes.decode("utf-8").strip().splitlines() ] - # Two seeded records (one user + one assistant) for the prior turn. 
- assert [r["type"] for r in seed_lines] == ["user", "assistant"] - assert seed_lines[0]["message"]["content"] == "remember beaver" - assert seed_lines[1]["message"]["content"] == [{"type": "text", "text": "ok"}] + # Seeded prior turn: permission-mode + snapshot + user + assistant. + assert [r["type"] for r in seed_lines] == [ + "permission-mode", + "file-history-snapshot", + "user", + "assistant", + ] + user_rec = seed_lines[2] + asst_rec = seed_lines[3] + assert user_rec["message"]["content"] == "remember beaver" + assert asst_rec["message"]["content"] == [{"type": "text", "text": "ok"}] assert isinstance(events[-1], ResultMessage) diff --git a/tests/test_injection.py b/tests/test_injection.py index 28c1b6a..60821e7 100644 --- a/tests/test_injection.py +++ b/tests/test_injection.py @@ -1,9 +1,8 @@ """Unit tests for `history_injection` helpers. Pure functions, no claude / no filesystem. The seed-JSONL shape is regression -tested against the same minimal contract that `probe_jsonl_injection.py` -proved out empirically (see FINDINGS § *Native JSONL injection works on ---resume*). +tested against the format claude 2.1.147 itself writes (verified empirically +against real session transcripts). 
""" from __future__ import annotations @@ -67,42 +66,32 @@ def test_hash_history_text_blocks_collide_with_string_form() -> None: # --- build_seed_jsonl ----------------------------------------------------- +def _records(seed: str) -> list[dict]: + return [json.loads(line) for line in seed.strip().splitlines()] + + def test_build_seed_jsonl_empty_is_empty_string() -> None: assert build_seed_jsonl([], session_id="s", cwd="/tmp") == "" -def test_build_seed_jsonl_two_records_for_one_turn() -> None: +def test_build_seed_jsonl_starts_with_permission_mode() -> None: + """Non-empty seed must lead with a `permission-mode` record — claude + 2.1.147 writes one at session start and expects it on resume.""" seed = build_seed_jsonl( - [ - {"role": "user", "content": "My name is Beaver."}, - {"role": "assistant", "content": "Got it."}, - ], + [{"role": "user", "content": "hi"}], session_id="sid-1", cwd="/work", ) - lines = [json.loads(line) for line in seed.strip().splitlines()] - assert len(lines) == 2 - user_rec, asst_rec = lines - - assert user_rec["type"] == "user" - assert user_rec["sessionId"] == "sid-1" - assert user_rec["cwd"] == "/work" - assert user_rec["parentUuid"] is None - assert user_rec["message"] == {"role": "user", "content": "My name is Beaver."} - assert user_rec["isMeta"] is False - assert "uuid" in user_rec and "timestamp" in user_rec - - assert asst_rec["type"] == "assistant" - assert asst_rec["parentUuid"] == user_rec["uuid"] - assert asst_rec["message"]["role"] == "assistant" - assert asst_rec["message"]["content"] == [{"type": "text", "text": "Got it."}] - assert asst_rec["message"]["stop_reason"] == "end_turn" - assert asst_rec["sessionId"] == "sid-1" + recs = _records(seed) + assert recs[0]["type"] == "permission-mode" + assert recs[0]["sessionId"] == "sid-1" + assert recs[0]["permissionMode"] == "bypassPermissions" -def test_build_seed_jsonl_chains_parent_uuids_across_turns() -> None: - """The parentUuid graph must form a linear chain across turns — 
that's - how claude reconstructs conversation order on resume.""" +def test_build_seed_jsonl_snapshot_precedes_each_user_prompt() -> None: + """Before every new user prompt, claude writes a + `file-history-snapshot` whose `messageId` matches the user record's + `uuid`. Resume parsing depends on this pairing.""" seed = build_seed_jsonl( [ {"role": "user", "content": "u1"}, @@ -113,38 +102,198 @@ def test_build_seed_jsonl_chains_parent_uuids_across_turns() -> None: session_id="s", cwd="/tmp", ) - recs = [json.loads(line) for line in seed.strip().splitlines()] - assert len(recs) == 4 - assert recs[0]["parentUuid"] is None - assert recs[1]["parentUuid"] == recs[0]["uuid"] - assert recs[2]["parentUuid"] == recs[1]["uuid"] - assert recs[3]["parentUuid"] == recs[2]["uuid"] + recs = _records(seed) + snapshots = [r for r in recs if r.get("type") == "file-history-snapshot"] + users = [r for r in recs if r.get("type") == "user"] + assert len(snapshots) == 2 + assert len(users) == 2 + for snap, usr in zip(snapshots, users, strict=True): + assert snap["messageId"] == usr["uuid"] + assert snap["snapshot"]["messageId"] == usr["uuid"] -def test_build_seed_jsonl_passes_list_content_through_for_user() -> None: - """A user record with a tool_result block (the only list-form user - content claude itself writes) must round-trip verbatim.""" +def test_build_seed_jsonl_user_record_has_permission_mode_no_isMeta() -> None: + """User records carry `permissionMode` but no `isMeta` — that's what + claude 2.1.147 writes. 
Adding `isMeta` to user records is one of the + things that made the old seed look 'wrong' on strict resume.""" + seed = build_seed_jsonl( + [{"role": "user", "content": "hi"}], + session_id="s", + cwd="/tmp", + ) + user_rec = next(r for r in _records(seed) if r.get("type") == "user") + assert user_rec["permissionMode"] == "bypassPermissions" + assert "isMeta" not in user_rec + assert user_rec["version"] == "2.1.147" + assert user_rec["gitBranch"] == "HEAD" + assert "promptId" in user_rec + + +def test_build_seed_jsonl_assistant_string_content_wraps_as_text_block() -> None: seed = build_seed_jsonl( [ - { - "role": "user", - "content": [ - {"type": "tool_result", "tool_use_id": "t1", "content": "42"}, - ], - } + {"role": "user", "content": "u"}, + {"role": "assistant", "content": "Got it."}, ], session_id="s", cwd="/tmp", ) - rec = json.loads(seed.strip()) - assert rec["message"]["content"] == [ - {"type": "tool_result", "tool_use_id": "t1", "content": "42"}, + asst_recs = [r for r in _records(seed) if r.get("type") == "assistant"] + assert len(asst_recs) == 1 + msg = asst_recs[0]["message"] + assert msg["content"] == [{"type": "text", "text": "Got it."}] + assert msg["stop_reason"] == "end_turn" + assert msg["diagnostics"] is None + assert "server_tool_use" in msg["usage"] + assert "cache_creation" in msg["usage"] + + +def test_build_seed_jsonl_splits_assistant_blocks_into_separate_records() -> None: + """A multi-block assistant message becomes one record per block, all + sharing the same `msg_id` and `requestId`. 
parentUuid chains them.""" + seed = build_seed_jsonl( + [ + {"role": "user", "content": "u"}, + { + "role": "assistant", + "content": [ + {"type": "thinking", "thinking": "ponder", "signature": "sig"}, + {"type": "tool_use", "id": "tu1", "name": "bash", "input": {}}, + ], + }, + ], + session_id="s", + cwd="/tmp", + ) + asst_recs = [r for r in _records(seed) if r.get("type") == "assistant"] + assert len(asst_recs) == 2 + assert asst_recs[0]["message"]["id"] == asst_recs[1]["message"]["id"] + assert asst_recs[0]["requestId"] == asst_recs[1]["requestId"] + assert asst_recs[1]["parentUuid"] == asst_recs[0]["uuid"] + assert asst_recs[0]["message"]["content"][0]["type"] == "thinking" + assert asst_recs[1]["message"]["content"][0]["type"] == "tool_use" + assert asst_recs[0]["message"]["stop_reason"] == "tool_use" + + +def test_build_seed_jsonl_tool_result_parents_to_matching_tool_use() -> None: + """A user message with a `tool_result` block becomes a user record + whose parentUuid points to the assistant record that emitted the + matching `tool_use`. 
`sourceToolAssistantUUID` mirrors that link.""" + seed = build_seed_jsonl( + [ + {"role": "user", "content": "u"}, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "tu1", "name": "bash", "input": {}} + ], + }, + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "tu1", "content": "ok"} + ], + }, + ], + session_id="s", + cwd="/tmp", + ) + recs = _records(seed) + asst = next(r for r in recs if r.get("type") == "assistant") + # second user record is the tool_result one (first was the prompt) + user_recs = [r for r in recs if r.get("type") == "user"] + tool_result_rec = user_recs[1] + assert tool_result_rec["parentUuid"] == asst["uuid"] + assert tool_result_rec["sourceToolAssistantUUID"] == asst["uuid"] + assert tool_result_rec["toolUseResult"] == "ok" + assert tool_result_rec["message"]["content"] == [ + {"type": "tool_result", "tool_use_id": "tu1", "content": "ok"} ] +def test_build_seed_jsonl_splits_multi_tool_result_user_message() -> None: + """When a user message carries multiple tool_result blocks (one per + parallel tool_use), claude writes one record per result. 
Each parents + on the corresponding assistant record.""" + seed = build_seed_jsonl( + [ + {"role": "user", "content": "u"}, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "A", "name": "x", "input": {}} + ], + }, + { + "role": "assistant", + "content": [ + {"type": "tool_use", "id": "B", "name": "y", "input": {}} + ], + }, + { + "role": "user", + "content": [ + {"type": "tool_result", "tool_use_id": "A", "content": "a"}, + {"type": "tool_result", "tool_use_id": "B", "content": "b"}, + ], + }, + ], + session_id="s", + cwd="/tmp", + ) + recs = _records(seed) + asst_recs = [r for r in recs if r.get("type") == "assistant"] + tool_result_recs = [ + r + for r in recs + if r.get("type") == "user" + and isinstance(r["message"]["content"], list) + and r["message"]["content"][0].get("type") == "tool_result" + ] + assert len(tool_result_recs) == 2 + a_uuid = next( + r["uuid"] + for r in asst_recs + if r["message"]["content"][0].get("id") == "A" + ) + b_uuid = next( + r["uuid"] + for r in asst_recs + if r["message"]["content"][0].get("id") == "B" + ) + assert tool_result_recs[0]["sourceToolAssistantUUID"] == a_uuid + assert tool_result_recs[1]["sourceToolAssistantUUID"] == b_uuid + + +def test_build_seed_jsonl_chains_parent_uuids_linearly() -> None: + """Every content-carrying record (user, assistant, tool_result) chains + via parentUuid back through the record graph. 
The first user has + parentUuid=None; subsequent records have non-null parents.""" + seed = build_seed_jsonl( + [ + {"role": "user", "content": "u1"}, + {"role": "assistant", "content": "a1"}, + {"role": "user", "content": "u2"}, + {"role": "assistant", "content": "a2"}, + ], + session_id="s", + cwd="/tmp", + ) + chain = [ + r + for r in _records(seed) + if r.get("type") in ("user", "assistant") + ] + assert chain[0]["parentUuid"] is None + for prev, nxt in zip(chain, chain[1:], strict=False): + assert nxt["parentUuid"] == prev["uuid"] + + def test_build_seed_jsonl_rejects_unknown_role() -> None: with pytest.raises(ValueError, match="role"): - build_seed_jsonl([{"role": "system", "content": "x"}], session_id="s", cwd="/tmp") + build_seed_jsonl( + [{"role": "system", "content": "x"}], session_id="s", cwd="/tmp" + ) # --- build_concat_prompt --------------------------------------------------