"""Unit tests for Layer 2 (`JsonlWatcher`).

All tests use temp files; no `claude` involved. The watcher is exercised both
in its single-pass mode (`read_once`) and in its long-running mode (`tail`).
For `tail`, a producer task appends to the file while a consumer pulls from
the async iterator; both run under one event loop with a short poll interval
so tests stay quick.
"""

from __future__ import annotations

import asyncio
import json
from pathlib import Path

import pytest

from claude_code_api.watcher import JsonlWatcher


def _write_records(path: Path, records: list[dict]) -> None:
    """Append JSONL records to *path* in a single write call.

    Each record is serialized with `json.dumps` and terminated by a newline,
    so the appended blob always ends on a line boundary.
    """
    blob = "".join(json.dumps(r) + "\n" for r in records)
    with path.open("a", encoding="utf-8") as f:
        f.write(blob)


# --- construction validation ------------------------------------------------


def test_init_rejects_nonpositive_poll_interval(tmp_path: Path) -> None:
    """Zero and negative poll intervals are rejected with a named ValueError."""
    with pytest.raises(ValueError, match="poll_interval"):
        JsonlWatcher(tmp_path / "x.jsonl", poll_interval=0)
    with pytest.raises(ValueError, match="poll_interval"):
        JsonlWatcher(tmp_path / "x.jsonl", poll_interval=-1)


def test_init_rejects_negative_start_offset(tmp_path: Path) -> None:
    """A negative start offset is rejected with a named ValueError."""
    with pytest.raises(ValueError, match="start_offset"):
        JsonlWatcher(tmp_path / "x.jsonl", start_offset=-1)


def test_init_rejects_nonpositive_read_chunk(tmp_path: Path) -> None:
    """Zero and negative read-chunk sizes are rejected with a named ValueError."""
    # Cover both zero and a negative value, mirroring the poll_interval test.
    with pytest.raises(ValueError, match="read_chunk"):
        JsonlWatcher(tmp_path / "x.jsonl", read_chunk=0)
    with pytest.raises(ValueError, match="read_chunk"):
        JsonlWatcher(tmp_path / "x.jsonl", read_chunk=-1)


def test_path_is_exposed(tmp_path: Path) -> None:
    """The watched path is exposed as-is and a fresh watcher starts at byte 0."""
    p = tmp_path / "x.jsonl"
    w = JsonlWatcher(p)
    assert w.path == p
    assert w.offset == 0


# --- read_once: single-pass behavior ----------------------------------------


@pytest.mark.asyncio
async def test_read_once_returns_empty_when_file_missing(tmp_path: Path) -> None:
    """A missing file yields no records and leaves the offset untouched."""
    w = JsonlWatcher(tmp_path / "missing.jsonl")
    assert await w.read_once() == []
    # Offset must not advance when there's nothing to read.
    assert w.offset == 0


@pytest.mark.asyncio
async def test_read_once_returns_all_existing_records(tmp_path: Path) -> None:
    """Every record already on disk is returned, in order, on the first pass."""
    p = tmp_path / "s.jsonl"
    records = [
        {"type": "user", "i": 0},
        {"type": "assistant", "i": 1},
        {"type": "system", "i": 2},
    ]
    _write_records(p, records)
    w = JsonlWatcher(p)
    got = await w.read_once()
    assert got == records
    # Offset should now be at EOF.
    assert w.offset == p.stat().st_size


@pytest.mark.asyncio
async def test_read_once_is_incremental(tmp_path: Path) -> None:
    """Successive passes return only records appended since the last pass."""
    p = tmp_path / "s.jsonl"
    _write_records(p, [{"i": 0}])
    w = JsonlWatcher(p)
    assert await w.read_once() == [{"i": 0}]
    # Second pass with no new bytes: empty.
    assert await w.read_once() == []
    # Append more — only the new ones come out.
    _write_records(p, [{"i": 1}, {"i": 2}])
    assert await w.read_once() == [{"i": 1}, {"i": 2}]


@pytest.mark.asyncio
async def test_read_once_buffers_partial_line(tmp_path: Path) -> None:
    """An unterminated trailing line is held back until its newline arrives."""
    p = tmp_path / "s.jsonl"
    # Write a complete record + a partial record (no trailing newline).
    rec1 = {"complete": True}
    partial = '{"complete":'
    with p.open("w", encoding="utf-8") as f:
        f.write(json.dumps(rec1) + "\n")
        f.write(partial)  # no newline
    w = JsonlWatcher(p)
    assert await w.read_once() == [rec1]
    # Offset has consumed the partial bytes too — they're stashed internally.
    assert w.offset == p.stat().st_size
    # Now finish the partial line.
    with p.open("a", encoding="utf-8") as f:
        f.write(" false}\n")
    assert await w.read_once() == [{"complete": False}]


@pytest.mark.asyncio
async def test_read_once_skips_blank_lines(tmp_path: Path) -> None:
    """Blank and whitespace-only lines are skipped, not reported as errors."""
    p = tmp_path / "s.jsonl"
    # Mix in some blank lines — the watcher should ignore them rather than
    # treat them as parse errors.
    with p.open("w", encoding="utf-8") as f:
        f.write("\n")
        f.write(json.dumps({"i": 0}) + "\n")
        f.write(" \n")
        f.write(json.dumps({"i": 1}) + "\n")
        f.write("\n")
    errors: list[tuple[bytes, Exception]] = []
    w = JsonlWatcher(p, on_parse_error=lambda line, exc: errors.append((line, exc)))
    assert await w.read_once() == [{"i": 0}, {"i": 1}]
    # Without a callback, malformed lines are dropped silently, so the record
    # list alone can't distinguish "skipped" from "failed to parse". The
    # callback must never have fired.
    assert errors == []


@pytest.mark.asyncio
async def test_read_once_invokes_parse_error_callback(tmp_path: Path) -> None:
    """A malformed line is reported via the callback and skipped."""
    p = tmp_path / "s.jsonl"
    with p.open("w", encoding="utf-8") as f:
        f.write(json.dumps({"i": 0}) + "\n")
        f.write("this is not json\n")
        f.write(json.dumps({"i": 2}) + "\n")
    errors: list[tuple[bytes, Exception]] = []
    w = JsonlWatcher(p, on_parse_error=lambda line, exc: errors.append((line, exc)))
    got = await w.read_once()
    # Bad line skipped; valid ones returned.
    assert got == [{"i": 0}, {"i": 2}]
    assert len(errors) == 1
    bad_line, exc = errors[0]
    assert bad_line == b"this is not json"
    assert isinstance(exc, json.JSONDecodeError)


@pytest.mark.asyncio
async def test_read_once_drops_malformed_silently_without_callback(tmp_path: Path) -> None:
    """With no callback registered, malformed lines are dropped without raising."""
    p = tmp_path / "s.jsonl"
    with p.open("w", encoding="utf-8") as f:
        f.write("garbage\n")
        f.write(json.dumps({"i": 1}) + "\n")
    w = JsonlWatcher(p)  # no callback
    assert await w.read_once() == [{"i": 1}]


@pytest.mark.asyncio
async def test_read_once_handles_chunk_boundary(tmp_path: Path) -> None:
    """A record larger than `read_chunk` must still come out whole."""
    p = tmp_path / "s.jsonl"
    big = {"payload": "x" * 8000, "i": 0}
    small = {"i": 1}
    _write_records(p, [big, small])
    w = JsonlWatcher(p, read_chunk=128)  # force many chunks per record
    assert await w.read_once() == [big, small]


@pytest.mark.asyncio
async def test_start_offset_skips_initial_content(tmp_path: Path) -> None:
    """Bytes before `start_offset` are never returned; later appends are."""
    p = tmp_path / "s.jsonl"
    _write_records(p, [{"i": 0}, {"i": 1}])
    initial_size = p.stat().st_size
    # Start a watcher pointed at EOF — it should see only future appends.
    w = JsonlWatcher(p, start_offset=initial_size)
    assert await w.read_once() == []
    _write_records(p, [{"i": 2}])
    assert await w.read_once() == [{"i": 2}]


@pytest.mark.asyncio
async def test_read_once_resets_on_truncation(tmp_path: Path) -> None:
    """A file that shrinks below the consumed offset is re-read from the start."""
    p = tmp_path / "s.jsonl"
    _write_records(p, [{"i": 0}, {"i": 1}])
    w = JsonlWatcher(p)
    assert await w.read_once() == [{"i": 0}, {"i": 1}]
    consumed_size = p.stat().st_size
    # Truncate (or rotate) — write a brand-new shorter file.
    p.write_text(json.dumps({"reset": True}) + "\n", encoding="utf-8")
    # Pin the scenario's precondition so a future edit to the fixture data
    # can't quietly turn this into an ordinary append.
    assert p.stat().st_size < consumed_size
    assert await w.read_once() == [{"reset": True}]
    assert w.offset == p.stat().st_size


# --- wait_for_file ----------------------------------------------------------


@pytest.mark.asyncio
async def test_wait_for_file_returns_immediately_if_exists(tmp_path: Path) -> None:
    """An already-existing file satisfies `wait_for_file` without timing out."""
    p = tmp_path / "exists.jsonl"
    p.write_text("", encoding="utf-8")
    w = JsonlWatcher(p, poll_interval=0.01)
    # Bound the wait from the outside too, so a regression fails instead of
    # hanging the test run.
    await asyncio.wait_for(w.wait_for_file(timeout=1.0), timeout=1.0)


@pytest.mark.asyncio
async def test_wait_for_file_picks_up_late_creation(tmp_path: Path) -> None:
    """`wait_for_file` returns once the file is created by another task."""
    p = tmp_path / "later.jsonl"
    w = JsonlWatcher(p, poll_interval=0.01)

    async def create_later() -> None:
        await asyncio.sleep(0.05)
        p.write_text("", encoding="utf-8")

    creator = asyncio.create_task(create_later())
    try:
        await asyncio.wait_for(w.wait_for_file(timeout=1.0), timeout=1.0)
    finally:
        # Always reap the helper task so it can't outlive the test.
        await creator


@pytest.mark.asyncio
async def test_wait_for_file_times_out(tmp_path: Path) -> None:
    """A file that never appears raises TimeoutError after `timeout`."""
    p = tmp_path / "never.jsonl"
    w = JsonlWatcher(p, poll_interval=0.01)
    with pytest.raises(TimeoutError):
        await w.wait_for_file(timeout=0.05)


@pytest.mark.asyncio
async def test_wait_for_file_rejects_negative_timeout(tmp_path: Path) -> None:
    """A negative timeout is rejected with a named ValueError."""
    w = JsonlWatcher(tmp_path / "x.jsonl")
    with pytest.raises(ValueError, match="timeout"):
        await w.wait_for_file(timeout=-1)


# --- tail: long-running async iteration ------------------------------------


@pytest.mark.asyncio
async def test_tail_yields_existing_records_first(tmp_path: Path) -> None:
    """`tail` first emits the records already present in the file."""
    p = tmp_path / "s.jsonl"
    _write_records(p, [{"i": 0}, {"i": 1}])
    w = JsonlWatcher(p, poll_interval=0.01)
    seen: list[dict] = []

    async def consume() -> None:
        async for rec in w.tail():
            seen.append(rec)
            if len(seen) >= 2:
                return

    await asyncio.wait_for(consume(), timeout=2.0)
    assert seen == [{"i": 0}, {"i": 1}]


@pytest.mark.asyncio
async def test_tail_waits_for_file_then_yields(tmp_path: Path) -> None:
    """`tail` on a not-yet-existing file yields records once it is written."""
    p = tmp_path / "delayed.jsonl"
    w = JsonlWatcher(p, poll_interval=0.01)
    seen: list[dict] = []

    async def consume() -> None:
        async for rec in w.tail():
            seen.append(rec)
            if len(seen) >= 1:
                return

    async def produce() -> None:
        await asyncio.sleep(0.05)
        _write_records(p, [{"late": True}])

    consumer = asyncio.create_task(consume())
    producer = asyncio.create_task(produce())
    await asyncio.wait_for(asyncio.gather(consumer, producer), timeout=2.0)
    assert seen == [{"late": True}]


@pytest.mark.asyncio
async def test_tail_streams_incremental_appends(tmp_path: Path) -> None:
    """Records appended one at a time are streamed out in order."""
    p = tmp_path / "s.jsonl"
    p.write_text("", encoding="utf-8")
    w = JsonlWatcher(p, poll_interval=0.01)
    seen: list[dict] = []
    target = [{"i": 0}, {"i": 1}, {"i": 2}, {"i": 3}]

    async def consume() -> None:
        async for rec in w.tail():
            seen.append(rec)
            if len(seen) >= len(target):
                return

    async def produce() -> None:
        for rec in target:
            _write_records(p, [rec])
            await asyncio.sleep(0.02)

    consumer = asyncio.create_task(consume())
    producer = asyncio.create_task(produce())
    await asyncio.wait_for(asyncio.gather(consumer, producer), timeout=3.0)
    assert seen == target


@pytest.mark.asyncio
async def test_tail_handles_appends_arriving_mid_line(tmp_path: Path) -> None:
    """A record split across two writes (no newline in the first) must arrive
    as one parsed record once the second chunk lands."""
    p = tmp_path / "s.jsonl"
    p.write_text("", encoding="utf-8")
    w = JsonlWatcher(p, poll_interval=0.01)
    seen: list[dict] = []

    async def consume() -> None:
        async for rec in w.tail():
            seen.append(rec)
            if len(seen) >= 1:
                return

    async def produce() -> None:
        # Write the first half, sleep past at least one poll, then the rest.
        # Leaving each `with` block closes the file, which flushes the bytes
        # to disk before the sleep.
        with p.open("a", encoding="utf-8") as f:
            f.write('{"split":')
        await asyncio.sleep(0.05)
        with p.open("a", encoding="utf-8") as f:
            f.write(" true}\n")

    consumer = asyncio.create_task(consume())
    producer = asyncio.create_task(produce())
    await asyncio.wait_for(asyncio.gather(consumer, producer), timeout=2.0)
    assert seen == [{"split": True}]


@pytest.mark.asyncio
async def test_tail_is_cancellable(tmp_path: Path) -> None:
    """Cancelling a task blocked in `tail` surfaces CancelledError promptly."""
    p = tmp_path / "s.jsonl"
    p.write_text("", encoding="utf-8")
    w = JsonlWatcher(p, poll_interval=0.01)

    async def consume() -> None:
        async for _ in w.tail():
            pass

    task = asyncio.create_task(consume())
    # Give it a few poll ticks to settle into the idle loop, then cancel.
    await asyncio.sleep(0.05)
    task.cancel()
    with pytest.raises(asyncio.CancelledError):
        await task