irc-bot/prelude.py

194 lines
5.2 KiB
Python
Executable File

#!env/bin/python3
from requests import get as req_get, post as req_post, delete as req_delete
from socket import socket, timeout as sock_timeout
from socket import AF_INET, SOCK_STREAM
from ssl import wrap_socket
from traceback import format_exc
from json import dumps as json_dumps, loads as json_loads
from pathlib import Path
from importlib import import_module
def load_plugins(location):
scope = dict()
plugins = dict()
modules = Path(__file__).parent
for loc in location.split("/"):
modules = modules / loc
for module in modules.glob("*.py"):
if not module.is_file() or module.name == "__init__.py":
continue
package = module.name.replace(".py", "")
plugins[package] = list()
# Equivalent of doing "import <package>.<module>"
container = location.replace("/", ".")
script = import_module(f"{container}.{package}")
for variable in dir(script):
# Ignore Python internals and ACTPlugin itself
if variable.startswith("__"):
continue
# We only need the actual plugin here
script_var = getattr(script, variable)
if not callable(script_var):
continue
plugins[package].append(script_var)
return plugins
def isnumeric(test):
test.replace(".", "", 1).isdigit()
def cleanup(parameters, dirt=":"):
params = parameters.partition(dirt)
return f"{params[0]}{params[2]}"
# from section 2.3.1 of rfc1459
def parse_irc_message(message):
meta = dict()
meta["raw"] = message
if message is None or len(message) == 0:
return meta
has_tags = message[0] == "@"
meta["has_tags"] = has_tags
if has_tags:
tags, message = message.lstrip("@").split(" ", 1)
meta["tags"] = dict()
for tag in tags.split(";"):
if "=" in tag:
key, value = tag.split("=", 1)
meta["tags"][key] = value
else:
meta["tags"][tag] = None
has_prefix = message[0] == ":"
message = message.lstrip(":")
meta["has_prefix"] = has_prefix
if not has_prefix:
command, params = message.lstrip(":").split(" ", 1)
meta["type"] = "alpha"
else:
prefix, remainder = message.split(" ", 1)
if " " in remainder:
command, params = remainder.split(" ", 1)
else:
command = remainder
params = ""
source = "nick" if "!" in prefix else "server"
meta["prefix"] = prefix
meta["source"] = source
meta["type"] = "numeric" if isnumeric(command) else "alpha"
if source == "nick":
nick, user_info = prefix.split("!", 1)
user, host = user_info.split("@", 1)
meta["nick"] = nick
meta["user"] = user
meta["host"] = host
meta["command"] = command
meta["params"] = cleanup(params)
return meta
def is_admin(message, admin):
tags = message.get("tags", dict())
account = tags.get("account")
nick = message.get("nick")
if not all([account, nick]):
return False
if account != admin:
return False
elif nick != admin:
return False
return True
def privmsg(message):
return message.get("params", " ").split(" ", 1)
def irc(host, port, plugins, secure=True):
while True:
sock = socket(AF_INET, SOCK_STREAM)
if secure:
sock = wrap_socket(sock)
try:
sock.connect((host, port))
break
except ConnectionRefusedError as e:
continue
cache = b""
state = dict()
settings_path = Path(__file__).resolve().parent / "settings.json"
settings_text = settings_path.read_text()
settings = json_loads(settings_text)
state["_flags"] = dict()
state["settings"] = settings
while not state.get("stop"):
data = cache
try:
packet = sock.recv(512)
except Exception as e:
print(format_exc())
break
data = data + packet
if len(data) == 0:
continue
newline = b"\n"
if data[-1] != newline:
data_buffer, separator, cache = data.rpartition(newline)
data = data_buffer + separator
else:
cache = b""
for line in data.split(newline):
try:
message = line.decode("utf8")
except UnicodeDecodeError as e:
try:
message = line.decode("iso-8859-1")
except UnicodeDecodeError as e:
continue
if len(message) == 0:
continue
print(message)
parsed = parse_irc_message(message.strip("\r"))
print(parsed)
for plugin_name, plugin_callbacks in plugins.items():
for callback in plugin_callbacks:
state = callback(state, sock, parsed)
flags = state.get("_flags")
for key, value in flags.items():
state[key] = value
print("CLOSED")
if __name__ == "__main__":
plugins = load_plugins("plugins")
irc("irc.tilde.chat", 6697, plugins, True)