Source code for cobald.composite.factory

from typing import Callable
import weakref

import trio

from cobald.interfaces import Pool, CompositePool
from cobald.daemon import service

[docs]@service(flavour=trio) class FactoryPool(CompositePool): """ Composition that adds and removes pools to satisfy demand :param factory: a callable that produces a new :py:class:`~.Pool` :param interval: how often to adjust the number of children Adjustment uses two extensions that children must respond to adequately: * When spawned via ``factory()``, children shall already be set to their expected ``demand``. * When disabled via ``demand=0``, children shall shut down and free any resources and tasks. Once spawned, children are free to adjust their demand if required. A child may disable itself permanently by setting its own ``demand = 0``. The :py:class:`FactoryPool` inspects the demand for all its children before spawning or disabling any children. Any child which satisfies ``supply > 0`` **or** ``demand > 0`` is considered active and contributes to the :py:class:`FactoryPool` ``supply``, ``demand``, ``allocation``, and ``utilisation``. The :py:class:`FactoryPool` makes no assumption about the validity or fitness of active children. It is the responsibility of children to report their status accordingly. For example, if a child shuts down and does not allocate its ``supply`` further, it should scale its reported ``allocation`` accordingly. """ @property def children(self): return [*self._hatchery, *self._mortuary] @property def demand(self): return self._demand @demand.setter def demand(self, value): # we may spend an arbitrary time spawning Drones, # just acknowledge demand and defer any actions self._demand = value @property def supply(self): return sum( for child in self.children) @property def utilisation(self): active_children = [child for child in self.children if > 0] try: return sum(child.utilisation for child in active_children) / len( active_children ) except ZeroDivisionError: return 1.0 @property def allocation(self): active_children = [child for child in self.children if > 0] try: return sum(child.allocation for child in active_children) / len( active_children ) except ZeroDivisionError: return 1.0 def __init__( self, *children: Pool, factory: Callable[[], Pool], interval: float = 30 ): self._demand = sum(child.demand for child in children) #: children fulfilling our demand self._hatchery = set(children) #: children shutting down self._mortuary = weakref.WeakSet() self.factory = factory self.interval = interval
[docs] async def run(self): while True: await trio.sleep(self.interval) # freeze target demand in case another thread updates us supply, demand =, self.demand if supply > demand: self._shrink(target=demand) else: self._grow(target=demand)
def _shrink(self, target: float): # we can only reap children that are not already shutting down # prefer reaping children that supply few used resources hit_list = sorted( self._hatchery, key=lambda child: * child.utilisation ) excess_demand = sum(child.demand for child in hit_list) - target for child in hit_list: if excess_demand <= 0: break # reap child if child.demand <= excess_demand: excess_demand -= child.demand self._release_child(child) self._reap_children() def _grow(self, target: float): missing_demand = target - sum(child.demand for child in self.children) while missing_demand > 0: new_child = self.factory() self._hatchery.add(new_child) assert ( new_child.demand > 0 ), "factory must produce children with initial demand" missing_demand -= new_child.demand self._reap_children() def _reap_children(self): for child in list(self._hatchery): if child.demand <= 0: self._release_child(child) def _release_child(self, child: Pool): child.demand = 0 self._hatchery.discard(child) self._mortuary.add(child)