from typing import TypeVar, Set
import logging
import weakref
import trio
import functools
import threading
from types import ModuleType
from .meta_runner import MetaRunner
from .guard import exclusive
from ..debug import NameRepr
T = TypeVar("T")


def _weakset_copy(ws: "weakref.WeakSet[T]") -> Set[T]:
    """
    Thread-safely snapshot the live members of a weakset as a plain set

    :param ws: the weakset to copy
    :return: a new set holding strong references to every member of ``ws``
             that was still alive when it was dereferenced
    """
    # ``WeakSet`` itself does no locking: every public way of copying it goes
    # through ``__iter__``, which is not safe against members being garbage
    # collected concurrently. The backing ``ws.data`` is a real set of weak
    # references, and ``set(some_set)`` is GIL-atomic, so copy that instead.
    snapshot = set(ws.data)
    alive = set()
    for reference in snapshot:
        member = reference()
        # a dead reference resolves to None: its referent is already gone
        if member is not None:
            alive.add(member)
    return alive
class ServiceUnit(object):
    """
    Definition for running a service

    Each unit adds itself to a class-wide weak collection when it is created;
    :py:meth:`units` provides a snapshot of that collection. The unit only
    keeps a weak reference to its service, so it does not keep the service
    alive on its own.

    :param service: the service to run; must have a ``run`` attribute
    :param flavour: runner flavour to use for running the service
    """

    # Weak registry of every unit that currently exists.
    __active_units__: "weakref.WeakSet[ServiceUnit]" = weakref.WeakSet()

    def __init__(self, service, flavour):
        assert hasattr(service, "run"), "service must implement a 'run' method"
        assert any(
            flavour == runner.flavour for runner in MetaRunner.runner_types
        ), "service flavour must be one of %s" % ",".join(
            repr(runner.flavour) for runner in MetaRunner.runner_types
        )
        # weak reference only: the unit must not prolong the service's life
        self.service = weakref.ref(service)
        self.flavour = flavour
        self._started = False
        ServiceUnit.__active_units__.add(self)

    @classmethod
    def units(cls) -> "Set[ServiceUnit]":
        """Container of all currently defined units"""
        return _weakset_copy(cls.__active_units__)

    @property
    def running(self) -> bool:
        """Whether :py:meth:`start` has handed the service to a runner"""
        return self._started

    def start(self, runner: "MetaRunner") -> None:
        """
        Register the service's ``run`` with ``runner`` under this unit's flavour

        Does nothing, and leaves the unit marked as not running, if the
        service has already been garbage collected.

        :param runner: the runner that receives ``service.run`` as a payload
        """
        service = self.service()
        if service is None:
            return
        self._started = True
        runner.register_payload(service.run, flavour=self.flavour)

    def __repr__(self):
        service = self.service()
        return "%s(%r, flavour=%r)" % (
            self.__class__.__name__,
            # Test for ``None`` explicitly: a live service may well be falsy
            # (e.g. an empty container) and must not be reported as defunct.
            "<defunct>" if service is None else service,
            self.flavour,
        )
def service(flavour):
    """
    Mark a class as implementing a Service

    Each Service class must have a ``run`` method, which does not take any
    arguments. Creating an instance of the decorated class also creates a
    :py:class:`~.ServiceUnit` for it with the given ``flavour``; the unit is
    available at ``service_instance.__service_unit__``.

    :param flavour: runner flavour passed on to every created unit
    :return: a class decorator that modifies the class in place and returns it
    """

    def service_unit_decorator(raw_cls):
        original_new = raw_cls.__new__
        # ``object.__new__`` does not accept constructor arguments, so only a
        # custom ``__new__`` gets them forwarded.
        forwards_arguments = original_new is not object.__new__

        def __new_service__(cls, *args, **kwargs):
            instance = (
                original_new(cls, *args, **kwargs)
                if forwards_arguments
                else original_new(cls)
            )
            # attach the unit before ``__init__`` ever runs on the instance
            instance.__service_unit__ = ServiceUnit(instance, flavour)
            return instance

        raw_cls.__new__ = __new_service__
        # give undocumented entry points a minimal docstring
        entry_point = raw_cls.run
        if entry_point.__doc__ is None:
            entry_point.__doc__ = "Service entry point"
        return raw_cls

    return service_unit_decorator
class ServiceRunner(object):
    """
    Runner for coroutines, subroutines and services

    The service runner prevents silent failures by tracking concurrent tasks
    and therefore provides safer concurrency.
    If any task fails with an exception or provides
    unexpected output values, this is registered as an error; the runner will
    gracefully shut down all tasks in this case.

    To provide ``async`` concurrency, the runner also manages common
    ``async`` event loops and tracks them for failures as well. As a result,
    ``async`` code should usually use the "current" event loop directly.

    :param accept_delay: upper bound, in seconds, for the pause between two
                         rounds of looking for services that are not running
    """

    def __init__(self, accept_delay: float = 1):
        self.accept_delay = accept_delay
        self.running = threading.Event()
        # "is shut down" is the initial state: the accept loop is not active
        self._is_shutdown = threading.Event()
        self._is_shutdown.set()
        self._must_shutdown = False
        self._meta_runner = MetaRunner()
        self._logger = logging.getLogger("cobald.runtime.daemon.services")

    def execute(self, payload, *args, flavour: ModuleType, **kwargs):
        """
        Synchronously run ``payload`` and provide its output

        If ``*args`` and/or ``**kwargs`` are provided, pass them to ``payload``
        upon execution.
        """
        bound = (
            functools.partial(payload, *args, **kwargs)
            if (args or kwargs)
            else payload
        )
        return self._meta_runner.run_payload(bound, flavour=flavour)

    def adopt(self, payload, *args, flavour: ModuleType, **kwargs):
        """
        Concurrently run ``payload`` in the background

        If ``*args`` and/or ``**kwargs`` are provided, pass them to ``payload``
        upon execution.
        """
        bound = (
            functools.partial(payload, *args, **kwargs)
            if (args or kwargs)
            else payload
        )
        self._meta_runner.register_payload(bound, flavour=flavour)

    @exclusive()
    def accept(self):
        """
        Start accepting synchronous, asynchronous and service payloads

        Since services are globally defined, only one :py:class:`ServiceRunner`
        may :py:meth:`accept` payloads at any time.
        """
        self._must_shutdown = False
        self._logger.info("%s starting", self.__class__.__name__)
        # the accept loop itself is just another ``trio`` payload
        self.adopt(self._accept_services, flavour=trio)
        self._meta_runner.run()

    def shutdown(self):
        """Shutdown the accept loop and stop running payloads"""
        self._must_shutdown = True
        # block the calling thread until the accept loop has wound down
        self._is_shutdown.wait()
        self._meta_runner.stop()

    async def _accept_services(self):
        """Repeatedly adopt services that are not running until shutdown"""
        name = self.__class__.__name__
        # start without pause, then back off linearly up to ``accept_delay``
        pause, longest_pause, step = 0.0, self.accept_delay, self.accept_delay / 10
        self._is_shutdown.clear()
        self.running.set()
        try:
            self._logger.info("%s started", name)
            while not self._must_shutdown:
                self._adopt_services()
                await trio.sleep(pause)
                pause = min(pause + step, longest_pause)
        except trio.Cancelled:
            # NOTE(review): the cancellation is logged and not re-raised;
            # trio documents that ``Cancelled`` should propagate - confirm
            # that swallowing it here is intentional.
            self._logger.info("%s cancelled", name)
        except BaseException:
            self._logger.exception("%s aborted", name)
            raise
        else:
            self._logger.info("%s stopped", name)
        finally:
            # always publish the final state so ``shutdown`` cannot hang
            self.running.clear()
            self._is_shutdown.set()

    def _adopt_services(self):
        """Start every currently defined unit that is not running yet"""
        # lazily filtered, so ``running`` is checked right before each start
        pending = (unit for unit in ServiceUnit.units() if not unit.running)
        for unit in pending:
            self._logger.info("%s adopts %s", self.__class__.__name__, NameRepr(unit))
            unit.start(self._meta_runner)