# core/utils.py (repository viewer listing: 168 lines, 5.2 KiB, Python)

from abots.helpers import obtain, generator, singleton
from collections import defaultdict
from time import sleep
from enum import Enum
# Helpers
def nonedict():
    """Return a new ``defaultdict`` whose missing keys default to ``None``.

    Reading a missing key with ``d[key]`` yields ``None`` and, as with any
    ``defaultdict``, stores that key in the mapping.
    """
    return defaultdict(lambda: None)


def acquire_name(obj):
    """Return the lowercased class name of ``obj`` (e.g. ``Foo()`` -> "foo")."""
    return obj.__class__.__name__.lower()
def _peek(mapping, key):
    """Return ``mapping[key]`` without letting a defaultdict store a default."""
    if isinstance(mapping, defaultdict) and key not in mapping:
        factory = mapping.default_factory
        if factory is not None:
            # Same value ``mapping[key]`` would produce, minus the insertion.
            return factory()
    return mapping[key]


def access(reference, path):
    """Walk ``path`` through nested mappings and return the value at its end.

    Args:
        reference: The outermost dict to start from.
        path: List of keys. Every key but the last selects a nested mapping;
            the last key selects the value that is returned.

    Returns:
        The value reached by following ``path``. ``reference`` itself when
        ``path`` is empty, and ``None`` as soon as an intermediate value is
        ``None`` (the rest of the path cannot exist).

    Raises:
        AssertionError: If ``reference`` is not a dict or ``path`` is not a
            list.
        KeyError: If a key is missing from a mapping that provides no default.
    """
    assert isinstance(reference, dict), "Expected dict"
    assert isinstance(path, list), "Expected list"
    for key in path:
        if reference is None:
            return None
        # Look up without mutating: a plain ``reference[key]`` on a
        # defaultdict would leave a placeholder entry behind for every miss.
        reference = _peek(reference, key)
    return reference
@singleton
class Hooks:
    """Registry of hook functions (grouped by namespace) and named resources.

    Both stores are ``nonedict`` mappings, so looking up an unknown name
    yields ``None``. Because such lookups can leave ``None`` placeholders in
    the stores, registration treats a ``None`` value as "not registered yet".
    """

    def __init__(self):
        # namespace -> nonedict(hook name -> hook function)
        self._hooks = nonedict()
        # lowercased class name -> resource object
        self._resources = nonedict()

    def _run(self, kind_name, func):
        """Apply ``func`` to the store named ``_<kind_name>``.

        Returns ``None`` when ``obtain`` yields ``None`` for that attribute.
        NOTE(review): ``obtain`` comes from ``abots.helpers``; presumably a
        ``getattr`` that falls back to ``None`` - confirm.
        """
        kind = obtain(self, f"_{kind_name}")
        if kind is None:
            return None
        return func(kind)

    def _get(self, kind_name, name):
        """Resolve ``name`` in a store; at most one "/" splits it into a path."""
        return self._run(kind_name, lambda k: access(k, name.split("/", 1)))

    def gather(self, namespace="_none"):
        """Return the hook mapping of ``namespace``, or ``None`` if unknown."""
        assert isinstance(namespace, str), "Expected str"
        # ``.get`` so that merely asking for a namespace does not create it.
        return self._hooks.get(namespace)

    def attach(self, hook_name):
        """Return the hook called ``hook_name`` (``"namespace/name"``).

        A name without "/" is looked up in the default ``_none`` namespace.
        """
        if "/" not in hook_name:
            hook_name = f"_none/{hook_name}"
        return self._get("hooks", hook_name)

    def request(self, resource_name):
        """Return the resource stored under ``resource_name``, if any."""
        return self._get("resources", resource_name)

    def register(self, namespace="_none"):
        """Return a decorator that registers a function as a hook.

        The hook is stored under the function's ``__name__`` inside
        ``namespace``. The first registration of a name wins; later ones
        leave the stored hook untouched.
        """
        assert isinstance(namespace, str), "Expected str"
        hooks = self._hooks.get(namespace)
        if hooks is None:
            # Covers both a missing namespace and a ``None`` placeholder left
            # behind by an earlier lookup of this namespace.
            hooks = self._hooks[namespace] = nonedict()

        def wrapper_register(func):
            hook_name = func.__name__
            if hooks.get(hook_name) is None:
                hooks[hook_name] = func
            # Always hand back the wrapped function: returning None for a
            # duplicate would rebind the decorated name to None.
            return generator(func)

        return wrapper_register

    def use(self, resource):
        """Store ``resource`` under its lowercased class name.

        An already stored resource of the same name is kept. Returns ``self``
        so calls can be chained.
        """
        resource_name = acquire_name(resource)
        # A ``None`` value is only a placeholder from an earlier failed
        # ``request``; it must not block the real resource.
        if self._resources.get(resource_name) is None:
            self._resources[resource_name] = resource
        return self


hooks = Hooks()
# Simplified redux port in Python
class StateManager:
    """Small Redux-style store: a state dict rebuilt by per-key reducers.

    Each reducer is called as ``reducer(previous_state_for_key, action)`` and
    its return value becomes the new state for that key.

    NOTE(review): listeners added through ``subscribe`` are stored but never
    invoked anywhere in this class.
    """

    def __init__(self, reducers, initial_state=None):
        """Create a store.

        Args:
            reducers: Dict mapping a state key (str) to a reducer callable.
                The dict is kept by reference, so later changes to it are
                visible to the store.
            initial_state: Starting state dict; a fresh empty dict when
                omitted.

        Raises:
            AssertionError: If an argument has the wrong type.
        """
        if initial_state is None:
            # A fresh dict per instance instead of one shared default object.
            initial_state = dict()
        assert isinstance(reducers, dict), "Expected dict"
        assert all(isinstance(key, str) for key in reducers), "Expected strings"
        assert all(map(callable, reducers.values())), "Expected functions"
        assert isinstance(initial_state, dict), "Expected dict"
        self._reducers = reducers
        self._state = initial_state
        self._listeners = list()

    def _apply_reducers(self, state, action):
        """Run every reducer on ``state`` and return the resulting state.

        Returns the ``state`` that was passed in when no reducer produced a
        value different from the previous one; otherwise a new dict.

        NOTE(review): a key whose reducer returns ``None`` is left out of the
        new dict, as are keys without a reducer - confirm this is intended.
        """
        if state is None:
            state = dict()
        new_state = dict()
        changed = False
        for key, reducer in self._reducers.items():
            previous_state = state.get(key, None)
            next_state = reducer(previous_state, action)
            if next_state is None:
                continue
            new_state[key] = next_state
            if not changed and next_state != previous_state:
                changed = True
        return new_state if changed else state

    def add_reducer(self, key, reducer):
        """Register ``reducer`` under ``key``, replacing any existing one."""
        assert isinstance(key, str), "Expected str"
        assert callable(reducer), "Expected function"
        self._reducers[key] = reducer

    def remove_reducer(self, key):
        """Remove the reducer stored under ``key``; unknown keys are ignored."""
        self._reducers.pop(key, None)

    def get_state(self):
        """Return a shallow copy of the current state."""
        return self._state.copy() if isinstance(self._state, dict) else dict()

    def subscribe(self, listener):
        """Add ``listener`` and return a function that removes it again."""
        self._listeners.append(listener)

        def cancel():
            # Must target ``_listeners``, the list ``subscribe`` appended to.
            self._listeners.remove(listener)

        return cancel

    @generator
    def start(self):
        """Receive one action through ``yield`` and apply the reducers to it.

        The action must expose ``name`` and ``data``; an action whose
        ``name`` is ``None`` is ignored. The body handles a single action.
        NOTE(review): how it is driven depends on the ``generator`` decorator
        from ``abots.helpers`` - confirm.
        """
        action = (yield)
        assert hasattr(action, "name"), "Expected name"
        assert hasattr(action, "data"), "Expected data"
        if action.name is None:
            return
        self._state = self._apply_reducers(self.get_state(), action)
class ActionName(Enum):
    """Built-in action names; ``Action.register`` attaches further names."""

    INIT = "INIT"


class Action:
    """A named action carrying a data payload."""

    # Namespace that ``register`` extends and ``create`` looks names up in.
    NAMES = ActionName

    def __init__(self, name, data):
        self.name = name
        self.data = data

    # NOTE - I should probably not be doing this, but it works
    # ``setattr`` on an Enum class does not create a real member: it stores a
    # plain string attribute on NAMES (and therefore on ActionName). Names
    # registered this way are not included when iterating the enum, and
    # ``create`` gives them a ``str`` name rather than an enum member.
    @classmethod
    def register(cls, name):
        """Make ``name`` available on ``cls.NAMES``; existing names are kept."""
        assert isinstance(name, str), "Expected str"
        if hasattr(cls.NAMES, name):
            return
        setattr(cls.NAMES, name, name)

    @classmethod
    def register_many(cls, names):
        """Register every name in the list ``names``."""
        assert isinstance(names, list), "Expected list"
        for name in names:
            cls.register(name)

    @classmethod
    def create(cls, name, data):
        """Build an action for a known name.

        The name is resolved on ``cls.NAMES``, the same namespace
        ``register`` writes to, so the two agree for subclasses that
        override ``NAMES``.

        Raises:
            AssertionError: If ``name`` is not available on ``cls.NAMES``.
        """
        action_name = getattr(cls.NAMES, name, None)
        assert action_name is not None, "Expected ActionName"
        return cls(action_name, data)
# Module-level registry filled by ``combine``: state key -> reducer function.
reducers = dict()


def combine(key):
    """Return a decorator that stores the decorated function in ``reducers``.

    Args:
        key: State key (str) to store the reducer under. An existing entry
            for the same key is replaced.

    Raises:
        AssertionError: If ``key`` is not a str or the decorated object is
            not callable.
    """
    assert isinstance(key, str), "Expected str"

    def decorator_combine(func):
        assert callable(func), "Expected function"
        reducers[key] = func
        # Return the function so the decorated name is not rebound to None.
        return func

    return decorator_combine
def spindown(composer, bind, reason="Bye-bye!"):
    """Log the shutdown, send ``QUIT <reason>`` on a socket, then stop it.

    Args:
        composer: Object exposing ``logger.critical`` and ``get_sock``.
        bind: Identifier passed to ``composer.get_sock`` to pick the socket.
        reason: Text appended to the QUIT message.
    """
    composer.logger.critical("GOING DOWN!")
    connection = composer.get_sock(bind)
    farewell = f"QUIT {reason}"
    connection.send(farewell)
    # Wait one socket timeout before stopping.
    # NOTE(review): presumably so the QUIT can go out first - confirm.
    grace_period = connection.timeout
    sleep(grace_period)
    connection.stop()
    # NOTE(review): disabled in the original; unclear whether it should run.
    #composer.kill_switch.set()