feat: implement raycast backend

This commit is contained in:
h
2026-05-19 21:06:01 +02:00
parent 221e660c5c
commit 757065f21c
16 changed files with 1415 additions and 31 deletions
+3
View File
@@ -14,3 +14,6 @@ docs/
# IDE
.idea
# Local env
.env
+43 -6
View File
@@ -1,18 +1,55 @@
# Stub user config for the Phase 0 DoD.
# Sample user config — grows alongside the implementation phases.
#
# Loader (beaver_gateway/config_loader.py) execs this file with
# ClaudeAgent, RaycastAgent, McpServer, ExposedMcp, Gateway already
# bound. The top-level `gateway = Gateway(...)` is what gets picked up.
# bound — so importing them is optional. We import explicitly here so
# IDEs and type-checkers see real symbols instead of free variables.
from datetime import date
gateway = Gateway( # type: ignore[name-defined] # noqa: F821
from beaver_gateway.agents.claude import ClaudeAgent
from beaver_gateway.agents.raycast import RaycastAgent, RemoteTool, UserPreferences
from beaver_gateway.core.registry import Gateway
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
gateway = Gateway(
agents=[
ClaudeAgent( # type: ignore[name-defined] # noqa: F821
ClaudeAgent(
name="stub",
model="claude-sonnet-4-5",
model="claude-sonnet-4-6",
system_prompt="You are a stub agent used to validate the Phase 0 skeleton.",
cwd="/tmp",
),
# Phase 1.2 — a RaycastAgent the AnthropicMessagesFrontend will
# route via RaycastBackend. Phase 1.5 added the per-agent knobs
# (`temperature` / `additional_system_instructions` / etc.) —
# only `model`, `system_prompt`, and at least one of the others
# is mandatory.
RaycastAgent(
name="research",
model="Gemini 3.1 Flash Lite",
system_prompt=(
"You are a research assistant. "
"Reply in the user's language. Cite URLs when you use web search."
),
temperature=0.5,
available_native_tools=(RemoteTool.WEB_SEARCH, RemoteTool.READ_PAGE),
# Lambda so today's date is rebuilt on every request while
# locale/timezone stay pinned. ``True`` would give the same
# fresh-date behaviour but would also auto-pick host locale
# and timezone (``en-US`` / system tz), which isn't what we
# want here.
user_preferences=lambda: UserPreferences(
locale="en-GB",
timezone="Europe/Berlin",
current_date=date.today().isoformat(), # noqa: DTZ011 — local date is intended
),
),
],
mcps=[],
frontends=[],
frontends=[
# Phase 1.4 — expose the agents as `model=<name>` on an
# Anthropic-compatible Messages endpoint. Auth comes from
# `BOOTSTRAP_TOKENS` in the env (`name1:value1,name2:value2`).
AnthropicMessagesFrontend(host="0.0.0.0", port=8000),
],
)
+9
View File
@@ -16,5 +16,14 @@ services:
ADMIN_USER: "admin"
ADMIN_PASS: "change-me"
SESSION_SECRET: "dev-secret-change-me"
# Phase 1.4 — comma-separated `name:value` pairs. The frontend
# 401s every request until at least one token is set.
BOOTSTRAP_TOKENS: "cursor:dev-token-change-me"
# RAYCAST_BEARER / RAYCAST_DEVICE_ID + a raycast.json bind go
# here once you wire up an actual RaycastAgent. The example
# config.py declares one, so set these (or remove the agent)
# before exposing port 8000.
ports:
- "8000:8000"
volumes:
- ./config.py:/config/config.py:ro
+12 -2
View File
@@ -7,11 +7,21 @@ runtime branch.
from __future__ import annotations
from pathlib import Path # noqa: TC003 — runtime use by pydantic
from beaver_gateway.agents.base import BaseAgent
class ClaudeAgent(BaseAgent):
"""Agent backed by ``claude-code-api``."""
"""Agent backed by ``claude-code-api``.
cwd: str
``cwd`` is the working directory the claude CLI is spawned in;
typed as ``Path`` (Pydantic coerces strings) since claude-code-api
accepts ``str | os.PathLike[str]``. ``available_native_tools`` stays
``tuple[str, ...]`` because Claude Code's tool set is a moving target
(Bash/Read/Edit/Write/WebSearch/etc. — versions add and rename), so
pinning it as a ``Literal`` would rot the type each release.
"""
cwd: Path
available_native_tools: tuple[str, ...] = ()
+68 -4
View File
@@ -1,15 +1,79 @@
"""Raycast agent definition."""
"""Raycast agent definition.
Field set is the union of ``raycast_api.ChatAPI.stream`` parameters that
make sense as **per-agent defaults**. Per-request values (currently:
``temperature``) win when both are set; the rest fall back to whatever
the agent declared, then to Raycast's own defaults.
``BaseAgent.system_prompt`` maps onto Raycast's wire field
``additional_system_instructions`` (the slot the real client uses for
*content*); the wire field ``system_instructions`` stays at the Raycast
source default — ``"markdown"`` for ``AI_CHAT``, ``"plain"`` otherwise.
We don't expose that wire dichotomy to the user — they get one
conceptual "system prompt".
Excluded on purpose:
* ``buffer_id``/``message_id``/``current_date`` — per-call ephemeral
* ``provider`` override — escape hatch for non-catalog models, no clear
use case yet (revisit in PRD §14 when discovery lands)
* ``locale`` — process-wide via ``Settings.raycast_locale`` because we
only spin up one ``raycast_api.Client`` per gateway
* ``system_instructions`` (wire) — that's a format marker, not content;
the SDK fills it from the source default and we let it
"""
from __future__ import annotations
from raycast_api import Source
from raycast_api import ( # noqa: F401 — UserPreferences re-exported for user configs
RemoteTool,
Source,
UserPreferences,
UserPreferencesArg,
)
from beaver_gateway.agents.base import BaseAgent
class RaycastAgent(BaseAgent):
"""Agent backed by ``raycast-api``."""
"""Agent backed by ``raycast-api``.
``available_native_tools`` is the closed set of Raycast's server-side
"remote tools" (``web_search``, ``search_images``, ``read_page``) —
typed as ``RemoteTool`` so config-time IDE completion lists exactly
the three valid values. Pydantic also coerces string literals, so
``("web_search", "read_page")`` keeps working unchanged.
``user_preferences`` toggles the auto-generated ``<user-preferences>``
block Raycast prepends to ``additional_system_instructions``:
* ``True`` (default) → auto from host locale/timezone/today, rebuilt
every request so the date stays fresh;
* ``False`` → omit the block entirely;
* ``UserPreferences(...)`` instance → used verbatim (frozen at the
time the agent was loaded, so the date won't auto-update);
* ``Callable[[], UserPreferencesArg]`` → re-invoked on every request.
Use this for the common case "fresh date but custom
locale/timezone": ``user_preferences=lambda:
UserPreferences(locale="ru-RU", timezone="Europe/Berlin",
current_date=date.today().isoformat())``. Callables may nest
(a lambda returning a lambda…) but there's no real reason to.
The library uses this block for date/locale-aware formatting, not
for personalisation/memory — those are out of scope upstream.
``reasoning_effort`` values vary by model: GPT-5 takes
``"minimal"|"low"|"medium"|"high"``; Anthropic exposes nothing here
(Claude reasoning lives in a separate ``…-reasoning`` model variant
in the catalog). Unknown effort for the chosen model is ignored
server-side, so we stay loose as ``str | None``.
"""
streaming: bool = True
available_native_tools: tuple[str, ...] = ()
available_native_tools: tuple[RemoteTool, ...] = ()
source: Source = Source.AI_CHAT
temperature: float | None = None
reasoning_effort: str | None = None
tool_choice: str | None = None
user_preferences: UserPreferencesArg = True
+6
View File
@@ -0,0 +1,6 @@
"""Backend adapters.
Each backend wraps a provider-specific SDK (``raycast-api``, ``claude-code-api``)
and yields the unified :class:`~beaver_gateway.core.events.MessageStreamEvent`
family. The Anthropic-style frontend serialises events straight to SSE.
"""
+37
View File
@@ -0,0 +1,37 @@
"""Backend protocol.
A backend turns an Anthropic-style turn (``messages`` + agent definition)
into a stream of :class:`~beaver_gateway.core.events.MessageStreamEvent`
records. The frontend serializes whatever comes out straight to SSE, so
backends are the only place where provider quirks are translated.
Implementations are plain :class:`typing.Protocol` conformers — no ABC
subclassing — to keep them swappable in tests with bare async generators.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Protocol
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from anthropic.types import MessageParam
from beaver_gateway.agents.base import BaseAgent
from beaver_gateway.core.events import MessageStreamEvent
class Backend(Protocol):
"""Single-method protocol; ``complete`` returns an async iterator of events."""
def complete(
self,
*,
agent: BaseAgent,
messages: Iterable[MessageParam],
system: str | None = None,
**options: Any,
) -> AsyncIterator[MessageStreamEvent]:
"""Yield Anthropic stream events for one turn against ``agent``."""
...
+468
View File
@@ -0,0 +1,468 @@
"""Raycast backend adapter.
Translates between Anthropic's ``/v1/messages`` wire vocabulary (incoming
``MessageParam`` history, outgoing ``MessageStreamEvent`` SSE) and the
``raycast-api`` SDK (``Message`` history, ``ChatStreamChunk`` SSE).
Two halves live here:
* :func:`_to_raycast_messages` — pure conversion of an Anthropic message
list into ``list[raycast_api.Message]``. ``tool_result`` blocks carry no
tool name in Anthropic; we recover it by remembering each ``tool_use``
id we saw upstream.
* :meth:`RaycastBackend.complete` — opens a ``client.chat.stream`` and
walks chunks through a tiny block-state machine. The state machine
exists only because Raycast streams ``tool_calls`` in three phases
(open with id+name, deltas with empty id, final summary with the full
``arguments``) — Anthropic wants one ``content_block_start`` → deltas
→ ``content_block_stop`` per block, so we de-duplicate the final
summary against the per-delta increments already emitted.
"""
from __future__ import annotations
import json
import uuid
from typing import TYPE_CHECKING, Any, cast
from raycast_api import Message as RaycastMessage
from raycast_api import RemoteTool, Tool, ToolCall
from beaver_gateway.agents.raycast import RaycastAgent
from beaver_gateway.core.events import (
StopReason,
build_content_block_stop,
build_input_json_delta,
build_message_delta,
build_message_start,
build_message_stop,
build_text_block_start,
build_text_delta,
build_thinking_block_start,
build_thinking_delta,
build_tool_use_block_start,
)
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable, Mapping, Sequence
from anthropic.types import MessageParam
from raycast_api import ChatStreamChunk, Client
from beaver_gateway.agents.base import BaseAgent
from beaver_gateway.core.events import MessageStreamEvent
else:
from collections.abc import Mapping
__all__ = ["RaycastBackend"]
_RAYCAST_TO_ANTHROPIC_STOP: dict[str, StopReason] = {
"stop": "end_turn",
"STOP": "end_turn",
"end_turn": "end_turn",
"tool_calls": "tool_use",
"tool_use": "tool_use",
"length": "max_tokens",
"max_tokens": "max_tokens",
"stop_sequence": "stop_sequence",
}
def _first_set[T](*values: T | None) -> T | None:
"""Return the first value that isn't ``None``, else ``None``.
Used to layer per-request options over per-agent defaults: a real
``0.0`` temperature on the request must override the agent's
``None``, but the agent's value must take effect when the request
omits it. ``or``-chaining is wrong here because ``0.0`` / ``""`` are
legitimate values and falsy.
"""
for v in values:
if v is not None:
return v
return None
def _map_stop_reason(raw: str | None) -> StopReason:
"""Map Raycast ``finish_reason`` strings into Anthropic stop reasons.
Unknown values collapse to ``end_turn`` — Anthropic clients treat that
as a clean finish, which is the right user-visible behaviour when the
upstream simply went off-vocabulary.
"""
if raw is None:
return "end_turn"
return _RAYCAST_TO_ANTHROPIC_STOP.get(raw, "end_turn")
def _as_mapping(block: object) -> Mapping[str, Any] | None:
"""Narrow a block param (TypedDict | dict | anything) to a read-only mapping.
ty refuses to assign a TypedDict to ``dict[str, Any]`` (TypedDicts are
not freely-mutable dicts in its model), and our access pattern is
strictly read-only — so we go through ``Mapping``.
"""
if isinstance(block, Mapping):
return cast("Mapping[str, Any]", block)
return None
def _extract_tool_result_text(content: object) -> str:
"""Flatten an Anthropic ``tool_result`` block's content into plain text.
Anthropic accepts either a string or a list of typed blocks (text /
image / etc.). Raycast only carries text in tool results, so we keep
text blocks and JSON-encode anything richer rather than dropping it.
"""
if isinstance(content, str):
return content
if isinstance(content, list):
parts: list[str] = []
for block in content:
block_map = _as_mapping(block)
if block_map is not None and block_map.get("type") == "text":
parts.append(str(block_map.get("text", "")))
else:
parts.append(
json.dumps(block, separators=(",", ":"), ensure_ascii=False)
)
return "\n".join(parts)
return json.dumps(content, separators=(",", ":"), ensure_ascii=False)
def _to_raycast_messages(
messages: Iterable[MessageParam],
) -> list[RaycastMessage]:
"""Convert an Anthropic message history into a Raycast one.
``tool_use_id → name`` is tracked across the iteration so that
Raycast ``tool`` messages — which require a tool name the Anthropic
side does not carry on ``tool_result`` blocks — can be reconstructed.
"""
out: list[RaycastMessage] = []
tool_use_names: dict[str, str] = {}
for msg in messages:
role = msg["role"]
content = msg.get("content", "")
if isinstance(content, str):
if role == "user":
out.append(RaycastMessage.user(content))
else:
out.append(RaycastMessage.assistant(text=content))
continue
if role == "user":
text_parts: list[str] = []
tool_results: list[Mapping[str, Any]] = []
for block in content:
block_map = _as_mapping(block)
if block_map is None:
continue
btype = block_map.get("type")
if btype == "text":
text_parts.append(str(block_map.get("text", "")))
elif btype == "tool_result":
tool_results.append(block_map)
if text_parts:
out.append(RaycastMessage.user("\n".join(text_parts)))
for tr in tool_results:
tool_use_id = str(tr.get("tool_use_id", ""))
out.append(
RaycastMessage.tool(
tool_call_id=tool_use_id,
name=tool_use_names.get(tool_use_id, ""),
result=_extract_tool_result_text(tr.get("content", "")),
)
)
continue
text_parts = []
tool_calls: list[ToolCall] = []
for block in content:
block_map = _as_mapping(block)
if block_map is None:
continue
btype = block_map.get("type")
if btype == "text":
text_parts.append(str(block_map.get("text", "")))
elif btype == "tool_use":
tu_id = str(block_map.get("id", ""))
tu_name = str(block_map.get("name", ""))
tool_use_names[tu_id] = tu_name
tool_calls.append(
ToolCall(
id=tu_id,
name=tu_name,
arguments=json.dumps(
block_map.get("input", {}),
separators=(",", ":"),
ensure_ascii=False,
),
)
)
out.append(
RaycastMessage.assistant(
text="\n".join(text_parts),
tool_calls=tool_calls or None,
)
)
return out
def _build_tool_list(
agent: RaycastAgent,
) -> list[Tool | RemoteTool | str] | None:
"""Wrap each native-tool name as a Raycast remote tool.
Phase 1.2 scope: only the three model-agnostic remote tools
(`web_search`, `search_images`, `read_page`). Client-defined tools
coming through the Anthropic body's ``tools`` field stay out — that's
Phase 1.3+ alongside ``accept_client_tools``.
"""
if not agent.available_native_tools:
return None
return [Tool.remote(name) for name in agent.available_native_tools]
class _BlockState:
"""Tracks the currently-open Anthropic content block, if any.
Anthropic events are sequential per block (``content_block_start`` →
deltas → ``content_block_stop``). Raycast streams text and tool_calls
interleaved across chunks; we keep one slot open at a time and close
it whenever the kind changes.
Tool-call routing needs two side tables because Raycast streams ids
in phase 1 only and ``index`` in every chunk: ``tool_id_to_block``
keys by Raycast tool-call id, ``tool_idx_to_id`` resolves chunk
indices back to that id for the no-id delta chunks.
"""
__slots__ = ("index", "kind", "tool_id_to_block", "tool_idx_to_id")
def __init__(self) -> None:
self.index: int = -1
self.kind: str | None = None # "text" | "thinking" | "tool_use" | None
self.tool_id_to_block: dict[str, int] = {}
self.tool_idx_to_id: dict[int, str] = {}
class RaycastBackend:
"""Adapter from ``raycast-api`` chat streams to Anthropic stream events.
Construction takes a long-lived :class:`raycast_api.Client` (one per
gateway — bearer + device_id are process-wide). Each
:meth:`complete` call opens one ``chat.stream`` and yields a fully
Anthropic-shaped event sequence.
"""
def __init__(self, client: Client) -> None:
self._client = client
async def complete(
self,
*,
agent: BaseAgent,
messages: Iterable[MessageParam],
system: str | None = None,
**options: Any,
) -> AsyncIterator[MessageStreamEvent]:
if not isinstance(agent, RaycastAgent):
msg = f"RaycastBackend requires RaycastAgent, got {type(agent).__name__}"
raise TypeError(msg)
raycast_messages = _to_raycast_messages(messages)
tools = _build_tool_list(agent)
# On the wire Raycast uses ``system_instructions`` as a format
# marker (``"markdown"`` for AI_CHAT, ``"plain"`` otherwise —
# filled in by the SDK from the source default when we pass
# ``None``) and ``additional_system_instructions`` as the actual
# prompt content. So our ``system_prompt`` (or the per-request
# Anthropic ``system``, if present) flows into the *additional*
# slot. The SDK still prepends ``<user-preferences>`` to whatever
# we hand it via ``_build_preamble``.
prompt_content = system if system is not None else agent.system_prompt
# Per-request options win over agent defaults; agent defaults
# win over Raycast SDK defaults. ``None`` means "fall back".
async for event in self._stream(
agent=agent,
raycast_messages=raycast_messages,
tools=tools,
prompt_content=prompt_content,
temperature=_first_set(options.get("temperature"), agent.temperature),
reasoning_effort=_first_set(
options.get("reasoning_effort"), agent.reasoning_effort
),
tool_choice=_first_set(options.get("tool_choice"), agent.tool_choice),
):
yield event
async def _stream(
self,
*,
agent: RaycastAgent,
raycast_messages: Sequence[RaycastMessage],
tools: list[Tool | RemoteTool | str] | None,
prompt_content: str | None,
temperature: float | None,
reasoning_effort: str | None,
tool_choice: str | None,
) -> AsyncIterator[MessageStreamEvent]:
message_id = f"msg_{uuid.uuid4().hex}"
yield build_message_start(message_id=message_id, model=agent.model)
state = _BlockState()
final_finish: str | None = None
final_usage: dict[str, int] | None = None
stream = self._client.chat.stream(
model=agent.model,
messages=list(raycast_messages),
source=agent.source,
# ``system_instructions=None`` → SDK substitutes the source
# default (``"markdown"`` / ``"plain"``). Real prompt goes
# into ``additional_system_instructions``.
additional_system_instructions=prompt_content,
user_preferences=agent.user_preferences,
tools=tools,
tool_choice=tool_choice,
temperature=temperature,
reasoning_effort=reasoning_effort,
)
async for chunk in stream:
for event in self._handle_chunk(chunk, state):
yield event
if chunk.finish_reason:
final_finish = chunk.finish_reason
if chunk.usage:
final_usage = chunk.usage
# Close whatever block is still open before the message delta.
if state.kind is not None:
yield build_content_block_stop(state.index)
state.kind = None
yield build_message_delta(
stop_reason=_map_stop_reason(final_finish),
usage=final_usage,
)
yield build_message_stop()
def _handle_chunk(
self, chunk: ChatStreamChunk, state: _BlockState
) -> Iterable[MessageStreamEvent]:
"""Translate one Raycast chunk into zero or more Anthropic events.
Branch order matters: we close the previous block kind before
opening a new one (text → tool_use, tool_use → text, etc.) and we
intentionally fall through ``tool_calls`` only if there's a real
delta — the final-summary chunk re-sends the full arguments
string we've already streamed delta-by-delta.
"""
events: list[MessageStreamEvent] = []
is_final_summary = chunk.finish_reason is not None
if chunk.text:
events.extend(self._ensure_kind(state, "text"))
events.append(build_text_delta(state.index, chunk.text))
if chunk.reasoning:
events.extend(self._ensure_kind(state, "thinking"))
events.append(build_thinking_delta(state.index, chunk.reasoning))
if chunk.tool_calls:
events.extend(
self._handle_tool_calls(chunk, state, is_final_summary=is_final_summary)
)
return events
def _ensure_kind(
self, state: _BlockState, kind: str
) -> Iterable[MessageStreamEvent]:
"""Open a block of ``kind``, closing any current block first."""
if state.kind == kind:
return []
events: list[MessageStreamEvent] = []
if state.kind is not None:
events.append(build_content_block_stop(state.index))
state.index += 1
state.kind = kind
if kind == "text":
events.append(build_text_block_start(state.index))
elif kind == "thinking":
events.append(build_thinking_block_start(state.index))
else:
msg = f"unexpected block kind: {kind!r}"
raise ValueError(msg)
return events
def _handle_tool_calls(
self,
chunk: ChatStreamChunk,
state: _BlockState,
*,
is_final_summary: bool,
) -> Iterable[MessageStreamEvent]:
"""Translate one chunk's ``tool_calls`` payload into Anthropic events.
Mirrors the keying logic of ``raycast_api.ChatResult._merge_tool_calls``
so the same dedupe behaviour lands here: tool calls are tracked
by id when present, otherwise by their wire ``index`` field, and
the final-summary chunk's arguments string is dropped because the
deltas already streamed it.
"""
events: list[MessageStreamEvent] = []
raw_tcs = chunk.raw.get("tool_calls") or []
for i, tc in enumerate(chunk.tool_calls or []):
raw_tc = raw_tcs[i] if i < len(raw_tcs) else {}
idx_field = raw_tc.get("index") if isinstance(raw_tc, dict) else None
# Resolve this entry to a tool-id key, mirroring
# `raycast_api.ChatResult._merge_tool_calls`. Phase 1 carries
# id+index, phase 2 only index, phase 3 only id.
tool_id: str | None = None
if tc.id:
tool_id = tc.id
if isinstance(idx_field, int):
state.tool_idx_to_id[idx_field] = tc.id
elif isinstance(idx_field, int):
tool_id = state.tool_idx_to_id.get(idx_field)
if tool_id is None:
continue
block_idx = state.tool_id_to_block.get(tool_id)
if block_idx is None:
# New tool_use block. Close any open text/thinking block first.
if state.kind is not None:
events.append(build_content_block_stop(state.index))
state.index += 1
state.kind = "tool_use"
block_idx = state.index
state.tool_id_to_block[tool_id] = block_idx
events.append(
build_tool_use_block_start(
block_idx, tool_use_id=tool_id, name=tc.name or ""
)
)
if tc.arguments and not is_final_summary:
events.append(build_input_json_delta(block_idx, tc.arguments))
continue
# Existing block. Skip args on the final summary — they're a
# full restatement of what's already been delta'd.
if is_final_summary:
continue
if tc.arguments:
events.append(build_input_json_delta(block_idx, tc.arguments))
return events
+136 -9
View File
@@ -1,28 +1,155 @@
"""Process entrypoint.
Phase 0.3 — load the user's ``/config/config.py`` via
``config_loader``, build registries, print the
``loaded N agents, M mcps, K frontends`` line from the Phase 0 DoD,
exit cleanly. Phase 0.4 will install uvloop and start uvicorn(s) for
the frontends and the internal MCP app.
Phase 1.4 — async ``main``: install uvloop, load the user config, build
registries + per-agent backends (only ``RaycastBackend`` so far), wire
each frontend with a ``GatewayRuntime``, and run all
``frontend.serve()`` coroutines concurrently. Without any frontends we
still print the Phase 0 DoD line and exit cleanly so the bare skeleton
keeps working.
"""
from __future__ import annotations
import asyncio
import logging
from contextlib import AsyncExitStack
from typing import TYPE_CHECKING
import uvloop
from raycast_api import Client as RaycastClient
from raycast_api.config import Config as RaycastConfig
from beaver_gateway import config_loader
from beaver_gateway.agents.raycast import RaycastAgent
from beaver_gateway.backends.raycast import RaycastBackend
from beaver_gateway.core.auth import TokenStore
from beaver_gateway.core.registry import AgentRegistry, McpRegistry
from beaver_gateway.frontends.base import GatewayRuntime
from beaver_gateway.settings import Settings
if TYPE_CHECKING:
from beaver_gateway.backends.base import Backend
_log = logging.getLogger("beaver_gateway.cli")
def main() -> None:
settings = Settings() # ty: ignore[missing-argument]
"""Sync wrapper: uvloop loop factory + asyncio.run."""
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
asyncio.run(_async_main(), loop_factory=uvloop.new_event_loop)
async def _async_main() -> None:
settings = Settings() # ty: ignore[missing-argument]
gateway = config_loader.load(settings.config_path)
agents = AgentRegistry(gateway.agents)
mcps = McpRegistry(gateway.mcps)
token_store = TokenStore.from_env(settings.bootstrap_tokens)
print(
f"beaver-gateway: loaded {len(agents)} agents, "
f"{len(mcps)} mcps, {len(gateway.frontends)} frontends"
async with AsyncExitStack() as stack:
backends: dict[str, Backend] = await _build_backends(
settings=settings, agents=agents, stack=stack
)
runtime = GatewayRuntime(
agents=agents,
mcps=mcps,
backends=backends,
token_store=token_store,
)
for fe in gateway.frontends:
fe.configure(runtime)
_log.info(
"beaver-gateway: loaded %d agents, %d mcps, %d frontends",
len(agents),
len(mcps),
len(gateway.frontends),
)
# Keep the Phase 0 DoD line on stdout for grep-friendly smoke
# tests, in addition to the structured log line above.
print(
f"beaver-gateway: loaded {len(agents)} agents, "
f"{len(mcps)} mcps, {len(gateway.frontends)} frontends"
)
if not gateway.frontends:
return
async with asyncio.TaskGroup() as tg:
for fe in gateway.frontends:
tg.create_task(fe.serve())
async def _build_backends(
*,
settings: Settings,
agents: AgentRegistry,
stack: AsyncExitStack,
) -> dict[str, Backend]:
"""Construct one backend per agent name.
The Raycast ``Client`` is shared across every ``RaycastAgent``
(bearer + device-id are process-wide), so we open it lazily — only
when at least one ``RaycastAgent`` is present — and close it via
the caller's exit stack.
"""
backends: dict[str, Backend] = {}
raycast_agents = [a for a in agents if isinstance(a, RaycastAgent)]
if raycast_agents:
client = await _try_open_raycast_client(settings, stack)
if client is not None:
raycast_backend = RaycastBackend(client)
for a in raycast_agents:
backends[a.name] = raycast_backend
# Phase 2: ClaudeAgent → ClaudeCodeBackendAdapter goes here.
return backends
async def _try_open_raycast_client(
settings: Settings, stack: AsyncExitStack
) -> RaycastClient | None:
"""Open a ``raycast_api.Client`` if creds are available, else warn + skip.
Skipping is gentler than failing startup: the user can bring up the
gateway, list their agents, and still hit a ``ClaudeAgent``; affected
``RaycastAgent`` instances 503 with a specific message at request time.
Missing ``raycast.json`` falls into the same bucket — first-run
users won't have it yet.
"""
if not settings.raycast_bearer:
_log.warning(
"RaycastAgent present but RAYCAST_BEARER is unset — those agents will 503"
)
return None
if not settings.raycast_device_id:
_log.warning(
"RaycastAgent present but RAYCAST_DEVICE_ID is unset — those agents "
"will 503 (generate once with `python -c 'import secrets; "
"print(secrets.token_hex(32))'`)"
)
return None
if not settings.raycast_config_path.exists():
_log.warning(
"RaycastAgent present but %s is missing — those agents will 503",
settings.raycast_config_path,
)
return None
config = RaycastConfig.load(settings.raycast_config_path)
client = RaycastClient(
config=config,
bearer_token=settings.raycast_bearer,
device_id=settings.raycast_device_id,
locale=settings.raycast_locale,
)
return await stack.enter_async_context(client)
+105
View File
@@ -0,0 +1,105 @@
"""Bearer-token verification (Phase 1.3 — in-memory only).
Phase 4 will replace this with a DB-backed store (PRD §8 ``tokens``
table, Argon2 hashes, scopes, ``last_used_at`` batching). Until then,
frontends authenticate callers against an in-memory ``{value: name}``
dict seeded from the ``BOOTSTRAP_TOKENS`` env var.
Format::
BOOTSTRAP_TOKENS=cursor:s3cret,laptop:hunter2
The *name* side is for audit lines — :py:meth:`TokenStore.verify`
returns it on hit so callers can attribute the request without
exposing the raw secret. ``None`` means "no such token"; callers
turn that into 401.
This module deliberately knows nothing about HTTP frameworks — it
takes a raw token (or the verbatim ``Authorization`` header value)
and returns a name-or-None. Frontends own the response shape.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Mapping
class TokenStoreError(ValueError):
"""Malformed ``BOOTSTRAP_TOKENS`` value."""
class TokenStore:
"""Constant-time-ish bearer verification over a static name→value map.
The store is keyed by value internally (``verify`` lookup is O(1))
but constructed from a name→value mapping because that's how the
user thinks about it — one human-readable label per caller.
"""
__slots__ = ("_by_value",)
def __init__(self, tokens: Mapping[str, str]) -> None:
by_value: dict[str, str] = {}
for name, value in tokens.items():
if not name or not value:
msg = f"empty name or value in token map (name={name!r})"
raise TokenStoreError(msg)
if value in by_value:
msg = (
f"duplicate token value for names "
f"{by_value[value]!r} and {name!r}"
)
raise TokenStoreError(msg)
by_value[value] = name
self._by_value = by_value
@classmethod
def from_env(cls, raw: str) -> TokenStore:
"""Parse ``name1:value1,name2:value2`` (the ``BOOTSTRAP_TOKENS`` form)."""
tokens: dict[str, str] = {}
for chunk in raw.split(","):
entry = chunk.strip()
if not entry:
continue
name, sep, value = entry.partition(":")
if not sep:
msg = f"token entry missing ':' separator: {entry!r}"
raise TokenStoreError(msg)
name, value = name.strip(), value.strip()
if name in tokens:
msg = f"duplicate token name: {name!r}"
raise TokenStoreError(msg)
tokens[name] = value
return cls(tokens)
def verify(self, token: str | None) -> str | None:
"""Return the token's name if known, else ``None``."""
if not token:
return None
return self._by_value.get(token)
def verify_bearer(self, authorization: str | None) -> str | None:
"""Strip the ``Bearer`` prefix (case-insensitive) then verify.
Accepts a bare token too — Cursor's MCP transport sometimes
passes the raw value via ``?token=`` and reuses the same
verifier; treating an unprefixed header as a bare token keeps
both call sites on one method.
"""
if not authorization:
return None
head, sep, rest = authorization.partition(" ")
token = rest.strip() if sep and head.lower() == "bearer" else authorization
return self.verify(token)
def __len__(self) -> int:
return len(self._by_value)
def __bool__(self) -> bool:
return bool(self._by_value)
__all__ = ["TokenStore", "TokenStoreError"]
+175
View File
@@ -0,0 +1,175 @@
"""Unified streaming-event protocol shared by every backend adapter.
We piggyback on :mod:`anthropic.types` rather than reinvent the wire format:
``AnthropicMessagesFrontend`` ultimately serializes whatever a backend yields
straight to SSE via :py:meth:`pydantic.BaseModel.model_dump_json`, so events
must be valid Anthropic ``message_stream`` records. The aliases below give
the rest of the codebase one import path; the ``build_*`` helpers keep the
verbose constructors out of every adapter.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from anthropic.types import (
InputJSONDelta,
Message,
MessageDeltaUsage,
RawContentBlockDeltaEvent,
RawContentBlockStartEvent,
RawContentBlockStopEvent,
RawMessageDeltaEvent,
RawMessageStartEvent,
RawMessageStopEvent,
RawMessageStreamEvent,
SignatureDelta,
TextBlock,
TextDelta,
ThinkingBlock,
ThinkingDelta,
ToolUseBlock,
Usage,
)
from anthropic.types.raw_message_delta_event import Delta as MessageDelta
if TYPE_CHECKING:
from collections.abc import Mapping
StopReason = Literal[
"end_turn", "max_tokens", "stop_sequence", "tool_use", "pause_turn", "refusal"
]
"""Anthropic-canonical stop reasons. Backends map their native vocabulary into this."""
MessageStreamEvent = RawMessageStreamEvent
"""Single public alias for the event family yielded by ``Backend.complete``."""
__all__ = [
"MessageDelta",
"MessageDeltaUsage",
"MessageStreamEvent",
"RawContentBlockDeltaEvent",
"RawContentBlockStartEvent",
"RawContentBlockStopEvent",
"RawMessageDeltaEvent",
"RawMessageStartEvent",
"RawMessageStopEvent",
"StopReason",
"Usage",
"build_content_block_stop",
"build_input_json_delta",
"build_message_delta",
"build_message_start",
"build_message_stop",
"build_signature_delta",
"build_text_block_start",
"build_text_delta",
"build_thinking_block_start",
"build_thinking_delta",
"build_tool_use_block_start",
]
def build_message_start(
*, message_id: str, model: str, input_tokens: int = 0, output_tokens: int = 0
) -> RawMessageStartEvent:
"""Open a turn. Token counts get finalized later via ``message_delta``."""
return RawMessageStartEvent(
type="message_start",
message=Message(
id=message_id,
type="message",
role="assistant",
model=model,
content=[],
stop_reason=None,
stop_sequence=None,
usage=Usage(input_tokens=input_tokens, output_tokens=output_tokens),
),
)
def build_text_block_start(index: int) -> RawContentBlockStartEvent:
return RawContentBlockStartEvent(
type="content_block_start",
index=index,
content_block=TextBlock(type="text", text="", citations=None),
)
def build_text_delta(index: int, text: str) -> RawContentBlockDeltaEvent:
return RawContentBlockDeltaEvent(
type="content_block_delta",
index=index,
delta=TextDelta(type="text_delta", text=text),
)
def build_tool_use_block_start(
index: int, *, tool_use_id: str, name: str
) -> RawContentBlockStartEvent:
return RawContentBlockStartEvent(
type="content_block_start",
index=index,
content_block=ToolUseBlock(
type="tool_use", id=tool_use_id, name=name, input={}
),
)
def build_input_json_delta(index: int, partial_json: str) -> RawContentBlockDeltaEvent:
return RawContentBlockDeltaEvent(
type="content_block_delta",
index=index,
delta=InputJSONDelta(type="input_json_delta", partial_json=partial_json),
)
def build_thinking_block_start(index: int) -> RawContentBlockStartEvent:
return RawContentBlockStartEvent(
type="content_block_start",
index=index,
content_block=ThinkingBlock(type="thinking", thinking="", signature=""),
)
def build_thinking_delta(index: int, thinking: str) -> RawContentBlockDeltaEvent:
return RawContentBlockDeltaEvent(
type="content_block_delta",
index=index,
delta=ThinkingDelta(type="thinking_delta", thinking=thinking),
)
def build_signature_delta(index: int, signature: str) -> RawContentBlockDeltaEvent:
return RawContentBlockDeltaEvent(
type="content_block_delta",
index=index,
delta=SignatureDelta(type="signature_delta", signature=signature),
)
def build_content_block_stop(index: int) -> RawContentBlockStopEvent:
return RawContentBlockStopEvent(type="content_block_stop", index=index)
def build_message_delta(
*,
stop_reason: StopReason | None,
usage: Mapping[str, int] | None = None,
stop_sequence: str | None = None,
) -> RawMessageDeltaEvent:
"""Close a turn. ``usage`` is a partial dict (input/output tokens, cache_*)."""
return RawMessageDeltaEvent(
type="message_delta",
delta=MessageDelta(stop_reason=stop_reason, stop_sequence=stop_sequence),
usage=MessageDeltaUsage.model_validate(
dict(usage) if usage else {"output_tokens": 0}
),
)
def build_message_stop() -> RawMessageStopEvent:
return RawMessageStopEvent(type="message_stop")
+3 -2
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
from beaver_gateway.frontends.base import Frontend
from beaver_gateway.frontends.anthropic import AnthropicMessagesFrontend
from beaver_gateway.frontends.base import Frontend, GatewayRuntime
__all__ = ["Frontend"]
__all__ = ["AnthropicMessagesFrontend", "Frontend", "GatewayRuntime"]
+299
View File
@@ -0,0 +1,299 @@
"""``POST /v1/messages`` frontend.
Exposes the gateway as an Anthropic-compatible Messages endpoint, so any
client that already speaks Anthropic (Cursor, Cline, the official SDK,
``curl``) can hit a configured agent by passing its name as ``model``.
The frontend is intentionally thin: it authenticates the bearer token,
resolves ``body.model`` to an agent + its backend, and then either
streams the backend's events straight to SSE or accumulates them into a
single ``Message`` for ``stream=false`` callers. All provider quirks
already live in the backend adapters; we don't translate here.
Phase 1.4 wires only ``RaycastAgent`` through ``RaycastBackend``;
``ClaudeAgent`` lands in Phase 2 and will plug into the same dispatch
table without changes to this module.
"""
from __future__ import annotations
import json
import logging
from typing import TYPE_CHECKING, Any
from anthropic.types import (
InputJSONDelta,
Message,
RawContentBlockDeltaEvent,
RawContentBlockStartEvent,
RawContentBlockStopEvent,
RawMessageDeltaEvent,
RawMessageStartEvent,
SignatureDelta,
TextBlock,
TextDelta,
ThinkingBlock,
ThinkingDelta,
ToolUseBlock,
Usage,
)
from fastapi import FastAPI, HTTPException, Request, status
from fastapi.responses import JSONResponse, StreamingResponse
from beaver_gateway.frontends.base import Frontend
if TYPE_CHECKING:
from collections.abc import AsyncIterator
from beaver_gateway.core.events import MessageStreamEvent
from beaver_gateway.frontends.base import GatewayRuntime
_log = logging.getLogger("beaver_gateway.frontends.anthropic")
__all__ = ["AnthropicMessagesFrontend"]
class AnthropicMessagesFrontend(Frontend):
"""FastAPI app behind ``POST /v1/messages`` + ``GET /v1/models``."""
def __init__(self, *, host: str = "0.0.0.0", port: int = 8000) -> None: # noqa: S104
self.host = host
self.port = port
self._runtime: GatewayRuntime | None = None
self._app: FastAPI | None = None
def configure(self, runtime: GatewayRuntime) -> None:
self._runtime = runtime
self._app = self._build_app(runtime)
async def serve(self) -> None:
# Local import: uvicorn pulls in a lot, no reason to load it when
# something else (a test, a script) imports this module just for
# the FastAPI factory.
import uvicorn
if self._app is None:
msg = "configure() must be called before serve()"
raise RuntimeError(msg)
config = uvicorn.Config(
self._app, host=self.host, port=self.port, log_level="info"
)
server = uvicorn.Server(config)
await server.serve()
def _build_app(self, runtime: GatewayRuntime) -> FastAPI:
app = FastAPI(title="beaver-gateway / Anthropic Messages")
@app.get("/healthz")
async def healthz() -> dict[str, str]:
return {"status": "ok"}
@app.get("/v1/models")
async def list_models(request: Request) -> dict[str, Any]:
_require_token(request, runtime)
data = [
{
"type": "model",
"id": a.name,
"display_name": a.name,
"created_at": None,
}
for a in runtime.agents
]
return {"data": data, "has_more": False, "first_id": None, "last_id": None}
@app.post("/v1/messages")
async def create_message(request: Request) -> Any:
token_name = _require_token(request, runtime)
try:
body = await request.json()
except json.JSONDecodeError as exc:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, f"invalid JSON: {exc}"
) from exc
model = body.get("model")
if not isinstance(model, str):
raise HTTPException(
status.HTTP_400_BAD_REQUEST, "missing or non-string `model`"
)
agent = runtime.agents.get(model)
if agent is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND, f"unknown agent: {model!r}"
)
backend = runtime.backends.get(agent.name)
if backend is None:
# Agent exists in config but its backend isn't wired in
# this phase (e.g. ClaudeAgent before Phase 2, or a
# RaycastAgent without RAYCAST_BEARER set at startup).
raise HTTPException(
status.HTTP_503_SERVICE_UNAVAILABLE,
f"no backend configured for agent {agent.name!r}",
)
messages = body.get("messages") or []
system = body.get("system")
stream_flag = bool(body.get("stream", False))
_log.info(
"messages: actor=%s agent=%s stream=%s msgs=%d",
token_name,
agent.name,
stream_flag,
len(messages),
)
# Forward per-request knobs the Anthropic body may carry —
# backend adapters layer these over per-agent defaults. Only
# values explicitly present (not Anthropic-defaulted ones we
# never received) are forwarded, so the agent's default still
# wins when the caller omits the field.
options: dict[str, Any] = {}
if isinstance(body.get("temperature"), int | float):
options["temperature"] = body["temperature"]
events = backend.complete(
agent=agent,
messages=messages,
system=system if isinstance(system, str) else None,
**options,
)
if stream_flag:
return StreamingResponse(
_sse(events), media_type="text/event-stream"
)
message = await _accumulate(events, model=model)
return JSONResponse(content=message.model_dump(mode="json"))
return app
def _require_token(request: Request, runtime: GatewayRuntime) -> str:
"""Verify the request's bearer and return the token's audit name.
Accepts both ``X-Api-Key: <token>`` (what the official Anthropic
SDK sends — LibreChat, the CLI, third-party clients) and
``Authorization: Bearer <token>`` (curl, Cursor). Raises 401 on
miss. ``TokenStore`` doesn't know about HTTP, so response shape
is owned here.
"""
api_key = request.headers.get("x-api-key")
name = (
runtime.token_store.verify(api_key)
if api_key
else runtime.token_store.verify_bearer(request.headers.get("authorization"))
)
if name is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
"invalid or missing bearer token",
headers={"WWW-Authenticate": "Bearer"},
)
return name
async def _sse(events: AsyncIterator[MessageStreamEvent]) -> AsyncIterator[bytes]:
r"""Serialize an event stream into Anthropic's ``text/event-stream`` form.
Each event becomes ``event: <type>\ndata: <json>\n\n`` — the shape
the Anthropic SDK's SSE decoder expects. Errors mid-stream are
swallowed into a synthetic ``error`` event so the client sees the
failure rather than a hung connection.
"""
try:
async for ev in events:
payload = ev.model_dump_json()
yield f"event: {ev.type}\ndata: {payload}\n\n".encode()
except Exception as exc: # noqa: BLE001
_log.exception("backend stream failed")
err = json.dumps(
{"type": "error", "error": {"type": "api_error", "message": str(exc)}}
)
yield f"event: error\ndata: {err}\n\n".encode()
async def _accumulate(
events: AsyncIterator[MessageStreamEvent], *, model: str
) -> Message:
"""Collapse a stream-event sequence into one ``Message`` (``stream=false``).
Mirrors the Anthropic SDK's own accumulator: walk events, build
block dicts indexed by their ``content_block`` index, fold text /
thinking deltas in, buffer ``input_json_delta`` chunks until the
block closes (then JSON-parse them once).
"""
message_id = ""
role = "assistant"
usage = Usage(input_tokens=0, output_tokens=0)
blocks: dict[int, dict[str, Any]] = {}
json_buffers: dict[int, str] = {}
stop_reason: str | None = None
stop_sequence: str | None = None
async for ev in events:
# isinstance, not ``ev.type == "..."``: ty narrows on the
# discriminator only via the class, and the raw event union
# carries its own discriminators (``Raw*Event``) the SDK
# already promises.
if isinstance(ev, RawMessageStartEvent):
message_id = ev.message.id
role = ev.message.role
usage = ev.message.usage
elif isinstance(ev, RawContentBlockStartEvent):
blocks[ev.index] = ev.content_block.model_dump()
if blocks[ev.index].get("type") == "tool_use":
json_buffers[ev.index] = ""
elif isinstance(ev, RawContentBlockDeltaEvent):
blk = blocks.setdefault(ev.index, {})
delta = ev.delta
if isinstance(delta, TextDelta):
blk["text"] = blk.get("text", "") + delta.text
elif isinstance(delta, InputJSONDelta):
json_buffers[ev.index] = (
json_buffers.get(ev.index, "") + delta.partial_json
)
elif isinstance(delta, ThinkingDelta):
blk["thinking"] = blk.get("thinking", "") + delta.thinking
elif isinstance(delta, SignatureDelta):
blk["signature"] = delta.signature
elif isinstance(ev, RawContentBlockStopEvent):
blk = blocks.get(ev.index, {})
if blk.get("type") == "tool_use":
raw = json_buffers.pop(ev.index, "")
blk["input"] = json.loads(raw) if raw.strip() else {}
elif isinstance(ev, RawMessageDeltaEvent):
stop_reason = ev.delta.stop_reason
stop_sequence = ev.delta.stop_sequence
if ev.usage.output_tokens:
usage = Usage.model_validate(
{**usage.model_dump(), "output_tokens": ev.usage.output_tokens}
)
content = []
for idx in sorted(blocks):
bd = blocks[idx]
btype = bd.get("type")
if btype == "text":
content.append(TextBlock.model_validate(bd))
elif btype == "tool_use":
content.append(ToolUseBlock.model_validate(bd))
elif btype == "thinking":
content.append(ThinkingBlock.model_validate(bd))
return Message(
id=message_id or "msg_unknown",
type="message",
role=role,
model=model,
content=content,
stop_reason=stop_reason, # type: ignore[arg-type]
stop_sequence=stop_sequence,
usage=usage,
)
+27 -5
View File
@@ -1,24 +1,46 @@
"""Frontend ABC.
"""Frontend ABC + the runtime context handed to ``configure``.
A frontend is anything that listens on a port and routes inbound traffic
into the agent/MCP registries. ``configure`` is called once after the
``Gateway`` is built; ``serve`` runs the listening loop.
into the gateway. ``GatewayRuntime`` carries everything a frontend may
need that isn't user-config: built registries, per-agent backends, and
the in-memory token store. The user's ``/config/config.py`` defines a
``Gateway`` (lists); ``cli.main`` turns that into a ``GatewayRuntime``
and hands it to each frontend's ``configure``.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from beaver_gateway.core.registry import Gateway
from beaver_gateway.backends.base import Backend
from beaver_gateway.core.auth import TokenStore
from beaver_gateway.core.registry import AgentRegistry, McpRegistry
@dataclass(frozen=True, slots=True)
class GatewayRuntime:
"""Post-build state of the gateway, shared with every frontend.
Backends are keyed by **agent name**, not type — one ``RaycastBackend``
instance can serve many ``RaycastAgent`` instances, but the lookup
site (an inbound request with ``model=<agent.name>``) already has
the name in hand, so the indirection lives one step earlier.
"""
agents: AgentRegistry
mcps: McpRegistry
backends: dict[str, Backend]
token_store: TokenStore
class Frontend(ABC):
"""Listens on a port, dispatches into the gateway."""
@abstractmethod
def configure(self, gateway: Gateway) -> None: ...
def configure(self, runtime: GatewayRuntime) -> None: ...
@abstractmethod
async def serve(self) -> None: ...
+9 -3
View File
@@ -7,6 +7,7 @@ discriminated-union members so downstream code can ``match`` on ``kind``.
from __future__ import annotations
from collections.abc import Callable # noqa: TC003 — runtime use by pydantic
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Literal
from pydantic import BaseModel, ConfigDict, Field
@@ -27,7 +28,7 @@ class StdioMcp(_BaseMcp):
kind: Literal["stdio"] = "stdio"
command: tuple[str, ...]
env: dict[str, str] | None = None
cwd: str | None = None
cwd: Path | None = None
class HttpMcp(_BaseMcp):
@@ -60,9 +61,14 @@ class McpServer:
name: str,
command: Iterable[str],
env: dict[str, str] | None = None,
cwd: str | None = None,
cwd: Path | str | None = None,
) -> StdioMcp:
return StdioMcp(name=name, command=tuple(command), env=env, cwd=cwd)
return StdioMcp(
name=name,
command=tuple(command),
env=env,
cwd=Path(cwd) if isinstance(cwd, str) else cwd,
)
@classmethod
def http(
+15
View File
@@ -29,3 +29,18 @@ class Settings(BaseSettings):
raycast_bearer: str | None = None
raycast_config_path: Path = Path("/config/raycast.json")
raycast_device_id: str | None = None
"""64-hex-char stable per-install id. Required when any ``RaycastAgent``
is configured — generate once via ``secrets.token_hex(32)`` and keep
in ``.env`` so Raycast doesn't see every restart as a new device."""
raycast_locale: str = "en-US"
"""``Accept-Language`` header sent by the shared ``raycast_api.Client``
and the default locale used to render auto ``UserPreferences``. One
value per gateway since we open one Client total."""
bootstrap_tokens: str = ""
"""Phase 1.3 seed: ``name1:value1,name2:value2``. Empty = no auth wired yet.
Phase 4 moves tokens into the DB. Until then this is the only source.
"""