launcher/test_app_flow.py

300 lines
10 KiB
Python

"""
Socket Flow:
- Get new messages
- Send messages to message queue
- Repeat
App Init Flow:
- Initialize state
- Start server(s)
- Start their clients
App Flow:
- Get new server messages
- Get new client messages
- Repeat
Message Queue Flow:
- In FIFO order, get next queue entry
- Dispatch entry to reducers queue as action
- Repeat
Reducers Flow:
- In FIFO order, get next action
- Send it to relevant reducer(s)
- Change state of application if needed
- Dispatch new state to hooks queue
- Repeat
Hooks Flow:
- In FIFO order, get new state
- Send it to relevant hook(s)
- Run side effects triggered by new state
- Send message(s) to socket(s)
- Send message(s) to logger queue
- Dispatch action(s) to database queue
- Dispatch action(s) to reducer(s)
- Repeat
Database Flow:
- In FIFO order, get new action
- Run action
- Send status/result to sender
- Repeat
Logger Flow:
- In FIFO order, get new log message
- Determine log level
- Log message against logger
- Repeat
ASCII Diagram of App Flow:
/-> Reducer1 -\
/-> Clients ----> Messages -> Reducers -+--> Reducer2 --+-> State -> Listeners
| | ^ ^ ^ \-> Reducer3 -/ | |
\-- Servers <------/ | | | v v
| | | Hook1 Hook2
| Database | Logger \ /
| ^ | ^ v
|__|________|____|_________________________________/
Logic:
- All app logic lives in the flow loop
- All components communicate with the flow loop using the queue
- All side effects from state changes must be done through a hook
- The flow loop manages the generators and coroutines
- The generators and coroutines have their own loops and queues
- A socket server can have many socket clients
- A socket client has only one socket server
- A socket client can talk with a socket (that is not their server) using hooks
- A socket or message queue can send messages to sockets, and nothing else
- Hooks use the message queue to get messages to sockets
- Hooks can only be triggered by changes to the app state
- The app state is a dictionary of dictionaries
- The value of every root key in the app state must be a dictionary
- A reducer cannot dispatch an action to another reducer
- A reducer can update the state, and nothing else
- The database can take actions from hooks, and nothing else
- The database can relay its results / status using hooks
- The logger can take actions from hooks, and nothing else
- While debugging / in unstable commits, none of these rules apply
"""
from abots.helpers import jsto, sha256, generator, coroutine
from abots.helpers import freeze_dict, unfreeze_dict
from core.parsers import parse_irc_message
from socket import socket, timeout as sock_timeout
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from time import sleep
from re import compile as regex
from queue import Queue, Empty
from collections import deque
class Chi:
    """Application state holder and flow loop.

    The state is a dict whose root values are dicts. It is stored through
    ``freeze_dict`` and read back through ``unfreeze_dict``. The root key
    ``"_self"`` is reserved for internal flags (currently only
    ``"running"``).

    Plugins are registered by string key with the ``register_*``
    decorators. ``start`` then repeatedly pulls from generators, feeds
    subscribed coroutines and routes the packed messages they produce.
    """

    def __init__(self, initial_state):
        """Validate ``initial_state`` and set up the empty registries.

        Raises:
            AssertionError: if ``initial_state`` is not a dict, uses the
                reserved ``"_self"`` key, or has a non-str key or a
                non-dict value.
        """
        # Make sure initial state is in the correct format
        assert isinstance(initial_state, dict), (
            f"Expected dict: {initial_state}"
        )
        for key, value in initial_state.items():
            assert key != "_self", f"This key is reserved {key}"
            self._check_state(key, value)
        # Define internal states on a shallow copy, so the caller's dict
        # does not gain the reserved key (reusing it would otherwise trip
        # the assertion above).
        state = dict(initial_state)
        state["_self"] = {"running": True}
        self._state = freeze_dict(state)
        self._reducers = dict()
        self._hooks = dict()
        # Packed messages produced by coroutines, drained by start()
        self._queue = Queue()
        # One queue per registered generator, keyed by generator key
        self._queues = dict()
        self._generators = dict()
        self._coroutines = dict()
        # Coroutine key -> the generator queue it is subscribed to
        self._listeners = dict()

    def _check_state(self, key, value):
        """Assert that ``key`` is a str and ``value`` is a dict."""
        assert isinstance(key, str), f"Expected str: {key}"
        assert isinstance(value, dict), f"Expected dict: {value}"

    def _register_plugin(self, name, key):
        """Return a decorator storing a callable in ``self._<name>[key]``.

        Raises:
            AssertionError: if ``name``/``key`` are not str, no such
                registry exists, or ``key`` is already registered.
        """
        assert isinstance(name, str), f"Expected str: {name}"
        assert isinstance(key, str), f"Expected str: {key}"
        plugin = getattr(self, f"_{name}", None)
        assert plugin is not None, f"Expected plugin: {name}"
        assert key not in plugin, f"'{key}' already in {name}"

        def decorator_register(func):
            assert callable(func), f"Expected func: {func}"
            plugin[key] = func
            # Hand the callable back so the decorated name is not None
            return func

        return decorator_register

    def _is_running(self):
        """Return the truthiness of the internal ``running`` flag."""
        # Assumes get_state returns the nested value as a plain dict;
        # TODO confirm against unfreeze_dict.
        internal = self.get_state("_self")
        return bool(internal and internal.get("running"))

    def get_state(self, key):
        """Return the unfrozen state's value for ``key`` (None if absent)."""
        assert isinstance(key, str), f"Expected str: {key}"
        return unfreeze_dict(self._state).get(key)

    def get_state_keys(self):
        """Return a generator over the root keys of the state."""
        # Assumes iterating the frozen state yields (key, value) pairs;
        # TODO confirm against freeze_dict.
        return (key for key, value in self._state)

    def get_state_values(self):
        """Return a generator over the root values of the state."""
        # Same assumption about the frozen representation as above
        return (value for key, value in self._state)

    def set_state(self, key, value):
        """Validate ``key``/``value``, then store ``value`` under ``key``."""
        self._check_state(key, value)
        state = unfreeze_dict(self._state)
        state[key] = value
        self._state = freeze_dict(state)

    # Set the state if it does not exist, return if state changed
    def add_state(self, key, value):
        """Store ``value`` under ``key`` only if ``key`` is not present.

        Returns:
            True if the state was changed, False if ``key`` existed.
        """
        self._check_state(key, value)
        keys = self.get_state_keys()
        if key in keys:
            return False
        self.set_state(key, value)
        return True

    def register_reducer(self, key):
        """Return a decorator registering a reducer under ``key``."""
        return self._register_plugin("reducers", key)

    def register_hook(self, key):
        """Return a decorator registering a hook under ``key``."""
        return self._register_plugin("hooks", key)

    def register_generator(self, key):
        """Return a decorator registering a generator under ``key``.

        Registration also creates the queue that the generator's output
        is put on, stored under the same key.
        """
        assert isinstance(key, str), f"Expected str: {key}"
        already = f"'{key}' already in generators"
        assert key not in self._generators, already

        def decorator_register(func):
            assert callable(func), f"Expected func: {func}"
            self._queues[key] = Queue()
            self._generators[key] = generator(func)
            return func

        return decorator_register

    def register_coroutines(self, key, gen_key):
        """Return a decorator registering a coroutine under ``key``.

        ``gen_key`` is accepted but not used here; a coroutine is attached
        to a generator queue with ``subscribe``.
        """
        assert isinstance(key, str), f"Expected str: {key}"
        already = f"'{key}' already in coroutines"
        assert key not in self._coroutines, already

        def decorator_register(func):
            assert callable(func), f"Expected func: {func}"
            self._coroutines[key] = coroutine(func)
            return func

        return decorator_register

    def subscribe(self, coro_key, gen_key):
        """Attach coroutine ``coro_key`` to the queue of ``gen_key``.

        Returns:
            A zero-argument function that removes the subscription.

        Raises:
            AssertionError: if a key is not a str, is not registered, or
                the coroutine is already subscribed.
        """
        assert isinstance(coro_key, str), f"Expected str: {coro_key}"
        assert isinstance(gen_key, str), f"Expected str: {gen_key}"
        missing_coro = f"'{coro_key}' not in coroutines"
        assert coro_key in self._coroutines, missing_coro
        assert gen_key in self._queues, f"'{gen_key}' not in queues"
        already_subscribed = f"'{coro_key}' has already subscribed"
        assert coro_key not in self._listeners, already_subscribed
        self._listeners[coro_key] = self._queues.get(gen_key)

        def cancel():
            self._listeners.pop(coro_key, None)

        return cancel

    def start(self):
        """Run the flow loop while the internal ``running`` flag is set.

        Each pass puts every generator's output on that generator's queue,
        feeds every subscribed coroutine its queue, then drains the main
        queue. Main-queue entries are ``(target, target_id, message)``
        triples; ``message`` is sent to the registered target.

        Raises:
            AssertionError: on a malformed entry or an unknown target.
        """
        # Registries a packed message may address; built once because the
        # dict objects themselves never change.
        targets = {
            "generator": self._generators,
            "reducers": self._reducers,
        }
        # The flag is re-read on every pass so a state change can stop
        # the loop.
        while self._is_running():
            for gname, gen in self._generators.items():
                queue = self._queues.get(gname)
                if queue is None:
                    continue
                for response in gen():
                    queue.put(response)
            for cname, coro in self._coroutines.items():
                queue = self._listeners.get(cname)
                if queue is None:
                    # Not subscribed to any generator queue
                    continue
                for response in coro(queue):
                    self._queue.put(response)
            while True:
                try:
                    packed = self._queue.get_nowait()
                except Empty:
                    break
                invalid_packed = f"Invalid packed message: {packed}"
                assert len(packed) == 3, invalid_packed
                target, target_id, message = packed
                assert message is not None, f"Invalid message: {message}"
                assert target in targets, f"Invalid target: {target}"
                destination = targets[target].get(target_id)
                invalid_target_id = f"Invalid target id: {target_id}"
                assert destination is not None, invalid_target_id
                destination.send(message)
            # NOTE(review): the original ended with an empty loop over the
            # reducers; running them is not implemented yet.
def make_socket_client(host, port):
    """Open a TLS connection to ``host:port`` and return the socket.

    The connection uses ``ssl.create_default_context()``, so the server
    certificate and hostname are verified.

    Raises:
        OSError: if the connection or the TLS handshake fails (this
            includes ``ssl.SSLError``).
    """
    # Imported locally: the module-level ``ssl.wrap_socket`` helper was
    # deprecated in Python 3.7 and removed in 3.12.
    from ssl import create_default_context

    context = create_default_context()
    sock = context.wrap_socket(
        socket(AF_INET, SOCK_STREAM),
        server_hostname=host,
    )
    try:
        sock.connect((host, port))
    except Exception:
        # Do not leak the descriptor when the connection cannot be made
        sock.close()
        raise
    return sock
def make_socket_server(host, port, certfile=None, keyfile=None):
    """Create a non-blocking TLS listening socket bound to ``host:port``.

    Args:
        host: Address to bind to.
        port: Port to bind to.
        certfile: Optional path to the server certificate chain. Without
            it the socket still listens, but TLS handshakes with clients
            cannot succeed.
        keyfile: Optional path to the private key, if it is not already
            contained in ``certfile``.

    Returns:
        The listening socket, with a timeout of 0 (non-blocking).

    Raises:
        OSError: if binding or listening fails; the socket is closed
            before the error is re-raised.
    """
    # Imported locally: the module-level ``ssl.wrap_socket`` helper was
    # deprecated in Python 3.7 and removed in 3.12.
    from ssl import PROTOCOL_TLS_SERVER, SSLContext

    context = SSLContext(PROTOCOL_TLS_SERVER)
    if certfile is not None:
        context.load_cert_chain(certfile, keyfile)
    sock = context.wrap_socket(
        socket(AF_INET, SOCK_STREAM),
        server_side=True,
    )
    try:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind((host, port))
        # settimeout(0) already puts the socket in non-blocking mode, so
        # the separate setblocking(0) call was redundant.
        sock.settimeout(0)
        sock.listen(5)
    except OSError:
        # The original used ``continue`` here, which is a syntax error
        # outside a loop; release the socket and let the caller decide.
        sock.close()
        raise
    return sock
def run_server(sock):
    """Build a generator function that accepts a connection on ``sock``.

    Args:
        sock: A listening socket.

    Returns:
        A zero-argument generator function. Each call yields the result
        of one ``sock.accept()``, or nothing if no connection is pending
        on a non-blocking socket.
    """
    def gen_server():
        try:
            client = sock.accept()
        except BlockingIOError:
            # Non-blocking listener with nothing waiting: not an error
            return
        yield client

    # Returned so the caller can use it; the original dropped it
    return gen_server
def run_client(sock):
    """Build a generator function that reads one chunk from ``sock``.

    Args:
        sock: A connected stream socket.

    Returns:
        A zero-argument generator function. Each call yields one
        ``(data, peer_address)`` pair, where ``data`` is at most 1024
        bytes.
    """
    buffer_size = 1024

    def gen_client():
        # recv + getpeername instead of recvfrom: SSLSocket raises
        # ValueError on recvfrom.
        raw = sock.recv(buffer_size)
        yield raw, sock.getpeername()

    # Returned so the caller can use it; the original dropped it
    return gen_client
def coro_get_lines(queue):
    """Drain ``queue`` and yield the complete text lines it contains.

    Each queue entry must be a ``(raw_bytes, source)`` pair; ``source``
    is ignored. Bytes after the last newline are kept and prepended to
    the next entry. Lines are decoded as UTF-8, falling back to
    ISO-8859-1, and stripped of carriage returns.

    Yields:
        One str per complete line, or ``None`` for an entry that did not
        complete any line.

    Raises:
        AssertionError: if a queue entry is not a 2-item sequence.

    Note:
        The generator ends when the queue is empty; bytes of an
        unfinished line held at that point are dropped with it.
    """
    newline = b"\n"
    cache = b""
    while True:
        try:
            message = queue.get_nowait()
        except Empty:
            break
        assert len(message) == 2, f"Wrong message format: {message}"
        raw, _source = message
        # Everything up to the last newline is complete; the rest waits
        # for the next entry. With no newline, rpartition returns
        # (b"", b"", packet), so the whole packet is cached.
        complete, separator, cache = (cache + raw).rpartition(newline)
        if not separator:
            yield None
            continue
        # ``complete`` excludes the final newline, so split() produces no
        # spurious empty trailing line.
        for line in complete.split(newline):
            try:
                decoded = line.decode("utf-8")
            except UnicodeDecodeError:
                # ISO-8859-1 maps every byte, so this cannot fail
                decoded = line.decode("iso-8859-1")
            yield decoded.strip("\r")
# NOTE(review): unfinished stub. The loop header below has no target,
# iterable or body, so the module cannot be parsed until this function
# is completed or removed.
def coro_client(message_queue, generator_queue):
    for
# Script section: open the local server socket and one client socket per
# IRC network, then start the flow loop with an empty initial state.
babili_server = make_socket_server("localhost", 10601)
irc_servers = [("irc.tilde.chat", 6697), ("irc.freenode.net", 6697)]
irc_clients = dict()
for server, port in irc_servers:
    # Key each client by the second label of its host name, e.g. "tilde"
    # for "irc.tilde.chat". The original's split(".", 0) performed no
    # split, so the key was "tilde.chat".
    name = server.split(".")[1]
    irc_clients[name] = make_socket_client(server, port)
initial_state = dict()
chi = Chi(initial_state)
chi.start()