from typing import Callable, Awaitable, Coroutine, Set
import asyncio
from .base_runner import BaseRunner, OrphanedReturn
from ._compat import asyncio_current_task
[docs]class AsyncioRunner(BaseRunner):
"""
Runner for coroutines with :py:mod:`asyncio`
All active payloads are actively cancelled when the runner is closed.
"""
flavour = asyncio
# This runner directly uses asyncio.Task to run payloads.
# To detect errors, each payload is wrapped; errors and unexpected return values
# are pushed to a queue from which the main task re-raises.
# Tasks are registered in a container to allow cancelling them. The payload wrapper
# takes care of adding/removing tasks.
def __init__(self, asyncio_loop: asyncio.AbstractEventLoop):
super().__init__(asyncio_loop)
self._tasks: Set[asyncio.Task] = set()
self._payload_failure = asyncio_loop.create_future()
[docs] def register_payload(self, payload: Callable[[], Awaitable]):
self.asyncio_loop.call_soon_threadsafe(self._setup_payload, payload)
[docs] def run_payload(self, payload: Callable[[], Coroutine]):
future = asyncio.run_coroutine_threadsafe(payload(), self.asyncio_loop)
return future.result()
def _setup_payload(self, payload: Callable[[], Awaitable]):
task = self.asyncio_loop.create_task(self._monitor_payload(payload))
self._tasks.add(task)
async def _monitor_payload(self, payload: Callable[[], Awaitable]):
try:
result = await payload()
except (asyncio.CancelledError, KeyboardInterrupt):
raise
except BaseException as e:
failure = e
else:
if result is None:
return
failure = OrphanedReturn(payload, result)
self._tasks.discard(asyncio_current_task())
if not self._payload_failure.done():
self._payload_failure.set_exception(failure)
[docs] async def manage_payloads(self):
await self._payload_failure
[docs] async def aclose(self):
if self._stopped.is_set() and not self._tasks:
return
# let the manage task wake up and exit
if not self._payload_failure.done():
self._payload_failure.set_result(None)
while self._tasks:
for task in self._tasks.copy():
if task.done():
self._tasks.discard(task)
# monitored tasks only propagate cancellation and KeyboardInterrupt
# KeyboardInterrupt will abort the asyncio loop but mark the task
# as exceptionally terminated – we explicitly fetch the exception
# to mark it as retrieved/handled and avoid warnings.
if not task.cancelled():
task.exception()
else:
task.cancel()
await asyncio.sleep(0.1)