300 lines
10 KiB
Python
300 lines
10 KiB
Python
"""
|
|
|
|
Socket Flow:
|
|
- Get new messages
|
|
- Send messages to message queue
|
|
- Repeat
|
|
|
|
App Init Flow:
|
|
- Initialize state
|
|
- Start server(s)
|
|
- Start their clients
|
|
|
|
App Flow:
|
|
- Get new server messages
|
|
- Get new client messages
|
|
- Repeat
|
|
|
|
Message Queue Flow:
|
|
- In FIFO order, get next queue entry
|
|
- Dispatch entry to reducers queue as action
|
|
- Repeat
|
|
|
|
Reducers Flow:
|
|
- In FIFO order, get next action
|
|
- Send it to relevant reducer(s)
|
|
- Change state of application if needed
|
|
- Dispatch new state to hooks queue
|
|
- Repeat
|
|
|
|
Hooks Flow:
|
|
- In FIFO order, get new state
|
|
- Send it to relevant hook(s)
|
|
- Run side effects triggered by new state
|
|
- Send message(s) to socket(s)
|
|
- Send message(s) to logger queue
|
|
- Dispatch action(s) to database queue
|
|
- Dispatch action(s) to reducer(s)
|
|
- Repeat
|
|
|
|
Database Flow:
|
|
- In FIFO order, get new action
|
|
- Run action
|
|
- Send status/result to sender
|
|
- Repeat
|
|
|
|
Logger Flow:
|
|
- In FIFO order, get new log message
|
|
- Determine log level
|
|
- Log message against logger
|
|
- Repeat
|
|
|
|
ASCII Diagram of App Flow:
|
|
|
|
/-> Reducer1 -\
|
|
/-> Clients ----> Messages -> Reducers -+--> Reducer2 --+-> State -> Listeners
|
|
| | ^ ^ ^ \-> Reducer3 -/ | |
|
|
\-- Servers <------/ | | | v v
|
|
| | | Hook1 Hook2
|
|
| Database | Logger \ /
|
|
| ^ | ^ v
|
|
|__|________|____|_________________________________/
|
|
|
|
Logic:
|
|
- All app logic lives in the flow loop
|
|
- All components communicate with the flow loop using the queue
|
|
- All side effects from state changes must be done through a hook
|
|
- The flow loop manages the generators and coroutines
|
|
- The generators and coroutines have their own loops and queues
|
|
- A socket server can have many socket clients
|
|
- A socket client has only one socket server
|
|
- A socket client can talk with a socket (that is not their server) using hooks
|
|
- A socket or message queue can send messages to sockets, and nothing else
|
|
- Hooks use the message queue to get messages to sockets
|
|
- Hooks can only be triggered by changes to the app state
|
|
- The app state is a dictionary of dictionaries
|
|
- The value to the root keys in the app state must be a dictionary
|
|
- A reducer cannot dispatch an action to another reducer
|
|
- A reducer can update the state, and nothing else
|
|
- The database can take actions from hooks, and nothing else
|
|
- The database can relay its results / status using hooks
|
|
- The logger can take actions from hooks, and nothing else
|
|
- While debugging / in unstable commits, none of these rules apply
|
|
|
|
"""
|
|
from abots.helpers import jsto, sha256, generator, coroutine
|
|
from abots.helpers import freeze_dict, unfreeze_dict
|
|
from core.parsers import parse_irc_message
|
|
from socket import socket, timeout as sock_timeout
|
|
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
|
from ssl import wrap_socket
|
|
from time import sleep
|
|
from re import compile as regex
|
|
from queue import Queue, Empty
|
|
from collections import deque
|
|
|
|
class Chi:
    """Queue-driven application loop around a frozen dict-of-dicts state.

    The instance keeps registries of reducers, hooks, generators and
    coroutines (each keyed by a string), one queue per registered generator,
    and a central queue that coroutine output is funnelled into.  The state
    is stored frozen (via ``freeze_dict``) and every root value must be a
    dict; the root key ``"_self"`` is reserved for internal flags.
    """

    def __init__(self, initial_state):
        """Validate *initial_state* and set up empty registries and queues.

        *initial_state* must be a dict whose keys are strings (other than
        the reserved ``"_self"``) and whose values are dicts.  The caller's
        dict is not modified.
        """
        # Make sure initial state is in the correct format
        assert isinstance(initial_state, dict), \
            f"Expected dict: {initial_state}"
        for key, value in initial_state.items():
            assert key != "_self", f"This key is reserved {key}"
            self._check_state(key, value)

        # Work on a shallow copy so the reserved key is not injected into the
        # caller's dict (which would also make a second Chi(...) call with
        # the same dict trip the reserved-key assertion above).
        state = dict(initial_state)

        # Define internal states
        state["_self"] = {"running": True}

        self._state = freeze_dict(state)
        self._reducers = dict()
        self._hooks = dict()
        self._queue = Queue()
        self._queues = dict()
        self._generators = dict()
        self._coroutines = dict()
        self._listeners = dict()

    def _check_state(self, key, value):
        """Assert that *key* is a str and *value* is a dict."""
        assert isinstance(key, str), f"Expected str: {key}"
        assert isinstance(value, dict), f"Expected dict: {value}"

    def _register_plugin(self, name, key):
        """Return a decorator storing a callable under *key* in ``self._<name>``.

        Raises AssertionError if the arguments are not strings, if no
        ``_<name>`` registry exists, or if *key* is already registered.
        """
        assert isinstance(name, str), f"Expected str: {name}"
        assert isinstance(key, str), f"Expected str: {key}"
        plugin = getattr(self, f"_{name}", None)
        assert plugin is not None, f"Expected plugin: {name}"
        assert key not in plugin, f"'{key}' already in {name}"

        def decorator_register(func):
            assert callable(func), f"Expected func: {func}"
            plugin[key] = func
            # Hand the function back so the decorated name stays usable
            # instead of being rebound to None.
            return func
        return decorator_register

    def get_state(self, key):
        """Return the unfrozen state entry stored under *key*, or None."""
        assert isinstance(key, str), f"Expected str: {key}"
        return unfreeze_dict(self._state).get(key)

    def get_state_keys(self):
        """Return a generator over the root keys of the state.

        NOTE(review): iterates the frozen state as (key, value) pairs, which
        assumes freeze_dict produces an iterable of pairs -- confirm.
        """
        return (key for key, value in self._state)

    def get_state_values(self):
        """Return a generator over the root values of the (frozen) state.

        NOTE(review): same iteration assumption as get_state_keys.
        """
        return (value for key, value in self._state)

    def set_state(self, key, value):
        """Validate and store *value* under root *key*, re-freezing the state."""
        self._check_state(key, value)
        state = unfreeze_dict(self._state)
        state[key] = value
        self._state = freeze_dict(state)

    # Set the state if it does not exist, return if state changed
    def add_state(self, key, value):
        """Store *value* under *key* only if *key* is new; return True if stored."""
        self._check_state(key, value)
        if key in self.get_state_keys():
            return False
        self.set_state(key, value)
        return True

    def register_reducer(self, key):
        """Return a decorator that registers a reducer under *key*."""
        return self._register_plugin("reducers", key)

    def register_hook(self, key):
        """Return a decorator that registers a hook under *key*."""
        return self._register_plugin("hooks", key)

    def register_generator(self, key):
        """Return a decorator registering a generator and its output queue.

        The decorated function is wrapped with ``generator(...)`` and a fresh
        Queue is created under the same *key* for its output.
        """
        assert isinstance(key, str), f"Expected str: {key}"
        assert key not in self._generators, f"'{key}' already in generators"

        def decorator_register(func):
            assert callable(func), f"Expected func: {func}"
            self._queues[key] = Queue()
            self._generators[key] = generator(func)
            return func
        return decorator_register

    def register_coroutines(self, key, gen_key):
        """Return a decorator registering a coroutine under *key*.

        *gen_key* is accepted but not used here; the link between a
        coroutine and a generator queue is made by ``subscribe``.
        """
        assert isinstance(key, str), f"Expected str: {key}"
        assert key not in self._coroutines, f"'{key}' already in coroutines"

        def decorator_register(func):
            assert callable(func), f"Expected func: {func}"
            self._coroutines[key] = coroutine(func)
            return func
        return decorator_register

    def subscribe(self, coro_key, gen_key):
        """Attach coroutine *coro_key* to the queue of generator *gen_key*.

        A coroutine may hold only one subscription at a time.  Returns a
        zero-argument ``cancel`` callable that removes the subscription.
        """
        assert isinstance(coro_key, str), f"Expected str: {coro_key}"
        assert isinstance(gen_key, str), f"Expected str: {gen_key}"
        assert coro_key in self._coroutines, f"'{coro_key}' not in coroutines"
        assert gen_key in self._queues, f"'{gen_key}' not in queues"
        already_subscribed = f"'{coro_key}' has already subscribed"
        assert coro_key not in self._listeners, already_subscribed
        self._listeners[coro_key] = self._queues.get(gen_key)

        def cancel():
            self._listeners.pop(coro_key, None)
        return cancel

    def _is_running(self):
        """Return the ``running`` flag kept under the reserved ``_self`` key."""
        internal = self.get_state("_self")
        if internal is None:
            return False
        # The nested representation returned by unfreeze_dict is not visible
        # in this file; dict() accepts both a mapping and an iterable of
        # (key, value) pairs, so either shape is handled.
        return bool(dict(internal).get("running", False))

    def _dispatch_pending(self, targets):
        """Drain the central queue, sending each message to its destination.

        Every entry must be a ``(target, target_id, message)`` triple where
        *target* names a registry in *targets* and *target_id* a key in it.
        """
        while True:
            try:
                packed = self._queue.get_nowait()
            except Empty:
                break
            assert len(packed) == 3, f"Invalid packed message: {packed}"
            target, target_id, message = packed
            assert message is not None, f"Invalid message: {message}"
            assert target in targets, f"Invalid target: {target}"
            destination = targets.get(target).get(target_id)
            invalid_target_id = f"Invalid target id: {target_id}"
            assert destination is not None, invalid_target_id
            # NOTE(review): assumes every destination exposes .send();
            # reducers registered through register_reducer are stored as
            # plain callables -- confirm before routing to "reducers".
            destination.send(message)

    def start(self):
        """Run the flow loop for as long as ``_self.running`` is truthy.

        Each pass: pump every generator into its own queue, feed each
        subscribed coroutine from the queue it listens to and collect its
        output on the central queue, then dispatch the central queue.
        """
        # The routing table never changes during the loop, so build it once.
        targets = {
            "generator": self._generators,
            "reducers": self._reducers,
        }
        # The flag lives under the reserved "_self" key and is re-read on
        # every pass so that a state change can stop the loop.
        while self._is_running():
            for gname, gen in self._generators.items():
                outbox = self._queues.get(gname)
                if outbox is None:
                    continue
                for response in gen():
                    outbox.put(response)
            for cname, coro in self._coroutines.items():
                inbox = self._listeners.get(cname)
                if inbox is None:
                    continue
                for response in coro(inbox):
                    self._queue.put(response)
            self._dispatch_pending(targets)
            # TODO: the per-reducer step was an empty placeholder loop in the
            # original; reducers are registered but not yet run here.
|
|
|
|
def make_socket_client(host, port):
    """Open a TLS client connection to ``(host, port)`` and return the socket.

    The connection uses the platform's default TLS settings, which verify
    the server certificate and host name.  If connecting fails the socket
    is closed and the original exception propagates.
    """
    # Imported locally: the module-level ``wrap_socket`` helper is deprecated
    # and neither verifies certificates nor sends the server name (SNI).
    from ssl import create_default_context

    context = create_default_context()
    sock = context.wrap_socket(
        socket(AF_INET, SOCK_STREAM),
        server_hostname=host,
    )
    try:
        sock.connect((host, port))
    except OSError:
        # Do not leak the file descriptor when the connection fails.
        sock.close()
        raise
    return sock
|
|
|
|
def make_socket_server(host, port):
    """Create a non-blocking listening socket bound to ``(host, port)``.

    SO_REUSEADDR is set before binding and the listen backlog is 5.  If any
    setup step fails with OSError (which includes BrokenPipeError) the
    socket is closed and the error is re-raised.
    """
    # NOTE(review): ssl.wrap_socket is deprecated and no certificate is
    # supplied here, so TLS handshakes on accepted connections are unlikely
    # to succeed -- confirm whether this server should use an SSLContext
    # with a certificate chain, or plain TCP.
    sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
    try:
        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        sock.bind((host, port))
        sock.setblocking(0)
        sock.settimeout(0)
        sock.listen(5)
    except OSError:
        # The original handler used `continue` outside of any loop, which is
        # a syntax error; release the socket and let the caller decide.
        sock.close()
        raise
    return sock
|
|
|
|
def run_server(sock):
    """Return a generator function that accepts one connection from *sock*.

    Each call of the returned function yields the result of a single
    ``sock.accept()``.  When *sock* is non-blocking and no connection is
    pending, nothing is yielded.
    """
    def gen_server():
        try:
            client = sock.accept()
        except BlockingIOError:
            # Non-blocking socket with nothing to accept right now.
            return
        yield client

    # The closure has to be returned, otherwise callers receive None.
    return gen_server
|
|
|
|
def run_client(sock):
    """Return a generator function that reads one chunk from *sock*.

    Each call of the returned function yields a single ``(data, peer)``
    pair: up to 1024 bytes received from *sock* and the address reported by
    ``sock.getpeername()``.
    """
    buffer_size = 1024

    def gen_client():
        # recv() is used rather than recvfrom(): connected TLS sockets (as
        # produced by make_socket_client) reject recvfrom(), and for a
        # connected stream socket the peer is fixed anyway.
        data = sock.recv(buffer_size)
        yield (data, sock.getpeername())

    # The closure has to be returned, otherwise callers receive None.
    return gen_client
|
|
|
|
def coro_get_lines(queue):
    """Drain *queue* of raw chunks and yield the complete text lines in them.

    Every queue entry must be a ``(raw_bytes, source)`` pair.  Chunks are
    concatenated and split on ``\\n``; a trailing partial line is carried
    over to the next chunk.  Each complete line is decoded as UTF-8, falling
    back to ISO-8859-1, and yielded with surrounding ``\\r`` removed.  When
    a chunk completes no line, ``None`` is yielded for it instead.

    The generator ends as soon as the queue is empty.

    NOTE(review): a partial line still buffered when the queue runs empty
    is dropped with the generator; confirm whether the carry-over should
    survive between calls.
    NOTE(review): yielded values are bare strings / None, not the
    (target, target_id, message) triples Chi.start() asserts on.
    """
    newline = b"\n"
    cache = b""
    while True:
        try:
            message = queue.get_nowait()
        except Empty:
            break
        assert len(message) == 2, f"Wrong message format: {message}"
        raw, _source = message

        # Everything up to the last newline is complete; the rest waits for
        # the next chunk.  rpartition also covers the "no newline" and
        # "ends with newline" cases without indexing into the bytes (indexing
        # yields an int, which never equals a bytes object, and fails on
        # empty input).
        complete, separator, cache = (cache + raw).rpartition(newline)
        if not separator:
            yield None
            continue

        for line in complete.split(newline):
            try:
                decoded = line.decode("utf-8")
            except UnicodeDecodeError:
                # ISO-8859-1 maps every byte, so this fallback cannot fail.
                decoded = line.decode("iso-8859-1")
            yield decoded.strip("\r")
|
|
|
|
def coro_client(message_queue, generator_queue):
    """Unfinished placeholder for the client coroutine.

    The original body was a bare ``for`` with no target or body, which is a
    syntax error that prevents the whole module from loading.  Until the
    real logic is written, calling this raises NotImplementedError.
    """
    # TODO: implement; the intended behaviour cannot be determined from the
    # truncated original statement.
    raise NotImplementedError("coro_client is not implemented yet")
|
|
|
|
# Script section: create the local server socket and one TLS client per IRC
# network, then build the application and enter its flow loop.
babili_server = make_socket_server("localhost", 10601)
irc_servers = [("irc.tilde.chat", 6697), ("irc.freenode.net", 6697)]
irc_clients = dict()
for server, port in irc_servers:
    # Key each client by its network label, e.g. "irc.tilde.chat" -> "tilde".
    # (The previous second split used maxsplit=0, which splits nothing and
    # left the key as "tilde.chat".)
    name = server.split(".", 1)[1].split(".", 1)[0]
    irc_clients[name] = make_socket_client(server, port)

initial_state = dict()
chi = Chi(initial_state)
chi.start()
|