Add plugin system for commands

This commit is contained in:
Robert Miles 2019-06-24 00:23:10 -04:00
parent bfe88e2e5e
commit 37bf2fb1e4
3 changed files with 145 additions and 51 deletions

96
bot.py
View File

@ -1,7 +1,8 @@
import teambot,tasks,rss,time,sys,subprocess,re
import teambot,tasks,rss,time,sys,subprocess,re,plugin,impmod,traceback,os
feed_url = "https://cosmic.voyage/rss.xml"
bot_op = "khuxkm@sudoers.tilde.team"
PLUGIN_MODULES=dict()
def unhighlight_nick (nick):
return "_{}_".format(nick)
@ -24,15 +25,27 @@ class CosmicBot(teambot.Handler):
self.tasks = tasks.TaskPool()
self.tasks.add_coroutine(self.check_rss,12,dict(url=feed_url,known=[],channel="#cosmic"))
self.tasks.load_state(0)
self.commands = dict()
self.register_command("botlist",self.on_botlist)
self.register_command("roster",self.on_roster)
self.register_command("latest",self.on_latest)
self.register_command("fortune",self.on_fortune)
self.register_command("admin",self.on_admin,True)
def register_command(self,name,action,is_admin=False):
self.commands[name] = dict(action=action,is_admin=is_admin)
def load_modules(self):
plugin.clear()
for module in os.listdir("commands"):
if module.endswith(".py"): self.load_module(module[:-3],os.path.join("commands",module))
def load_module(self,modname,path):
try:
if modname in PLUGIN_MODULES:
print("{} already imported, reloading".format(modname))
PLUGIN_MODULES[modname].reload()
return
try:
print("importing {}".format(modname))
PLUGIN_MODULES[modname]=impmod.Module(modname,path)
except:
print("error importing {}".format(modname))
traceback.print_exc()
except:
traceback.print_exc()
pass
def on_connection_established(self):
self.load_modules()
self.tasks.run()
def check_rss(self,state,base_state):
newtrans = rss.fetchNew(state["url"],[x["guid"] for x in state["known"]])
@ -43,54 +56,35 @@ class CosmicBot(teambot.Handler):
time.sleep(1)
return state
def on_pubmsg(self,channel,nick,text):
self.is_admin = self.event.source.userhost == bot_op
try:
for listener in plugin.listeners:
try:
plugin.listeners[listener](self,channel,nick,message)
except:
print("{} had a bruh moment:".format(listener))
traceback.print_exc()
except:
print("KERNAL PANIC")
traceback.print_exc()
if not text.startswith(self.prefix):
return
args = text[len(self.prefix):].strip().split()
cmd = args.pop(0)
is_admin = self.event.source.userhost == bot_op
if cmd in self.commands:
if (not self.commands[cmd]["is_admin"]) or (self.commands[cmd]["is_admin"] and is_admin):
try:
self.commands[cmd]["action"](channel,nick,*args)
except Exception as e:
print("access violation "+str(e))
def on_botlist(self,channel,nick):
self.say(channel,nick+": Maintainer: khuxkm@cosmic.voyage | Utility bot")
def on_admin(self,channel,nick,subcmd,*args):
if subcmd=="down":
self.tasks.stop()
self.tasks.save_state(0)
self._bot.die("Stopping...")
sys.exit(0)
elif subcmd=="check":
self.tasks.states[0] = self.check_rss(self.tasks.states[0],self.tasks.base_state)
def on_roster(self,channel,nick,*namecnt):
output = subprocess.check_output(["/usr/local/bin/roster",' '.join(namecnt)]).decode("ascii").split("\n")
output = filter(None,output)
for line in output:
line = re.sub("\s+"," ",line).split(" ",1)
self.say(channel,"{}: {} (by {})".format(nick,line[1],unhighlight_nick(line[0])))
def on_latest(self,channel,nick,count=5):
if re.match(r"\d+",count):
count = int(count)
if count < 1:
count = 1 # ...nice try, smartass
if count > 5:
count = 5 # don't spam the channel
self.say(channel, "{}: Latest {} {}. (See cosmic.voyage for more!)".format(nick, count, (count == 1 and "entry" or "entries")))
else:
self.say(channel, "{}: Latest entries matching '{}'. (See cosmic.voyage for more!)".format(nick, count))
output = subprocess.check_output(["/usr/local/bin/latest",str(count)]).decode("ascii").split("\n")
output = filter(None,output)
for line in output:
self.say(channel,"{}: {}".format(nick,line))
def on_fortune(self,channel,nick):
output = subprocess.check_output(["/usr/games/fortune"]).decode("ascii").split("\n")
output = filter(None,output)
for line in output:
self.say(channel,"{}".format(line))
if cmd in plugin.cmds:
try:
plugin.cmds[cmd](self,channel,nick,*args)
except TypeError:
self.say(channel,"Usage: {}{} {}".format(self.prefix,cmd,plugin.help.get(cmd,"")).strip())
traceback.print_exc()
except Exception as e:
errcls,err,tb = sys.exc_info()
self.say(channel,"ACCESS VIOLATION: "+traceback.format_exception_only(errcls,err)[0].strip())
print("On command `{}{} {}` by {}:".format(self.prefix,cmd," ".join(args),self.event.source))
traceback.print_exc()
if __name__=="__main__":
channels = "#cosmic".split()
bot = teambot.TeamBot(channels,"cosmicbot","localhost",chandler=CosmicBot)
bot.start()

19
impmod.py Normal file
View File

@ -0,0 +1,19 @@
import importlib, importlib.util, sys
class Module:
"""A module. Stores module object, spec object, and handles reloading."""
def __init__(self,modname,path=None):
if path is None:
path = modname+".py"
self.modname, self.path = modname, path
self.spec = importlib.util.spec_from_file_location(modname,path)
self.module = importlib.util.module_from_spec(self.spec)
self.spec.loader.exec_module(self.module)
def reload(self):
if self.modname not in sys.modules:
sys.modules[self.modname]=self.module
# Alright, this needs some explaining.
# When you do importlib.reload, it does some juju magic shist to find the spec and calls importlib._bootstrap._exec.
# When you dynamically import a module it won't do its magic correctly and it'll error.
# Luckily, we can skip all the juju magic since we can just store the spec.
importlib._bootstrap._exec(self.spec,self.module)

81
plugin.py Normal file
View File

@ -0,0 +1,81 @@
import json, traceback
cmds = {}
help = {}
listeners = {}
class CommandGroup:
def __init__(self,f,default="help"):
self.base = f
self.subcmds = {}
self.default = default
def command(self,name):
def _register_subcmd(f):
self.subcmds[name]=f
return f
return _register_subcmd
def __call__(self,bot,channel,nick,subcmd,*args):
print("Calling base")
if self.base(bot,channel,nick,subcmd,*args):
return
print("Calling subcommand {}".format(subcmd if subcmd in self.subcmds else self.default))
if subcmd in self.subcmds:
return self.subcmds[subcmd](bot,channel,nick,subcmd,*args)
else:
return self.subcmds[self.default](bot,channel,nick,subcmd,*args)
class Data:
"""A class for plugin data."""
def __init__(self,value):
self.value = value
def serialize(self):
return self.value
def deserialize(self,value):
self.value = value
def save(self,filename):
with open(filename,"w") as f:
f.write(self.serialize())
def load(self,filename):
try:
with open(filename) as f:
self.deserialize(f.read())
except:
print("Error loading data from {!r}:".format(filename))
traceback.print_exc()
pass # You should've initialized this with a sane default, so just keep the default on error
class JSONData(Data):
"""Data, but can be serialized to JSON (and should be)."""
def serialize(self):
return json.dumps(self.value)
def deserialize(self,value):
self.value = json.loads(value)
def clear():
cmds.clear()
help.clear()
listeners.clear()
def command(name,helptext="No help available for this command."):
def _register_cmd(func):
cmds[name]=func
help[name]=helptext
return func
return _register_cmd
def group(name,helptext=""):
def _register_group(f):
gr = CommandGroup(f)
cmds[name]=gr
help[name]=helptext
return gr
return _register_group
def listener(name):
def _register_cmd(func):
listeners[name]=func
return func
return _register_cmd
def alias(name,target):
cmds[name]=cmds[target]
help[name]=help[target]