fix: asynchronous generator is already running
This commit is contained in:
@@ -934,22 +934,32 @@ async def _events_with_heartbeat(
|
|||||||
instead of leaving it dangling.
|
instead of leaving it dangling.
|
||||||
"""
|
"""
|
||||||
src = events.__aiter__()
|
src = events.__aiter__()
|
||||||
while True:
|
next_task: asyncio.Task[Any] | None = None
|
||||||
next_task: asyncio.Task[Any] = asyncio.ensure_future(src.__anext__())
|
|
||||||
try:
|
try:
|
||||||
|
while True:
|
||||||
|
# Reuse the in-flight task across timeouts. Spawning a fresh
|
||||||
|
# ``__anext__()`` while the previous one is still pending
|
||||||
|
# puts two consumers on the same async generator — that
|
||||||
|
# raises ``RuntimeError: anext(): asynchronous generator is
|
||||||
|
# already running``.
|
||||||
|
if next_task is None:
|
||||||
|
next_task = asyncio.ensure_future(src.__anext__())
|
||||||
done, _pending = await asyncio.wait({next_task}, timeout=interval)
|
done, _pending = await asyncio.wait({next_task}, timeout=interval)
|
||||||
except BaseException:
|
|
||||||
next_task.cancel()
|
|
||||||
with contextlib.suppress(BaseException):
|
|
||||||
await next_task
|
|
||||||
raise
|
|
||||||
if not done:
|
if not done:
|
||||||
yield None
|
yield None
|
||||||
continue
|
continue
|
||||||
|
task = next_task
|
||||||
|
next_task = None
|
||||||
try:
|
try:
|
||||||
yield next_task.result()
|
result = task.result()
|
||||||
except StopAsyncIteration:
|
except StopAsyncIteration:
|
||||||
return
|
return
|
||||||
|
yield result
|
||||||
|
finally:
|
||||||
|
if next_task is not None and not next_task.done():
|
||||||
|
next_task.cancel()
|
||||||
|
with contextlib.suppress(BaseException):
|
||||||
|
await next_task
|
||||||
|
|
||||||
|
|
||||||
def _sse_pack(event: str, data: dict[str, Any]) -> bytes:
|
def _sse_pack(event: str, data: dict[str, Any]) -> bytes:
|
||||||
|
|||||||
Reference in New Issue
Block a user