From 3dc780c74c73613f87fb5638cd06daa9157357dd Mon Sep 17 00:00:00 2001
From: h
Date: Wed, 20 May 2026 21:30:10 +0200
Subject: [PATCH] refactor: add markdown frontend
---
examples/config.py | 58 +++
pyproject.toml | 2 +
src/beaver_gateway/core/turn_record.py | 133 ++++++
src/beaver_gateway/frontends/_accumulate.py | 146 +++++++
src/beaver_gateway/frontends/_auth.py | 53 +++
.../frontends/admin/frontend.py | 76 +++-
.../frontends/admin/templates/dashboard.html | 50 +++
src/beaver_gateway/frontends/anthropic.py | 222 ++++------
src/beaver_gateway/frontends/base.py | 17 +-
.../frontends/markdown/__init__.py | 15 +
.../frontends/markdown/crossfront.py | 302 +++++++++++++
.../frontends/markdown/frontend.py | 404 ++++++++++++++++++
.../frontends/markdown/parser.py | 187 ++++++++
.../frontends/markdown/renderer.py | 181 ++++++++
uv.lock | 16 +
15 files changed, 1721 insertions(+), 141 deletions(-)
create mode 100644 src/beaver_gateway/core/turn_record.py
create mode 100644 src/beaver_gateway/frontends/_accumulate.py
create mode 100644 src/beaver_gateway/frontends/_auth.py
create mode 100644 src/beaver_gateway/frontends/markdown/__init__.py
create mode 100644 src/beaver_gateway/frontends/markdown/crossfront.py
create mode 100644 src/beaver_gateway/frontends/markdown/frontend.py
create mode 100644 src/beaver_gateway/frontends/markdown/parser.py
create mode 100644 src/beaver_gateway/frontends/markdown/renderer.py
diff --git a/examples/config.py b/examples/config.py
index b70d14b..0eb0717 100644
--- a/examples/config.py
+++ b/examples/config.py
@@ -12,12 +12,30 @@ from beaver_gateway.agents.base import ExposedMcp
from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences
from beaver_gateway.core.registry import Gateway
+from beaver_gateway.core.turn_record import TurnRecord, slugify
from beaver_gateway.frontends.admin import AdminFrontend
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
+from beaver_gateway.frontends.markdown import MarkdownFrontend
from beaver_gateway.frontends.mcp_server import McpServerFrontend
from beaver_gateway.mcp.types import McpServer
+def chat_log_path(record: TurnRecord, vault: Path) -> Path:
+ """Decide where a logged chat from another frontend lands in the vault.
+
+ Called by ``MarkdownFrontend`` (with ``log_all_chats=True``) the first
+ time a conversation needs a file — continuation turns are matched by
+ fingerprint and stick to the file picked here. Return value can be
+ absolute or relative; relative paths are anchored under ``vault``.
+
+ Layout below: ``<YYYY-MM>/<YYYY-MM-DD>_<topic>.md`` where
+ ``topic`` is a slug of the very first user message in the chat.
+ """
+ today = date.today()
+ topic = slugify(record.first_user_text, maxlen=40)
+ return vault / f"{today:%Y-%m}" / f"{today:%Y-%m-%d}_{topic}.md"
+
+
def current_time() -> str:
"""Return the current local time as an ISO-8601 string.
@@ -162,5 +180,45 @@ gateway = Gateway(
# `messages` only work on `/v1/messages`; `mcp` only on
# `/mcp/`; `*` works everywhere.
AdminFrontend(host="0.0.0.0", port=8002),
+ # Obsidian-vault chat frontend. Each `.md` is one conversation
+ # (User/Assistant turn pairs). The Obsidian companion plugin
+ # POSTs `{filename, content?}` to `/chat` — the frontend reads
+ # the file, runs the agent if the last turn is `user`, and
+ # appends the assistant reply back. With `log_all_chats=True`
+ # *every* turn (from Anthropic Messages too) is mirrored into
+ # `{vault}/_logs//` so the vault is the central archive.
+ #
+ # `vault_path` here points at a per-restart tempdir so the
+ # example boots cleanly; in real deployments mount the
+ # Obsidian-sync container's vault volume to a stable path and
+ # pass that instead.
+ MarkdownFrontend(
+ host="0.0.0.0",
+ port=8003,
+ # Point at the dedicated chats subdir of your real Obsidian
+ # vault — the gateway has no idea (and no need) about other
+ # notes outside it. Path resolution / vault-escape checks
+ # are anchored here, so absolute-path attempts (and ``..``
+ # tricks) can't reach notes alongside it.
+ #
+ # vault_path=Path("/Users/me/Obsidian/Personal/chats"),
+ #
+ # Per-restart tempdir kept here so the example boots even
+ # without a real vault on the host.
+ vault_path=Path(tempfile.mkdtemp(prefix="beaver-vault-")).resolve(),
+ default_agent="research",
+ log_all_chats=True,
+ # ``log_path`` (optional) overrides the default
+ # ``{vault}/_logs//_.md`` layout for chats
+ # logged from OTHER frontends (Anthropic Messages, admin
+ # in-browser chat). Defined as a top-of-file function so the
+ # types are explicit and the IDE can hover them; a lambda
+ # works too, but a real ``def`` keeps the signature visible
+ # and lets you docstring it. Heads up: any custom path
+ # forces ``warm_index`` to scan the entire vault on startup
+ # so the fingerprint→file map survives a restart no matter
+ # where you put files.
+ log_path=chat_log_path,
+ ),
],
)
diff --git a/pyproject.toml b/pyproject.toml
index f31f91b..0a5751e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -7,6 +7,7 @@ authors = [
]
requires-python = ">=3.13"
dependencies = [
+ "aiofile>=3.11.1",
"aiohttp>=3.13.5",
"aiosqlite>=0.22.1",
"anthropic>=0.103.0",
@@ -20,6 +21,7 @@ dependencies = [
"psycopg[binary]>=3.3.4",
"pydantic>=2.13.4",
"pydantic-settings>=2.14.1",
+ "python-frontmatter>=1.2.0",
"sqlmodel>=0.0.38",
"uvicorn[standard]>=0.47.0",
"uvloop>=0.22.1",
diff --git a/src/beaver_gateway/core/turn_record.py b/src/beaver_gateway/core/turn_record.py
new file mode 100644
index 0000000..6c06085
--- /dev/null
+++ b/src/beaver_gateway/core/turn_record.py
@@ -0,0 +1,133 @@
+"""Cross-frontend turn record.
+
+Frontends that finish a turn (the Anthropic Messages frontend, the
+markdown frontend) emit a :class:`TurnRecord` to every handler in
+``GatewayRuntime.turn_log_handlers``. The markdown frontend uses this
+to persist chats from other frontends into the Obsidian vault — see
+``frontends/markdown/crossfront.py``.
+
+Kept tiny on purpose: it carries the structured-enough payload a logger
+needs (which agent ran, input history, the assembled assistant reply)
+and nothing else. The full event stream is gone by the time handlers
+run — if a future consumer needs deltas it would subscribe at a lower
+level, not here.
+"""
+
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+ from anthropic.types import Message, MessageParam
+
+
+__all__ = ["TurnRecord", "slugify"]
+
+
+# Filesystem-safe slug: collapse anything that isn't a word char or
+# space/hyphen to a hyphen, then squash runs of separators. Aimed at
+# letting users build filenames from ``record.first_user_text`` without
+# hand-rolling sanitization in every config.
+_SLUG_BAD_RE = re.compile(r"[^\w\s\-]+", flags=re.UNICODE)
+_SLUG_SEP_RE = re.compile(r"[\s\-]+", flags=re.UNICODE)
+
+
+def slugify(text: str, *, maxlen: int = 40) -> str:
+ """Sanitize ``text`` for use as a filename fragment.
+
+ Strips punctuation, collapses whitespace/hyphens into single ``-``,
+ and truncates to ``maxlen``. Returns ``"untitled"`` for empty input.
+ Unicode letters are preserved (Obsidian handles them fine; macOS
+ and modern Linux fs's too).
+ """
+ cleaned = _SLUG_BAD_RE.sub(" ", text).strip()
+ cleaned = _SLUG_SEP_RE.sub("-", cleaned).strip("-")
+ if not cleaned:
+ return "untitled"
+ if len(cleaned) > maxlen:
+ cleaned = cleaned[:maxlen].rstrip("-")
+ return cleaned or "untitled"
+
+
+@dataclass(frozen=True, slots=True)
+class TurnRecord:
+ """One completed turn, as seen by a frontend.
+
+ ``input_messages`` is the conversation history sent to the backend
+ (everything *before* the assistant reply). ``output_message`` is the
+ finalized assistant ``Message`` (post-accumulation, with all content
+ blocks attached). ``system`` is the per-request system prompt if any
+ — agents own their own ``system_prompt``, this is the override the
+ caller passed; most handlers can ignore it.
+
+ ``source`` names the frontend that produced the record so the
+ cross-frontend logger can avoid logging its own turns (markdown
+ frontend writing the file would otherwise also receive its own
+ broadcast and double-write).
+ """
+
+ agent_name: str
+ input_messages: list[MessageParam]
+ output_message: Message
+ system: str | None = None
+ source: str = "anthropic"
+
+ @property
+ def first_user_text(self) -> str:
+ """Plain text of the *earliest* user turn in this conversation.
+
+ Useful for naming new files by topic. Empty string if the input
+ history somehow has no user turn (shouldn't happen — turns are
+ broadcast only after a user → assistant cycle).
+ """
+ for msg in self.input_messages:
+ if msg.get("role") == "user":
+ return _text_of(msg.get("content", ""))
+ return ""
+
+ @property
+ def last_user_text(self) -> str:
+ """Plain text of the most recent user turn (the trigger)."""
+ for msg in reversed(self.input_messages):
+ if msg.get("role") == "user":
+ return _text_of(msg.get("content", ""))
+ return ""
+
+ @property
+ def assistant_text(self) -> str:
+ """Plain text of the assistant reply, with thinking/tool blocks dropped."""
+ chunks = [
+ getattr(block, "text", "") or ""
+ for block in getattr(self.output_message, "content", ())
+ if getattr(block, "type", None) == "text"
+ ]
+ return "\n\n".join(c for c in chunks if c)
+
+
+def _text_of(content: object) -> str:
+ """Flatten an Anthropic ``MessageParam.content`` field to a plain string.
+
+ Handles both shapes the SDK accepts: raw string, or a list of block
+ dicts (we pull ``text`` blocks only). Anything we don't recognize is
+ silently skipped — these helpers exist for naming files, not for
+ faithful content reconstruction.
+ """
+ if isinstance(content, str):
+ return content
+ if isinstance(content, list):
+ parts: list[str] = []
+ for blk in content:
+ if not isinstance(blk, dict):
+ continue
+ # ``MessageParam.content`` is typed as a union of typed-dicts
+ # per Anthropic SDK; we only care about plain ``text`` blocks
+ # and look them up via ``Any`` to dodge the keyed-typed-dict
+ # variance gymnastics (``ty`` won't let an open dict alias a
+ # closed-keyed one).
+ d: Any = blk
+ if d.get("type") == "text":
+ parts.append(str(d.get("text", "")))
+ return "\n\n".join(p for p in parts if p)
+ return ""
diff --git a/src/beaver_gateway/frontends/_accumulate.py b/src/beaver_gateway/frontends/_accumulate.py
new file mode 100644
index 0000000..5e33609
--- /dev/null
+++ b/src/beaver_gateway/frontends/_accumulate.py
@@ -0,0 +1,146 @@
+"""Collapse an Anthropic event stream into one ``Message``.
+
+Extracted from ``AnthropicMessagesFrontend`` so the markdown frontend
+can run the same accumulation logic when it wants the finalized turn
+rather than raw SSE chunks. Mirrors the Anthropic SDK's own accumulator:
+walks events, builds block dicts indexed by their ``content_block``
+index, folds text / thinking deltas in, buffers ``input_json_delta``
+chunks until the block closes (then JSON-parses them once).
+"""
+
+from __future__ import annotations
+
+import json
+from typing import TYPE_CHECKING, Any, Literal, cast
+
+from anthropic.types import (
+ InputJSONDelta,
+ Message,
+ RawContentBlockDeltaEvent,
+ RawContentBlockStartEvent,
+ RawContentBlockStopEvent,
+ RawMessageDeltaEvent,
+ RawMessageStartEvent,
+ SignatureDelta,
+ TextBlock,
+ TextDelta,
+ ThinkingBlock,
+ ThinkingDelta,
+ ToolUseBlock,
+ Usage,
+)
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncIterator
+
+ from beaver_gateway.core.events import MessageStreamEvent, StopReason
+
+
+__all__ = ["StreamAccumulator", "accumulate"]
+
+
+class StreamAccumulator:
+ """Folds a stream of events into one ``Message``, incrementally.
+
+ Use when you need to *both* forward events somewhere (SSE) *and*
+ keep a finalized ``Message`` for post-stream work (audit, logging
+ to disk). Call :meth:`feed` for each event, :meth:`finalize` once.
+ """
+
+ __slots__ = (
+ "_blocks",
+ "_json_buffers",
+ "_message_id",
+ "_role",
+ "_stop_reason",
+ "_stop_sequence",
+ "_usage",
+ )
+
+ def __init__(self) -> None:
+ self._message_id: str = ""
+ self._role: str = "assistant"
+ self._usage: Usage = Usage(input_tokens=0, output_tokens=0)
+ self._blocks: dict[int, dict[str, Any]] = {}
+ self._json_buffers: dict[int, str] = {}
+ self._stop_reason: str | None = None
+ self._stop_sequence: str | None = None
+
+ def feed(self, ev: MessageStreamEvent) -> None:
+ # isinstance, not ``ev.type == "..."``: ty narrows on the
+ # discriminator only via the class, and the raw event union
+ # carries its own discriminators (``Raw*Event``) the SDK
+ # already promises.
+ if isinstance(ev, RawMessageStartEvent):
+ self._message_id = ev.message.id
+ self._role = ev.message.role
+ self._usage = ev.message.usage
+ elif isinstance(ev, RawContentBlockStartEvent):
+ self._blocks[ev.index] = ev.content_block.model_dump()
+ if self._blocks[ev.index].get("type") == "tool_use":
+ self._json_buffers[ev.index] = ""
+ elif isinstance(ev, RawContentBlockDeltaEvent):
+ blk = self._blocks.setdefault(ev.index, {})
+ delta = ev.delta
+ if isinstance(delta, TextDelta):
+ blk["text"] = blk.get("text", "") + delta.text
+ elif isinstance(delta, InputJSONDelta):
+ self._json_buffers[ev.index] = (
+ self._json_buffers.get(ev.index, "") + delta.partial_json
+ )
+ elif isinstance(delta, ThinkingDelta):
+ blk["thinking"] = blk.get("thinking", "") + delta.thinking
+ elif isinstance(delta, SignatureDelta):
+ blk["signature"] = delta.signature
+ elif isinstance(ev, RawContentBlockStopEvent):
+ blk = self._blocks.get(ev.index, {})
+ if blk.get("type") == "tool_use":
+ raw = self._json_buffers.pop(ev.index, "")
+ blk["input"] = json.loads(raw) if raw.strip() else {}
+ elif isinstance(ev, RawMessageDeltaEvent):
+ self._stop_reason = ev.delta.stop_reason
+ self._stop_sequence = ev.delta.stop_sequence
+ if ev.usage.output_tokens:
+ self._usage = Usage.model_validate(
+ {
+ **self._usage.model_dump(),
+ "output_tokens": ev.usage.output_tokens,
+ }
+ )
+
+ def finalize(self, *, model: str) -> Message:
+ content: list[Any] = []
+ for idx in sorted(self._blocks):
+ bd = self._blocks[idx]
+ btype = bd.get("type")
+ if btype == "text":
+ content.append(TextBlock.model_validate(bd))
+ elif btype == "tool_use":
+ content.append(ToolUseBlock.model_validate(bd))
+ elif btype == "thinking":
+ content.append(ThinkingBlock.model_validate(bd))
+
+ # ``role`` is always ``"assistant"`` at the wire level — we
+ # initialised the field to that and only overwrite from a
+ # ``RawMessageStartEvent`` which itself carries the same literal.
+ # The cast keeps both type-checkers happy without a runtime check.
+ return Message(
+ id=self._message_id or "msg_unknown",
+ type="message",
+ role=cast("Literal['assistant']", self._role),
+ model=model,
+ content=content,
+ stop_reason=cast("StopReason | None", self._stop_reason),
+ stop_sequence=self._stop_sequence,
+ usage=self._usage,
+ )
+
+
+async def accumulate(
+ events: AsyncIterator[MessageStreamEvent], *, model: str
+) -> Message:
+ """Drain ``events`` into one ``Message`` (used by ``stream=false`` paths)."""
+ acc = StreamAccumulator()
+ async for ev in events:
+ acc.feed(ev)
+ return acc.finalize(model=model)
diff --git a/src/beaver_gateway/frontends/_auth.py b/src/beaver_gateway/frontends/_auth.py
new file mode 100644
index 0000000..1be5cb2
--- /dev/null
+++ b/src/beaver_gateway/frontends/_auth.py
@@ -0,0 +1,53 @@
+"""Shared bearer-token verification for HTTP frontends.
+
+Extracted from ``AnthropicMessagesFrontend`` so the markdown frontend
+(and any future bearer-protected frontend) can reuse one canonical
+verifier instead of copy-pasting the header-parsing dance.
+"""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from fastapi import HTTPException, status
+
+if TYPE_CHECKING:
+ from fastapi import Request
+
+ from beaver_gateway.frontends.base import GatewayRuntime
+
+
+__all__ = ["require_token"]
+
+
+async def require_token(
+ request: Request, runtime: GatewayRuntime, *, scope: str
+) -> str:
+ """Verify the request's bearer + scope, return the token's audit name.
+
+ Accepts both ``X-Api-Key: <token>`` (Anthropic SDK / LibreChat) and
+ ``Authorization: Bearer <token>`` (curl, Cursor). 401 on missing /
+ unknown token; 403 on a known token whose scope doesn't cover
+ ``scope``. Bootstrap tokens implicitly carry ``"*"`` and pass every
+ scope check.
+ """
+ api_key = request.headers.get("x-api-key")
+ identity = (
+ await runtime.token_store.verify(api_key)
+ if api_key
+ else await runtime.token_store.verify_bearer(
+ request.headers.get("authorization")
+ )
+ )
+ if identity is None:
+ raise HTTPException(
+ status.HTTP_401_UNAUTHORIZED,
+ "invalid or missing bearer token",
+ headers={"WWW-Authenticate": "Bearer"},
+ )
+ if not identity.allows(scope):
+ raise HTTPException(
+ status.HTTP_403_FORBIDDEN,
+ f"token scope {identity.scope!r} does not cover {scope!r}",
+ )
+ return identity.name
diff --git a/src/beaver_gateway/frontends/admin/frontend.py b/src/beaver_gateway/frontends/admin/frontend.py
index 44407c5..2408d05 100644
--- a/src/beaver_gateway/frontends/admin/frontend.py
+++ b/src/beaver_gateway/frontends/admin/frontend.py
@@ -45,6 +45,8 @@ from jinja2 import Environment, PackageLoader, select_autoescape
from beaver_gateway.core import audit
from beaver_gateway.core.auth import VALID_SCOPES, hash_token
+from beaver_gateway.core.turn_record import TurnRecord
+from beaver_gateway.frontends._accumulate import StreamAccumulator
from beaver_gateway.frontends.base import Frontend
from beaver_gateway.storage import (
create_token,
@@ -57,6 +59,8 @@ if TYPE_CHECKING:
from collections.abc import AsyncIterator
from datetime import datetime
+ from anthropic.types import MessageParam
+
from beaver_gateway.core.events import MessageStreamEvent
from beaver_gateway.frontends.base import GatewayRuntime
@@ -449,7 +453,15 @@ class AdminFrontend(Frontend):
system=system if isinstance(system, str) else None,
)
return StreamingResponse(
- _sse_events(events), media_type="text/event-stream"
+ _sse_events_and_broadcast(
+ events,
+ runtime=runtime,
+ agent_name=agent.name,
+ input_messages=messages,
+ system=system if isinstance(system, str) else None,
+ model=agent.model or agent.name,
+ ),
+ media_type="text/event-stream",
)
return app
@@ -510,23 +522,31 @@ def _build_endpoint_catalog(
and may have non-trivial transitive deps (aiohttp etc.).
"""
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
+ from beaver_gateway.frontends.markdown import MarkdownFrontend
from beaver_gateway.frontends.mcp_server import McpServerFrontend
scheme = request.headers.get("x-forwarded-proto") or request.url.scheme
hostname = request.url.hostname or "localhost"
- def _base_for(fe: AnthropicMessagesFrontend | McpServerFrontend) -> str:
+ def _base_for(
+ fe: AnthropicMessagesFrontend | McpServerFrontend | MarkdownFrontend,
+ ) -> str:
if fe.public_base_url:
return fe.public_base_url
return f"{scheme}://{hostname}:{fe.port}"
anthropic_base: str | None = None
mcp_base: str | None = None
+ markdown_fe: MarkdownFrontend | None = None
+ markdown_base: str | None = None
for fe in runtime.frontends:
if isinstance(fe, AnthropicMessagesFrontend) and anthropic_base is None:
anthropic_base = _base_for(fe)
elif isinstance(fe, McpServerFrontend) and mcp_base is None:
mcp_base = _base_for(fe)
+ elif isinstance(fe, MarkdownFrontend) and markdown_fe is None:
+ markdown_fe = fe
+ markdown_base = _base_for(fe)
agent_rows: list[dict[str, Any]] = []
if anthropic_base is not None:
@@ -554,27 +574,57 @@ def _build_endpoint_catalog(
{"namespace": "all", "kind": "bundle", "url": f"{mcp_base}/all/"}
)
+ markdown_row: dict[str, Any] | None = None
+ if markdown_fe is not None and markdown_base is not None:
+ # Pick the agent name shown in the dashboard's sample request: the
+ # frontend's ``default_agent`` if set, else the first agent (or ``""``).
+ sample_agent = markdown_fe.default_agent or next(
+ (a.name for a in runtime.agents), ""
+ )
+ markdown_row = {
+ "url": f"{markdown_base}/chat",
+ "vault_path": str(markdown_fe.vault_path),
+ "default_agent": markdown_fe.default_agent,
+ "log_all_chats": markdown_fe.log_all_chats,
+ "logged_subdir": markdown_fe.logged_subdir,
+ "sample_agent": sample_agent,
+ }
+
return {
"anthropic_base": anthropic_base,
"mcp_base": mcp_base,
+ "markdown_base": markdown_base,
"agents": agent_rows,
"mcps": mcp_rows,
+ "markdown": markdown_row,
}
-async def _sse_events(
+async def _sse_events_and_broadcast(
events: AsyncIterator[MessageStreamEvent],
+ *,
+ runtime: GatewayRuntime,
+ agent_name: str,
+ input_messages: list[MessageParam],
+ system: str | None,
+ model: str,
) -> AsyncIterator[bytes]:
- r"""Serialize a backend stream into Anthropic's ``text/event-stream`` form.
+ r"""Serialize a backend stream as SSE and broadcast a ``TurnRecord`` after.
Same wire shape as :mod:`beaver_gateway.frontends.anthropic` —
duplicated rather than imported so the admin frontend stays
independent of that module's private helpers, and so a mid-stream
failure surfaces as an in-band ``error`` event the chat UI can
- render rather than a dangling connection.
+ render rather than a dangling connection. The events also feed a
+ side ``StreamAccumulator`` so once the SSE response closes we hand
+ the assembled ``Message`` to ``runtime.turn_log_handlers`` (the
+ markdown frontend's archive logger lives there). ``source="admin"``
+ so the cross-frontend logger knows where the turn came from.
"""
+ acc = StreamAccumulator()
try:
async for ev in events:
+ acc.feed(ev)
payload = ev.model_dump_json()
yield f"event: {ev.type}\ndata: {payload}\n\n".encode()
except Exception as exc: # noqa: BLE001
@@ -583,6 +633,22 @@ async def _sse_events(
{"type": "error", "error": {"type": "api_error", "message": str(exc)}}
)
yield f"event: error\ndata: {err}\n\n".encode()
+ return
+ if not runtime.turn_log_handlers:
+ return
+ message = acc.finalize(model=model)
+ record = TurnRecord(
+ agent_name=agent_name,
+ input_messages=list(input_messages),
+ output_message=message,
+ system=system,
+ source="admin",
+ )
+ for handler in runtime.turn_log_handlers:
+ try:
+ await handler(record)
+ except Exception: # noqa: BLE001
+ _log.exception("turn_log_handler raised; continuing")
def _set_session_cookie(response: Response, value: str) -> None:
diff --git a/src/beaver_gateway/frontends/admin/templates/dashboard.html b/src/beaver_gateway/frontends/admin/templates/dashboard.html
index 09dfdf5..7c44e74 100644
--- a/src/beaver_gateway/frontends/admin/templates/dashboard.html
+++ b/src/beaver_gateway/frontends/admin/templates/dashboard.html
@@ -134,6 +134,41 @@
add one to Gateway(frontends=[...]) to expose them over HTTP.
+ {% if endpoints.markdown.default_agent %}
+ {{ endpoints.markdown.default_agent }}
+ {% else %}
+ — (request must set agent or frontmatter)
+ {% endif %}
+
+
{{ endpoints.markdown.url }}
+
+
+
+
+
+
+
+
+
+ {% endif %}
{% else %}
Nothing exposed yet — declare agents / MCPs and the matching frontends
@@ -284,6 +319,21 @@
" " + url,
].join("\n");
}
+ if (kind === "markdown") {
+ const agent = row.getAttribute("data-sample-agent") || "";
+ const body = JSON.stringify({
+ filename: "example.md",
+ content: "### User:\n\nhello\n",
+ agent: agent,
+ });
+ return [
+ "curl \\",
+ " -H " + escSh("Authorization: Bearer " + tok) + " \\",
+ " -H 'content-type: application/json' \\",
+ " -d " + escSh(body) + " \\",
+ " " + url,
+ ].join("\n");
+ }
if (kind === "mcp") {
// Streamable-HTTP MCP wants `initialize` first — the response
// carries the `Mcp-Session-Id` header you must echo back on
diff --git a/src/beaver_gateway/frontends/anthropic.py b/src/beaver_gateway/frontends/anthropic.py
index 9e14e94..ff9bd7d 100644
--- a/src/beaver_gateway/frontends/anthropic.py
+++ b/src/beaver_gateway/frontends/anthropic.py
@@ -21,31 +21,20 @@ import json
import logging
from typing import TYPE_CHECKING, Any
-from anthropic.types import (
- InputJSONDelta,
- Message,
- RawContentBlockDeltaEvent,
- RawContentBlockStartEvent,
- RawContentBlockStopEvent,
- RawMessageDeltaEvent,
- RawMessageStartEvent,
- SignatureDelta,
- TextBlock,
- TextDelta,
- ThinkingBlock,
- ThinkingDelta,
- ToolUseBlock,
- Usage,
-)
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import JSONResponse, StreamingResponse
from beaver_gateway.core import audit
+from beaver_gateway.core.turn_record import TurnRecord
+from beaver_gateway.frontends._accumulate import StreamAccumulator, accumulate
+from beaver_gateway.frontends._auth import require_token
from beaver_gateway.frontends.base import Frontend
if TYPE_CHECKING:
from collections.abc import AsyncIterator
+ from anthropic.types import Message, MessageParam
+
from beaver_gateway.core.events import MessageStreamEvent
from beaver_gateway.frontends.base import GatewayRuntime
@@ -109,7 +98,7 @@ class AnthropicMessagesFrontend(Frontend):
@app.get("/v1/models")
async def list_models(request: Request) -> dict[str, Any]:
- await _require_token(request, runtime, scope="messages")
+ await require_token(request, runtime, scope="messages")
data = [
{
"type": "model",
@@ -123,7 +112,7 @@ class AnthropicMessagesFrontend(Frontend):
@app.post("/v1/messages")
async def create_message(request: Request) -> Any:
- token_name = await _require_token(request, runtime, scope="messages")
+ token_name = await require_token(request, runtime, scope="messages")
try:
body = await request.json()
except json.JSONDecodeError as exc:
@@ -193,58 +182,63 @@ class AnthropicMessagesFrontend(Frontend):
**options,
)
+ system_str = system if isinstance(system, str) else None
+
if stream_flag:
- return StreamingResponse(_sse(events), media_type="text/event-stream")
- message = await _accumulate(events, model=model)
+ # Side-accumulate while streaming so we can still emit a
+ # ``TurnRecord`` after the response closes. The buffered
+ # ``Message`` lives only in this coroutine's frame; SSE
+ # bytes still flow to the client unchanged.
+ return StreamingResponse(
+ _sse_and_broadcast(
+ events,
+ runtime=runtime,
+ agent_name=agent.name,
+ input_messages=messages,
+ system=system_str,
+ model=model,
+ ),
+ media_type="text/event-stream",
+ )
+ message = await accumulate(events, model=model)
+ await _broadcast_turn(
+ runtime,
+ agent_name=agent.name,
+ input_messages=messages,
+ output_message=message,
+ system=system_str,
+ )
return JSONResponse(content=message.model_dump(mode="json"))
return app
-async def _require_token(
- request: Request, runtime: GatewayRuntime, *, scope: str
-) -> str:
- """Verify the request's bearer + scope, return the token's audit name.
-
- Accepts both ``X-Api-Key: `` (what the official Anthropic
- SDK sends — LibreChat, the CLI, third-party clients) and
- ``Authorization: Bearer `` (curl, Cursor). 401 on missing /
- unknown token; 403 on a known token whose scope doesn't cover
- ``scope``. Bootstrap tokens implicitly carry ``"*"`` and pass
- every scope check.
- """
- api_key = request.headers.get("x-api-key")
- identity = (
- await runtime.token_store.verify(api_key)
- if api_key
- else await runtime.token_store.verify_bearer(
- request.headers.get("authorization")
- )
- )
- if identity is None:
- raise HTTPException(
- status.HTTP_401_UNAUTHORIZED,
- "invalid or missing bearer token",
- headers={"WWW-Authenticate": "Bearer"},
- )
- if not identity.allows(scope):
- raise HTTPException(
- status.HTTP_403_FORBIDDEN,
- f"token scope {identity.scope!r} does not cover {scope!r}",
- )
- return identity.name
-
-
-async def _sse(events: AsyncIterator[MessageStreamEvent]) -> AsyncIterator[bytes]:
- r"""Serialize an event stream into Anthropic's ``text/event-stream`` form.
+async def _sse_and_broadcast(
+ events: AsyncIterator[MessageStreamEvent],
+ *,
+ runtime: GatewayRuntime,
+ agent_name: str,
+ input_messages: list[MessageParam],
+ system: str | None,
+ model: str,
+) -> AsyncIterator[bytes]:
+ r"""Serialize an event stream to SSE; broadcast a :class:`TurnRecord` after.
Each event becomes ``event: <type>\ndata: <json>\n\n`` — the shape
the Anthropic SDK's SSE decoder expects. Errors mid-stream are
swallowed into a synthetic ``error`` event so the client sees the
failure rather than a hung connection.
+
+ The same events feed a :class:`StreamAccumulator` on the side so that
+ once the SSE response closes we can hand a fully-assembled
+ ``Message`` to every ``runtime.turn_log_handlers`` entry (the
+ markdown frontend's archive logger lives in there). Broadcast
+ failures are caught — they must never bubble up to the client.
"""
+ acc = StreamAccumulator()
try:
async for ev in events:
+ acc.feed(ev)
payload = ev.model_dump_json()
yield f"event: {ev.type}\ndata: {payload}\n\n".encode()
except Exception as exc: # noqa: BLE001
@@ -253,83 +247,41 @@ async def _sse(events: AsyncIterator[MessageStreamEvent]) -> AsyncIterator[bytes
{"type": "error", "error": {"type": "api_error", "message": str(exc)}}
)
yield f"event: error\ndata: {err}\n\n".encode()
-
-
-async def _accumulate(
- events: AsyncIterator[MessageStreamEvent], *, model: str
-) -> Message:
- """Collapse a stream-event sequence into one ``Message`` (``stream=false``).
-
- Mirrors the Anthropic SDK's own accumulator: walk events, build
- block dicts indexed by their ``content_block`` index, fold text /
- thinking deltas in, buffer ``input_json_delta`` chunks until the
- block closes (then JSON-parse them once).
- """
- message_id = ""
- role = "assistant"
- usage = Usage(input_tokens=0, output_tokens=0)
- blocks: dict[int, dict[str, Any]] = {}
- json_buffers: dict[int, str] = {}
- stop_reason: str | None = None
- stop_sequence: str | None = None
-
- async for ev in events:
- # isinstance, not ``ev.type == "..."``: ty narrows on the
- # discriminator only via the class, and the raw event union
- # carries its own discriminators (``Raw*Event``) the SDK
- # already promises.
- if isinstance(ev, RawMessageStartEvent):
- message_id = ev.message.id
- role = ev.message.role
- usage = ev.message.usage
- elif isinstance(ev, RawContentBlockStartEvent):
- blocks[ev.index] = ev.content_block.model_dump()
- if blocks[ev.index].get("type") == "tool_use":
- json_buffers[ev.index] = ""
- elif isinstance(ev, RawContentBlockDeltaEvent):
- blk = blocks.setdefault(ev.index, {})
- delta = ev.delta
- if isinstance(delta, TextDelta):
- blk["text"] = blk.get("text", "") + delta.text
- elif isinstance(delta, InputJSONDelta):
- json_buffers[ev.index] = (
- json_buffers.get(ev.index, "") + delta.partial_json
- )
- elif isinstance(delta, ThinkingDelta):
- blk["thinking"] = blk.get("thinking", "") + delta.thinking
- elif isinstance(delta, SignatureDelta):
- blk["signature"] = delta.signature
- elif isinstance(ev, RawContentBlockStopEvent):
- blk = blocks.get(ev.index, {})
- if blk.get("type") == "tool_use":
- raw = json_buffers.pop(ev.index, "")
- blk["input"] = json.loads(raw) if raw.strip() else {}
- elif isinstance(ev, RawMessageDeltaEvent):
- stop_reason = ev.delta.stop_reason
- stop_sequence = ev.delta.stop_sequence
- if ev.usage.output_tokens:
- usage = Usage.model_validate(
- {**usage.model_dump(), "output_tokens": ev.usage.output_tokens}
- )
-
- content = []
- for idx in sorted(blocks):
- bd = blocks[idx]
- btype = bd.get("type")
- if btype == "text":
- content.append(TextBlock.model_validate(bd))
- elif btype == "tool_use":
- content.append(ToolUseBlock.model_validate(bd))
- elif btype == "thinking":
- content.append(ThinkingBlock.model_validate(bd))
-
- return Message(
- id=message_id or "msg_unknown",
- type="message",
- role=role,
- model=model,
- content=content,
- stop_reason=stop_reason, # type: ignore[arg-type]
- stop_sequence=stop_sequence,
- usage=usage,
+ return
+ message = acc.finalize(model=model)
+ await _broadcast_turn(
+ runtime,
+ agent_name=agent_name,
+ input_messages=input_messages,
+ output_message=message,
+ system=system,
)
+
+
+async def _broadcast_turn(
+ runtime: GatewayRuntime,
+ *,
+ agent_name: str,
+ input_messages: list[MessageParam],
+ output_message: Message,
+ system: str | None,
+) -> None:
+ """Fire each ``turn_log_handlers`` entry with a fresh :class:`TurnRecord`.
+
+ Handler exceptions are caught and logged — they're observability
+ plumbing, not part of the user-visible request path.
+ """
+ if not runtime.turn_log_handlers:
+ return
+ record = TurnRecord(
+ agent_name=agent_name,
+ input_messages=list(input_messages),
+ output_message=output_message,
+ system=system,
+ source="anthropic",
+ )
+ for handler in runtime.turn_log_handlers:
+ try:
+ await handler(record)
+ except Exception: # noqa: BLE001
+ _log.exception("turn_log_handler raised; continuing")
diff --git a/src/beaver_gateway/frontends/base.py b/src/beaver_gateway/frontends/base.py
index bfce91e..14b1d7b 100644
--- a/src/beaver_gateway/frontends/base.py
+++ b/src/beaver_gateway/frontends/base.py
@@ -15,13 +15,16 @@ from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
- from collections.abc import Mapping, Sequence
+ from collections.abc import Awaitable, Callable, Mapping, Sequence
from beaver_gateway.backends.base import Backend
from beaver_gateway.core.auth import TokenStore
from beaver_gateway.core.registry import AgentRegistry, McpRegistry
+ from beaver_gateway.core.turn_record import TurnRecord
from beaver_gateway.storage import Database
+ TurnLogHandler = Callable[[TurnRecord], Awaitable[None]]
+
@dataclass(frozen=True, slots=True)
class GatewayRuntime:
@@ -62,6 +65,18 @@ class GatewayRuntime:
# the dashboard so the operator can copy ready-to-use links / curl
# snippets. Other frontends ignore it.
frontends: Sequence[Frontend] = field(default_factory=tuple)
+ # Frontends that finish a turn (Anthropic Messages, Markdown) iterate
+ # this list and ``await`` each handler with a ``TurnRecord``. Handlers
+ # are appended during ``configure()`` by frontends that want a
+ # cross-frontend chat archive — currently the markdown frontend's
+ # ``log_all_chats`` mode. Handler exceptions are caught at the call
+ # site; they never block the user-visible response.
+ #
+    # The ``TurnLogHandler`` alias is defined under TYPE_CHECKING to keep
+    # ``anthropic.types`` out of the runtime import graph for this base
+    # module, so the ``list[TurnLogHandler]`` annotation below resolves
+    # only for type checkers and must never be evaluated at runtime.
+ turn_log_handlers: list[TurnLogHandler] = field(default_factory=list)
class Frontend(ABC):
diff --git a/src/beaver_gateway/frontends/markdown/__init__.py b/src/beaver_gateway/frontends/markdown/__init__.py
new file mode 100644
index 0000000..c2b63a9
--- /dev/null
+++ b/src/beaver_gateway/frontends/markdown/__init__.py
@@ -0,0 +1,15 @@
+"""Markdown frontend — turn-by-turn chat archive backed by ``.md`` files.
+
+The user maintains chats as plain markdown files in an Obsidian vault.
+A plugin in Obsidian POSTs ``{filename, content?}`` to ``/chat`` and the
+frontend parses the file, finds the last turn, and runs the agent if
+the last turn is ``user``. The full response is appended back to the
+file as an ``### Assistant:`` turn. With ``log_all_chats=True`` the
+frontend also subscribes to every other frontend's turns and writes
+them into ``{vault_path}/{logged_subdir}/`` so the vault is the single
+chronological archive of all conversations.
+"""
+
+from beaver_gateway.frontends.markdown.frontend import MarkdownFrontend
+
+__all__ = ["MarkdownFrontend"]
diff --git a/src/beaver_gateway/frontends/markdown/crossfront.py b/src/beaver_gateway/frontends/markdown/crossfront.py
new file mode 100644
index 0000000..7013124
--- /dev/null
+++ b/src/beaver_gateway/frontends/markdown/crossfront.py
@@ -0,0 +1,302 @@
+"""Cross-frontend chat logger.
+
+When ``MarkdownFrontend(log_all_chats=True)`` is configured, every turn
+completed by any other frontend (currently the Anthropic Messages
+frontend) is mirrored into the vault as a ``.md`` file. Subsequent
+turns of the same conversation append to the same file — matched by a
+content-hash fingerprint stored in YAML frontmatter.
+
+The fingerprint stored with each file hashes the message history
+*including* the assistant reply just written —
+``hash(prev_input + [assistant_reply])``. The next request's input
+history minus its newest user turn is exactly that list, so the lookup
+hits the same file. New conversations (no prior fingerprint match) get
+a fresh file under ``{vault_path}/{logged_subdir}/{agent_name}/``.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import logging
+from datetime import UTC, datetime
+from typing import TYPE_CHECKING, Any
+
+import frontmatter
+
+from beaver_gateway.frontends.markdown import renderer
+
+if TYPE_CHECKING:
+ from collections.abc import Callable, Iterable
+ from pathlib import Path
+
+ from anthropic.types import MessageParam
+
+ from beaver_gateway.core.turn_record import TurnRecord
+
+
+_log = logging.getLogger("beaver_gateway.frontends.markdown.crossfront")
+
+
+# User hook: take a turn + vault root, return where the new file should
+# live. Returning a relative ``Path`` is treated as relative to the
+# vault. ``None`` (the default) keeps the built-in
+# ``{vault}/{logged_subdir}/{agent}/{YYYY-MM-DD}_{hex8}.md`` layout.
+LogPathFn = "Callable[[TurnRecord, Path], Path]"
+
+
+__all__ = ["CrossFrontendLogger", "LogPathFn", "fingerprint_messages"]
+
+
+def fingerprint_messages(messages: Iterable[MessageParam]) -> str:
+    """Stable, short hex hash of a conversation prefix.
+
+    Built from ``(role, normalized_content)`` pairs only — so the
+    Markdown frontend's parser-shaped messages (text-only) and the
+    Anthropic frontend's raw ``messages`` payload hash compatibly when
+    they represent the same conversation. List-shaped content is
+    reduced to its non-empty ``text`` blocks joined by a blank line,
+    the same flattening applied to an assistant reply before its
+    fingerprint is stored; otherwise a multi-block reply echoed back as
+    a block list would never match. Tool blocks / images are ignored.
+    """
+    h = hashlib.sha1(usedforsecurity=False)
+    for msg in messages:
+        role = str(msg.get("role", ""))
+        content = msg.get("content", "")
+        if isinstance(content, str):
+            text = content
+        else:
+            parts = [
+                str(blk.get("text", ""))
+                for blk in content or ()
+                if isinstance(blk, dict) and blk.get("type") == "text"
+            ]
+            text = "\n\n".join(part for part in parts if part)
+        h.update(role.encode("utf-8"))
+        h.update(b"\x00")
+        h.update(text.strip().encode("utf-8"))
+        h.update(b"\x01")
+    return h.hexdigest()[:16]
+
+
+class CrossFrontendLogger:
+    """Maintains the fingerprint→file map and writes turns to disk.
+
+    The map is in-process; on startup ``warm_index`` rebuilds it from
+    the YAML frontmatter of every ``.md`` file under the scan root. A
+    miss creates a new file, a hit appends to the existing one. All
+    writes in ``handle`` funnel through one ``asyncio.Lock`` because
+    they are cheap and serializing them sidesteps a class of races we
+    don't need to think about.
+    """
+
+    def __init__(
+        self,
+        *,
+        vault_path: Path,
+        logged_subdir: str,
+        log_path: Callable[[TurnRecord, Path], Path] | None = None,
+    ) -> None:
+        """Record paths and start with an empty index; no disk access here."""
+        self._vault = vault_path
+        self._root = vault_path / logged_subdir
+        self._index: dict[str, Path] = {}
+        self._lock = asyncio.Lock()
+        self._log_path_fn = log_path
+        # A custom ``log_path`` can put files anywhere in the vault, so
+        # ``warm_index`` must then walk the whole vault to rebuild the
+        # map. The default layout only ever writes below ``_root``.
+        self._scan_root = vault_path if log_path is not None else self._root
+
+    def warm_index(self) -> None:
+        """Scan logged files synchronously, populating the fingerprint map.
+
+        Called from ``MarkdownFrontend.configure`` so the map is ready
+        before any cross-frontend turn arrives. Each ``*.md`` below the
+        scan root is loaded and parsed, so the cost grows with the size
+        of that tree — the whole vault when ``log_path`` is customised.
+        """
+        if not self._scan_root.exists():
+            return
+        for path in self._scan_root.rglob("*.md"):
+            try:
+                post = frontmatter.load(str(path))
+            except Exception:  # noqa: BLE001
+                _log.warning("could not read frontmatter from %s — skipping", path)
+                continue
+            fp = post.metadata.get("fingerprint")
+            if isinstance(fp, str) and fp:
+                self._index[fp] = path
+        _log.info(
+            "crossfront index warmed: %d logged file(s) under %s",
+            len(self._index),
+            self._scan_root,
+        )
+
+    async def handle(self, record: TurnRecord) -> None:
+        """Append or create a logged file for ``record``.
+
+        Records that the markdown frontend itself produced
+        (``source=="markdown"``) are skipped — those already live in the
+        user's hand-written file and shouldn't be duplicated.
+        """
+        if record.source == "markdown":
+            return
+
+        async with self._lock:
+            # ``input_messages`` is the *full* history sent to the backend
+            # (last entry is the new user turn). The previous write stored
+            # the fingerprint of everything before it; empty = new chat.
+            prefix = record.input_messages[:-1]
+            prev_fp = fingerprint_messages(prefix) if prefix else None
+            target = self._index.get(prev_fp) if prev_fp else None
+            if target is None:
+                target = self._new_file_path(record)
+
+            # Fingerprint the history *including* the assistant reply, so
+            # the next request (one more user msg) resolves to this file.
+            assistant_msg: MessageParam = {
+                "role": "assistant",
+                "content": _flatten_text(record.output_message),
+            }
+            full_history = [*record.input_messages, assistant_msg]
+            new_fp = fingerprint_messages(full_history)
+
+            if target.exists():
+                existing = target.read_text(encoding="utf-8")
+                parsed = frontmatter.loads(existing)
+                body = _strip_trailing_user_scaffold(parsed.content)
+                # Prior turns are already on disk: append only the newest
+                # user turn plus the assistant reply.
+                new_user = record.input_messages[-1]
+                new_block = renderer.render_user_param(new_user)
+                new_block = renderer.append_to_body(
+                    new_block, renderer.render_assistant_message(record.output_message)
+                )
+                new_body = renderer.append_to_body(body, new_block)
+                metadata = dict(parsed.metadata)
+            else:
+                # Materialize the whole conversation from scratch.
+                new_body = _render_full_history(
+                    record.input_messages, record.output_message
+                )
+                metadata = {}
+
+            new_body = renderer.append_to_body(new_body, renderer.USER_SCAFFOLD)
+
+            metadata["agent"] = record.agent_name
+            metadata["fingerprint"] = new_fp
+            metadata["source"] = record.source
+            self._write(target, metadata, new_body)
+            # Maintain the index: drop the old fp (it's stale once we
+            # write the new turn), add the new one.
+            if prev_fp:
+                self._index.pop(prev_fp, None)
+            self._index[new_fp] = target
+
+    # ---- internals -----------------------------------------------------
+
+    def _new_file_path(self, record: TurnRecord) -> Path:
+        """Pick a filename for a brand-new conversation.
+
+        With a user-supplied ``log_path`` we delegate to it (joining a
+        relative result with the vault root) and force a ``.md`` suffix
+        so ``warm_index`` can find the file again. Without one, we fall
+        back to ``{logged_subdir}/{agent}/{date}_{hex8}.md``, adding a
+        ``-N`` counter when that name is already taken.
+        """
+        if self._log_path_fn is not None:
+            result = self._log_path_fn(record, self._vault)
+            if not result.is_absolute():
+                result = self._vault / result
+            if result.suffix != ".md":
+                result = result.with_suffix(".md")
+            result.parent.mkdir(parents=True, exist_ok=True)
+            return result
+        day = datetime.now(UTC).strftime("%Y-%m-%d")
+        # The hex comes from the input history, so two chats that open
+        # with the same message on the same day share a stem. Probe for
+        # a free name rather than reuse a file ``handle`` would append to.
+        stem = f"{day}_{fingerprint_messages(record.input_messages)[:8]}"
+        agent_dir = self._root / record.agent_name
+        agent_dir.mkdir(parents=True, exist_ok=True)
+        candidate, attempt = agent_dir / f"{stem}.md", 1
+        while candidate.exists():
+            attempt += 1
+            candidate = agent_dir / f"{stem}-{attempt}.md"
+        return candidate
+
+    def _write(self, path: Path, metadata: dict[str, Any], body: str) -> None:
+        """Write ``body`` to ``path``, prefixed by frontmatter when non-empty."""
+        path.parent.mkdir(parents=True, exist_ok=True)
+        if metadata:
+            post = frontmatter.Post(content=body, **metadata)
+            text = frontmatter.dumps(post) + "\n"
+        else:
+            text = body if body.endswith("\n") else body + "\n"
+        # Blocking write while ``handle`` holds the lock — deliberate:
+        # logged turns are small and arrive at human conversation rates.
+        path.write_text(text, encoding="utf-8")
+
+
+def _flatten_text(message: Any) -> str:
+    """Join the non-empty ``text`` blocks of ``message`` with blank lines."""
+    parts = (
+        getattr(block, "text", "") or ""
+        for block in getattr(message, "content", ())
+        if getattr(block, "type", None) == "text"
+    )
+    return "\n\n".join(filter(None, parts))
+
+
+def _render_full_history(messages: list[MessageParam], assistant: Any) -> str:
+    """Render prior history plus the new reply as one markdown body."""
+    rendered: list[str] = []
+    for msg in messages:
+        role = msg.get("role")
+        if role == "user":
+            rendered.append(renderer.render_user_param(msg))
+        elif role == "assistant":
+            raw = msg.get("content", "")
+            spoken = (raw if isinstance(raw, str) else _content_to_text(raw)).strip()
+            if spoken:
+                rendered.append(f"### Assistant:\n\n{spoken}\n")
+    rendered.append(renderer.render_assistant_message(assistant))
+    # Empty renderings are skipped; the rest are chained in order.
+    body = ""
+    for chunk in filter(None, rendered):
+        body = renderer.append_to_body(body, chunk)
+    return body
+
+
+def _strip_trailing_user_scaffold(body: str) -> str:
+    """Remove an empty ``### User:`` marker left at the very end of ``body``.
+
+    Cross-frontend turns arrive whole from another frontend instead of
+    being typed into the file. Keeping the scaffold written by the
+    previous run would put the new user turn straight after an empty
+    marker — two ``### User:`` headers in a row. The caller appends a
+    fresh scaffold afterwards. A body that does not end with the marker
+    (ignoring trailing whitespace) is returned unchanged.
+    """
+    trimmed = body.rstrip()
+    without_marker = trimmed.removesuffix("### User:")
+    if without_marker == trimmed:
+        # No trailing scaffold: hand the body back untouched.
+        return body
+    # Also drop the ``---`` rule that may precede the scaffold, plus
+    # any whitespace around it.
+    return without_marker.rstrip().removesuffix("---").rstrip()
+
+
+def _content_to_text(content: Any) -> str:
+    """Join the ``text`` blocks of list-shaped content; ``""`` otherwise."""
+    if not isinstance(content, list):
+        return ""
+    texts: list[str] = []
+    for blk in content:
+        if isinstance(blk, dict) and blk.get("type") == "text":
+            texts.append(str(blk.get("text", "")))
+    return "\n\n".join(texts)
diff --git a/src/beaver_gateway/frontends/markdown/frontend.py b/src/beaver_gateway/frontends/markdown/frontend.py
new file mode 100644
index 0000000..701bc92
--- /dev/null
+++ b/src/beaver_gateway/frontends/markdown/frontend.py
@@ -0,0 +1,404 @@
+"""``MarkdownFrontend`` — chat-via-markdown-files frontend.
+
+Wires:
+
+* ``POST /chat {filename, content?, agent?}`` — bearer-authenticated
+ trigger. The plugin in Obsidian fires this after the user edits a
+ ``.md`` and the file gets synced (or with ``content`` to short-circuit
+ the sync delay). We parse the file, check the last turn — assistant
+ → no-op, user → run the agent and append.
+* ``GET /healthz`` — liveness.
+
+Concurrency model: an in-memory ``set[Path]`` of files currently in
+flight. Two concurrent requests for the same file → the second gets
+409. The set is single-process (one gateway instance) — that's by
+design; the markdown frontend is the only writer in its vault from
+the gateway side.
+
+Cross-frontend logging: when ``log_all_chats=True``, ``configure()``
+registers a handler on ``runtime.turn_log_handlers`` so every other
+frontend's completed turns also land in the vault. The handler logic
+lives in :mod:`.crossfront` so this module stays focused on the HTTP
+shape.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import json
+import logging
+import os
+import tempfile
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import aiofile
+from fastapi import FastAPI, HTTPException, Request, status
+from fastapi.responses import JSONResponse
+
+from beaver_gateway.core import audit
+from beaver_gateway.core.turn_record import TurnRecord
+from beaver_gateway.frontends._accumulate import accumulate
+from beaver_gateway.frontends._auth import require_token
+from beaver_gateway.frontends.base import Frontend
+from beaver_gateway.frontends.markdown import parser, renderer
+from beaver_gateway.frontends.markdown.crossfront import (
+ CrossFrontendLogger,
+ fingerprint_messages,
+)
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+ from anthropic.types import MessageParam
+
+ from beaver_gateway.frontends.base import GatewayRuntime
+
+
+_log = logging.getLogger("beaver_gateway.frontends.markdown")
+
+
+__all__ = ["MarkdownFrontend"]
+
+
+class MarkdownFrontend(Frontend):
+    """FastAPI app behind ``POST /chat`` driven by Obsidian-vault files."""
+
+    def __init__(
+        self,
+        *,
+        vault_path: Path | str,
+        host: str = "0.0.0.0",  # noqa: S104
+        port: int = 8003,
+        default_agent: str | None = None,
+        log_all_chats: bool = False,
+        logged_subdir: str = "_logs",
+        log_path: Callable[[TurnRecord, Path], Path] | None = None,
+        public_base_url: str | None = None,
+    ) -> None:
+        self.vault_path = Path(vault_path).expanduser().resolve()
+        self.host = host
+        self.port = port
+        self.default_agent = default_agent
+        self.log_all_chats = log_all_chats
+        self.logged_subdir = logged_subdir
+        self.log_path = log_path
+        # External URL prefix when behind a reverse proxy — same role as
+        # on the other bearer frontends. Trailing slash trimmed for
+        # idempotent concatenation; ``None`` means "no proxy / advertise
+        # raw host:port".
+        self.public_base_url = public_base_url.rstrip("/") if public_base_url else None
+        self._runtime: GatewayRuntime | None = None
+        self._app: FastAPI | None = None
+        # Files currently being processed by an in-flight ``POST /chat``.
+        # Checked-and-added atomically in the request handler (no
+        # ``await`` between the check and the insert) so a concurrent
+        # request reliably loses the race to 409.
+        self._busy: set[Path] = set()
+        self._crossfront: CrossFrontendLogger | None = None
+
+    def configure(self, runtime: GatewayRuntime) -> None:
+        """Bind the runtime, create the vault dir, and build the FastAPI app."""
+        self._runtime = runtime
+        self.vault_path.mkdir(parents=True, exist_ok=True)
+        if self.log_all_chats:
+            self._crossfront = CrossFrontendLogger(
+                vault_path=self.vault_path,
+                logged_subdir=self.logged_subdir,
+                log_path=self.log_path,
+            )
+            # Warm the fingerprint→path map synchronously so it is ready
+            # before the handler can receive its first cross-frontend turn.
+            self._crossfront.warm_index()
+            runtime.turn_log_handlers.append(self._crossfront.handle)
+        self._app = self._build_app(runtime)
+
+    async def serve(self) -> None:
+        import uvicorn
+
+        if self._app is None:
+            msg = "configure() must be called before serve()"
+            raise RuntimeError(msg)
+        config = uvicorn.Config(
+            self._app, host=self.host, port=self.port, log_level="info"
+        )
+        server = uvicorn.Server(config)
+        await server.serve()
+
+    # ---- app builder ---------------------------------------------------
+
+    def _build_app(self, runtime: GatewayRuntime) -> FastAPI:
+        """Build the FastAPI app exposing ``GET /healthz`` and ``POST /chat``."""
+        app = FastAPI(title="beaver-gateway / Markdown")
+
+        @app.get("/healthz")
+        async def healthz() -> dict[str, str]:
+            return {"status": "ok"}
+
+        @app.post("/chat")
+        async def chat(request: Request) -> Any:
+            token_name = await require_token(request, runtime, scope="messages")
+            bad_request = status.HTTP_400_BAD_REQUEST
+            try:
+                body = await request.json()
+            except ValueError as exc:  # JSONDecodeError / UnicodeDecodeError
+                raise HTTPException(bad_request, f"invalid JSON: {exc}") from exc
+            # Valid JSON that isn't an object (list, string, null, …) has
+            # no ``.get``; answer 400 instead of crashing into a 500.
+            if not isinstance(body, dict):
+                raise HTTPException(bad_request, "request body must be a JSON object")
+
+            filename = body.get("filename")
+            if not isinstance(filename, str) or not filename.strip():
+                raise HTTPException(bad_request, "missing or non-string `filename`")
+            content_override = body.get("content")
+            agent_override = body.get("agent")
+            if agent_override is not None and not isinstance(agent_override, str):
+                raise HTTPException(bad_request, "`agent` must be a string")
+
+            file_path = self._resolve_path(filename)
+
+            # Atomic check-and-claim: both ops run between awaits, so a
+            # second request can't slip into the same file slot.
+            if file_path in self._busy:
+                return JSONResponse(
+                    status_code=status.HTTP_409_CONFLICT,
+                    content={"status": "in_progress", "filename": filename},
+                )
+            self._busy.add(file_path)
+            try:
+                return await self._handle_chat(
+                    runtime=runtime,
+                    token_name=token_name,
+                    filename=filename,
+                    file_path=file_path,
+                    content_override=content_override,
+                    agent_override=agent_override,
+                )
+            finally:
+                self._busy.discard(file_path)
+
+        return app
+
+    # ---- dispatch ------------------------------------------------------
+
+    async def _handle_chat(
+        self,
+        *,
+        runtime: GatewayRuntime,
+        token_name: str,
+        filename: str,
+        file_path: Path,
+        content_override: Any,
+        agent_override: str | None,
+    ) -> Any:
+        """Parse the file; if the last turn is ``user``, run the agent and append."""
+        if isinstance(content_override, str):
+            await _write_atomic(file_path, content_override)
+            file_text = content_override
+        elif content_override is None:
+            file_text = await _read_or_empty(file_path)
+        else:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, "`content` must be a string when present"
+            )
+
+        parsed = parser.parse(file_text)
+        agent_name = parser.resolve_agent(
+            metadata=parsed.metadata,
+            request_override=agent_override,
+            default=self.default_agent,
+        )
+        if not agent_name:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST,
+                "no agent specified: pass `agent`, set frontmatter, "
+                "or configure `default_agent`",
+            )
+
+        # No messages (empty file / frontmatter only): nothing to dispatch.
+        if not parsed.messages:
+            return {"status": "nothing_to_do", "reason": "empty file"}
+
+        if parser.last_role(parsed.messages) == "assistant":
+            return {"status": "nothing_to_do", "reason": "last turn is assistant"}
+
+        agent = runtime.agents.get(agent_name)
+        if agent is None:
+            raise HTTPException(
+                status.HTTP_404_NOT_FOUND, f"unknown agent: {agent_name!r}"
+            )
+        backend = runtime.backends.get(agent.name)
+        if backend is None:
+            raise HTTPException(
+                status.HTTP_503_SERVICE_UNAVAILABLE,
+                f"no backend configured for agent {agent.name!r}",
+            )
+
+        _log.info(
+            "chat: actor=%s agent=%s file=%s msgs=%d",
+            token_name,
+            agent.name,
+            filename,
+            len(parsed.messages),
+        )
+        await audit.log(
+            runtime,
+            actor=f"token:{token_name}",
+            kind="markdown_chat",
+            agent_name=agent.name,
+            filename=filename,
+            msgs=len(parsed.messages),
+        )
+
+        try:
+            events = backend.complete(
+                agent=agent, messages=parsed.messages, system=None
+            )
+            message = await accumulate(events, model=agent.model or agent.name)
+        except Exception as exc:
+            _log.exception("backend failed for %s", filename)
+            error_block = _render_error_block(exc)
+            new_body = renderer.append_to_body(parsed.body, error_block)
+            await _write_atomic(
+                file_path, _reattach_frontmatter(parsed.metadata, new_body)
+            )
+            raise HTTPException(
+                status.HTTP_500_INTERNAL_SERVER_ERROR, f"backend error: {exc}"
+            ) from exc
+
+        rendered = renderer.render_assistant_message(message)
+        new_body = renderer.append_to_body(parsed.body, rendered)
+        new_body = renderer.append_to_body(new_body, renderer.USER_SCAFFOLD)
+        # Recompute fingerprint so a future cross-frontend hit on this
+        # same conversation can find it. Stored as hex string in
+        # frontmatter — only the markdown frontend reads it.
+        assistant_param: MessageParam = {
+            "role": "assistant",
+            "content": _flatten_assistant_text(message),
+        }
+        updated_messages: list[MessageParam] = [*parsed.messages, assistant_param]
+        updated_metadata = dict(parsed.metadata)
+        updated_metadata["agent"] = agent.name
+        updated_metadata["fingerprint"] = fingerprint_messages(updated_messages)
+        await _write_atomic(
+            file_path, _reattach_frontmatter(updated_metadata, new_body)
+        )
+
+        # Broadcast our own turn so other handlers (none today, but the
+        # symmetry is worth keeping) see what happened. ``source`` marks
+        # the origin so ``CrossFrontendLogger`` can skip its own files.
+        record = TurnRecord(
+            agent_name=agent.name,
+            input_messages=list(parsed.messages),
+            output_message=message,
+            system=None,
+            source="markdown",
+        )
+        for handler in runtime.turn_log_handlers:
+            try:
+                await handler(record)
+            except Exception:  # noqa: BLE001
+                _log.exception("turn_log_handler raised; continuing")
+
+        return {"status": "ok", "turns_appended": 1, "agent": agent.name}
+
+    # ---- helpers -------------------------------------------------------
+
+    def _resolve_path(self, filename: str) -> Path:
+        """Resolve ``filename`` under the vault; reject escapes."""
+        # ``filename`` may be relative or absolute; we always anchor
+        # under ``vault_path`` so absolute paths from outside the vault
+        # don't sneak through. ``Path("/foo/bar")`` combined with a
+        # vault path keeps the absolute side; we strip leading slashes
+        # to coerce the rooted form into a relative path before joining.
+        rel = filename.lstrip("/")
+        if not rel.endswith(".md"):
+            rel = rel + ".md"
+        candidate = (self.vault_path / rel).resolve()
+        try:
+            candidate.relative_to(self.vault_path)
+        except ValueError as exc:
+            raise HTTPException(
+                status.HTTP_400_BAD_REQUEST, f"filename escapes vault: {filename!r}"
+            ) from exc
+        return candidate
+
+
+# ---- module-level utilities ----------------------------------------------
+
+
+async def _read_or_empty(path: Path) -> str:
+    """Read ``path`` as UTF-8 text; a missing file yields ``""``."""
+    # The existence probe is a synchronous stat. That is accepted here:
+    # it is a cheap metadata call, and it is exactly the gate we want
+    # in front of the async read. Pulling in an async-pathlib helper
+    # only to satisfy the lint would add a dep edge for no practical
+    # gain — hence the targeted ``noqa``.
+    if path.exists():  # noqa: ASYNC240
+        async with aiofile.async_open(path, "r", encoding="utf-8") as f:
+            return await f.read()
+    return ""
+
+
+async def _write_atomic(path: Path, content: str) -> None:
+    """Write ``content`` to ``path`` via tmp + ``os.replace`` (atomic)."""
+    path.parent.mkdir(parents=True, exist_ok=True)
+    # ``mkstemp`` reserves a unique name in the same directory (so the
+    # final rename never crosses filesystems) and returns an open fd.
+    fd, tmp_path = tempfile.mkstemp(
+        prefix=f".{path.name}.", suffix=".tmp", dir=str(path.parent)
+    )
+    # Only the name is needed — ``aiofile`` opens its own handle — so
+    # close the descriptor once, right away. Closing it later and again
+    # in the error path could hit a number already reused by another file.
+    os.close(fd)
+    try:
+        async with aiofile.async_open(tmp_path, "w", encoding="utf-8") as f:
+            await f.write(content)
+        # No fsync here: the swap is atomic for concurrent readers but
+        # not guaranteed durable across a crash.
+        # ``os.replace`` is the atomic primitive — ``Path.replace`` is a
+        # thin wrapper around the same call.
+        os.replace(tmp_path, path)  # noqa: PTH105
+    except BaseException:
+        # Includes cancellation: never leave a stray ``.tmp`` behind.
+        with contextlib.suppress(OSError):
+            os.unlink(tmp_path)  # noqa: PTH108
+        raise
+
+
+def _reattach_frontmatter(metadata: dict[str, Any], body: str) -> str:
+    r"""Serialize ``body`` as a ``.md`` file, YAML frontmatter first.
+
+    With empty metadata no frontmatter block is written at all, so
+    files are not littered with a hollow ``---\n---``.
+    """
+    if metadata:
+        import frontmatter as _fm
+
+        return _fm.dumps(_fm.Post(content=body, **metadata)) + "\n"
+    # Bare body: only make sure the file ends with a newline.
+    return body if body.endswith("\n") else body + "\n"
+
+
+def _flatten_assistant_text(message: Any) -> str:
+    """Return the ``text`` blocks of an assistant message as one string.
+
+    Non-text blocks and empty texts are skipped; used for fingerprints.
+    """
+    texts: list[str] = []
+    for block in getattr(message, "content", ()):
+        if getattr(block, "type", None) != "text":
+            continue
+        piece = getattr(block, "text", "") or ""
+        if piece:
+            texts.append(piece)
+    return "\n\n".join(texts)
+
+
+def _render_error_block(exc: BaseException) -> str:
+    """Format a backend failure as an Assistant turn holding an error callout."""
+    text = str(exc)
+    flat = (text if text else exc.__class__.__name__).replace("\n", " ").strip()
+    return "### Assistant:\n\n> [!error]-\n> " + flat + "\n"
diff --git a/src/beaver_gateway/frontends/markdown/parser.py b/src/beaver_gateway/frontends/markdown/parser.py
new file mode 100644
index 0000000..953b875
--- /dev/null
+++ b/src/beaver_gateway/frontends/markdown/parser.py
@@ -0,0 +1,187 @@
+"""Parse a markdown chat file into Anthropic ``MessageParam`` history.
+
+The file format is documented in ``frontends/markdown/__init__.py``:
+``### User:`` / ``### Assistant:`` H3 headers split turns, optional
+``---`` HRs between turns are visual-only, ``> [!thinking]-`` and
+``> [!tool]- `` callouts mark structured assistant content.
+
+For backend consumption we strip thinking and tool_use callouts —
+assistant turns become text-only. Rationale: history replay through
+claude-code's JSONL injection only needs the *narrated* answer (the
+thinking signatures expire and the original tool_results aren't
+captured in the renderer's output, so a faithful tool_use round-trip
+isn't possible today). The renderer keeps callouts in the file because
+they're informational for the human reader; the parser drops them when
+shaping the backend's input.
+"""
+
+from __future__ import annotations
+
+import re
+from dataclasses import dataclass
+from typing import TYPE_CHECKING, Any
+
+import frontmatter
+
+if TYPE_CHECKING:
+ from anthropic.types import MessageParam
+
+
+__all__ = ["ParsedFile", "last_role", "parse", "resolve_agent"]
+
+
+# Turn marker — must be exactly ``### User:`` or ``### Assistant:`` on
+# its own line. Trailing whitespace is tolerated, but nothing else may
+# follow the colon: a line with inline content is probably prose that
+# merely looks like a header, so it is left as ordinary text instead
+# of starting a new turn.
+_TURN_RE = re.compile(r"^###\s+(User|Assistant):\s*$", re.MULTILINE)
+
+# Callout-start lines we strip from assistant turns when extracting
+# text. We don't try to parse the contents — for backend input we just
+# need to drop the whole quoted block.
+_CALLOUT_START_RE = re.compile(r"^>\s+\[!(thinking|tool)\]")
+
+
+@dataclass(frozen=True, slots=True)
+class ParsedFile:
+    """Result of parsing a single chat ``.md`` (frozen value object).
+
+    Built by ``parse``; the three fields are documented inline below.
+    """
+
+    # YAML frontmatter as a plain dict; empty when the file has none.
+    metadata: dict[str, Any]
+    # Markdown left after the frontmatter is removed. New turns are
+    # appended to this text so whatever the human typed (callouts,
+    # HRs) is carried over into the rewritten file.
+    body: str
+    # History shaped for ``Backend.complete``; assistant turns are
+    # text-only.
+    messages: list[MessageParam]
+
+
+def parse(text: str) -> ParsedFile:
+    """Split a chat ``.md`` into frontmatter, body and message history.
+
+    A non-empty body without any turn marker counts as one user turn,
+    so a freshly created note can be sent before markers exist. Turns
+    whose extracted text is empty are left out of ``messages``.
+    """
+    post = frontmatter.loads(text)
+    body = post.content
+    turns = _split_turns(body)
+
+    history: list[MessageParam] = []
+    if not turns:
+        # Marker-less file: the whole body is the user's message.
+        sole = body.strip()
+        if sole:
+            history.append({"role": "user", "content": sole})
+    for role, raw in turns:
+        is_user = role == "user"
+        spoken = _strip_hrs(raw).strip() if is_user else _extract_assistant_text(raw)
+        if not spoken:
+            continue
+        if is_user:
+            history.append({"role": "user", "content": spoken})
+        else:
+            history.append({"role": "assistant", "content": spoken})
+
+    return ParsedFile(
+        metadata=dict(post.metadata), body=body, messages=history
+    )
+
+
+def last_role(messages: list[MessageParam]) -> str | None:
+    """Return the final message's role (``"user"`` / ``"assistant"``),
+    or ``None`` when ``messages`` is empty.
+    """
+    return messages[-1]["role"] if messages else None
+
+
+def resolve_agent(
+    *, metadata: dict[str, Any], request_override: str | None, default: str | None
+) -> str | None:
+    """Choose which agent handles this chat.
+
+    The request override wins, then a non-empty string ``agent`` in the
+    frontmatter, then ``default`` (which may itself be ``None``).
+    """
+    if not request_override:
+        fm_value = metadata.get("agent")
+        if isinstance(fm_value, str) and fm_value:
+            return fm_value
+        return default
+    return request_override
+
+
+# ---- internals ---------------------------------------------------------
+
+
+def _split_turns(body: str) -> list[tuple[str, str]]:
+    """Cut ``body`` at its turn markers into ``(role_lc, raw_body)`` pairs.
+
+    The marker line itself is excluded from each raw body. No trimming
+    happens here; whitespace handling differs per role and is left to
+    the caller.
+    """
+    markers = list(_TURN_RE.finditer(body))
+    # Each turn ends where the next marker starts; the last one runs to
+    # EOF. With no markers the zip below is empty, yielding ``[]``
+    # despite the single EOF stop appended to ``stops``.
+    stops = [nxt.start() for nxt in markers[1:]]
+    stops.append(len(body))
+    return [
+        (marker.group(1).lower(), body[marker.end() : stop])
+        for marker, stop in zip(markers, stops)
+    ]
+
+
+def _strip_hrs(raw: str) -> str:
+    """Remove every line that consists solely of ``---``.
+
+    A line counts only when it equals ``---`` once surrounding
+    whitespace is ignored; ``---`` embedded in a longer line is kept.
+    The remaining lines are re-joined with ``\\n``.
+    """
+    return "\n".join(
+        line for line in raw.splitlines() if line.strip() != "---"
+    )
+
+
+def _extract_assistant_text(raw: str) -> str:
+    """Return the spoken text of an assistant turn, callouts removed.
+
+    A line matching the callout-start pattern (``> [!thinking]`` or
+    ``> [!tool]``) opens a quote block. That line and every following
+    line that starts with ``>`` (leading whitespace ignored) are
+    dropped; the first line that does not start with ``>`` closes the
+    block and is processed normally. Whole-line ``---`` rules are
+    dropped as well. What remains is joined, runs of blank lines are
+    collapsed, and the result is trimmed.
+    """
+    kept: list[str] = []
+    in_callout = False
+    for line in raw.splitlines():
+        if in_callout:
+            # Still inside the quote block while lines keep starting
+            # with ``>``; anything else ends it and falls through.
+            if line.lstrip().startswith(">"):
+                continue
+            in_callout = False
+        if _CALLOUT_START_RE.match(line):
+            # Opening line of a thinking/tool callout: drop it too.
+            in_callout = True
+            continue
+        if line.strip() == "---":
+            continue
+        kept.append(line)
+
+    # Removing a block can leave three or more newlines in a row;
+    # fold those back into a single blank line before trimming.
+    joined = "\n".join(kept)
+    # ``strip`` last, so leading/trailing blank lines left by removed
+    # blocks never reach the backend.
+    collapsed = re.sub(r"\n{3,}", "\n\n", joined)
+    return collapsed.strip()
diff --git a/src/beaver_gateway/frontends/markdown/renderer.py b/src/beaver_gateway/frontends/markdown/renderer.py
new file mode 100644
index 0000000..6009a95
--- /dev/null
+++ b/src/beaver_gateway/frontends/markdown/renderer.py
@@ -0,0 +1,181 @@
+"""Render Anthropic ``Message`` (and individual user turns) into markdown.
+
+The renderer is one-way: it produces the human-facing artifact in the
+vault. The parser strips tool/thinking callouts when reshaping the file
+for backend replay — so what we write here is purely for the human
+reader (and for the cross-frontend logger, which materializes turns
+from other frontends).
+"""
+
+from __future__ import annotations
+
+import json
+import re
+from typing import TYPE_CHECKING, Any, cast
+
+from anthropic.types import Message, TextBlock, ThinkingBlock, ToolUseBlock
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
+ from anthropic.types import MessageParam
+
+
+__all__ = [
+    "FENCE",
+    "USER_SCAFFOLD",
+    "adaptive_fence",
+    "append_to_body",
+    "render_assistant_message",
+    "render_user_param",
+    "render_user_text",
+    "summarize_tool_input",
+]
+
+
+# Empty ``### User:`` block appended after each assistant reply so the
+# human has an obvious place to type the next turn. Parser drops empty
+# user blocks, so this doesn't re-trigger dispatch on its own.
+USER_SCAFFOLD = "### User:\n"
+
+# Default 4-backtick fence so tool results that contain literal ```` ``` ````
+# don't collide. JSON inputs use 3 backticks because they almost never
+# contain ``` and we get language syntax highlighting in Obsidian for free.
+FENCE = "````"
+
+# Tool-input keys probed in order for a callout-title value — cosmetic only.
+_PATH_KEYS = ("path", "file", "filename", "url", "command", "query")
+
+
+def render_user_text(content: str) -> str:
+    """Return ``content`` trimmed and placed under a ``### User:`` heading."""
+    return "### User:\n\n" + content.strip() + "\n"
+
+
+def render_assistant_message(message: Message) -> str:
+    """Render an assistant ``Message`` (with content blocks) into a turn block.
+
+    Blocks render in their original order:
+
+    * ``ThinkingBlock`` → ``> [!thinking]-`` collapsed callout
+    * ``TextBlock`` → plain text (the spoken answer)
+    * ``ToolUseBlock`` → ``> [!tool]- <title>`` callout quoting the ``input``
+      JSON. Tool *results* are not persisted (see ``parser.py`` docstring).
+
+    One blank line separates rendered blocks; blocks that render nothing
+    (empty text, unknown types) add none. Output ends with one newline.
+    """
+    parts: list[str] = ["### Assistant:", ""]
+    for block in message.content:
+        rendered = list(_render_block(block))
+        if rendered:
+            parts.extend([*rendered, ""])
+    return "\n".join(parts).rstrip() + "\n"
+
+
+def render_user_param(param: MessageParam) -> str:
+    """Render a ``MessageParam`` user message into a ``### User:`` block.
+
+    Used by the cross-frontend logger when materializing turns from other
+    frontends. String content goes straight to ``render_user_text``; from
+    a block list only non-empty ``text`` blocks are kept, joined by blank
+    lines. Other blocks (e.g. ``tool_result``) are dropped silently — the
+    markdown view doesn't track them (see ``parser.py``).
+    """
+    content = param.get("content") or ""
+    if isinstance(content, str):
+        return render_user_text(content)
+    chunks: list[str] = []
+    for blk in content:
+        # Both shapes appear in practice: raw incoming JSON yields dicts,
+        # SDK-built params yield pydantic block models. Read ``type`` and
+        # ``text`` by key or by attribute so neither is dropped.
+        is_dict = isinstance(blk, dict)
+        kind = blk.get("type") if is_dict else getattr(blk, "type", None)
+        text = blk.get("text") if is_dict else getattr(blk, "text", None)
+        if kind == "text" and text:
+            chunks.append(str(text))
+    return render_user_text("\n\n".join(chunks))
+
+
+def append_to_body(existing: str, new_block: str) -> str:
+    """Join ``new_block`` onto ``existing`` with a decorative ``---`` rule.
+
+    Trailing whitespace of ``existing`` is trimmed; everything before it
+    is kept verbatim. If nothing but whitespace remains, ``new_block`` is
+    returned unchanged. The rule is purely visual: parser ignores it.
+    """
+    trimmed = existing.rstrip()
+    if not trimmed:
+        return new_block
+    return "\n\n".join((trimmed, "---", new_block))
+
+
+def summarize_tool_input(name: str, tool_input: object) -> str:
+    """Build the ``[!tool]- <title>`` title string.
+
+    Returns ``"<name> · <value>"`` for the first ``_PATH_KEYS`` key whose
+    value is a non-blank string, else just ``name``. The value is flattened
+    to one line (an embedded newline would end the callout title early)
+    and cut to 57 characters plus ``...`` when longer than 60.
+    """
+    if not isinstance(tool_input, dict):
+        return name
+    # ``ToolUseBlock.input`` is typed ``object``; cast once after the
+    # runtime check instead of narrowing on every ``.get`` call.
+    d = cast("dict[str, Any]", tool_input)
+    for key in _PATH_KEYS:
+        value = d.get(key)
+        flat = " ".join(value.split()) if isinstance(value, str) else ""
+        if flat:
+            short = flat if len(flat) <= 60 else flat[:57] + "..."
+            return f"{name} · {short}"
+    return name
+
+
+# ---- internals ---------------------------------------------------------
+
+
+def _render_block(block: object) -> Iterable[str]:
+    """Yield markdown lines for one content block, dispatching on its type.
+
+    Blank text and unknown types yield nothing rather than corrupt the file.
+    """
+    if isinstance(block, ThinkingBlock):
+        yield from _render_thinking(block.thinking or "")
+    elif isinstance(block, ToolUseBlock):
+        yield from _render_tool_use(block)
+    elif isinstance(block, TextBlock):
+        spoken = (block.text or "").strip()
+        if spoken:
+            yield spoken
+
+
+def _render_thinking(text: str) -> Iterable[str]:
+    """Yield ``text`` as a collapsed ``[!thinking]`` callout, one quoted line each."""
+    body = text.strip().splitlines() or [""]
+    yield from ("> [!thinking]-", *(f"> {row}" if row else ">" for row in body))
+
+
+def _render_tool_use(block: ToolUseBlock) -> Iterable[str]:
+    """Yield a collapsed ``[!tool]`` callout quoting the block's input as JSON."""
+    yield f"> [!tool]- {summarize_tool_input(block.name, block.input)}"
+    yield "> **input:**"
+    yield "> ```json"
+    # Sorted keys keep the rendered JSON stable for the same input.
+    dumped = json.dumps(block.input, indent=2, ensure_ascii=False, sort_keys=True)
+    yield from (f"> {row}" if row else ">" for row in dumped.splitlines())
+    yield "> ```"
+
+
+def adaptive_fence(content: str) -> str:
+    """Return a backtick fence longer than any backtick run in ``content``.
+
+    The fence is never shorter than three backticks. Currently unused
+    (tool *results* aren't persisted yet) — kept here so when result
+    capture lands the rendering side already has the primitive.
+    """
+    runs = re.findall(r"`+", content)
+    longest = max(map(len, runs), default=0)
+    # One more than the longest run, so the content cannot close the fence.
+    return "`" * max(3, longest + 1)
diff --git a/uv.lock b/uv.lock
index 91eefd8..efae55d 100644
--- a/uv.lock
+++ b/uv.lock
@@ -261,6 +261,7 @@ name = "beaver-gateway"
version = "0.1.0"
source = { editable = "." }
dependencies = [
+ { name = "aiofile" },
{ name = "aiohttp" },
{ name = "aiosqlite" },
{ name = "anthropic" },
@@ -274,6 +275,7 @@ dependencies = [
{ name = "psycopg", extra = ["binary"] },
{ name = "pydantic" },
{ name = "pydantic-settings" },
+ { name = "python-frontmatter" },
{ name = "sqlmodel" },
{ name = "uvicorn", extra = ["standard"] },
{ name = "uvloop" },
@@ -300,6 +302,7 @@ dev = [
[package.metadata]
requires-dist = [
+ { name = "aiofile", specifier = ">=3.11.1" },
{ name = "aiohttp", specifier = ">=3.13.5" },
{ name = "aiosqlite", specifier = ">=0.22.1" },
{ name = "anthropic", specifier = ">=0.103.0" },
@@ -315,6 +318,7 @@ requires-dist = [
{ name = "psycopg", extras = ["binary"], specifier = ">=3.3.4" },
{ name = "pydantic", specifier = ">=2.13.4" },
{ name = "pydantic-settings", specifier = ">=2.14.1" },
+ { name = "python-frontmatter", specifier = ">=1.2.0" },
{ name = "raycast-api", marker = "extra == 'local'", editable = "../raycast-api" },
{ name = "raycast-api", marker = "extra == 'prod'", git = "https://git.kotikot.com/beaver/raycast-api.git" },
{ name = "sqlmodel", specifier = ">=0.0.38" },
@@ -1647,6 +1651,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" },
]
+[[package]]
+name = "python-frontmatter"
+version = "1.2.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "pyyaml" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/e9/21/88aefb4f1de6661b5a003175e21e4a5ad94f5e52b2abf4170a11883c7d81/python_frontmatter-1.2.0.tar.gz", hash = "sha256:5b26ccd3cb85af77feb11d83b922c7bb5aeccb0c9d3fb236b938c600b6322984", size = 16890, upload-time = "2026-05-17T23:42:05.493Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/1c/b1/ec12e3e746234006b77dec69d53878253b1da09dbb55fa3cb456083d9069/python_frontmatter-1.2.0-py3-none-any.whl", hash = "sha256:e1ee1d4300450a2f84e778eb4f70edf573da6cd7d463801066f05edc4e819c78", size = 10396, upload-time = "2026-05-17T23:42:04.637Z" },
+]
+
[[package]]
name = "python-multipart"
version = "0.0.29"