From 5fb5b6a3fb7fb58bb616324817fae1055d341fa7 Mon Sep 17 00:00:00 2001 From: h Date: Fri, 22 May 2026 23:59:00 +0200 Subject: [PATCH] feat: add heartbeats to prevent long-thinking from closing --- .../frontends/markdown/frontend.py | 48 ++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/src/beaver_gateway/frontends/markdown/frontend.py b/src/beaver_gateway/frontends/markdown/frontend.py index 1e9c6b9..5ac934f 100644 --- a/src/beaver_gateway/frontends/markdown/frontend.py +++ b/src/beaver_gateway/frontends/markdown/frontend.py @@ -24,12 +24,14 @@ shape. from __future__ import annotations +import asyncio import contextlib import json import logging import os import tempfile import time +from collections.abc import AsyncIterator from pathlib import Path from typing import TYPE_CHECKING, Any @@ -86,6 +88,13 @@ _STREAM_FLUSH_DEBOUNCE = 0.4 # disk round-trip). _SSE_FLUSH_DEBOUNCE = 0.1 +# Interval between SSE comment-frames sent when the backend is silent +# (e.g. claude is mid-thinking on a large context). The Obsidian plugin +# and any intermediate proxies will hold the connection open as long as +# bytes keep flowing; a comment-frame is the cheapest legal SSE keepalive. +# Set well under typical proxy/client idle timeouts (60s). +_SSE_HEARTBEAT_INTERVAL = 15.0 + class MarkdownFrontend(Frontend): """FastAPI app behind ``POST /chat`` driven by Obsidian-vault files.""" @@ -620,7 +629,14 @@ class MarkdownFrontend(Frontend): return _reattach_frontmatter(parsed.metadata, new_body) try: - async for ev in events: + async for ev in _events_with_heartbeat(events): + if ev is None: + # Backend is quiet (claude mid-thinking, MCP slow, + # whatever). SSE comment-frame keeps the TCP socket + # warm so the plugin / uvicorn / any reverse proxy + # doesn't time the request out before we finish. 
async def _events_with_heartbeat(
    events: AsyncIterator[Any], interval: float = _SSE_HEARTBEAT_INTERVAL
) -> AsyncIterator[Any]:
    """Wrap an async event stream with idle-time heartbeat markers.

    Real events pass through unchanged and in order. Whenever the source
    produces nothing for ``interval`` seconds, ``None`` is yielded so the
    caller can emit a keepalive; the *same* pending ``__anext__`` call is
    then awaited again, so no event is dropped and the source is never
    advanced concurrently.

    Args:
        events: The async iterable to wrap.
        interval: Seconds of silence after which one ``None`` marker is
            yielded.

    Yields:
        Each item produced by ``events``, interleaved with ``None``
        heartbeat markers during quiet periods.

    When the wrapped iterator is exhausted this generator returns. An
    exception raised by the source propagates to the consumer. If the
    generator is closed or cancelled while a fetch is still in flight
    (including while suspended at a heartbeat ``yield``), that fetch is
    cancelled and awaited rather than left dangling.
    """
    src = events.__aiter__()
    # At most ONE ``__anext__`` call may be in flight at a time: async
    # generators raise RuntimeError if asked for the next item while a
    # previous request is still running. The task therefore has to
    # survive across heartbeat iterations instead of being recreated.
    next_task: asyncio.Task[Any] | None = None
    try:
        while True:
            if next_task is None:
                next_task = asyncio.ensure_future(src.__anext__())
            # ``asyncio.wait`` does not cancel the task on timeout, which
            # is exactly what lets us keep waiting on it next time round.
            done, _pending = await asyncio.wait({next_task}, timeout=interval)
            if not done:
                yield None
                continue
            # Hand the finished task over before touching its result so
            # the ``finally`` below never tries to cancel a consumed task.
            finished, next_task = next_task, None
            try:
                item = finished.result()
            except StopAsyncIteration:
                return
            # Yield outside the ``try`` so an exception thrown in by the
            # consumer is never mistaken for source exhaustion.
            yield item
    finally:
        if next_task is not None and not next_task.done():
            next_task.cancel()
            # Let the cancellation settle; whatever the source raises
            # while unwinding is irrelevant to the caller at this point.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await next_task