Compare commits

...

8 Commits

4 changed files with 282 additions and 82 deletions

27
NOTES Normal file
View File

@ -0,0 +1,27 @@
20190825
========
# Redukti
Components:
* State: Dictionary holding values of application state
* Action: A name and data pair that encode a change to the state
* Reducer: A function that takes a state and action to produce a new state
* Root reducer: Passes the state and action to all reducers
# Derivide
Components:
* Event: Destination, type of change occurring, old value, and new value
* Stream: A series of events
* Pool: Collection of resources for off main thread tasks
* Plugin: Takes events from stream, performs side effect, and returns result
* Schema: Set of rules describing the structure of data, used for validating
* Blob: Collection of data validated against schema(s)
* Database: The validated form of the blob from schema(s)
# Taijitu
Components:
* Dispatcher: Waits for new events and sends them to new reducers or listener
* Listener: Waits for new events and sends them to plugins or dispatcher

132
event_loop.py Normal file
View File

@ -0,0 +1,132 @@
from abots.helpers import generator, coroutine, infinitedict
from asyncio import get_event_loop, sleep, wait, Queue
from socket import socket, timeout as sock_timeout
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from time import sleep as nap
async def handle_client(loop, client):
while True:
try:
request = await loop.sock_recv(client, 256)
except Exception as e:
continue
if request:
message = request.decode()
print(message)
await sleep(0)
client.close()
async def run_server(loop, server):
while True:
client, _ = await loop.sock_accept(server)
loop.create_task(handle_client(loop, client))
await sleep(0)
async def push(loop, queues):
while all([queue for queue in queues]):
send = queues.get("push")
receive = queues.get("pull")
event = await send.get()
print("PUSH", event)
assert isinstance(event, tuple), f"Expected tuple: {event}"
if len(event) == 2:
error, response = event
if error:
break
elif len(event) == 4:
where, context, old_value, new_value = event
await receive.put(event)
await sleep(0)
return
async def pull(loop, queues):
plugins = dict()
print("PULL")
while all([queue for queue in queues]):
hook = queues.get("hook")
send = queues.get("push")
receive = queues.get("pull")
while not hook.empty():
plugin = await hook.get()
print("PULL", plugin)
assert callable(plugin), f"Expected function: {plugin}"
plugins[plugin.__name__.lower()] = plugin
event = await receive.get()
print("PULL", event)
assert isinstance(event, tuple), f"Expected tuple: {event}"
assert len(event) == 4, f"Invalid format: {event}"
where, context, old_value, new_value = event
stop = False
for plugin_name, plugin in plugins.items():
if plugin_name == where:
result = plugin(context, old_value, new_value)
assert isinstance(result, tuple), f"Expected tuple: {result}"
assert len(result) == 2, f"Invalid format: {result}"
await send.put(result)
error, response = result
if error:
stop = True
break
if stop:
break
await sleep(0)
return
def test(context, old_value, new_value):
status = old_value == new_value
return True, status
async def main(loop):
tasks = list()
queues = dict()
queues["hook"] = Queue()
queues["push"] = Queue()
queues["pull"] = Queue()
server = wrap_socket(socket(AF_INET, SOCK_STREAM))
server.setblocking(False)
server.settimeout(1)
server.connect(("", 6697))
await queues["hook"].put(test)
await queues["push"].put(("test", "change", None, True))
tasks.append(loop.create_task(handle_client(loop, server)))
tasks.append(loop.create_task(pull(loop, queues)))
tasks.append(loop.create_task(push(loop, queues)))
await wait(tasks)
loop.stop()
loop = get_event_loop()
try:
#loop.create_task(main(loop))
#server = wrap_socket(socket(AF_INET, SOCK_STREAM))
server = socket(AF_INET, SOCK_STREAM)
server.setblocking(False)
#server.settimeout(1)
server.bind(("localhost", 10201))
server.listen(5)
loop.create_task(run_server(loop, server))
loop.run_forever()
finally:
loop.close()
#server = wrap_socket(socket(AF_INET, SOCK_STREAM))
#server.connect(("irc.tilde.chat", 6697))
#while True:
# print(server.recv(4096))
# res = input("> ")
# server.send(f"{res}\r\n".encode())

77
state_management.py Normal file
View File

@ -0,0 +1,77 @@
from abots.helpers import generator, infinitedict
from gevent.lock import BoundedSemaphore
from collections import namedtuple
class AtomicDict(object):
def __init__(self, *args, **kwargs):
self._dict = dict()
self._lock = BoundedSemaphore()
def __enter__(self):
self._lock.acquire()
return self._dict
def __exit__(self, _type, value, traceback):
self._lock.release()
def get_state(meta):
state = infinitedict()
with meta as m:
state = m.get("state")
assert state is not None, "'state' is missing"
return state.copy()
def unsubscribe(meta, listener):
with meta as m:
listeners = m.get("listeners")
assert listener in listeners, f"'{listener}' missing from: {listeners}"
listeners.remove(listener)
def subscribe(meta, listener):
with meta as m:
listeners = m.get("listeners")
assert listeners is not None, "'listeners' is missing"
listeners.append(listener)
return lambda: unsubscribe(meta, listener)
@generator
def dispatch(meta):
action = (yield)
with meta as m:
is_dispatching = m.get("is_dispatching")
assert is_dispatching is not None, "'is_dispatching' is missing"
if is_dispatching:
raise Exception("Reducers cannot dispatch actions")
try:
with meta as m:
m["is_dispatching"] = True
reducer = m.get("reducer")
assert reducer is not None, "'reducer' is missing"
m["state"] = reducer(state, action)
finally:
with meta as m:
m["is_dispatching"] = False
with meta as m:
listeners = m.get("listeners")
assert listeners is not None, "'listeners' is missing"
for listener in listeners[:]:
listener()
def manager(reducer, initial_state=infinitedict()):
meta = AtomicDict()
with meta as m:
m["listeners"] = list()
m["is_dispatching"] = False
m["reducer"] = reducer
m["state"] = initial_state
state = lambda: get_state(meta)
return state, dispatch(meta)
Action = namedtuple("Action", ["type", "payload"])
#meta, actions = manager(reducer)

View File

@ -1,91 +1,55 @@
from abots.helpers import generator
from psutil import Process as PSProcess
from queue import Queue, Empty
from time import sleep, monotonic as time
from os import getloadavg
from collections import deque
from multiprocessing import cpu_count
from state_management import Action, manager
from abots.helpers import generator, coroutine, infinitedict
from time import monotonic as time
from gevent import Greenlet, spawn, sleep, joinall
from gevent.queue import Queue, Empty
#from queue import Queue, Empty
actions = dict()
events = Queue()
last = time()
ticks = deque()
ticks.append(0)
max_loadavg = float(cpu_count())
threshold = 15.0
max_cpu = 25.0
sleeping = 0.001
sleepings = list()
stats = list()
magic = 0
step = 0.0001
max_ticks = 0
loops = 0
proc = PSProcess()
cpu = proc.cpu_percent()
HOOKS = dict()
def reducer(state, action):
if action.name == "ADD":
state["counter"] = state.get("counter", 0) + action.data
return state
def register(func):
actions[func.__name__] = generator(func)
#f = generator(func)
#f.start()
HOOKS[func.__name__.lower()] = func#f()
return func
@register
def push():
message = (yield)
time1 = round(sleeping, 6)
time2 = round(magic, 6)
print("push", message, time1, time2, loops, max_ticks, cpu)
event = "pull", ticks[-1]
events.put_nowait(event)
#@coroutine
def pull(queue):
while True:
task = queue.get()
assert isinstance(task, tuple), f"Expected tuple: {task}"
assert len(task) == 2, f"Invalid format: {task}"
hook_name, message = task
hook = HOOKS.get(hook_name)
hook(queue, message)
@generator
def push(queue):
task = (yield)
assert isinstance(task, tuple), f"Expected tuple: {task}"
assert len(task) == 2, f"Invalid format: {task}"
queue.put(task)
@register
def pull():
message = (yield)
time1 = round(sleeping, 6)
time2 = round(magic, 6)
print("pull", message, time1, time2, loops, max_ticks, cpu)
event = "push", ticks[-1]
events.put_nowait(event)
def ping(queue, message):
print(f"PING: {message}")
task = "pong", message
queue.put(task)
seed = "push", ticks[-1]
events.put_nowait(seed)
done = time() + 60
prev_cpu = 0
prev_loadavg = 0
while True: #time() < done:
if time() >= last + 1:
ticks.append(0)
last = time()
loadavg = getloadavg()[0]
diff_loadavg = loadavg - prev_loadavg
prev_loadavg = loadavg
if loadavg >= max_loadavg and diff_loadavg > 0:
sleeping = sleeping + step * 10
else:
cpu = proc.cpu_percent()
diff_cpu = cpu - prev_cpu
prev_cpu = cpu
if cpu >= max_cpu and diff_cpu > 0:
sleeping = sleeping + step * 10
elif cpu >= threshold and diff_cpu > 0:
sleeping = sleeping + step
elif sleeping - step >= 0 and diff_cpu < 0:
sleeping = sleeping - step
@register
def pong(queue, message):
print(f"PONG: {message}")
task = "ping", message
queue.put(task)
sleepings.append(sleeping)
stat = sum(sleepings) / len(sleepings)
stats.append(stat)
magic = sum(stats) / len(stats)
max_ticks = max(ticks)
loops = loops + 1
try:
event = events.get_nowait()
source, message = event
action = actions.get(source)
if action:
action().send(message)
except Empty:
break
sleep(sleeping)
ticks[-1] = ticks[-1] + 1
state, dispatch = manager(reducer)
queue = Queue()
yang = push(queue)
yin = spawn(pull, queue)
start = lambda: joinall([yin])