Compare commits
8 Commits
Author | SHA1 | Date |
---|---|---|
aewens | 19cf34904f | |
aewens | 6c1cc0bf9a | |
aewens | bd21d1501e | |
aewens | 2e0af94286 | |
aewens | ab2ede2927 | |
aewens | b3355baac5 | |
aewens | b8bb9f8189 | |
aewens | 41e75554ed |
|
@ -0,0 +1,27 @@
|
||||||
|
20190825
|
||||||
|
========
|
||||||
|
|
||||||
|
# Redukti
|
||||||
|
|
||||||
|
Components:
|
||||||
|
* State: Dictionary holding values of application state
|
||||||
|
* Action: A name and data pair that encode a change to the state
|
||||||
|
* Reducer: A function that takes a state and action to produce a new state
|
||||||
|
* Root reducer: Passes the state and action to all reducers
|
||||||
|
|
||||||
|
# Derivide
|
||||||
|
|
||||||
|
Components:
|
||||||
|
* Event: Destination, type of change occurring, old value, and new value
|
||||||
|
* Stream: A series of events
|
||||||
|
* Pool: Collection of resources for off main thread tasks
|
||||||
|
* Plugin: Takes events from stream, performs side effect, and returns result
|
||||||
|
* Schema: Set of rules describing the structure of data, used for validating
|
||||||
|
* Blob: Collection of data validated against schema(s)
|
||||||
|
* Database: The validated form of the blob from schema(s)
|
||||||
|
|
||||||
|
# Taijitu
|
||||||
|
|
||||||
|
Components:
|
||||||
|
* Dispatcher: Waits for new events and sends them to new reducers or listener
|
||||||
|
* Listener: Waits for new events and sends them to plugins or dispatcher
|
|
@ -0,0 +1,132 @@
|
||||||
|
from abots.helpers import generator, coroutine, infinitedict
|
||||||
|
|
||||||
|
from asyncio import get_event_loop, sleep, wait, Queue
|
||||||
|
from socket import socket, timeout as sock_timeout
|
||||||
|
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||||
|
from ssl import wrap_socket
|
||||||
|
from time import sleep as nap
|
||||||
|
|
||||||
|
async def handle_client(loop, client):
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
request = await loop.sock_recv(client, 256)
|
||||||
|
except Exception as e:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if request:
|
||||||
|
message = request.decode()
|
||||||
|
print(message)
|
||||||
|
|
||||||
|
await sleep(0)
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
async def run_server(loop, server):
|
||||||
|
while True:
|
||||||
|
client, _ = await loop.sock_accept(server)
|
||||||
|
loop.create_task(handle_client(loop, client))
|
||||||
|
await sleep(0)
|
||||||
|
|
||||||
|
async def push(loop, queues):
|
||||||
|
while all([queue for queue in queues]):
|
||||||
|
send = queues.get("push")
|
||||||
|
receive = queues.get("pull")
|
||||||
|
event = await send.get()
|
||||||
|
print("PUSH", event)
|
||||||
|
assert isinstance(event, tuple), f"Expected tuple: {event}"
|
||||||
|
if len(event) == 2:
|
||||||
|
error, response = event
|
||||||
|
if error:
|
||||||
|
break
|
||||||
|
|
||||||
|
elif len(event) == 4:
|
||||||
|
where, context, old_value, new_value = event
|
||||||
|
await receive.put(event)
|
||||||
|
|
||||||
|
await sleep(0)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
async def pull(loop, queues):
|
||||||
|
plugins = dict()
|
||||||
|
print("PULL")
|
||||||
|
while all([queue for queue in queues]):
|
||||||
|
hook = queues.get("hook")
|
||||||
|
send = queues.get("push")
|
||||||
|
receive = queues.get("pull")
|
||||||
|
while not hook.empty():
|
||||||
|
plugin = await hook.get()
|
||||||
|
print("PULL", plugin)
|
||||||
|
assert callable(plugin), f"Expected function: {plugin}"
|
||||||
|
plugins[plugin.__name__.lower()] = plugin
|
||||||
|
|
||||||
|
event = await receive.get()
|
||||||
|
print("PULL", event)
|
||||||
|
assert isinstance(event, tuple), f"Expected tuple: {event}"
|
||||||
|
assert len(event) == 4, f"Invalid format: {event}"
|
||||||
|
where, context, old_value, new_value = event
|
||||||
|
|
||||||
|
stop = False
|
||||||
|
for plugin_name, plugin in plugins.items():
|
||||||
|
if plugin_name == where:
|
||||||
|
result = plugin(context, old_value, new_value)
|
||||||
|
assert isinstance(result, tuple), f"Expected tuple: {result}"
|
||||||
|
assert len(result) == 2, f"Invalid format: {result}"
|
||||||
|
await send.put(result)
|
||||||
|
error, response = result
|
||||||
|
if error:
|
||||||
|
stop = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if stop:
|
||||||
|
break
|
||||||
|
|
||||||
|
await sleep(0)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def test(context, old_value, new_value):
|
||||||
|
status = old_value == new_value
|
||||||
|
return True, status
|
||||||
|
|
||||||
|
async def main(loop):
|
||||||
|
tasks = list()
|
||||||
|
queues = dict()
|
||||||
|
queues["hook"] = Queue()
|
||||||
|
queues["push"] = Queue()
|
||||||
|
queues["pull"] = Queue()
|
||||||
|
|
||||||
|
server = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
||||||
|
server.setblocking(False)
|
||||||
|
server.settimeout(1)
|
||||||
|
server.connect(("", 6697))
|
||||||
|
|
||||||
|
await queues["hook"].put(test)
|
||||||
|
await queues["push"].put(("test", "change", None, True))
|
||||||
|
|
||||||
|
tasks.append(loop.create_task(handle_client(loop, server)))
|
||||||
|
tasks.append(loop.create_task(pull(loop, queues)))
|
||||||
|
tasks.append(loop.create_task(push(loop, queues)))
|
||||||
|
await wait(tasks)
|
||||||
|
loop.stop()
|
||||||
|
|
||||||
|
loop = get_event_loop()
|
||||||
|
try:
|
||||||
|
#loop.create_task(main(loop))
|
||||||
|
#server = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
||||||
|
server = socket(AF_INET, SOCK_STREAM)
|
||||||
|
server.setblocking(False)
|
||||||
|
#server.settimeout(1)
|
||||||
|
server.bind(("localhost", 10201))
|
||||||
|
server.listen(5)
|
||||||
|
loop.create_task(run_server(loop, server))
|
||||||
|
loop.run_forever()
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
#server = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
||||||
|
#server.connect(("irc.tilde.chat", 6697))
|
||||||
|
#while True:
|
||||||
|
# print(server.recv(4096))
|
||||||
|
# res = input("> ")
|
||||||
|
# server.send(f"{res}\r\n".encode())
|
|
@ -0,0 +1,77 @@
|
||||||
|
from abots.helpers import generator, infinitedict
|
||||||
|
from gevent.lock import BoundedSemaphore
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
class AtomicDict(object):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
self._dict = dict()
|
||||||
|
self._lock = BoundedSemaphore()
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self._lock.acquire()
|
||||||
|
return self._dict
|
||||||
|
|
||||||
|
def __exit__(self, _type, value, traceback):
|
||||||
|
self._lock.release()
|
||||||
|
|
||||||
|
def get_state(meta):
|
||||||
|
state = infinitedict()
|
||||||
|
with meta as m:
|
||||||
|
state = m.get("state")
|
||||||
|
assert state is not None, "'state' is missing"
|
||||||
|
|
||||||
|
return state.copy()
|
||||||
|
|
||||||
|
def unsubscribe(meta, listener):
|
||||||
|
with meta as m:
|
||||||
|
listeners = m.get("listeners")
|
||||||
|
assert listener in listeners, f"'{listener}' missing from: {listeners}"
|
||||||
|
listeners.remove(listener)
|
||||||
|
|
||||||
|
def subscribe(meta, listener):
|
||||||
|
with meta as m:
|
||||||
|
listeners = m.get("listeners")
|
||||||
|
assert listeners is not None, "'listeners' is missing"
|
||||||
|
listeners.append(listener)
|
||||||
|
|
||||||
|
return lambda: unsubscribe(meta, listener)
|
||||||
|
|
||||||
|
@generator
|
||||||
|
def dispatch(meta):
|
||||||
|
action = (yield)
|
||||||
|
|
||||||
|
with meta as m:
|
||||||
|
is_dispatching = m.get("is_dispatching")
|
||||||
|
assert is_dispatching is not None, "'is_dispatching' is missing"
|
||||||
|
if is_dispatching:
|
||||||
|
raise Exception("Reducers cannot dispatch actions")
|
||||||
|
|
||||||
|
try:
|
||||||
|
with meta as m:
|
||||||
|
m["is_dispatching"] = True
|
||||||
|
reducer = m.get("reducer")
|
||||||
|
assert reducer is not None, "'reducer' is missing"
|
||||||
|
m["state"] = reducer(state, action)
|
||||||
|
finally:
|
||||||
|
with meta as m:
|
||||||
|
m["is_dispatching"] = False
|
||||||
|
|
||||||
|
with meta as m:
|
||||||
|
listeners = m.get("listeners")
|
||||||
|
assert listeners is not None, "'listeners' is missing"
|
||||||
|
for listener in listeners[:]:
|
||||||
|
listener()
|
||||||
|
|
||||||
|
def manager(reducer, initial_state=infinitedict()):
|
||||||
|
meta = AtomicDict()
|
||||||
|
with meta as m:
|
||||||
|
m["listeners"] = list()
|
||||||
|
m["is_dispatching"] = False
|
||||||
|
m["reducer"] = reducer
|
||||||
|
m["state"] = initial_state
|
||||||
|
|
||||||
|
state = lambda: get_state(meta)
|
||||||
|
return state, dispatch(meta)
|
||||||
|
|
||||||
|
Action = namedtuple("Action", ["type", "payload"])
|
||||||
|
#meta, actions = manager(reducer)
|
128
taijitu.py
128
taijitu.py
|
@ -1,91 +1,55 @@
|
||||||
from abots.helpers import generator
|
from state_management import Action, manager
|
||||||
from psutil import Process as PSProcess
|
from abots.helpers import generator, coroutine, infinitedict
|
||||||
from queue import Queue, Empty
|
from time import monotonic as time
|
||||||
from time import sleep, monotonic as time
|
from gevent import Greenlet, spawn, sleep, joinall
|
||||||
from os import getloadavg
|
from gevent.queue import Queue, Empty
|
||||||
from collections import deque
|
#from queue import Queue, Empty
|
||||||
from multiprocessing import cpu_count
|
|
||||||
|
|
||||||
actions = dict()
|
HOOKS = dict()
|
||||||
events = Queue()
|
|
||||||
last = time()
|
def reducer(state, action):
|
||||||
ticks = deque()
|
if action.name == "ADD":
|
||||||
ticks.append(0)
|
state["counter"] = state.get("counter", 0) + action.data
|
||||||
max_loadavg = float(cpu_count())
|
|
||||||
threshold = 15.0
|
return state
|
||||||
max_cpu = 25.0
|
|
||||||
sleeping = 0.001
|
|
||||||
sleepings = list()
|
|
||||||
stats = list()
|
|
||||||
magic = 0
|
|
||||||
step = 0.0001
|
|
||||||
max_ticks = 0
|
|
||||||
loops = 0
|
|
||||||
proc = PSProcess()
|
|
||||||
cpu = proc.cpu_percent()
|
|
||||||
|
|
||||||
def register(func):
|
def register(func):
|
||||||
actions[func.__name__] = generator(func)
|
#f = generator(func)
|
||||||
|
#f.start()
|
||||||
|
HOOKS[func.__name__.lower()] = func#f()
|
||||||
return func
|
return func
|
||||||
|
|
||||||
@register
|
#@coroutine
|
||||||
def push():
|
def pull(queue):
|
||||||
message = (yield)
|
while True:
|
||||||
time1 = round(sleeping, 6)
|
task = queue.get()
|
||||||
time2 = round(magic, 6)
|
assert isinstance(task, tuple), f"Expected tuple: {task}"
|
||||||
print("push", message, time1, time2, loops, max_ticks, cpu)
|
assert len(task) == 2, f"Invalid format: {task}"
|
||||||
event = "pull", ticks[-1]
|
hook_name, message = task
|
||||||
events.put_nowait(event)
|
hook = HOOKS.get(hook_name)
|
||||||
|
hook(queue, message)
|
||||||
|
|
||||||
|
@generator
|
||||||
|
def push(queue):
|
||||||
|
task = (yield)
|
||||||
|
assert isinstance(task, tuple), f"Expected tuple: {task}"
|
||||||
|
assert len(task) == 2, f"Invalid format: {task}"
|
||||||
|
queue.put(task)
|
||||||
|
|
||||||
@register
|
@register
|
||||||
def pull():
|
def ping(queue, message):
|
||||||
message = (yield)
|
print(f"PING: {message}")
|
||||||
time1 = round(sleeping, 6)
|
task = "pong", message
|
||||||
time2 = round(magic, 6)
|
queue.put(task)
|
||||||
print("pull", message, time1, time2, loops, max_ticks, cpu)
|
|
||||||
event = "push", ticks[-1]
|
|
||||||
events.put_nowait(event)
|
|
||||||
|
|
||||||
seed = "push", ticks[-1]
|
@register
|
||||||
events.put_nowait(seed)
|
def pong(queue, message):
|
||||||
done = time() + 60
|
print(f"PONG: {message}")
|
||||||
prev_cpu = 0
|
task = "ping", message
|
||||||
prev_loadavg = 0
|
queue.put(task)
|
||||||
while True: #time() < done:
|
|
||||||
if time() >= last + 1:
|
|
||||||
ticks.append(0)
|
|
||||||
last = time()
|
|
||||||
loadavg = getloadavg()[0]
|
|
||||||
diff_loadavg = loadavg - prev_loadavg
|
|
||||||
prev_loadavg = loadavg
|
|
||||||
if loadavg >= max_loadavg and diff_loadavg > 0:
|
|
||||||
sleeping = sleeping + step * 10
|
|
||||||
else:
|
|
||||||
cpu = proc.cpu_percent()
|
|
||||||
diff_cpu = cpu - prev_cpu
|
|
||||||
prev_cpu = cpu
|
|
||||||
if cpu >= max_cpu and diff_cpu > 0:
|
|
||||||
sleeping = sleeping + step * 10
|
|
||||||
elif cpu >= threshold and diff_cpu > 0:
|
|
||||||
sleeping = sleeping + step
|
|
||||||
elif sleeping - step >= 0 and diff_cpu < 0:
|
|
||||||
sleeping = sleeping - step
|
|
||||||
|
|
||||||
sleepings.append(sleeping)
|
state, dispatch = manager(reducer)
|
||||||
stat = sum(sleepings) / len(sleepings)
|
queue = Queue()
|
||||||
stats.append(stat)
|
yang = push(queue)
|
||||||
magic = sum(stats) / len(stats)
|
yin = spawn(pull, queue)
|
||||||
max_ticks = max(ticks)
|
start = lambda: joinall([yin])
|
||||||
loops = loops + 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
event = events.get_nowait()
|
|
||||||
source, message = event
|
|
||||||
action = actions.get(source)
|
|
||||||
if action:
|
|
||||||
action().send(message)
|
|
||||||
except Empty:
|
|
||||||
break
|
|
||||||
|
|
||||||
sleep(sleeping)
|
|
||||||
ticks[-1] = ticks[-1] + 1
|
|
||||||
|
|
Loading…
Reference in New Issue