Compare commits

...

4 Commits

2 changed files with 123 additions and 82 deletions

77
state_management.py Normal file
View File

@ -0,0 +1,77 @@
from abots.helpers import generator, infinitedict
from gevent.lock import BoundedSemaphore
from collections import namedtuple
class AtomicDict(object):
def __init__(self, *args, **kwargs):
self._dict = dict()
self._lock = BoundedSemaphore()
def __enter__(self):
self._lock.acquire()
return self._dict
def __exit__(self, _type, value, traceback):
self._lock.release()
def get_state(meta):
state = infinitedict()
with meta as m:
state = m.get("state")
assert state is not None, "'state' is missing"
return state.copy()
def unsubscribe(meta, listener):
with meta as m:
listeners = m.get("listeners")
assert listener in listeners, f"'{listener}' missing from: {listeners}"
listeners.remove(listener)
def subscribe(meta, listener):
with meta as m:
listeners = m.get("listeners")
assert listeners is not None, "'listeners' is missing"
listeners.append(listener)
return lambda: unsubscribe(meta, listener)
@generator
def dispatch(meta):
action = (yield)
with meta as m:
is_dispatching = m.get("is_dispatching")
assert is_dispatching is not None, "'is_dispatching' is missing"
if is_dispatching:
raise Exception("Reducers cannot dispatch actions")
try:
with meta as m:
m["is_dispatching"] = True
reducer = m.get("reducer")
assert reducer is not None, "'reducer' is missing"
m["state"] = reducer(state, action)
finally:
with meta as m:
m["is_dispatching"] = False
with meta as m:
listeners = m.get("listeners")
assert listeners is not None, "'listeners' is missing"
for listener in listeners[:]:
listener()
def manager(reducer, initial_state=infinitedict()):
meta = AtomicDict()
with meta as m:
m["listeners"] = list()
m["is_dispatching"] = False
m["reducer"] = reducer
m["state"] = initial_state
state = lambda: get_state(meta)
return state, dispatch(meta)
Action = namedtuple("Action", ["type", "payload"])
#meta, actions = manager(reducer)

View File

@ -1,91 +1,55 @@
from abots.helpers import generator
from psutil import Process as PSProcess
from queue import Queue, Empty
from time import sleep, monotonic as time
from os import getloadavg
from collections import deque
from multiprocessing import cpu_count
from state_management import Action, manager
from abots.helpers import generator, coroutine, infinitedict
from time import monotonic as time
from gevent import Greenlet, spawn, sleep, joinall
from gevent.queue import Queue, Empty
#from queue import Queue, Empty
actions = dict()
events = Queue()
last = time()
ticks = deque()
ticks.append(0)
max_loadavg = float(cpu_count())
threshold = 15.0
max_cpu = 25.0
sleeping = 0.001
sleepings = list()
stats = list()
magic = 0
step = 0.0001
max_ticks = 0
loops = 0
proc = PSProcess()
cpu = proc.cpu_percent()
HOOKS = dict()
def reducer(state, action):
if action.name == "ADD":
state["counter"] = state.get("counter", 0) + action.data
return state
def register(func):
actions[func.__name__] = generator(func)
#f = generator(func)
#f.start()
HOOKS[func.__name__.lower()] = func#f()
return func
@register
def push():
message = (yield)
time1 = round(sleeping, 6)
time2 = round(magic, 6)
print("push", message, time1, time2, loops, max_ticks, cpu)
event = "pull", ticks[-1]
events.put_nowait(event)
#@coroutine
def pull(queue):
while True:
task = queue.get()
assert isinstance(task, tuple), f"Expected tuple: {task}"
assert len(task) == 2, f"Invalid format: {task}"
hook_name, message = task
hook = HOOKS.get(hook_name)
hook(queue, message)
@generator
def push(queue):
task = (yield)
assert isinstance(task, tuple), f"Expected tuple: {task}"
assert len(task) == 2, f"Invalid format: {task}"
queue.put(task)
@register
def pull():
message = (yield)
time1 = round(sleeping, 6)
time2 = round(magic, 6)
print("pull", message, time1, time2, loops, max_ticks, cpu)
event = "push", ticks[-1]
events.put_nowait(event)
def ping(queue, message):
print(f"PING: {message}")
task = "pong", message
queue.put(task)
seed = "push", ticks[-1]
events.put_nowait(seed)
done = time() + 60
prev_cpu = 0
prev_loadavg = 0
while True: #time() < done:
if time() >= last + 1:
ticks.append(0)
last = time()
loadavg = getloadavg()[0]
diff_loadavg = loadavg - prev_loadavg
prev_loadavg = loadavg
if loadavg >= max_loadavg and diff_loadavg > 0:
sleeping = sleeping + step * 10
else:
cpu = proc.cpu_percent()
diff_cpu = cpu - prev_cpu
prev_cpu = cpu
if cpu >= max_cpu and diff_cpu > 0:
sleeping = sleeping + step * 10
elif cpu >= threshold and diff_cpu > 0:
sleeping = sleeping + step
elif sleeping - step >= 0 and diff_cpu < 0:
sleeping = sleeping - step
@register
def pong(queue, message):
print(f"PONG: {message}")
task = "ping", message
queue.put(task)
sleepings.append(sleeping)
stat = sum(sleepings) / len(sleepings)
stats.append(stat)
magic = sum(stats) / len(stats)
max_ticks = max(ticks)
loops = loops + 1
try:
event = events.get_nowait()
source, message = event
action = actions.get(source)
if action:
action().send(message)
except Empty:
break
sleep(sleeping)
ticks[-1] = ticks[-1] + 1
state, dispatch = manager(reducer)
queue = Queue()
yang = push(queue)
yin = spawn(pull, queue)
start = lambda: joinall([yin])