forked from khuxkm/cosmicbot
khuxkm fbexl
bf59265e94
This prevents cosmicbot's RSS checking thread from crashing if the RSS feed ends up malformed for any reason.
99 lines
3.2 KiB
Python
99 lines
3.2 KiB
Python
import teambot,tasks,rss,time,sys,subprocess,re,plugin,impmod,traceback,os,atexit,toot
|
|
|
|
# RSS feed that CosmicBot.check_rss is configured to poll.
feed_url = "https://cosmic.voyage/rss.xml"

# userhost of the bot operator; on_pubmsg compares each sender against it
# to set is_admin.
bot_op = "khuxkm@sudoers.tilde.team"

# Plugin modules loaded so far, keyed by module name
# (populated by CosmicBot.load_module).
PLUGIN_MODULES = {}
|
|
|
|
def unhighlight_nick(nick):
    """Return *nick* wrapped in a leading and a trailing underscore.

    NOTE(review): judging by the name, this is presumably meant to stop IRC
    clients from highlighting the user when the bot mentions them -- confirm
    with the callers in the command plugins.
    """
    wrapped = "_{nick}_".format(nick=nick)
    return wrapped
|
|
|
|
def datetimejoin(joiner, parts):
    """Join *parts* with *joiner*, left-padding each part with zeros.

    Every part is converted with ``str`` and padded on the left with ``'0'``
    to a minimum width of two characters; longer parts are left untouched.
    """
    padded = ("{:0>2}".format(str(part)) for part in parts)
    return joiner.join(padded)
|
|
|
|
def newpost(it):
    """Summarise feed item *it* as a plain dict.

    The result has the keys ``guid``, ``title`` and ``link`` (copied from the
    attributes of the same name) and ``date``, a ``"Y-M-D h:m:s"`` string
    built from the first six fields of ``it.published_parsed`` with each
    field zero-padded to at least two digits.
    """
    post = {
        "guid": it.guid,
        "title": it.title,
        "link": it.link,
    }
    stamp = it.published_parsed
    day = datetimejoin('-', stamp[0:3])
    clock = datetimejoin(':', stamp[3:6])
    post["date"] = day + ' ' + clock
    return post
|
|
|
|
class CosmicBot(teambot.Handler):
    """teambot handler that announces new feed items and runs plugin commands.

    On construction it registers ``check_rss`` with a ``tasks.TaskPool``; once
    the connection is established it loads every ``commands/*.py`` plugin and
    starts the task pool.  Public messages are first offered to every
    registered plugin listener and then, if they start with ``self.prefix``,
    dispatched to the matching plugin command.
    """

    def __init__(self, bot):
        """Set the command prefix and register the RSS task with the pool."""
        super(CosmicBot, self).__init__(bot)
        self.prefix = "!"
        self.tasks = tasks.TaskPool()
        # NOTE(review): 12 is presumably the polling interval and 0 a state
        # slot id -- confirm units/meaning against tasks.TaskPool.
        self.tasks.add_coroutine(
            self.check_rss,
            12,
            dict(url=feed_url, known=[], channel="#cosmic"),
        )
        self.tasks.load_state(0)
        # Save the task state again when the interpreter exits.
        atexit.register(lambda: self.tasks.save_state(0))

    def load_modules(self):
        """Clear the plugin registry and (re)load every ``commands/*.py`` file."""
        plugin.clear()
        for module in os.listdir("commands"):
            if module.endswith(".py"):
                self.load_module(module[:-3], os.path.join("commands", module))

    def load_module(self, modname, path):
        """Import plugin *modname* from *path*, or reload it if already known.

        Failures are printed with a traceback and never propagate, so one
        broken plugin cannot stop the remaining plugins from loading.
        """
        if modname in PLUGIN_MODULES:
            print("{} already imported, reloading".format(modname))
            try:
                PLUGIN_MODULES[modname].reload()
            except Exception:
                traceback.print_exc()
            return
        try:
            print("importing {}".format(modname))
            PLUGIN_MODULES[modname] = impmod.Module(modname, path)
        except Exception:
            print("error importing {}".format(modname))
            traceback.print_exc()

    def on_connection_established(self):
        """Load the plugins and start the task pool."""
        self.load_modules()
        self.tasks.run()

    def check_rss(self, state, base_state):
        """Announce feed items not yet recorded in ``state["known"]``.

        ``state`` must carry the keys ``url``, ``known`` (a list of dicts as
        produced by ``newpost``) and ``channel``.  Each new item is recorded
        in ``state["known"]``, announced in the channel and tooted.
        ``base_state`` is accepted for the task-pool calling convention but
        not used here.

        Returns ``state`` so the caller can keep it.  Any error (for example
        a malformed feed) is printed and swallowed so that the next run can
        simply try again.
        """
        try:
            known_guids = [post["guid"] for post in state["known"]]
            newtrans = rss.fetchNew(state["url"], known_guids)
            if newtrans:
                # Items are recorded as known before they are announced.
                state["known"].extend(newpost(item) for item in newtrans)
                for trans in newtrans:
                    self.say(
                        state["channel"],
                        "Transmission received: {transmission.title} ({transmission.link})".format(transmission=trans),
                    )
                    toot.toot(
                        "Transmission received: {transmission.title}\n\n{transmission.link}".format(transmission=trans)
                    )
                    # Pause between consecutive announcements.
                    time.sleep(1)
        except Exception:
            print("Error occurred while loading feed, will try again...")
            print("Traceback is below:")
            traceback.print_exc()
        return state

    def on_pubmsg(self, channel, nick, text):
        """Handle a public message: notify listeners, then run a command.

        Every entry in ``plugin.listeners`` is called with
        ``(self, channel, nick, text)``; a failing listener is reported and
        skipped.  If *text* starts with ``self.prefix`` and names a command
        in ``plugin.cmds``, that command is called with the remaining
        whitespace-separated words as positional arguments.
        """
        self.is_admin = self.event.source.userhost == bot_op

        try:
            for listener in plugin.listeners:
                try:
                    # Pass the message text itself; the previous code passed
                    # an undefined name, so every listener call failed.
                    plugin.listeners[listener](self, channel, nick, text)
                except Exception:
                    print("{} had a bruh moment:".format(listener))
                    traceback.print_exc()
        except Exception:
            print("KERNAL PANIC")
            traceback.print_exc()

        if not text.startswith(self.prefix):
            return
        args = text[len(self.prefix):].strip().split()
        if not args:
            # The message was only the prefix: there is no command name.
            return
        cmd = args.pop(0)
        if cmd not in plugin.cmds:
            return

        try:
            plugin.cmds[cmd](self, channel, nick, *args)
        except TypeError:
            # Treated as a wrong-argument-count call: show the usage text.
            # Any other TypeError raised inside the command lands here too,
            # which is why the traceback is still printed.
            self.say(channel, "Usage: {}{} {}".format(self.prefix, cmd, plugin.help.get(cmd, "")).strip())
            traceback.print_exc()
        except Exception as err:
            summary = traceback.format_exception_only(type(err), err)[0].strip()
            self.say(channel, "ACCESS VIOLATION: " + summary)
            print("On command `{}{} {}` by {}:".format(self.prefix, cmd, " ".join(args), self.event.source))
            traceback.print_exc()
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the bot with CosmicBot as its handler for the
    # single channel #cosmic, then start it.
    # NOTE(review): "cosmicbot" and "localhost" are presumably the nick and
    # the IRC server host -- confirm against teambot.TeamBot's signature.
    channels = ["#cosmic"]
    bot = teambot.TeamBot(
        channels,
        "cosmicbot",
        "localhost",
        chandler=CosmicBot,
    )
    bot.start()
|
|
|