Source code for cobald.daemon.runners.base_runner

from typing import Any
from abc import abstractmethod, ABCMeta
import asyncio
import logging
import threading

from ..debug import NameRepr


[docs] class BaseRunner(metaclass=ABCMeta): """Concurrency backend on top of `asyncio`""" flavour: Any def __init__(self, asyncio_loop: asyncio.AbstractEventLoop): self.asyncio_loop = asyncio_loop self._logger = logging.getLogger( "cobald.runtime.runner.%s" % NameRepr(self.flavour) ) self._stopped = threading.Event() self._stopped.set()
[docs] @abstractmethod def register_payload(self, payload): """ Register ``payload`` for background execution in a threadsafe manner This runs ``payload`` as an orphaned background task as soon as possible. It is an error for ``payload`` to return or raise anything without handling it. """ raise NotImplementedError
[docs] @abstractmethod def run_payload(self, payload): """ Execute ``payload`` and return its result in a threadsafe manner This runs ``payload`` as soon as possible, blocking until completion. Should ``payload`` return or raise anything, it is propagated to the caller. """ raise NotImplementedError
[docs] async def ready(self): """Wait until the runner is ready to accept payloads""" assert ( not self._stopped.is_set() ), "runner must be .run before waiting until it is ready"
# Most runners are ready when instantiated, simply queueing payloads # until they get a chance to run them. Only override this method when # the runner has to do some `async` setup before being ready.
[docs] async def run(self): """ Execute all current and future payloads in an `asyncio` coroutine This method will continuously execute payloads sent to the runner. It only returns when :py:meth:`stop` is called or if any orphaned payload returns or raises. In the latter case, :py:exc:`~.OrphanedReturn` or the raised exception is re-raised by this method. Implementations should override :py:meth:`~.manage_payloads` to customize their specific parts. """ self._logger.info("runner started: %s", self) self._stopped.clear() try: await self.manage_payloads() except asyncio.CancelledError: self._logger.info("runner cancelled: %s", self) raise except BaseException: self._logger.exception("runner aborted: %s", self) raise else: self._logger.info("runner stopped: %s", self) finally: self._stopped.set()
[docs] @abstractmethod async def manage_payloads(self): """ Implementation of managing payloads when :py:meth:`~.run` This method must continuously execute payloads sent to the runner. It may only return when :py:meth:`stop` is called or if any orphaned payload return or raise. In the latter case, :py:exc:`~.OrphanedReturn` or the raised exception must re-raised by this method. """ raise NotImplementedError
[docs] @abstractmethod async def aclose(self): """Shut down this runner""" raise NotImplementedError
[docs] def stop(self): """Stop execution of all current and future payloads and block until success""" if self._stopped.is_set(): return # the loop exists independently of all runners, we can use it to shut down closed = asyncio.run_coroutine_threadsafe(self.aclose(), self.asyncio_loop) closed.result()
[docs] class OrphanedReturn(Exception): """A runnable returned a value without anyone to receive it""" def __init__(self, who, value): super().__init__("no caller to receive %s from %s" % (value, who)) self.who = who self.value = value