296 lines
9.4 KiB
Python
296 lines
9.4 KiB
Python
from abots.helpers import jsto, sha256, generator
|
|
from core.parsers import parse_irc_message
|
|
from socket import socket, timeout as sock_timeout
|
|
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
|
from ssl import wrap_socket
|
|
from time import sleep
|
|
from re import compile as regex
|
|
|
|
"""
|
|
|
|
Socket Flow:
|
|
- Get new messages
|
|
- Send messages to message queue
|
|
- Repeat
|
|
|
|
App Init Flow:
|
|
- Initialize state
|
|
- Start server(s)
|
|
- Start their clients
|
|
|
|
App Flow:
|
|
- Get new server messages
|
|
- Get new client messages
|
|
- Repeat
|
|
|
|
Message Queue Flow:
|
|
- In FIFO order, get next queue entry
|
|
- Dispatch entry to reducers queue as action
|
|
- Repeat
|
|
|
|
Reducers Flow:
|
|
- In FIFO order, get next action
|
|
- Send it to relevant reducer(s)
|
|
- Change state of application if needed
|
|
- Dispatch new state to hooks queue
|
|
- Repeat
|
|
|
|
Hooks Flow:
|
|
- In FIFO order, get new state
|
|
- Send it to relevant hooks(s)
|
|
- Run side effects triggered by new state
|
|
- Send message(s) to socket(s)
|
|
- Send message(s) to logger queue
|
|
- Dispatch action(s) to database queue
|
|
- Dispatch action(s) to reducer(s)
|
|
- Repeat
|
|
|
|
Database Flow:
|
|
- In FIFO order, get new action
|
|
- Run action
|
|
- Send status/result to sender
|
|
- Repeat
|
|
|
|
Logger Flow:
|
|
- In FIFO order, get new log message
|
|
- Determine log level
|
|
- Log message against logger
|
|
- Repeat
|
|
|
|
-> Servers -\ /-> Reducer1 -\
|
|
/ |-> Messages -> Reducers -+--> Redcuer2 --+-> State -> Listeners
|
|
|-> Clients -/ ^ ^ \-> Reducer3 -/ | |
|
|
| | | v v
|
|
| | | Hook1 Hook2
|
|
| | | \ /
|
|
| | | v
|
|
|____________________|___________|______________________________________/
|
|
|
|
"""
|
|
|
|
with open("settings.json", "r") as config:
|
|
settings = jsto(config.read())
|
|
|
|
tilde = settings["irc"]["clients"]["tilde"]
|
|
host = tilde["host"]
|
|
port = tilde["port"]
|
|
name = tilde["name"]
|
|
pswd = tilde["password"]
|
|
auto = tilde["auto-join"]
|
|
|
|
assert isinstance(auto, list), "Expected list"
|
|
|
|
pack = lambda msg: f"{msg}\r\n".encode()
|
|
priv = lambda src, msg: pack(f"PRIVMSG {src} :{msg}")
|
|
join = lambda chan: pack(f"JOIN {chan}")
|
|
user = lambda name: pack(f"USER {name} - - -")
|
|
nick = lambda name: pack(f"NICK {name}")
|
|
iden = lambda pswd: pack(f"PRIVMSG NickServ :IDENTIFY {pswd}")
|
|
mode = lambda target, flag: pack(f"MODE {target} {flag}")
|
|
|
|
def clean(parameters, dirt=":"):
|
|
params = parameters.partition(dirt)
|
|
return params[0] + params[2]
|
|
|
|
hooks = dict()
|
|
listeners = dict()
|
|
state = dict()
|
|
sockets = dict()
|
|
sockets["server"] = dict()
|
|
sockets["clients"] = dict()
|
|
|
|
channel_modes = dict()
|
|
channel_modes["@"] = "secret"
|
|
channel_modes["*"] = "private"
|
|
channel_modes["="] = "public"
|
|
|
|
def register(namespace):
|
|
assert isinstance(namespace, str), "Expected str"
|
|
if namespace not in list(hooks):
|
|
hooks[namespace] = dict()
|
|
ns_hooks = hooks[namespace]
|
|
def wrapper_register(func):
|
|
assert callable(func), "Expected function"
|
|
hook_name = func.__name__
|
|
if hook_name in list(ns_hooks):
|
|
return
|
|
ns_hooks[hook_name] = func
|
|
#return generator(func)
|
|
return wrapper_register
|
|
|
|
def unsubscribe(key):
|
|
def cancel():
|
|
listeners.pop(key, None)
|
|
return cancel
|
|
|
|
def subscribe(namespace, name):
|
|
assert isinstance(namespace, str), "Expected str"
|
|
assert isinstance(name, str), "Expected str"
|
|
hook = hooks.get(namespace, dict()).get(name)
|
|
key = sha256()
|
|
if hook is None:
|
|
return
|
|
listeners[key] = hook
|
|
cancel = unsubscribe(key)
|
|
listeners[key] = generator(hook)(cancel)
|
|
|
|
def subscribe_many(namespace, names):
|
|
assert isinstance(names, list), "Expected list"
|
|
for name in names:
|
|
subscribe(namespace, name)
|
|
|
|
def get(sock, size=1024):
|
|
cache = b""
|
|
newline = b"\n"
|
|
while True:
|
|
packet = cache + sock.recv(size)
|
|
if packet[-1] != newline:
|
|
buf, separator, remainder = packet.rpartition(newline)
|
|
cache = remainder
|
|
packet = buf + separator
|
|
else:
|
|
cache = b""
|
|
if len(packet) == 0:
|
|
yield None
|
|
for line in packet.split(newline):
|
|
try:
|
|
decoded = line.decode("utf-8")
|
|
except UnicodeDecodeError:
|
|
try:
|
|
decoded = line.decode("iso-8859-1")
|
|
except UnicodeDecodeError:
|
|
yield None
|
|
yield decoded.strip("\r")
|
|
|
|
@register("tilde")
|
|
def loading(cancel):
|
|
sock, meta = (yield)
|
|
if meta.get("source") != "server": return
|
|
if meta.get("command") != "NOTICE": return
|
|
if "hostname" not in meta.get("params"): return
|
|
cancel()
|
|
subscribe("tilde", "identify")
|
|
sock.send(user(name))
|
|
sock.send(nick(name))
|
|
|
|
@register("tilde")
|
|
def ping(cancel):
|
|
sock, meta = (yield)
|
|
if meta.get("command") != "PING": return
|
|
params = meta.get("params", "")
|
|
sock.send(pack("PONG {params}"))
|
|
|
|
@register("tilde")
|
|
def identify(cancel):
|
|
sock, meta = (yield)
|
|
to_register = "please choose a different nick"
|
|
params = meta.get("params", "")
|
|
if meta.get("source") != "nick": return
|
|
if meta.get("nick") != "NickServ": return
|
|
if to_register in params:
|
|
sock.send(iden(pswd))
|
|
return
|
|
if meta.get("command") != "MODE": return
|
|
if " " not in params: return
|
|
name, flag = clean(params).split(" ", 1)
|
|
if flag == "+r":
|
|
cancel()
|
|
subscribe_many("tilde", ["handle_332", "handle_333", "handle_353",
|
|
"handle_366"])
|
|
sock.send(mode(name, "+B"))
|
|
if "channels" not in state:
|
|
state["channels"] = dict()
|
|
channels = state["channels"]
|
|
for chan in auto:
|
|
channels[chan] = dict()
|
|
sock.send(join(chan))
|
|
|
|
@register("tilde")
|
|
def handle_332(cancel):
|
|
sock, meta = (yield)
|
|
if meta.get("source") != "server": return
|
|
if meta.get("command") != "332": return
|
|
params = clean(meta.get("params", ""))
|
|
name, channel, topic = params.split(" ", 2)
|
|
state["channels"][channel]["topic"] = topic
|
|
|
|
@register("tilde")
|
|
def handle_333(cancel):
|
|
sock, meta = (yield)
|
|
if meta.get("source") != "server": return
|
|
if meta.get("command") != "333": return
|
|
params = clean(meta.get("params", ""))
|
|
name, channel, topic_nick, topic_time = params.split(" ", 3)
|
|
state["channels"][channel]["topic_nick"] = topic_nick
|
|
state["channels"][channel]["topic_time"] = topic_time
|
|
|
|
@register("tilde")
|
|
def handle_353(cancel):
|
|
sock, meta = (yield)
|
|
if meta.get("source") != "server": return
|
|
if meta.get("command") != "353": return
|
|
params = clean(meta.get("params", "")).strip()
|
|
pattern = regex("([@*=])")
|
|
separator = pattern.search(params).group()
|
|
channel_mode = channel_modes.get(separator, "public")
|
|
name, other = params.split(f" {separator} ", 1)
|
|
channel, members = other.split(" ", 1)
|
|
state["channels"][channel]["mode"] = channel_mode
|
|
state["channels"][channel]["members"] = members.split(" ")
|
|
|
|
@register("tilde")
|
|
def handle_366(cancel):
|
|
sock, meta = (yield)
|
|
if meta.get("source") != "server": return
|
|
if meta.get("command") != "366": return
|
|
params = clean(meta.get("params", ""))
|
|
name, channel, remainder = params.split(" ", 2)
|
|
chan = state["channels"][channel]
|
|
print(chan)
|
|
|
|
socket_servers = sockets["servers"]
|
|
for server_type, servers in settings.get("servers", dict()).items():
|
|
if server_type not in socket_servers:
|
|
socket_servers[server_type] = dict()
|
|
socket_type_servers = socket_servers[server_type]
|
|
for server_name, server_info in servers.items():
|
|
server_full_name = "/".join([server_type, server_name])
|
|
server_sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
|
server_sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
|
|
server_host = server_info.get("host", "")
|
|
server_port = server_info.get("port", 0)
|
|
try:
|
|
server_sock.bind((server_host, server_port))
|
|
except (BrokenPipeError, OSError) as e:
|
|
continue
|
|
server_sock.setblocking(0)
|
|
server_sock.settimeout(0)
|
|
socket_type_server[server_name] = dict()server_sock
|
|
socket_type_server[server_name]["sock"] = server_sock
|
|
socket_type_server[server_name]["clients"] = dict()
|
|
server_clients = socket_type_server[server_name]["clients"]
|
|
for client_type, clients in server_info.get("clients", dict()).items():
|
|
if client_type not in server_clients:
|
|
server_clients[client_type] = dict()
|
|
socket_type_clients = server_clients[client_type]
|
|
for client_name, client_info in clients.items():
|
|
client_full_name = "/".join([client_type, client_name])
|
|
client_sock = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
|
client_sock.connect((host, port))
|
|
socket_type_clients[client-name] = dict()
|
|
socket_type_clients[client-name]["sock"] = client_sock
|
|
#subscribe_many(client_type, ["loading", "ping"])
|
|
|
|
while True:
|
|
try:
|
|
for line in get(sock):
|
|
if line is None or len(line) == 0:
|
|
continue
|
|
meta = parse_irc_message(line)
|
|
#print(meta)
|
|
print(line)
|
|
for key in list(listeners):
|
|
listeners[key].send((sock, meta))
|
|
except KeyboardInterrupt:
|
|
break
|