irc-bot/bot.py

453 lines
14 KiB
Python
Executable File

#!env/bin/python3
from socket import socket, AF_INET, SOCK_STREAM, timeout as sock_timeout
from ssl import wrap_socket
from traceback import format_exc
from json import dumps as json_dumps, loads as json_loads
from pathlib import Path
from importlib import import_module
from sys import stderr
from argparse import ArgumentParser, FileType
from time import time, sleep, mktime
from threading import Thread
from queue import Queue, Empty
from select import select
from datetime import datetime
def isnumeric(test):
test.replace(".", "", 1).isdigit()
def eprint(*args, **kwargs):
print(*args, file=stderr, **kwargs)
class Bot:
def __init__(self, config):
self.server = "localhost"
self.port = 6667
self.secure = False
self.timeout = 0
self.plugins_dir = "plugins"
self.sock = None
self.read_sock = None
self.write_sock = None
self.scripts = dict()
self.plugins = dict()
self.settings = dict()
self.mode = "reading"
self.inputs = list()
self.outputs = list()
self._load_settings(config)
self._load_plugins()
self.events = Queue()
self.actions = Queue()
self.state = dict()
self.state["_flags"] = dict()
self.state["settings"] = self.settings
def _connect(self):
print(f"Connecting to {self.server}:{self.port}...")
while True:
self.sock = socket(AF_INET, SOCK_STREAM)
if self.secure:
self.sock = wrap_socket(self.sock)
self.sock.setblocking(False)
if self.timeout > 0:
self.sock.settimeout(self.timeout)
try:
self.sock.connect((self.server, self.port))
print("Connected!")
break
except Exception as e:
print("Trying to connect again...")
sleep(1)
continue
# Needs to be run twice since connect consumes timeout
if self.timeout > 0:
self.sock.settimeout(self.timeout)
def _load_settings(self, config):
if config is None:
settings_path = Path(__file__).resolve().parent / "settings.json"
settings_text = settings_path.read_text()
else:
settings_text = config.read()
self.settings = json_loads(settings_text)
meta_settings = self.settings.get("_meta", dict())
self.server = meta_settings.get("server", self.server)
self.port = meta_settings.get("port", self.port)
self.secure = meta_settings.get("secure", self.secure)
self.timeout = meta_settings.get("timeout", self.timeout)
self.plugins_dir = meta_settings.get("plugins_dir", self.plugins_dir)
def _load_plugins(self):
scope = dict()
modules = Path(__file__).parent
container = self.plugins_dir.replace("/", ".")
for loc in self.plugins_dir.split("/"):
modules = modules / loc
ignore_plugins = self.settings.get("ignore_plugins", list())
extra_plugins = self.settings.get("extra_plugins", list())
for module in modules.glob("*.py"):
if not module.is_file() or module.name == "__init__.py":
continue
package = module.name.replace(".py", "")
if package in ignore_plugins:
continue
if "private" in package and package not in extra_plugins:
continue
self.plugins[package] = list()
# Equivalent of doing "import <package>.<module>"
script = import_module(f"{container}.{package}")
self.scripts[package] = script
for variable in dir(script):
# Ignore Python internals
if variable.startswith("__"):
continue
if not variable.startswith("bot_"):
continue
# We only need the actual plugin here
script_var = getattr(script, variable)
if not callable(script_var):
continue
self.plugins[package].append(script_var)
def _cleanup(self, parameters, dirt=":"):
params = parameters.partition(dirt)
return f"{params[0]}{params[2]}"
# from section 2.3.1 of rfc1459
def parse_irc_message(self, message):
meta = dict()
meta["raw"] = message
if message is None or len(message) == 0:
return meta
has_tags = message[0] == "@"
meta["has_tags"] = has_tags
if has_tags:
tags, message = message.lstrip("@").split(" ", 1)
meta["tags"] = dict()
for tag in tags.split(";"):
if "=" in tag:
key, value = tag.split("=", 1)
meta["tags"][key] = value
else:
meta["tags"][tag] = None
has_prefix = message[0] == ":"
message = message.lstrip(":")
meta["has_prefix"] = has_prefix
if not has_prefix:
command, params = message.lstrip(":").split(" ", 1)
meta["type"] = "alpha"
meta["command"] = command
meta["params"] = self._cleanup(params)
else:
prefix, remainder = message.split(" ", 1)
if " " in remainder:
command, params = remainder.split(" ", 1)
else:
command = remainder
params = ""
meta["command"] = command
meta["params"] = self._cleanup(params)
source = "nick" if "!" in prefix else "server"
meta["prefix"] = prefix
meta["source"] = source
meta["type"] = "numeric" if isnumeric(command) else "alpha"
if source == "nick":
nick, user_info = prefix.split("!", 1)
user, host = user_info.split("@", 1)
meta["nick"] = nick
meta["user"] = user
meta["host"] = host
if command == "PRIVMSG":
channel, contents = meta["params"].split(" ", 1)
meta["channel"] = channel
meta["contents"] = contents
return meta
def is_admin(self, message, admin):
tags = message.get("tags", dict())
account = tags.get("account")
nick = message.get("nick")
if not all([account, nick]):
return False
if account != admin:
return False
elif nick != admin:
return False
return True
def privmsg(self, message):
return message.get("params", " ").split(" ", 1)
def write(self, message):
# DEBUG
print(f">> {message}")
try:
self.write_sock.send(f"{message}\r\n".encode())
return True
except sock_timeout as e:
return False
except Exception as e:
eprint(f"ERROR:\n{format_exc()}")
return False
# Sanity fail-safe
return False
def send(self, message):
self.actions.put(message)
def sendto(self, channel, message):
self.send(f"PRIVMSG {channel} :{message}")
def join(self, channel):
self.send(f"JOIN {channel}")
def leave(self, channel, farewell="Bye-bye!"):
self.send(f"PART {channel} :{farewell}")
def subscribe(self, wait, callback, context=dict()):
input_time, time_unit = int(wait[:-1]), wait[-1]
wait_time = 0
if time_unit == "s":
wait_time = input_time
if time_unit == "m":
wait_time = input_time * 60
if time_unit == "h":
wait_time = input_time * 60 * 60
if time_unit == "d":
wait_time = input_time * 60 * 60 * 24
if time_unit == "w":
wait_time = input_time * 60 * 60 * 24 * 7
at = time() + wait_time
event = at, callback, context
self.events.put(event)
def process_events(self):
events = list()
#print("Processing events...")
while not self.events.empty():
event = self.events.get()
if not isinstance(event, tuple):
continue
if len(event) != 3 or None in event:
continue
at, callback, context = event
if time() < at:
print(f"Skipping: {event}")
events.append(event)
continue
print(f"Running: {event}")
callback(self, **context)
for event in events:
self.events.put(event)
def toggle_mode(self):
curr_index = 0 if self.mode == "reading" else 1
next_index = 1 if curr_index == 0 else 0
modes = ["reading", "writing"]
socks = [self.read_sock, self.write_sock]
sock_lists = [self.inputs, self.outputs]
curr_sock = socks[curr_index]
next_sock = socks[next_index]
curr_sock_list = sock_lists[curr_index]
next_sock_list = sock_lists[next_index]
self.mode = modes[next_index]
if curr_sock in curr_sock_list:
curr_sock_list.remove(curr_sock)
if curr_sock not in next_sock_list:
next_sock_list.append(curr_sock)
def set_read_mode(self):
self.mode == "reading"
if self.write_sock in self.outputs:
self.outputs.remove(self.write_sock)
if self.write_sock not in self.inputs:
self.inputs.append(self.write_sock)
def run(self):
cache = b""
#Thread(target=self.process_events, daemon=True).start()
self._connect()
self.inputs = [self.sock]
while not self.state.get("stop"):
socks = select(self.inputs, self.outputs, self.inputs, 0)
if not any(socks):
eprint("ERROR: Timed out!")
# NOTE - Anything to do while it waits should be done here:
# <waiting>
self.process_events()
# </waiting>
sleep(1)
continue
read_socks, write_socks, exc_socks = socks
if exc_socks:
print(exc_socks)
has_read_sock = len(read_socks) > 0
has_write_sock = len(write_socks) > 0
if has_read_sock:
self.read_sock = read_socks[0]
if has_write_sock:
self.write_sock = write_socks[0]
# This needs to be done first or it will wait on response to run
requeue = list()
while not self.actions.empty():
action = self.actions.get()
written = self.write(action)
if not written:
requeue.append(action)
else:
#print("Done writing")
for rq in requeue:
print("Requeuing")
self.actions.put(rq)
if not has_read_sock:
self.toggle_mode()
data = cache
try:
packet = self.read_sock.recv(512)
except sock_timeout as e:
#eprint("Timed out on read")
#self.process_events()
self.toggle_mode()
continue
except Exception as e:
eprint(f"ERROR:\n{format_exc()}")
break
data = data + packet
if len(data) == 0:
self.toggle_mode()
continue
newline = b"\n"
if data[-1] != newline:
data_buffer, separator, cache = data.rpartition(newline)
data = data_buffer + separator
else:
cache = b""
for line in data.split(newline):
try:
message = line.decode("utf8")
except UnicodeDecodeError as e:
try:
message = line.decode("iso-8859-1")
except UnicodeDecodeError as e:
eprint(f"ERROR:\n{format_exc()}")
continue
if len(message) == 0:
continue
parsed = self.parse_irc_message(message.strip("\r"))
# DEBUG
lag = 0
tags = parsed.get("tags")
if tags:
time_tag = tags.get("time")
time_str = "%Y-%m-%dT%H:%M:%S.%fZ"
convert = datetime.strptime(time_tag, time_str).timetuple()
msgnow = mktime(convert)
now = datetime.utcnow().timestamp()
lag = now - msgnow
print(f"<< [[lag={lag:.2f}s]] {message}")
#print("%%", parsed)
for plugin_name, plugin_callbacks in self.plugins.items():
for callback in plugin_callbacks:
try:
callback(self, parsed)
except Exception as e:
eprint(f"ERROR:\n{format_exc()}")
continue
if self.state is None:
eprint(f"ERROR: {plugin_name} returned None")
continue
flags = self.state.get("_flags")
for key, value in flags.items():
self.state[key] = value
self.toggle_mode()
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("-c", nargs="?", type=FileType("r"), dest="config")
args = parser.parse_args()
bot = Bot(config=args.config)
bot.run()