launcher/test_sock_gen.py

296 lines
9.4 KiB
Python

from abots.helpers import jsto, sha256, generator
from core.parsers import parse_irc_message
from socket import socket, timeout as sock_timeout
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from time import sleep
from re import compile as regex
"""
Socket Flow:
- Get new messages
- Send messages to message queue
- Repeat
App Init Flow:
- Initialize state
- Start server(s)
- Start their clients
App Flow:
- Get new server messages
- Get new client messages
- Repeat
Message Queue Flow:
- In FIFO order, get next queue entry
- Dispatch entry to reducers queue as action
- Repeat
Reducers Flow:
- In FIFO order, get next action
- Send it to relevant reducer(s)
- Change state of application if needed
- Dispatch new state to hooks queue
- Repeat
Hooks Flow:
- In FIFO order, get new state
- Send it to relevant hook(s)
- Run side effects triggered by new state
- Send message(s) to socket(s)
- Send message(s) to logger queue
- Dispatch action(s) to database queue
- Dispatch action(s) to reducer(s)
- Repeat
Database Flow:
- In FIFO order, get new action
- Run action
- Send status/result to sender
- Repeat
Logger Flow:
- In FIFO order, get new log message
- Determine log level
- Log message against logger
- Repeat
 -> Servers -\                           /-> Reducer1 -\
/             |-> Messages -> Reducers -+--> Reducer2 --+-> State -> Listeners
|-> Clients -/       ^           ^       \-> Reducer3 -/              |     |
|                    |           |                                    v     v
|                    |           |                                  Hook1 Hook2
|                    |           |                                      \ /
|                    |           |                                       v
|____________________|___________|______________________________________/
"""
# Load the bot configuration once, at import time. The file is decoded as
# UTF-8 explicitly so the result does not depend on the platform's default
# text encoding (which is not UTF-8 everywhere).
with open("settings.json", "r", encoding="utf-8") as config:
    settings = jsto(config.read())

# Connection details of the "tilde" IRC client entry.
tilde = settings["irc"]["clients"]["tilde"]
host = tilde["host"]
port = tilde["port"]
name = tilde["name"]
pswd = tilde["password"]
# Channels to join; iterated as a list further down, hence the type check.
auto = tilde["auto-join"]
assert isinstance(auto, list), "Expected list"
def pack(msg):
    """Return *msg* followed by CRLF, encoded to bytes."""
    return "{}\r\n".format(msg).encode()


def priv(src, msg):
    """Build a PRIVMSG line addressed to *src* with text *msg*."""
    return pack(f"PRIVMSG {src} :{msg}")


def join(chan):
    """Build a JOIN line for channel *chan*."""
    return pack(f"JOIN {chan}")


def user(name):
    """Build the USER registration line for *name*."""
    return pack(f"USER {name} - - -")


def nick(name):
    """Build a NICK line selecting nickname *name*."""
    return pack(f"NICK {name}")


def iden(pswd):
    """Build the NickServ IDENTIFY message carrying password *pswd*."""
    return pack(f"PRIVMSG NickServ :IDENTIFY {pswd}")


def mode(target, flag):
    """Build a MODE line applying *flag* to *target*."""
    return pack(f"MODE {target} {flag}")
def clean(parameters, dirt=":"):
    """Return *parameters* with the first occurrence of *dirt* removed.

    Only the first occurrence is dropped; later ones are kept. When *dirt*
    does not occur, the input comes back unchanged.
    """
    head, _, tail = parameters.partition(dirt)
    return head + tail
# Registered hook functions, stored as hooks[namespace][hook_name].
hooks = {}
# Active hook generators, keyed by subscription id.
listeners = {}
# Mutable application state shared between hooks (e.g. state["channels"]).
state = {}
# Socket registry. The key is spelled "servers" because that is the key the
# server start-up code reads; the earlier "server" spelling made that lookup
# fail with a KeyError.
sockets = {"servers": {}, "clients": {}}
# Maps the channel-type symbol found in a 353 reply to a readable mode name.
channel_modes = {"@": "secret", "*": "private", "=": "public"}
def register(namespace):
    """Build a decorator that records a hook function under *namespace*.

    The decorated function is stored in ``hooks[namespace]`` under its
    ``__name__``. The first function registered for a given name wins; a
    later registration with the same name is ignored.

    Args:
        namespace: Name of the hook group (a ``str``).

    Returns:
        A decorator that registers the function and returns it unchanged.
    """
    assert isinstance(namespace, str), "Expected str"
    ns_hooks = hooks.setdefault(namespace, dict())

    def wrapper_register(func):
        assert callable(func), "Expected function"
        # setdefault leaves an already-registered hook of the same name alone.
        ns_hooks.setdefault(func.__name__, func)
        # Hand the function back so the decorated module-level name stays
        # bound to it instead of being rebound to None.
        return func

    return wrapper_register
def unsubscribe(key):
    """Return a zero-argument callback that drops listener *key*.

    The callback is a no-op when *key* is not (or no longer) present in
    ``listeners``, so it is safe to call more than once.
    """
    def cancel():
        if key in listeners:
            del listeners[key]

    return cancel
def subscribe(namespace, name):
    """Activate the hook registered as *name* inside *namespace*.

    Does nothing when no such hook has been registered. Otherwise the hook is
    wrapped with ``generator`` and called with its own cancel callback, and
    the result is stored in ``listeners`` under a key obtained from
    ``sha256()``.

    NOTE(review): ``sha256`` and ``generator`` come from ``abots.helpers``;
    presumably the former returns a fresh unique id and the latter primes the
    hook generator -- confirm against that module.
    """
    assert isinstance(namespace, str), "Expected str"
    assert isinstance(name, str), "Expected str"
    namespace_hooks = hooks.get(namespace, dict())
    hook = namespace_hooks.get(name)
    key = sha256()
    if hook is None:
        return
    # Placeholder entry: the key exists in `listeners` while the hook starts.
    listeners[key] = hook
    listeners[key] = generator(hook)(unsubscribe(key))
def subscribe_many(namespace, names):
    """Subscribe to each hook listed in *names* within *namespace*, in order.

    *names* must be a ``list``; each entry is passed to ``subscribe``.
    """
    assert isinstance(names, list), "Expected list"
    for hook_name in names:
        subscribe(namespace, hook_name)
def get(sock, size=1024):
    """Yield decoded text lines received from *sock*.

    Data is read in chunks of at most *size* bytes and split on line feeds.
    An incomplete trailing line stays buffered until the rest of it arrives,
    so only complete lines are ever yielded. Each line is decoded as UTF-8,
    falling back to ISO-8859-1 (which accepts any byte sequence), and
    carriage returns are stripped from both ends.

    Args:
        sock: Object with a ``recv(size)`` method returning ``bytes``.
        size: Maximum number of bytes requested per ``recv`` call.

    Yields:
        str: One complete line without its terminator (may be empty).

    Raises:
        ConnectionError: When ``recv`` returns no data, i.e. the peer closed
            the connection. Any buffered partial line is discarded. The
            previous implementation failed with an IndexError or looped
            forever in this situation.
    """
    newline = b"\n"
    pending = b""
    while True:
        chunk = sock.recv(size)
        if not chunk:
            raise ConnectionError("socket closed by peer")
        # The last piece is whatever follows the final newline: an incomplete
        # line (or b"" when the data ended exactly on a newline).
        *lines, pending = (pending + chunk).split(newline)
        for line in lines:
            try:
                decoded = line.decode("utf-8")
            except UnicodeDecodeError:
                decoded = line.decode("iso-8859-1")
            yield decoded.strip("\r")
@register("tilde")
def loading(cancel):
    """Hook: start registration once the server's hostname NOTICE arrives.

    Receives one ``(sock, meta)`` pair through the generator's ``send``. When
    the message is a server NOTICE whose params contain "hostname", the hook
    cancels itself, subscribes the "identify" hook and sends USER and NICK.

    Args:
        cancel: Callback that removes this hook from the active listeners.
    """
    sock, meta = (yield)
    if meta.get("source") != "server":
        return
    if meta.get("command") != "NOTICE":
        return
    # Default to "" like the sibling hooks do, so a message without params is
    # skipped instead of raising TypeError on the `in` test.
    if "hostname" not in meta.get("params", ""):
        return
    cancel()
    subscribe("tilde", "identify")
    sock.send(user(name))
    sock.send(nick(name))
@register("tilde")
def ping(cancel):
    """Hook: answer a PING with a PONG that echoes the same parameters.

    Receives one ``(sock, meta)`` pair through the generator's ``send`` and
    ignores every message whose command is not PING.

    Args:
        cancel: Callback that removes this hook from the active listeners
            (unused here).
    """
    sock, meta = (yield)
    if meta.get("command") != "PING":
        return
    params = meta.get("params", "")
    # The f-prefix matters: without it the literal text "{params}" was sent
    # instead of the token received from the server.
    sock.send(pack(f"PONG {params}"))
@register("tilde")
def identify(cancel):
    """Hook: identify with NickServ, then join the auto-join channels.

    Receives one ``(sock, meta)`` pair through the generator's ``send``. Only
    messages from nick "NickServ" are considered:

    * If the params contain the "please choose a different nick" prompt, the
      IDENTIFY message is sent.
    * If the message is a MODE whose flag is "+r", the hook cancels itself,
      subscribes the numeric-reply hooks, sets mode "+B" on the nick and
      joins every channel in ``auto``, creating an empty state entry each.

    NOTE(review): the source had lost its indentation; everything after the
    "+r" test is assumed to belong to that branch.
    """
    sock, meta = (yield)
    to_register = "please choose a different nick"
    params = meta.get("params", "")
    from_nickserv = (
        meta.get("source") == "nick" and meta.get("nick") == "NickServ"
    )
    if not from_nickserv:
        return
    if to_register in params:
        sock.send(iden(pswd))
        return
    if meta.get("command") != "MODE" or " " not in params:
        return
    target_nick, flag = clean(params).split(" ", 1)
    if flag != "+r":
        return
    cancel()
    subscribe_many("tilde", ["handle_332", "handle_333", "handle_353",
        "handle_366"])
    sock.send(mode(target_nick, "+B"))
    channels = state.setdefault("channels", dict())
    for chan in auto:
        channels[chan] = dict()
        sock.send(join(chan))
@register("tilde")
def handle_332(cancel):
    """Hook: store the topic carried by a server 332 reply.

    The params (with the first ":" removed) are split into three
    space-separated fields; the second is the channel and the third the
    topic, saved as ``state["channels"][channel]["topic"]``.
    """
    sock, meta = (yield)
    if meta.get("source") != "server" or meta.get("command") != "332":
        return
    fields = clean(meta.get("params", "")).split(" ", 2)
    _nick, channel, topic = fields
    state["channels"][channel]["topic"] = topic
@register("tilde")
def handle_333(cancel):
    """Hook: store who set the topic, and when, from a server 333 reply.

    The params (with the first ":" removed) are split into four
    space-separated fields; the last three are the channel, the setter's nick
    and the time, saved under ``state["channels"][channel]`` as "topic_nick"
    and "topic_time" (both kept as strings).
    """
    sock, meta = (yield)
    if meta.get("source") != "server" or meta.get("command") != "333":
        return
    fields = clean(meta.get("params", "")).split(" ", 3)
    _nick, channel, topic_nick, topic_time = fields
    entry = state["channels"][channel]
    entry["topic_nick"] = topic_nick
    entry["topic_time"] = topic_time
@register("tilde")
def handle_353(cancel):
    """Hook: store channel mode and member list from a server 353 reply.

    The first "@", "*" or "=" found in the cleaned params is taken as the
    channel-type symbol. The text after " <symbol> " is split into the
    channel name and a space-separated member list. The symbol is mapped
    through ``channel_modes`` (default "public") and both values are saved
    under ``state["channels"][channel]`` as "mode" and "members".
    """
    sock, meta = (yield)
    if meta.get("source") != "server" or meta.get("command") != "353":
        return
    params = clean(meta.get("params", "")).strip()
    symbol = regex("([@*=])").search(params).group()
    _nick, rest = params.split(f" {symbol} ", 1)
    channel, members = rest.split(" ", 1)
    entry = state["channels"][channel]
    entry["mode"] = channel_modes.get(symbol, "public")
    entry["members"] = members.split(" ")
@register("tilde")
def handle_366(cancel):
    """Hook: print the collected channel state on a server 366 reply.

    The second space-separated field of the cleaned params is the channel
    whose entry in ``state["channels"]`` is printed.
    """
    sock, meta = (yield)
    if meta.get("source") != "server" or meta.get("command") != "366":
        return
    _nick, channel, _rest = clean(meta.get("params", "")).split(" ", 2)
    print(state["channels"][channel])
# Start every configured server socket and connect the clients listed under
# it. Results are stored as
#   sockets["servers"][server_type][server_name] = {"sock": ..., "clients": ...}
# setdefault is used so this works whether or not the "servers" key was
# pre-created in `sockets`.
socket_servers = sockets.setdefault("servers", dict())
for server_type, servers in settings.get("servers", dict()).items():
    socket_type_servers = socket_servers.setdefault(server_type, dict())
    for server_name, server_info in servers.items():
        server_full_name = "/".join([server_type, server_name])
        server_sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
        server_sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server_host = server_info.get("host", "")
        server_port = server_info.get("port", 0)
        try:
            server_sock.bind((server_host, server_port))
        except OSError:
            # BrokenPipeError is a subclass of OSError. Skip servers that
            # cannot be bound, but release the socket instead of leaking it.
            server_sock.close()
            continue
        # Non-blocking mode (same effect as settimeout(0)).
        server_sock.setblocking(False)
        server_entry = {"sock": server_sock, "clients": dict()}
        socket_type_servers[server_name] = server_entry
        server_clients = server_entry["clients"]
        client_groups = server_info.get("clients", dict())
        for client_type, clients in client_groups.items():
            socket_type_clients = server_clients.setdefault(
                client_type, dict())
            for client_name, client_info in clients.items():
                client_full_name = "/".join([client_type, client_name])
                client_sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
                # NOTE(review): this connects to the module-level host/port
                # of the "tilde" entry and ignores client_info -- confirm
                # whether the per-client settings were meant here.
                client_sock.connect((host, port))
                socket_type_clients[client_name] = {"sock": client_sock}
            # TODO: subscribe this client type's hooks, e.g.
            # subscribe_many(client_type, ["loading", "ping"])
# Main loop: read lines from the socket, parse each one and hand the result to
# every active listener. Ctrl-C (KeyboardInterrupt) leaves the loop.
# NOTE(review): `sock` is not assigned anywhere in the visible part of this
# file -- confirm which socket is supposed to be read here.
while True:
    try:
        for line in get(sock):
            if not line:
                # Skip None and empty lines.
                continue
            meta = parse_irc_message(line)
            print(line)
            # Iterate over a snapshot: hooks add and remove listeners while
            # they run.
            for key in tuple(listeners):
                listeners[key].send((sock, meta))
    except KeyboardInterrupt:
        break