feat: better injection, pty snapshots

This commit is contained in:
h
2026-05-23 00:12:15 +02:00
parent d05c8dd613
commit b5e0166c48
4 changed files with 455 additions and 111 deletions
+178 -59
View File
@@ -28,8 +28,10 @@ import uuid
from collections.abc import Iterable, Mapping
from typing import Any
_DEFAULT_CLAUDE_VERSION = "2.1.143"
_DEFAULT_CLAUDE_VERSION = "2.1.147"
_DEFAULT_MODEL = "claude-opus-4-7"
_DEFAULT_PERMISSION_MODE = "bypassPermissions"
_DEFAULT_GIT_BRANCH = "HEAD"
def hash_history(messages: Iterable[Mapping[str, Any]]) -> str:
@@ -95,26 +97,37 @@ def build_seed_jsonl(
cwd: str,
claude_version: str = _DEFAULT_CLAUDE_VERSION,
model: str = _DEFAULT_MODEL,
permission_mode: str = _DEFAULT_PERMISSION_MODE,
git_branch: str = _DEFAULT_GIT_BRANCH,
now_iso: str | None = None,
) -> str:
"""Render a message list as a native claude JSONL transcript.
Output is a newline-terminated string of one JSON object per line. The
schema mirrors what claude itself writes (minus the snapshot records).
Output is a newline-terminated string of one JSON object per line.
The schema mirrors what claude 2.1.147 itself writes:
The caller writes the result to
`~/.claude/projects/<key>/<session_id>.jsonl` and spawns
`claude --resume <session_id>`. Claude appends its own
`file-history-snapshot` / `last-prompt` / `permission-mode` records on
resume — we don't need to.
- One `permission-mode` record at the top (session-level).
- Before each new user prompt: a `file-history-snapshot` record whose
`messageId` equals the user record's `uuid`.
- User prompt records carry `permissionMode` and a per-turn `promptId`.
- Assistant messages are split into one record per content block, all
sharing the same `msg_id` and `requestId` (this is how the live
claude streams them — thinking, tool_use, text each get their own
line). `parentUuid` chains record-to-record.
- `user` records carrying `tool_result` blocks are split one record
per tool_result, each parented on the corresponding assistant
tool_use record (so `parentUuid` graphs match what claude expects).
Empty history is permitted; the returned string is empty in that case.
Empty history returns an empty string (no records, no
permission-mode — claude will write its own on first turn).
"""
msg_list = list(messages)
if not msg_list:
return ""
if now_iso is None:
now_iso = _now_iso()
lines: list[str] = []
parent_uuid: str | None = None
common = {
"isSidechain": False,
"userType": "external",
@@ -122,61 +135,141 @@ def build_seed_jsonl(
"cwd": cwd,
"sessionId": session_id,
"version": claude_version,
"gitBranch": "",
"gitBranch": git_branch,
}
for m in messages:
records: list[dict[str, Any]] = [
{
"type": "permission-mode",
"permissionMode": permission_mode,
"sessionId": session_id,
}
]
parent_uuid: str | None = None
current_prompt_id: str | None = None
tool_use_to_assistant_uuid: dict[str, str] = {}
for m in msg_list:
role = m.get("role")
content = m.get("content")
if role == "user":
user_uuid = str(uuid.uuid4())
record = {
"parentUuid": parent_uuid,
"promptId": str(uuid.uuid4()),
"type": "user",
"message": {
"role": "user",
"content": _content_for_seed(m.get("content"), role="user"),
},
"isMeta": False,
"uuid": user_uuid,
"timestamp": now_iso,
**common,
}
parent_uuid = user_uuid
tool_result_blocks = _extract_tool_result_blocks(content)
if tool_result_blocks:
for block in tool_result_blocks:
tu_id = str(block.get("tool_use_id", ""))
source_uuid = tool_use_to_assistant_uuid.get(tu_id, parent_uuid)
rec_uuid = str(uuid.uuid4())
records.append(
{
"parentUuid": source_uuid,
"promptId": current_prompt_id,
"type": "user",
"message": {
"role": "user",
"content": [dict(block)],
},
"uuid": rec_uuid,
"timestamp": now_iso,
"toolUseResult": block.get("content"),
"sourceToolAssistantUUID": source_uuid,
**common,
}
)
parent_uuid = rec_uuid
else:
current_prompt_id = str(uuid.uuid4())
user_uuid = str(uuid.uuid4())
records.append(
{
"type": "file-history-snapshot",
"messageId": user_uuid,
"snapshot": {
"messageId": user_uuid,
"trackedFileBackups": {},
"timestamp": now_iso,
},
"isSnapshotUpdate": False,
}
)
records.append(
{
"parentUuid": parent_uuid,
"promptId": current_prompt_id,
"type": "user",
"message": {
"role": "user",
"content": _content_for_seed(content, role="user"),
},
"uuid": user_uuid,
"timestamp": now_iso,
"permissionMode": permission_mode,
**common,
}
)
parent_uuid = user_uuid
elif role == "assistant":
assistant_uuid = str(uuid.uuid4())
record = {
"parentUuid": parent_uuid,
"message": {
"model": model,
"id": f"msg_{uuid.uuid4().hex[:24]}",
"type": "message",
"role": "assistant",
"content": _content_for_seed(m.get("content"), role="assistant"),
"stop_reason": "end_turn",
"stop_sequence": None,
"stop_details": None,
"usage": {
"input_tokens": 0,
"output_tokens": 0,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"service_tier": "standard",
},
},
"requestId": f"req_{uuid.uuid4().hex[:24]}",
"type": "assistant",
"uuid": assistant_uuid,
"timestamp": now_iso,
**common,
}
parent_uuid = assistant_uuid
blocks = _content_for_seed(content, role="assistant")
if not isinstance(blocks, list) or not blocks:
msg = "assistant content must be a non-empty list of blocks"
raise ValueError(msg)
msg_id = f"msg_{uuid.uuid4().hex[:24]}"
request_id = f"req_{uuid.uuid4().hex[:24]}"
has_tool_use = any(
isinstance(b, Mapping) and b.get("type") == "tool_use" for b in blocks
)
stop_reason = "tool_use" if has_tool_use else "end_turn"
for block in blocks:
rec_uuid = str(uuid.uuid4())
records.append(
{
"parentUuid": parent_uuid,
"message": {
"model": model,
"id": msg_id,
"type": "message",
"role": "assistant",
"content": [dict(block)],
"stop_reason": stop_reason,
"stop_sequence": None,
"stop_details": None,
"usage": _default_usage(),
"diagnostics": None,
},
"requestId": request_id,
"type": "assistant",
"uuid": rec_uuid,
"timestamp": now_iso,
**common,
}
)
if (
isinstance(block, Mapping)
and block.get("type") == "tool_use"
and block.get("id")
):
tool_use_to_assistant_uuid[str(block["id"])] = rec_uuid
parent_uuid = rec_uuid
else:
msg = f"message role must be 'user' or 'assistant', got {role!r}"
raise ValueError(msg)
lines.append(json.dumps(record))
if not lines:
return ""
return "\n".join(lines) + "\n"
return "\n".join(json.dumps(r) for r in records) + "\n"
def _extract_tool_result_blocks(content: Any) -> list[Mapping[str, Any]]:
"""Return the tool_result blocks inside a user message, or [] if none.
A user message is treated as a tool_result-carrying continuation if
ANY of its blocks is a `tool_result`. We don't expect mixed content
here — the canonical store separates prompts from tool results — but
if a mix shows up we still emit just the tool_result blocks.
"""
if not isinstance(content, list):
return []
return [
b for b in content if isinstance(b, Mapping) and b.get("type") == "tool_result"
]
def _content_for_seed(content: Any, *, role: str) -> Any:
@@ -196,6 +289,32 @@ def _content_for_seed(content: Any, *, role: str) -> Any:
raise ValueError(msg)
def _default_usage() -> dict[str, Any]:
"""Mimic the `usage` shape claude 2.1.147 writes for replayed turns.
Zero-valued because the seed represents a historical turn whose token
accounting is no longer interesting; claude only reads structure.
"""
return {
"input_tokens": 0,
"output_tokens": 0,
"cache_creation_input_tokens": 0,
"cache_read_input_tokens": 0,
"service_tier": "standard",
"server_tool_use": {
"web_search_requests": 0,
"web_fetch_requests": 0,
},
"cache_creation": {
"ephemeral_1h_input_tokens": 0,
"ephemeral_5m_input_tokens": 0,
},
"inference_geo": "",
"iterations": [],
"speed": "standard",
}
_CONCAT_PREAMBLE = "Previous conversation context:"
_CONCAT_DIVIDER = "Continue from here. New user message:"
+69
View File
@@ -17,6 +17,7 @@ import contextlib
import errno
import logging
import os
import pathlib
import select
import signal
import threading
@@ -46,6 +47,15 @@ _VALID_PERMISSION_MODES: frozenset[str] = frozenset(
_DEFAULT_DRAIN_CHUNK = 65536
_DEFAULT_OUTPUT_BUFFER_CAP = 1_000_000
# When `CLAUDE_PTY_SNAPSHOT_DIR` is set, each live PTY periodically writes
# its captured output buffer to ``<dir>/<session_id>.bin`` so an operator
# can ``cat`` / ``less -R`` the file and see what's currently on claude's
# TUI screen. Useful for diagnosing apparent hangs that are actually long
# thinking phases — the JSONL stays silent while thinking but the TUI
# usually shows a spinner / partial output / a status message.
_SNAPSHOT_ENV = "CLAUDE_PTY_SNAPSHOT_DIR"
_SNAPSHOT_INTERVAL = 10.0
# Gap between the bracketed-paste closing marker and the Enter keystroke
# in `PtyClaudeProcess.write()`. Claude's Ink-based TUI reads stdin in
# chunks; if `\r` is glued to `ESC [ 201 ~` it gets absorbed by the
@@ -202,6 +212,7 @@ class PtyClaudeProcess:
self._drain_stop = threading.Event()
self._output_lock = threading.Lock()
self._output_buffer = bytearray()
self._snapshot_task: asyncio.Task[None] | None = None
@property
def session_id(self) -> str:
@@ -276,6 +287,13 @@ class PtyClaudeProcess:
)
self._drain_thread.start()
snapshot_dir = os.environ.get(_SNAPSHOT_ENV)
if snapshot_dir:
self._snapshot_task = asyncio.create_task(
self._snapshot_loop(pathlib.Path(snapshot_dir)),
name=f"pty-snapshot-{self._session_id[:8]}",
)
def _drain_loop(self) -> None:
pty = self._pty
if pty is None:
@@ -308,6 +326,51 @@ class PtyClaudeProcess:
if overflow > 0:
del self._output_buffer[:overflow]
async def _snapshot_loop(self, snapshot_dir: pathlib.Path) -> None:
"""Periodically dump the captured PTY buffer to a file.
Atomic via write-to-tmp + rename so a reader never sees a
half-written file. On task cancellation (i.e. PTY shutdown),
writes one final snapshot before returning.
"""
target = snapshot_dir / f"{self._session_id}.bin"
tmp = snapshot_dir / f"{self._session_id}.bin.tmp"
try:
snapshot_dir.mkdir(parents=True, exist_ok=True) # noqa: ASYNC240 — one-shot at startup, fast
except OSError as exc:
_log.warning(
"snapshot_loop: session_id=%s mkdir(%s) failed: %s — disabling",
self._session_id,
snapshot_dir,
exc,
)
return
_log.info(
"snapshot_loop: session_id=%s writing to %s every %.1fs",
self._session_id,
target,
_SNAPSHOT_INTERVAL,
)
try:
while True:
await asyncio.sleep(_SNAPSHOT_INTERVAL)
self._write_snapshot(tmp, target)
except asyncio.CancelledError:
self._write_snapshot(tmp, target)
raise
def _write_snapshot(self, tmp: pathlib.Path, target: pathlib.Path) -> None:
buf = self.captured_output()
try:
tmp.write_bytes(buf)
tmp.replace(target)
except OSError as exc:
_log.warning(
"snapshot_loop: session_id=%s write failed: %s",
self._session_id,
exc,
)
async def wait_for_output(
self,
marker: bytes,
@@ -526,6 +589,12 @@ class PtyClaudeProcess:
thread = self._drain_thread
if thread is not None and thread.is_alive():
await asyncio.to_thread(thread.join, 1.0)
snapshot_task = self._snapshot_task
if snapshot_task is not None and not snapshot_task.done():
snapshot_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await snapshot_task
self._snapshot_task = None
with contextlib.suppress(OSError):
pty.close(force=True)
return exit_status
+11 -4
View File
@@ -320,10 +320,17 @@ async def test_unmatched_history_spawns_new_session_via_native_jsonl(
seed_lines = [
json.loads(line) for line in seed_bytes.decode("utf-8").strip().splitlines()
]
# Two seeded records (one user + one assistant) for the prior turn.
assert [r["type"] for r in seed_lines] == ["user", "assistant"]
assert seed_lines[0]["message"]["content"] == "remember beaver"
assert seed_lines[1]["message"]["content"] == [{"type": "text", "text": "ok"}]
# Seeded prior turn: permission-mode + snapshot + user + assistant.
assert [r["type"] for r in seed_lines] == [
"permission-mode",
"file-history-snapshot",
"user",
"assistant",
]
user_rec = seed_lines[2]
asst_rec = seed_lines[3]
assert user_rec["message"]["content"] == "remember beaver"
assert asst_rec["message"]["content"] == [{"type": "text", "text": "ok"}]
assert isinstance(events[-1], ResultMessage)
+197 -48
View File
@@ -1,9 +1,8 @@
"""Unit tests for `history_injection` helpers.
Pure functions, no claude / no filesystem. The seed-JSONL shape is regression
tested against the same minimal contract that `probe_jsonl_injection.py`
proved out empirically (see FINDINGS § *Native JSONL injection works on
--resume*).
tested against the format claude 2.1.147 itself writes (verified empirically
against real session transcripts).
"""
from __future__ import annotations
@@ -67,42 +66,32 @@ def test_hash_history_text_blocks_collide_with_string_form() -> None:
# --- build_seed_jsonl -----------------------------------------------------
def _records(seed: str) -> list[dict]:
return [json.loads(line) for line in seed.strip().splitlines()]
def test_build_seed_jsonl_empty_is_empty_string() -> None:
assert build_seed_jsonl([], session_id="s", cwd="/tmp") == ""
def test_build_seed_jsonl_two_records_for_one_turn() -> None:
def test_build_seed_jsonl_starts_with_permission_mode() -> None:
"""Non-empty seed must lead with a `permission-mode` record — claude
2.1.147 writes one at session start and expects it on resume."""
seed = build_seed_jsonl(
[
{"role": "user", "content": "My name is Beaver."},
{"role": "assistant", "content": "Got it."},
],
[{"role": "user", "content": "hi"}],
session_id="sid-1",
cwd="/work",
)
lines = [json.loads(line) for line in seed.strip().splitlines()]
assert len(lines) == 2
user_rec, asst_rec = lines
assert user_rec["type"] == "user"
assert user_rec["sessionId"] == "sid-1"
assert user_rec["cwd"] == "/work"
assert user_rec["parentUuid"] is None
assert user_rec["message"] == {"role": "user", "content": "My name is Beaver."}
assert user_rec["isMeta"] is False
assert "uuid" in user_rec and "timestamp" in user_rec
assert asst_rec["type"] == "assistant"
assert asst_rec["parentUuid"] == user_rec["uuid"]
assert asst_rec["message"]["role"] == "assistant"
assert asst_rec["message"]["content"] == [{"type": "text", "text": "Got it."}]
assert asst_rec["message"]["stop_reason"] == "end_turn"
assert asst_rec["sessionId"] == "sid-1"
recs = _records(seed)
assert recs[0]["type"] == "permission-mode"
assert recs[0]["sessionId"] == "sid-1"
assert recs[0]["permissionMode"] == "bypassPermissions"
def test_build_seed_jsonl_chains_parent_uuids_across_turns() -> None:
"""The parentUuid graph must form a linear chain across turns — that's
how claude reconstructs conversation order on resume."""
def test_build_seed_jsonl_snapshot_precedes_each_user_prompt() -> None:
"""Before every new user prompt, claude writes a
`file-history-snapshot` whose `messageId` matches the user record's
`uuid`. Resume parsing depends on this pairing."""
seed = build_seed_jsonl(
[
{"role": "user", "content": "u1"},
@@ -113,38 +102,198 @@ def test_build_seed_jsonl_chains_parent_uuids_across_turns() -> None:
session_id="s",
cwd="/tmp",
)
recs = [json.loads(line) for line in seed.strip().splitlines()]
assert len(recs) == 4
assert recs[0]["parentUuid"] is None
assert recs[1]["parentUuid"] == recs[0]["uuid"]
assert recs[2]["parentUuid"] == recs[1]["uuid"]
assert recs[3]["parentUuid"] == recs[2]["uuid"]
recs = _records(seed)
snapshots = [r for r in recs if r.get("type") == "file-history-snapshot"]
users = [r for r in recs if r.get("type") == "user"]
assert len(snapshots) == 2
assert len(users) == 2
for snap, usr in zip(snapshots, users, strict=True):
assert snap["messageId"] == usr["uuid"]
assert snap["snapshot"]["messageId"] == usr["uuid"]
def test_build_seed_jsonl_passes_list_content_through_for_user() -> None:
"""A user record with a tool_result block (the only list-form user
content claude itself writes) must round-trip verbatim."""
def test_build_seed_jsonl_user_record_has_permission_mode_no_isMeta() -> None:
"""User records carry `permissionMode` but no `isMeta` — that's what
claude 2.1.147 writes. Adding `isMeta` to user records is one of the
things that made the old seed look 'wrong' on strict resume."""
seed = build_seed_jsonl(
[{"role": "user", "content": "hi"}],
session_id="s",
cwd="/tmp",
)
user_rec = next(r for r in _records(seed) if r.get("type") == "user")
assert user_rec["permissionMode"] == "bypassPermissions"
assert "isMeta" not in user_rec
assert user_rec["version"] == "2.1.147"
assert user_rec["gitBranch"] == "HEAD"
assert "promptId" in user_rec
def test_build_seed_jsonl_assistant_string_content_wraps_as_text_block() -> None:
seed = build_seed_jsonl(
[
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "t1", "content": "42"},
],
}
{"role": "user", "content": "u"},
{"role": "assistant", "content": "Got it."},
],
session_id="s",
cwd="/tmp",
)
rec = json.loads(seed.strip())
assert rec["message"]["content"] == [
{"type": "tool_result", "tool_use_id": "t1", "content": "42"},
asst_recs = [r for r in _records(seed) if r.get("type") == "assistant"]
assert len(asst_recs) == 1
msg = asst_recs[0]["message"]
assert msg["content"] == [{"type": "text", "text": "Got it."}]
assert msg["stop_reason"] == "end_turn"
assert msg["diagnostics"] is None
assert "server_tool_use" in msg["usage"]
assert "cache_creation" in msg["usage"]
def test_build_seed_jsonl_splits_assistant_blocks_into_separate_records() -> None:
"""A multi-block assistant message becomes one record per block, all
sharing the same `msg_id` and `requestId`. parentUuid chains them."""
seed = build_seed_jsonl(
[
{"role": "user", "content": "u"},
{
"role": "assistant",
"content": [
{"type": "thinking", "thinking": "ponder", "signature": "sig"},
{"type": "tool_use", "id": "tu1", "name": "bash", "input": {}},
],
},
],
session_id="s",
cwd="/tmp",
)
asst_recs = [r for r in _records(seed) if r.get("type") == "assistant"]
assert len(asst_recs) == 2
assert asst_recs[0]["message"]["id"] == asst_recs[1]["message"]["id"]
assert asst_recs[0]["requestId"] == asst_recs[1]["requestId"]
assert asst_recs[1]["parentUuid"] == asst_recs[0]["uuid"]
assert asst_recs[0]["message"]["content"][0]["type"] == "thinking"
assert asst_recs[1]["message"]["content"][0]["type"] == "tool_use"
assert asst_recs[0]["message"]["stop_reason"] == "tool_use"
def test_build_seed_jsonl_tool_result_parents_to_matching_tool_use() -> None:
"""A user message with a `tool_result` block becomes a user record
whose parentUuid points to the assistant record that emitted the
matching `tool_use`. `sourceToolAssistantUUID` mirrors that link."""
seed = build_seed_jsonl(
[
{"role": "user", "content": "u"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "tu1", "name": "bash", "input": {}}
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "tu1", "content": "ok"}
],
},
],
session_id="s",
cwd="/tmp",
)
recs = _records(seed)
asst = next(r for r in recs if r.get("type") == "assistant")
# second user record is the tool_result one (first was the prompt)
user_recs = [r for r in recs if r.get("type") == "user"]
tool_result_rec = user_recs[1]
assert tool_result_rec["parentUuid"] == asst["uuid"]
assert tool_result_rec["sourceToolAssistantUUID"] == asst["uuid"]
assert tool_result_rec["toolUseResult"] == "ok"
assert tool_result_rec["message"]["content"] == [
{"type": "tool_result", "tool_use_id": "tu1", "content": "ok"}
]
def test_build_seed_jsonl_splits_multi_tool_result_user_message() -> None:
"""When a user message carries multiple tool_result blocks (one per
parallel tool_use), claude writes one record per result. Each parents
on the corresponding assistant record."""
seed = build_seed_jsonl(
[
{"role": "user", "content": "u"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "A", "name": "x", "input": {}}
],
},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "B", "name": "y", "input": {}}
],
},
{
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": "A", "content": "a"},
{"type": "tool_result", "tool_use_id": "B", "content": "b"},
],
},
],
session_id="s",
cwd="/tmp",
)
recs = _records(seed)
asst_recs = [r for r in recs if r.get("type") == "assistant"]
tool_result_recs = [
r
for r in recs
if r.get("type") == "user"
and isinstance(r["message"]["content"], list)
and r["message"]["content"][0].get("type") == "tool_result"
]
assert len(tool_result_recs) == 2
a_uuid = next(
r["uuid"]
for r in asst_recs
if r["message"]["content"][0].get("id") == "A"
)
b_uuid = next(
r["uuid"]
for r in asst_recs
if r["message"]["content"][0].get("id") == "B"
)
assert tool_result_recs[0]["sourceToolAssistantUUID"] == a_uuid
assert tool_result_recs[1]["sourceToolAssistantUUID"] == b_uuid
def test_build_seed_jsonl_chains_parent_uuids_linearly() -> None:
"""Every content-carrying record (user, assistant, tool_result) chains
via parentUuid back through the record graph. The first user has
parentUuid=None; subsequent records have non-null parents."""
seed = build_seed_jsonl(
[
{"role": "user", "content": "u1"},
{"role": "assistant", "content": "a1"},
{"role": "user", "content": "u2"},
{"role": "assistant", "content": "a2"},
],
session_id="s",
cwd="/tmp",
)
chain = [
r
for r in _records(seed)
if r.get("type") in ("user", "assistant")
]
assert chain[0]["parentUuid"] is None
for prev, nxt in zip(chain, chain[1:], strict=False):
assert nxt["parentUuid"] == prev["uuid"]
def test_build_seed_jsonl_rejects_unknown_role() -> None:
with pytest.raises(ValueError, match="role"):
build_seed_jsonl([{"role": "system", "content": "x"}], session_id="s", cwd="/tmp")
build_seed_jsonl(
[{"role": "system", "content": "x"}], session_id="s", cwd="/tmp"
)
# --- build_concat_prompt --------------------------------------------------