feat: add heartbeats to prevent long-thinking from closing
This commit is contained in:
@@ -24,12 +24,14 @@ shape.
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
|
from collections.abc import AsyncIterator
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import TYPE_CHECKING, Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
@@ -86,6 +88,13 @@ _STREAM_FLUSH_DEBOUNCE = 0.4
|
|||||||
# disk round-trip).
|
# disk round-trip).
|
||||||
_SSE_FLUSH_DEBOUNCE = 0.1
|
_SSE_FLUSH_DEBOUNCE = 0.1
|
||||||
|
|
||||||
|
# Interval between SSE comment-frames sent when the backend is silent
|
||||||
|
# (e.g. claude is mid-thinking on a large context). The Obsidian plugin
|
||||||
|
# and any intermediate proxies will hold the connection open as long as
|
||||||
|
# bytes keep flowing; a comment-frame is the cheapest legal SSE keepalive.
|
||||||
|
# Set well under typical proxy/client idle timeouts (60s).
|
||||||
|
_SSE_HEARTBEAT_INTERVAL = 15.0
|
||||||
|
|
||||||
|
|
||||||
class MarkdownFrontend(Frontend):
|
class MarkdownFrontend(Frontend):
|
||||||
"""FastAPI app behind ``POST /chat`` driven by Obsidian-vault files."""
|
"""FastAPI app behind ``POST /chat`` driven by Obsidian-vault files."""
|
||||||
@@ -620,7 +629,14 @@ class MarkdownFrontend(Frontend):
|
|||||||
return _reattach_frontmatter(parsed.metadata, new_body)
|
return _reattach_frontmatter(parsed.metadata, new_body)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async for ev in events:
|
async for ev in _events_with_heartbeat(events):
|
||||||
|
if ev is None:
|
||||||
|
# Backend is quiet (claude mid-thinking, MCP slow,
|
||||||
|
# whatever). SSE comment-frame keeps the TCP socket
|
||||||
|
# warm so the plugin / uvicorn / any reverse proxy
|
||||||
|
# doesn't time the request out before we finish.
|
||||||
|
yield b": keepalive\n\n"
|
||||||
|
continue
|
||||||
acc.feed(ev)
|
acc.feed(ev)
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
if (
|
if (
|
||||||
@@ -906,6 +922,36 @@ class MarkdownFrontend(Frontend):
|
|||||||
# ---- module-level utilities ----------------------------------------------
|
# ---- module-level utilities ----------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def _events_with_heartbeat(
|
||||||
|
events: AsyncIterator[Any], interval: float = _SSE_HEARTBEAT_INTERVAL
|
||||||
|
) -> AsyncIterator[Any]:
|
||||||
|
"""Wrap an async event stream with idle-time heartbeat markers.
|
||||||
|
|
||||||
|
Yields ``None`` every ``interval`` seconds during silence; real
|
||||||
|
events pass through unchanged. When the wrapped iterator is
|
||||||
|
exhausted, this generator returns. Cancellation propagates: if the
|
||||||
|
outer scope is cancelled we cancel the pending ``__anext__`` task
|
||||||
|
instead of leaving it dangling.
|
||||||
|
"""
|
||||||
|
src = events.__aiter__()
|
||||||
|
while True:
|
||||||
|
next_task: asyncio.Task[Any] = asyncio.ensure_future(src.__anext__())
|
||||||
|
try:
|
||||||
|
done, _pending = await asyncio.wait({next_task}, timeout=interval)
|
||||||
|
except BaseException:
|
||||||
|
next_task.cancel()
|
||||||
|
with contextlib.suppress(BaseException):
|
||||||
|
await next_task
|
||||||
|
raise
|
||||||
|
if not done:
|
||||||
|
yield None
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
yield next_task.result()
|
||||||
|
except StopAsyncIteration:
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
def _sse_pack(event: str, data: dict[str, Any]) -> bytes:
|
def _sse_pack(event: str, data: dict[str, Any]) -> bytes:
|
||||||
r"""Format one Server-Sent Event frame.
|
r"""Format one Server-Sent Event frame.
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user