Compare commits
4 Commits
master
...
state-disp
Author | SHA1 | Date |
---|---|---|
aewens | ab2ede2927 | |
aewens | b3355baac5 | |
aewens | b8bb9f8189 | |
aewens | 41e75554ed |
|
@ -0,0 +1,77 @@
|
|||
from abots.helpers import generator, infinitedict
|
||||
from gevent.lock import BoundedSemaphore
|
||||
from collections import namedtuple
|
||||
|
||||
class AtomicDict(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._dict = dict()
|
||||
self._lock = BoundedSemaphore()
|
||||
|
||||
def __enter__(self):
|
||||
self._lock.acquire()
|
||||
return self._dict
|
||||
|
||||
def __exit__(self, _type, value, traceback):
|
||||
self._lock.release()
|
||||
|
||||
def get_state(meta):
|
||||
state = infinitedict()
|
||||
with meta as m:
|
||||
state = m.get("state")
|
||||
assert state is not None, "'state' is missing"
|
||||
|
||||
return state.copy()
|
||||
|
||||
def unsubscribe(meta, listener):
|
||||
with meta as m:
|
||||
listeners = m.get("listeners")
|
||||
assert listener in listeners, f"'{listener}' missing from: {listeners}"
|
||||
listeners.remove(listener)
|
||||
|
||||
def subscribe(meta, listener):
|
||||
with meta as m:
|
||||
listeners = m.get("listeners")
|
||||
assert listeners is not None, "'listeners' is missing"
|
||||
listeners.append(listener)
|
||||
|
||||
return lambda: unsubscribe(meta, listener)
|
||||
|
||||
@generator
|
||||
def dispatch(meta):
|
||||
action = (yield)
|
||||
|
||||
with meta as m:
|
||||
is_dispatching = m.get("is_dispatching")
|
||||
assert is_dispatching is not None, "'is_dispatching' is missing"
|
||||
if is_dispatching:
|
||||
raise Exception("Reducers cannot dispatch actions")
|
||||
|
||||
try:
|
||||
with meta as m:
|
||||
m["is_dispatching"] = True
|
||||
reducer = m.get("reducer")
|
||||
assert reducer is not None, "'reducer' is missing"
|
||||
m["state"] = reducer(state, action)
|
||||
finally:
|
||||
with meta as m:
|
||||
m["is_dispatching"] = False
|
||||
|
||||
with meta as m:
|
||||
listeners = m.get("listeners")
|
||||
assert listeners is not None, "'listeners' is missing"
|
||||
for listener in listeners[:]:
|
||||
listener()
|
||||
|
||||
def manager(reducer, initial_state=infinitedict()):
|
||||
meta = AtomicDict()
|
||||
with meta as m:
|
||||
m["listeners"] = list()
|
||||
m["is_dispatching"] = False
|
||||
m["reducer"] = reducer
|
||||
m["state"] = initial_state
|
||||
|
||||
state = lambda: get_state(meta)
|
||||
return state, dispatch(meta)
|
||||
|
||||
Action = namedtuple("Action", ["type", "payload"])
|
||||
#meta, actions = manager(reducer)
|
128
taijitu.py
128
taijitu.py
|
@ -1,91 +1,55 @@
|
|||
from abots.helpers import generator
|
||||
from psutil import Process as PSProcess
|
||||
from queue import Queue, Empty
|
||||
from time import sleep, monotonic as time
|
||||
from os import getloadavg
|
||||
from collections import deque
|
||||
from multiprocessing import cpu_count
|
||||
from state_management import Action, manager
|
||||
from abots.helpers import generator, coroutine, infinitedict
|
||||
from time import monotonic as time
|
||||
from gevent import Greenlet, spawn, sleep, joinall
|
||||
from gevent.queue import Queue, Empty
|
||||
#from queue import Queue, Empty
|
||||
|
||||
actions = dict()
|
||||
events = Queue()
|
||||
last = time()
|
||||
ticks = deque()
|
||||
ticks.append(0)
|
||||
max_loadavg = float(cpu_count())
|
||||
threshold = 15.0
|
||||
max_cpu = 25.0
|
||||
sleeping = 0.001
|
||||
sleepings = list()
|
||||
stats = list()
|
||||
magic = 0
|
||||
step = 0.0001
|
||||
max_ticks = 0
|
||||
loops = 0
|
||||
proc = PSProcess()
|
||||
cpu = proc.cpu_percent()
|
||||
HOOKS = dict()
|
||||
|
||||
def reducer(state, action):
|
||||
if action.name == "ADD":
|
||||
state["counter"] = state.get("counter", 0) + action.data
|
||||
|
||||
return state
|
||||
|
||||
def register(func):
|
||||
actions[func.__name__] = generator(func)
|
||||
#f = generator(func)
|
||||
#f.start()
|
||||
HOOKS[func.__name__.lower()] = func#f()
|
||||
return func
|
||||
|
||||
@register
|
||||
def push():
|
||||
message = (yield)
|
||||
time1 = round(sleeping, 6)
|
||||
time2 = round(magic, 6)
|
||||
print("push", message, time1, time2, loops, max_ticks, cpu)
|
||||
event = "pull", ticks[-1]
|
||||
events.put_nowait(event)
|
||||
#@coroutine
|
||||
def pull(queue):
|
||||
while True:
|
||||
task = queue.get()
|
||||
assert isinstance(task, tuple), f"Expected tuple: {task}"
|
||||
assert len(task) == 2, f"Invalid format: {task}"
|
||||
hook_name, message = task
|
||||
hook = HOOKS.get(hook_name)
|
||||
hook(queue, message)
|
||||
|
||||
@generator
|
||||
def push(queue):
|
||||
task = (yield)
|
||||
assert isinstance(task, tuple), f"Expected tuple: {task}"
|
||||
assert len(task) == 2, f"Invalid format: {task}"
|
||||
queue.put(task)
|
||||
|
||||
@register
|
||||
def pull():
|
||||
message = (yield)
|
||||
time1 = round(sleeping, 6)
|
||||
time2 = round(magic, 6)
|
||||
print("pull", message, time1, time2, loops, max_ticks, cpu)
|
||||
event = "push", ticks[-1]
|
||||
events.put_nowait(event)
|
||||
def ping(queue, message):
|
||||
print(f"PING: {message}")
|
||||
task = "pong", message
|
||||
queue.put(task)
|
||||
|
||||
seed = "push", ticks[-1]
|
||||
events.put_nowait(seed)
|
||||
done = time() + 60
|
||||
prev_cpu = 0
|
||||
prev_loadavg = 0
|
||||
while True: #time() < done:
|
||||
if time() >= last + 1:
|
||||
ticks.append(0)
|
||||
last = time()
|
||||
loadavg = getloadavg()[0]
|
||||
diff_loadavg = loadavg - prev_loadavg
|
||||
prev_loadavg = loadavg
|
||||
if loadavg >= max_loadavg and diff_loadavg > 0:
|
||||
sleeping = sleeping + step * 10
|
||||
else:
|
||||
cpu = proc.cpu_percent()
|
||||
diff_cpu = cpu - prev_cpu
|
||||
prev_cpu = cpu
|
||||
if cpu >= max_cpu and diff_cpu > 0:
|
||||
sleeping = sleeping + step * 10
|
||||
elif cpu >= threshold and diff_cpu > 0:
|
||||
sleeping = sleeping + step
|
||||
elif sleeping - step >= 0 and diff_cpu < 0:
|
||||
sleeping = sleeping - step
|
||||
@register
|
||||
def pong(queue, message):
|
||||
print(f"PONG: {message}")
|
||||
task = "ping", message
|
||||
queue.put(task)
|
||||
|
||||
sleepings.append(sleeping)
|
||||
stat = sum(sleepings) / len(sleepings)
|
||||
stats.append(stat)
|
||||
magic = sum(stats) / len(stats)
|
||||
max_ticks = max(ticks)
|
||||
loops = loops + 1
|
||||
|
||||
try:
|
||||
event = events.get_nowait()
|
||||
source, message = event
|
||||
action = actions.get(source)
|
||||
if action:
|
||||
action().send(message)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
sleep(sleeping)
|
||||
ticks[-1] = ticks[-1] + 1
|
||||
state, dispatch = manager(reducer)
|
||||
queue = Queue()
|
||||
yang = push(queue)
|
||||
yin = spawn(pull, queue)
|
||||
start = lambda: joinall([yin])
|
||||
|
|
Loading…
Reference in New Issue