import importlib
import sys
import urllib.parse
import urllib.request
from functools import wraps

from irctokens import build


class Command:
    """IRC bot command dispatcher together with the built-in commands.

    Apart from ``config`` and ``commands``, the methods below read attributes
    that are never assigned inside this class: ``util``, ``YouTube``,
    ``line``, ``_line``, ``prefix``, ``pm``, ``admin``, ``is_ctcp`` and
    ``self_nick``.  NOTE(review): presumably the caller sets these on the
    instance for every incoming message before calling ``preq_cmd`` --
    confirm against the code that drives this class.

    Command methods share the signature
    ``(self, prefix, cmd, pm, line, admin, mesg)`` and are wrapped by the
    ``adm`` / ``cmd`` decorators, which supply those arguments from the
    instance.  The docstrings of command methods are sent to users verbatim
    by the ``help`` command, so they are user-facing text.
    """

    def __init__(self, config):
        """Store *config* and start with an empty command list."""
        self.config = config
        self.commands = []

    def mesg(self, msg, t=None):
        """Forward *msg* (and optional *t*) to ``self.util.mesg``."""
        self.util.mesg(msg, t)

    def notice(self, msg):
        """Forward *msg* to ``self.util.notice``."""
        self.util.notice(msg)

    def action(self, msg):
        """Forward *msg* to ``self.util.action``."""
        self.util.action(msg)

    def send(self, msg):
        """Forward *msg* to ``self.util.send``."""
        self.util.send(msg)

    def err_perm(self, level="admin"):
        """Tell the user that they lack *level* access."""
        self.mesg(f"Error: insufficient privileges, you lack {level} access")

    def adm(func, *args, **kwargs):
        """Decorator for admin commands.

        Records ``func.__name__`` in the module-level ``adm_cmds`` list and
        returns a wrapper that only runs *func* when ``self.admin`` is truthy;
        otherwise the wrapper reports a permission error through
        ``self.err_perm``.
        """
        global adm_cmds
        # The list is created lazily so that an already existing module-level
        # list is reused instead of being replaced.
        try:
            adm_cmds
        except NameError:
            adm_cmds = []
        if func.__name__ not in adm_cmds and func.__name__ != "_":
            adm_cmds.append(func.__name__)

        @wraps(func)
        def _(self, *args, **kwargs):
            if func.__name__ == "help":
                self.admin_commands = adm_cmds
            if not self.admin:
                self.err_perm()
                return None
            return func(
                self,
                self.prefix,
                self.line,
                self.pm,
                self._line,
                self.admin,
                self.mesg,
            )

        return _

    def cmd(func, *args, **kwargs):
        """Decorator for user commands.

        Records ``func.__name__`` in the module-level ``cmds`` list and
        returns a wrapper that runs *func* unless its name is listed in
        ``self.config.cmd.disabled``.
        """
        global cmds
        # Created lazily for the same reason as ``adm_cmds`` in ``adm``.
        try:
            cmds
        except NameError:
            cmds = []
        if func.__name__ not in cmds and func.__name__ != "_":
            cmds.append(func.__name__)

        @wraps(func)
        def _(self, *args, **kwargs):
            if func.__name__ == "help":
                self.commands = cmds
            if func.__name__ in self.config.cmd.disabled:
                return None
            return func(
                self,
                self.prefix,
                self.line,
                self.pm,
                self._line,
                self.admin,
                self.mesg,
            )

        return _

    def internal(func, *args, **kwargs):
        """Decorator for commands like ctcp which are for internal use.

        Such commands use the normal argument template, but their name is
        removed from ``cmds`` so that ``help`` does not list them.  Apply it
        above ``@cmd``.
        """
        global cmds
        if func.__name__ in cmds:
            cmds.remove(func.__name__)
        return func

    def preq_cmd(self):
        """Work out which command ``self.line`` triggers and run it.

        Returns without doing anything when the line matches no trigger or
        the matched command is disabled in the configuration.
        """
        cmd = self.line
        needs_prefix = True
        if cmd == "help" or cmd.startswith("help "):
            command = "help"
        elif cmd.startswith("quit"):
            command = "quit"
        elif cmd.startswith("echo "):
            command = "echo"
        elif cmd.startswith("yt ") or self.YouTube.match_urls(self.YouTube, cmd) != []:
            command = "yt"
            # A bare video URL triggers the lookup even without a prefix.
            needs_prefix = False
        elif cmd.startswith(("w ", "weather ")):
            command = "weather"
        elif cmd.startswith("me "):
            command = "me"
        elif cmd == "dbg" or cmd.startswith("dbg "):
            command = "dbg"
        elif cmd == "dbg2" or cmd.startswith("dbg2 "):
            command = "dbg2"
        elif cmd in ("version", "ver"):
            command = "version"
        elif cmd.startswith("\x01") or self.is_ctcp:
            command = "ctcp"
        else:
            return

        if command in self.config.cmd.disabled:
            return
        # Prefix-requiring commands are skipped only when there is neither a
        # prefix nor a private message.  The equality test on ``pm`` is kept
        # as-is because its exact type is decided by the caller.
        unaddressed = self.prefix is None and self.pm == False  # noqa: E712
        if not needs_prefix or not unaddressed:
            # ``command`` is always one of the literals above, so a plain
            # attribute lookup replaces the former ``eval`` call.
            getattr(self, command)()

    def getversion(self):
        """Return the second field of the last usable line of the git HEAD reflog.

        The log is read from ``self.config.self.gitdir + ".git/logs/HEAD"``.
        Returns ``"unknown"`` when the log holds no line with at least two
        whitespace-separated fields.  NOTE(review): in a git reflog the second
        field should be the new commit id -- confirm against the git docs.

        Raises:
            OSError: if the log file cannot be opened.
        """
        version = "unknown"
        with open(self.config.self.gitdir + ".git/logs/HEAD") as f:
            for entry in f:
                fields = entry.split()
                if len(fields) >= 2:
                    version = fields[1]
        return version

    @internal
    @cmd
    def ctcp(self, prefix, cmd, pm, line, admin, mesg):
        """CTCP responses"""
        ctcp = cmd[1:]  # drop the leading \x01 delimiter
        if ctcp.startswith("PING"):
            if not ctcp.endswith("\x01"):
                ctcp = ctcp + "\x01"
            print(ctcp)
            # Restore the leading delimiter so the PING reply is framed like
            # every other CTCP reply sent below.
            self.notice("\x01" + ctcp)
        ctcp = ctcp.upper()
        if ctcp.startswith("SOURCE"):
            self.notice("\x01SOURCE " + self.config.self.source + "\x01")
        elif ctcp.startswith("VERSION"):
            self.notice(f"\x01VERSION {self.getversion()}\x01")
        elif ctcp.startswith("FINGER"):
            self.notice(
                f"\x01FINGER {self.config.self.nick} version {self.getversion()} ({self.config.self.source})\x01"
            )
        elif ctcp.startswith("CLIENTINFO"):
            self.notice("\x01CLIENTINFO PING SOURCE FINGER VERSION\x01")

    # Admin-only: "q" / "quit" quits without a message, "q <msg>" /
    # "quit <msg>" passes <msg> to ``self.util.quit``.  Documented with a
    # comment rather than a docstring so that the ``help`` output for this
    # command stays unchanged.
    @adm
    def quit(self, prefix, cmd, pm, line, admin, mesg):
        if not admin:
            return
        if cmd in ("q", "quit"):
            self.util.quit()
        elif cmd.startswith(("q ", "quit ")):
            self.util.quit(cmd.split(" ", 1)[1])

    @adm
    def dbg(self, prefix, cmd, pm, line, admin, mesg):
        """temporary debug command, subject to change A LOT"""
        mesg(dir())

    @cmd
    def yt(self, prefix, cmd, pm, line, admin, mesg):
        """youtube"""
        YouTube = self.YouTube
        if cmd.startswith("yt "):
            cmd = cmd[3:]
        try:
            YouTube.premature_optimization = self.config.cmd.yt_premature_opt
        except AttributeError:
            # The option is not configured; keep whatever YouTube already has.
            pass
        print(f" YT premature_optimization={YouTube.premature_optimization}")
        for video in YouTube.match_urls(YouTube, cmd):
            try:
                reply, failed = YouTube.yt(YouTube, video)
            except Exception as e:
                # Report the error to the channel instead of crashing the bot.
                reply, failed = e, True
            mesg(reply)
            # Stop at the first failed lookup; the remaining URLs are skipped.
            if failed == True:  # noqa: E712 -- return type of YouTube.yt not visible here
                break

    @cmd
    def echo(self, prefix, cmd, pm, line, admin, mesg):
        """simple echo command"""
        mesg("\x7f" + cmd.split(" ", 1)[1])

    @cmd
    def version(self, prefix, cmd, pm, line, admin, mesg):
        """version"""
        mesg(
            f"{self.config.self.nick} version {self.getversion()} ({self.config.self.source})"
        )

    @cmd
    def dbg2(self, prefix, cmd, pm, line, admin, mesg):
        """version"""
        # Same reflog lookup as ``getversion``; reuse it instead of
        # duplicating the file parsing here.
        mesg("version " + self.getversion())

    @cmd
    def me(self, prefix, cmd, pm, line, admin, mesg):
        """simple /me command"""
        self.action(cmd.split(" ", 1)[1])

    # "help" lists the abstract topics and the enabled commands (admin
    # commands only for admins); "help <topic>" sends the docstring of the
    # registered command <topic>, or the text of an abstract topic.
    # Documented with a comment rather than a docstring so that the output of
    # "help help" stays unchanged.
    @cmd
    def help(self, prefix, cmd, pm, line, admin, mesg):
        global adm_cmds
        global cmds
        disabled_commands = self.config.cmd.disabled
        # These three names are always advertised as admin commands, although
        # they are not defined in this class.
        for name in ("exec", "eval", "reload"):
            if name not in adm_cmds:
                adm_cmds.append(name)
        admin_commands = [c for c in adm_cmds if c not in disabled_commands]
        commands = [
            c
            for c in cmds
            if c not in admin_commands and c not in disabled_commands
        ]
        prefixes = '"' + '", "'.join(self.config.cmd.prefixes) + '"'

        # An empty topic ("help ") is treated like a plain "help".
        topic = cmd.partition(" ")[2].strip() or None
        # A missing attribute raises AttributeError, not IndexError.
        self_nick = getattr(self, "self_nick", None)
        abs_topics = {"prefixes": f'available prefixes are {prefixes} or "{self_nick}"'}

        if topic is None:
            mesg("available topics: " + ", ".join(abs_topics))
            mesg("available commands: " + ", ".join(commands))
            if admin:
                mesg("admin commands: " + ", ".join(admin_commands))
            return

        # SECURITY: ``topic`` is text typed by an IRC user.  It used to be
        # spliced into ``eval``, which let any user execute arbitrary code.
        # Only registered command names are looked up now, and only through
        # ``getattr``.
        doc = None
        if topic in cmds or topic in adm_cmds:
            doc = getattr(getattr(self, topic, None), "__doc__", None)
        if isinstance(doc, str):
            mesg(f"{topic}: " + doc)
        elif topic in abs_topics:
            mesg(f"{topic}: " + abs_topics[topic])
        else:
            mesg(f'no help available for "{topic}"...')

    @cmd
    def weather(self, prefix, cmd, pm, line, admin, mesg):
        """crude weather command"""
        # Input format: "<trigger> <location>[?<flags>]"; the flag "f" or "u"
        # selects the "u" unit option instead of the default "m".
        query = cmd.partition(" ")[2]
        parts = query.split("?")
        loc = parts[0].strip()
        flags = parts[1].split() if len(parts) >= 2 else []
        mf = "u" if ("f" in flags or "u" in flags) else "m"  # celsius by default
        try:
            url = f"https://wttr.in/{urllib.parse.quote(loc)}?A&T&0&n&F&{mf}"
            # The timeout keeps an unresponsive server from blocking the bot
            # forever; the context manager closes the response.
            with urllib.request.urlopen(url, timeout=10) as response:
                body = response.read().decode("utf-8")
            rows = body.split("\n")
            mesg(
                rows[0].strip()
                + " "
                + " ".join(rows[3].strip().split(" ")[-2:])
            )
        except Exception as e:
            # Boundary with the network and an external text format: report
            # any failure to the user instead of crashing the bot.
            mesg(f"{type(e).__name__} was raised: {e}")