Source code for cobald.daemon.runners.service

from typing import TypeVar, Set
import logging
import weakref
import trio
import functools
import threading

from types import ModuleType

from .meta_runner import MetaRunner
from .guard import exclusive
from ..debug import NameRepr


T = TypeVar("T")


def _weakset_copy(ws: "weakref.WeakSet[T]") -> Set[T]:
    """
    Take a thread-safe snapshot of the live members of a ``WeakSet``

    :param ws: the weak set to copy
    :return: a regular ``set`` holding strong references to every member
             that was still alive when it was dereferenced
    """
    # Iterating a WeakSet goes through ``__iter__``, which does no locking and
    # can therefore be disturbed by members being garbage collected from
    # another thread. Copying the backing set of weak references ``ws.data``
    # via ``set(some_set)`` is GIL-atomic and sidesteps that problem.
    snapshot = set(ws.data)
    alive = set()
    for reference in snapshot:
        member = reference()
        # a dead reference yields None - its referent is already gone
        if member is not None:
            alive.add(member)
    return alive


class ServiceUnit(object):
    """
    Definition for running a service

    A unit only holds a weak reference to its service, so it does not keep
    the service alive. Every unit registers itself in a class-wide weak set
    on creation; :py:meth:`units` provides a snapshot of that set.

    :param service: the service to run; must provide a ``run`` attribute
    :param flavour: runner flavour to use for running the service; must match
                    the ``flavour`` of one of ``MetaRunner.runner_types``
    """

    # weak registry of all units - entries vanish once a unit is collected
    __active_units__: "weakref.WeakSet[ServiceUnit]" = weakref.WeakSet()

    def __init__(self, service, flavour):
        # NOTE: the checks are kept as ``assert`` so that callers still see
        # an ``AssertionError``; they are skipped when running with ``-O``.
        assert hasattr(service, "run"), "service must implement a 'run' method"
        assert any(
            flavour == runner.flavour for runner in MetaRunner.runner_types
        ), "service flavour must be one of %s" % ",".join(
            repr(runner.flavour) for runner in MetaRunner.runner_types
        )
        self.service = weakref.ref(service)
        self.flavour = flavour
        self._started = False
        ServiceUnit.__active_units__.add(self)

    @classmethod
    def units(cls) -> "Set[ServiceUnit]":
        """Container of all currently defined units"""
        return _weakset_copy(cls.__active_units__)

    @property
    def running(self):
        """Whether :py:meth:`start` has handed the service to a runner"""
        return self._started

    def start(self, runner: "MetaRunner"):
        """
        Register the ``run`` method of the service as a payload of ``runner``

        If the service has already been garbage collected, nothing is
        registered and the unit is not marked as running.

        :param runner: the runner at which to register ``service.run``
        """
        service = self.service()
        if service is None:
            return
        self._started = True
        runner.register_payload(service.run, flavour=self.flavour)

    def __repr__(self):
        service = self.service()
        # Compare against None explicitly: a live service may well be falsy
        # (e.g. by defining ``__len__`` or ``__bool__``) and must not be
        # reported as defunct.
        return "%s(%r, flavour=%r)" % (
            self.__class__.__name__,
            "<defunct>" if service is None else service,
            self.flavour,
        )
def service(flavour):
    r"""
    Mark a class as implementing a Service

    Each Service class must have a ``run`` method, which does not take any
    arguments. This method is :py:meth:`~.ServiceRunner.adopt`\ ed after the
    daemon starts, unless the Service has been garbage collected.

    For each service instance, its :py:class:`~.ServiceUnit` is available at
    ``service_instance.__service_unit__``.

    :param flavour: runner flavour with which the ``run`` method is executed
    :return: a class decorator that returns the decorated class itself
    """

    def service_unit_decorator(raw_cls):
        # the constructor in effect before decoration, possibly inherited
        original_new = raw_cls.__new__

        def __new_service__(cls, *args, **kwargs):
            # ``object.__new__`` rejects extra arguments when ``__new__`` is
            # overridden, so only a custom ``__new__`` receives them
            if original_new is object.__new__:
                instance = original_new(cls)
            else:
                instance = original_new(cls, *args, **kwargs)
            # attach the unit to the instance so that both share a lifetime
            instance.__service_unit__ = ServiceUnit(instance, flavour)
            return instance

        raw_cls.__new__ = __new_service__
        # supply a default description for undocumented entry points
        if raw_cls.run.__doc__ is None:
            raw_cls.run.__doc__ = "Service entry point"
        return raw_cls

    return service_unit_decorator
class ServiceRunner(object):
    """
    Runner for coroutines, subroutines and services

    Concurrent tasks are tracked by the service runner in order to avoid
    silent failures and thereby provide safer concurrency. A task that fails
    with an exception or provides unexpected output values is registered as
    an error; in this case, the runner gracefully shuts down all tasks.

    The runner also manages common ``async`` event loops to provide ``async``
    concurrency, and tracks them for failures as well. As a result, ``async``
    code should usually use the "current" event loop directly.

    :param accept_delay: upper limit of the delay between two attempts to
                         adopt newly defined services
    """

    def __init__(self, accept_delay: float = 1):
        self._logger = logging.getLogger("cobald.runtime.daemon.services")
        self._meta_runner = MetaRunner()
        self._must_shutdown = False
        # start out in the "is shut down" state: nothing is running yet
        shutdown_done = threading.Event()
        shutdown_done.set()
        self._is_shutdown = shutdown_done
        self.running = threading.Event()
        self.accept_delay = accept_delay

    def execute(self, payload, *args, flavour: ModuleType, **kwargs):
        """
        Synchronously run ``payload`` and provide its output

        If ``*args`` and/or ``**kwargs`` are provided, pass them to
        ``payload`` upon execution.
        """
        bound = (
            functools.partial(payload, *args, **kwargs)
            if args or kwargs
            else payload
        )
        return self._meta_runner.run_payload(bound, flavour=flavour)

    def adopt(self, payload, *args, flavour: ModuleType, **kwargs):
        """
        Concurrently run ``payload`` in the background

        If ``*args`` and/or ``**kwargs`` are provided, pass them to
        ``payload`` upon execution.
        """
        bound = (
            functools.partial(payload, *args, **kwargs)
            if args or kwargs
            else payload
        )
        self._meta_runner.register_payload(bound, flavour=flavour)

    @exclusive()
    def accept(self):
        """
        Start accepting synchronous, asynchronous and service payloads

        Since services are globally defined, only one
        :py:class:`ServiceRunner` may :py:meth:`accept` payloads at any time.
        """
        self._must_shutdown = False
        self._logger.info("%s starting", self.__class__.__name__)
        # the accept loop itself is just another background payload
        self.adopt(self._accept_services, flavour=trio)
        self._meta_runner.run()

    def shutdown(self):
        """Shutdown the accept loop and stop running payloads"""
        self._must_shutdown = True
        # block until the accept loop has noticed the request and finished
        self._is_shutdown.wait()
        self._meta_runner.stop()

    async def _accept_services(self):
        """Repeatedly adopt new services until a shutdown is requested"""
        name = self.__class__.__name__
        # the pause between attempts grows from zero up to ``accept_delay``
        # in increments of a tenth of ``accept_delay``
        longest_pause = self.accept_delay
        pause_step = self.accept_delay / 10
        pause = 0.0
        self._is_shutdown.clear()
        self.running.set()
        try:
            self._logger.info("%s started", name)
            while not self._must_shutdown:
                self._adopt_services()
                await trio.sleep(pause)
                pause = min(pause + pause_step, longest_pause)
        except trio.Cancelled:
            # NOTE(review): the cancellation is logged but not re-raised;
            # trio generally expects ``Cancelled`` to propagate - confirm
            # that swallowing it here is intended.
            self._logger.info("%s cancelled", name)
        except BaseException:
            self._logger.exception("%s aborted", name)
            raise
        else:
            self._logger.info("%s stopped", name)
        finally:
            # always signal termination, no matter how the loop was left
            self.running.clear()
            self._is_shutdown.set()

    def _adopt_services(self):
        """Start every currently defined service unit that is not running"""
        for unit in ServiceUnit.units():
            if not unit.running:
                self._logger.info(
                    "%s adopts %s", self.__class__.__name__, NameRepr(unit)
                )
                unit.start(self._meta_runner)