#!/usr/bin/env python3 import irc.bot import gitea, github import markovuniverse as mu import tvdb_keys,tvdb_api import os.path import subprocess import json DEBUG = os.path.exists(os.path.expanduser("~/minerbot2.debug")) def reloadDebug(bot): global DEBUG DEBUG = os.path.exists(os.path.expanduser("~/minerbot2.debug")) if not DEBUG: for channel in CHANNELS: if channel not in bot.chanlist: bot.join(channel) bot.chanlist.append(channel) for channel in bot.chanlist: if channel not in CHANNELS: bot.part(channel,"exiting debug mode") bot.chanlist.remove(channel) else: with open(os.path.expanduser("~/minerbot2.debug")) as f: botchannels = [l.rstrip() for l in f if l.rstrip()] for channel in bot.chanlist: if channel not in botchannels: bot.part(channel,"entering debug mode") bot.chanlist.remove(channel) for channel in botchannels: if channel not in bot.chanlist: bot.join(channel) bot.chanlist.append(channel) def log(s): with open(os.path.expanduser("~/minerbot2.log"),"a") as f: f.write(s.rstrip()+"\n") def toot(s,tootjson="/usr/local/bin"): with open(os.path.expanduser("~/minerbot2.log"),"a") as f: return subprocess.call(["/usr/local/bin/toot",s],env={"TOOT_JSON_PATH":tootjson,"LANG":"C.UTF-8","LC_ALL":"C.UTF-8"},stdout=f,stderr=subprocess.STDOUT) gttapi = gitea.GiteaAPI("https://tildegit.org") #ghapi = github.GithubAPI() tvdb = tvdb_api.Tvdb(apikey=tvdb_keys.api_key,cache=False) def pad(l,n): while len(l)".format(e.source.nick,self.prefix)) return if toot(" ".join(p),("/usr/local/bin" if e.target=="#team" else "/usr/local/bin/tildeverse"))==0: c.privmsg(e.target,"{}: Tooted!".format(e.source.nick)) else: c.privmsg(e.target,"{}: Toot failed!".format(e.source.nick)) def suCommand(self,c,e,p): p.pop(0) if not p: p = ["help"] if p[0]=="fake-leak": c.privmsg(e.target,e.source.nick+": {} - {}".format(*mu.new_episode())) elif p[0]=="episode": if p[1]=="name": name = " ".join(p[2:]) eps = tvdb["Steven Universe"].search(name,key="episodeName") if len(eps)>1: c.privmsg(e.target,"{}: can you be a bit more specific, please?".format(e.source.nick)) elif len(eps)==0: c.privmsg(e.target,"{}: I couldn't find the Steven Universe episode \"{}\".".format(e.source.nick,name)) else: ep = eps[0] c.privmsg(e.target,SU_EP_INFO_FORMAT.format(nick=e.source.nick,ep=ep)) elif p[1]=="number": if "x" in p[2]: season, number = [int(x) for x in p[2].split("x")] else: season, number = [int(x) for x in [p[2],p[3]]] ep = tvdb["Steven Universe"][season][number] c.privmsg(e.target,SU_EP_INFO_FORMAT.format(nick=e.source.nick,ep=ep)) elif p[0]=="help": c.privmsg(e.target,"{}: current !su subcommands are: help (shows this message), fake-leak (comes up with a fake leak), episode (get an episode by number (i.e; 01x01) or by name (i.e; Gem Glow))".format(e.source.nick)) def quitCommand(self,c,e,p): if e.source.nick==self.botop: c.quit() raise SystemExit(0) else: c.privmsg(e.target,"You can't tell me what to do!") def reloadCommand(self,c,e,p): if e.source.nick==self.botop: c.quit() raise SystemExit(1) else: c.privmsg(e.target,"You can't tell me what to do!") def adminCommand(self,c,e,p): p.pop(0) # command name if e.source.userhost!=self.botop: # c.privmsg(e.source.nick) return sub = p.pop(0) if sub=="prefix": self.prefix=p[0] elif sub=="block": if len(p)==2: self.block(p[0],p[1]) else: self.block(p[0],"*") elif sub=="unblock": if len(p)==2: self.unblock(p[0],p[1]) else: self.unblock(p[0],"*") elif sub=="global": for channel in self.chanlist: c.privmsg(channel,"[GLOBAL] "+" ".join(p)) elif sub=="debug": reloadDebug(self) log("Reloaded debug mode") def load(self): for attr in self.data: filename = self.data[attr] if os.path.exists(filename): with open(filename) as f: setattr(self,attr,json.load(f)) else: setattr(self,attr,self.default[attr]) def save(self): for attr in self.data: filename = self.data[attr] with open(filename,"w") as f: json.dump(getattr(self,attr),f) def block(self,hostmask,command): self.load() if hostmask in self.blocklist: self.blocklist[hostmask].append(command) else: self.blocklist[hostmask] = [command] self.save() def unblock(self,hostmask,command): self.load() if not self.isBlocked(hostmask,command): return 1 # don't unblock someone if they aren't blocked in the first place if self.globallyBlocked(hostmask) and command != "*": return 2 # no exceptions to global blocks, must unblock globally if len(self.blocklist[hostmask])==1: del self.blocklist[hostmask] else: self.blocklist[hostmask].remove(command) self.save() def isBlocked(self,hostmask,command): if self.globallyBlocked(hostmask): return True # global blocks apply to all commands! return command in self.blocklist.get(hostmask,[]) def globallyBlocked(self,hostmask): self.load() return "*" in self.blocklist.get(hostmask,[]) def gitCommand(self,api,c,e,p,has_mirrors=True): p.pop(0) if not p: return if p[0] in ('issue','pull'): pad(p,4) global issue_data if "/" in p[1]: owner,repo = p[1].split("/") try: issue = int(p[2]) except ValueError: c.privmsg(e.target,"{}: do you think this is some kind of joke?".format(e.source.nick)) return issue_data = api.get_issue_data(owner,repo,issue) else: owner,repo = p[1], p[2] try: issue = int(p[3]) except ValueError: c.privmsg(e.target,"{}: do you think this is some kind of joke?".format(e.source.nick)) return issue_data = api.get_issue_data(owner,repo,issue) if issue_data.status_code!=200: if has_mirrors: repo_data = api.get_repo_data(owner,repo) if repo_data.json()['mirror']: c.privmsg(e.target,"The targeted repo is a mirror.") return c.privmsg(e.target,"An error occurred.") try: issue_data.raise_for_status() except Exception as e: log(e) else: issue_data = issue_data.json() descriptor = "Issue" if 'pull_request' in issue_data and issue_data['pull_request'] is not None: descriptor = "Pull request" description = descriptor + " #"+str(issue_data['number']) description += ": \"{}\"".format(issue_data['title']) author = issue_data['user'] description += " by {}".format(author.get('full_name') if author.get('full_name') else author['login']) if 'html_url' in issue_data: description += ": {}".format(issue_data["html_url"]) else: description += ": {}/{}/{}/{}/{}".format(api.base_url,owner,repo,descriptor.split()[0].lower()+"s",issue_data['number']) c.privmsg(e.target,description) elif p[0]=="link": pad(p,3) if "/" in p[1]: user, repo = p[1].split("/",1) else: user, repo = p[1], p[2] repo_data = api.get_repo_data(user,repo) if repo_data.status_code!=200: c.privmsg(e.target,"An error occcurred. (Response code = {!s})".format(repo_data.status_code)) return repo_data = repo_data.json() c.privmsg(e.target,"{nick}: {r[full_name]} - {r[description]}; {r[html_url]}".format(nick=e.source.nick,r=repo_data)) def giteaCommand(self,c,e,p): self.gitCommand(gttapi,c,e,p) # def githubCommand(self,c,e,p): # self.gitCommand(ghapi,c,e,p,False) def register(self,cmd,func): self.command_handlers[cmd]=func def registerData(self,attr,filename,default): self.data[attr] = filename self.default[attr] = default def on_welcome(self, c, e): c.mode(self.bot_nick,"+B") for channel in self.chanlist: c.join(channel) def on_pubmsg(self, c, e): self.process_command(c, e, e.arguments[0]) def on_privmsg(self, c, e): self.process_command(c, e, e.arguments[0]) def process_command(self, c, e, text): if not text.startswith(self.prefix): if text.startswith("!botlist") or text.startswith("!rollcall"): self.botlist(c,e,text[len(self.prefix):].split(" ")) return # only deal with prefixed messages parts = text[len(self.prefix):].split(" ") if not parts[0] in self.command_handlers: return # not our command if self.isBlocked(e.source.userhost,parts[0]): return # blocked user! self.command_handlers[parts[0]](c,e,parts) CHANNELS = [ '#team', '#meta', '#quotes', '#journal', '#sudoers', '#tildeverse', '#stevenuniverse', '#suwp', '#bots', '#twtxt', '#tildelinux' ] if __name__ == '__main__': if DEBUG: with open(os.path.expanduser("~/minerbot2.debug")) as f: botchannels = [l.rstrip() for l in f if l.rstrip()] else: botchannels = CHANNELS bot = TVBot(botchannels, 'minerbot2', 'localhost') bot.start()