diff --git a/state_management.py b/state_management.py index ffa7fc5..a2b7e7d 100644 --- a/state_management.py +++ b/state_management.py @@ -2,9 +2,8 @@ from abots.helpers import generator, infinitedict from gevent.lock import BoundedSemaphore from collections import namedtuple -class AtomicDict(object):#dict): +class AtomicDict(object): def __init__(self, *args, **kwargs): - #dict.__init__(self, *args, **kwargs) self._dict = dict() self._lock = BoundedSemaphore() @@ -71,7 +70,8 @@ def manager(reducer, initial_state=infinitedict()): m["reducer"] = reducer m["state"] = initial_state - return meta, dispatch(meta) + state = lambda: get_state(meta) + return state, dispatch(meta) Action = namedtuple("Action", ["type", "payload"]) -meta, actions = manager(reducer, {"init": False}) +#meta, actions = manager(reducer) diff --git a/taijitu.py b/taijitu.py index 0273576..6126dec 100644 --- a/taijitu.py +++ b/taijitu.py @@ -1,67 +1,55 @@ from state_management import Action, manager -from abots.helpers import generator, infinitedict +from abots.helpers import generator, coroutine, infinitedict from time import monotonic as time -from gevent import Greenlet, sleep, joinall +from gevent import Greenlet, spawn, sleep, joinall from gevent.queue import Queue, Empty +#from queue import Queue, Empty -hooks = dict() +HOOKS = dict() def reducer(state, action): - name = action.get("name") - data = action.get("data") - if name == "ADD": - state["counter"] = state.get("counter", 0) + data + if action.name == "ADD": + state["counter"] = state.get("counter", 0) + action.data return state -@generator -def dispatcher(): - state, action = (yield) - state = state.copy() if state else infinitedict() - reducer(state, action) - -dispatch = dispatcher() - def register(func): - f = func() - f.start() - hooks[func.__name__.lower()] = f + #f = generator(func) + #f.start() + HOOKS[func.__name__.lower()] = func#f() return func -def loop(): - joinall([func for name, func in hooks.items()]) +#@coroutine +def pull(queue): + while True: + task = queue.get() + assert isinstance(task, tuple), f"Expected tuple: {task}" + assert len(task) == 2, f"Invalid format: {task}" + hook_name, message = task + hook = HOOKS.get(hook_name) + hook(queue, message) -def transmit(hook_name, message): - hook = hooks.get(hook_name) - inbox = getattr(hook, "inbox", None) - put = getattr(inbox, "put", None) - if callable(put): - put(message) - -class Actor(Greenlet): - def __init__(self): - self.inbox = Queue() - Greenlet.__init__(self) - - def send(self, message): - NotImplemented - - def _run(self): - self.running = True - while self.running: - message = self.inbox.get() - self.send(message) +@generator +def push(queue): + task = (yield) + assert isinstance(task, tuple), f"Expected tuple: {task}" + assert len(task) == 2, f"Invalid format: {task}" + queue.put(task) @register -class Ping(Actor): - def send(self, message): - print(message) - dispatch("pong", "ping") - sleep(0) +def ping(queue, message): + print(f"PING: {message}") + task = "pong", message + queue.put(task) @register -class Pong(Actor): - def send(self, message): - print(message) - dispatch("ping", "pong") - sleep(0) +def pong(queue, message): + print(f"PONG: {message}") + task = "ping", message + queue.put(task) + +state, dispatch = manager(reducer) +queue = Queue() +yang = push(queue) +yin = spawn(pull, queue) +start = lambda: joinall([yin])