import threading
import asyncio
from .base_runner import BaseRunner, OrphanedReturn
[docs]
class ThreadRunner(BaseRunner):
    """
    Runner for subroutines with :py:mod:`threading`

    Every registered payload is started in a daemon thread of its own.
    Closing the runner does *not* cancel payloads that are still active;
    leftover payloads are only cut off forcefully when the program terminates.
    """

    flavour = threading

    # Payloads are run directly in ``threading.Thread`` instances.
    # Each payload is wrapped so that an exception or an unexpected return value
    # is handed back to the event loop and stored on a future;
    # ``manage_payloads`` awaits that future and thereby re-raises the failure.
    def __init__(self, asyncio_loop: asyncio.AbstractEventLoop):
        super().__init__(asyncio_loop)
        # Completed with the first reported payload failure,
        # or with ``None`` once ``aclose`` is called.
        self._payload_failure = asyncio_loop.create_future()

    def register_payload(self, payload):
        """Start ``payload`` in a new daemon thread without waiting for it"""
        threading.Thread(
            target=self._monitor_payload,
            args=(payload,),
            daemon=True,
        ).start()

    def run_payload(self, payload):
        """Call ``payload`` in the current thread and return its result"""
        # This method must block until the payload is done anyway, so the
        # payload is called right here instead of parking this thread while
        # another thread does the work.
        outcome = payload()
        return outcome

    def _monitor_payload(self, payload):
        """Run ``payload`` and report an exception or non-``None`` result"""
        problem = None
        try:
            outcome = payload()
        except BaseException as err:  # noqa: B036
            problem = err
        else:
            # a payload is expected to return nothing; anything else is an error
            if outcome is not None:
                problem = OrphanedReturn(payload, outcome)
        if problem is not None:
            # this code runs in the payload thread, so the failure is passed
            # to the event loop via its thread-safe scheduling call
            self.asyncio_loop.call_soon_threadsafe(self._set_failure, problem)

    def _set_failure(self, failure: BaseException):
        """Store ``failure`` on the failure future unless it is already done"""
        if self._payload_failure.done():
            return
        self._payload_failure.set_exception(failure)

    async def manage_payloads(self):
        """Wait for the failure future; a stored payload failure is raised here"""
        await self._payload_failure

    async def aclose(self):
        """Complete the failure future so that ``manage_payloads`` can return"""
        # nothing to do if ``self._stopped`` is already set or the future
        # has been completed (by a failure or a previous close)
        if self._stopped.is_set() or self._payload_failure.done():
            return
        self._payload_failure.set_result(None)