2018-12-02 06:01:17 +00:00
|
|
|
import teambot,tasks,rss,time,sys,subprocess,re
|
2018-12-02 01:18:39 +00:00
|
|
|
|
2018-12-07 21:52:16 +00:00
|
|
|
# RSS feed address; CosmicBot stores it as the "url" of its check_rss task
# state, which is what rss.fetchNew is asked to poll.
feed_url = "https://cosmic.voyage/rss.xml"
|
|
|
|
# userhost of the bot operator; CosmicBot.on_pubmsg compares the sender's
# userhost against this value to decide whether admin commands are allowed.
bot_op = "khuxkm@sudoers.tilde.team"
|
|
|
|
|
2018-12-02 20:42:59 +00:00
|
|
|
def unhighlight_nick(nick):
    """Return *nick* wrapped in a leading and a trailing underscore.

    NOTE(review): judging by the name, the decoration is meant to stop IRC
    clients from highlighting (pinging) the mentioned user -- confirm.
    """
    return "_%s_" % (nick,)
|
|
|
|
|
2018-12-07 21:52:16 +00:00
|
|
|
def datetimejoin(joiner, parts):
    """Join *parts* with *joiner*, left-padding each part to two characters.

    Every part is converted with ``str`` and padded on the left with ``'0'``
    up to a width of 2; longer values (e.g. a four-digit year) are left as-is.
    """
    padded = ["{:0>2}".format(str(part)) for part in parts]
    return joiner.join(padded)
|
|
|
|
|
|
|
|
def newpost(it):
    """Build the dict that is remembered for one feed item.

    The result carries the item's ``guid``, ``title`` and ``link`` plus a
    ``date`` string of the form ``YYYY-MM-DD HH:MM:SS`` assembled from the
    first six fields of ``it.published_parsed``.

    NOTE(review): ``it`` looks like a feedparser-style entry whose
    ``published_parsed`` is a time tuple -- confirm against rss.fetchNew.
    """
    post = {
        "guid": it.guid,
        "title": it.title,
        "link": it.link,
    }
    stamp = it.published_parsed
    day = datetimejoin('-', stamp[0:3])
    clock = datetimejoin(':', stamp[3:6])
    post["date"] = day + ' ' + clock
    return post
|
2018-12-02 20:42:59 +00:00
|
|
|
|
2018-12-02 01:18:39 +00:00
|
|
|
class CosmicBot(teambot.Handler):
    """Channel handler for the cosmic.voyage utility bot.

    Announces new feed items found by ``check_rss`` and answers commands that
    start with ``self.prefix``: ``botlist``, ``roster``, ``latest``,
    ``fortune`` and the operator-only ``admin``.

    NOTE(review): ``self.say``, ``self.event`` and ``self._bot`` are not
    defined here; they are presumably provided by ``teambot.Handler``.
    """

    def __init__(self, bot):
        """Create the task pool and register every chat command.

        Args:
            bot: Forwarded unchanged to ``teambot.Handler.__init__``.
        """
        super(CosmicBot, self).__init__(bot)
        self.prefix = "!"
        self.tasks = tasks.TaskPool()
        # NOTE(review): 12 is presumably the polling interval expected by
        # TaskPool.add_coroutine -- confirm its meaning and unit.
        self.tasks.add_coroutine(
            self.check_rss,
            12,
            dict(url=feed_url, known=[], channel="#cosmic"),
        )
        # NOTE(review): presumably restores the state written by
        # ``save_state(0)`` in ``on_admin`` -- confirm against the tasks module.
        self.tasks.load_state(0)
        self.commands = {}
        self.register_command("botlist", self.on_botlist)
        self.register_command("roster", self.on_roster)
        self.register_command("latest", self.on_latest)
        self.register_command("fortune", self.on_fortune)
        self.register_command("admin", self.on_admin, True)

    def register_command(self, name, action, is_admin=False):
        """Map the command word *name* to the callable *action*.

        Args:
            name: Command word as typed after the prefix.
            action: Callable invoked as ``action(channel, nick, *args)``.
            is_admin: When true, only the bot operator may run the command.
        """
        self.commands[name] = dict(action=action, is_admin=is_admin)

    def on_connection_established(self):
        """Start the task pool once the connection is up."""
        self.tasks.run()

    def check_rss(self, state, base_state):
        """Announce feed items that are not yet listed in ``state["known"]``.

        Args:
            state: Dict with the keys ``url``, ``known`` (list of dicts as
                built by ``newpost``) and ``channel``.
            base_state: Unused here; kept because callers pass it.

        Returns:
            The same *state* dict, with any new items appended to ``known``.
        """
        seen_guids = [post["guid"] for post in state["known"]]
        newtrans = rss.fetchNew(state["url"], seen_guids)
        if newtrans:
            state["known"].extend(newpost(item) for item in newtrans)
            for trans in newtrans:
                self.say(
                    state["channel"],
                    "Transmission received: {transmission.title} ({transmission.link})".format(transmission=trans),
                )
                # Pause between announcements so a burst of new items is
                # not sent to the channel all at once.
                time.sleep(1)
        return state

    def on_pubmsg(self, channel, nick, text):
        """Dispatch a channel message to the matching registered command.

        Messages that do not start with the prefix, that consist of the prefix
        alone, or that name an unknown command are ignored. Admin commands are
        ignored unless the sender's userhost equals ``bot_op``.
        """
        if not text.startswith(self.prefix):
            return
        args = text[len(self.prefix):].strip().split()
        if not args:
            # Bare prefix with no command word: nothing to dispatch.
            return
        cmd = args.pop(0)
        entry = self.commands.get(cmd)
        if entry is None:
            return
        if entry["is_admin"] and self.event.source.userhost != bot_op:
            return
        try:
            entry["action"](channel, nick, *args)
        except Exception as e:
            # Best-effort: a failing command (bad argument count, failing
            # helper program, ...) must not take the bot down. SystemExit
            # raised by ``admin down`` is not an Exception and passes through.
            print("access violation " + str(e))

    @staticmethod
    def _command_lines(argv):
        """Run the program *argv* and return its non-empty output lines.

        The output is decoded as UTF-8 with undecodable bytes replaced, so
        non-ASCII text produced by the program cannot abort the command.

        Raises:
            subprocess.CalledProcessError: If the program exits non-zero.
            OSError: If the program cannot be started.
        """
        raw = subprocess.check_output(argv)
        return [line for line in raw.decode("utf-8", "replace").split("\n") if line]

    def on_botlist(self, channel, nick):
        """Reply with the maintainer contact and a short bot description."""
        self.say(channel, nick + ": Maintainer: khuxkm@cosmic.voyage | Utility bot")

    def on_admin(self, channel, nick, subcmd, *args):
        """Run an operator sub-command.

        ``down`` stops the tasks, saves state 0, disconnects and exits the
        process; ``check`` runs ``check_rss`` immediately and stores its
        result as state 0. Any other sub-command is ignored.
        """
        if subcmd == "down":
            self.tasks.stop()
            self.tasks.save_state(0)
            self._bot.die("Stopping...")
            sys.exit(0)
        elif subcmd == "check":
            self.tasks.states[0] = self.check_rss(self.tasks.states[0], self.tasks.base_state)

    def on_roster(self, channel, nick, *namecnt):
        """Relay the output of ``/usr/local/bin/roster`` to the channel.

        All words after the command are joined with spaces and handed to the
        program as one argument. Each output line is split into its first
        whitespace-separated field and the rest; the first field is passed
        through ``unhighlight_nick``. Lines without a second field are skipped.
        """
        for line in self._command_lines(["/usr/local/bin/roster", " ".join(namecnt)]):
            fields = line.split()
            if len(fields) < 2:
                # Nothing to show after the first field; skip rather than
                # abort the rest of the listing.
                continue
            author = fields[0]
            entry = " ".join(fields[1:])
            self.say(channel, "{}: {} (by {})".format(nick, entry, unhighlight_nick(author)))

    def on_latest(self, channel, nick, count=5):
        """Relay the output of ``/usr/local/bin/latest`` to the channel.

        Args:
            count: Number of entries requested; converted with ``int`` and
                clamped to the range 1..5 to avoid flooding the channel.

        Raises:
            ValueError: If *count* is not an integer literal.
        """
        count = max(1, min(5, int(count)))
        noun = "entry" if count == 1 else "entries"
        self.say(channel, "{}: Latest {} {}. (See cosmic.voyage for more!)".format(nick, count, noun))
        for line in self._command_lines(["/usr/local/bin/latest", str(count)]):
            self.say(channel, "{}: {}".format(nick, line))

    def on_fortune(self, channel, nick):
        """Send the output of ``/usr/games/fortune`` to the channel, line by line."""
        # The program is run without arguments; the previous call passed an
        # undefined ``count`` and therefore failed on every invocation.
        for line in self._command_lines(["/usr/games/fortune"]):
            self.say(channel, "{}".format(line))
|
2018-12-02 01:18:39 +00:00
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the bot for the single channel it serves,
    # with CosmicBot as its channel handler, and start it.
    channels = ["#cosmic"]
    bot = teambot.TeamBot(
        channels,
        "cosmicbot",
        "localhost",
        chandler=CosmicBot,
    )
    bot.start()
|