320 lines
9.8 KiB
Python
320 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
import irc.bot
|
|
|
|
import gitea, github
|
|
import markovuniverse as mu
|
|
import tvdb_keys,tvdb_api
|
|
import os.path
|
|
import subprocess
|
|
import json
|
|
|
|
DEBUG = os.path.exists(os.path.expanduser("~/minerbot2.debug"))
|
|
|
|
def reloadDebug(bot):
|
|
global DEBUG
|
|
DEBUG = os.path.exists(os.path.expanduser("~/minerbot2.debug"))
|
|
if not DEBUG:
|
|
for channel in CHANNELS:
|
|
if channel not in bot.chanlist:
|
|
bot.join(channel)
|
|
bot.chanlist.append(channel)
|
|
for channel in bot.chanlist:
|
|
if channel not in CHANNELS:
|
|
bot.part(channel,"exiting debug mode")
|
|
bot.chanlist.remove(channel)
|
|
else:
|
|
with open(os.path.expanduser("~/minerbot2.debug")) as f:
|
|
botchannels = [l.rstrip() for l in f if l.rstrip()]
|
|
for channel in bot.chanlist:
|
|
if channel not in botchannels:
|
|
bot.part(channel,"entering debug mode")
|
|
bot.chanlist.remove(channel)
|
|
for channel in botchannels:
|
|
if channel not in bot.chanlist:
|
|
bot.join(channel)
|
|
bot.chanlist.append(channel)
|
|
|
|
def log(s):
|
|
with open(os.path.expanduser("~/minerbot2.log"),"a") as f:
|
|
f.write(s.rstrip()+"\n")
|
|
|
|
def toot(s,tootjson="/usr/local/bin"):
|
|
with open(os.path.expanduser("~/minerbot2.log"),"a") as f:
|
|
return subprocess.call(["/usr/local/bin/toot",s],env={"TOOT_JSON_PATH":tootjson,"LANG":"C.UTF-8","LC_ALL":"C.UTF-8"},stdout=f,stderr=subprocess.STDOUT)
|
|
|
|
gttapi = gitea.GiteaAPI("https://tildegit.org")
|
|
#ghapi = github.GithubAPI()
|
|
tvdb = tvdb_api.Tvdb(apikey=tvdb_keys.api_key,cache=False)
|
|
|
|
def pad(l,n):
|
|
while len(l)<n:
|
|
l.append("")
|
|
|
|
SU_EP_INFO_FORMAT = "{nick}: Season {ep[airedSeason]}, Episode {ep[airedEpisodeNumber]}: {ep[episodeName]} - \"{ep[overview]}\"; aired {ep[firstAired]}"
|
|
|
|
class TVBot(irc.bot.SingleServerIRCBot):
|
|
def __init__(self, channels, nickname, server, port=6667, prefix="!", operator="khuxkm@oper.tilde.chat"):
|
|
irc.bot.SingleServerIRCBot.__init__(self, [(server, port)], nickname, nickname)
|
|
self.chanlist = channels
|
|
self.bot_nick = nickname
|
|
self.prefix = "!"
|
|
self.botop = operator
|
|
self.command_handlers = {}
|
|
self.data = {}
|
|
self.default = {}
|
|
self.register("admin",self.adminCommand)
|
|
self.register("botlist",self.botlist)
|
|
self.register("rollcall",self.botlist)
|
|
self.register("gitea",self.giteaCommand)
|
|
# self.register("github",self.githubCommand)
|
|
# self.register("quit",self.quitCommand)
|
|
# self.register("reload",self.reloadCommand)
|
|
self.register("su",self.suCommand)
|
|
self.register("toot",self.tootCommand)
|
|
self.registerData("blocklist","blocklist.json",{})
|
|
|
|
def botlist(self,c,e,p):
|
|
c.privmsg(e.target,"Maintainer: khuxkm@tilde.team; Bot that does things; Commands: !gitea, !su, !toot".replace("!",self.prefix))
|
|
|
|
def tootCommand(self,c,e,p):
|
|
p.pop(0)
|
|
if not p:
|
|
c.privmsg(e.target,"{}: Usage: {}toot <text>".format(e.source.nick,self.prefix))
|
|
return
|
|
if toot(" ".join(p),("/usr/local/bin" if e.target=="#team" else "/usr/local/bin/tildeverse"))==0:
|
|
c.privmsg(e.target,"{}: Tooted!".format(e.source.nick))
|
|
else:
|
|
c.privmsg(e.target,"{}: Toot failed!".format(e.source.nick))
|
|
|
|
def suCommand(self,c,e,p):
|
|
p.pop(0)
|
|
if not p:
|
|
p = ["help"]
|
|
if p[0]=="fake-leak":
|
|
c.privmsg(e.target,e.source.nick+": {} - {}".format(*mu.new_episode()))
|
|
elif p[0]=="episode":
|
|
if p[1]=="name":
|
|
name = " ".join(p[2:])
|
|
eps = tvdb["Steven Universe"].search(name,key="episodeName")
|
|
if len(eps)>1:
|
|
c.privmsg(e.target,"{}: can you be a bit more specific, please?".format(e.source.nick))
|
|
elif len(eps)==0:
|
|
c.privmsg(e.target,"{}: I couldn't find the Steven Universe episode \"{}\".".format(e.source.nick,name))
|
|
else:
|
|
ep = eps[0]
|
|
c.privmsg(e.target,SU_EP_INFO_FORMAT.format(nick=e.source.nick,ep=ep))
|
|
elif p[1]=="number":
|
|
if "x" in p[2]:
|
|
season, number = [int(x) for x in p[2].split("x")]
|
|
else:
|
|
season, number = [int(x) for x in [p[2],p[3]]]
|
|
ep = tvdb["Steven Universe"][season][number]
|
|
c.privmsg(e.target,SU_EP_INFO_FORMAT.format(nick=e.source.nick,ep=ep))
|
|
elif p[0]=="help":
|
|
c.privmsg(e.target,"{}: current !su subcommands are: help (shows this message), fake-leak (comes up with a fake leak), episode <number/name> (get an episode by number (i.e; 01x01) or by name (i.e; Gem Glow))".format(e.source.nick))
|
|
|
|
def quitCommand(self,c,e,p):
|
|
if e.source.nick==self.botop:
|
|
c.quit()
|
|
raise SystemExit(0)
|
|
else:
|
|
c.privmsg(e.target,"You can't tell me what to do!")
|
|
|
|
def reloadCommand(self,c,e,p):
|
|
if e.source.nick==self.botop:
|
|
c.quit()
|
|
raise SystemExit(1)
|
|
else:
|
|
c.privmsg(e.target,"You can't tell me what to do!")
|
|
|
|
def adminCommand(self,c,e,p):
|
|
p.pop(0) # command name
|
|
if e.source.userhost!=self.botop:
|
|
# c.privmsg(e.source.nick)
|
|
return
|
|
sub = p.pop(0)
|
|
if sub=="prefix":
|
|
self.prefix=p[0]
|
|
elif sub=="block":
|
|
if len(p)==2:
|
|
self.block(p[0],p[1])
|
|
else:
|
|
self.block(p[0],"*")
|
|
elif sub=="unblock":
|
|
if len(p)==2:
|
|
self.unblock(p[0],p[1])
|
|
else:
|
|
self.unblock(p[0],"*")
|
|
elif sub=="global":
|
|
for channel in self.chanlist:
|
|
c.privmsg(channel,"[GLOBAL] "+" ".join(p))
|
|
elif sub=="debug":
|
|
reloadDebug(self)
|
|
log("Reloaded debug mode")
|
|
|
|
def load(self):
|
|
for attr in self.data:
|
|
filename = self.data[attr]
|
|
if os.path.exists(filename):
|
|
with open(filename) as f:
|
|
setattr(self,attr,json.load(f))
|
|
else:
|
|
setattr(self,attr,self.default[attr])
|
|
|
|
def save(self):
|
|
for attr in self.data:
|
|
filename = self.data[attr]
|
|
with open(filename,"w") as f:
|
|
json.dump(getattr(self,attr),f)
|
|
|
|
def block(self,hostmask,command):
|
|
self.load()
|
|
if hostmask in self.blocklist:
|
|
self.blocklist[hostmask].append(command)
|
|
else:
|
|
self.blocklist[hostmask] = [command]
|
|
self.save()
|
|
|
|
def unblock(self,hostmask,command):
|
|
self.load()
|
|
if not self.isBlocked(hostmask,command):
|
|
return 1 # don't unblock someone if they aren't blocked in the first place
|
|
if self.globallyBlocked(hostmask) and command != "*":
|
|
return 2 # no exceptions to global blocks, must unblock globally
|
|
if len(self.blocklist[hostmask])==1:
|
|
del self.blocklist[hostmask]
|
|
else:
|
|
self.blocklist[hostmask].remove(command)
|
|
self.save()
|
|
|
|
def isBlocked(self,hostmask,command):
|
|
if self.globallyBlocked(hostmask):
|
|
return True # global blocks apply to all commands!
|
|
return command in self.blocklist.get(hostmask,[])
|
|
|
|
def globallyBlocked(self,hostmask):
|
|
self.load()
|
|
return "*" in self.blocklist.get(hostmask,[])
|
|
|
|
def gitCommand(self,api,c,e,p,has_mirrors=True):
|
|
p.pop(0)
|
|
if not p:
|
|
return
|
|
if p[0] in ('issue','pull'):
|
|
pad(p,4)
|
|
global issue_data
|
|
if "/" in p[1]:
|
|
owner,repo = p[1].split("/")
|
|
try:
|
|
issue = int(p[2])
|
|
except ValueError:
|
|
c.privmsg(e.target,"{}: do you think this is some kind of joke?".format(e.source.nick))
|
|
return
|
|
issue_data = api.get_issue_data(owner,repo,issue)
|
|
else:
|
|
owner,repo = p[1], p[2]
|
|
try:
|
|
issue = int(p[3])
|
|
except ValueError:
|
|
c.privmsg(e.target,"{}: do you think this is some kind of joke?".format(e.source.nick))
|
|
return
|
|
issue_data = api.get_issue_data(owner,repo,issue)
|
|
if issue_data.status_code!=200:
|
|
if has_mirrors:
|
|
repo_data = api.get_repo_data(owner,repo)
|
|
if repo_data.json()['mirror']:
|
|
c.privmsg(e.target,"The targeted repo is a mirror.")
|
|
return
|
|
c.privmsg(e.target,"An error occurred.")
|
|
try:
|
|
issue_data.raise_for_status()
|
|
except Exception as e:
|
|
log(e)
|
|
else:
|
|
issue_data = issue_data.json()
|
|
descriptor = "Issue"
|
|
if 'pull_request' in issue_data and issue_data['pull_request'] is not None:
|
|
descriptor = "Pull request"
|
|
description = descriptor + " #"+str(issue_data['number'])
|
|
description += ": \"{}\"".format(issue_data['title'])
|
|
author = issue_data['user']
|
|
description += " by {}".format(author.get('full_name') if author.get('full_name') else author['login'])
|
|
if 'html_url' in issue_data:
|
|
description += ": {}".format(issue_data["html_url"])
|
|
else:
|
|
description += ": {}/{}/{}/{}/{}".format(api.base_url,owner,repo,descriptor.split()[0].lower()+"s",issue_data['number'])
|
|
c.privmsg(e.target,description)
|
|
elif p[0]=="link":
|
|
pad(p,3)
|
|
if "/" in p[1]:
|
|
user, repo = p[1].split("/",1)
|
|
else:
|
|
user, repo = p[1], p[2]
|
|
repo_data = api.get_repo_data(user,repo)
|
|
if repo_data.status_code!=200:
|
|
c.privmsg(e.target,"An error occcurred. (Response code = {!s})".format(repo_data.status_code))
|
|
return
|
|
repo_data = repo_data.json()
|
|
c.privmsg(e.target,"{nick}: {r[full_name]} - {r[description]}; {r[html_url]}".format(nick=e.source.nick,r=repo_data))
|
|
|
|
def giteaCommand(self,c,e,p):
|
|
self.gitCommand(gttapi,c,e,p)
|
|
|
|
# def githubCommand(self,c,e,p):
|
|
# self.gitCommand(ghapi,c,e,p,False)
|
|
|
|
def register(self,cmd,func):
|
|
self.command_handlers[cmd]=func
|
|
|
|
def registerData(self,attr,filename,default):
|
|
self.data[attr] = filename
|
|
self.default[attr] = default
|
|
|
|
def on_welcome(self, c, e):
|
|
c.mode(self.bot_nick,"+B")
|
|
for channel in self.chanlist:
|
|
c.join(channel)
|
|
|
|
def on_pubmsg(self, c, e):
|
|
self.process_command(c, e, e.arguments[0])
|
|
|
|
def on_privmsg(self, c, e):
|
|
self.process_command(c, e, e.arguments[0])
|
|
|
|
def process_command(self, c, e, text):
|
|
if not text.startswith(self.prefix):
|
|
if text.startswith("!botlist") or text.startswith("!rollcall"):
|
|
self.botlist(c,e,text[len(self.prefix):].split(" "))
|
|
return # only deal with prefixed messages
|
|
parts = text[len(self.prefix):].split(" ")
|
|
if not parts[0] in self.command_handlers:
|
|
return # not our command
|
|
if self.isBlocked(e.source.userhost,parts[0]):
|
|
return # blocked user!
|
|
self.command_handlers[parts[0]](c,e,parts)
|
|
|
|
CHANNELS = [
|
|
'#team',
|
|
'#meta',
|
|
'#quotes',
|
|
'#journal',
|
|
'#sudoers',
|
|
'#tildeverse',
|
|
'#stevenuniverse',
|
|
'#suwp',
|
|
'#bots',
|
|
'#twtxt',
|
|
'#tildelinux'
|
|
]
|
|
|
|
|
|
if __name__ == '__main__':
|
|
if DEBUG:
|
|
with open(os.path.expanduser("~/minerbot2.debug")) as f:
|
|
botchannels = [l.rstrip() for l in f if l.rstrip()]
|
|
else:
|
|
botchannels = CHANNELS
|
|
bot = TVBot(botchannels, 'minerbot2', 'localhost')
|
|
bot.start()
|
|
|