feat: add stateful conversation storage
This commit is contained in:
@@ -36,7 +36,15 @@ import aiofile
|
||||
from fastapi import FastAPI, HTTPException, Request, status
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from beaver_gateway.backends.claude_code import ClaudeCodeBackendAdapter, TurnCapture
|
||||
from beaver_gateway.core import audit
|
||||
from beaver_gateway.core.conversation_store import (
|
||||
diff_and_fork,
|
||||
load_conversation,
|
||||
load_messages,
|
||||
mint_conversation,
|
||||
rewrite_messages,
|
||||
)
|
||||
from beaver_gateway.core.turn_record import TurnRecord
|
||||
from beaver_gateway.frontends._accumulate import accumulate
|
||||
from beaver_gateway.frontends._auth import require_token
|
||||
@@ -264,9 +272,25 @@ class MarkdownFrontend(Frontend):
|
||||
msgs=len(parsed.messages),
|
||||
)
|
||||
|
||||
# Resolve / mint the conversation row, align incoming against
|
||||
# stored history, and feed the aligned messages to the backend
|
||||
# — see ``core/conversation_store.py`` for the full rationale.
|
||||
# If the backend isn't claude-code (no ``TurnCapture`` support)
|
||||
# we fall through to the legacy parser-only path.
|
||||
conv, conv_external_id, stored_msgs = await self._resolve_conversation(
|
||||
runtime=runtime, metadata=parsed.metadata, agent_name=agent.name
|
||||
)
|
||||
outcome = diff_and_fork(stored=stored_msgs, incoming=parsed.turns)
|
||||
capture: TurnCapture | None = (
|
||||
TurnCapture() if isinstance(backend, ClaudeCodeBackendAdapter) else None
|
||||
)
|
||||
|
||||
try:
|
||||
kwargs: dict[str, Any] = {}
|
||||
if capture is not None:
|
||||
kwargs["capture"] = capture
|
||||
events = backend.complete(
|
||||
agent=agent, messages=parsed.messages, system=None
|
||||
agent=agent, messages=outcome.messages, system=None, **kwargs
|
||||
)
|
||||
message = await accumulate(events, model=agent.model or agent.name)
|
||||
except Exception as exc:
|
||||
@@ -280,22 +304,22 @@ class MarkdownFrontend(Frontend):
|
||||
status.HTTP_500_INTERNAL_SERVER_ERROR, f"backend error: {exc}"
|
||||
) from exc
|
||||
|
||||
rendered = renderer.render_assistant_message(message)
|
||||
new_body = renderer.append_to_body(parsed.body, rendered)
|
||||
new_body = renderer.append_to_body(new_body, renderer.USER_SCAFFOLD)
|
||||
# Recompute fingerprint so a future cross-frontend hit on this
|
||||
# same conversation can find it. Stored as hex string in
|
||||
# frontmatter — only the markdown frontend reads it.
|
||||
assistant_param: MessageParam = {
|
||||
"role": "assistant",
|
||||
"content": _flatten_assistant_text(message),
|
||||
}
|
||||
updated_messages: list[MessageParam] = [*parsed.messages, assistant_param]
|
||||
updated_metadata = dict(parsed.metadata)
|
||||
updated_metadata["agent"] = agent.name
|
||||
updated_metadata["fingerprint"] = fingerprint_messages(updated_messages)
|
||||
new_content = _reattach_frontmatter(updated_metadata, new_body)
|
||||
await _write_atomic(file_path, new_content)
|
||||
new_content = await self._write_assistant_reply(
|
||||
file_path=file_path,
|
||||
parsed=parsed,
|
||||
message=message,
|
||||
agent_name=agent.name,
|
||||
conv_external_id=conv_external_id,
|
||||
)
|
||||
|
||||
await self._persist_canonical_history(
|
||||
runtime=runtime,
|
||||
conversation_id=conv.id,
|
||||
persist_messages=outcome.persist_messages,
|
||||
new_user_text=parsed.turns[-1].text,
|
||||
capture=capture,
|
||||
message=message,
|
||||
)
|
||||
|
||||
# Broadcast our own turn so other handlers (none today, but the
|
||||
# symmetry is worth keeping) see what happened. ``source`` marks
|
||||
@@ -322,6 +346,94 @@ class MarkdownFrontend(Frontend):
|
||||
|
||||
# ---- helpers -------------------------------------------------------
|
||||
|
||||
async def _write_assistant_reply(
    self,
    *,
    file_path: Path,
    parsed: parser.ParsedFile,
    message: Any,
    agent_name: str,
    conv_external_id: str,
) -> str:
    """Append the assistant turn to the markdown file and return the new text.

    The rendered reply is appended to ``parsed.body``, followed by the
    empty user scaffold. The frontmatter is rebuilt from
    ``parsed.metadata`` with ``agent``, ``conversation_id`` and
    ``fingerprint`` overwritten, and the result is written to
    ``file_path`` via ``_write_atomic``.

    Returns the full file content that was written.
    """
    # Body layout: existing text, then the assistant reply, then a fresh
    # user scaffold.
    reply_markdown = renderer.render_assistant_message(message)
    body_with_reply = renderer.append_to_body(parsed.body, reply_markdown)
    final_body = renderer.append_to_body(body_with_reply, renderer.USER_SCAFFOLD)

    # The fingerprint covers the parsed history plus the flattened text of
    # this reply, so a later cross-frontend hit on the same conversation
    # can find it. It is stored in frontmatter; per the original note,
    # only the markdown frontend reads it.
    reply_param: MessageParam = {
        "role": "assistant",
        "content": _flatten_assistant_text(message),
    }
    history: list[MessageParam] = list(parsed.messages)
    history.append(reply_param)

    frontmatter = {
        **parsed.metadata,
        "agent": agent_name,
        "conversation_id": conv_external_id,
        "fingerprint": fingerprint_messages(history),
    }

    file_text = _reattach_frontmatter(frontmatter, final_body)
    await _write_atomic(file_path, file_text)
    return file_text
|
||||
|
||||
async def _resolve_conversation(
    self, *, runtime: GatewayRuntime, metadata: dict[str, Any], agent_name: str
) -> tuple[Any, str, list[dict[str, Any]]]:
    """Find or create the conversation row and load its stored messages.

    The frontmatter ``conversation_id`` is used as the lookup key only
    when it is a non-empty string. If the key is unusable, or the lookup
    finds no row, a new conversation is minted for ``agent_name``.

    Returns ``(conv, conv.external_id, stored_messages)``.

    Raises:
        RuntimeError: if the resolved row has no primary key. Both
            helpers are expected to return a row with ``id`` populated;
            the check lets the rest of the handler treat ``conv.id`` as
            an ``int``.
    """
    candidate = metadata.get("conversation_id")
    # Anything that is not a non-empty string is treated as "no id".
    external_id = candidate if (isinstance(candidate, str) and candidate) else None

    async with runtime.db.session() as session:
        if external_id is None:
            conv = None
        else:
            conv = await load_conversation(
                session, frontend="markdown", external_id=external_id
            )

        # Covers both "no id in frontmatter" and "id did not match a row".
        if conv is None:
            conv = await mint_conversation(
                session, frontend="markdown", agent_name=agent_name
            )

        if conv.id is None:
            msg = "conversation row missing primary key after commit"
            raise RuntimeError(msg)

        history = await load_messages(session, conversation_id=conv.id)
        return conv, conv.external_id, history
|
||||
|
||||
async def _persist_canonical_history(
    self,
    *,
    runtime: GatewayRuntime,
    conversation_id: int,
    persist_messages: list[dict[str, Any]],
    new_user_text: str,
    capture: TurnCapture | None,
    message: Any,
) -> None:
    """Replace the stored history for ``conversation_id`` with the post-turn state.

    The stored list is, in order: ``persist_messages``, one user message
    carrying ``new_user_text``, and the assistant-side messages for this
    turn. Those come from ``capture.synthesized_messages`` when a capture
    was supplied, otherwise from ``_fallback_synthesized(message)``.
    """
    # Without a ``TurnCapture`` the reply is rebuilt from the final message.
    if capture is None:
        assistant_side = _fallback_synthesized(message)
    else:
        assistant_side = capture.synthesized_messages

    history = list(persist_messages)
    history.append({"role": "user", "content": new_user_text})
    history.extend(assistant_side)

    async with runtime.db.session() as session:
        await rewrite_messages(
            session, conversation_id=conversation_id, messages=history
        )
|
||||
|
||||
def _resolve_path(self, filename: str) -> Path:
|
||||
"""Resolve ``filename`` under the vault; reject escapes."""
|
||||
# ``filename`` may be relative or absolute; we always anchor
|
||||
@@ -399,6 +511,43 @@ def _reattach_frontmatter(metadata: dict[str, Any], body: str) -> str:
|
||||
return _fm.dumps(post) + "\n"
|
||||
|
||||
|
||||
def _fallback_synthesized(message: Any) -> list[dict[str, Any]]:
|
||||
"""Build a single-assistant ``synthesized_messages`` list from a raw ``Message``.
|
||||
|
||||
For backends that don't populate a :class:`TurnCapture` (anthropic
|
||||
HTTP, raycast, …) we don't have access to per-tool-cycle
|
||||
granularity, so the assistant reply lands in the DB as one
|
||||
canonical-block message. Tool memory across cache misses would
|
||||
degrade in that case, but those backends don't have the cache-miss
|
||||
re-seed problem to begin with — they manage history client-side.
|
||||
"""
|
||||
content: list[dict[str, Any]] = []
|
||||
for block in getattr(message, "content", ()):
|
||||
btype = getattr(block, "type", None)
|
||||
if btype == "text":
|
||||
content.append({"type": "text", "text": getattr(block, "text", "") or ""})
|
||||
elif btype == "tool_use":
|
||||
content.append(
|
||||
{
|
||||
"type": "tool_use",
|
||||
"id": getattr(block, "id", ""),
|
||||
"name": getattr(block, "name", ""),
|
||||
"input": getattr(block, "input", {}),
|
||||
}
|
||||
)
|
||||
elif btype == "thinking":
|
||||
content.append(
|
||||
{
|
||||
"type": "thinking",
|
||||
"thinking": getattr(block, "thinking", "") or "",
|
||||
"signature": getattr(block, "signature", "") or "",
|
||||
}
|
||||
)
|
||||
if not content:
|
||||
return []
|
||||
return [{"role": "assistant", "content": content}]
|
||||
|
||||
|
||||
def _flatten_assistant_text(message: Any) -> str:
|
||||
"""Pull all text blocks from an assistant ``Message`` and join them.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user