diff --git a/test_app_flow.py b/test_app_flow.py new file mode 100644 index 0000000..4fd2132 --- /dev/null +++ b/test_app_flow.py @@ -0,0 +1,299 @@ +""" + +Socket Flow: +- Get new messages +- Send messages to message queue +- Repeat + +App Init Flow: +- Initialize state +- Start server(s) +- Start their clients + +App Flow: +- Get new server messages +- Get new client messages +- Repeat + +Message Queue Flow: +- In FIFO order, get next queue entry +- Dispatch entry to reducers queue as action +- Repeat + +Reducers Flow: +- In FIFO order, get next action +- Send it to relevant reducer(s) +- Change state of application if needed +- Dispatch new state to hooks queue +- Repeat + +Hooks Flow: +- In FIFO order, get new state +- Send it to relevant hooks(s) +- Run side effects triggered by new state + - Send message(s) to socket(s) + - Send message(s) to logger queue + - Dispatch action(s) to database queue + - Dispatch action(s) to reducer(s) +- Repeat + +Database Flow: +- In FIFO order, get new action +- Run action +- Send status/result to sender +- Repeat + +Logger Flow: +- In FIFO order, get new log message +- Determine log level +- Log message against logger +- Repeat + +ASCII Diagram of App Flow: + + /-> Reducer1 -\ +/-> Clients ----> Messages -> Reducers -+--> Redcuer2 --+-> State -> Listeners +| | ^ ^ ^ \-> Reducer3 -/ | | +\-- Servers <------/ | | | v v + | | | Hook1 Hook2 + | Database | Logger \ / + | ^ | ^ v + |__|________|____|_________________________________/ + +Logic: +- All app logic lives in the flow loop +- All components communicate with the flow loop using the queue +- All side effects from state changes must be done through a hook +- The flow loop manages the generators and coroutines +- The generators and coroutines have their own loops and queues +- A socket server can have many socket clients +- A socket client has only one socket server +- A socket client can talk with a socket (that is not their server) using hooks +- A socket or message queue can send messages to sockets, and nothing else +- Hooks use the message queue to get messages to sockets +- Hooks can only be triggered by changes to the app state +- The app state is a dictionary of dictionaries +- The value to the root keys in the app state must be a dictionary +- A reducer cannot dispatch an action to another reducer +- A reducer can update the state, and nothing else +- The database can take actions from hooks, and nothing else +- The database can relay its results / status using hooks +- The logger can take actions from hooks, and nothing else +- While debugging / in unstable commits, none of these rules apply + +""" +from abots.helpers import jsto, sha256, generator, coroutine +from abots.helpers import freeze_dict, unfreeze_dict +from core.parsers import parse_irc_message +from socket import socket, timeout as sock_timeout +from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from ssl import wrap_socket +from time import sleep +from re import compile as regex +from queue import Queue, Empty +from collections import deque + +class Chi: + def __init__(self, initial_state): + # Make sure initial state is in the correct format + assert isinstance(initial_state, dict), + f"Expected dict: {initial_state}" + for key, value in initial_state.items(): + assert key != "_self", f"This key is reserved {key}" + self._check_state(key, value) + + # Define internal states + initial_state["_self"] = dict() + initial_state["_self"]["running"] = True + + self._state = freeze_dict(initial_state) + self._reducers = dict() + self._hooks = dict() + self._queue = Queue() + self._queues = dict() + self._generators = dict() + self._coroutines = dict() + self._listeners = dict() + + def _check_state(self, key, value): + assert isinstance(key, str), f"Expected str: {key}" + assert isinstance(value, dict), f"Expected dict: {value}" + + def _register_plugin(self, name, key): + assert isinstance(name, str), f"Expected str: {name}" + assert isinstance(key, str), f"Expected str: {key}" + plugin = getattr(self, f"_{name}", None) + assert plugin is not None, f"Expected plugin: {name}" + assert key not in plugin, f"'{key}' already in {name}" + def decorator_register(func): + assert callable(func), f"Expected func: {func}" + plugin[key] = func + return decorator_register + + def get_state(self, key): + assert isinstance(key, str), f"Expected str: {key}" + return unfreeze_dict(self._state).get(key) + + def get_state_keys(self): + return (key for key, value in self._state) + + def get_state_values(self): + return (value for key, value in self._state) + + def set_state(key, value): + self._check_state(key, value) + state = unfreeze_dict(self._state) + state[key] = value + self._state = freeze_dict(state) + + # Set the state if it does not exist, return if state changed + def add_state(self, key, value): + self._check_state(key, value) + keys = self.get_state_keys() + if key in keys: + return False + self.set_state(key, value) + return True + + def register_reducer(self, key): + return self._register_plugin("reducers", key) + + def register_hook(self, key): + return self._register_plugin("hooks", key) + + def register_generator(self, key): + assert isinstance(key, str), f"Expected str: {key}" + assert key not in self._generators, f"'{key}' already in generators" + def decorator_register(func): + assert callable(func), f"Expected func: {func}" + self._queues[key] = Queue() + self._generators[key] = generator(func) + return decorator_register + + def register_coroutines(self, key, gen_key): + assert isinstance(key, str), f"Expected str: {key}" + assert key not in self._coroutines, f"'{key}' already in coroutines" + def decorator_register(func): + assert callable(func), f"Expected func: {func}" + self._coroutines[key] = coroutine(func) + return decorator_register + + def subscribe(self, coro_key, gen_key): + assert isinstance(coro_key, str), f"Expected str: {coro_key}" + assert isinstance(gen_key, str), f"Expected str: {gen_key}" + assert coro_key in self._coroutines, f"'{coro_key}' not in coroutines" + assert gen_key in self._queues, f"'{gen_key}' not in queues" + already_subscribed = f"'{coro_key}' has already subscribed" + assert coro_key not in self._listeners, already_subscribed + self._listeners[coro_key] = self._queues.get(gen_key) + def cancel(): + self._listeners.pop(coro_key, None) + return cancel + + def start(): + running = self.get_state("running") + while running: + for gname, gen in self._generators.items(): + queue = self._queues.get(gname) + if not queue: + continue + for response in gen(): + queue.put(response) + for cname, coro in self._coroutines.items(): + queue = self._listeners.get(cname) + if not queue: + continue + for response in coro(queue): + self._queue.put(response) + while True: + try: + packed = self._queue.get_nowait() + assert len(packed) == 3, f"Invalid packed message: {packed}" + target, target_id, message = packed + assert message is not None, f"Invalid message: {message}" + targets = dict() + targets["generator"] = self._generators + targets["reducers"] = self._reducers + assert target in targets, f"Invalid target: {target}" + destination = targets.get(target).get(target_id) + invalid_target_id = f"Invalid target id: {target_id}" + assert destination is not None, invalid_target_id + destination.send(message) + except Empty: + break + for rname, reducer in self._reducers.items(): + pass + +def make_socket_client(host, port): + sock = wrap_socket(socket(AF_INET, SOCK_STREAM)) + sock.connect((host, port)) + return sock + +def make_socket_server(host, port): + sock = wrap_socket(socket(AF_INET, SOCK_STREAM)) + sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + try: + sock.bind((host, port)) + except (BrokenPipeError, OSError) as e: + continue + sock.setblocking(0) + sock.settimeout(0) + sock.listen(5) + return sock + +def run_server(sock): + def gen_server(): + client = sock.accept() + yield client + +def run_client(sock): + buffer_size = 1024 + def gen_client(): + message = sock.recvfrom(buffer_size) + assert len(message) == 2, f"Wrong message format: {message}" + yield message + +def coro_get_lines(queue): + cache = b"" + newline = b"\n" + package = lambda m: ("generator", ) + while True: + try: + #packet = cache + sock.recv(size) + message = queue.get_nowait() + except Empty: + break + assert len(message) == 2, f"Wrong message format: {message}" + raw, source = message + packet = cache + raw + if packet[-1] != newline: + buf, separator, remainder = packet.rpartition(newline) + cache = remainder + packet = buf + separator + else: + cache = b"" + if len(packet) == 0: + yield None + for line in packet.split(newline): + try: + decoded = line.decode("utf-8") + except UnicodeDecodeError: + try: + decoded = line.decode("iso-8859-1") + except UnicodeDecodeError: + yield None + yield decoded.strip("\r") + +def coro_client(message_queue, generator_queue): + for + +babili_server = make_socket_server("localhost", 10601) +irc_servers = [("irc.tilde.chat", 6697), ("irc.freenode.net", 6697)] +irc_clients = dict() +for server, port in irc_servers: + name = server.split(".", 1)[1].split(".", 0)[0] + irc_clients[name] = make_socket_client(server, port) + +initial_state = dict() +chi = Chi(initial_state) +chi.start()