#!env/bin/python3
"""A small plugin-driven IRC bot.

The bot connects to a single IRC server, parses every incoming line into a
dictionary (see ``Bot.parse_irc_message``) and hands that dictionary to every
``bot_*`` callable found in the modules of the configured plugins directory.
Plugins answer by queueing raw IRC lines through ``Bot.send`` and friends.
"""
from socket import socket, AF_INET, SOCK_STREAM, timeout as sock_timeout
from ssl import CERT_NONE, SSLWantReadError, SSLWantWriteError, create_default_context
from traceback import format_exc
from json import dumps as json_dumps, loads as json_loads
from pathlib import Path
from importlib import import_module
from sys import stderr
from argparse import ArgumentParser, FileType
from time import time, sleep, mktime
from threading import Thread
from queue import Queue, Empty
from select import select
from datetime import datetime, timezone

# Exceptions that only mean "the socket is not ready yet", not a real failure.
_WOULD_BLOCK = (sock_timeout, BlockingIOError, SSLWantReadError, SSLWantWriteError)


def isnumeric(test):
    """Return True when ``test`` is all digits, optionally with one ``.``."""
    return test.replace(".", "", 1).isdigit()


def eprint(*args, **kwargs):
    """``print`` to standard error."""
    print(*args, file=stderr, **kwargs)


class Bot:
    """IRC client that dispatches parsed server messages to plugin callbacks."""

    # Seconds per unit suffix accepted by ``subscribe``.
    _WAIT_UNITS = {
        "s": 1,
        "m": 60,
        "h": 60 * 60,
        "d": 60 * 60 * 24,
        "w": 60 * 60 * 24 * 7,
    }

    # Timestamp layout of the IRCv3 ``time`` message tag.
    _TIME_TAG_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

    def __init__(self, config):
        """Load settings and plugins, then create the runtime queues/state.

        ``config`` is an open text file with the JSON settings, or ``None``
        to read ``settings.json`` located next to this file.
        """
        self.server = "localhost"
        self.port = 6667
        self.secure = False
        # Certificate/hostname verification for TLS connections; disable via
        # the "_meta.verify_tls" setting for self-signed servers.
        self.verify_tls = True
        self.timeout = 0
        self.plugins_dir = "plugins"
        self.sock = None
        self.read_sock = None
        self.write_sock = None
        self.scripts = dict()
        self.plugins = dict()
        self.settings = dict()
        self.mode = "reading"
        self.inputs = list()
        self.outputs = list()
        self._load_settings(config)
        self._load_plugins()
        self.events = Queue()
        self.actions = Queue()
        self.state = dict()
        self.state["_flags"] = dict()
        self.state["settings"] = self.settings

    def _tls_context(self):
        """Build the TLS context used when ``secure`` is enabled."""
        context = create_default_context()
        if not self.verify_tls:
            # check_hostname must be cleared before verify_mode can be relaxed.
            context.check_hostname = False
            context.verify_mode = CERT_NONE
        return context

    def _connect(self):
        """Connect to the configured server, retrying once a second forever.

        The connect (and TLS handshake) is done in blocking mode, bounded by
        ``timeout`` when it is positive, because a non-blocking ``connect``
        always reports "in progress" as an error. The read/write mode the
        main loop expects is applied once the connection is established.
        """
        print(f"Connecting to {self.server}:{self.port}...")
        context = self._tls_context() if self.secure else None
        while True:
            sock = socket(AF_INET, SOCK_STREAM)
            if context is not None:
                sock = context.wrap_socket(sock, server_hostname=self.server)
            sock.settimeout(self.timeout if self.timeout > 0 else None)
            try:
                sock.connect((self.server, self.port))
            except OSError as error:
                # Do not leak one descriptor per failed attempt.
                sock.close()
                eprint(f"ERROR: Could not connect: {error}")
                print("Trying to connect again...")
                sleep(1)
                continue
            print("Connected!")
            break
        # Needs to be run twice since connect consumes timeout
        if self.timeout > 0:
            sock.settimeout(self.timeout)
        else:
            sock.setblocking(False)
        self.sock = sock

    def _load_settings(self, config):
        """Read the JSON settings and apply the connection ``_meta`` section."""
        if config is None:
            settings_path = Path(__file__).resolve().parent / "settings.json"
            settings_text = settings_path.read_text()
        else:
            settings_text = config.read()
        self.settings = json_loads(settings_text)
        meta_settings = self.settings.get("_meta", dict())
        self.server = meta_settings.get("server", self.server)
        self.port = meta_settings.get("port", self.port)
        self.secure = meta_settings.get("secure", self.secure)
        self.verify_tls = meta_settings.get("verify_tls", self.verify_tls)
        self.timeout = meta_settings.get("timeout", self.timeout)
        self.plugins_dir = meta_settings.get("plugins_dir", self.plugins_dir)

    def _load_plugins(self):
        """Import every enabled plugin module and collect its ``bot_*`` callables.

        Modules listed in the ``ignore_plugins`` setting are skipped, and
        modules with "private" in their name are only loaded when listed in
        ``extra_plugins``. Modules are loaded in sorted order so that the
        callback order does not depend on the file system.
        """
        modules = Path(__file__).parent
        container = self.plugins_dir.replace("/", ".")
        for loc in self.plugins_dir.split("/"):
            modules = modules / loc
        ignore_plugins = self.settings.get("ignore_plugins", list())
        extra_plugins = self.settings.get("extra_plugins", list())
        for module in sorted(modules.glob("*.py")):
            if not module.is_file() or module.name == "__init__.py":
                continue
            package = module.stem
            if package in ignore_plugins:
                continue
            if "private" in package and package not in extra_plugins:
                continue
            # Equivalent of doing "import <container>.<package>"
            script = import_module(f"{container}.{package}")
            self.scripts[package] = script
            callbacks = list()
            for variable in dir(script):
                # Only "bot_" names are plugins (this also skips dunders)
                if not variable.startswith("bot_"):
                    continue
                # We only need the actual plugin here
                script_var = getattr(script, variable)
                if callable(script_var):
                    callbacks.append(script_var)
            self.plugins[package] = callbacks

    def _cleanup(self, parameters, dirt=":"):
        """Return ``parameters`` with the first occurrence of ``dirt`` removed."""
        params = parameters.partition(dirt)
        return f"{params[0]}{params[2]}"

    # from section 2.3.1 of rfc1459
    def parse_irc_message(self, message):
        """Split one raw IRC line into a dictionary of its parts.

        Always sets ``raw``. For a non-empty message it also sets
        ``has_tags`` (plus ``tags`` when present) and, once a command part
        exists, ``has_prefix``, ``command``, ``params`` and ``type``.
        Prefixed messages add ``prefix`` and ``source`` ("nick" or
        "server"); nick prefixes add ``nick``, ``user`` and ``host``;
        PRIVMSG adds ``channel`` and ``contents``. Missing optional parts
        become empty strings instead of raising.
        """
        meta = dict()
        meta["raw"] = message
        if not message:
            return meta
        has_tags = message[0] == "@"
        meta["has_tags"] = has_tags
        if has_tags:
            tags, _, message = message.lstrip("@").partition(" ")
            meta["tags"] = dict()
            for tag in tags.split(";"):
                key, has_value, value = tag.partition("=")
                meta["tags"][key] = value if has_value else None
            if not message:
                # Tags without a command: nothing more to parse.
                return meta
        has_prefix = message[0] == ":"
        message = message.lstrip(":")
        meta["has_prefix"] = has_prefix
        if not has_prefix:
            command, _, params = message.partition(" ")
            meta["type"] = "alpha"
            meta["command"] = command
            meta["params"] = self._cleanup(params)
        else:
            prefix, _, remainder = message.partition(" ")
            command, _, params = remainder.partition(" ")
            meta["command"] = command
            meta["params"] = self._cleanup(params)
            source = "nick" if "!" in prefix else "server"
            meta["prefix"] = prefix
            meta["source"] = source
            meta["type"] = "numeric" if isnumeric(command) else "alpha"
            if source == "nick":
                nick, _, user_info = prefix.partition("!")
                user, _, host = user_info.partition("@")
                meta["nick"] = nick
                meta["user"] = user
                meta["host"] = host
        if command == "PRIVMSG":
            channel, _, contents = meta["params"].partition(" ")
            meta["channel"] = channel
            meta["contents"] = contents
        return meta

    def is_admin(self, message, admin):
        """Return True when both the ``account`` tag and the nick equal ``admin``."""
        tags = message.get("tags", dict())
        account = tags.get("account")
        nick = message.get("nick")
        if not account or not nick:
            return False
        return account == admin and nick == admin

    def privmsg(self, message):
        """Split the ``params`` of a parsed message at the first space."""
        return message.get("params", " ").split(" ", 1)

    def write(self, message):
        """Send one IRC line on the write socket.

        Returns True when the line was sent and False otherwise; it never
        raises, so the caller can simply requeue the message.
        """
        # DEBUG
        print(f">> {message}")
        if self.write_sock is None:
            # No socket has been reported writable yet.
            return False
        try:
            # sendall so that a line is never silently truncated.
            self.write_sock.sendall(f"{message}\r\n".encode())
        except _WOULD_BLOCK:
            return False
        except Exception:
            eprint(f"ERROR:\n{format_exc()}")
            return False
        return True

    def send(self, message):
        """Queue a raw IRC line to be written by the main loop."""
        self.actions.put(message)

    def sendto(self, channel, message):
        """Queue a PRIVMSG with ``message`` for ``channel``."""
        self.send(f"PRIVMSG {channel} :{message}")

    def join(self, channel):
        """Queue a JOIN for ``channel``."""
        self.send(f"JOIN {channel}")

    def leave(self, channel, farewell="Bye-bye!"):
        """Queue a PART for ``channel`` with the given farewell text."""
        self.send(f"PART {channel} :{farewell}")

    def subscribe(self, wait, callback, context=None):
        """Schedule ``callback(bot, **context)`` to run after ``wait``.

        ``wait`` is an integer followed by a unit suffix: ``s``, ``m``,
        ``h``, ``d`` or ``w`` (for example ``"10m"``). An unknown suffix
        counts as no delay. Raises ``ValueError`` when the numeric part is
        not an integer.
        """
        # A fresh dict per call; a shared default would leak between events.
        if context is None:
            context = dict()
        input_time, time_unit = int(wait[:-1]), wait[-1]
        wait_time = input_time * self._WAIT_UNITS.get(time_unit, 0)
        at = time() + wait_time
        event = at, callback, context
        self.events.put(event)

    def process_events(self):
        """Run every subscribed event that is due and requeue the others.

        A failing callback is reported on stderr and does not prevent the
        remaining events from being processed or requeued.
        """
        events = list()
        while not self.events.empty():
            event = self.events.get()
            if not isinstance(event, tuple):
                continue
            if len(event) != 3 or None in event:
                continue
            at, callback, context = event
            if time() < at:
                print(f"Skipping: {event}")
                events.append(event)
                continue
            print(f"Running: {event}")
            try:
                callback(self, **context)
            except Exception:
                eprint(f"ERROR:\n{format_exc()}")
        for event in events:
            self.events.put(event)

    @staticmethod
    def _move_sock(sock, source, target):
        """Move ``sock`` from the ``source`` select list to ``target``."""
        if sock is None:
            # Never put None in a list that is handed to select().
            return
        if sock in source:
            source.remove(sock)
        if sock not in target:
            target.append(sock)

    def toggle_mode(self):
        """Flip between "reading" and "writing".

        The socket of the mode being left is moved from that mode's select
        list to the other mode's list.
        """
        if self.mode == "reading":
            self.mode = "writing"
            self._move_sock(self.read_sock, self.inputs, self.outputs)
        else:
            self.mode = "reading"
            self._move_sock(self.write_sock, self.outputs, self.inputs)

    def set_read_mode(self):
        """Force "reading" mode and move the write socket to the inputs."""
        self.mode = "reading"
        self._move_sock(self.write_sock, self.outputs, self.inputs)

    @staticmethod
    def _message_lag(tags, now=None):
        """Return the seconds between the ``time`` tag and ``now``.

        ``now`` defaults to the current UTC time. Returns 0.0 when there is
        no usable ``time`` tag, so a missing or malformed tag cannot take
        the bot down.
        """
        time_tag = tags.get("time") if tags else None
        if not time_tag:
            return 0.0
        try:
            sent = datetime.strptime(time_tag, Bot._TIME_TAG_FORMAT)
        except (TypeError, ValueError):
            return 0.0
        if now is None:
            now = datetime.now(timezone.utc)
        # The trailing "Z" of the tag means the timestamp is in UTC.
        return (now - sent.replace(tzinfo=timezone.utc)).total_seconds()

    def run(self):
        """Connect and process server messages until ``state["stop"]`` is set.

        Each iteration polls the socket, flushes the queued outgoing lines,
        reads one packet and dispatches every complete line to all plugin
        callbacks. The loop also ends when the server closes the connection
        or reading fails; the socket is closed on the way out.
        """
        cache = b""
        newline = b"\n"
        self._connect()
        self.inputs = [self.sock]
        try:
            while not self.state.get("stop"):
                socks = select(self.inputs, self.outputs, self.inputs, 0)
                if not any(socks):
                    eprint("ERROR: Timed out!")
                    # NOTE - Anything to do while it waits should be done here
                    continue
                read_socks, write_socks, exc_socks = socks
                if exc_socks:
                    print(exc_socks)
                has_read_sock = len(read_socks) > 0
                has_write_sock = len(write_socks) > 0
                if has_read_sock:
                    self.read_sock = read_socks[0]
                if has_write_sock:
                    self.write_sock = write_socks[0]
                # This needs to be done first or it will wait on response to run
                requeue = list()
                while not self.actions.empty():
                    action = self.actions.get()
                    if not self.write(action):
                        requeue.append(action)
                for rq in requeue:
                    print("Requeuing")
                    self.actions.put(rq)
                if not has_read_sock:
                    self.toggle_mode()
                try:
                    packet = self.read_sock.recv(512)
                except _WOULD_BLOCK:
                    self.toggle_mode()
                    continue
                except Exception:
                    eprint(f"ERROR:\n{format_exc()}")
                    break
                if not packet:
                    # An empty read on a stream socket means the peer closed it.
                    eprint("ERROR: Connection closed by server")
                    break
                # Keep whatever follows the last newline for the next packet.
                data, _, cache = (cache + packet).rpartition(newline)
                for line in data.split(newline):
                    try:
                        message = line.decode("utf8")
                    except UnicodeDecodeError:
                        # iso-8859-1 maps every byte, so this cannot fail.
                        message = line.decode("iso-8859-1")
                    message = message.strip("\r")
                    if len(message) == 0:
                        continue
                    parsed = self.parse_irc_message(message)
                    # DEBUG
                    lag = self._message_lag(parsed.get("tags"))
                    print(f"<< [[lag={lag:.2f}s]] {message}")
                    for plugin_name, plugin_callbacks in self.plugins.items():
                        for callback in plugin_callbacks:
                            try:
                                callback(self, parsed)
                            except Exception:
                                eprint(f"ERROR:\n{format_exc()}")
                                continue
                            if self.state is None:
                                eprint(f"ERROR: {plugin_name} returned None")
                                continue
                            flags = self.state.get("_flags") or dict()
                            for key, value in flags.items():
                                self.state[key] = value
                self.toggle_mode()
        finally:
            self.sock.close()


def main():
    """Parse the command line and run the bot."""
    parser = ArgumentParser()
    parser.add_argument("-c", nargs="?", type=FileType("r"), dest="config")
    args = parser.parse_args()
    bot = Bot(config=args.config)
    bot.run()


if __name__ == "__main__":
    main()