93 lines
3.1 KiB
Python
93 lines
3.1 KiB
Python
import teambot,tasks,rss,time,sys,subprocess,re,plugin,impmod,traceback,os,atexit
|
|
|
|
# RSS feed that CosmicBot.check_rss polls for new posts.
feed_url = "https://cosmic.voyage/rss.xml"

# user@host of the bot operator; on_pubmsg compares the sender's userhost
# against this value to set the handler's is_admin flag.
bot_op = "khuxkm@sudoers.tilde.team"

# Plugin modules loaded so far, keyed by module name
# (populated by CosmicBot.load_module).
PLUGIN_MODULES = {}
|
|
|
|
def unhighlight_nick(nick):
    """Return *nick* wrapped in a leading and trailing underscore.

    Presumably used so that mentioning a nick in channel output does not
    trigger that user's highlight -- confirm against the callers.
    """
    wrapper = "_{}_"
    return wrapper.format(nick)
|
|
|
|
def datetimejoin(joiner, parts):
    """Join *parts* with *joiner*, zero-padding each part to two characters.

    Every element is converted with ``str`` and right-aligned in a field of
    width 2 filled with ``'0'``; elements already two or more characters
    long are left unchanged.
    """
    # "0>2" = fill with '0', right-align, minimum width 2.
    padded = (format(str(part), "0>2") for part in parts)
    return joiner.join(padded)
|
|
|
|
def newpost(it):
    """Build the plain-dict record stored for a feed item.

    *it* must expose ``guid``, ``title``, ``link`` and ``published_parsed``;
    the latter is sliced as ``[0:3]`` for the date and ``[3:6]`` for the time
    (presumably a time.struct_time-like sequence -- confirm against
    rss.fetchNew).

    Returns a dict with keys ``guid``, ``title``, ``link`` and ``date``,
    where ``date`` is formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    post = {
        "guid": it.guid,
        "title": it.title,
        "link": it.link,
    }
    stamp = it.published_parsed
    day = datetimejoin('-', stamp[0:3])
    clock = datetimejoin(':', stamp[3:6])
    post["date"] = day + ' ' + clock
    return post
|
|
|
|
class CosmicBot(teambot.Handler):
    """Handler that announces new feed posts and dispatches plugin commands.

    Commands and listeners come from the ``plugin`` registry, which is filled
    by loading every ``*.py`` file in the ``commands`` directory.
    """

    def __init__(self, bot):
        """Set the command prefix and register the RSS polling coroutine.

        Task state is loaded on construction and saved again at interpreter
        exit via ``atexit``.
        """
        super(CosmicBot, self).__init__(bot)
        self.prefix = "!"
        self.tasks = tasks.TaskPool()
        # NOTE(review): the 12 is presumably the polling interval -- confirm
        # the unit against tasks.TaskPool.add_coroutine.
        self.tasks.add_coroutine(
            self.check_rss,
            12,
            dict(url=feed_url, known=[], channel="#cosmic"),
        )
        self.tasks.load_state(0)
        atexit.register(lambda: self.tasks.save_state(0))

    def load_modules(self):
        """Clear the plugin registry and (re)load every module in ``commands``."""
        plugin.clear()
        for module in os.listdir("commands"):
            if module.endswith(".py"):
                # Strip the ".py" suffix to get the module name.
                self.load_module(module[:-3], os.path.join("commands", module))

    def load_module(self, modname, path):
        """Import the plugin *modname* from *path*, or reload it if known.

        Failures are printed with a traceback and never propagate, so one
        broken plugin cannot stop the others from loading.
        """
        if modname in PLUGIN_MODULES:
            print("{} already imported, reloading".format(modname))
            try:
                PLUGIN_MODULES[modname].reload()
            except Exception:
                traceback.print_exc()
            return
        print("importing {}".format(modname))
        try:
            PLUGIN_MODULES[modname] = impmod.Module(modname, path)
        except Exception:
            print("error importing {}".format(modname))
            traceback.print_exc()

    def on_connection_established(self):
        """Load the plugins and start the task pool once connected."""
        self.load_modules()
        self.tasks.run()

    def check_rss(self, state, base_state):
        """Announce feed items that are not yet recorded in ``state["known"]``.

        *state* carries ``url``, ``known`` (a list of dicts built by
        ``newpost``) and ``channel``. *base_state* is accepted but unused.
        Returns the (mutated) *state*.
        """
        known_guids = [post["guid"] for post in state["known"]]
        newtrans = rss.fetchNew(state["url"], known_guids)
        if newtrans:
            state["known"].extend(newpost(x) for x in newtrans)
            for trans in newtrans:
                self.say(
                    state["channel"],
                    "Transmission received: {transmission.title} ({transmission.link})".format(transmission=trans),
                )
                # NOTE(review): `toot` is not among this file's imports, so
                # this call raises NameError unless the name is provided some
                # other way -- confirm and add the missing import.
                toot.toot("Transmission received: {transmission.title}\n\n{transmission.link}".format(transmission=trans))
                # Pause between announcements (presumably to avoid flooding).
                time.sleep(1)
        return state

    def on_pubmsg(self, channel, nick, text):
        """Feed a channel message to all listeners, then run it as a command.

        Sets ``self.is_admin`` from the sender's userhost. A message that
        does not start with the prefix, or consists of the prefix alone, is
        not treated as a command.
        """
        self.is_admin = self.event.source.userhost == bot_op
        try:
            for listener in plugin.listeners:
                try:
                    # Pass the message text; the previous code referenced an
                    # undefined name here, so every listener call failed.
                    plugin.listeners[listener](self, channel, nick, text)
                except Exception:
                    print("{} had a bruh moment:".format(listener))
                    traceback.print_exc()
        except Exception:
            print("KERNAL PANIC")
            traceback.print_exc()

        if not text.startswith(self.prefix):
            return
        args = text[len(self.prefix):].strip().split()
        if not args:
            # Bare prefix with no command name: nothing to dispatch.
            return
        cmd = args.pop(0)
        if cmd not in plugin.cmds:
            return
        try:
            plugin.cmds[cmd](self, channel, nick, *args)
        except TypeError:
            # Treated as a wrong-argument-count call: show the usage string.
            self.say(
                channel,
                "Usage: {}{} {}".format(self.prefix, cmd, plugin.help.get(cmd, "")).strip(),
            )
            traceback.print_exc()
        except Exception as e:
            summary = traceback.format_exception_only(type(e), e)[0].strip()
            self.say(channel, "ACCESS VIOLATION: " + summary)
            print("On command `{}{} {}` by {}:".format(self.prefix, cmd, " ".join(args), self.event.source))
            traceback.print_exc()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the bot with CosmicBot as its handler class
    # and start it. "cosmicbot" and "localhost" are presumably the nick and
    # the server host -- confirm against teambot.TeamBot.
    channels = ["#cosmic"]
    bot = teambot.TeamBot(
        channels,
        "cosmicbot",
        "localhost",
        chandler=CosmicBot,
    )
    bot.start()
|
|
|