diff --git a/src/beaver_gateway/frontends/markdown/frontend.py b/src/beaver_gateway/frontends/markdown/frontend.py index 5ac934f..58ae12b 100644 --- a/src/beaver_gateway/frontends/markdown/frontend.py +++ b/src/beaver_gateway/frontends/markdown/frontend.py @@ -934,22 +934,32 @@ async def _events_with_heartbeat( instead of leaving it dangling. """ src = events.__aiter__() - while True: - next_task: asyncio.Task[Any] = asyncio.ensure_future(src.__anext__()) - try: + next_task: asyncio.Task[Any] | None = None + try: + while True: + # Reuse the in-flight task across timeouts. Spawning a fresh + # ``__anext__()`` while the previous one is still pending + # puts two consumers on the same async generator — that + # raises ``RuntimeError: anext(): asynchronous generator is + # already running``. + if next_task is None: + next_task = asyncio.ensure_future(src.__anext__()) done, _pending = await asyncio.wait({next_task}, timeout=interval) - except BaseException: + if not done: + yield None + continue + task = next_task + next_task = None + try: + result = task.result() + except StopAsyncIteration: + return + yield result + finally: + if next_task is not None and not next_task.done(): next_task.cancel() with contextlib.suppress(BaseException): await next_task - raise - if not done: - yield None - continue - try: - yield next_task.result() - except StopAsyncIteration: - return def _sse_pack(event: str, data: dict[str, Any]) -> bytes: