Sorta, kinda, working version of actor model and state-dispatch model

This commit is contained in:
aewens 2019-08-14 16:08:01 -05:00
parent b3a44bc9f2
commit 41e75554ed
1 changed files with 59 additions and 83 deletions

View File

@ -1,91 +1,67 @@
from abots.helpers import generator
from psutil import Process as PSProcess
from queue import Queue, Empty
from time import sleep, monotonic as time
from os import getloadavg
from collections import deque
from multiprocessing import cpu_count
from abots.helpers import generator, infinitedict
#from queue import Queue, Empty
from time import monotonic as time
from gevent import Greenlet, sleep, joinall
from gevent.queue import Queue, Empty
actions = dict()
events = Queue()
last = time()
ticks = deque()
ticks.append(0)
max_loadavg = float(cpu_count())
threshold = 15.0
max_cpu = 25.0
sleeping = 0.001
sleepings = list()
stats = list()
magic = 0
step = 0.0001
max_ticks = 0
loops = 0
proc = PSProcess()
cpu = proc.cpu_percent()
hooks = dict()
def reducer(state, action):
name = action.get("name")
data = action.get("data")
if name == "ADD":
state["counter"] = state.get("counter", 0) + data
return state
@generator
def dispatcher():
state, action = (yield)
state = state.copy() if state else infinitedict()
reducer(state, action)
dispatch = dispatcher()
def register(func):
actions[func.__name__] = generator(func)
f = func()
f.start()
hooks[func.__name__.lower()] = f
return func
@register
def push():
message = (yield)
time1 = round(sleeping, 6)
time2 = round(magic, 6)
print("push", message, time1, time2, loops, max_ticks, cpu)
event = "pull", ticks[-1]
events.put_nowait(event)
def loop():
joinall([func for name, func in hooks.items()])
def transmit(hook_name, message):
hook = hooks.get(hook_name)
inbox = getattr(hook, "inbox", None)
put = getattr(inbox, "put", None)
if callable(put):
put(message)
class Actor(Greenlet):
def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)
def send(self, message):
NotImplemented
def _run(self):
self.running = True
while self.running:
message = self.inbox.get()
self.send(message)
@register
def pull():
message = (yield)
time1 = round(sleeping, 6)
time2 = round(magic, 6)
print("pull", message, time1, time2, loops, max_ticks, cpu)
event = "push", ticks[-1]
events.put_nowait(event)
class Ping(Actor):
def send(self, message):
print(message)
dispatch("pong", "ping")
sleep(0)
seed = "push", ticks[-1]
events.put_nowait(seed)
done = time() + 60
prev_cpu = 0
prev_loadavg = 0
while True: #time() < done:
if time() >= last + 1:
ticks.append(0)
last = time()
loadavg = getloadavg()[0]
diff_loadavg = loadavg - prev_loadavg
prev_loadavg = loadavg
if loadavg >= max_loadavg and diff_loadavg > 0:
sleeping = sleeping + step * 10
else:
cpu = proc.cpu_percent()
diff_cpu = cpu - prev_cpu
prev_cpu = cpu
if cpu >= max_cpu and diff_cpu > 0:
sleeping = sleeping + step * 10
elif cpu >= threshold and diff_cpu > 0:
sleeping = sleeping + step
elif sleeping - step >= 0 and diff_cpu < 0:
sleeping = sleeping - step
sleepings.append(sleeping)
stat = sum(sleepings) / len(sleepings)
stats.append(stat)
magic = sum(stats) / len(stats)
max_ticks = max(ticks)
loops = loops + 1
try:
event = events.get_nowait()
source, message = event
action = actions.get(source)
if action:
action().send(message)
except Empty:
break
sleep(sleeping)
ticks[-1] = ticks[-1] + 1
@register
class Pong(Actor):
def send(self, message):
print(message)
dispatch("ping", "pong")
sleep(0)