Files

1069 lines
36 KiB
Python

"""Unit + smoke tests for Layer 5 (`ClaudeCodeBackend`).
Unit tests inject a `FakePty`-backed session factory so we can drive the
dispatch logic end-to-end — fingerprint lookup, fresh spawn vs continuation,
native_jsonl seeding vs concat_message preamble, post-turn fingerprint
stash — without launching `claude`. The smoke test at the bottom spawns
the real binary behind `RUN_CLAUDE_SMOKE=1`.
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import os
from pathlib import Path
from typing import Any
import pytest
from claude_code_api import (
AssistantMessage,
BackendOptions,
ClaudeCodeBackend,
ResultMessage,
SessionError,
TextBlock,
UserMessage,
)
from claude_code_api.backend import _LiveSession
from claude_code_api.injection import hash_history
from claude_code_api.paths import resolve_jsonl_path
from claude_code_api.watcher import JsonlWatcher
from claude_code_api.turn import TurnManager
# --- fakes -----------------------------------------------------------------
class FakePty:
    """In-memory PTY stand-in that replays scripted JSONL batches.

    Same shape as the fake in `test_turn_manager.py`, so the contract stays
    familiar. Every `write()` is recorded in `writes`; while unplayed batches
    remain in `scripts`, the call also appends the next batch to the JSONL
    file, one JSON object per line. Tests pre-load one batch per expected
    turn, because each backend `complete()` call ultimately drives one
    `write()` on the underlying PTY.
    """

    def __init__(
        self,
        jsonl_path: Path,
        *,
        session_id: str,
        scripts: list[list[dict[str, Any]]],
    ) -> None:
        # Replay state.
        self._jsonl = jsonl_path
        self._scripts = scripts
        self._write_count = 0
        # Attributes tests (and the layers above) inspect.
        self.session_id = session_id
        self.cwd = str(jsonl_path.parent)
        self.writes: list[str] = []
        self.started = False
        self.closed = False

    async def start(self) -> None:
        """Mark the fake as started; nothing is spawned."""
        self.started = True

    async def write(self, text: str, *, newline: bool = True) -> int:
        """Record `text` and flush the next scripted batch, if one is left.

        `newline` is accepted for signature compatibility and ignored.
        Returns `len(text)`.
        """
        self.writes.append(text)
        turn = self._write_count
        if turn >= len(self._scripts):
            # Script exhausted: record the write but leave the JSONL alone.
            return len(text)
        self._jsonl.parent.mkdir(parents=True, exist_ok=True)
        # Opening in append mode creates the file even for an empty batch.
        with self._jsonl.open("a", encoding="utf-8") as sink:
            sink.writelines(json.dumps(rec) + "\n" for rec in self._scripts[turn])
        self._write_count = turn + 1
        return len(text)

    async def aclose(self) -> None:
        """Mark the fake as closed."""
        self.closed = True
def _user_rec(text: str, session_id: str) -> dict[str, Any]:
    """Build a JSONL `user` record whose message content is the string `text`.

    The record uuid is `u-` followed by the first eight characters of `text`.
    """
    record: dict[str, Any] = {"type": "user"}
    record["uuid"] = "u-" + text[:8]
    record["sessionId"] = session_id
    record["parentUuid"] = None
    record["message"] = {"role": "user", "content": text}
    return record
def _assistant_rec(
    text: str,
    session_id: str,
    *,
    stop_reason: str = "end_turn",
) -> dict[str, Any]:
    """Build a JSONL `assistant` record carrying one text block.

    The record uuid is `a-` followed by the first eight characters of `text`;
    `stop_reason` is copied into the message.
    """
    message: dict[str, Any] = {
        "id": "msg_x",
        "role": "assistant",
        "model": "claude-test",
        "content": [{"type": "text", "text": text}],
        "stop_reason": stop_reason,
        "usage": {"input_tokens": 1, "output_tokens": 1},
    }
    return {
        "type": "assistant",
        "uuid": "a-" + text[:8],
        "sessionId": session_id,
        "parentUuid": None,
        "message": message,
    }
class FakeFactoryHarness:
    """Callable `_session_factory` that also remembers every session it made.

    Each invocation takes the next per-session script list off the front of
    the queue, wraps a `FakePty` in a real `JsonlWatcher` + `TurnManager`,
    and returns a coroutine that starts the turn manager and yields the
    `_LiveSession`. Only the bottom layer is faked, so tests exercise the
    same code path real sessions use.
    """

    def __init__(self, scripts_per_session: list[list[list[dict[str, Any]]]]) -> None:
        # Copy the outer list so popping never mutates the caller's list.
        self._scripts = list(scripts_per_session)
        # Every FakePty created, in spawn order.
        self.spawned: list[FakePty] = []
        # (path, bytes) of any pre-existing JSONL seen on a resume spawn.
        self.seed_files: list[tuple[Path, bytes]] = []

    def __call__(
        self,
        backend: ClaudeCodeBackend,
        session_id: str,
        resume: bool,
        jsonl_path: Path,
        start_offset: int,
    ) -> Any:
        """Create the next fake session; raise if the script queue is empty."""
        if not self._scripts:
            raise AssertionError("FakeFactoryHarness ran out of scripts")
        session_scripts = self._scripts.pop(0)

        # On resume, snapshot whatever seed is already on disk so tests can
        # inspect it before the fake PTY appends to the file.
        if resume and jsonl_path.exists():
            seed = jsonl_path.read_bytes()
            self.seed_files.append((jsonl_path, seed))

        pty = FakePty(jsonl_path, session_id=session_id, scripts=session_scripts)
        self.spawned.append(pty)
        watcher = JsonlWatcher(jsonl_path, poll_interval=0.01, start_offset=start_offset)
        turn_manager = TurnManager(
            pty,  # type: ignore[arg-type]
            watcher,
            startup_delay=0.0,
            file_wait_timeout=2.0,
        )

        async def _boot() -> _LiveSession:
            await turn_manager.start()
            return _LiveSession(pty=pty, watcher=watcher, tm=turn_manager)  # type: ignore[arg-type]

        return _boot()
# --- option / validation tests --------------------------------------------
@pytest.mark.asyncio
async def test_complete_rejects_empty_messages(tmp_path: Path) -> None:
    """An empty message list is refused with a `ValueError` matching "empty"."""
    options = BackendOptions(cwd=str(tmp_path))
    backend = ClaudeCodeBackend(options)
    with pytest.raises(ValueError, match="empty"):
        # Drain the generator; the error must surface during iteration.
        _ = [event async for event in backend.complete([])]
    await backend.aclose()
@pytest.mark.asyncio
async def test_complete_rejects_non_user_last_message(tmp_path: Path) -> None:
    """A request ending in an assistant message raises `ValueError` matching "user"."""
    options = BackendOptions(cwd=str(tmp_path))
    backend = ClaudeCodeBackend(options)
    trailing_assistant = [{"role": "assistant", "content": "hi"}]
    with pytest.raises(ValueError, match="user"):
        _ = [event async for event in backend.complete(trailing_assistant)]
    await backend.aclose()
@pytest.mark.asyncio
async def test_complete_after_aclose_raises(tmp_path: Path) -> None:
    """Calling `complete()` on a closed backend raises `RuntimeError` matching "closed"."""
    options = BackendOptions(cwd=str(tmp_path))
    backend = ClaudeCodeBackend(options)
    await backend.aclose()
    request = [{"role": "user", "content": "hi"}]
    with pytest.raises(RuntimeError, match="closed"):
        _ = [event async for event in backend.complete(request)]
# --- single-turn fresh session -------------------------------------------
@pytest.mark.asyncio
async def test_complete_fresh_session_yields_events(tmp_path: Path) -> None:
    """A lone user message spawns one fresh session and streams its turn.

    With no prior history there is nothing to seed. The fake PTY's `write()`
    appends the scripted `(user, assistant)` pair to the JSONL on disk, the
    real watcher tails it, and the real TurnManager closes the turn on the
    terminal assistant record.
    """
    # The `sessionId` values in the script are decorative: the factory takes
    # the real session id from the backend's invocation, and the watcher /
    # normalizer don't filter on that field.
    first_turn = [_user_rec("hi", "S0"), _assistant_rec("hello there", "S0")]
    # One session, whose single batch is flushed on the first write().
    harness = FakeFactoryHarness([[first_turn]])
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=harness,
    )

    request = [{"role": "user", "content": "hi"}]
    events: list[Any] = [event async for event in backend.complete(request)]
    await backend.aclose()

    # Exactly one PTY, which received exactly the user prompt.
    assert len(harness.spawned) == 1
    assert harness.spawned[0].writes == ["hi"]

    # The stream carries the user echo, the assistant reply, and ends with
    # the turn result.
    kinds = [type(event) for event in events]
    assert any(issubclass(kind, UserMessage) for kind in kinds)
    assert any(issubclass(kind, AssistantMessage) for kind in kinds)
    final = events[-1]
    assert isinstance(final, ResultMessage)
    assert final.stop_reason == "end_turn"

    # No seed was written — first turn has empty prior history.
    assert harness.seed_files == []
@pytest.mark.asyncio
async def test_live_sessions_visible_during_turn_and_after(tmp_path: Path) -> None:
    """`live_sessions` must expose a session both mid-turn and once idle.

    Mid-turn visibility lets the admin terminal viewer attach while the turn
    is in flight; post-turn visibility keeps snapshots / interaction with a
    quiet, pooled terminal working.

    Regression guard: for the duration of a turn the session is popped from
    the fingerprint pool (or not yet pooled, on a fresh spawn), so without
    the in-flight ``_active`` registry an active session would be invisible
    to ``live_sessions``.
    """
    single_turn = [_user_rec("hi", "S0"), _assistant_rec("hello there", "S0")]
    harness = FakeFactoryHarness([[single_turn]])
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=harness,
    )

    observed_mid_turn = False
    async for _event in backend.complete([{"role": "user", "content": "hi"}]):
        snapshot = backend.live_sessions
        if not snapshot:
            continue
        observed_mid_turn = True
        # The only live pty is the one the harness spawned for this turn.
        assert list(snapshot.values()) == [harness.spawned[0]]

    # The regression we're guarding against: visible *during* the turn.
    assert observed_mid_turn

    # Still visible after the turn — the repooled idle session — so the
    # viewer can snapshot / interact exactly as before.
    assert list(backend.live_sessions.values()) == [harness.spawned[0]]

    await backend.aclose()
    # Teardown clears the pool.
    assert backend.live_sessions == {}
# --- multi-turn fingerprint reuse ----------------------------------------
@pytest.mark.asyncio
async def test_continuation_reuses_live_session(tmp_path: Path) -> None:
    """A matching continuation is served by the already-live session.

    When the second `complete()` call's `messages[:-1]` matches the post-turn
    fingerprint left by the first call, no new PTY is spawned and no seed
    file is involved.
    """
    # Session 0 handles BOTH turns (two write() calls).
    turn_one = [_user_rec("hi", "S0"), _assistant_rec("hello there", "S0")]
    turn_two = [_user_rec("again", "S0"), _assistant_rec("hi again", "S0")]
    harness = FakeFactoryHarness([[turn_one, turn_two]])
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=harness,
    )

    opening = [{"role": "user", "content": "hi"}]
    events1: list[Any] = [event async for event in backend.complete(opening)]

    # The client echoes our synthesized assistant back in canonical
    # Anthropic shape (a list of blocks), then adds the new user message.
    continuation = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": [{"type": "text", "text": "hello there"}]},
        {"role": "user", "content": "again"},
    ]
    events2: list[Any] = [event async for event in backend.complete(continuation)]
    await backend.aclose()

    # Only ONE session was spawned across both turns.
    assert len(harness.spawned) == 1
    assert harness.spawned[0].writes == ["hi", "again"]

    # The second turn ends with a result whose counter reflects both turns.
    second_result = events2[-1]
    assert isinstance(second_result, ResultMessage)
    assert second_result.num_turns == 2
@pytest.mark.asyncio
async def test_unmatched_history_spawns_new_session_via_native_jsonl(
    tmp_path: Path,
) -> None:
    """Unmatched prior history is seeded into a JSONL and resumed.

    With the default `native_jsonl` mode and no live session matching the
    prior history, the backend writes that history to a seed JSONL and spawns
    a fresh `--resume` session against it.
    """
    # One session, one turn — the only write() is the new user message.
    reply_turn = [_user_rec("how are you?", "S0"), _assistant_rec("good", "S0")]
    harness = FakeFactoryHarness([[reply_turn]])
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=harness,
    )

    # Three messages, no live session in the pool — must seed.
    messages = [
        {"role": "user", "content": "remember beaver"},
        {"role": "assistant", "content": "ok"},
        {"role": "user", "content": "how are you?"},
    ]
    events: list[Any] = [event async for event in backend.complete(messages)]
    await backend.aclose()

    assert len(harness.spawned) == 1
    # Only the LAST user message is sent into the PTY — history went via seed.
    assert harness.spawned[0].writes == ["how are you?"]

    # The harness captured exactly one seed file on the resume spawn.
    assert len(harness.seed_files) == 1
    _seed_path, seed_bytes = harness.seed_files[0]
    seed_text = seed_bytes.decode("utf-8").strip()
    seed_records = [json.loads(line) for line in seed_text.splitlines()]

    # Seeded prior turn: permission-mode + snapshot + user + assistant.
    record_types = [record["type"] for record in seed_records]
    assert record_types == [
        "permission-mode",
        "file-history-snapshot",
        "user",
        "assistant",
    ]
    seeded_user, seeded_assistant = seed_records[2], seed_records[3]
    assert seeded_user["message"]["content"] == "remember beaver"
    assert seeded_assistant["message"]["content"] == [{"type": "text", "text": "ok"}]
    assert isinstance(events[-1], ResultMessage)
@pytest.mark.asyncio
async def test_unmatched_history_uses_concat_message_when_configured(
    tmp_path: Path,
) -> None:
    """`concat_message` mode folds prior history into the first stdin payload.

    No seed JSONL is written in this mode; the history travels as a text
    preamble ahead of the new user prompt.
    """
    reply_turn = [_user_rec("how are you?", "S0"), _assistant_rec("good", "S0")]
    harness = FakeFactoryHarness([[reply_turn]])
    options = BackendOptions(cwd=str(tmp_path), history_injection_mode="concat_message")
    backend = ClaudeCodeBackend(options, _session_factory=harness)

    messages = [
        {"role": "user", "content": "remember beaver"},
        {"role": "assistant", "content": "ok"},
        {"role": "user", "content": "how are you?"},
    ]
    _ = [event async for event in backend.complete(messages)]
    await backend.aclose()

    assert harness.seed_files == []  # no native injection in concat mode
    assert len(harness.spawned) == 1

    # The first payload is the concat preamble + the new user prompt.
    first_payload = harness.spawned[0].writes[0]
    expected_fragments = (
        "Previous conversation context:",
        "[User]: remember beaver",
        "[Assistant]: ok",
        "Continue from here. New user message: how are you?",
    )
    for fragment in expected_fragments:
        assert fragment in first_payload
# --- failure handling ----------------------------------------------------
@pytest.mark.asyncio
async def test_complete_failure_does_not_stash_broken_session(tmp_path: Path) -> None:
    """A turn that raises leaves a closed session and an empty pool.

    The session behind the failed turn must be closed and must NOT be stored
    back under any fingerprint.
    """

    class BrokenFactory:
        """Spawns sessions whose JSONL never appears, so the turn fails fast."""

        def __init__(self) -> None:
            self.spawned: list[FakePty] = []

        def __call__(
            self,
            backend: ClaudeCodeBackend,
            session_id: str,
            resume: bool,
            jsonl_path: Path,
            start_offset: int,
        ) -> Any:
            # Empty script list: write() never creates the JSONL file.
            pty = FakePty(jsonl_path, session_id=session_id, scripts=[])
            self.spawned.append(pty)
            watcher = JsonlWatcher(jsonl_path, poll_interval=0.01)
            turn_manager = TurnManager(
                pty,  # type: ignore[arg-type]
                watcher,
                startup_delay=0.0,
                file_wait_timeout=0.05,  # fires fast — no JSONL ever appears
            )

            async def _boot() -> _LiveSession:
                await turn_manager.start()
                return _LiveSession(pty=pty, watcher=watcher, tm=turn_manager)  # type: ignore[arg-type]

            return _boot()

    factory = BrokenFactory()
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=factory,
    )

    request = [{"role": "user", "content": "hi"}]
    with pytest.raises(SessionError):
        _ = [event async for event in backend.complete(request)]

    assert backend.live_session_count == 0
    assert factory.spawned[0].closed is True
    await backend.aclose()
# --- cancellation (Stage 9) ----------------------------------------------
class _HangingFactory:
    """Session factory whose sessions never emit a record — for cancel tests.

    The single empty script batch makes `write()` create the JSONL (so
    `wait_for_file()` returns immediately) while leaving it empty, so
    `TurnManager.send_user_message` enters its poll loop and stays there
    until something cancels it from outside.
    """

    def __init__(self) -> None:
        # Every FakePty created, in spawn order.
        self.spawned: list[FakePty] = []

    def __call__(
        self,
        backend: ClaudeCodeBackend,
        session_id: str,
        resume: bool,
        jsonl_path: Path,
        start_offset: int,
    ) -> Any:
        """Return a coroutine that starts and yields a never-answering session."""
        empty_batch: list[list[dict[str, Any]]] = [[]]
        pty = FakePty(jsonl_path, session_id=session_id, scripts=empty_batch)
        self.spawned.append(pty)
        watcher = JsonlWatcher(jsonl_path, poll_interval=0.01)
        turn_manager = TurnManager(
            pty,  # type: ignore[arg-type]
            watcher,
            startup_delay=0.0,
            file_wait_timeout=2.0,
        )

        async def _boot() -> _LiveSession:
            await turn_manager.start()
            return _LiveSession(pty=pty, watcher=watcher, tm=turn_manager)  # type: ignore[arg-type]

        return _boot()
@pytest.mark.asyncio
async def test_cancel_mid_turn_closes_session_and_leaves_pool_empty(
    tmp_path: Path,
) -> None:
    """task.cancel() on a consumer iterating `complete()` must:

    - propagate CancelledError to the consumer,
    - tear down the live session (the fake PTY ends up closed),
    - leave the live-session pool empty (broken session is never re-stashed).
    """
    factory = _HangingFactory()
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=factory,
    )

    async def consumer() -> None:
        # The hanging session never yields an event, so this loop simply
        # parks inside `complete()` until the task is cancelled.
        async for _ in backend.complete([{"role": "user", "content": "hi"}]):
            pass

    task = asyncio.create_task(consumer())
    # Let the turn enter its poll loop. The poll interval is 10ms; 200ms is
    # plenty for the FakePty.write() + first read_once() to land.
    await asyncio.sleep(0.2)
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task

    assert backend.live_session_count == 0
    assert len(factory.spawned) == 1
    assert factory.spawned[0].closed is True
    await backend.aclose()
@pytest.mark.asyncio
async def test_cancel_releases_lock_so_next_complete_works(tmp_path: Path) -> None:
    """A cancelled turn must not leave the backend's internal lock held.

    After cancelling the first call mid-stream, a normal follow-up call
    against a healthy session has to run end-to-end.
    """

    class HangThenRespondFactory:
        """First spawn hangs (cancel target); second spawn completes a turn."""

        def __init__(self) -> None:
            self._spawn_index = 0
            self.spawned: list[FakePty] = []

        def __call__(
            self,
            backend: ClaudeCodeBackend,
            session_id: str,
            resume: bool,
            jsonl_path: Path,
            start_offset: int,
        ) -> Any:
            spawn_number = self._spawn_index
            self._spawn_index = spawn_number + 1

            scripts: list[list[dict[str, Any]]]
            if spawn_number != 0:
                scripts = [[_user_rec("hi", "S1"), _assistant_rec("hello", "S1")]]
            else:
                scripts = [[]]  # hangs

            pty = FakePty(jsonl_path, session_id=session_id, scripts=scripts)
            self.spawned.append(pty)
            watcher = JsonlWatcher(jsonl_path, poll_interval=0.01)
            turn_manager = TurnManager(
                pty,  # type: ignore[arg-type]
                watcher,
                startup_delay=0.0,
                file_wait_timeout=2.0,
            )

            async def _boot() -> _LiveSession:
                await turn_manager.start()
                return _LiveSession(pty=pty, watcher=watcher, tm=turn_manager)  # type: ignore[arg-type]

            return _boot()

    factory = HangThenRespondFactory()
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=factory,
    )
    request = [{"role": "user", "content": "hi"}]

    # First call: cancel mid-stream.
    async def consumer() -> None:
        async for _ in backend.complete([{"role": "user", "content": "hi"}]):
            pass

    doomed = asyncio.create_task(consumer())
    await asyncio.sleep(0.2)
    doomed.cancel()
    with pytest.raises(asyncio.CancelledError):
        await doomed

    # Second call: must proceed without deadlocking on the lock.
    events: list[Any] = [event async for event in backend.complete(request)]

    assert len(factory.spawned) == 2
    assert factory.spawned[0].closed is True  # cancelled session is dead
    outcome = events[-1]
    assert isinstance(outcome, ResultMessage)
    assert outcome.num_turns == 1  # fresh session, fresh counter
    await backend.aclose()
# --- mcp_servers materialization -----------------------------------------
def test_mcp_config_argument_writes_temp_file_lazily(tmp_path: Path) -> None:
    """`mcp_servers` lifts to a temp `--mcp-config` JSON file.

    The file exists after the first `_mcp_config_argument()` call, a second
    call returns the same path, and `aclose()` removes the file.
    """
    backend = ClaudeCodeBackend(
        BackendOptions(
            cwd=str(tmp_path),
            mcp_servers={"echo": {"command": "/bin/echo", "args": []}},
        )
    )
    # Close in `finally` so a failing assertion can't leak the temp file.
    try:
        paths = backend._mcp_config_argument()  # type: ignore[attr-defined]
        assert len(paths) == 1
        p = Path(paths[0])
        assert p.exists()
        body = json.loads(p.read_text(encoding="utf-8"))
        assert body == {"mcpServers": {"echo": {"command": "/bin/echo", "args": []}}}

        # Calling again returns the same path; no second file.
        paths2 = backend._mcp_config_argument()  # type: ignore[attr-defined]
        assert paths2 == paths
    finally:
        asyncio.run(backend.aclose())

    # aclose() removes the file.
    assert not p.exists()
def test_no_mcp_config_returns_empty_tuple(tmp_path: Path) -> None:
    """Without `mcp_servers`, `_mcp_config_argument()` returns an empty tuple."""
    options = BackendOptions(cwd=str(tmp_path))
    backend = ClaudeCodeBackend(options)
    argument = backend._mcp_config_argument()  # type: ignore[attr-defined]
    assert argument == ()
# --- post-turn fingerprint key shape -------------------------------------
def test_post_turn_fingerprint_matches_canonical_continuation(tmp_path: Path) -> None:
    """Pin the stash-side and lookup-side fingerprint to the same value.

    Regression: the backend stashes the live session under
    hash_history(messages + [synthesized_assistant]), where the synthesized
    assistant uses the `[{"type": "text", "text": ...}]` block shape. A
    gateway echoing that same shape back on the next request must arrive at
    the same fingerprint.
    """
    user_turn = {"role": "user", "content": "hi"}

    # Stash side: the sent messages plus the assistant synthesized after one
    # turn yielding "hello there".
    synthesized = {
        "role": "assistant",
        "content": [{"type": "text", "text": "hello there"}],
    }
    stash_key = hash_history([user_turn, synthesized])

    # Lookup side: the prior history the next request carries, built
    # independently in the same canonical shape.
    echoed_prior = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": [{"type": "text", "text": "hello there"}]},
    ]
    lookup_key = hash_history(echoed_prior)

    assert stash_key == lookup_key
# --- multi-message synthesis (tool-use cycles) ---------------------------
def _assistant_tool_use_rec(
    name: str, tool_id: str, session_id: str
) -> dict[str, Any]:
    """Build a JSONL `assistant` record holding one `tool_use` block.

    The block calls tool `name` with id `tool_id` and a fixed input of
    `{"q": "x"}`; the message's stop reason is `tool_use`.
    """
    tool_call = {"type": "tool_use", "id": tool_id, "name": name, "input": {"q": "x"}}
    message: dict[str, Any] = {
        "id": "msg_x",
        "role": "assistant",
        "model": "claude-test",
        "content": [tool_call],
        "stop_reason": "tool_use",
        "usage": {"input_tokens": 1, "output_tokens": 1},
    }
    return {
        "type": "assistant",
        "uuid": "a-tu-" + tool_id,
        "sessionId": session_id,
        "parentUuid": None,
        "message": message,
    }
def _user_tool_result_rec(
    tool_id: str, content: str, session_id: str
) -> dict[str, Any]:
    """Build a JSONL `user` record holding one `tool_result` block.

    The block answers the tool call `tool_id` with `content`.
    """
    tool_result = {"type": "tool_result", "tool_use_id": tool_id, "content": content}
    return {
        "type": "user",
        "uuid": "u-tr-" + tool_id,
        "sessionId": session_id,
        "parentUuid": None,
        "message": {"role": "user", "content": [tool_result]},
    }
@pytest.mark.asyncio
async def test_tool_use_cycle_stashes_full_history_for_continuation(
    tmp_path: Path,
) -> None:
    """A turn with a tool_use cycle is still reusable by its continuation.

    If the client echoes back the WHOLE cycle (assistant-with-tool_use +
    user-with-tool_result + final assistant), the second `complete()` must
    land on the same live session. This is the contract the gateway depends
    on.
    """
    # Turn 0: prompt → tool_use → tool_result → final text.
    tool_turn = [
        _user_rec("look up X", "S0"),
        _assistant_tool_use_rec("Read", "tu_1", "S0"),
        _user_tool_result_rec("tu_1", "result body", "S0"),
        _assistant_rec("done", "S0"),
    ]
    # Turn 1: follow-up reuses the same session (one write into the PTY).
    followup_turn = [
        _user_rec("now what", "S0"),
        _assistant_rec("ack", "S0"),
    ]
    # ONE session handles both turns.
    harness = FakeFactoryHarness([[tool_turn, followup_turn]])
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path)),
        _session_factory=harness,
    )

    opening = [{"role": "user", "content": "look up X"}]
    _ = [event async for event in backend.complete(opening)]

    # The continuation echoes back the FULL synthesized cycle, in canonical
    # Anthropic-block shape — same as what the gateway will pull from its
    # conversation store.
    echoed_tool_use = {
        "role": "assistant",
        "content": [
            {"type": "tool_use", "id": "tu_1", "name": "Read", "input": {"q": "x"}}
        ],
    }
    echoed_tool_result = {
        "role": "user",
        "content": [
            {
                "type": "tool_result",
                "tool_use_id": "tu_1",
                "content": "result body",
                "is_error": None,
            }
        ],
    }
    continuation = [
        {"role": "user", "content": "look up X"},
        echoed_tool_use,
        echoed_tool_result,
        {"role": "assistant", "content": [{"type": "text", "text": "done"}]},
        {"role": "user", "content": "now what"},
    ]
    _ = [event async for event in backend.complete(continuation)]
    await backend.aclose()

    # One session for both turns proves the fingerprint lookup hit.
    assert len(harness.spawned) == 1
    assert harness.spawned[0].writes == ["look up X", "now what"]
def test_synthesize_turn_messages_covers_whole_cycle() -> None:
    """Check `synthesize_turn_messages` directly on a full tool-use cycle.

    Expected output is the canonical-block message list for the cycle: the
    leading echo UserMessage is dropped, the tool_result UserMessage is
    kept, and the ResultMessage contributes nothing.
    """
    from claude_code_api import synthesize_turn_messages
    from claude_code_api.events import (
        AssistantMessage,
        ResultMessage,
        TextBlock,
        ToolResultBlock,
        ToolUseBlock,
        UserMessage,
    )

    prompt_echo = UserMessage(content="look up X")  # echo, must be skipped
    tool_call = AssistantMessage(
        content=[ToolUseBlock(id="tu_1", name="Read", input={"q": "x"})],
        model="claude-test",
        stop_reason="tool_use",
    )
    tool_answer = UserMessage(
        content=[
            ToolResultBlock(tool_use_id="tu_1", content="result body"),
        ]
    )
    final_reply = AssistantMessage(
        content=[TextBlock(text="done")],
        model="claude-test",
        stop_reason="end_turn",
    )
    turn_result = ResultMessage(
        subtype="success", duration_ms=10, num_turns=1, session_id="S0"
    )

    out = synthesize_turn_messages(
        [prompt_echo, tool_call, tool_answer, final_reply, turn_result]
    )

    expected = [
        {
            "role": "assistant",
            "content": [
                {"type": "tool_use", "id": "tu_1", "name": "Read", "input": {"q": "x"}}
            ],
        },
        {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": "tu_1",
                    "content": "result body",
                    "is_error": None,
                }
            ],
        },
        {"role": "assistant", "content": [{"type": "text", "text": "done"}]},
    ]
    assert out == expected
# --- smoke test (real claude) --------------------------------------------
_SMOKE_ENV = "RUN_CLAUDE_SMOKE"
@pytest.mark.skipif(
    os.environ.get(_SMOKE_ENV) != "1",
    reason=f"set {_SMOKE_ENV}=1 to run the real-`claude` smoke test",
)
@pytest.mark.asyncio
async def test_smoke_backend_round_trip(tmp_path: Path) -> None:
    """Round-trip one prompt through real claude via the public API.

    A single `complete()` call with no prior history runs on a fresh
    session. The test checks that the stream contains a terminal assistant
    message with at least one text block, and that the final event is a
    `ResultMessage` reporting the same stop reason.
    """
    terminal_reasons = {"end_turn", "max_tokens", "stop_sequence", "refusal"}
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path), dangerously_skip_permissions=True),
    )
    events: list[Any] = []
    try:
        async for event in backend.complete([{"role": "user", "content": "say hi"}]):
            events.append(event)
    finally:
        await backend.aclose()

    # First assistant message that ended the turn, if any.
    terminal = None
    for candidate in events:
        if isinstance(candidate, AssistantMessage) and candidate.stop_reason in terminal_reasons:
            terminal = candidate
            break

    assert terminal is not None
    assert any(isinstance(block, TextBlock) for block in terminal.content)
    result = events[-1]
    assert isinstance(result, ResultMessage)
    assert result.stop_reason == terminal.stop_reason
@pytest.mark.skipif(
    os.environ.get(_SMOKE_ENV) != "1",
    reason=f"set {_SMOKE_ENV}=1 to run the real-`claude` smoke test",
)
@pytest.mark.asyncio
async def test_smoke_backend_native_jsonl_injection(tmp_path: Path) -> None:
    """Real claude, real injection: seeded history must reach the model.

    Sends a 3-message history with no live session yet, so the backend
    writes a seed JSONL and resumes. The assistant reply must reference the
    seeded context, and the session's JSONL on disk must contain the seed.
    """
    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path), dangerously_skip_permissions=True),
    )
    messages = [
        {"role": "user", "content": "My name is Beaver. Please remember it."},
        {"role": "assistant", "content": "Got it — your name is Beaver."},
        {"role": "user", "content": "What is my name? Answer with just the name, one word."},
    ]
    events: list[Any] = []
    try:
        async for event in backend.complete(messages):
            events.append(event)
    finally:
        await backend.aclose()

    # Primary check: the reply proves the seeded context was visible.
    terminal = next(
        (
            e
            for e in events
            if isinstance(e, AssistantMessage) and e.stop_reason == "end_turn"
        ),
        None,
    )
    assert terminal is not None
    text = " ".join(b.text for b in terminal.content if isinstance(b, TextBlock))
    assert "beaver" in text.lower(), f"injection failed to plant context; got {text!r}"

    # Sanity: the file the backend resumed against exists and contains our
    # seed. The session id comes from the turn's final ResultMessage, so
    # check the event type first instead of failing on a missing attribute.
    result = events[-1]
    assert isinstance(result, ResultMessage)
    session_id = result.session_id
    assert isinstance(session_id, str)
    jsonl_path = resolve_jsonl_path(str(tmp_path), session_id)
    assert jsonl_path.exists()
    # The seeded user record's content text is in the file.
    assert "My name is Beaver" in jsonl_path.read_text(encoding="utf-8")
@pytest.mark.skipif(
    os.environ.get(_SMOKE_ENV) != "1",
    reason=f"set {_SMOKE_ENV}=1 to run the real-`claude` smoke test",
)
@pytest.mark.asyncio
async def test_smoke_cancellation_kills_pty_no_zombie(tmp_path: Path) -> None:
    """Smoke 4 (Stage 9): cancel a real long-running turn, assert the PTY
    dies cleanly with no zombie left behind.

    Strategy:
    - prompt claude with something verbose so the turn stays in flight
      long enough for us to cancel mid-stream;
    - wrap the spawn through `_session_factory` so we can capture the
      live `PtyClaudeProcess` while it's still in flight (the backend
      does NOT keep in-flight sessions in `_sessions`);
    - cancel the consumer task as soon as we've seen at least one event
      (proving the turn really started — otherwise we'd be cancelling a
      not-yet-spawned session);
    - after the cancel propagates, assert: PTY is dead (no `kill -0`),
      pool is empty, and a second `complete()` on the same backend still
      works (lock was released).
    """
    import signal as _signal

    captured: list[Any] = []  # collected _LiveSession objects

    def capturing_factory(
        backend: ClaudeCodeBackend,
        session_id: str,
        resume: bool,
        jsonl_path: Path,
        start_offset: int,
    ) -> Any:
        """Spawn a real session and remember it for later inspection."""

        async def _real() -> Any:
            session = await backend._spawn_real_session(  # type: ignore[attr-defined]
                session_id=session_id,
                resume=resume,
                jsonl_path=jsonl_path,
                start_offset=start_offset,
            )
            captured.append(session)
            return session

        return _real()

    backend = ClaudeCodeBackend(
        BackendOptions(cwd=str(tmp_path), dangerously_skip_permissions=True),
        _session_factory=capturing_factory,
    )
    saw_event = asyncio.Event()
    events: list[Any] = []
    long_prompt = (
        "Please count slowly from 1 to 500, one number per line, in plain text. "
        "Do not stop until you reach 500."
    )

    async def consumer() -> None:
        async for event in backend.complete([{"role": "user", "content": long_prompt}]):
            events.append(event)
            saw_event.set()

    task = asyncio.create_task(consumer())
    try:
        # Wait until we have at least one event so we know the turn is in
        # flight on a live PTY. 30s is comfortably above the typical
        # spawn + first-record latency (~3-5s for cold claude startup).
        await asyncio.wait_for(saw_event.wait(), timeout=30.0)
        assert len(captured) == 1, (
            f"expected exactly one captured session at cancel time; got {len(captured)}"
        )
        live = captured[0]
        pid = live.pty.pid
        assert pid is not None and pid > 0
        assert live.pty.is_alive() is True

        task.cancel()
        with pytest.raises(asyncio.CancelledError):
            await task

        # SIGTERM ladder runs inside session.aclose() during cleanup, so by
        # the time `await task` returns the PTY has been reaped.
        assert live.pty.is_alive() is False, "PTY still alive after cancel cleanup"
        assert backend.live_session_count == 0, (
            "cancelled session must not be re-stashed in the live pool"
        )

        # Belt-and-suspenders: confirm the OS no longer has the pid.
        # `os.kill(pid, 0)` raises ProcessLookupError once the process is
        # gone; returning normally means the pid is still claimable.
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            pass  # expected: process is gone
        else:
            # With pty.close(force=True) in _reap this shouldn't happen, but
            # on macOS the reap might race very briefly — give it one more
            # beat, then require the pid to be gone.
            await asyncio.sleep(0.2)
            with pytest.raises(ProcessLookupError):
                os.kill(pid, 0)

        # Lock released — a fresh call must still work end-to-end.
        followup_events: list[Any] = []
        async for ev in backend.complete([{"role": "user", "content": "say hi"}]):
            followup_events.append(ev)
        assert isinstance(followup_events[-1], ResultMessage), (
            "follow-up turn failed; backend may have leaked state after cancel"
        )
    finally:
        # Defensive: if anything above failed, make sure we don't leave a
        # zombie claude around for the next test run.
        if not task.done():
            task.cancel()
            # CancelledError is a BaseException, so name it explicitly;
            # KeyboardInterrupt / SystemExit are deliberately not swallowed.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await task
        for s in captured:
            if s.pty.is_alive():
                with contextlib.suppress(Exception):
                    s.pty._pty.kill(_signal.SIGKILL)  # type: ignore[union-attr]
        await backend.aclose()