feat: add synthesize_turn_messages

This commit is contained in:
h
2026-05-21 12:21:03 +02:00
parent a1a043373d
commit 1f20cef7d4
3 changed files with 210 additions and 22 deletions
+2
View File
@@ -10,6 +10,7 @@ from claude_code_api.backend import (
BackendOptions,
ClaudeCodeBackend,
HistoryInjectionMode,
synthesize_turn_messages,
)
from claude_code_api.errors import (
AuthError,
@@ -76,4 +77,5 @@ __all__ = [
"classify_pty_failure",
"is_valid_model",
"normalize",
"synthesize_turn_messages",
]
+41 -22
View File
@@ -36,6 +36,7 @@ from claude_code_api.events import (
ThinkingBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
from claude_code_api.injection import (
build_concat_prompt,
@@ -51,10 +52,6 @@ HistoryInjectionMode = Literal["native_jsonl", "concat_message"]
ParseErrorCallback = Callable[[MessageParseError, dict[str, Any]], None]
_TERMINAL_STOP_REASONS: frozenset[str] = frozenset(
{"end_turn", "max_tokens", "stop_sequence", "refusal"}
)
@dataclass(frozen=True)
class BackendOptions:
@@ -196,8 +193,8 @@ class ClaudeCodeBackend:
await session.aclose()
raise
synthetic_asst = _synthesize_assistant_dict(events)
new_history = [*list(messages), synthetic_asst]
synthesized_cycle = synthesize_turn_messages(events)
new_history = [*list(messages), *synthesized_cycle]
self._sessions[hash_history(new_history)] = session
async def aclose(self) -> None:
@@ -351,22 +348,43 @@ def _user_text_payload(content: Any) -> str:
raise ValueError(msg)
def _synthesize_assistant_dict(events: Iterable[Event]) -> dict[str, Any]:
"""Render the terminal assistant message in Anthropic Messages format."""
terminal: AssistantMessage | None = None
for ev in reversed(list(events)):
if (
isinstance(ev, AssistantMessage)
and ev.stop_reason in _TERMINAL_STOP_REASONS
):
terminal = ev
break
if terminal is None:
return {"role": "assistant", "content": []}
return {
"role": "assistant",
"content": [_block_to_dict(b) for b in terminal.content],
}
def synthesize_turn_messages(events: Iterable[Event]) -> list[dict[str, Any]]:
"""Render a turn's full assistant↔tool cycle as Anthropic-shape messages.
A single ``complete()`` call can produce multiple ``AssistantMessage``
records (each tool-use cycle is its own record, terminated by a
``UserMessage`` carrying the matching ``tool_result`` blocks). We
fold that whole sequence into a list of canonical messages — exactly
what the Anthropic Messages API would see if claude were running
over the wire instead of in a PTY. The result is what the session
fingerprint is computed over and what gets seeded into JSONL on a
cache-miss re-spawn, so the live PTY and a freshly-resumed one stay
semantically equivalent.
Excludes intermediate ``UserMessage`` records that carry only the
echoed prompt text (string content) — those are claude's own input
record, not part of the conversational reply. Only tool_result
``UserMessage`` records (list-of-blocks content) survive.
"""
out: list[dict[str, Any]] = []
for ev in events:
if isinstance(ev, AssistantMessage):
out.append(
{
"role": "assistant",
"content": [_block_to_dict(b) for b in ev.content],
}
)
elif isinstance(ev, UserMessage):
content = ev.content
if isinstance(content, list) and content:
out.append(
{
"role": "user",
"content": [_block_to_dict(b) for b in content],
}
)
return out
def _block_to_dict(block: ContentBlock) -> dict[str, Any]:
@@ -401,4 +419,5 @@ __all__ = [
"ClaudeCodeBackend",
"HistoryInjectionMode",
"ParseErrorCallback",
"synthesize_turn_messages",
]
+167
View File
@@ -628,6 +628,173 @@ def test_post_turn_fingerprint_matches_canonical_continuation(tmp_path: Path) ->
assert fp_stash == fp_lookup
# --- multi-message synthesis (tool-use cycles) ---------------------------
def _assistant_tool_use_rec(
name: str, tool_id: str, session_id: str
) -> dict[str, Any]:
return {
"type": "assistant",
"uuid": f"a-tu-{tool_id}",
"sessionId": session_id,
"parentUuid": None,
"message": {
"id": "msg_x",
"role": "assistant",
"model": "claude-test",
"content": [
{"type": "tool_use", "id": tool_id, "name": name, "input": {"q": "x"}}
],
"stop_reason": "tool_use",
"usage": {"input_tokens": 1, "output_tokens": 1},
},
}
def _user_tool_result_rec(
tool_id: str, content: str, session_id: str
) -> dict[str, Any]:
return {
"type": "user",
"uuid": f"u-tr-{tool_id}",
"sessionId": session_id,
"parentUuid": None,
"message": {
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": tool_id, "content": content}
],
},
}
@pytest.mark.asyncio
async def test_tool_use_cycle_stashes_full_history_for_continuation(
tmp_path: Path,
) -> None:
"""When a turn ran through a tool_use cycle, the second `complete()`
must reuse the same live session if the client echoes back the WHOLE
cycle (assistant-with-tool_use + user-with-tool_result + final
assistant). This is the contract the gateway depends on.
"""
scripts_per_session = [
# ONE session handles both turns.
[
# turn 0: prompt → tool_use → tool_result → final text
[
_user_rec("look up X", "S0"),
_assistant_tool_use_rec("Read", "tu_1", "S0"),
_user_tool_result_rec("tu_1", "result body", "S0"),
_assistant_rec("done", "S0"),
],
# turn 1: follow-up reuses same session (one write into the PTY)
[
_user_rec("now what", "S0"),
_assistant_rec("ack", "S0"),
],
],
]
harness = FakeFactoryHarness(scripts_per_session)
backend = ClaudeCodeBackend(
BackendOptions(cwd=str(tmp_path)),
_session_factory=harness,
)
async for _ in backend.complete([{"role": "user", "content": "look up X"}]):
pass
# The continuation echoes back the FULL synthesized cycle, in canonical
# Anthropic-block shape — same as what the gateway will pull from its
# conversation store.
continuation = [
{"role": "user", "content": "look up X"},
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "tu_1", "name": "Read", "input": {"q": "x"}}
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_1",
"content": "result body",
"is_error": None,
}
],
},
{"role": "assistant", "content": [{"type": "text", "text": "done"}]},
{"role": "user", "content": "now what"},
]
async for _ in backend.complete(continuation):
pass
await backend.aclose()
# One session for both turns proves the fingerprint lookup hit.
assert len(harness.spawned) == 1
assert harness.spawned[0].writes == ["look up X", "now what"]
def test_synthesize_turn_messages_covers_whole_cycle() -> None:
"""Direct unit check on the public synthesizer: full cycle in, full
canonical-block message list out, intermediate echo UserMessages
filtered, tool_result UserMessages preserved.
"""
from claude_code_api import synthesize_turn_messages
from claude_code_api.events import (
AssistantMessage,
ResultMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
events = [
UserMessage(content="look up X"), # echo, must be skipped
AssistantMessage(
content=[ToolUseBlock(id="tu_1", name="Read", input={"q": "x"})],
model="claude-test",
stop_reason="tool_use",
),
UserMessage(
content=[
ToolResultBlock(tool_use_id="tu_1", content="result body"),
]
),
AssistantMessage(
content=[TextBlock(text="done")],
model="claude-test",
stop_reason="end_turn",
),
ResultMessage(subtype="success", duration_ms=10, num_turns=1, session_id="S0"),
]
out = synthesize_turn_messages(events)
assert out == [
{
"role": "assistant",
"content": [
{"type": "tool_use", "id": "tu_1", "name": "Read", "input": {"q": "x"}}
],
},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": "tu_1",
"content": "result body",
"is_error": None,
}
],
},
{"role": "assistant", "content": [{"type": "text", "text": "done"}]},
]
# --- smoke test (real claude) --------------------------------------------