refactor: add markdown frontend

This commit is contained in:
h
2026-05-20 21:30:10 +02:00
parent a7827b2fa6
commit 3dc780c74c
15 changed files with 1721 additions and 141 deletions
+58
View File
@@ -12,12 +12,30 @@ from beaver_gateway.agents.base import ExposedMcp
from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences
from beaver_gateway.core.registry import Gateway
from beaver_gateway.core.turn_record import TurnRecord, slugify
from beaver_gateway.frontends.admin import AdminFrontend
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
from beaver_gateway.frontends.markdown import MarkdownFrontend
from beaver_gateway.frontends.mcp_server import McpServerFrontend
from beaver_gateway.mcp.types import McpServer
def chat_log_path(record: TurnRecord, vault: Path) -> Path:
    """Choose the vault file for a chat logged from another frontend.

    ``MarkdownFrontend`` (with ``log_all_chats=True``) calls this the
    first time a conversation needs a file; continuation turns are
    matched by fingerprint and keep using the file picked here. The
    result may be absolute or relative — relative paths are anchored
    under ``vault``.

    Layout: ``<vault>/<YYYY-MM>/<YYYY-MM-DD>_<topic>.md`` where
    ``topic`` is a slug (at most 40 characters) of the very first user
    message in the chat and the date is today's local date.
    """
    slug = slugify(record.first_user_text, maxlen=40)
    stamp = date.today()
    month_dir = vault / stamp.strftime("%Y-%m")
    file_name = stamp.strftime("%Y-%m-%d") + f"_{slug}.md"
    return month_dir / file_name
def current_time() -> str:
"""Return the current local time as an ISO-8601 string.
@@ -162,5 +180,45 @@ gateway = Gateway(
# `messages` only work on `/v1/messages`; `mcp` only on
# `/mcp/<name>`; `*` works everywhere.
AdminFrontend(host="0.0.0.0", port=8002),
# Obsidian-vault chat frontend. Each `.md` is one conversation
# (User/Assistant turn pairs). The Obsidian companion plugin
# POSTs `{filename, content?}` to `/chat` — the frontend reads
# the file, runs the agent if the last turn is `user`, and
# appends the assistant reply back. With `log_all_chats=True`
# *every* turn (from Anthropic Messages too) is mirrored into the
# vault so it becomes the central archive — by default under
# `{vault}/_logs/<agent>/`; this example overrides that layout
# via `log_path` below.
#
# `vault_path` here points at a per-restart tempdir so the
# example boots cleanly; in real deployments mount the
# Obsidian-sync container's vault volume to a stable path and
# pass that instead.
MarkdownFrontend(
host="0.0.0.0",
port=8003,
# Point at the dedicated chats subdir of your real Obsidian
# vault — the gateway has no idea (and no need) about other
# notes outside it. Path resolution / vault-escape checks
# are anchored here, so absolute-path attempts (and ``..``
# tricks) can't reach notes alongside it.
#
# vault_path=Path("/Users/me/Obsidian/Personal/chats"),
#
# Per-restart tempdir kept here so the example boots even
# without a real vault on the host.
vault_path=Path(tempfile.mkdtemp(prefix="beaver-vault-")).resolve(),
default_agent="research",
log_all_chats=True,
# ``log_path`` (optional) overrides the default
# ``{vault}/_logs/<agent>/<date>_<hex>.md`` layout for chats
# logged from OTHER frontends (Anthropic Messages, admin
# in-browser chat). Defined as a top-of-file function so the
# types are explicit and the IDE can hover them; a lambda
# works too, but a real ``def`` keeps the signature visible
# and lets you docstring it. Heads up: any custom path
# forces ``warm_index`` to scan the entire vault on startup
# so the fingerprint→file map survives a restart no matter
# where you put files.
log_path=chat_log_path,
),
],
)
+2
View File
@@ -7,6 +7,7 @@ authors = [
]
requires-python = ">=3.13"
dependencies = [
"aiofile>=3.11.1",
"aiohttp>=3.13.5",
"aiosqlite>=0.22.1",
"anthropic>=0.103.0",
@@ -20,6 +21,7 @@ dependencies = [
"psycopg[binary]>=3.3.4",
"pydantic>=2.13.4",
"pydantic-settings>=2.14.1",
"python-frontmatter>=1.2.0",
"sqlmodel>=0.0.38",
"uvicorn[standard]>=0.47.0",
"uvloop>=0.22.1",
+133
View File
@@ -0,0 +1,133 @@
"""Cross-frontend turn record.
Frontends that finish a turn (the Anthropic Messages frontend, the
admin in-browser chat, the markdown frontend) emit a
:class:`TurnRecord` to every handler in
``GatewayRuntime.turn_log_handlers``. The markdown frontend uses this
to persist chats from other frontends into the Obsidian vault — see
``frontends/markdown/crossfront.py``.
Kept tiny on purpose: it carries the structured-enough payload a logger
needs (which agent ran, input history, the assembled assistant reply)
and nothing else. The full event stream is gone by the time handlers
run — if a future consumer needs deltas it would subscribe at a lower
level, not here.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from anthropic.types import Message, MessageParam
__all__ = ["TurnRecord", "slugify"]
# Filesystem-safe slug: collapse anything that isn't a word char or
# space/hyphen to a hyphen, then squash runs of separators. Aimed at
# letting users build filenames from ``record.first_user_text`` without
# hand-rolling sanitization in every config.
_SLUG_BAD_RE = re.compile(r"[^\w\s\-]+", flags=re.UNICODE)
_SLUG_SEP_RE = re.compile(r"[\s\-]+", flags=re.UNICODE)


def slugify(text: str, *, maxlen: int = 40) -> str:
    """Sanitize ``text`` for use as a filename fragment.

    Strips punctuation, collapses whitespace/hyphens into single ``-``,
    and truncates to ``maxlen`` characters. Unicode letters are
    preserved (Obsidian handles them fine; macOS and modern Linux fs's
    too).

    Args:
        text: Arbitrary user text, e.g. the first message of a chat.
        maxlen: Maximum length of the returned slug, in characters.
            Values below zero are treated as zero.

    Returns:
        The slug, or ``"untitled"`` when nothing usable remains (empty
        input, punctuation-only input, or truncation to nothing).
    """
    # Local import keeps this fix self-contained within the function.
    import unicodedata

    # Compose first: in decomposed (NFD) text an accent is a separate
    # combining mark, which ``\w`` does not match — without this step
    # the mark would be replaced by a separator and split the word.
    composed = unicodedata.normalize("NFC", text)
    cleaned = _SLUG_BAD_RE.sub(" ", composed).strip()
    cleaned = _SLUG_SEP_RE.sub("-", cleaned).strip("-")
    if not cleaned:
        return "untitled"
    # A negative limit would otherwise slice from the *end* of the slug.
    limit = max(maxlen, 0)
    if len(cleaned) > limit:
        # Truncation may leave a trailing separator; drop it.
        cleaned = cleaned[:limit].rstrip("-")
    return cleaned or "untitled"
@dataclass(frozen=True, slots=True)
class TurnRecord:
"""One completed turn, as seen by a frontend.
``input_messages`` is the conversation history sent to the backend
(everything *before* the assistant reply). ``output_message`` is the
finalized assistant ``Message`` (post-accumulation, with all content
blocks attached). ``system`` is the per-request system prompt if any
— agents own their own ``system_prompt``, this is the override the
caller passed; most handlers can ignore it.
``source`` names the frontend that produced the record so the
cross-frontend logger can avoid logging its own turns (markdown
frontend writing the file would otherwise also receive its own
broadcast and double-write).
"""
agent_name: str
input_messages: list[MessageParam]
output_message: Message
system: str | None = None
source: str = "anthropic"
@property
def first_user_text(self) -> str:
"""Plain text of the *earliest* user turn in this conversation.
Useful for naming new files by topic. Empty string if the input
history somehow has no user turn (shouldn't happen — turns are
broadcast only after a user → assistant cycle).
"""
for msg in self.input_messages:
if msg.get("role") == "user":
return _text_of(msg.get("content", ""))
return ""
@property
def last_user_text(self) -> str:
"""Plain text of the most recent user turn (the trigger)."""
for msg in reversed(self.input_messages):
if msg.get("role") == "user":
return _text_of(msg.get("content", ""))
return ""
@property
def assistant_text(self) -> str:
"""Plain text of the assistant reply, with thinking/tool blocks dropped."""
chunks = [
getattr(block, "text", "") or ""
for block in getattr(self.output_message, "content", ())
if getattr(block, "type", None) == "text"
]
return "\n\n".join(c for c in chunks if c)
def _text_of(content: object) -> str:
"""Flatten an Anthropic ``MessageParam.content`` field to a plain string.
Handles both shapes the SDK accepts: raw string, or a list of block
dicts (we pull ``text`` blocks only). Anything we don't recognize is
silently skipped — these helpers exist for naming files, not for
faithful content reconstruction.
"""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for blk in content:
if not isinstance(blk, dict):
continue
# ``MessageParam.content`` is typed as a union of typed-dicts
# per Anthropic SDK; we only care about plain ``text`` blocks
# and look them up via ``Any`` to dodge the keyed-typed-dict
# variance gymnastics (``ty`` won't let an open dict alias a
# closed-keyed one).
d: Any = blk
if d.get("type") == "text":
parts.append(str(d.get("text", "")))
return "\n\n".join(p for p in parts if p)
return ""
+146
View File
@@ -0,0 +1,146 @@
"""Collapse an Anthropic event stream into one ``Message``.
Extracted from ``AnthropicMessagesFrontend`` so the markdown frontend
can run the same accumulation logic when it wants the finalized turn
rather than raw SSE chunks. Mirrors the Anthropic SDK's own accumulator:
walks events, builds block dicts indexed by their ``content_block``
index, folds text / thinking deltas in, buffers ``input_json_delta``
chunks until the block closes (then JSON-parses them once).
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING, Any, Literal, cast
from anthropic.types import (
InputJSONDelta,
Message,
RawContentBlockDeltaEvent,
RawContentBlockStartEvent,
RawContentBlockStopEvent,
RawMessageDeltaEvent,
RawMessageStartEvent,
SignatureDelta,
TextBlock,
TextDelta,
ThinkingBlock,
ThinkingDelta,
ToolUseBlock,
Usage,
)
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from beaver_gateway.core.events import MessageStreamEvent, StopReason
__all__ = ["StreamAccumulator", "accumulate"]
class StreamAccumulator:
    """Folds a stream of events into one ``Message``, incrementally.

    Use when you need to *both* forward events somewhere (SSE) *and*
    keep a finalized ``Message`` for post-stream work (audit, logging
    to disk). Call :meth:`feed` for each event, :meth:`finalize` once.
    """

    __slots__ = (
        "_blocks",
        "_json_buffers",
        "_message_id",
        "_role",
        "_stop_reason",
        "_stop_sequence",
        "_usage",
    )

    def __init__(self) -> None:
        """Start with an empty message: no id, no blocks, zero usage."""
        self._message_id: str = ""
        self._role: str = "assistant"
        self._usage: Usage = Usage(input_tokens=0, output_tokens=0)
        # Content blocks as plain dicts, keyed by their stream index.
        self._blocks: dict[int, dict[str, Any]] = {}
        # Raw ``input_json_delta`` text per tool-use block, parsed once
        # when the block closes.
        self._json_buffers: dict[int, str] = {}
        self._stop_reason: str | None = None
        self._stop_sequence: str | None = None

    def feed(self, ev: MessageStreamEvent) -> None:
        """Fold one stream event into the accumulated state.

        Message-start events set id / role / usage; block-start events
        register the block; deltas append text, thinking, signature or
        buffered tool-input JSON; block-stop events parse the buffered
        tool input; message-delta events record the stop reason and the
        output token count. Any other event is ignored.

        Tool input that cannot be parsed as JSON degrades to ``{}``
        instead of raising, so accumulation never interrupts whoever is
        forwarding the same events to a client.
        """
        # isinstance, not ``ev.type == "..."``: ty narrows on the
        # discriminator only via the class, and the raw event union
        # carries its own discriminators (``Raw*Event``) the SDK
        # already promises.
        if isinstance(ev, RawMessageStartEvent):
            self._message_id = ev.message.id
            self._role = ev.message.role
            self._usage = ev.message.usage
        elif isinstance(ev, RawContentBlockStartEvent):
            self._blocks[ev.index] = ev.content_block.model_dump()
            if self._blocks[ev.index].get("type") == "tool_use":
                self._json_buffers[ev.index] = ""
        elif isinstance(ev, RawContentBlockDeltaEvent):
            # ``setdefault`` tolerates a delta whose start event was
            # never seen; such a block has no ``type`` and is dropped
            # by :meth:`finalize`.
            blk = self._blocks.setdefault(ev.index, {})
            delta = ev.delta
            if isinstance(delta, TextDelta):
                blk["text"] = blk.get("text", "") + delta.text
            elif isinstance(delta, InputJSONDelta):
                self._json_buffers[ev.index] = (
                    self._json_buffers.get(ev.index, "") + delta.partial_json
                )
            elif isinstance(delta, ThinkingDelta):
                blk["thinking"] = blk.get("thinking", "") + delta.thinking
            elif isinstance(delta, SignatureDelta):
                blk["signature"] = delta.signature
        elif isinstance(ev, RawContentBlockStopEvent):
            blk = self._blocks.get(ev.index, {})
            if blk.get("type") == "tool_use":
                raw = self._json_buffers.pop(ev.index, "")
                try:
                    blk["input"] = json.loads(raw) if raw.strip() else {}
                except json.JSONDecodeError:
                    # The buffered JSON can be incomplete or malformed
                    # (for instance when generation stops mid tool
                    # call). Raising here would abort the SSE loop that
                    # feeds us, so fall back to an empty input.
                    blk["input"] = {}
        elif isinstance(ev, RawMessageDeltaEvent):
            self._stop_reason = ev.delta.stop_reason
            self._stop_sequence = ev.delta.stop_sequence
            if ev.usage.output_tokens:
                # Keep everything from the start event's usage and only
                # overwrite the output token count.
                self._usage = Usage.model_validate(
                    {
                        **self._usage.model_dump(),
                        "output_tokens": ev.usage.output_tokens,
                    }
                )

    def finalize(self, *, model: str) -> Message:
        """Build the ``Message`` from everything fed so far.

        Args:
            model: Value to place in the message's ``model`` field.

        Returns:
            A ``Message`` whose content blocks are ordered by stream
            index. Only ``text``, ``tool_use`` and ``thinking`` blocks
            are included; other block types are left out. The id falls
            back to ``"msg_unknown"`` when no message-start event was
            seen.
        """
        content: list[Any] = []
        for idx in sorted(self._blocks):
            bd = self._blocks[idx]
            btype = bd.get("type")
            if btype == "text":
                content.append(TextBlock.model_validate(bd))
            elif btype == "tool_use":
                content.append(ToolUseBlock.model_validate(bd))
            elif btype == "thinking":
                content.append(ThinkingBlock.model_validate(bd))
        # ``role`` is always ``"assistant"`` at the wire level — we
        # initialised the field to that and only overwrite from a
        # ``RawMessageStartEvent`` which itself carries the same literal.
        # The cast keeps both type-checkers happy without a runtime check.
        return Message(
            id=self._message_id or "msg_unknown",
            type="message",
            role=cast("Literal['assistant']", self._role),
            model=model,
            content=content,
            stop_reason=cast("StopReason | None", self._stop_reason),
            stop_sequence=self._stop_sequence,
            usage=self._usage,
        )
async def accumulate(
    events: AsyncIterator[MessageStreamEvent], *, model: str
) -> Message:
    """Consume ``events`` to exhaustion and return the assembled ``Message``.

    Used by ``stream=false`` paths, which need the finished message and
    have nowhere to forward the individual events.
    """
    collector = StreamAccumulator()
    async for event in events:
        collector.feed(event)
    return collector.finalize(model=model)
+53
View File
@@ -0,0 +1,53 @@
"""Shared bearer-token verification for HTTP frontends.
Extracted from ``AnthropicMessagesFrontend`` so the markdown frontend
(and any future bearer-protected frontend) can reuse one canonical
verifier instead of copy-pasting the header-parsing dance.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from fastapi import HTTPException, status
if TYPE_CHECKING:
from fastapi import Request
from beaver_gateway.frontends.base import GatewayRuntime
__all__ = ["require_token"]
async def require_token(
    request: Request, runtime: GatewayRuntime, *, scope: str
) -> str:
    """Authenticate the request and check it may use ``scope``.

    The token is read from ``X-Api-Key: <token>`` (Anthropic SDK /
    LibreChat) when that header is present and non-empty, otherwise
    from ``Authorization: Bearer <token>`` (curl, Cursor). Bootstrap
    tokens implicitly carry ``"*"`` and pass every scope check.

    Returns:
        The token's audit name.

    Raises:
        HTTPException: 401 when the token is missing or unknown; 403
            when the token is known but its scope doesn't cover
            ``scope``.
    """
    header_key = request.headers.get("x-api-key")
    if header_key:
        identity = await runtime.token_store.verify(header_key)
    else:
        bearer_header = request.headers.get("authorization")
        identity = await runtime.token_store.verify_bearer(bearer_header)

    # Success path first; the two failure modes follow.
    if identity is not None and identity.allows(scope):
        return identity.name
    if identity is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "invalid or missing bearer token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    raise HTTPException(
        status.HTTP_403_FORBIDDEN,
        f"token scope {identity.scope!r} does not cover {scope!r}",
    )
+71 -5
View File
@@ -45,6 +45,8 @@ from jinja2 import Environment, PackageLoader, select_autoescape
from beaver_gateway.core import audit
from beaver_gateway.core.auth import VALID_SCOPES, hash_token
from beaver_gateway.core.turn_record import TurnRecord
from beaver_gateway.frontends._accumulate import StreamAccumulator
from beaver_gateway.frontends.base import Frontend
from beaver_gateway.storage import (
create_token,
@@ -57,6 +59,8 @@ if TYPE_CHECKING:
from collections.abc import AsyncIterator
from datetime import datetime
from anthropic.types import MessageParam
from beaver_gateway.core.events import MessageStreamEvent
from beaver_gateway.frontends.base import GatewayRuntime
@@ -449,7 +453,15 @@ class AdminFrontend(Frontend):
system=system if isinstance(system, str) else None,
)
return StreamingResponse(
_sse_events(events), media_type="text/event-stream"
_sse_events_and_broadcast(
events,
runtime=runtime,
agent_name=agent.name,
input_messages=messages,
system=system if isinstance(system, str) else None,
model=agent.model or agent.name,
),
media_type="text/event-stream",
)
return app
@@ -510,23 +522,31 @@ def _build_endpoint_catalog(
and may have non-trivial transitive deps (aiohttp etc.).
"""
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
from beaver_gateway.frontends.markdown import MarkdownFrontend
from beaver_gateway.frontends.mcp_server import McpServerFrontend
scheme = request.headers.get("x-forwarded-proto") or request.url.scheme
hostname = request.url.hostname or "localhost"
def _base_for(fe: AnthropicMessagesFrontend | McpServerFrontend) -> str:
def _base_for(
fe: AnthropicMessagesFrontend | McpServerFrontend | MarkdownFrontend,
) -> str:
if fe.public_base_url:
return fe.public_base_url
return f"{scheme}://{hostname}:{fe.port}"
anthropic_base: str | None = None
mcp_base: str | None = None
markdown_fe: MarkdownFrontend | None = None
markdown_base: str | None = None
for fe in runtime.frontends:
if isinstance(fe, AnthropicMessagesFrontend) and anthropic_base is None:
anthropic_base = _base_for(fe)
elif isinstance(fe, McpServerFrontend) and mcp_base is None:
mcp_base = _base_for(fe)
elif isinstance(fe, MarkdownFrontend) and markdown_fe is None:
markdown_fe = fe
markdown_base = _base_for(fe)
agent_rows: list[dict[str, Any]] = []
if anthropic_base is not None:
@@ -554,27 +574,57 @@ def _build_endpoint_catalog(
{"namespace": "all", "kind": "bundle", "url": f"{mcp_base}/all/"}
)
markdown_row: dict[str, Any] | None = None
if markdown_fe is not None and markdown_base is not None:
# Pick the agent name shown in the example curl snippet: the
# frontend's ``default_agent`` when set, otherwise the first agent
# in ``runtime.agents``, falling back to a literal ``<agent>``.
sample_agent = markdown_fe.default_agent or next(
(a.name for a in runtime.agents), "<agent>"
)
markdown_row = {
"url": f"{markdown_base}/chat",
"vault_path": str(markdown_fe.vault_path),
"default_agent": markdown_fe.default_agent,
"log_all_chats": markdown_fe.log_all_chats,
"logged_subdir": markdown_fe.logged_subdir,
"sample_agent": sample_agent,
}
return {
"anthropic_base": anthropic_base,
"mcp_base": mcp_base,
"markdown_base": markdown_base,
"agents": agent_rows,
"mcps": mcp_rows,
"markdown": markdown_row,
}
async def _sse_events(
async def _sse_events_and_broadcast(
events: AsyncIterator[MessageStreamEvent],
*,
runtime: GatewayRuntime,
agent_name: str,
input_messages: list[MessageParam],
system: str | None,
model: str,
) -> AsyncIterator[bytes]:
r"""Serialize a backend stream into Anthropic's ``text/event-stream`` form.
r"""Serialize a backend stream as SSE and broadcast a ``TurnRecord`` after.
Same wire shape as :mod:`beaver_gateway.frontends.anthropic` —
duplicated rather than imported so the admin frontend stays
independent of that module's private helpers, and so a mid-stream
failure surfaces as an in-band ``error`` event the chat UI can
render rather than a dangling connection.
render rather than a dangling connection. The events also feed a
side ``StreamAccumulator`` so once the SSE response closes we hand
the assembled ``Message`` to ``runtime.turn_log_handlers`` (the
markdown frontend's archive logger lives there). ``source="admin"``
so the cross-frontend logger knows where the turn came from.
"""
acc = StreamAccumulator()
try:
async for ev in events:
acc.feed(ev)
payload = ev.model_dump_json()
yield f"event: {ev.type}\ndata: {payload}\n\n".encode()
except Exception as exc: # noqa: BLE001
@@ -583,6 +633,22 @@ async def _sse_events(
{"type": "error", "error": {"type": "api_error", "message": str(exc)}}
)
yield f"event: error\ndata: {err}\n\n".encode()
return
if not runtime.turn_log_handlers:
return
message = acc.finalize(model=model)
record = TurnRecord(
agent_name=agent_name,
input_messages=list(input_messages),
output_message=message,
system=system,
source="admin",
)
for handler in runtime.turn_log_handlers:
try:
await handler(record)
except Exception: # noqa: BLE001
_log.exception("turn_log_handler raised; continuing")
def _set_session_cookie(response: Response, value: str) -> None:
@@ -134,6 +134,41 @@
add one to <code>Gateway(frontends=[...])</code> to expose them over HTTP.
</p>
{% endif %}
{% if endpoints.markdown %}
<h3 class="ep-h3">Markdown — <code>POST /chat</code></h3>
<table class="ep-table" data-required-scope="messages">
<thead><tr><th>Vault</th><th>Default agent</th><th>URL</th><th class="ep-actions-th"></th></tr></thead>
<tbody>
<tr class="ep-row"
data-kind="markdown"
data-url="{{ endpoints.markdown.url }}"
data-sample-agent="{{ endpoints.markdown.sample_agent }}">
<td>
<code>{{ endpoints.markdown.vault_path }}</code>
<span class="ep-scope-warn" hidden>scope mismatch</span>
{% if endpoints.markdown.log_all_chats %}
<span class="pill">log_all_chats · {{ endpoints.markdown.logged_subdir }}/</span>
{% endif %}
</td>
<td>
{% if endpoints.markdown.default_agent %}
<code>{{ endpoints.markdown.default_agent }}</code>
{% else %}
<span class="muted">— (request must set <code>agent</code> or frontmatter)</span>
{% endif %}
</td>
<td><code class="ep-url">{{ endpoints.markdown.url }}</code></td>
<td class="ep-actions">
<button type="button" data-action="copy-url">Copy URL</button>
<button type="button" data-action="copy-curl">Copy curl</button>
<button type="button" data-action="toggle-curl" aria-expanded="false">▸ curl</button>
</td>
</tr>
<tr class="ep-curl-row" hidden><td colspan="4"><pre class="ep-curl"></pre></td></tr>
</tbody>
</table>
{% endif %}
{% else %}
<p class="muted">
Nothing exposed yet — declare agents / MCPs and the matching frontends
@@ -284,6 +319,21 @@
" " + url,
].join("\n");
}
if (kind === "markdown") {
const agent = row.getAttribute("data-sample-agent") || "";
const body = JSON.stringify({
filename: "example.md",
content: "### User:\n\nhello\n",
agent: agent,
});
return [
"curl \\",
" -H " + escSh("Authorization: Bearer " + tok) + " \\",
" -H 'content-type: application/json' \\",
" -d " + escSh(body) + " \\",
" " + url,
].join("\n");
}
if (kind === "mcp") {
// Streamable-HTTP MCP wants `initialize` first — the response
// carries the `Mcp-Session-Id` header you must echo back on
+87 -135
View File
@@ -21,31 +21,20 @@ import json
import logging
from typing import TYPE_CHECKING, Any
from anthropic.types import (
InputJSONDelta,
Message,
RawContentBlockDeltaEvent,
RawContentBlockStartEvent,
RawContentBlockStopEvent,
RawMessageDeltaEvent,
RawMessageStartEvent,
SignatureDelta,
TextBlock,
TextDelta,
ThinkingBlock,
ThinkingDelta,
ToolUseBlock,
Usage,
)
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import JSONResponse, StreamingResponse
from beaver_gateway.core import audit
from beaver_gateway.core.turn_record import TurnRecord
from beaver_gateway.frontends._accumulate import StreamAccumulator, accumulate
from beaver_gateway.frontends._auth import require_token
from beaver_gateway.frontends.base import Frontend
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from anthropic.types import Message, MessageParam
from beaver_gateway.core.events import MessageStreamEvent
from beaver_gateway.frontends.base import GatewayRuntime
@@ -109,7 +98,7 @@ class AnthropicMessagesFrontend(Frontend):
@app.get("/v1/models")
async def list_models(request: Request) -> dict[str, Any]:
await _require_token(request, runtime, scope="messages")
await require_token(request, runtime, scope="messages")
data = [
{
"type": "model",
@@ -123,7 +112,7 @@ class AnthropicMessagesFrontend(Frontend):
@app.post("/v1/messages")
async def create_message(request: Request) -> Any:
token_name = await _require_token(request, runtime, scope="messages")
token_name = await require_token(request, runtime, scope="messages")
try:
body = await request.json()
except json.JSONDecodeError as exc:
@@ -193,58 +182,63 @@ class AnthropicMessagesFrontend(Frontend):
**options,
)
system_str = system if isinstance(system, str) else None
if stream_flag:
return StreamingResponse(_sse(events), media_type="text/event-stream")
message = await _accumulate(events, model=model)
# Side-accumulate while streaming so we can still emit a
# ``TurnRecord`` after the response closes. The buffered
# ``Message`` lives only in this coroutine's frame; SSE
# bytes still flow to the client unchanged.
return StreamingResponse(
_sse_and_broadcast(
events,
runtime=runtime,
agent_name=agent.name,
input_messages=messages,
system=system_str,
model=model,
),
media_type="text/event-stream",
)
message = await accumulate(events, model=model)
await _broadcast_turn(
runtime,
agent_name=agent.name,
input_messages=messages,
output_message=message,
system=system_str,
)
return JSONResponse(content=message.model_dump(mode="json"))
return app
async def _require_token(
request: Request, runtime: GatewayRuntime, *, scope: str
) -> str:
"""Verify the request's bearer + scope, return the token's audit name.
Accepts both ``X-Api-Key: <token>`` (what the official Anthropic
SDK sends — LibreChat, the CLI, third-party clients) and
``Authorization: Bearer <token>`` (curl, Cursor). 401 on missing /
unknown token; 403 on a known token whose scope doesn't cover
``scope``. Bootstrap tokens implicitly carry ``"*"`` and pass
every scope check.
"""
api_key = request.headers.get("x-api-key")
identity = (
await runtime.token_store.verify(api_key)
if api_key
else await runtime.token_store.verify_bearer(
request.headers.get("authorization")
)
)
if identity is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
"invalid or missing bearer token",
headers={"WWW-Authenticate": "Bearer"},
)
if not identity.allows(scope):
raise HTTPException(
status.HTTP_403_FORBIDDEN,
f"token scope {identity.scope!r} does not cover {scope!r}",
)
return identity.name
async def _sse(events: AsyncIterator[MessageStreamEvent]) -> AsyncIterator[bytes]:
r"""Serialize an event stream into Anthropic's ``text/event-stream`` form.
async def _sse_and_broadcast(
events: AsyncIterator[MessageStreamEvent],
*,
runtime: GatewayRuntime,
agent_name: str,
input_messages: list[MessageParam],
system: str | None,
model: str,
) -> AsyncIterator[bytes]:
r"""Serialize an event stream to SSE; broadcast a :class:`TurnRecord` after.
Each event becomes ``event: <type>\ndata: <json>\n\n`` — the shape
the Anthropic SDK's SSE decoder expects. Errors mid-stream are
swallowed into a synthetic ``error`` event so the client sees the
failure rather than a hung connection.
The same events feed a :class:`StreamAccumulator` on the side so that
once the SSE response closes we can hand a fully-assembled
``Message`` to every ``runtime.turn_log_handlers`` entry (the
markdown frontend's archive logger lives in there). Broadcast
failures are caught — they must never bubble up to the client.
"""
acc = StreamAccumulator()
try:
async for ev in events:
acc.feed(ev)
payload = ev.model_dump_json()
yield f"event: {ev.type}\ndata: {payload}\n\n".encode()
except Exception as exc: # noqa: BLE001
@@ -253,83 +247,41 @@ async def _sse(events: AsyncIterator[MessageStreamEvent]) -> AsyncIterator[bytes
{"type": "error", "error": {"type": "api_error", "message": str(exc)}}
)
yield f"event: error\ndata: {err}\n\n".encode()
async def _accumulate(
events: AsyncIterator[MessageStreamEvent], *, model: str
) -> Message:
"""Collapse a stream-event sequence into one ``Message`` (``stream=false``).
Mirrors the Anthropic SDK's own accumulator: walk events, build
block dicts indexed by their ``content_block`` index, fold text /
thinking deltas in, buffer ``input_json_delta`` chunks until the
block closes (then JSON-parse them once).
"""
message_id = ""
role = "assistant"
usage = Usage(input_tokens=0, output_tokens=0)
blocks: dict[int, dict[str, Any]] = {}
json_buffers: dict[int, str] = {}
stop_reason: str | None = None
stop_sequence: str | None = None
async for ev in events:
# isinstance, not ``ev.type == "..."``: ty narrows on the
# discriminator only via the class, and the raw event union
# carries its own discriminators (``Raw*Event``) the SDK
# already promises.
if isinstance(ev, RawMessageStartEvent):
message_id = ev.message.id
role = ev.message.role
usage = ev.message.usage
elif isinstance(ev, RawContentBlockStartEvent):
blocks[ev.index] = ev.content_block.model_dump()
if blocks[ev.index].get("type") == "tool_use":
json_buffers[ev.index] = ""
elif isinstance(ev, RawContentBlockDeltaEvent):
blk = blocks.setdefault(ev.index, {})
delta = ev.delta
if isinstance(delta, TextDelta):
blk["text"] = blk.get("text", "") + delta.text
elif isinstance(delta, InputJSONDelta):
json_buffers[ev.index] = (
json_buffers.get(ev.index, "") + delta.partial_json
)
elif isinstance(delta, ThinkingDelta):
blk["thinking"] = blk.get("thinking", "") + delta.thinking
elif isinstance(delta, SignatureDelta):
blk["signature"] = delta.signature
elif isinstance(ev, RawContentBlockStopEvent):
blk = blocks.get(ev.index, {})
if blk.get("type") == "tool_use":
raw = json_buffers.pop(ev.index, "")
blk["input"] = json.loads(raw) if raw.strip() else {}
elif isinstance(ev, RawMessageDeltaEvent):
stop_reason = ev.delta.stop_reason
stop_sequence = ev.delta.stop_sequence
if ev.usage.output_tokens:
usage = Usage.model_validate(
{**usage.model_dump(), "output_tokens": ev.usage.output_tokens}
)
content = []
for idx in sorted(blocks):
bd = blocks[idx]
btype = bd.get("type")
if btype == "text":
content.append(TextBlock.model_validate(bd))
elif btype == "tool_use":
content.append(ToolUseBlock.model_validate(bd))
elif btype == "thinking":
content.append(ThinkingBlock.model_validate(bd))
return Message(
id=message_id or "msg_unknown",
type="message",
role=role,
model=model,
content=content,
stop_reason=stop_reason, # type: ignore[arg-type]
stop_sequence=stop_sequence,
usage=usage,
return
message = acc.finalize(model=model)
await _broadcast_turn(
runtime,
agent_name=agent_name,
input_messages=input_messages,
output_message=message,
system=system,
)
async def _broadcast_turn(
    runtime: GatewayRuntime,
    *,
    agent_name: str,
    input_messages: list[MessageParam],
    output_message: Message,
    system: str | None,
) -> None:
    """Hand a fresh :class:`TurnRecord` to every registered turn-log handler.

    Does nothing when no handlers are registered. Otherwise one record
    (tagged ``source="anthropic"``, with a copy of ``input_messages``)
    is built and each handler is awaited in turn. A handler that raises
    is logged and skipped — handlers are observability plumbing, not
    part of the user-visible request path.
    """
    subscribers = runtime.turn_log_handlers
    if not subscribers:
        return
    turn = TurnRecord(
        agent_name=agent_name,
        input_messages=list(input_messages),
        output_message=output_message,
        system=system,
        source="anthropic",
    )
    for notify in subscribers:
        try:
            await notify(turn)
        except Exception:  # noqa: BLE001
            _log.exception("turn_log_handler raised; continuing")
+16 -1
View File
@@ -15,13 +15,16 @@ from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
from collections.abc import Awaitable, Callable, Mapping, Sequence
from beaver_gateway.backends.base import Backend
from beaver_gateway.core.auth import TokenStore
from beaver_gateway.core.registry import AgentRegistry, McpRegistry
from beaver_gateway.core.turn_record import TurnRecord
from beaver_gateway.storage import Database
TurnLogHandler = Callable[[TurnRecord], Awaitable[None]]
@dataclass(frozen=True, slots=True)
class GatewayRuntime:
@@ -62,6 +65,18 @@ class GatewayRuntime:
# the dashboard so the operator can copy ready-to-use links / curl
# snippets. Other frontends ignore it.
frontends: Sequence[Frontend] = field(default_factory=tuple)
# Frontends that finish a turn (Anthropic Messages, admin chat,
# Markdown) iterate this list and ``await`` each handler with a
# ``TurnRecord``. Handlers are appended during ``configure()`` by
# frontends that want a cross-frontend chat archive — currently the
# markdown frontend's ``log_all_chats`` mode. Handler exceptions are
# caught at the call site, so a failing handler never breaks the
# user-visible response.
#
# ``TurnLogHandler`` is defined under TYPE_CHECKING to keep
# ``anthropic.types`` out of the runtime import graph for this base
# module, so the annotation below is only meaningful to type
# checkers; at runtime the field is a plain ``list``.
turn_log_handlers: list[TurnLogHandler] = field(default_factory=list)
class Frontend(ABC):
@@ -0,0 +1,15 @@
"""Markdown frontend — turn-by-turn chat archive backed by ``.md`` files.
The user maintains chats as plain markdown files in an Obsidian vault.
A plugin in Obsidian POSTs ``{filename, content?}`` to ``/chat`` and the
frontend parses the file, finds the last turn, and runs the agent if
the last turn is ``user``. The full response is appended back to the
file as an ``### Assistant:`` turn. With ``log_all_chats=True`` the
frontend also subscribes to every other frontend's turns and writes
them into ``{vault_path}/{logged_subdir}/`` so the vault is the single
chronological archive of all conversations.
"""
from beaver_gateway.frontends.markdown.frontend import MarkdownFrontend
__all__ = ["MarkdownFrontend"]
@@ -0,0 +1,302 @@
"""Cross-frontend chat logger.
When ``MarkdownFrontend(log_all_chats=True)`` is configured, every turn
completed by any other frontend (currently the Anthropic Messages
frontend) is mirrored into the vault as a ``.md`` file. Subsequent
turns of the same conversation append to the same file — matched by a
content-hash fingerprint stored in YAML frontmatter.
The fingerprint hashes the message history *before* the new assistant
reply. So the next request's input history (which now includes the
prior assistant reply) hashes to the value we just persisted —
``hash(prev_input + [assistant_reply])`` — and the lookup hits the
same file. New conversations (no prior fingerprint match) get a fresh
file under ``{vault_path}/{logged_subdir}/{agent_name}/``.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Any
import frontmatter
from beaver_gateway.frontends.markdown import renderer
if TYPE_CHECKING:
from collections.abc import Callable, Iterable
from pathlib import Path
from anthropic.types import MessageParam
from beaver_gateway.core.turn_record import TurnRecord
_log = logging.getLogger("beaver_gateway.frontends.markdown.crossfront")
# User hook: take a turn + vault root, return where the new file should
# live. Returning a relative ``Path`` is treated as relative to the
# vault. ``None`` (the default) keeps the built-in
# ``{vault}/{logged_subdir}/{agent}/{YYYY-MM-DD}_{hex8}.md`` layout.
LogPathFn = "Callable[[TurnRecord, Path], Path]"
__all__ = ["CrossFrontendLogger", "LogPathFn", "fingerprint_messages"]
def fingerprint_messages(messages: Iterable[MessageParam]) -> str:
    """Return a stable, short hex hash of a conversation prefix.

    Built from ``(role, normalized_content)`` pairs only, so a message
    whose content is a plain string and a message whose content is a list
    of text blocks hash identically when they carry the same text.

    Normalization of list-shaped content mirrors ``_flatten_text`` in this
    module: only ``{"type": "text"}`` dict blocks contribute, empty texts
    are skipped, and the remaining texts are joined with a blank line.
    Using the same separator matters because the logger fingerprints an
    assistant reply as a flattened string, while the next request may send
    that same reply back as a list of blocks.

    Tool blocks, images and non-dict blocks are ignored.

    Args:
        messages: Conversation messages in order; each is a mapping with
            ``role`` and ``content`` keys (missing keys hash as empty).

    Returns:
        The first 16 hex characters of a SHA-1 digest over the messages.
    """
    digest = hashlib.sha1(usedforsecurity=False)
    for msg in messages:
        role = str(msg.get("role", ""))
        content = msg.get("content", "")
        if isinstance(content, str):
            text = content
        else:
            # ``content or ()`` tolerates an explicit ``None`` content.
            parts = [
                str(blk.get("text", ""))
                for blk in content or ()
                if isinstance(blk, dict) and blk.get("type") == "text"
            ]
            text = "\n\n".join(part for part in parts if part)
        # NUL / SOH delimiters keep (role, text) boundaries unambiguous.
        digest.update(role.encode("utf-8"))
        digest.update(b"\x00")
        digest.update(text.strip().encode("utf-8"))
        digest.update(b"\x01")
    return digest.hexdigest()[:16]
class CrossFrontendLogger:
"""Maintains the fingerprint→file map and writes turns to disk.
The map is in-process; on startup ``warm_index`` rebuilds it from
YAML frontmatter of every file under ``logged_subdir``. A miss
creates a new file, a hit appends to the existing one. All disk
work funnels through one ``asyncio.Lock`` because the writes are
cheap and serializing them sidesteps a class of races we don't need
to think about.
"""
def __init__(
self,
*,
vault_path: Path,
logged_subdir: str,
log_path: Callable[[TurnRecord, Path], Path] | None = None,
) -> None:
self._vault = vault_path
self._root = vault_path / logged_subdir
self._index: dict[str, Path] = {}
self._lock = asyncio.Lock()
self._log_path_fn = log_path
# When the user supplies a custom path function, files can land
# anywhere in the vault — so we have to scan the whole vault on
# startup to rebuild the fingerprint→path map. With the default
# layout we can bound the scan to ``_logs/``.
self._scan_root = vault_path if log_path is not None else self._root
def warm_index(self) -> None:
"""Scan logged files synchronously, populating the fingerprint map.
Called from ``MarkdownFrontend.configure`` so the map is ready
before any cross-frontend turn arrives. ``frontmatter.load``
reads only enough of the file to parse the YAML head, so the
scan is cheap even on large vaults — but a custom ``log_path``
forces a full-vault walk; mention that in the constructor doc.
"""
if not self._scan_root.exists():
return
for path in self._scan_root.rglob("*.md"):
try:
post = frontmatter.load(str(path))
except Exception: # noqa: BLE001
_log.warning("could not read frontmatter from %s — skipping", path)
continue
fp = post.metadata.get("fingerprint")
if isinstance(fp, str) and fp:
self._index[fp] = path
_log.info(
"crossfront index warmed: %d logged file(s) under %s",
len(self._index),
self._scan_root,
)
async def handle(self, record: TurnRecord) -> None:
"""Append or create a logged file for ``record``.
Records that the markdown frontend itself produced
(``source=="markdown"``) are skipped — those already live in the
user's hand-written file and shouldn't be duplicated into the
``_logs`` shadow tree.
"""
if record.source == "markdown":
return
async with self._lock:
# ``input_messages`` is the *full* history sent to the backend
# (last entry is the new user turn). Match against the prefix
# that excludes the new user turn — that's what the previous
# write stored as its fingerprint. Empty prefix is the
# well-known "brand new chat" sentinel.
prefix = record.input_messages[:-1]
prev_fp = fingerprint_messages(prefix) if prefix else None
target = self._index.get(prev_fp) if prev_fp else None
if target is None:
target = self._new_file_path(record)
# Build the full history including the assistant reply; the
# new fingerprint matches *that* prefix, so the next user
# turn (history grows by one user msg) will hit this file.
assistant_msg: MessageParam = {
"role": "assistant",
"content": _flatten_text(record.output_message),
}
full_history = [*record.input_messages, assistant_msg]
new_fp = fingerprint_messages(full_history)
if target.exists():
existing = target.read_text(encoding="utf-8")
parsed = frontmatter.loads(existing)
body = _strip_trailing_user_scaffold(parsed.content)
# We append only the *new* user turn (the last one in
# input_messages, since prior turns are already on disk)
# plus the assistant reply.
new_user = record.input_messages[-1]
new_block = renderer.render_user_param(new_user)
new_block = renderer.append_to_body(
new_block, renderer.render_assistant_message(record.output_message)
)
new_body = renderer.append_to_body(body, new_block)
metadata = dict(parsed.metadata)
else:
# Materialize the whole conversation from scratch.
new_body = _render_full_history(
record.input_messages, record.output_message
)
metadata = {}
new_body = renderer.append_to_body(new_body, renderer.USER_SCAFFOLD)
metadata["agent"] = record.agent_name
metadata["fingerprint"] = new_fp
metadata["source"] = record.source
self._write(target, metadata, new_body)
# Maintain the index: drop the old fp (it's stale once we
# write the new turn), add the new one.
if prev_fp:
self._index.pop(prev_fp, None)
self._index[new_fp] = target
# ---- internals -----------------------------------------------------
def _new_file_path(self, record: TurnRecord) -> Path:
"""Pick a fresh filename for a brand-new conversation.
With a user-supplied ``log_path`` we delegate to it (joining a
relative result with the vault root). Without one, we fall back
to ``{logged_subdir}/{agent}/{date}_{hex8}.md`` and ensure the
``.md`` suffix in case the user picks a non-md extension by hand.
"""
if self._log_path_fn is not None:
result = self._log_path_fn(record, self._vault)
if not result.is_absolute():
result = self._vault / result
if result.suffix != ".md":
result = result.with_suffix(".md")
result.parent.mkdir(parents=True, exist_ok=True)
return result
day = datetime.now(UTC).strftime("%Y-%m-%d")
# Short hex from the input hash so two same-day chats sort
# stably and don't collide.
salt = fingerprint_messages(record.input_messages)[:8]
agent_dir = self._root / record.agent_name
agent_dir.mkdir(parents=True, exist_ok=True)
return agent_dir / f"{day}_{salt}.md"
def _write(self, path: Path, metadata: dict[str, Any], body: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
if metadata:
post = frontmatter.Post(content=body, **metadata)
text = frontmatter.dumps(post) + "\n"
else:
text = body if body.endswith("\n") else body + "\n"
# Sync write inside the lock — keeps the implementation tiny;
# individual logged turns are small enough that the blocking
# write doesn't matter at human conversation rates.
path.write_text(text, encoding="utf-8")
def _flatten_text(message: Any) -> str:
    """Join the non-empty text blocks of ``message`` with blank lines.

    Local twin of ``frontend._flatten_assistant_text``, kept here to
    avoid an import cycle. Blocks whose ``type`` is not ``"text"`` are
    ignored; a message without ``content`` yields an empty string.
    """
    pieces: list[str] = []
    for block in getattr(message, "content", ()):
        if getattr(block, "type", None) != "text":
            continue
        piece = getattr(block, "text", "") or ""
        if piece:
            pieces.append(piece)
    return "\n\n".join(pieces)
def _render_full_history(messages: list[MessageParam], assistant: Any) -> str:
    """Render a whole conversation for a freshly created logged file.

    User messages go through ``renderer.render_user_param``; prior
    assistant messages are reduced to their text and emitted under an
    ``### Assistant:`` header (skipped when the text is empty). Messages
    with any other role are ignored. The new ``assistant`` reply is
    rendered last, and all pieces are chained with
    ``renderer.append_to_body``.
    """
    rendered: list[str] = []
    for entry in messages:
        who = entry.get("role")
        if who == "user":
            rendered.append(renderer.render_user_param(entry))
            continue
        if who != "assistant":
            continue
        raw = entry.get("content", "")
        spoken = (raw if isinstance(raw, str) else _content_to_text(raw)).strip()
        if spoken:
            rendered.append(f"### Assistant:\n\n{spoken}\n")
    rendered.append(renderer.render_assistant_message(assistant))

    document = ""
    for piece in rendered:
        if piece:
            document = renderer.append_to_body(document, piece)
    return document
def _strip_trailing_user_scaffold(body: str) -> str:
    """Remove an empty ``### User:`` scaffold from the end of ``body``.

    Logged turns arrive complete from another frontend, so the scaffold
    left by the previous write would otherwise sit directly above the
    new ``### User:`` header. When ``body`` (ignoring trailing
    whitespace) ends with the marker, the marker and a ``---`` separator
    immediately before it are cut and the result is right-stripped.
    Otherwise ``body`` is returned unchanged.
    """
    marker = "### User:"
    trimmed = body.rstrip()
    if not trimmed.endswith(marker):
        return body
    remainder = trimmed.removesuffix(marker).rstrip()
    # At most one separator is removed, matching what the renderer adds.
    return remainder.removesuffix("---").rstrip()
def _content_to_text(content: Any) -> str:
    """Join the ``text`` of every ``{"type": "text"}`` dict in ``content``.

    Returns an empty string when ``content`` is not a list. Entries that
    are not dicts, or whose ``type`` is not ``"text"``, are skipped; the
    kept texts are separated by a blank line.
    """
    if not isinstance(content, list):
        return ""
    texts: list[str] = []
    for item in content:
        if isinstance(item, dict) and item.get("type") == "text":
            texts.append(str(item.get("text", "")))
    return "\n\n".join(texts)
@@ -0,0 +1,404 @@
"""``MarkdownFrontend`` — chat-via-markdown-files frontend.
Wires:
* ``POST /chat {filename, content?, agent?}`` — bearer-authenticated
trigger. The plugin in Obsidian fires this after the user edits a
``.md`` and the file gets synced (or with ``content`` to short-circuit
the sync delay). We parse the file, check the last turn — assistant
→ no-op, user → run the agent and append.
* ``GET /healthz`` — liveness.
Concurrency model: an in-memory ``set[Path]`` of files currently in
flight. Two concurrent requests for the same file → the second gets
409. The set is single-process (one gateway instance) — that's by
design; the markdown frontend is the only writer in its vault from
the gateway side.
Cross-frontend logging: when ``log_all_chats=True``, ``configure()``
registers a handler on ``runtime.turn_log_handlers`` so every other
frontend's completed turns also land in the vault. The handler logic
lives in :mod:`.crossfront` so this module stays focused on the HTTP
shape.
"""
from __future__ import annotations
import contextlib
import json
import logging
import os
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any
import aiofile
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import JSONResponse
from beaver_gateway.core import audit
from beaver_gateway.core.turn_record import TurnRecord
from beaver_gateway.frontends._accumulate import accumulate
from beaver_gateway.frontends._auth import require_token
from beaver_gateway.frontends.base import Frontend
from beaver_gateway.frontends.markdown import parser, renderer
from beaver_gateway.frontends.markdown.crossfront import (
CrossFrontendLogger,
fingerprint_messages,
)
if TYPE_CHECKING:
from collections.abc import Callable
from anthropic.types import MessageParam
from beaver_gateway.frontends.base import GatewayRuntime
_log = logging.getLogger("beaver_gateway.frontends.markdown")
__all__ = ["MarkdownFrontend"]
class MarkdownFrontend(Frontend):
    """FastAPI app behind ``POST /chat`` driven by Obsidian-vault files.

    ``configure`` builds the app (and, with ``log_all_chats``, registers
    a ``CrossFrontendLogger`` handler on the runtime); ``serve`` runs it
    under uvicorn. A request names a markdown file inside the vault; if
    the file's last turn is a user turn the agent is run and its reply is
    appended to the file.
    """

    def __init__(
        self,
        *,
        vault_path: Path | str,
        host: str = "0.0.0.0",  # noqa: S104
        port: int = 8003,
        default_agent: str | None = None,
        log_all_chats: bool = False,
        logged_subdir: str = "_logs",
        log_path: Callable[[TurnRecord, Path], Path] | None = None,
        public_base_url: str | None = None,
    ) -> None:
        """Store configuration; no I/O happens until ``configure``.

        Args:
            vault_path: Vault root; ``~`` is expanded and the path resolved.
            host: Interface uvicorn binds to.
            port: TCP port uvicorn binds to.
            default_agent: Agent used when neither the request nor the
                file's frontmatter names one.
            log_all_chats: When true, mirror other frontends' turns into
                the vault.
            logged_subdir: Vault sub-directory passed to the logger.
            log_path: Optional hook passed to the logger to choose where
                a new logged conversation is written.
            public_base_url: External URL prefix when behind a reverse
                proxy; a trailing slash is trimmed.
        """
        self.vault_path = Path(vault_path).expanduser().resolve()
        self.host = host
        self.port = port
        self.default_agent = default_agent
        self.log_all_chats = log_all_chats
        self.logged_subdir = logged_subdir
        self.log_path = log_path
        # External URL prefix when behind a reverse proxy — same role as
        # on the other bearer frontends. Trailing slash trimmed for
        # idempotent concatenation; ``None`` means "no proxy / advertise
        # raw host:port".
        self.public_base_url = public_base_url.rstrip("/") if public_base_url else None
        self._runtime: GatewayRuntime | None = None
        self._app: FastAPI | None = None
        # Files currently being processed by an in-flight ``POST /chat``.
        # Checked-and-added atomically in the request handler (no
        # ``await`` between the check and the insert) so a concurrent
        # request reliably loses the race to 409.
        self._busy: set[Path] = set()
        self._crossfront: CrossFrontendLogger | None = None

    def configure(self, runtime: GatewayRuntime) -> None:
        """Create the vault directory, wire optional logging, build the app."""
        self._runtime = runtime
        self.vault_path.mkdir(parents=True, exist_ok=True)
        if self.log_all_chats:
            self._crossfront = CrossFrontendLogger(
                vault_path=self.vault_path,
                logged_subdir=self.logged_subdir,
                log_path=self.log_path,
            )
            # Scan the existing logged files synchronously here so the
            # fingerprint→path map is populated before the first
            # cross-frontend turn arrives.
            self._crossfront.warm_index()
            runtime.turn_log_handlers.append(self._crossfront.handle)
        self._app = self._build_app(runtime)

    async def serve(self) -> None:
        """Run the configured app under uvicorn until the server exits.

        Raises:
            RuntimeError: If ``configure`` has not been called first.
        """
        import uvicorn

        if self._app is None:
            msg = "configure() must be called before serve()"
            raise RuntimeError(msg)
        config = uvicorn.Config(
            self._app, host=self.host, port=self.port, log_level="info"
        )
        server = uvicorn.Server(config)
        await server.serve()

    # ---- app builder ---------------------------------------------------

    def _build_app(self, runtime: GatewayRuntime) -> FastAPI:
        """Build the FastAPI app exposing ``GET /healthz`` and ``POST /chat``."""
        app = FastAPI(title="beaver-gateway / Markdown")

        @app.get("/healthz")
        async def healthz() -> dict[str, str]:
            return {"status": "ok"}

        @app.post("/chat")
        async def chat(request: Request) -> Any:
            token_name = await require_token(request, runtime, scope="messages")
            try:
                body = await request.json()
            except (json.JSONDecodeError, UnicodeDecodeError) as exc:
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST, f"invalid JSON: {exc}"
                ) from exc
            # Valid JSON that is not an object (list, string, number,
            # null) has no ``.get``; reject it as a client error instead
            # of failing with an unhandled AttributeError.
            if not isinstance(body, dict):
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST, "request body must be a JSON object"
                )
            filename = body.get("filename")
            if not isinstance(filename, str) or not filename.strip():
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST, "missing or non-string `filename`"
                )
            content_override = body.get("content")
            agent_override = body.get("agent")
            if agent_override is not None and not isinstance(agent_override, str):
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST, "`agent` must be a string"
                )
            file_path = self._resolve_path(filename)
            # Atomic check-and-claim: both ops run between awaits, so a
            # second request can't slip into the same file slot.
            if file_path in self._busy:
                return JSONResponse(
                    status_code=status.HTTP_409_CONFLICT,
                    content={"status": "in_progress", "filename": filename},
                )
            self._busy.add(file_path)
            try:
                return await self._handle_chat(
                    runtime=runtime,
                    token_name=token_name,
                    filename=filename,
                    file_path=file_path,
                    content_override=content_override,
                    agent_override=agent_override,
                )
            finally:
                self._busy.discard(file_path)

        return app

    # ---- dispatch ------------------------------------------------------

    async def _handle_chat(
        self,
        *,
        runtime: GatewayRuntime,
        token_name: str,
        filename: str,
        file_path: Path,
        content_override: Any,
        agent_override: str | None,
    ) -> Any:
        """Run one chat turn for ``file_path`` and append the reply.

        Args:
            runtime: Gateway runtime providing agents, backends and
                turn-log handlers.
            token_name: Name of the authenticated token, for logs/audit.
            filename: The filename as sent by the client (for messages).
            file_path: Resolved path inside the vault.
            content_override: Optional file content from the request; a
                string is written to disk first and used as the file text.
            agent_override: Optional agent name from the request.

        Returns:
            A status dict: ``nothing_to_do`` when there is no pending
            user turn, otherwise ``ok``.

        Raises:
            HTTPException: 400 for a bad ``content`` type or no resolvable
                agent, 404 for an unknown agent, 503 when the agent has no
                backend, 500 when the backend call fails (an error block
                is written to the file first).
        """
        if isinstance(content_override, str):
            await _write_atomic(file_path, content_override)
            file_text = content_override
        elif content_override is None:
            file_text = await _read_or_empty(file_path)
        else:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, "`content` must be a string when present"
            )
        parsed = parser.parse(file_text)
        agent_name = parser.resolve_agent(
            metadata=parsed.metadata,
            request_override=agent_override,
            default=self.default_agent,
        )
        if not agent_name:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST,
                "no agent specified: pass `agent`, set frontmatter, "
                "or configure `default_agent`",
            )
        # When the parser produced no messages (file empty / only
        # frontmatter), there's nothing to dispatch.
        if not parsed.messages:
            return {"status": "nothing_to_do", "reason": "empty file"}
        if parser.last_role(parsed.messages) == "assistant":
            return {"status": "nothing_to_do", "reason": "last turn is assistant"}
        agent = runtime.agents.get(agent_name)
        if agent is None:
            raise HTTPException(
                status.HTTP_404_NOT_FOUND, f"unknown agent: {agent_name!r}"
            )
        backend = runtime.backends.get(agent.name)
        if backend is None:
            raise HTTPException(
                status.HTTP_503_SERVICE_UNAVAILABLE,
                f"no backend configured for agent {agent.name!r}",
            )
        _log.info(
            "chat: actor=%s agent=%s file=%s msgs=%d",
            token_name,
            agent.name,
            filename,
            len(parsed.messages),
        )
        await audit.log(
            runtime,
            actor=f"token:{token_name}",
            kind="markdown_chat",
            agent_name=agent.name,
            filename=filename,
            msgs=len(parsed.messages),
        )
        try:
            events = backend.complete(
                agent=agent, messages=parsed.messages, system=None
            )
            message = await accumulate(events, model=agent.model or agent.name)
        except Exception as exc:
            _log.exception("backend failed for %s", filename)
            # Record the failure in the file so the human sees it there.
            error_block = _render_error_block(exc)
            new_body = renderer.append_to_body(parsed.body, error_block)
            await _write_atomic(
                file_path, _reattach_frontmatter(parsed.metadata, new_body)
            )
            raise HTTPException(
                status.HTTP_500_INTERNAL_SERVER_ERROR, f"backend error: {exc}"
            ) from exc
        rendered = renderer.render_assistant_message(message)
        new_body = renderer.append_to_body(parsed.body, rendered)
        new_body = renderer.append_to_body(new_body, renderer.USER_SCAFFOLD)
        # Recompute fingerprint so a future cross-frontend hit on this
        # same conversation can find it. Stored as a hex string in
        # frontmatter.
        assistant_param: MessageParam = {
            "role": "assistant",
            "content": _flatten_assistant_text(message),
        }
        updated_messages: list[MessageParam] = [*parsed.messages, assistant_param]
        updated_metadata = dict(parsed.metadata)
        updated_metadata["agent"] = agent.name
        updated_metadata["fingerprint"] = fingerprint_messages(updated_messages)
        await _write_atomic(
            file_path, _reattach_frontmatter(updated_metadata, new_body)
        )
        # Broadcast our own turn so other handlers see what happened.
        # ``source`` marks the origin so ``CrossFrontendLogger`` can skip
        # turns that already live in the user's own file.
        record = TurnRecord(
            agent_name=agent.name,
            input_messages=list(parsed.messages),
            output_message=message,
            system=None,
            source="markdown",
        )
        for handler in runtime.turn_log_handlers:
            try:
                await handler(record)
            except Exception:  # noqa: BLE001
                # A failing handler must not fail the response.
                _log.exception("turn_log_handler raised; continuing")
        return {"status": "ok", "turns_appended": 1, "agent": agent.name}

    # ---- helpers -------------------------------------------------------

    def _resolve_path(self, filename: str) -> Path:
        """Resolve ``filename`` under the vault; reject escapes.

        Leading slashes are stripped so a rooted name is treated as
        vault-relative, and ``.md`` is appended when missing.

        Raises:
            HTTPException: 400 when the resolved path lies outside the
                vault (for example via ``..`` segments).
        """
        # ``Path("/foo/bar")`` joined onto the vault path would keep the
        # absolute side; stripping leading slashes coerces the rooted
        # form into a relative path before joining.
        rel = filename.lstrip("/")
        if not rel.endswith(".md"):
            rel = rel + ".md"
        candidate = (self.vault_path / rel).resolve()
        try:
            candidate.relative_to(self.vault_path)
        except ValueError as exc:
            raise HTTPException(
                status.HTTP_400_BAD_REQUEST, f"filename escapes vault: {filename!r}"
            ) from exc
        return candidate
# ---- module-level utilities ----------------------------------------------
async def _read_or_empty(path: Path) -> str:
    """Read ``path`` as UTF-8 text; a missing file reads as ``""``."""
    # The synchronous ``exists()`` call is a deliberate choice: it is a
    # quick metadata check, and avoiding it would mean pulling in an
    # async-pathlib dependency purely to satisfy the lint rule.
    if path.exists():  # noqa: ASYNC240
        async with aiofile.async_open(path, "r", encoding="utf-8") as handle:
            return await handle.read()
    return ""
async def _write_atomic(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` via a temp file plus ``os.replace``.

    The temp file is created in the destination directory so the final
    rename stays on one filesystem. Parent directories are created as
    needed. On any failure (including cancellation) the temp file is
    removed and the exception re-raised; ``path`` is left untouched.

    Note: the data is not fsync'ed before the rename.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # ``mkstemp`` reserves a unique name and hands back an open
    # descriptor. The content is written through a separate async handle,
    # so the descriptor is only closed — exactly once, up front. Closing
    # it here rather than after the write means the failure path never
    # closes it a second time.
    fd, tmp_path = tempfile.mkstemp(
        prefix=f".{path.name}.", suffix=".tmp", dir=str(path.parent)
    )
    try:
        os.close(fd)
        async with aiofile.async_open(tmp_path, "w", encoding="utf-8") as f:
            await f.write(content)
        # ``os.replace`` is the atomic primitive — ``Path.replace`` is a
        # thin wrapper around the same call; either works.
        os.replace(tmp_path, path)  # noqa: PTH105
    except BaseException:
        # Cleanup on failure: remove the temp file, then propagate.
        with contextlib.suppress(OSError):
            os.unlink(tmp_path)  # noqa: PTH108
        raise
def _reattach_frontmatter(metadata: dict[str, Any], body: str) -> str:
    r"""Serialize ``body`` back into file text, with frontmatter if any.

    With non-empty ``metadata`` the result is the ``frontmatter`` dump of
    the post plus a final newline. With empty metadata no ``---\n---``
    block is emitted: ``body`` is returned as-is, gaining a trailing
    newline only when it lacks one.
    """
    if metadata:
        import frontmatter as _fm

        return _fm.dumps(_fm.Post(content=body, **metadata)) + "\n"
    if body.endswith("\n"):
        return body
    return body + "\n"
def _flatten_assistant_text(message: Any) -> str:
    """Collapse an assistant ``Message`` into its spoken text.

    Only blocks whose ``type`` is ``"text"`` are considered; empty or
    missing texts are dropped and the rest are joined with a blank line.
    The plain string is what gets fingerprinted alongside the
    parser-shaped (text-only) history.
    """
    texts = (
        getattr(part, "text", "") or ""
        for part in getattr(message, "content", ())
        if getattr(part, "type", None) == "text"
    )
    return "\n\n".join(filter(None, texts))
def _render_error_block(exc: BaseException) -> str:
    """Format a backend failure as an Assistant turn holding an ``[!error]-`` callout.

    The callout text is ``str(exc)`` (or the exception class name when
    that is empty) with newlines replaced by spaces so it stays on one
    quoted line.
    """
    detail = str(exc)
    if not detail:
        detail = exc.__class__.__name__
    one_line = detail.replace("\n", " ").strip()
    return "### Assistant:\n\n> [!error]-\n> " + one_line + "\n"
@@ -0,0 +1,187 @@
"""Parse a markdown chat file into Anthropic ``MessageParam`` history.
The file format is documented in ``frontends/markdown/__init__.py``:
``### User:`` / ``### Assistant:`` H3 headers split turns, optional
``---`` HRs between turns are visual-only, ``> [!thinking]-`` and
``> [!tool]- <name>`` callouts mark structured assistant content.
For backend consumption we strip thinking and tool_use callouts —
assistant turns become text-only. Rationale: history replay through
claude-code's JSONL injection only needs the *narrated* answer (the
thinking signatures expire and the original tool_results aren't
captured in the renderer's output, so a faithful tool_use round-trip
isn't possible today). The renderer keeps callouts in the file because
they're informational for the human reader; the parser drops them when
shaping the backend's input.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
import frontmatter
if TYPE_CHECKING:
from anthropic.types import MessageParam
__all__ = ["ParsedFile", "last_role", "parse", "resolve_agent"]
# Turn marker — must be exactly ``### User:`` or ``### Assistant:`` on
# its own line. Trailing whitespace tolerated; nothing after the colon
# on the same line (any inline content would mean the user typed
# something that just happens to look like a header, and we'd rather
# misparse than silently fold inline content into a turn).
_TURN_RE = re.compile(r"^###\s+(User|Assistant):\s*$", re.MULTILINE)
# Callout-start lines we strip from assistant turns when extracting
# text. We don't try to parse the contents — for backend input we just
# need to drop the whole quoted block.
_CALLOUT_START_RE = re.compile(r"^>\s+\[!(thinking|tool)\]")
@dataclass(frozen=True, slots=True)
class ParsedFile:
"""Result of parsing a single chat ``.md``.
``metadata`` is the YAML frontmatter as a plain dict (empty if the
file has none). ``messages`` is the conversation history shaped for
``Backend.complete`` — assistant turns are text-only. ``body`` is the
raw markdown content *after* the frontmatter is stripped; the
renderer needs it when it appends a new assistant turn so it can
preserve whatever the human typed verbatim (including any callouts
or HRs they added).
"""
metadata: dict[str, Any]
body: str
messages: list[MessageParam]
def parse(text: str) -> ParsedFile:
    """Parse a chat ``.md`` into ``(metadata, body, messages)``.

    Turn markers split the body into user and assistant turns; turns
    whose text is empty after cleanup are dropped.

    Two marker-less cases are handled as implicit user turns:

    * No markers at all and a non-empty body: the whole body is a single
      user turn — the path for "user types into a new file and hits
      send".
    * Text before the first marker when that marker is ``### Assistant:``.
      This is what the first case looks like after a reply has been
      appended to the file; without it the opening user message would be
      lost and the history would start with an assistant turn. Text
      before a leading ``### User:`` marker is still ignored.

    Args:
        text: Full file text, optionally starting with YAML frontmatter.

    Returns:
        The parsed metadata, the frontmatter-free body, and the message
        history.
    """
    parsed = frontmatter.loads(text)
    metadata = dict(parsed.metadata)
    body = parsed.content
    messages: list[MessageParam] = []
    turns = _split_turns(body)
    if not turns:
        stripped = body.strip()
        if stripped:
            messages.append({"role": "user", "content": stripped})
        return ParsedFile(metadata=metadata, body=body, messages=messages)

    if turns[0][0] == "assistant":
        first_marker = _TURN_RE.search(body)
        preamble = body[: first_marker.start()].rstrip() if first_marker else ""
        # Drop only the one trailing ``---`` separator the renderer puts
        # before an appended turn, so the text matches what the
        # marker-less parse produced for the same message.
        preamble = preamble.removesuffix("---").strip()
        if preamble:
            messages.append({"role": "user", "content": preamble})

    for role, raw in turns:
        if role == "user":
            text_content = _strip_hrs(raw).strip()
            if text_content:
                messages.append({"role": "user", "content": text_content})
        else:
            text_content = _extract_assistant_text(raw)
            if text_content:
                messages.append({"role": "assistant", "content": text_content})
    return ParsedFile(metadata=metadata, body=body, messages=messages)
def last_role(messages: list[MessageParam]) -> str | None:
    """Return the ``role`` of the final message, or ``None`` when there are none."""
    return messages[-1]["role"] if messages else None
def resolve_agent(
    *, metadata: dict[str, Any], request_override: str | None, default: str | None
) -> str | None:
    """Pick the agent name for this chat.

    The first usable source wins: a truthy ``request_override``, then a
    non-empty string under ``metadata["agent"]``, then ``default``
    (which may itself be ``None``).
    """
    if request_override:
        return request_override
    declared = metadata.get("agent")
    return declared if isinstance(declared, str) and declared else default
# ---- internals ---------------------------------------------------------
def _split_turns(body: str) -> list[tuple[str, str]]:
    """Split ``body`` at turn markers into ``(role, raw_text)`` pairs.

    ``role`` is the lower-cased marker name. ``raw_text`` runs from just
    after the marker line's match to the start of the next marker (or the
    end of ``body``) and is not trimmed. A body without markers yields an
    empty list.
    """
    markers = list(_TURN_RE.finditer(body))
    # Each turn ends where the following marker begins; the last one
    # extends to the end of the body.
    ends = [following.start() for following in markers[1:]] + [len(body)]
    return [
        (marker.group(1).lower(), body[marker.end() : end])
        for marker, end in zip(markers, ends)
    ]
def _strip_hrs(raw: str) -> str:
    """Remove lines that consist solely of ``---`` (ignoring whitespace).

    A ``---`` embedded in a longer line is left alone. The surviving
    lines are re-joined with ``\\n``, so a trailing newline in ``raw`` is
    not preserved.
    """
    return "\n".join(line for line in raw.splitlines() if line.strip() != "---")
def _extract_assistant_text(raw: str) -> str:
    """Return the spoken text of an assistant turn, minus callouts and HRs.

    Scans line by line. A line matching the callout-start pattern opens a
    skipped region that continues for as long as lines begin with ``>``
    (after leading whitespace); the first line that does not — including
    a blank line — closes it and is processed normally. Whole-line
    ``---`` separators are dropped. Runs of three or more newlines left
    behind are folded to a single blank line and the result is trimmed.
    """
    kept: list[str] = []
    inside_callout = False
    for line in raw.splitlines():
        if inside_callout:
            if line.lstrip().startswith(">"):
                continue
            # First non-quote line: the callout is over, handle the line
            # with the regular rules below.
            inside_callout = False
        if _CALLOUT_START_RE.match(line):
            inside_callout = True
            continue
        if line.strip() == "---":
            continue
        kept.append(line)
    collapsed = re.sub(r"\n{3,}", "\n\n", "\n".join(kept))
    return collapsed.strip()
@@ -0,0 +1,181 @@
"""Render Anthropic ``Message`` (and individual user turns) into markdown.
The renderer is one-way: it produces the human-facing artifact in the
vault. The parser strips tool/thinking callouts when reshaping the file
for backend replay — so what we write here is purely for the human
reader (and for the cross-frontend logger, which materializes turns
from other frontends).
"""
from __future__ import annotations
import json
import re
from typing import TYPE_CHECKING, Any, cast
from anthropic.types import Message, TextBlock, ThinkingBlock, ToolUseBlock
if TYPE_CHECKING:
from collections.abc import Iterable
from anthropic.types import MessageParam
# ``render_user_param`` is public (no leading underscore) and is called
# from the cross-frontend logger, so it is exported with the rest.
__all__ = [
    "FENCE",
    "USER_SCAFFOLD",
    "append_to_body",
    "render_assistant_message",
    "render_user_param",
    "render_user_text",
    "summarize_tool_input",
]
# Empty ``### User:`` block appended after each assistant reply so the
# human has an obvious place to type the next turn. Parser drops empty
# user blocks, so this doesn't re-trigger dispatch on its own.
USER_SCAFFOLD = "### User:\n"
# Default 4-backtick fence so tool results that contain literal ```` ``` ````
# don't collide. JSON inputs use 3 backticks because they almost never
# contain ``` and we get language syntax highlighting in Obsidian for free.
FENCE = "````"
# Used when a tool input mentions a path/file we can dangle in the
# callout title — purely cosmetic.
_PATH_KEYS = ("path", "file", "filename", "url", "command", "query")
def render_user_text(content: str) -> str:
    r"""Format one user turn: ``### User:`` header, blank line, trimmed text, newline."""
    body = content.strip()
    return "### User:\n\n" + body + "\n"
def render_assistant_message(message: Message) -> str:
    """Render an assistant ``Message`` as an ``### Assistant:`` turn block.

    Each content block is rendered by ``_render_block`` in its original
    order, with an empty line after every block. Trailing whitespace is
    then trimmed and a single newline appended, so whatever marker
    follows starts on its own line. A message with no content renders as
    just the header line.
    """
    lines: list[str] = ["### Assistant:", ""]
    for block in message.content:
        lines += [*_render_block(block), ""]
    return "\n".join(lines).rstrip() + "\n"
def render_user_param(param: MessageParam) -> str:
    """Render a ``MessageParam`` user message into a ``### User:`` block.

    Used by the cross-frontend logger when materializing turns from
    other frontends. ``param["content"]`` may be a plain string or an
    iterable of content blocks. Only blocks whose ``type`` is ``"text"``
    contribute; their texts are joined with a blank line between them.
    Tool_result blocks (and every other non-text block) are dropped
    silently — the markdown view doesn't track them (see ``parser.py``).

    Returns the block produced by ``render_user_text`` for the
    collected text (which may be empty).
    """
    content = param.get("content", "")
    if isinstance(content, str):
        return render_user_text(content)

    # The Anthropic SDK types ``content`` as a union of typed-dict
    # *Param classes plus pydantic block models — both shapes appear
    # in practice (raw incoming JSON yields dicts, SDK-built params
    # yield BaseModels). Read ``type``/``text`` by key for dicts and by
    # attribute for everything else so neither shape is lost.
    chunks: list[str] = []
    for blk in content or ():  # ``or ()`` tolerates an explicit ``None``
        if isinstance(blk, dict):
            blk_type = blk.get("type")
            blk_text = blk.get("text", "")
        else:
            blk_type = getattr(blk, "type", None)
            blk_text = getattr(blk, "text", "")
        if blk_type == "text":
            chunks.append(str(blk_text))
    return render_user_text("\n\n".join(chunks))
def append_to_body(existing: str, new_block: str) -> str:
    """Join ``new_block`` onto ``existing`` behind a horizontal-rule separator.

    Trailing whitespace of ``existing`` is trimmed; everything before it
    is kept exactly as written. When nothing but whitespace is there,
    ``new_block`` is returned on its own with no separator. The ``---``
    rule is decorative (the parser is expected to ignore it).
    """
    trimmed = existing.rstrip()
    if not trimmed:
        return new_block
    return "\n\n---\n\n".join((trimmed, new_block))
def summarize_tool_input(name: str, tool_input: object) -> str:
    """Build the ``[!tool]- <summary>`` title string.

    Tries to pick a single salient field (``path``, ``command``, etc.)
    from the input dict so the collapsed callout shows something
    meaningful in Obsidian. Keys are probed in ``_PATH_KEYS`` order and
    the first one holding a non-blank string wins. The value is
    flattened to one line and capped at 60 characters (57 plus
    ``...``). Falls back to just the tool name when ``tool_input`` is
    not a dict or no key qualifies.
    """
    if not isinstance(tool_input, dict):
        return name
    # Anthropic ``ToolUseBlock.input`` is typed as ``object`` — the
    # SDK's runtime value is always a JSON dict (str→Any), so a local
    # cast keeps the rest of the function readable without sprinkling
    # per-line type narrowing on every ``.get`` call.
    d = cast("dict[str, Any]", tool_input)
    for key in _PATH_KEYS:
        value = d.get(key)
        if not isinstance(value, str):
            continue
        # The summary lands on the single callout-title line. Collapse
        # every whitespace run (notably newlines in multi-line commands
        # or queries) so the value cannot spill past that line.
        flat = " ".join(value.split())
        if flat:
            short = flat if len(flat) <= 60 else flat[:57] + "..."
            return f"{name} · {short}"
    return name
# ---- internals ---------------------------------------------------------
def _render_block(block: object) -> Iterable[str]:
    """Yield the markdown lines for one assistant content block.

    * ``TextBlock`` → its stripped text as a single item, or nothing
      when the text is empty or whitespace-only.
    * ``ThinkingBlock`` → the lines from ``_render_thinking``.
    * ``ToolUseBlock`` → the lines from ``_render_tool_use``.

    Any other block type yields nothing — skipped silently rather than
    corrupting the file.
    """
    if isinstance(block, TextBlock):
        spoken = (block.text or "").strip()
        if spoken:
            yield spoken
    elif isinstance(block, ThinkingBlock):
        yield from _render_thinking(block.thinking or "")
    elif isinstance(block, ToolUseBlock):
        yield from _render_tool_use(block)
def _render_thinking(text: str) -> Iterable[str]:
yield "> [!thinking]-"
for line in text.strip().splitlines() or [""]:
yield f"> {line}" if line else ">"
def _render_tool_use(block: ToolUseBlock) -> Iterable[str]:
    """Yield a collapsed ``> [!tool]-`` callout for one tool call.

    The title comes from ``summarize_tool_input``. The body is an
    ``**input:**`` label followed by ``block.input`` pretty-printed as
    JSON (2-space indent, sorted keys, non-ASCII kept as-is) inside a
    3-backtick ``json`` fence, every line quoted with ``> ``.
    """
    title = summarize_tool_input(block.name, block.input)
    yield f"> [!tool]- {title}"
    yield "> **input:**"
    yield "> ```json"
    pretty = json.dumps(block.input, indent=2, ensure_ascii=False, sort_keys=True)
    # Split on "\n" only. ``str.splitlines`` also breaks on U+0085,
    # U+2028 and U+2029, which ``ensure_ascii=False`` leaves unescaped
    # inside JSON strings — that would cut a string value in two.
    for line in pretty.split("\n"):
        yield f"> {line}" if line else ">"
    yield "> ```"
def adaptive_fence(content: str) -> str:
    """Return a backtick fence strictly longer than any backtick run in ``content``.

    The fence is never shorter than three backticks, so content without
    backticks gets the ordinary fence.

    Noted by the author as currently unused (tool *results* aren't
    persisted yet) — kept so the rendering side already has the
    primitive when result capture lands.
    """
    run_lengths = (len(run) for run in re.findall(r"`+", content))
    widest = max(run_lengths, default=0)
    return "`" * max(widest + 1, 3)
Generated
+16
View File
@@ -261,6 +261,7 @@ name = "beaver-gateway"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "aiofile" },
{ name = "aiohttp" },
{ name = "aiosqlite" },
{ name = "anthropic" },
@@ -274,6 +275,7 @@ dependencies = [
{ name = "psycopg", extra = ["binary"] },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "python-frontmatter" },
{ name = "sqlmodel" },
{ name = "uvicorn", extra = ["standard"] },
{ name = "uvloop" },
@@ -300,6 +302,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiofile", specifier = ">=3.11.1" },
{ name = "aiohttp", specifier = ">=3.13.5" },
{ name = "aiosqlite", specifier = ">=0.22.1" },
{ name = "anthropic", specifier = ">=0.103.0" },
@@ -315,6 +318,7 @@ requires-dist = [
{ name = "psycopg", extras = ["binary"], specifier = ">=3.3.4" },
{ name = "pydantic", specifier = ">=2.13.4" },
{ name = "pydantic-settings", specifier = ">=2.14.1" },
{ name = "python-frontmatter", specifier = ">=1.2.0" },
{ name = "raycast-api", marker = "extra == 'local'", editable = "../raycast-api" },
{ name = "raycast-api", marker = "extra == 'prod'", git = "https://git.kotikot.com/beaver/raycast-api.git" },
{ name = "sqlmodel", specifier = ">=0.0.38" },
@@ -1647,6 +1651,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0b/d7/1959b9648791274998a9c3526f6d0ec8fd2233e4d4acce81bbae76b44b2a/python_dotenv-1.2.2-py3-none-any.whl", hash = "sha256:1d8214789a24de455a8b8bd8ae6fe3c6b69a5e3d64aa8a8e5d68e694bbcb285a", size = 22101, upload-time = "2026-03-01T16:00:25.09Z" },
]
[[package]]
name = "python-frontmatter"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pyyaml" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e9/21/88aefb4f1de6661b5a003175e21e4a5ad94f5e52b2abf4170a11883c7d81/python_frontmatter-1.2.0.tar.gz", hash = "sha256:5b26ccd3cb85af77feb11d83b922c7bb5aeccb0c9d3fb236b938c600b6322984", size = 16890, upload-time = "2026-05-17T23:42:05.493Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1c/b1/ec12e3e746234006b77dec69d53878253b1da09dbb55fa3cb456083d9069/python_frontmatter-1.2.0-py3-none-any.whl", hash = "sha256:e1ee1d4300450a2f84e778eb4f70edf573da6cd7d463801066f05edc4e819c78", size = 10396, upload-time = "2026-05-17T23:42:04.637Z" },
]
[[package]]
name = "python-multipart"
version = "0.0.29"