"""Unit + smoke tests for Layer 4 (`TurnManager`). Unit tests use a `FakePty` that, on `write()`, dumps a scripted list of JSONL records into a real temp file. A real `JsonlWatcher` tails that file so the manager's read/normalize/turn-end loop is exercised end-to-end without launching `claude`. The smoke test at the bottom spawns the real binary behind `RUN_CLAUDE_SMOKE=1` and also serves as the empirical probe for Open Q #2 (PTY echo / buffering). """ from __future__ import annotations import asyncio import json import os import sys from pathlib import Path from typing import Any import pytest from claude_code_api import ( AssistantMessage, AuthError, ProcessError, RateLimitError, ResultMessage, SessionError, SystemMessage, TextBlock, ToolResultBlock, ToolUseBlock, UserMessage, ) from claude_code_api.paths import resolve_jsonl_path from claude_code_api.watcher import JsonlWatcher from claude_code_api.pty import PtyClaudeProcess, PtyProcessOptions from claude_code_api.turn import TurnManager # --- fakes ----------------------------------------------------------------- class FakePty: """Stand-in for `PtyClaudeProcess` that flushes a scripted JSONL batch on write. The script is a list of records that get appended to `jsonl_path` (one JSON object per line) as soon as the manager calls `write()`. This lets a single synchronous setup drive the full turn loop — no async coordination, no real `claude`. Multi-write scripts are supported: the Nth `write()` flushes the Nth element of `scripts`. Stage 10 additions: `alive` and `output` knobs let tests simulate sub-process death and error chrome captured from the PTY drain buffer, which `TurnManager` consults when classifying failures. """ def __init__( self, tmp_path: Path, *, session_id: str = "fake-session-0001", scripts: list[list[dict[str, Any]]] | None = None, alive: bool = True, output: bytes = b"", ) -> None: self.cwd = str(tmp_path) self.session_id = session_id self._jsonl = tmp_path / f"{session_id}.jsonl" self._scripts = scripts if scripts is not None else [] self._write_count = 0 self.writes: list[str] = [] self.started = False self.closed = False self._alive = alive self._output = output async def start(self) -> None: self.started = True async def write(self, text: str, *, newline: bool = True) -> int: self.writes.append(text) if self._write_count < len(self._scripts): records = self._scripts[self._write_count] with self._jsonl.open("a", encoding="utf-8") as f: for rec in records: f.write(json.dumps(rec) + "\n") self._write_count += 1 return len(text) async def aclose(self) -> None: self.closed = True # --- Stage 10 surface ---------------------------------------------- def is_alive(self) -> bool: return self._alive def captured_output(self) -> bytes: return self._output def set_alive(self, alive: bool) -> None: self._alive = alive def set_output(self, output: bytes) -> None: self._output = output def _user_rec(text: str) -> dict[str, Any]: return { "type": "user", "uuid": f"u-{text[:8]}", "sessionId": "fake-session-0001", "parentUuid": None, "message": {"role": "user", "content": text}, } def _assistant_rec( text: str, *, stop_reason: str | None = "end_turn", usage: dict[str, Any] | None = None, ) -> dict[str, Any]: return { "type": "assistant", "uuid": f"a-{text[:8]}", "sessionId": "fake-session-0001", "parentUuid": None, "message": { "id": "msg_x", "role": "assistant", "model": "claude-test", "content": [{"type": "text", "text": text}], "stop_reason": stop_reason, "usage": usage or {"input_tokens": 1, "output_tokens": 1}, }, } def _tool_use_assistant_rec(name: str, tool_id: str) -> dict[str, Any]: return { "type": "assistant", "uuid": f"a-tu-{tool_id}", "sessionId": "fake-session-0001", "parentUuid": None, "message": { "id": "msg_y", "role": "assistant", "model": "claude-test", "content": [{"type": "tool_use", "id": tool_id, "name": name, "input": {}}], "stop_reason": "tool_use", "usage": {"input_tokens": 1, "output_tokens": 1}, }, } def _tool_result_user_rec(tool_id: str, content: str) -> dict[str, Any]: return { "type": "user", "uuid": f"u-tr-{tool_id}", "sessionId": "fake-session-0001", "parentUuid": None, "message": { "role": "user", "content": [{"type": "tool_result", "tool_use_id": tool_id, "content": content}], }, } def _turn_duration_rec(duration_ms: int = 1234) -> dict[str, Any]: return { "type": "system", "subtype": "turn_duration", "uuid": "sys-td", "sessionId": "fake-session-0001", "durationMs": duration_ms, } def _make_manager( fake: FakePty, *, wait_for_turn_duration: bool = False, startup_delay: float = 0.0, turn_duration_timeout: float | None = 1.0, on_parse_error: Any = None, ) -> TurnManager: """Build a TurnManager wired to a real JsonlWatcher on the fake's path.""" watcher = JsonlWatcher( Path(fake.cwd) / f"{fake.session_id}.jsonl", poll_interval=0.01, ) return TurnManager( fake, # type: ignore[arg-type] watcher, wait_for_turn_duration=wait_for_turn_duration, startup_delay=startup_delay, turn_duration_timeout=turn_duration_timeout, on_parse_error=on_parse_error, ) # --- construction validation ---------------------------------------------- def test_init_rejects_negative_file_wait_timeout(tmp_path: Path) -> None: fake = FakePty(tmp_path) watcher = JsonlWatcher(tmp_path / "x.jsonl") with pytest.raises(ValueError, match="file_wait_timeout"): TurnManager(fake, watcher, file_wait_timeout=-1) # type: ignore[arg-type] def test_init_rejects_negative_startup_delay(tmp_path: Path) -> None: fake = FakePty(tmp_path) watcher = JsonlWatcher(tmp_path / "x.jsonl") with pytest.raises(ValueError, match="startup_delay"): TurnManager(fake, watcher, startup_delay=-0.5) # type: ignore[arg-type] def test_init_rejects_negative_turn_duration_timeout(tmp_path: Path) -> None: fake = FakePty(tmp_path) watcher = JsonlWatcher(tmp_path / "x.jsonl") with pytest.raises(ValueError, match="turn_duration_timeout"): TurnManager(fake, watcher, turn_duration_timeout=-1) # type: ignore[arg-type] # --- lifecycle guards ----------------------------------------------------- @pytest.mark.asyncio async def test_send_before_start_raises(tmp_path: Path) -> None: fake = FakePty(tmp_path) tm = _make_manager(fake) with pytest.raises(RuntimeError, match="before start"): async for _ in tm.send_user_message("hi"): pass @pytest.mark.asyncio async def test_start_is_idempotent(tmp_path: Path) -> None: fake = FakePty(tmp_path) tm = _make_manager(fake) await tm.start() await tm.start() # FakePty.start() flips `started` either way; we just need no exception # and a stable state machine. assert fake.started is True # --- happy path: one turn, terminal end_turn ------------------------------- @pytest.mark.asyncio async def test_basic_turn_yields_user_assistant_then_result(tmp_path: Path) -> None: fake = FakePty( tmp_path, scripts=[ [ _user_rec("say hi"), _assistant_rec("hi!", stop_reason="end_turn"), # turn_duration is in the script but with # wait_for_turn_duration=False it gets queued behind our # early return — we don't yield it. _turn_duration_rec(), ] ], ) tm = _make_manager(fake) await tm.start() events: list[Any] = [] async for event in tm.send_user_message("say hi"): events.append(event) await tm.aclose() assert fake.writes == ["say hi"] assert isinstance(events[0], UserMessage) assert isinstance(events[1], AssistantMessage) assert events[1].stop_reason == "end_turn" assert isinstance(events[1].content[0], TextBlock) assert isinstance(events[-1], ResultMessage) assert events[-1].stop_reason == "end_turn" assert events[-1].num_turns == 1 assert events[-1].session_id == fake.session_id # No turn_duration → duration_ms falls back to 0 in the synthesized result. assert events[-1].duration_ms == 0 @pytest.mark.asyncio async def test_wait_for_turn_duration_carries_duration_ms(tmp_path: Path) -> None: fake = FakePty( tmp_path, scripts=[ [ _user_rec("ping"), _assistant_rec("pong", stop_reason="end_turn"), _turn_duration_rec(duration_ms=4242), ] ], ) tm = _make_manager(fake, wait_for_turn_duration=True) await tm.start() events = [e async for e in tm.send_user_message("ping")] await tm.aclose() # We also want the system event itself to be visible in the stream. assert any(isinstance(e, SystemMessage) and e.subtype == "turn_duration" for e in events) result = events[-1] assert isinstance(result, ResultMessage) assert result.duration_ms == 4242 # --- tool loop continues until next terminal ----------------------------- @pytest.mark.asyncio async def test_tool_use_stop_reason_does_not_close_turn(tmp_path: Path) -> None: fake = FakePty( tmp_path, scripts=[ [ _user_rec("compute"), _tool_use_assistant_rec("Bash", "tool_1"), _tool_result_user_rec("tool_1", "42"), _assistant_rec("the answer is 42", stop_reason="end_turn"), ] ], ) tm = _make_manager(fake) await tm.start() events = [e async for e in tm.send_user_message("compute")] await tm.aclose() assistants = [e for e in events if isinstance(e, AssistantMessage)] # Both assistant records made it through — the tool_use one did not # short-circuit the loop. assert len(assistants) == 2 assert assistants[0].stop_reason == "tool_use" assert assistants[1].stop_reason == "end_turn" assert isinstance(events[-1], ResultMessage) assert events[-1].stop_reason == "end_turn" # --- error & misuse paths ------------------------------------------------- @pytest.mark.asyncio async def test_parse_error_callback_keeps_stream_alive(tmp_path: Path) -> None: # A bogus record (missing `message`) sits between two valid ones. The # callback should fire once and the stream should still terminate cleanly. bad = {"type": "assistant", "uuid": "x", "sessionId": "fake-session-0001"} fake = FakePty( tmp_path, scripts=[ [ _user_rec("hi"), bad, _assistant_rec("ok", stop_reason="end_turn"), ] ], ) errors: list[tuple[Exception, dict[str, Any]]] = [] tm = _make_manager(fake, on_parse_error=lambda exc, rec: errors.append((exc, rec))) await tm.start() events = [e async for e in tm.send_user_message("hi")] await tm.aclose() assert len(errors) == 1 assert errors[0][1] is bad or errors[0][1] == bad assert isinstance(events[-1], ResultMessage) @pytest.mark.asyncio async def test_double_send_raises_while_turn_in_progress(tmp_path: Path) -> None: # Manager that will NEVER see a terminal assistant (no scripted records). # Drive one __anext__ on the first generator so it enters the polling loop, # then attempt a second concurrent send. fake = FakePty(tmp_path, scripts=[[]]) # Touch the file so the file-wait doesn't block forever. (tmp_path / f"{fake.session_id}.jsonl").touch() tm = _make_manager(fake) await tm.start() gen1 = tm.send_user_message("first") # Spin up the generator: schedule one read pass. task = asyncio.create_task(gen1.__anext__()) await asyncio.sleep(0.05) # let _iter_turn flip turn_in_progress with pytest.raises(RuntimeError, match="turn is in progress"): async for _ in tm.send_user_message("second"): pass task.cancel() with pytest.raises((asyncio.CancelledError, StopAsyncIteration)): await task await tm.aclose() @pytest.mark.asyncio async def test_aclose_terminates_owned_pty(tmp_path: Path) -> None: fake = FakePty(tmp_path) tm = _make_manager(fake) await tm.start() await tm.aclose() assert fake.closed is True @pytest.mark.asyncio async def test_aclose_skips_pty_when_not_owned(tmp_path: Path) -> None: fake = FakePty(tmp_path) watcher = JsonlWatcher(tmp_path / f"{fake.session_id}.jsonl", poll_interval=0.01) tm = TurnManager(fake, watcher, owns_pty=False, startup_delay=0.0) # type: ignore[arg-type] await tm.start() await tm.aclose() assert fake.closed is False # --- Stage 10: error mapping --------------------------------------------- @pytest.mark.asyncio async def test_session_error_raised_when_jsonl_never_appears(tmp_path: Path) -> None: """No script → FakePty.write() doesn't create the JSONL → the file-wait timeout fires → TurnManager raises SessionError (not the raw asyncio.TimeoutError).""" fake = FakePty(tmp_path, scripts=[]) # write() is a no-op for JSONL watcher = JsonlWatcher( tmp_path / f"{fake.session_id}.jsonl", poll_interval=0.01, ) tm = TurnManager( fake, # type: ignore[arg-type] watcher, startup_delay=0.0, file_wait_timeout=0.05, # fire fast ) await tm.start() with pytest.raises(SessionError): async for _ in tm.send_user_message("hi"): pass await tm.aclose() @pytest.mark.asyncio async def test_auth_marker_in_pty_output_raises_auth_error(tmp_path: Path) -> None: """When the JSONL never appears AND captured PTY output carries an auth-block marker, the classifier promotes the failure to AuthError (instead of the generic SessionError).""" fake = FakePty( tmp_path, scripts=[], output=b"Failed to authenticate. Please run /login.\r\n", ) watcher = JsonlWatcher( tmp_path / f"{fake.session_id}.jsonl", poll_interval=0.01, ) tm = TurnManager( fake, # type: ignore[arg-type] watcher, startup_delay=0.0, file_wait_timeout=0.05, ) await tm.start() with pytest.raises(AuthError): async for _ in tm.send_user_message("hi"): pass await tm.aclose() @pytest.mark.asyncio async def test_rate_limit_marker_promotes_session_error_to_rate_limit( tmp_path: Path, ) -> None: """Same path as the auth case but with a rate-limit marker.""" fake = FakePty( tmp_path, scripts=[], output=b"\x1b[31mYou've hit your limit\x1b[0m. Try again at 9pm.", ) watcher = JsonlWatcher( tmp_path / f"{fake.session_id}.jsonl", poll_interval=0.01, ) tm = TurnManager( fake, # type: ignore[arg-type] watcher, startup_delay=0.0, file_wait_timeout=0.05, ) await tm.start() with pytest.raises(RateLimitError): async for _ in tm.send_user_message("hi"): pass await tm.aclose() @pytest.mark.asyncio async def test_process_death_mid_poll_raises_process_error(tmp_path: Path) -> None: """The JSONL appears (so we leave the wait-for-file phase) but no terminal assistant ever arrives AND the PTY reports dead. Detection fires from inside the poll loop, with the captured output included in the exception so a gateway can log what claude wrote before exiting. """ fake = FakePty( tmp_path, scripts=[[_user_rec("hi")]], # only the user record — no assistant output=b"some claude chrome before death\r\n", ) watcher = JsonlWatcher( tmp_path / f"{fake.session_id}.jsonl", poll_interval=0.01, ) tm = TurnManager( fake, # type: ignore[arg-type] watcher, startup_delay=0.0, file_wait_timeout=2.0, ) await tm.start() async def consumer() -> list[Any]: events: list[Any] = [] async for ev in tm.send_user_message("hi"): events.append(ev) # Once we've seen the user record, declare the PTY dead so the # next polling pass enters the failure branch. if isinstance(ev, UserMessage): fake.set_alive(False) return events with pytest.raises(ProcessError) as info: await consumer() assert "exited before a terminal" in str(info.value) assert info.value.stderr is not None assert "claude chrome before death" in info.value.stderr await tm.aclose() @pytest.mark.asyncio async def test_process_death_with_rate_limit_marker_raises_rate_limit( tmp_path: Path, ) -> None: """Process-death classifier defers to the PTY marker: if the buffer carries a rate-limit notice, raise the typed marker, not the generic ProcessError.""" fake = FakePty( tmp_path, scripts=[[_user_rec("hi")]], output=b"You've hit your limit. Cooling off.", ) watcher = JsonlWatcher( tmp_path / f"{fake.session_id}.jsonl", poll_interval=0.01, ) tm = TurnManager( fake, # type: ignore[arg-type] watcher, startup_delay=0.0, file_wait_timeout=2.0, ) await tm.start() async def consumer() -> None: async for ev in tm.send_user_message("hi"): if isinstance(ev, UserMessage): fake.set_alive(False) with pytest.raises(RateLimitError): await consumer() await tm.aclose() # --- multi-turn (Stage 6) ------------------------------------------------- @pytest.mark.asyncio async def test_two_consecutive_turns_each_yield_only_fresh_records(tmp_path: Path) -> None: """Stage 6 core: a second `send_user_message()` on the same manager sees only the records appended after the first turn ended. The watcher is reused across turns and tracks the byte offset internally (see PROGRESS.md decision log: "TurnManager does NOT own JsonlWatcher.offset"). This test pins that contract. """ fake = FakePty( tmp_path, scripts=[ [ _user_rec("Q1"), _assistant_rec("A1", stop_reason="end_turn"), ], [ _user_rec("Q2"), _assistant_rec("A2", stop_reason="end_turn"), ], ], ) tm = _make_manager(fake) await tm.start() turn1 = [e async for e in tm.send_user_message("Q1")] turn2 = [e async for e in tm.send_user_message("Q2")] await tm.aclose() assert fake.writes == ["Q1", "Q2"] # Turn 1: user("Q1"), assistant("A1"), result assert [type(e).__name__ for e in turn1] == [ "UserMessage", "AssistantMessage", "ResultMessage", ] assert turn1[0].content == "Q1" assert isinstance(turn1[1], AssistantMessage) assert isinstance(turn1[1].content[0], TextBlock) assert turn1[1].content[0].text == "A1" assert isinstance(turn1[-1], ResultMessage) assert turn1[-1].num_turns == 1 # Turn 2 must NOT leak any of turn 1's records back to the caller. assert [type(e).__name__ for e in turn2] == [ "UserMessage", "AssistantMessage", "ResultMessage", ] assert turn2[0].content == "Q2" assert isinstance(turn2[1], AssistantMessage) assert isinstance(turn2[1].content[0], TextBlock) assert turn2[1].content[0].text == "A2" # Turn-count bookkeeping increments across turns; session_id is stable. assert isinstance(turn2[-1], ResultMessage) assert turn2[-1].num_turns == 2 assert turn2[-1].session_id == turn1[-1].session_id == fake.session_id assert tm.turn_count == 2 @pytest.mark.asyncio async def test_multi_turn_with_wait_for_turn_duration_carries_each_duration( tmp_path: Path, ) -> None: """When `wait_for_turn_duration=True`, each turn's synthesized result carries its own duration. The watcher offset advances past the intervening turn_duration heartbeat so turn 2 starts clean. """ fake = FakePty( tmp_path, scripts=[ [ _user_rec("ping1"), _assistant_rec("pong1", stop_reason="end_turn"), _turn_duration_rec(duration_ms=111), ], [ _user_rec("ping2"), _assistant_rec("pong2", stop_reason="end_turn"), _turn_duration_rec(duration_ms=222), ], ], ) tm = _make_manager(fake, wait_for_turn_duration=True) await tm.start() turn1 = [e async for e in tm.send_user_message("ping1")] turn2 = [e async for e in tm.send_user_message("ping2")] await tm.aclose() assert isinstance(turn1[-1], ResultMessage) assert turn1[-1].duration_ms == 111 assert turn1[-1].num_turns == 1 assert isinstance(turn2[-1], ResultMessage) assert turn2[-1].duration_ms == 222 assert turn2[-1].num_turns == 2 # --- smoke test (real claude) --------------------------------------------- _SMOKE_ENV = "RUN_CLAUDE_SMOKE" @pytest.mark.skipif( os.environ.get(_SMOKE_ENV) != "1", reason=f"set {_SMOKE_ENV}=1 to run the real-`claude` smoke test", ) @pytest.mark.asyncio async def test_smoke_send_hi(tmp_path: Path) -> None: """Smoke 1: end-to-end one-turn against real claude. Confirms: PTY spawn, JSONL discovery, watcher tail, normalizer mapping, turn-end detection, and ResultMessage synthesis all line up. Also doubles as the empirical probe for Open Q #2 — if claude doesn't pick up our prompt after `pty.write("say hi\\r")`, the JSONL never grows and the file-wait timeout fires; that failure mode tells us the carriage-return + 1s startup delay is not enough and we need a different submit mechanism. """ opts = PtyProcessOptions( cwd=str(tmp_path), dangerously_skip_permissions=True, ) pty = PtyClaudeProcess(opts) jsonl_path = resolve_jsonl_path(pty.cwd, pty.session_id) watcher = JsonlWatcher(jsonl_path) tm = TurnManager(pty, watcher) try: await tm.start() events: list[Any] = [] async for event in tm.send_user_message("say hi"): events.append(event) finally: await tm.aclose() assistants = [e for e in events if isinstance(e, AssistantMessage)] assert assistants, ( f"no AssistantMessage in stream; got {[type(e).__name__ for e in events]}" ) terminal = next( ( a for a in assistants if a.stop_reason in {"end_turn", "max_tokens", "stop_sequence", "refusal"} ), None, ) assert terminal is not None, ( f"no terminal stop_reason; got {[a.stop_reason for a in assistants]}" ) assert any(isinstance(b, TextBlock) for b in terminal.content) assert isinstance(events[-1], ResultMessage) assert events[-1].stop_reason == terminal.stop_reason assert events[-1].session_id == pty.session_id @pytest.mark.skipif( os.environ.get(_SMOKE_ENV) != "1", reason=f"set {_SMOKE_ENV}=1 to run the real-`claude` smoke test", ) @pytest.mark.asyncio async def test_smoke_multi_turn_context_persists(tmp_path: Path) -> None: """Smoke 2 (Stage 6): two turns on one TurnManager, the second must see the first's context. Turn 1 plants a memorable token via the user message; turn 2 asks for it back. If the same `--session-id` PTY truly accumulates context (as the JSONL design implies), the second assistant text contains the token. If instead each turn ran isolated, the second reply would not know it. The token is a low-entropy proper noun ("Beaver" — same one we used in the JSONL injection probe) chosen to be unlikely-but-not-impossible to appear spontaneously, so a false positive remains very unlikely while keeping the prompt natural. """ opts = PtyProcessOptions( cwd=str(tmp_path), dangerously_skip_permissions=True, ) pty = PtyClaudeProcess(opts) jsonl_path = resolve_jsonl_path(pty.cwd, pty.session_id) watcher = JsonlWatcher(jsonl_path) tm = TurnManager(pty, watcher) turn1_events: list[Any] = [] turn2_events: list[Any] = [] try: await tm.start() async for event in tm.send_user_message( "Please remember: my name is Beaver. Reply with just 'ok'." ): turn1_events.append(event) async for event in tm.send_user_message( "What is my name? Answer with the single word only." ): turn2_events.append(event) finally: await tm.aclose() # Both turns yielded a synthesized result; num_turns increments. assert isinstance(turn1_events[-1], ResultMessage) assert isinstance(turn2_events[-1], ResultMessage) assert turn1_events[-1].num_turns == 1 assert turn2_events[-1].num_turns == 2 assert turn1_events[-1].session_id == turn2_events[-1].session_id == pty.session_id assert tm.turn_count == 2 # Second turn's terminal assistant must reference the planted token. turn2_assistants = [e for e in turn2_events if isinstance(e, AssistantMessage)] terminal2 = next( ( a for a in turn2_assistants if a.stop_reason in {"end_turn", "max_tokens", "stop_sequence", "refusal"} ), None, ) assert terminal2 is not None, ( f"no terminal stop_reason in turn 2; got {[a.stop_reason for a in turn2_assistants]}" ) text2 = " ".join(b.text for b in terminal2.content if isinstance(b, TextBlock)) assert "beaver" in text2.lower(), ( f"turn 2 did not inherit context from turn 1; reply was: {text2!r}" ) # --- Stage 7: tool calls via external MCP server ------------------------- _REPO_ROOT = Path(__file__).resolve().parent.parent _ECHO_MCP_SCRIPT = _REPO_ROOT / "scripts" / "echo_mcp_server.py" @pytest.mark.skipif( os.environ.get(_SMOKE_ENV) != "1", reason=f"set {_SMOKE_ENV}=1 to run the real-`claude` smoke test", ) @pytest.mark.asyncio async def test_smoke_tool_call_via_mcp(tmp_path: Path) -> None: """Smoke 3 (Stage 7): real claude routes a tool call through an external stdio MCP server, and the resulting `tool_use` + `tool_result` records surface as typed events. Setup: - `scripts/echo_mcp_server.py` is a zero-dep stdio MCP server with one tool, `echo`, that returns its `text` argument verbatim. - We point claude at it via a temp `--mcp-config` JSON file (one server named "echo"). `--strict-mcp-config` keeps the user's ambient `.mcp.json` from leaking in and changing the tool surface. Assertions: - At least one `AssistantMessage.content` carries a `ToolUseBlock` whose name references the echo tool (claude exposes external MCP tools as `mcp____`, here `mcp__echo__echo`). - The follow-up `UserMessage` carries a `ToolResultBlock` whose content includes the marker token we asked the tool to echo — the only place that token can come from is the MCP server, so seeing it round-tripped proves the full path worked. - A terminal assistant closes the turn and the synthesized `ResultMessage` reflects its stop_reason. """ assert _ECHO_MCP_SCRIPT.exists(), f"missing echo MCP server at {_ECHO_MCP_SCRIPT}" marker = "banana42xyz" # low-collision sentinel; must appear in tool_result mcp_config_path = tmp_path / "mcp_config.json" mcp_config_path.write_text( json.dumps( { "mcpServers": { "echo": { "command": sys.executable, "args": [str(_ECHO_MCP_SCRIPT)], }, }, } ) ) opts = PtyProcessOptions( cwd=str(tmp_path), dangerously_skip_permissions=True, mcp_config=(str(mcp_config_path),), ) pty = PtyClaudeProcess(opts) jsonl_path = resolve_jsonl_path(pty.cwd, pty.session_id) watcher = JsonlWatcher(jsonl_path) # External MCP servers spawn during claude's startup, so the input box # mounts a bit later than for a bare session. The 60s file-wait still # leaves headroom even on a slow first MCP handshake. tm = TurnManager(pty, watcher, file_wait_timeout=60.0) prompt = f"Call mcp__echo__echo with text={marker!r}, then reply 'done'." events: list[Any] = [] try: await tm.start() async for event in tm.send_user_message(prompt): events.append(event) finally: await tm.aclose() # --- assertions --- tool_uses: list[ToolUseBlock] = [] for ev in events: if isinstance(ev, AssistantMessage): tool_uses.extend(b for b in ev.content if isinstance(b, ToolUseBlock)) assert tool_uses, ( "no ToolUseBlock in any assistant message; got " f"{[type(e).__name__ for e in events]}" ) echo_uses = [t for t in tool_uses if "echo" in t.name.lower()] assert echo_uses, ( f"no tool_use referenced the echo tool; saw names {[t.name for t in tool_uses]}" ) # The marker text only exists on the MCP server side, so finding it in a # tool_result block proves the round-trip actually completed. tool_results: list[ToolResultBlock] = [] for ev in events: if isinstance(ev, UserMessage) and isinstance(ev.content, list): tool_results.extend(b for b in ev.content if isinstance(b, ToolResultBlock)) assert tool_results, "no ToolResultBlock in any user message after the tool call" def _result_text(block: ToolResultBlock) -> str: if isinstance(block.content, str): return block.content if isinstance(block.content, list): chunks: list[str] = [] for part in block.content: if isinstance(part, dict) and isinstance(part.get("text"), str): chunks.append(part["text"]) return " ".join(chunks) return "" assert any(marker in _result_text(b) for b in tool_results), ( f"marker {marker!r} did not appear in any tool_result; got " f"{[_result_text(b) for b in tool_results]}" ) terminal_assistant = next( ( ev for ev in events if isinstance(ev, AssistantMessage) and ev.stop_reason in {"end_turn", "max_tokens", "stop_sequence", "refusal"} ), None, ) assert terminal_assistant is not None, ( "no terminal assistant after tool round-trip; got stop_reasons " f"{[e.stop_reason for e in events if isinstance(e, AssistantMessage)]}" ) assert isinstance(events[-1], ResultMessage) assert events[-1].stop_reason == terminal_assistant.stop_reason