#!env/bin/python3
|
||
|
|
||
|
from socket import socket, AF_INET, SOCK_STREAM, timeout as sock_timeout
|
||
|
from ssl import wrap_socket
|
||
|
from traceback import format_exc
|
||
|
from json import dumps as json_dumps, loads as json_loads
|
||
|
from pathlib import Path
|
||
|
from importlib import import_module
|
||
|
from sys import stderr
|
||
|
|
||
|
def isnumeric(test):
    """Return True when *test* consists of digits with at most one ".".

    A single decimal point is tolerated (it is removed before the digit
    check), so both "001" and "1.5" count as numeric while "1.2.3",
    "PING" and the empty string do not.
    """
    # The result must be returned explicitly; without the return statement
    # the function always evaluates to None and every caller sees "falsy".
    return test.replace(".", "", 1).isdigit()
|
||
|
|
||
|
def eprint(*args, **kwargs):
    """Print to standard error.

    Accepts the same positional and keyword arguments as the built-in
    print(), except ``file``, which is always set to stderr here.
    """
    return print(*args, **kwargs, file=stderr)
|
||
|
|
||
|
class Bot:
    """Small plugin-driven IRC client.

    The bot connects to ``host:port`` on construction, loads every
    ``bot_*`` callable found in the plugin directory and, in run(), feeds
    each received IRC line to those callables.

    Plugin callbacks are invoked as ``callback(bot, state, parsed)`` and
    must return the (possibly updated) state dictionary.
    """

    # Seconds to wait between two failed connection attempts.
    RECONNECT_DELAY = 1

    def __init__(self, host, port, plugins, secure=True, timeout=0):
        """Store the connection settings, load the plugins and connect.

        host/port -- address of the IRC server.
        plugins   -- "/"-separated path of the plugin package, relative to
                     the directory containing this file.
        secure    -- wrap the connection in TLS when true.
        timeout   -- socket timeout in seconds; values <= 0 leave the
                     socket in blocking mode.
        """
        self.host = host
        self.port = port
        self.secure = secure
        self.timeout = timeout
        self.plugins = self._load_plugins(plugins)
        self.sock = None

        self._connect()

    def _connect(self):
        """Open the (optionally TLS) connection, retrying until it succeeds."""
        # Imported locally: the legacy ssl.wrap_socket() helper was removed
        # in Python 3.12 and never verified certificates, so an SSLContext
        # is used instead.
        from ssl import create_default_context
        from time import sleep

        context = create_default_context() if self.secure else None

        print(f"Connecting to {self.host}:{self.port}...")
        while True:
            sock = socket(AF_INET, SOCK_STREAM)
            try:
                if context is not None:
                    sock = context.wrap_socket(sock, server_hostname=self.host)

                if self.timeout > 0:
                    sock.settimeout(self.timeout)

                sock.connect((self.host, self.port))

            except Exception:
                eprint(f"ERROR:\n{format_exc()}")
                # Release the failed socket and pause so that a refused
                # connection does not turn into a busy loop.
                sock.close()
                print("Trying to connect again...")
                sleep(self.RECONNECT_DELAY)
                continue

            self.sock = sock
            print("Connected!")
            return

    def _load_plugins(self, location):
        """Import every module in *location* and collect its callbacks.

        Returns a dict mapping each module name to the list of its callable
        attributes whose name starts with "bot_".
        """
        plugins = {}
        modules = Path(__file__).parent.joinpath(*location.split("/"))

        # Equivalent of doing "import <package>.<module>"
        container = location.replace("/", ".")

        # Sorted so that plugins are loaded (and later called) in a stable
        # order regardless of how the file system lists the directory.
        for module in sorted(modules.glob("*.py")):
            if not module.is_file() or module.name == "__init__.py":
                continue

            # stem drops only the final ".py" suffix
            package = module.stem
            script = import_module(f"{container}.{package}")

            callbacks = []
            for variable in dir(script):
                if not variable.startswith("bot_"):
                    continue

                # We only need the actual plugin here
                script_var = getattr(script, variable)
                if callable(script_var):
                    callbacks.append(script_var)

            plugins[package] = callbacks

        return plugins

    def _cleanup(self, parameters, dirt=":"):
        """Return *parameters* with the first occurrence of *dirt* removed."""
        before, _, after = parameters.partition(dirt)
        return f"{before}{after}"

    # from section 2.3.1 of rfc1459
    def parse_irc_message(self, message):
        """Split one raw IRC line into a dictionary of its parts.

        The result always contains "raw". For a non-empty message it also
        contains "has_tags", "has_prefix", "type" ("numeric" or "alpha"),
        "command" and "params"; "tags" is present for tagged messages and
        "prefix"/"source" (plus "nick", "user", "host" for user prefixes)
        for prefixed ones.

        Missing parts (no parameters, no "@" in the prefix, ...) yield
        empty strings instead of raising, because this method is called
        for every line the server sends.
        """
        meta = {"raw": message}
        if not message:
            return meta

        has_tags = message[0] == "@"
        meta["has_tags"] = has_tags

        if has_tags:
            tags, _, message = message.lstrip("@").partition(" ")
            meta["tags"] = {}

            for tag in tags.split(";"):
                key, separator, value = tag.partition("=")
                # A tag without "=" carries no value
                meta["tags"][key] = value if separator else None

        # startswith() is safe even when nothing follows the tags
        has_prefix = message.startswith(":")
        message = message.lstrip(":")
        meta["has_prefix"] = has_prefix

        if not has_prefix:
            command, _, params = message.partition(" ")
            meta["type"] = "alpha"

        else:
            prefix, _, remainder = message.partition(" ")
            command, _, params = remainder.partition(" ")

            source = "nick" if "!" in prefix else "server"
            meta["prefix"] = prefix
            meta["source"] = source
            meta["type"] = "numeric" if isnumeric(command) else "alpha"
            if source == "nick":
                nick, _, user_info = prefix.partition("!")
                user, _, host = user_info.partition("@")
                meta["nick"] = nick
                meta["user"] = user
                meta["host"] = host

        meta["command"] = command
        meta["params"] = self._cleanup(params)

        return meta

    def is_admin(self, message, admin):
        """Return True when both the account tag and the nick equal *admin*.

        *message* is a dictionary as produced by parse_irc_message(); a
        message lacking either the "account" tag or the nick is never
        considered to come from the admin.
        """
        account = message.get("tags", {}).get("account")
        nick = message.get("nick")
        if not (account and nick):
            return False

        return account == admin and nick == admin

    def privmsg(self, message):
        """Split the message parameters at the first space.

        Returns a list: [target, text] when the parameters contain a space,
        a single-element list when they do not, and ["", ""] when the
        message has no "params" entry at all.
        """
        return message.get("params", " ").split(" ", 1)

    def send(self, message):
        """Send one raw IRC line; errors are logged instead of raised."""
        try:
            # sendall() keeps writing until the whole line is out, whereas
            # send() may transmit only part of the buffer.
            self.sock.sendall(f"{message}\r\n".encode())

        except Exception:
            eprint(f"ERROR:\n{format_exc()}")

    def sendto(self, channel, message):
        """Send *message* to *channel* as a PRIVMSG."""
        self.send(f"PRIVMSG {channel} :{message}")

    @staticmethod
    def _decode(line):
        """Decode a received line as UTF-8, falling back to ISO-8859-1."""
        try:
            return line.decode("utf8")

        except UnicodeDecodeError:
            # ISO-8859-1 maps every byte value, so this cannot fail
            return line.decode("iso-8859-1")

    def _dispatch(self, state, parsed):
        """Pass *parsed* through every plugin callback and return the state."""
        for plugin_name, plugin_callbacks in self.plugins.items():
            for callback in plugin_callbacks:
                try:
                    result = callback(self, state, parsed)

                except Exception:
                    eprint(f"ERROR:\n{format_exc()}")
                    continue

                if result is None:
                    # Keep the previous state so one misbehaving plugin
                    # cannot break the callbacks that follow it.
                    eprint(f"ERROR: {plugin_name} returned None")
                    continue

                state = result

                # Promote every flag to a top-level state entry; this is
                # how the "stop" entry checked by run() gets set.
                for key, value in (state.get("_flags") or {}).items():
                    state[key] = value

        return state

    def run(self):
        """Receive lines from the server and dispatch them to the plugins.

        The settings are read from "settings.json" next to this file and
        exposed to the plugins as state["settings"]. The loop ends when the
        state contains a truthy "stop" entry, when the server closes the
        connection, or when receiving fails (including a socket timeout).
        """
        newline = b"\n"
        cache = b""

        settings_path = Path(__file__).resolve().parent / "settings.json"
        state = {
            "_flags": {},
            "settings": json_loads(settings_path.read_text()),
        }

        while not state.get("stop"):
            try:
                packet = self.sock.recv(512)

            except Exception:
                eprint(f"ERROR:\n{format_exc()}")
                break

            if not packet:
                # recv() returns b"" once the peer has closed the
                # connection; looping again would spin forever.
                eprint("ERROR: connection closed by remote host")
                break

            # Everything up to the last newline is complete; the rest is an
            # unfinished line that is kept for the next packet.
            complete, _, cache = (cache + packet).rpartition(newline)

            for line in complete.split(newline):
                message = self._decode(line).strip("\r")
                if not message:
                    continue

                #print(message)
                parsed = self.parse_irc_message(message)
                #print(parsed)
                state = self._dispatch(state, parsed)
|
||
|
|
||
|
if __name__ == "__main__":
    # Script entry point: connect to the server with TLS enabled, load the
    # callbacks from the "plugins" package and process messages until the
    # main loop ends.
    bot = Bot(host="irc.tilde.chat", port=6697, plugins="plugins", secure=True)
    bot.run()
|