from functools import wraps
import importlib
import random
import sys

from irctokens import build


def _registry(name):
    """Return the module-level list called *name*, creating it on first use.

    The command registries ("cmds" and "adm_cmds") are created lazily
    instead of being assigned at import time, so a list that already exists
    in the module namespace is reused rather than replaced.
    """
    return globals().setdefault(name, [])


def _register_command(registry_name, command_name):
    """Add *command_name* to the named registry once; "_" is never recorded."""
    registry = _registry(registry_name)
    if command_name != "_" and command_name not in registry:
        registry.append(command_name)


def _bounded_int(text, default, low=1, high=100):
    """Parse *text* as an int; return *default* if unparsable or out of range."""
    try:
        value = int(text)
    except ValueError:
        return default
    return value if low <= value <= high else default


class Command:
    """Bot commands plus the trigger logic that selects which one to run.

    Command methods are wrapped by the ``adm`` / ``cmd`` decorators below and
    are therefore invoked with no arguments; the wrapper supplies
    ``(prefix, line, pm, _line, admin, mesg)`` from attributes of the same
    name on the instance.  Those attributes, as well as ``util``,
    ``YouTube`` and ``is_ctcp``, are not assigned anywhere in this file and
    must be attached by whoever drives the object.

    The docstring of a command method is what ``help <command>`` sends to the
    user, so command docstrings are kept short and extra notes are written
    as ``#`` comments.
    """

    def __init__(self, config):
        """Store *config* and start with an empty command list."""
        self.config = config
        self.commands = []

    def mesg(self, msg, t=None):
        """Forward *msg* (and optional *t*) to ``self.util.mesg``."""
        self.util.mesg(msg, t)

    def notice(self, msg):
        """Forward *msg* to ``self.util.notice``."""
        self.util.notice(msg)

    def action(self, msg):
        """Forward *msg* to ``self.util.action``."""
        self.util.action(msg)

    def send(self, msg):
        """Forward *msg* to ``self.util.send``."""
        self.util.send(msg)

    def err_perm(self, level="admin"):
        """Tell the user that they lack *level* access."""
        self.mesg(f"Error: insufficient privileges, you lack {level} access")

    def adm(func, *args, **kwargs):
        """decorator for admin commands"""
        # Runs while the class body executes: records the name in the
        # "adm_cmds" registry and returns a wrapper that refuses to run the
        # command unless self.admin is truthy.
        _register_command("adm_cmds", func.__name__)

        @wraps(func)
        def _(self, *args, **kwargs):
            if func.__name__ == "help":
                self.admin_commands = _registry("adm_cmds")
            if not self.admin:
                self.err_perm()
                return None
            return func(
                self,
                self.prefix,
                self.line,
                self.pm,
                self._line,
                self.admin,
                self.mesg,
            )

        return _

    def cmd(func, *args, **kwargs):
        """decorator for user commands"""
        # Records the name in the "cmds" registry and returns a wrapper that
        # silently does nothing when the command is listed in
        # config.cmd.disabled.
        _register_command("cmds", func.__name__)

        @wraps(func)
        def _(self, *args, **kwargs):
            if func.__name__ == "help":
                self.commands = _registry("cmds")
            if func.__name__ in self.config.cmd.disabled:
                return None
            return func(
                self,
                self.prefix,
                self.line,
                self.pm,
                self._line,
                self.admin,
                self.mesg,
            )

        return _

    def internal(func, *args, **kwargs):
        """decorator for commands like ctcp which are for internal use, but use normal args template"""
        # Applied on top of @cmd: takes the name back out of the public
        # registry so it is not listed by help.  _registry() is used so this
        # cannot fail with NameError when no @cmd has run yet.
        registry = _registry("cmds")
        if func.__name__ in registry:
            registry.remove(func.__name__)
        return func

    def preq_cmd(self):  # command prequisites / triggers
        """Pick the command that ``self.line`` triggers and run it.

        The checks are ordered; the first match wins.  ``yt`` is the only
        command that also fires on un-prefixed lines (any line in which
        ``YouTube.match_urls`` finds a URL).  Lines matching nothing, and
        commands listed in ``config.cmd.disabled``, are ignored.
        """
        cmd = self.line
        needs_prefix = True
        if cmd == "help" or cmd.startswith("help "):
            command = "help"
        elif cmd.startswith("quit"):
            command = "quit"
        elif cmd.startswith("echo "):
            command = "echo"
        elif cmd.startswith("roll ") or cmd == "roll":
            command = "dice"
        elif cmd.startswith(("pick ", "choose ")):
            command = "choose"
        elif cmd.startswith("yt ") or self.YouTube.match_urls(self.YouTube, cmd) != []:
            command = "yt"
            needs_prefix = False
        elif cmd.startswith(("w ", "weather ")):
            command = "weather"
        elif cmd.startswith("me "):
            command = "me"
        elif cmd == "crapdate" or cmd.startswith("crapdate "):
            command = "crapdate"
        elif cmd == "dbg" or cmd.startswith("dbg "):
            command = "dbg"
        elif cmd == "dbg2" or cmd.startswith("dbg2 "):
            command = "dbg2"
        elif cmd in ("version", "ver"):
            command = "version"
        elif cmd.startswith("\x01") or self.is_ctcp:
            command = "ctcp"
        else:
            return

        if command in self.config.cmd.disabled:
            return
        # A prefixed command runs when the line carried a prefix or arrived
        # as a private message.
        addressed = self.prefix is not None or self.pm is not False
        if not needs_prefix or addressed:
            # Plain attribute lookup instead of eval(): same dispatch, no
            # code string to build.
            getattr(self, command)()

    def getversion(self):
        """Return the second field of the last entry in ``.git/logs/HEAD``.

        The path is ``config.self.gitdir + ".git/logs/HEAD"``, so ``gitdir``
        is presumably expected to end with a path separator (not verifiable
        from this file).  Blank or malformed lines are skipped; ``"unknown"``
        is returned when the log holds no usable entry.  A missing file still
        raises ``OSError``.
        """
        newest = "unknown"
        with open(self.config.self.gitdir + ".git/logs/HEAD") as log:
            for entry in log:
                fields = entry.split()
                if len(fields) >= 2:
                    newest = fields[1]
        return newest

    @internal
    @cmd
    def ctcp(self, prefix, cmd, pm, line, admin, mesg):
        """CTCP responses"""
        # cmd is expected to start with the \x01 CTCP delimiter, which is
        # dropped here; a missing closing delimiter is added back.
        ctcp = cmd[1:]
        ctcp_upper = ctcp.upper()
        if not ctcp.endswith("\x01"):
            ctcp = ctcp + "\x01"
        if ctcp_upper.startswith("PING"):
            payload = ctcp.split(" ")[1:]
            reply = "\x01PING" + (" " + " ".join(payload) if payload else "")
            # Without a payload the closing delimiter used to get lost.
            if not reply.endswith("\x01"):
                reply += "\x01"
            print(reply)
            self.notice(reply)
        elif ctcp_upper.startswith("SOURCE"):
            self.notice("\x01SOURCE " + self.config.self.source + "\x01")
        elif ctcp_upper.startswith("VERSION"):
            self.notice(f"\x01VERSION {self.getversion()}\x01")
        elif ctcp_upper.startswith("FINGER"):
            self.notice(
                f"\x01FINGER {self.config.self.nick} version {self.getversion()} ({self.config.self.source})\x01"
            )
        elif ctcp_upper.startswith("USERINFO"):
            self.notice(
                "\x01USERINFO crude IRC bot, originally made by jan6, as bot6 (https://tildegit.org/jan6/bot6)\x01"
            )
        elif ctcp_upper.startswith("CLIENTINFO"):
            self.notice("\x01CLIENTINFO USERINFO PING SOURCE FINGER VERSION\x01")

    # No docstring on purpose: help would start sending it as help text.
    @adm
    def quit(self, prefix, cmd, pm, line, admin, mesg):
        # "q"/"quit" quits without a message; "q <text>"/"quit <text>"
        # passes <text> to util.quit.
        if not admin:
            return
        if cmd in ("q", "quit"):
            self.util.quit()
        elif cmd.startswith(("q ", "quit ")):
            self.util.quit(cmd.split(" ", 1)[1])

    @adm
    def crapdate(self, prefix, cmd, pm, line, admin, mesg):
        """hacky and crappy update command, don't use it, lol"""
        # "crapdate log|list [REV [N]]" sends the first N (default 3) lines
        # of `git log --pretty=oneline --abbrev-commit REV`; anything else
        # runs `git pull` and then sends `git status` on one line.
        import subprocess

        def git(*git_args):
            # Argument list, no shell: user-supplied words cannot be
            # interpreted as shell syntax.
            finished = subprocess.run(
                ["git", *git_args],
                stdout=subprocess.PIPE,
                universal_newlines=True,
            )
            return finished.stdout

        args = cmd.split()[1:]
        try:
            if args and args[0] in ("log", "list"):
                revision = args[1] if len(args) > 1 else ""
                limit = _bounded_int(args[2], 3, low=0) if len(args) > 2 else 3
                log_args = ["log", "--pretty=oneline", "--abbrev-commit"]
                if revision:
                    log_args.append(revision)
                # Slice instead of split(maxsplit): the old code sent the
                # whole remaining log as one final message.
                for entry in git(*log_args).splitlines()[:limit]:
                    mesg(entry)
            else:
                mesg(git("pull"))
                mesg(git("status").replace("\n", " "))
        except OSError as e:
            # e.g. git is not installed
            mesg(f"{type(e).__name__} was raised: {e}")

    @adm
    def dbg(self, prefix, cmd, pm, line, admin, mesg):
        """temporary debug command, subject to change A LOT"""
        mesg(dir())

    @cmd
    def yt(self, prefix, cmd, pm, line, admin, mesg):
        """youtube"""
        # Sends one message per URL that YouTube.match_urls finds and stops
        # after the first lookup that fails.
        YouTube = self.YouTube
        if cmd.startswith("yt "):
            cmd = cmd[3:]
        try:
            YouTube.premature_optimization = self.config.cmd.yt_premature_opt
        except AttributeError:
            # option not configured: keep whatever YouTube already has
            pass
        print(f" YT premature_optimization={YouTube.premature_optimization}")
        for video in YouTube.match_urls(YouTube, cmd):
            try:
                reply, failed = YouTube.yt(YouTube, video)
            except Exception as e:
                reply, failed = e, True
            mesg(reply)
            if failed:
                break

    @cmd
    def echo(self, prefix, cmd, pm, line, admin, mesg):
        """simple echo command | "echo ABC..." """
        # The reply is prefixed with \x7f, as before.
        mesg("\x7f" + cmd.partition(" ")[2])

    @cmd
    def choose(self, prefix, cmd, pm, line, admin, mesg):
        """simple random choice command | "choose A B C..." """
        # Whitespace split, so doubled spaces no longer add empty options.
        options = cmd.partition(" ")[2].split() or [""]
        mesg("I choose: " + str(random.choice(options)))

    @cmd
    def dice(self, prefix, cmd, pm, line, admin, mesg):
        """simple dice command | "roll [N[d[M]]]" where N is number of dice, and M is number of faces"""
        # N and M are parsed independently; each falls back to its default
        # (1 die, 6 faces) when missing, unparsable or outside 1..100.
        spec = cmd.partition(" ")[2].strip()
        amount_text, _, faces_text = spec.partition("d")
        amount = _bounded_int(amount_text, 1)
        faces = _bounded_int(faces_text, 6)
        # Faces are numbered 1..M (previously 0..M-1).
        rolls = [random.randint(1, faces) for _ in range(amount)]
        mesg(f"rolling {amount}d{faces}: " + str(rolls))

    @cmd
    def version(self, prefix, cmd, pm, line, admin, mesg):
        """version"""
        mesg(
            f"{self.config.self.nick} version {self.getversion()} ({self.config.self.source})"
        )

    @cmd
    def dbg2(self, prefix, cmd, pm, line, admin, mesg):
        """version"""
        # Same lookup as getversion(); reuse it instead of a second copy.
        mesg("version " + self.getversion())

    @cmd
    def me(self, prefix, cmd, pm, line, admin, mesg):
        """simple /me command"""
        self.action(cmd.partition(" ")[2])

    # No docstring on purpose: "help help" keeps answering that no help is
    # available.
    @cmd
    def help(self, prefix, cmd, pm, line, admin, mesg):
        # "help" lists topics and commands (plus admin commands for admins);
        # "help <topic>" sends the docstring of the command called <topic>,
        # or the text of a built-in topic.
        admin_registry = _registry("adm_cmds")
        user_registry = _registry("cmds")
        disabled_commands = self.config.cmd.disabled
        # These names are always advertised as admin commands; they are not
        # defined in this file.
        for name in ("exec", "eval", "reload"):
            if name not in admin_registry:
                admin_registry.append(name)
        admin_commands = [n for n in admin_registry if n not in disabled_commands]
        commands = [
            n
            for n in user_registry
            if n not in admin_commands and n not in disabled_commands
        ]
        prefixes = '"' + '", "'.join(self.config.cmd.prefixes) + '"'
        admin_commands = ", ".join(admin_commands)
        topic = cmd.partition(" ")[2].strip() or None
        # A missing attribute raises AttributeError, which the old
        # "except IndexError" never caught.
        self_nick = getattr(self, "self_nick", None)
        abs_topics = {"prefixes": f'available prefixes are {prefixes} or "{self_nick}"'}

        if topic is None:
            mesg("available topics: " + ", ".join(abs_topics))
            mesg("available commands: " + ", ".join(commands))
            if admin:
                mesg(f"admin commands: {admin_commands}")
            return

        # SECURITY: topic is text typed by an IRC user.  It used to be
        # interpolated into eval(); it is now only used as a plain attribute
        # name, and only public callables are considered.
        doc = None
        if topic.isidentifier() and not topic.startswith("_"):
            target = getattr(self, topic, None)
            if callable(target):
                doc = getattr(target, "__doc__", None)

        if isinstance(doc, str) and doc.strip():
            # Collapse whitespace so the reply stays on one line.
            mesg(f"{topic}: " + " ".join(doc.split()))
        elif topic in abs_topics:
            mesg(f"{topic}: " + abs_topics[topic])
        else:
            mesg(f'no help available for "{topic}"...')

    @cmd
    def weather(self, prefix, cmd, pm, line, admin, mesg):
        """crude weather command"""
        # "weather LOCATION[?FLAGS]": an "f" or "u" word in FLAGS switches
        # wttr.in from the "m" to the "u" unit option.
        import urllib.parse
        import urllib.request

        query = cmd.partition(" ")[2]
        segments = query.split("?")
        loc = segments[0].strip()
        mf = "m"  # celsius by default
        if len(segments) >= 2:
            flags = segments[1].split()
            if "f" in flags or "u" in flags:
                mf = "u"  # fahrenheit
        try:
            parts = urllib.parse.urlsplit(f"https://wttr.in/{loc}?A&T&0&n&F&{mf}")
            url = urllib.parse.urlunsplit(
                (
                    parts.scheme,
                    parts.netloc,
                    urllib.parse.quote(parts.path),
                    parts.query,
                    parts.fragment,
                )
            )
            # Context manager closes the response; the timeout keeps a
            # stalled request from blocking forever.
            with urllib.request.urlopen(url, timeout=10) as response:
                report = response.read().decode("utf-8")
            rows = report.split("\n")
            mesg(
                rows[0].strip()
                + " "
                + " ".join(rows[3].strip().split(" ")[-2:])
            )
        except Exception as e:
            # Last-resort boundary: report the failure instead of raising.
            mesg(f"{type(e).__name__} was raised: {e}")