"""``MarkdownFrontend`` — chat-via-markdown-files frontend.

Wires:

* ``POST /chat {filename, content?, agent?}`` — bearer-authenticated
  trigger. The plugin in Obsidian fires this after the user edits a
  ``.md`` and the file gets synced (or with ``content`` to short-circuit
  the sync delay). We parse the file, check the last turn — assistant →
  no-op, user → run the agent and append.
* ``POST /chat/stream`` — same body and auth as ``/chat``, but the reply
  is a ``text/event-stream`` of ``delta`` frames followed by one terminal
  ``done`` / ``error`` frame.
* ``GET /agents`` — bearer-authenticated list of configured agent names.
* ``GET /healthz`` — liveness.

Concurrency model: an in-memory ``set[Path]`` of files currently in
flight. Two concurrent requests for the same file → the second gets 409.
The set is single-process (one gateway instance) — that's by design; the
markdown frontend is the only writer in its vault from the gateway side.

Cross-frontend logging: when ``log_all_chats=True``, ``configure()``
registers a handler on ``runtime.turn_log_handlers`` so every other
frontend's completed turns also land in the vault. The handler logic
lives in :mod:`.crossfront` so this module stays focused on the HTTP
shape.
"""

from __future__ import annotations

import contextlib
import json
import logging
import os
import tempfile
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any

import aiofile
from anthropic.types import RawContentBlockStopEvent
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse

from beaver_gateway.backends.claude_code import ClaudeCodeBackendAdapter, TurnCapture
from beaver_gateway.core import audit
from beaver_gateway.core.conversation_store import (
    diff_and_fork,
    load_conversation,
    load_messages,
    mint_conversation,
    rewrite_messages,
)
from beaver_gateway.core.turn_record import TurnRecord
from beaver_gateway.frontends._accumulate import StreamAccumulator
from beaver_gateway.frontends._auth import require_token
from beaver_gateway.frontends.base import Frontend
from beaver_gateway.frontends.markdown import parser, renderer
from beaver_gateway.frontends.markdown.crossfront import (
    CrossFrontendLogger,
    fingerprint_messages,
)

if TYPE_CHECKING:
    from collections.abc import AsyncIterator, Callable

    from anthropic.types import MessageParam

    from beaver_gateway.frontends.base import GatewayRuntime

_log = logging.getLogger("beaver_gateway.frontends.markdown")

__all__ = ["MarkdownFrontend"]

# How often we re-render the assistant turn into the .md file while the
# backend stream is still open. Trades responsiveness (faster updates to
# Obsidian sync / Raycast tailers) against write amplification. Each
# ``RawContentBlockStopEvent`` also forces a flush regardless of the
# timer, so block boundaries always land in the file.
_STREAM_FLUSH_DEBOUNCE = 0.4

# Debounce for the SSE ``/chat/stream`` path. Network IO is cheaper than
# atomic file rewrites, so we send updates more frequently — the client
# wants the lowest possible latency and we control the renderer on the
# other end (the Obsidian plugin splices deltas into the editor, no
# disk round-trip).
_SSE_FLUSH_DEBOUNCE = 0.1


class MarkdownFrontend(Frontend):
    """FastAPI app behind ``POST /chat`` driven by Obsidian-vault files."""

    def __init__(
        self,
        *,
        vault_path: Path | str,
        host: str = "0.0.0.0",  # noqa: S104
        port: int = 8003,
        default_agent: str | None = None,
        log_all_chats: bool = False,
        logged_subdir: str = "_logs",
        log_path: Callable[[TurnRecord, Path], Path] | None = None,
        public_base_url: str | None = None,
    ) -> None:
        """Store configuration; no I/O beyond resolving ``vault_path``.

        ``vault_path`` is expanded (``~``) and resolved once here so that
        the containment check in :meth:`_resolve_path` compares resolved
        paths on both sides.
        """
        self.vault_path = Path(vault_path).expanduser().resolve()
        self.host = host
        self.port = port
        self.default_agent = default_agent
        self.log_all_chats = log_all_chats
        self.logged_subdir = logged_subdir
        self.log_path = log_path
        # External URL prefix when behind a reverse proxy — same role as
        # on the other bearer frontends. Trailing slash trimmed for
        # idempotent concatenation; ``None`` means "no proxy / advertise
        # raw host:port".
        self.public_base_url = public_base_url.rstrip("/") if public_base_url else None
        self._runtime: GatewayRuntime | None = None
        self._app: FastAPI | None = None
        # Files currently being processed by an in-flight ``POST /chat``.
        # Checked-and-added atomically in the request handler (no
        # ``await`` between the check and the insert) so a concurrent
        # request reliably loses the race to 409.
        self._busy: set[Path] = set()
        self._crossfront: CrossFrontendLogger | None = None

    def configure(self, runtime: GatewayRuntime) -> None:
        """Bind to ``runtime``, create the vault dir and build the app.

        When ``log_all_chats`` is set, also registers the cross-frontend
        logger on ``runtime.turn_log_handlers``.
        """
        self._runtime = runtime
        self.vault_path.mkdir(parents=True, exist_ok=True)
        if self.log_all_chats:
            self._crossfront = CrossFrontendLogger(
                vault_path=self.vault_path,
                logged_subdir=self.logged_subdir,
                log_path=self.log_path,
            )
            # Scan the existing logged files synchronously here so the
            # fingerprint→path map is populated before the first
            # cross-frontend turn arrives. Cheap: frontmatter-only read.
            self._crossfront.warm_index()
            runtime.turn_log_handlers.append(self._crossfront.handle)
        self._app = self._build_app(runtime)

    async def serve(self) -> None:
        """Run the app under uvicorn until the server exits.

        Raises:
            RuntimeError: if :meth:`configure` has not been called yet.
        """
        import uvicorn

        if self._app is None:
            msg = "configure() must be called before serve()"
            raise RuntimeError(msg)
        config = uvicorn.Config(
            self._app, host=self.host, port=self.port, log_level="info"
        )
        server = uvicorn.Server(config)
        await server.serve()

    # ---- app builder ---------------------------------------------------

    def _build_app(self, runtime: GatewayRuntime) -> FastAPI:
        """Create the FastAPI app and register all routes on it."""
        app = FastAPI(title="beaver-gateway / Markdown")
        # ``/chat/stream`` is consumed via ``fetch`` from the Obsidian
        # plugin (``requestUrl`` can't read a body incrementally), and
        # ``fetch`` is subject to CORS. Auth is bearer-token so we don't
        # need credentialed mode; allow any origin and the standard
        # methods/headers. The other endpoints are happy to ride along.
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=False,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        @app.get("/healthz")
        async def healthz() -> dict[str, str]:
            return {"status": "ok"}

        @app.get("/agents")
        async def list_agents(request: Request) -> dict[str, Any]:
            await require_token(request, runtime, scope="messages")
            return {"agents": [{"name": a.name} for a in runtime.agents]}

        @app.post("/chat")
        async def chat(request: Request) -> Any:
            token_name = await require_token(request, runtime, scope="messages")
            filename, content_override, agent_override = await _parse_chat_request(
                request
            )
            file_path = self._resolve_path(filename)
            # Atomic check-and-claim: both ops run between awaits, so a
            # second request can't slip into the same file slot.
            if file_path in self._busy:
                return _in_progress_response(filename)
            self._busy.add(file_path)
            try:
                return await self._handle_chat(
                    runtime=runtime,
                    token_name=token_name,
                    filename=filename,
                    file_path=file_path,
                    content_override=content_override,
                    agent_override=agent_override,
                )
            finally:
                self._busy.discard(file_path)

        @app.post("/chat/stream")
        async def chat_stream(request: Request) -> Any:
            # Same contract as ``/chat`` (bearer auth, identical body),
            # but the response is ``text/event-stream`` and intermediate
            # rendered states are pushed as ``delta`` events. The
            # gateway-side disk write only happens once, at end of turn,
            # so streaming consumers (Obsidian plugin) and Obsidian Sync
            # don't fight over the same file mid-stream.
            token_name = await require_token(request, runtime, scope="messages")
            filename, content_override, agent_override = await _parse_chat_request(
                request
            )
            file_path = self._resolve_path(filename)
            # 409 path stays JSON — the stream hasn't started yet, so
            # the caller can read it the same way as on ``/chat``.
            if file_path in self._busy:
                return _in_progress_response(filename)
            self._busy.add(file_path)

            async def gen() -> AsyncIterator[bytes]:
                try:
                    async for chunk in self._handle_chat_streaming(
                        runtime=runtime,
                        token_name=token_name,
                        filename=filename,
                        file_path=file_path,
                        content_override=content_override,
                        agent_override=agent_override,
                    ):
                        yield chunk
                except Exception as exc:  # noqa: BLE001 — the HTTP status is already 200
                    # Anything that escapes the handler (conversation
                    # lookup, final file write, DB persist, ...) would
                    # otherwise just truncate the stream. Always close
                    # with a terminal ``error`` frame so the client can
                    # tell a failure from a dropped connection.
                    _log.exception("chat/stream failed for %s", filename)
                    yield _sse_pack(
                        "error",
                        {
                            "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR,
                            "detail": f"internal error: {exc}",
                        },
                    )
                finally:
                    self._busy.discard(file_path)

            return StreamingResponse(
                gen(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache, no-transform",
                    "Connection": "keep-alive",
                    # nginx default-buffers SSE bodies; this header tells
                    # both nginx and uvicorn-behind-proxy to flush as we
                    # write. Harmless if the deployment has no reverse
                    # proxy in front.
                    "X-Accel-Buffering": "no",
                },
            )

        return app

    # ---- dispatch ------------------------------------------------------

    async def _handle_chat(
        self,
        *,
        runtime: GatewayRuntime,
        token_name: str,
        filename: str,
        file_path: Path,
        content_override: Any,
        agent_override: str | None,
    ) -> Any:
        """Run one non-streaming chat turn against ``file_path``.

        Pipeline: resolve file text → parse → resolve agent → run backend
        (flushing partials to disk) → write the final reply → persist the
        canonical history → broadcast the turn.

        Returns:
            A JSON-serialisable dict whose ``status`` is ``"ok"`` or
            ``"nothing_to_do"``; ``new_content`` carries the file text.

        Raises:
            HTTPException: 400 for a non-string ``content`` or a missing
                agent, 404 for an unknown agent, 503 when the agent has
                no backend, 500 when the backend fails mid-turn.
        """
        if isinstance(content_override, str):
            await _write_atomic(file_path, content_override)
            file_text = content_override
        elif content_override is None:
            file_text = await _read_or_empty(file_path)
        else:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, "`content` must be a string when present"
            )

        parsed = parser.parse(file_text)
        agent_name = parser.resolve_agent(
            metadata=parsed.metadata,
            request_override=agent_override,
            default=self.default_agent,
        )
        if not agent_name:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                "no agent specified: pass `agent`, set frontmatter, "
                "or configure `default_agent`",
            )

        # When the parser produced no messages (file empty / only
        # frontmatter), there's nothing to dispatch.
        if not parsed.messages:
            return {
                "status": "nothing_to_do",
                "reason": "empty file",
                "new_content": file_text,
            }
        if parser.last_role(parsed.messages) == "assistant":
            return {
                "status": "nothing_to_do",
                "reason": "last turn is assistant",
                "new_content": file_text,
            }

        agent = runtime.agents.get(agent_name)
        if agent is None:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, f"unknown agent: {agent_name!r}"
            )
        backend = runtime.backends.get(agent.name)
        if backend is None:
            raise HTTPException(
                status.HTTP_503_SERVICE_UNAVAILABLE,
                f"no backend configured for agent {agent.name!r}",
            )

        _log.info(
            "chat: actor=%s agent=%s file=%s msgs=%d",
            token_name,
            agent.name,
            filename,
            len(parsed.messages),
        )
        await audit.log(
            runtime,
            actor=f"token:{token_name}",
            kind="markdown_chat",
            agent_name=agent.name,
            filename=filename,
            msgs=len(parsed.messages),
        )

        # Resolve / mint the conversation row, align incoming against
        # stored history, and feed the aligned messages to the backend
        # — see ``core/conversation_store.py`` for the full rationale.
        # If the backend isn't claude-code (no ``TurnCapture`` support)
        # we fall through to the legacy parser-only path.
        conv, conv_external_id, stored_msgs = await self._resolve_conversation(
            runtime=runtime, metadata=parsed.metadata, agent_name=agent.name
        )
        outcome = diff_and_fork(stored=stored_msgs, incoming=parsed.turns)
        capture: TurnCapture | None = (
            TurnCapture() if isinstance(backend, ClaudeCodeBackendAdapter) else None
        )
        kwargs: dict[str, Any] = {}
        if capture is not None:
            kwargs["capture"] = capture
        events = backend.complete(
            agent=agent, messages=outcome.messages, system=None, **kwargs
        )

        try:
            message = await self._stream_to_file(
                events=events,
                file_path=file_path,
                parsed=parsed,
                model=agent.model or agent.name,
                filename=filename,
            )
        except HTTPException:
            raise
        except Exception as exc:
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR, f"backend error: {exc}"
            ) from exc

        new_content = await self._write_assistant_reply(
            file_path=file_path,
            parsed=parsed,
            message=message,
            agent_name=agent.name,
            conv_external_id=conv_external_id,
        )
        await self._persist_canonical_history(
            runtime=runtime,
            conversation_id=conv.id,
            persist_messages=outcome.persist_messages,
            new_user_text=parsed.turns[-1].text,
            capture=capture,
            message=message,
        )

        # Broadcast our own turn so other handlers (none today, but the
        # symmetry is worth keeping) see what happened. ``source`` marks
        # the origin so ``CrossFrontendLogger`` can skip its own files.
        record = TurnRecord(
            agent_name=agent.name,
            input_messages=list(parsed.messages),
            output_message=message,
            system=None,
            source="markdown",
        )
        await _broadcast_turn(runtime, record)

        return {
            "status": "ok",
            "turns_appended": 1,
            "agent": agent.name,
            "new_content": new_content,
        }

    # ---- streaming dispatch (SSE) --------------------------------------

    async def _handle_chat_streaming(  # noqa: PLR0915 — mirrors _handle_chat, splitting only doubles read cost
        self,
        *,
        runtime: GatewayRuntime,
        token_name: str,
        filename: str,
        file_path: Path,
        content_override: Any,
        agent_override: str | None,
    ) -> AsyncIterator[bytes]:
        """SSE counterpart of :meth:`_handle_chat`.

        Mirrors the same pipeline (resolve file → parse → resolve agent →
        run backend → persist), but emits ``event: delta`` frames as the
        rendered turn grows and a single terminal ``event: done`` /
        ``event: error``. Errors that ``_handle_chat`` would surface as
        ``HTTPException`` go out as ``error`` frames here (the HTTP
        envelope is already 200 by the time the stream starts).

        Intermediate disk writes are deliberately skipped — only the
        post-stream :meth:`_write_assistant_reply` lands on disk, so the
        gateway-side vault and the plugin-side editor are the only
        writers in their respective halves of Obsidian Sync. Final
        content is identical on both sides, so Sync no-ops.
        """
        # File-text resolution + early bailouts. ``content_override`` is
        # still written to disk on the gateway side because that's the
        # state the rest of the request consumes; it just doesn't keep
        # ticking after that single write.
        if isinstance(content_override, str):
            await _write_atomic(file_path, content_override)
            file_text = content_override
        elif content_override is None:
            file_text = await _read_or_empty(file_path)
        else:
            yield _sse_pack(
                "error",
                {
                    "status_code": status.HTTP_400_BAD_REQUEST,
                    "detail": "`content` must be a string when present",
                },
            )
            return

        parsed = parser.parse(file_text)
        agent_name = parser.resolve_agent(
            metadata=parsed.metadata,
            request_override=agent_override,
            default=self.default_agent,
        )
        if not agent_name:
            yield _sse_pack(
                "error",
                {
                    "status_code": status.HTTP_400_BAD_REQUEST,
                    "detail": (
                        "no agent specified: pass `agent`, set frontmatter, "
                        "or configure `default_agent`"
                    ),
                },
            )
            return

        if not parsed.messages:
            yield _sse_pack(
                "done",
                {
                    "status": "nothing_to_do",
                    "reason": "empty file",
                    "new_content": file_text,
                },
            )
            return
        if parser.last_role(parsed.messages) == "assistant":
            yield _sse_pack(
                "done",
                {
                    "status": "nothing_to_do",
                    "reason": "last turn is assistant",
                    "new_content": file_text,
                },
            )
            return

        agent = runtime.agents.get(agent_name)
        if agent is None:
            yield _sse_pack(
                "error",
                {
                    "status_code": status.HTTP_404_NOT_FOUND,
                    "detail": f"unknown agent: {agent_name!r}",
                },
            )
            return
        backend = runtime.backends.get(agent.name)
        if backend is None:
            yield _sse_pack(
                "error",
                {
                    "status_code": status.HTTP_503_SERVICE_UNAVAILABLE,
                    "detail": f"no backend configured for agent {agent.name!r}",
                },
            )
            return

        _log.info(
            "chat/stream: actor=%s agent=%s file=%s msgs=%d",
            token_name,
            agent.name,
            filename,
            len(parsed.messages),
        )
        await audit.log(
            runtime,
            actor=f"token:{token_name}",
            kind="markdown_chat_stream",
            agent_name=agent.name,
            filename=filename,
            msgs=len(parsed.messages),
        )

        conv, conv_external_id, stored_msgs = await self._resolve_conversation(
            runtime=runtime, metadata=parsed.metadata, agent_name=agent.name
        )
        _log.info(
            "chat/stream: file=%s conv_external_id=%s conv_id=%d "
            "stored_msgs=%d incoming_turns=%d",
            filename,
            conv_external_id,
            conv.id or -1,
            len(stored_msgs),
            len(parsed.turns),
        )
        outcome = diff_and_fork(stored=stored_msgs, incoming=parsed.turns)
        _log.info(
            "chat/stream: file=%s diff_and_fork divergence=%s "
            "backend_msgs=%d persist_msgs=%d",
            filename,
            outcome.divergence_index,
            len(outcome.messages),
            len(outcome.persist_messages),
        )
        capture: TurnCapture | None = (
            TurnCapture() if isinstance(backend, ClaudeCodeBackendAdapter) else None
        )
        kwargs: dict[str, Any] = {}
        if capture is not None:
            kwargs["capture"] = capture
        _log.info(
            "chat/stream: file=%s calling backend.complete agent=%s capture=%s",
            filename,
            agent.name,
            capture is not None,
        )
        events = backend.complete(
            agent=agent, messages=outcome.messages, system=None, **kwargs
        )

        acc = StreamAccumulator()
        model = agent.model or agent.name
        last_flush = time.monotonic()
        last_payload: str | None = None

        try:
            async for ev in events:
                acc.feed(ev)
                now = time.monotonic()
                if (
                    isinstance(ev, RawContentBlockStopEvent)
                    or (now - last_flush) >= _SSE_FLUSH_DEBOUNCE
                ):
                    payload = _render_partial(parsed, acc.finalize(model=model))
                    # Skip duplicate snapshots — e.g. tool_use blocks
                    # render to the same prefix as before they closed
                    # (we don't surface the tool-call args in markdown).
                    if payload is not None and payload != last_payload:
                        yield _sse_pack("delta", {"new_content": payload})
                        last_payload = payload
                    # Restart the debounce window whether or not a frame
                    # went out, so an unchanged render isn't recomputed
                    # on every following event.
                    last_flush = now
        except Exception as exc:  # noqa: BLE001 — wire any backend failure as an SSE error frame
            _log.exception("backend failed for %s", filename)
            # Mirror the legacy path: write the last partial + an error
            # callout to disk so other consumers (logs, file watchers)
            # see what arrived. The client gets a clean SSE ``error``.
            await _write_failed_turn(
                file_path, parsed, acc.finalize(model=model), exc
            )
            yield _sse_pack(
                "error",
                {
                    "status_code": status.HTTP_500_INTERNAL_SERVER_ERROR,
                    "detail": f"backend error: {exc}",
                },
            )
            return

        message = acc.finalize(model=model)
        new_content = await self._write_assistant_reply(
            file_path=file_path,
            parsed=parsed,
            message=message,
            agent_name=agent.name,
            conv_external_id=conv_external_id,
        )
        await self._persist_canonical_history(
            runtime=runtime,
            conversation_id=conv.id,
            persist_messages=outcome.persist_messages,
            new_user_text=parsed.turns[-1].text,
            capture=capture,
            message=message,
        )

        record = TurnRecord(
            agent_name=agent.name,
            input_messages=list(parsed.messages),
            output_message=message,
            system=None,
            source="markdown",
        )
        await _broadcast_turn(runtime, record)

        yield _sse_pack(
            "done",
            {
                "status": "ok",
                "turns_appended": 1,
                "agent": agent.name,
                "new_content": new_content,
            },
        )

    # ---- helpers -------------------------------------------------------

    async def _stream_to_file(
        self,
        *,
        events: Any,
        file_path: Path,
        parsed: parser.ParsedFile,
        model: str,
        filename: str,
    ) -> Any:
        """Drain ``events`` into a ``Message``, flushing partials to disk.

        Flushes happen on each ``RawContentBlockStopEvent`` (natural block
        boundary, content is markdown-consistent) and on the
        ``_STREAM_FLUSH_DEBOUNCE`` timer between events. The partial
        write keeps the as-parsed frontmatter; the post-stream final
        write in ``_write_assistant_reply`` is what stamps the refreshed
        fingerprint / agent / conversation_id.

        On backend exception we still flush the last partial and append
        an error callout, so the human sees both what arrived and why it
        stopped. The exception propagates so ``_handle_chat`` can map it
        to a 500.
        """
        acc = StreamAccumulator()
        last_flush = time.monotonic()
        try:
            async for ev in events:
                acc.feed(ev)
                now = time.monotonic()
                if (
                    isinstance(ev, RawContentBlockStopEvent)
                    or (now - last_flush) >= _STREAM_FLUSH_DEBOUNCE
                ):
                    snapshot = _render_partial(parsed, acc.finalize(model=model))
                    if snapshot is not None:
                        await _write_atomic(file_path, snapshot)
                    last_flush = now
        except Exception as exc:
            _log.exception("backend failed for %s", filename)
            await _write_failed_turn(
                file_path, parsed, acc.finalize(model=model), exc
            )
            raise
        return acc.finalize(model=model)

    async def _write_assistant_reply(
        self,
        *,
        file_path: Path,
        parsed: parser.ParsedFile,
        message: Any,
        agent_name: str,
        conv_external_id: str,
    ) -> str:
        """Render the assistant turn, append to the file, refresh frontmatter.

        Returns the full file text that was written.
        """
        rendered = renderer.render_assistant_message(message)
        new_body = renderer.append_to_body(parsed.body, rendered)
        new_body = renderer.append_to_body(new_body, renderer.USER_SCAFFOLD)

        # Recompute fingerprint so a future cross-frontend hit on this
        # same conversation can find it. Stored as hex string in
        # frontmatter — only the markdown frontend reads it.
        assistant_param: MessageParam = {
            "role": "assistant",
            "content": _flatten_assistant_text(message),
        }
        updated_messages: list[MessageParam] = [*parsed.messages, assistant_param]
        updated_metadata = dict(parsed.metadata)
        updated_metadata["agent"] = agent_name
        updated_metadata["conversation_id"] = conv_external_id
        updated_metadata["fingerprint"] = fingerprint_messages(updated_messages)

        new_content = _reattach_frontmatter(updated_metadata, new_body)
        await _write_atomic(file_path, new_content)
        return new_content

    async def _resolve_conversation(
        self, *, runtime: GatewayRuntime, metadata: dict[str, Any], agent_name: str
    ) -> tuple[Any, str, list[dict[str, Any]]]:
        """Resolve the conversation row + stored messages for this request.

        Looks up by frontmatter ``conversation_id``, mints a new row if
        missing, and returns ``(conv, external_id, stored_messages)``.

        ``conv.id`` is guaranteed non-None because both
        ``load_conversation`` (after refresh on a committed row) and
        ``mint_conversation`` (post-commit refresh) populate it. We
        coerce with a runtime check so the rest of the handler can treat
        it as ``int``.
        """
        raw = metadata.get("conversation_id")
        lookup_id = raw if isinstance(raw, str) and raw else None
        async with runtime.db.session() as session:
            conv = None
            if lookup_id is not None:
                conv = await load_conversation(
                    session, frontend="markdown", external_id=lookup_id
                )
                if conv is None:
                    _log.info(
                        "_resolve_conversation: frontmatter conv_id=%s "
                        "not found in DB, will mint new",
                        lookup_id,
                    )
                else:
                    _log.info(
                        "_resolve_conversation: LOADED existing conv "
                        "id=%d external_id=%s",
                        conv.id or -1,
                        conv.external_id,
                    )
            if conv is None:
                conv = await mint_conversation(
                    session, frontend="markdown", agent_name=agent_name
                )
                _log.info(
                    "_resolve_conversation: MINTED new conv "
                    "id=%d external_id=%s agent=%s",
                    conv.id or -1,
                    conv.external_id,
                    agent_name,
                )
            if conv.id is None:
                msg = "conversation row missing primary key after commit"
                raise RuntimeError(msg)
            stored = await load_messages(session, conversation_id=conv.id)
            return conv, conv.external_id, stored

    async def _persist_canonical_history(
        self,
        *,
        runtime: GatewayRuntime,
        conversation_id: int,
        persist_messages: list[dict[str, Any]],
        new_user_text: str,
        capture: TurnCapture | None,
        message: Any,
    ) -> None:
        """Stamp the DB with the post-turn canonical Anthropic-shape history.

        Combines the matched/spliced prior state, the new user prompt,
        and the synthesized assistant↔tool cycle from the backend (or a
        text-only fallback for backends without ``TurnCapture``).
        """
        new_user_msg = {"role": "user", "content": new_user_text}
        synthesized = (
            capture.synthesized_messages
            if capture is not None
            else _fallback_synthesized(message)
        )
        canonical = [*persist_messages, new_user_msg, *synthesized]
        _log.info(
            "_persist_canonical_history: conv_id=%d writing %d msgs "
            "(prior=%d + new_user + synth=%d)",
            conversation_id,
            len(canonical),
            len(persist_messages),
            len(synthesized),
        )
        async with runtime.db.session() as session:
            await rewrite_messages(
                session, conversation_id=conversation_id, messages=canonical
            )
        _log.info(
            "_persist_canonical_history: conv_id=%d DB committed", conversation_id
        )

    def _resolve_path(self, filename: str) -> Path:
        """Resolve ``filename`` under the vault; reject escapes.

        Raises:
            HTTPException: 400 when the name cannot be resolved at all
                (e.g. embedded NUL, symlink loop) or resolves to a path
                outside ``vault_path``.
        """
        # ``filename`` may be relative or absolute; we always anchor
        # under ``vault_path`` so absolute paths from outside the vault
        # don't sneak through. ``Path("/foo/bar")`` combined with a
        # vault path keeps the absolute side; we strip leading slashes
        # to coerce the rooted form into a relative path before joining.
        rel = filename.lstrip("/")
        if not rel.endswith(".md"):
            rel = rel + ".md"
        try:
            candidate = (self.vault_path / rel).resolve()
        except (OSError, RuntimeError, ValueError) as exc:
            # The name comes straight from the request body; a failure
            # to resolve it is a client error, not a server fault.
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, f"invalid filename: {filename!r}"
            ) from exc
        try:
            candidate.relative_to(self.vault_path)
        except ValueError as exc:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, f"filename escapes vault: {filename!r}"
            ) from exc
        return candidate


# ---- module-level utilities ----------------------------------------------


async def _parse_chat_request(request: Request) -> tuple[str, Any, str | None]:
    """Validate the shared ``/chat`` + ``/chat/stream`` JSON body.

    Returns ``(filename, content_override, agent_override)``.
    ``content_override`` is returned unvalidated; the handlers decide how
    to report a bad type (HTTP 400 vs. SSE ``error`` frame).

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, lacks a non-blank string ``filename``, or carries a
            non-string ``agent``.
    """
    try:
        body = await request.json()
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, f"invalid JSON: {exc}"
        ) from exc
    # Valid JSON is not necessarily an object (``[]``, ``"x"``, ``3``);
    # without this guard ``body.get`` would raise and surface as a 500.
    if not isinstance(body, dict):
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, "request body must be a JSON object"
        )
    filename = body.get("filename")
    if not isinstance(filename, str) or not filename.strip():
        raise HTTPException(
            status.HTTP_400_BAD_REQUEST, "missing or non-string `filename`"
        )
    agent_override = body.get("agent")
    if agent_override is not None and not isinstance(agent_override, str):
        raise HTTPException(status.HTTP_400_BAD_REQUEST, "`agent` must be a string")
    return filename, body.get("content"), agent_override


def _in_progress_response(filename: str) -> JSONResponse:
    """Build the 409 reply for a file that already has a turn in flight."""
    return JSONResponse(
        status_code=status.HTTP_409_CONFLICT,
        content={"status": "in_progress", "filename": filename},
    )


async def _broadcast_turn(runtime: GatewayRuntime, record: TurnRecord) -> None:
    """Hand ``record`` to every turn-log handler; one failure never stops the rest."""
    for handler in runtime.turn_log_handlers:
        try:
            await handler(record)
        except Exception:  # noqa: BLE001
            _log.exception("turn_log_handler raised; continuing")


def _render_partial(parsed: parser.ParsedFile, partial: Any) -> str | None:
    """Render the file text with ``partial`` appended, or ``None`` if it has no content.

    Uses the as-parsed frontmatter; only the final write refreshes it.
    """
    if not partial.content:
        return None
    rendered = renderer.render_assistant_message(partial)
    new_body = renderer.append_to_body(parsed.body, rendered)
    return _reattach_frontmatter(parsed.metadata, new_body)


async def _write_failed_turn(
    file_path: Path, parsed: parser.ParsedFile, partial: Any, exc: BaseException
) -> None:
    """Write whatever arrived before a backend failure, plus an error callout."""
    new_body = parsed.body
    if partial.content:
        new_body = renderer.append_to_body(
            new_body, renderer.render_assistant_message(partial)
        )
    new_body = renderer.append_to_body(new_body, _render_error_block(exc))
    await _write_atomic(file_path, _reattach_frontmatter(parsed.metadata, new_body))


def _sse_pack(event: str, data: dict[str, Any]) -> bytes:
    r"""Format one Server-Sent Event frame.

    Uses named events (``event: ``) so the plugin can dispatch on type
    without parsing JSON discriminators. ``ensure_ascii=False`` so
    multibyte content rides through verbatim instead of becoming
    ``\uXXXX`` blobs that bloat the wire.
    """
    body = json.dumps(data, ensure_ascii=False)
    return f"event: {event}\ndata: {body}\n\n".encode()


async def _read_or_empty(path: Path) -> str:
    """Return file contents, or empty string if the file doesn't exist."""
    # EAFP rather than ``path.exists()`` + open: one fewer stat, and no
    # window in which the file can vanish between the check and the read
    # (the vault is also written by Obsidian Sync).
    try:
        async with aiofile.async_open(path, "r", encoding="utf-8") as f:
            return await f.read()
    except FileNotFoundError:
        return ""


async def _write_atomic(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` via tmp + ``os.replace`` (atomic).

    The temp file is created in ``path``'s own directory so the final
    rename stays on one filesystem. On any failure the temp file is
    removed and the exception re-raised; ``path`` is left untouched.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # ``NamedTemporaryFile`` keeps the file open which complicates
    # ``os.replace`` on some platforms. ``mkstemp`` gives us a unique
    # same-directory name instead.
    fd, tmp_path = tempfile.mkstemp(
        prefix=f".{path.name}.", suffix=".tmp", dir=str(path.parent)
    )
    # The write below goes through ``aiofile`` by path, so this
    # descriptor is never used. Close it right away: it can then neither
    # leak nor be closed a second time from the failure path.
    os.close(fd)
    try:
        async with aiofile.async_open(tmp_path, "w", encoding="utf-8") as f:
            await f.write(content)
        # ``os.replace`` is the atomic primitive — ``Path.replace`` is a
        # thin wrapper around the same syscall; either works, ``os.`` is
        # the one Linux/POSIX docs reach for.
        os.replace(tmp_path, path)  # noqa: PTH105
    except BaseException:
        # Cleanup on failure (including cancellation): remove the tmp.
        with contextlib.suppress(OSError):
            os.unlink(tmp_path)  # noqa: PTH108
        raise


def _reattach_frontmatter(metadata: dict[str, Any], body: str) -> str:
    r"""Re-emit a ``.md`` file with YAML frontmatter at the top.

    Empty metadata → no frontmatter block (avoid littering every file
    with a hollow ``---\n---``).
    """
    if not metadata:
        return body if body.endswith("\n") else body + "\n"
    import frontmatter as _fm

    # Metadata is attached after construction instead of being splatted
    # as ``**metadata``: user frontmatter may contain keys that collide
    # with ``Post``'s own parameters (``content``, ``handler``) or that
    # are not strings, any of which breaks the keyword form.
    post = _fm.Post(body)
    post.metadata.update(metadata)
    return _fm.dumps(post) + "\n"


def _fallback_synthesized(message: Any) -> list[dict[str, Any]]:
    """Build a single-assistant ``synthesized_messages`` list from a raw ``Message``.

    For backends that don't populate a :class:`TurnCapture` (anthropic
    HTTP, raycast, …) we don't have access to per-tool-cycle granularity,
    so the assistant reply lands in the DB as one canonical-block
    message. Tool memory across cache misses would degrade in that case,
    but those backends don't have the cache-miss re-seed problem to begin
    with — they manage history client-side.

    Blocks of any type other than ``text`` / ``tool_use`` / ``thinking``
    are dropped; an empty list is returned when nothing is left.
    """
    content: list[dict[str, Any]] = []
    for block in getattr(message, "content", ()):
        btype = getattr(block, "type", None)
        if btype == "text":
            content.append({"type": "text", "text": getattr(block, "text", "") or ""})
        elif btype == "tool_use":
            content.append(
                {
                    "type": "tool_use",
                    "id": getattr(block, "id", ""),
                    "name": getattr(block, "name", ""),
                    "input": getattr(block, "input", {}),
                }
            )
        elif btype == "thinking":
            content.append(
                {
                    "type": "thinking",
                    "thinking": getattr(block, "thinking", "") or "",
                    "signature": getattr(block, "signature", "") or "",
                }
            )
    if not content:
        return []
    return [{"role": "assistant", "content": content}]


def _flatten_assistant_text(message: Any) -> str:
    """Pull all text blocks from an assistant ``Message`` and join them.

    Used when we need the assistant content as a plain string for
    fingerprinting / equality with a parser-shaped history (parser
    already drops thinking + tool_use from assistant turns).
    """
    chunks = [
        getattr(block, "text", "") or ""
        for block in getattr(message, "content", ())
        if getattr(block, "type", None) == "text"
    ]
    return "\n\n".join(c for c in chunks if c)


def _render_error_block(exc: BaseException) -> str:
    """Render a backend failure as an Assistant turn with a ``[!error]-`` callout."""
    fallback = exc.__class__.__name__
    msg = str(exc) or fallback
    # A callout body is a single ``> `` line, so every kind of line break
    # (``\n``, ``\r\n``, ``\r``, ...) is flattened to a space. Fall back
    # to the class name if nothing but whitespace is left.
    safe = " ".join(msg.splitlines()).strip() or fallback
    return f"### Assistant:\n\n> [!error]-\n> {safe}\n"