Unstable, working on flow of application
This commit is contained in:
parent
b0ac577328
commit
d307a1ac96
|
@ -0,0 +1,299 @@
|
|||
"""
|
||||
|
||||
Socket Flow:
|
||||
- Get new messages
|
||||
- Send messages to message queue
|
||||
- Repeat
|
||||
|
||||
App Init Flow:
|
||||
- Initialize state
|
||||
- Start server(s)
|
||||
- Start their clients
|
||||
|
||||
App Flow:
|
||||
- Get new server messages
|
||||
- Get new client messages
|
||||
- Repeat
|
||||
|
||||
Message Queue Flow:
|
||||
- In FIFO order, get next queue entry
|
||||
- Dispatch entry to reducers queue as action
|
||||
- Repeat
|
||||
|
||||
Reducers Flow:
|
||||
- In FIFO order, get next action
|
||||
- Send it to relevant reducer(s)
|
||||
- Change state of application if needed
|
||||
- Dispatch new state to hooks queue
|
||||
- Repeat
|
||||
|
||||
Hooks Flow:
|
||||
- In FIFO order, get new state
|
||||
- Send it to relevant hooks(s)
|
||||
- Run side effects triggered by new state
|
||||
- Send message(s) to socket(s)
|
||||
- Send message(s) to logger queue
|
||||
- Dispatch action(s) to database queue
|
||||
- Dispatch action(s) to reducer(s)
|
||||
- Repeat
|
||||
|
||||
Database Flow:
|
||||
- In FIFO order, get new action
|
||||
- Run action
|
||||
- Send status/result to sender
|
||||
- Repeat
|
||||
|
||||
Logger Flow:
|
||||
- In FIFO order, get new log message
|
||||
- Determine log level
|
||||
- Log message against logger
|
||||
- Repeat
|
||||
|
||||
ASCII Diagram of App Flow:
|
||||
|
||||
/-> Reducer1 -\
|
||||
/-> Clients ----> Messages -> Reducers -+--> Redcuer2 --+-> State -> Listeners
|
||||
| | ^ ^ ^ \-> Reducer3 -/ | |
|
||||
\-- Servers <------/ | | | v v
|
||||
| | | Hook1 Hook2
|
||||
| Database | Logger \ /
|
||||
| ^ | ^ v
|
||||
|__|________|____|_________________________________/
|
||||
|
||||
Logic:
|
||||
- All app logic lives in the flow loop
|
||||
- All components communicate with the flow loop using the queue
|
||||
- All side effects from state changes must be done through a hook
|
||||
- The flow loop manages the generators and coroutines
|
||||
- The generators and coroutines have their own loops and queues
|
||||
- A socket server can have many socket clients
|
||||
- A socket client has only one socket server
|
||||
- A socket client can talk with a socket (that is not their server) using hooks
|
||||
- A socket or message queue can send messages to sockets, and nothing else
|
||||
- Hooks use the message queue to get messages to sockets
|
||||
- Hooks can only be triggered by changes to the app state
|
||||
- The app state is a dictionary of dictionaries
|
||||
- The value to the root keys in the app state must be a dictionary
|
||||
- A reducer cannot dispatch an action to another reducer
|
||||
- A reducer can update the state, and nothing else
|
||||
- The database can take actions from hooks, and nothing else
|
||||
- The database can relay its results / status using hooks
|
||||
- The logger can take actions from hooks, and nothing else
|
||||
- While debugging / in unstable commits, none of these rules apply
|
||||
|
||||
"""
|
||||
from abots.helpers import jsto, sha256, generator, coroutine
|
||||
from abots.helpers import freeze_dict, unfreeze_dict
|
||||
from core.parsers import parse_irc_message
|
||||
from socket import socket, timeout as sock_timeout
|
||||
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from ssl import wrap_socket
|
||||
from time import sleep
|
||||
from re import compile as regex
|
||||
from queue import Queue, Empty
|
||||
from collections import deque
|
||||
|
||||
class Chi:
|
||||
def __init__(self, initial_state):
|
||||
# Make sure initial state is in the correct format
|
||||
assert isinstance(initial_state, dict),
|
||||
f"Expected dict: {initial_state}"
|
||||
for key, value in initial_state.items():
|
||||
assert key != "_self", f"This key is reserved {key}"
|
||||
self._check_state(key, value)
|
||||
|
||||
# Define internal states
|
||||
initial_state["_self"] = dict()
|
||||
initial_state["_self"]["running"] = True
|
||||
|
||||
self._state = freeze_dict(initial_state)
|
||||
self._reducers = dict()
|
||||
self._hooks = dict()
|
||||
self._queue = Queue()
|
||||
self._queues = dict()
|
||||
self._generators = dict()
|
||||
self._coroutines = dict()
|
||||
self._listeners = dict()
|
||||
|
||||
def _check_state(self, key, value):
|
||||
assert isinstance(key, str), f"Expected str: {key}"
|
||||
assert isinstance(value, dict), f"Expected dict: {value}"
|
||||
|
||||
def _register_plugin(self, name, key):
|
||||
assert isinstance(name, str), f"Expected str: {name}"
|
||||
assert isinstance(key, str), f"Expected str: {key}"
|
||||
plugin = getattr(self, f"_{name}", None)
|
||||
assert plugin is not None, f"Expected plugin: {name}"
|
||||
assert key not in plugin, f"'{key}' already in {name}"
|
||||
def decorator_register(func):
|
||||
assert callable(func), f"Expected func: {func}"
|
||||
plugin[key] = func
|
||||
return decorator_register
|
||||
|
||||
def get_state(self, key):
|
||||
assert isinstance(key, str), f"Expected str: {key}"
|
||||
return unfreeze_dict(self._state).get(key)
|
||||
|
||||
def get_state_keys(self):
|
||||
return (key for key, value in self._state)
|
||||
|
||||
def get_state_values(self):
|
||||
return (value for key, value in self._state)
|
||||
|
||||
def set_state(key, value):
|
||||
self._check_state(key, value)
|
||||
state = unfreeze_dict(self._state)
|
||||
state[key] = value
|
||||
self._state = freeze_dict(state)
|
||||
|
||||
# Set the state if it does not exist, return if state changed
|
||||
def add_state(self, key, value):
|
||||
self._check_state(key, value)
|
||||
keys = self.get_state_keys()
|
||||
if key in keys:
|
||||
return False
|
||||
self.set_state(key, value)
|
||||
return True
|
||||
|
||||
def register_reducer(self, key):
|
||||
return self._register_plugin("reducers", key)
|
||||
|
||||
def register_hook(self, key):
|
||||
return self._register_plugin("hooks", key)
|
||||
|
||||
def register_generator(self, key):
|
||||
assert isinstance(key, str), f"Expected str: {key}"
|
||||
assert key not in self._generators, f"'{key}' already in generators"
|
||||
def decorator_register(func):
|
||||
assert callable(func), f"Expected func: {func}"
|
||||
self._queues[key] = Queue()
|
||||
self._generators[key] = generator(func)
|
||||
return decorator_register
|
||||
|
||||
def register_coroutines(self, key, gen_key):
|
||||
assert isinstance(key, str), f"Expected str: {key}"
|
||||
assert key not in self._coroutines, f"'{key}' already in coroutines"
|
||||
def decorator_register(func):
|
||||
assert callable(func), f"Expected func: {func}"
|
||||
self._coroutines[key] = coroutine(func)
|
||||
return decorator_register
|
||||
|
||||
def subscribe(self, coro_key, gen_key):
|
||||
assert isinstance(coro_key, str), f"Expected str: {coro_key}"
|
||||
assert isinstance(gen_key, str), f"Expected str: {gen_key}"
|
||||
assert coro_key in self._coroutines, f"'{coro_key}' not in coroutines"
|
||||
assert gen_key in self._queues, f"'{gen_key}' not in queues"
|
||||
already_subscribed = f"'{coro_key}' has already subscribed"
|
||||
assert coro_key not in self._listeners, already_subscribed
|
||||
self._listeners[coro_key] = self._queues.get(gen_key)
|
||||
def cancel():
|
||||
self._listeners.pop(coro_key, None)
|
||||
return cancel
|
||||
|
||||
def start():
|
||||
running = self.get_state("running")
|
||||
while running:
|
||||
for gname, gen in self._generators.items():
|
||||
queue = self._queues.get(gname)
|
||||
if not queue:
|
||||
continue
|
||||
for response in gen():
|
||||
queue.put(response)
|
||||
for cname, coro in self._coroutines.items():
|
||||
queue = self._listeners.get(cname)
|
||||
if not queue:
|
||||
continue
|
||||
for response in coro(queue):
|
||||
self._queue.put(response)
|
||||
while True:
|
||||
try:
|
||||
packed = self._queue.get_nowait()
|
||||
assert len(packed) == 3, f"Invalid packed message: {packed}"
|
||||
target, target_id, message = packed
|
||||
assert message is not None, f"Invalid message: {message}"
|
||||
targets = dict()
|
||||
targets["generator"] = self._generators
|
||||
targets["reducers"] = self._reducers
|
||||
assert target in targets, f"Invalid target: {target}"
|
||||
destination = targets.get(target).get(target_id)
|
||||
invalid_target_id = f"Invalid target id: {target_id}"
|
||||
assert destination is not None, invalid_target_id
|
||||
destination.send(message)
|
||||
except Empty:
|
||||
break
|
||||
for rname, reducer in self._reducers.items():
|
||||
pass
|
||||
|
||||
def make_socket_client(host, port):
|
||||
sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
||||
sock.connect((host, port))
|
||||
return sock
|
||||
|
||||
def make_socket_server(host, port):
|
||||
sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
||||
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
|
||||
try:
|
||||
sock.bind((host, port))
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
continue
|
||||
sock.setblocking(0)
|
||||
sock.settimeout(0)
|
||||
sock.listen(5)
|
||||
return sock
|
||||
|
||||
def run_server(sock):
|
||||
def gen_server():
|
||||
client = sock.accept()
|
||||
yield client
|
||||
|
||||
def run_client(sock):
|
||||
buffer_size = 1024
|
||||
def gen_client():
|
||||
message = sock.recvfrom(buffer_size)
|
||||
assert len(message) == 2, f"Wrong message format: {message}"
|
||||
yield message
|
||||
|
||||
def coro_get_lines(queue):
|
||||
cache = b""
|
||||
newline = b"\n"
|
||||
package = lambda m: ("generator", )
|
||||
while True:
|
||||
try:
|
||||
#packet = cache + sock.recv(size)
|
||||
message = queue.get_nowait()
|
||||
except Empty:
|
||||
break
|
||||
assert len(message) == 2, f"Wrong message format: {message}"
|
||||
raw, source = message
|
||||
packet = cache + raw
|
||||
if packet[-1] != newline:
|
||||
buf, separator, remainder = packet.rpartition(newline)
|
||||
cache = remainder
|
||||
packet = buf + separator
|
||||
else:
|
||||
cache = b""
|
||||
if len(packet) == 0:
|
||||
yield None
|
||||
for line in packet.split(newline):
|
||||
try:
|
||||
decoded = line.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
try:
|
||||
decoded = line.decode("iso-8859-1")
|
||||
except UnicodeDecodeError:
|
||||
yield None
|
||||
yield decoded.strip("\r")
|
||||
|
||||
def coro_client(message_queue, generator_queue):
|
||||
for
|
||||
|
||||
babili_server = make_socket_server("localhost", 10601)
|
||||
irc_servers = [("irc.tilde.chat", 6697), ("irc.freenode.net", 6697)]
|
||||
irc_clients = dict()
|
||||
for server, port in irc_servers:
|
||||
name = server.split(".", 1)[1].split(".", 0)[0]
|
||||
irc_clients[name] = make_socket_client(server, port)
|
||||
|
||||
initial_state = dict()
|
||||
chi = Chi(initial_state)
|
||||
chi.start()
|
Loading…
Reference in New Issue