345 lines
12 KiB
Python
345 lines
12 KiB
Python
from functools import wraps
|
|
from irctokens import build
|
|
|
|
import importlib
|
|
import sys
|
|
import random
|
|
|
|
|
|
class Command:
|
|
def __init__(self, config):
    # Start with an empty command list and remember the configuration object
    # that the command handlers read their settings from.
    self.commands = []
    self.config = config
|
|
|
|
def mesg(self, msg, t=None):
    # Pass-through: forwards msg and t unchanged to the util helper.
    util = self.util
    util.mesg(msg, t)
|
|
|
|
def notice(self, msg):
    # Pass-through: forwards msg unchanged to the util helper's notice().
    util = self.util
    util.notice(msg)
|
|
|
|
def action(self, msg):
    # Pass-through: forwards msg unchanged to the util helper's action().
    util = self.util
    util.action(msg)
|
|
|
|
def send(self, msg):
    # Pass-through: forwards msg unchanged to the util helper's send().
    util = self.util
    util.send(msg)
|
|
|
|
def err_perm(self, level="admin"):
    # Report a permission failure, naming the access level that is missing.
    text = f"Error: insufficient privileges, you lack {level} access"
    self.mesg(text)
|
|
|
|
def adm(func, *args, **kwargs):
    """decorator for admin commands"""
    # The registry of admin command names is a module-level list that is
    # created lazily the first time a function is decorated.
    name = func.__name__
    registry = globals().setdefault("adm_cmds", [])
    if name != "_" and name not in registry:
        registry.append(name)

    @wraps(func)
    def _(self, *args, **kwargs):
        # The help command gets the admin command list attached to the instance.
        if func.__name__ == "help":
            self.admin_commands = adm_cmds
        if self.admin:
            # Handlers always receive the same fixed set of instance attributes;
            # whatever the caller passed in is ignored.
            return func(
                self,
                self.prefix,
                self.line,
                self.pm,
                self._line,
                self.admin,
                self.mesg,
            )
        self.err_perm()

    return _
|
|
|
|
def cmd(func, *args, **kwargs):
    """decorator for user commands"""
    # The registry of user command names is a module-level list that is
    # created lazily the first time a function is decorated.
    name = func.__name__
    registry = globals().setdefault("cmds", [])
    if name != "_" and name not in registry:
        registry.append(name)

    @wraps(func)
    def _(self, *args, **kwargs):
        # The help command gets the command list attached to the instance.
        if func.__name__ == "help":
            self.commands = cmds
        # Disabled commands are silently skipped.
        if func.__name__ in self.config.cmd.disabled:
            return None
        # Handlers always receive the same fixed set of instance attributes;
        # whatever the caller passed in is ignored.
        return func(
            self,
            self.prefix,
            self.line,
            self.pm,
            self._line,
            self.admin,
            self.mesg,
        )

    return _
|
|
|
|
def internal(func, *args, **kwargs):
    """decorator for commands like ctcp which are for internal use, but use normal args template"""
    # Removes the function's name from the module-level `cmds` list so it is
    # not listed as a user command. The list only exists once something has
    # been registered, so a missing list means there is nothing to remove
    # (previously this raised NameError).
    registry = globals().get("cmds")
    if registry is not None and func.__name__ in registry:
        registry.remove(func.__name__)
    return func
|
|
|
|
def preq_cmd(self):  # command prerequisites / triggers
    # Maps the current line (self.line) to a handler method name and calls it.
    # Branch order matters: the first matching trigger wins. Nothing is called
    # when no trigger matches, when the command is listed in
    # self.config.cmd.disabled, or when a prefix-requiring command arrives
    # without a prefix outside of a private message.
    cmd = self.line
    needs_prefix = True

    def takes_args(*names):
        # True when the line starts with one of the names followed by a space.
        return cmd.startswith(tuple(name + " " for name in names))

    def is_command(*names):
        # True for a bare name, or a name followed by arguments.
        return cmd in names or takes_args(*names)

    if is_command("help"):
        command = "help"
    elif is_command("quit"):
        # Match the whole word only; a plain prefix test also fired on
        # ordinary chat such as "quite ..." or "quitter".
        command = "quit"
    elif takes_args("echo"):
        command = "echo"
    elif is_command("roll"):
        command = "dice"
    elif takes_args("pick", "choose"):
        command = "choose"
    elif takes_args("yt") or self.YouTube.match_urls(self.YouTube, cmd) != []:
        # Lines containing a matching URL trigger this even without a prefix.
        command = "yt"
        needs_prefix = False
    elif takes_args("w", "weather"):
        command = "weather"
    elif takes_args("me"):
        command = "me"
    elif is_command("crapdate"):
        command = "crapdate"
    elif is_command("dbg"):
        command = "dbg"
    elif is_command("dbg2"):
        command = "dbg2"
    elif cmd in ("version", "ver"):
        command = "version"
    elif cmd.startswith("\x01") or self.is_ctcp:
        command = "ctcp"
    else:
        return

    if command in self.config.cmd.disabled:
        return
    if not needs_prefix or self.prefix is not None or self.pm:
        # Look the handler up by name; the names come from the fixed set above.
        getattr(self, command)()
|
|
|
|
def getversion(self):
    # Returns the second whitespace-separated field of the last non-blank line
    # of <gitdir>.git/logs/HEAD. In git's reflog format that field is
    # presumably the commit the entry points at (confirm against the git docs).
    # gitdir is concatenated as-is, so it needs its own trailing separator.
    # Returns "unknown" when the file has no usable line; previously an empty
    # file or a trailing blank line raised UnboundLocalError / IndexError.
    last_entry = None
    with open(self.config.self.gitdir + ".git/logs/HEAD") as reflog:
        for entry in reflog:
            if entry.strip():
                last_entry = entry
    if last_entry is None:
        return "unknown"
    fields = last_entry.split()
    return fields[1] if len(fields) > 1 else "unknown"
|
|
|
|
@internal
|
|
@cmd
|
|
def ctcp(self, prefix, cmd, pm, line, admin, mesg):
    """CTCP responses"""
    # cmd[1:] drops the first character (the leading \x01 delimiter); the
    # closing \x01 is added when the request did not carry one. Replies go
    # out through self.notice.
    ctcp = cmd[1:]
    ctcp_upper = ctcp.upper()
    if not ctcp.endswith("\x01"):
        ctcp = ctcp + "\x01"
    if ctcp_upper.startswith("PING"):
        # Echo back everything after the first space. That text already ends
        # with \x01; a PING without a payload gets the terminator added here
        # (previously it was sent unterminated).
        _, sep, payload = ctcp.partition(" ")
        reply = "\x01PING" + (sep + payload if sep else "\x01")
        print(reply)
        self.notice(reply)
    if ctcp_upper.startswith("SOURCE"):
        self.notice("\x01SOURCE " + self.config.self.source + "\x01")
    elif ctcp_upper.startswith("VERSION"):
        self.notice(f"\x01VERSION {self.getversion()}\x01")
    elif ctcp_upper.startswith("FINGER"):
        self.notice(
            f"\x01FINGER {self.config.self.nick} version {self.getversion()} ({self.config.self.source})\x01"
        )
    elif ctcp_upper.startswith("USERINFO"):
        self.notice(
            "\x01USERINFO crude IRC bot, originally made by jan6, as bot6 (https://tildegit.org/jan6/bot6)\x01"
        )
    elif ctcp_upper.startswith("CLIENTINFO"):
        self.notice("\x01CLIENTINFO USERINFO PING SOURCE FINGER VERSION\x01")
|
|
|
|
@adm
|
|
def quit(self, prefix, cmd, pm, line, admin, mesg):
    # Only acts for admins. "q"/"quit" alone quits without a message;
    # "q <text>"/"quit <text>" passes the text on to util.quit.
    if not admin:
        return
    if cmd in ("q", "quit"):
        self.util.quit()
    elif cmd.startswith(("q ", "quit ")):
        farewell = cmd.split(" ", 1)[1]
        self.util.quit(farewell)
|
|
|
|
@adm
|
|
def crapdate(self, prefix, cmd, pm, line, admin, mesg):
    """hacky and crappy update command, don't use it, lol"""
    # "crapdate log|list [REV [N]]" sends the first N (default 3) lines of
    # `git log --pretty=oneline --abbrev-commit [REV]`, one message per line.
    # Anything else runs `git pull` and then reports `git status` with
    # newlines flattened to spaces.
    import subprocess

    def git(*argv):
        # Run git without a shell so user-supplied text is never interpreted
        # as shell syntax; returns whatever git wrote to stdout.
        finished = subprocess.run(
            ["git", *argv], stdout=subprocess.PIPE, universal_newlines=True
        )
        return finished.stdout

    args = cmd.split()[1:]
    try:
        if args and args[0] in ("log", "list"):
            revision = args[1] if len(args) > 1 else ""
            try:
                count = int(args[2]) if len(args) > 2 else 3
            except ValueError:
                mesg(f'invalid line count "{args[2]}"')
                return
            log_args = ["log", "--pretty=oneline", "--abbrev-commit"]
            if revision:
                log_args.append(revision)
            # Slice instead of split("\n", count): the latter left the whole
            # remaining log in one final multi-line element.
            for entry in git(*log_args).splitlines()[: max(count, 0)]:
                mesg(entry)
        else:
            mesg(git("pull"))
            mesg(git("status").replace("\n", " "))
    except OSError as e:
        # e.g. the git executable is missing
        mesg(f"{type(e).__name__} was raised: {e}")
|
|
|
|
@adm
|
|
def dbg(self, prefix, cmd, pm, line, admin, mesg):
    """temporary debug command, subject to change A LOT"""
    # Sends the sorted list of local names; no extra locals may be introduced
    # here or the output changes.
    mesg(sorted(locals()))
|
|
|
|
@cmd
|
|
def yt(self, prefix, cmd, pm, line, admin, mesg):
    """youtube"""
    backend = self.YouTube
    # Drop the explicit "yt " trigger so only the text to scan remains.
    if cmd.startswith("yt "):
        cmd = cmd[3:]
    # Copy the optional config setting onto the backend when it is present.
    try:
        backend.premature_optimization = self.config.cmd.yt_premature_opt
    except AttributeError:
        pass
    print(f" YT premature_optimization={backend.premature_optimization}")
    for video in backend.match_urls(backend, cmd):
        try:
            reply, failed = backend.yt(backend, video)
        except Exception as err:
            # The exception itself becomes the message for this URL.
            reply, failed = err, True
        mesg(reply)
        # After the first failure has been reported, stop processing URLs.
        if failed == True:
            break
|
|
|
|
@cmd
|
|
def echo(self, prefix, cmd, pm, line, admin, mesg):
    """simple echo command | "echo ABC..." """
    # Everything after the first space is echoed back, prefixed with \x7f
    # (presumably so the reply cannot itself trigger a command - TODO confirm).
    payload = cmd.split(" ", 1)[1]
    mesg(f"\x7f{payload}")
|
|
|
|
@cmd
|
|
def choose(self, prefix, cmd, pm, line, admin, mesg):
    """simple random choice command | "choose A B C..." """
    # Options are the space-separated words after the command name. Empty
    # strings produced by repeated or trailing spaces are dropped so that
    # "nothing" can no longer be picked.
    options = [word for word in cmd.split(" ")[1:] if word]
    if not options:
        mesg("usage: choose A B C...")
        return
    mesg("I choose: " + random.choice(options))
|
|
|
|
@cmd
|
|
def dice(self, prefix, cmd, pm, line, admin, mesg):
    """simple dice command | "roll [N[d[M]]]" where N is number of dice, and M is number of faces"""
    # A bare "roll" has no argument text; treat it as an empty spec rather
    # than raising IndexError.
    spec = cmd.split(" ", 1)[1].strip().lower() if " " in cmd else ""
    count_text, _, faces_text = spec.partition("d")

    def bounded(text, default):
        # Accept whole numbers from 1 to 100; anything else uses the default.
        try:
            value = int(text)
        except ValueError:
            return default
        return value if 1 <= value <= 100 else default

    # The two halves are parsed independently, so "d20" rolls 1d20 and
    # "2" or "2d" rolls 2d6.
    amount = bounded(count_text, 1)
    faces = bounded(faces_text, 6)
    # Faces are numbered 1..M (previously 0..M-1, and M == 0 crashed).
    rolls = [random.randint(1, faces) for _ in range(amount)]
    mesg(f"rolling {amount}d{faces}: " + str(rolls))
|
|
|
|
@cmd
|
|
def version(self, prefix, cmd, pm, line, admin, mesg):
    """version"""
    # Reports nick, version string (from getversion) and source location.
    identity = self.config.self
    nick = identity.nick
    commit = self.getversion()
    source = identity.source
    mesg(f"{nick} version {commit} ({source})")
|
|
|
|
@cmd
|
|
def dbg2(self, prefix, cmd, pm, line, admin, mesg):
    """version"""
    # Delegates to getversion() instead of keeping a second copy of the
    # .git/logs/HEAD parsing in this method.
    mesg("version " + self.getversion())
|
|
|
|
@cmd
|
|
def me(self, prefix, cmd, pm, line, admin, mesg):
    """simple /me command"""
    # Everything after the first space is handed to self.action.
    action_text = cmd.split(" ", 1)[1]
    self.action(action_text)
|
|
|
|
@cmd
|
|
def help(self, prefix, cmd, pm, line, admin, mesg):
    # Without a topic: lists the abstract topics, the enabled user commands
    # and, for admins, the enabled admin commands. With a topic ("help X"):
    # sends the docstring of the attribute named X, falling back to the
    # abstract topics, then to a "no help available" message.
    # adm_cmds and cmds are the module-level lists filled by the decorators.
    disabled_commands = self.config.cmd.disabled
    for name in ("exec", "eval", "reload"):
        if name not in adm_cmds:
            adm_cmds.append(name)
    admin_commands = [name for name in adm_cmds if name not in disabled_commands]
    commands = [
        name
        for name in cmds
        if name not in admin_commands and name not in disabled_commands
    ]
    prefixes = '"' + '", "'.join(self.config.cmd.prefixes) + '"'
    # A missing attribute raises AttributeError (the old IndexError guard
    # never caught it), so fall back to None explicitly.
    self_nick = getattr(self, "self_nick", None)
    abs_topics = {"prefixes": f'available prefixes are {prefixes} or "{self_nick}"'}

    topic = cmd.split(" ", 1)[1] if " " in cmd else None
    if topic is None:
        mesg("available topics: " + ", ".join(abs_topics))
        mesg("available commands: " + ", ".join(commands))
        if admin:
            mesg("admin commands: " + ", ".join(admin_commands))
        return

    # The topic is user-supplied text: look it up as a plain attribute name
    # and never evaluate it as an expression.
    doc = None
    try:
        if hasattr(self, topic):
            doc = getattr(self, topic).__doc__
    except Exception as e:
        mesg(str(e.__class__) + " " + str(e))
        return
    if isinstance(doc, str):
        mesg(f"{topic}: " + doc)
    elif topic in abs_topics:
        mesg(f"{topic}: " + abs_topics[topic])
    else:
        mesg(f'no help available for "{topic}"...')
|
|
|
|
@cmd
|
|
def weather(self, prefix, cmd, pm, line, admin, mesg):
    """crude weather command"""
    # "weather LOCATION[?FLAGS]": fetches https://wttr.in/LOCATION and sends
    # the first response line plus the last two space-separated tokens of the
    # fourth line. Any failure is reported as a message instead of raising.
    import urllib.parse
    import urllib.request

    query = " ".join(cmd.split(" ", 1)[1:])
    parts = query.split("?")
    loc = parts[0].strip()

    units = "m"  # celsius by default
    if len(parts) >= 2 and {"f", "u"} & set(parts[1].split()):
        units = "u"  # fahrenheit

    # Quote the location itself, so characters such as "#" stay part of the
    # path instead of cutting off the option string that follows.
    url = "https://wttr.in/" + urllib.parse.quote(loc) + f"?A&T&0&n&F&{units}"
    try:
        # The timeout keeps a stalled server from blocking the caller forever;
        # the context manager closes the response.
        with urllib.request.urlopen(url, timeout=10) as response:
            report = response.read().decode("utf-8").split("\n")
        mesg(
            report[0].strip()
            + " "
            + " ".join(report[3].strip().split(" ")[-2:])
        )
    except Exception as e:
        mesg(f"{type(e).__name__} was raised: {e}")
|