From 41e75554edf1ccfef9949d1d49b2c5f88ed3cac5 Mon Sep 17 00:00:00 2001 From: aewens Date: Wed, 14 Aug 2019 16:08:01 -0500 Subject: [PATCH] Sorta, kinda, working version of actor model and state-dispatch model --- taijitu.py | 142 ++++++++++++++++++++++------------------------------- 1 file changed, 59 insertions(+), 83 deletions(-) diff --git a/taijitu.py b/taijitu.py index 462cf2c..6be4e08 100644 --- a/taijitu.py +++ b/taijitu.py @@ -1,91 +1,67 @@ -from abots.helpers import generator -from psutil import Process as PSProcess -from queue import Queue, Empty -from time import sleep, monotonic as time -from os import getloadavg -from collections import deque -from multiprocessing import cpu_count +from abots.helpers import generator, infinitedict +#from queue import Queue, Empty +from time import monotonic as time +from gevent import Greenlet, sleep, joinall +from gevent.queue import Queue, Empty -actions = dict() -events = Queue() -last = time() -ticks = deque() -ticks.append(0) -max_loadavg = float(cpu_count()) -threshold = 15.0 -max_cpu = 25.0 -sleeping = 0.001 -sleepings = list() -stats = list() -magic = 0 -step = 0.0001 -max_ticks = 0 -loops = 0 -proc = PSProcess() -cpu = proc.cpu_percent() +hooks = dict() + +def reducer(state, action): + name = action.get("name") + data = action.get("data") + if name == "ADD": + state["counter"] = state.get("counter", 0) + data + + return state + +@generator +def dispatcher(): + state, action = (yield) + state = state.copy() if state else infinitedict() + reducer(state, action) + +dispatch = dispatcher() def register(func): - actions[func.__name__] = generator(func) + f = func() + f.start() + hooks[func.__name__.lower()] = f return func -@register -def push(): - message = (yield) - time1 = round(sleeping, 6) - time2 = round(magic, 6) - print("push", message, time1, time2, loops, max_ticks, cpu) - event = "pull", ticks[-1] - events.put_nowait(event) +def loop(): + joinall([func for name, func in hooks.items()]) + +def transmit(hook_name, message): + hook = hooks.get(hook_name) + inbox = getattr(hook, "inbox", None) + put = getattr(inbox, "put", None) + if callable(put): + put(message) + +class Actor(Greenlet): + def __init__(self): + self.inbox = Queue() + Greenlet.__init__(self) + + def send(self, message): + NotImplemented + + def _run(self): + self.running = True + while self.running: + message = self.inbox.get() + self.send(message) @register -def pull(): - message = (yield) - time1 = round(sleeping, 6) - time2 = round(magic, 6) - print("pull", message, time1, time2, loops, max_ticks, cpu) - event = "push", ticks[-1] - events.put_nowait(event) +class Ping(Actor): + def send(self, message): + print(message) + dispatch("pong", "ping") + sleep(0) -seed = "push", ticks[-1] -events.put_nowait(seed) -done = time() + 60 -prev_cpu = 0 -prev_loadavg = 0 -while True: #time() < done: - if time() >= last + 1: - ticks.append(0) - last = time() - loadavg = getloadavg()[0] - diff_loadavg = loadavg - prev_loadavg - prev_loadavg = loadavg - if loadavg >= max_loadavg and diff_loadavg > 0: - sleeping = sleeping + step * 10 - else: - cpu = proc.cpu_percent() - diff_cpu = cpu - prev_cpu - prev_cpu = cpu - if cpu >= max_cpu and diff_cpu > 0: - sleeping = sleeping + step * 10 - elif cpu >= threshold and diff_cpu > 0: - sleeping = sleeping + step - elif sleeping - step >= 0 and diff_cpu < 0: - sleeping = sleeping - step - - sleepings.append(sleeping) - stat = sum(sleepings) / len(sleepings) - stats.append(stat) - magic = sum(stats) / len(stats) - max_ticks = max(ticks) - loops = loops + 1 - - try: - event = events.get_nowait() - source, message = event - action = actions.get(source) - if action: - action().send(message) - except Empty: - break - - sleep(sleeping) - ticks[-1] = ticks[-1] + 1 +@register +class Pong(Actor): + def send(self, message): + print(message) + dispatch("ping", "pong") + sleep(0)