Added coroutine hook system

This commit is contained in:
aewens 2019-08-21 21:24:49 -05:00
parent b3355baac5
commit ab2ede2927
2 changed files with 42 additions and 54 deletions

View File

@ -2,9 +2,8 @@ from abots.helpers import generator, infinitedict
from gevent.lock import BoundedSemaphore
from collections import namedtuple
class AtomicDict(object):#dict):
class AtomicDict(object):
def __init__(self, *args, **kwargs):
#dict.__init__(self, *args, **kwargs)
self._dict = dict()
self._lock = BoundedSemaphore()
@ -71,7 +70,8 @@ def manager(reducer, initial_state=infinitedict()):
m["reducer"] = reducer
m["state"] = initial_state
return meta, dispatch(meta)
state = lambda: get_state(meta)
return state, dispatch(meta)
Action = namedtuple("Action", ["type", "payload"])
meta, actions = manager(reducer, {"init": False})
#meta, actions = manager(reducer)

View File

@ -1,67 +1,55 @@
from state_management import Action, manager
from abots.helpers import generator, infinitedict
from abots.helpers import generator, coroutine, infinitedict
from time import monotonic as time
from gevent import Greenlet, sleep, joinall
from gevent import Greenlet, spawn, sleep, joinall
from gevent.queue import Queue, Empty
#from queue import Queue, Empty
hooks = dict()
HOOKS = dict()
def reducer(state, action):
name = action.get("name")
data = action.get("data")
if name == "ADD":
state["counter"] = state.get("counter", 0) + data
if action.name == "ADD":
state["counter"] = state.get("counter", 0) + action.data
return state
@generator
def dispatcher():
state, action = (yield)
state = state.copy() if state else infinitedict()
reducer(state, action)
dispatch = dispatcher()
def register(func):
f = func()
f.start()
hooks[func.__name__.lower()] = f
#f = generator(func)
#f.start()
HOOKS[func.__name__.lower()] = func#f()
return func
def loop():
joinall([func for name, func in hooks.items()])
#@coroutine
def pull(queue):
while True:
task = queue.get()
assert isinstance(task, tuple), f"Expected tuple: {task}"
assert len(task) == 2, f"Invalid format: {task}"
hook_name, message = task
hook = HOOKS.get(hook_name)
hook(queue, message)
def transmit(hook_name, message):
hook = hooks.get(hook_name)
inbox = getattr(hook, "inbox", None)
put = getattr(inbox, "put", None)
if callable(put):
put(message)
class Actor(Greenlet):
def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)
def send(self, message):
NotImplemented
def _run(self):
self.running = True
while self.running:
message = self.inbox.get()
self.send(message)
@generator
def push(queue):
task = (yield)
assert isinstance(task, tuple), f"Expected tuple: {task}"
assert len(task) == 2, f"Invalid format: {task}"
queue.put(task)
@register
class Ping(Actor):
def send(self, message):
print(message)
dispatch("pong", "ping")
sleep(0)
def ping(queue, message):
print(f"PING: {message}")
task = "pong", message
queue.put(task)
@register
class Pong(Actor):
def send(self, message):
print(message)
dispatch("ping", "pong")
sleep(0)
def pong(queue, message):
print(f"PONG: {message}")
task = "ping", message
queue.put(task)
state, dispatch = manager(reducer)
queue = Queue()
yang = push(queue)
yin = spawn(pull, queue)
start = lambda: joinall([yin])