Bring the repo up to date

This commit is contained in:
Robert Miles 2021-05-13 18:32:14 +00:00
parent ae5c54a4f5
commit b1d9d42c71
27 changed files with 848 additions and 27 deletions

28
bot.py
View File

@ -17,8 +17,24 @@ class Socket:
if lines[-1]:
self._read_buffer=lines[-1]
lines.pop(-1)
lines = [line.decode("utf-8") for line in lines]
return lines
out = []
for line in lines:
# try to decode as UTF-8, then CP1252, then ISO-8859-1
# if none of them work then just ignore the line
try:
line = line.decode("utf-8")
out.append(line)
except:
try:
line = line.decode("cp1252")
out.append(line)
except:
try:
line = line.decode("iso-8859-1")
out.append(line)
except:
continue
return out
def send(self,line):
self.sock.send(line.encode("utf-8"))
def close(self):
@ -67,10 +83,11 @@ class IRCLine:
if parts[0].startswith("@"):
taglist = parts.pop(0)[1:].split(";")
for tag in taglist:
if "=" in tag:
if "=" in tag and not tag.endswith("="):
key, value = tag.split("=",1)
tags[key]=unescape(value)
else:
tag = tag.rstrip("=")
tags[tag]=MISSING
hostmask=None
if parts[0].startswith(":"):
@ -93,6 +110,7 @@ class IRCBot:
self.server=server
self.channels=channels
self.event_manager=events.EventManager()
self.in_batch=False
def load_modules(self):
self.event_manager.clear()
for name in os.listdir("plugins"):
@ -127,6 +145,8 @@ class IRCBot:
line.command="PONG"
self.socket.send(line.line)
return
if line.command=="BATCH":
self.in_batch = line.params[0].startswith("+")
if line.hostmask is None: return
if line.hostmask.nick==self.nickname:
return
@ -160,6 +180,6 @@ class IRCBot:
del self.socket
if __name__=="__main__":
bot = IRCBot("minerbot2","minerbot2",channels=["#khuxkm"])
bot = IRCBot("minerbot2","minerbot2",channels=["#khuxkm hmmst","#counting","#counting-meta","#meta","#bots","#team"])
bot.load_modules()
bot.start()

View File

@ -18,5 +18,6 @@ class EventManager:
self.handlers[event].append(func)
def __call__(self,event_obj):
print(event_obj.name,event_obj.data)
if event_obj.name not in self.handlers: return
handlers = self.handlers[event_obj.name]
for handler in handlers: handler(event_obj)

34
listdata.py Normal file
View File

@ -0,0 +1,34 @@
import plugin, functools
def wrapper(data,func):
@functools.wraps(func)
def __wrapper(*args,**kwargs):
data.load(data.filename)
ret = func(*args,**kwargs)
data.save(data.filename)
return ret
return __wrapper
class ListData(plugin.JSONData):
def __init__(self,filename,*args):
plugin.JSONData.__init__(self,list(args))
self.filename = filename
self.load(self.filename)
def __getitem__(self,k):
self.load(self.filename)
return self.value[k]
def __setitem__(self,k,v):
self.value[k]=v
self.save(self.filename)
def __getattr__(self,attr):
if attr in self.__dict__: return self.__dict__[attr]
return wrapper(self,getattr(self.value,attr))
def __len__(self):
self.load(self.filename)
return len(self.value)
def __iter__(self):
self.load(self.filename)
return iter(self.value)
def __contains__(self):
self.load(self.filename)
return k in self.value

View File

@ -12,7 +12,7 @@ class CommandGroup:
self.subcmds[name]=f
return f
return _register_subcmd
def __call__(self,bot,channel,nick,subcmd,*args):
def __call__(self,bot,channel,nick,subcmd="",*args):
print("Calling base")
if self.base(bot,channel,nick,subcmd,*args):
return
@ -20,7 +20,8 @@ class CommandGroup:
if subcmd in self.subcmds:
return self.subcmds[subcmd](bot,channel,nick,subcmd,*args)
else:
return self.subcmds[self.default](bot,channel,nick,subcmd,*args)
# return self.subcmds[self.default](bot,channel,nick,subcmd,*args)
return
class Data:
"""A class for plugin data."""

View File

@ -1,12 +1,14 @@
import importlib, events
importlib.reload(events)
from events import Event
from bot import IRCLine
ADMIN_HOSTMASKS = [x+"!khuxkm@sudoers.tilde.team" for x in "khuxkm khuxkm|lounge".split()]
ADMIN_HOSTMASKS = [x+"!khuxkm@fuckup.club" for x in "khuxkm".split()]
BOT = None
def admin(event):
if BOT is None: return
if event.hostmask not in ADMIN_HOSTMASKS:
BOT.socket.send(IRCLine("PRIVMSG","#meta","You're not the boss of me! (hostmask {!s})".format(event.hostmask)).line)
BOT.socket.send(IRCLine("PRIVMSG",event.target if event.target.startswith("#") else event.hostmask.nick,"You're not the boss of me! (hostmask {!s})".format(event.hostmask)).line)
return
if len(event.parts)==0: return
if event.parts[0]=="reload":
@ -34,6 +36,7 @@ try:
with open(".password") as f: PASSWORD=f.read().strip()
except: pass
def login(event):
BOT.socket.send(IRCLine("CAP","REQ","batch").line)
BOT.socket.send(IRCLine("NS","IDENTIFY",PASSWORD).line)
def register(bot):

75
plugins/autowater.py Normal file
View File

@ -0,0 +1,75 @@
from bot import IRCLine
BOT=None
def say(target,message):
if not BOT: return
BOT.socket.send(IRCLine("PRIVMSG",target,":"+message))
import plugin, time, os, json
from dictdata import DictData
watered = DictData("autowater.json")
def water(user):
try:
with open(os.path.expanduser("~{}/.botany/visitors.json".format(user))) as f:
visitors = json.load(f)
visitors.append(dict(timestamp=int(time.time()),user="minerbot"))
with open(os.path.expanduser("~{}/.botany/visitors.json".format(user)),"w") as f:
json.dump(visitors,f,indent=2)
except:
# user doesn't exist, remove
watered[user]=(2**64)-1
@plugin.group("autowater","<add/remove>")
def autowater(bot,channel,nick,*args):
if args[0] not in "add remove".split():
say(channel,"{}: Usage: {}autowater <add/remove>".format(nick,BOT.prefix))
return True
return False
@autowater.command("help","")
def autowater_help(bot,channel,nick,*args):
say(channel,"{}: Usage: {}autowater <add/remove>".format(nick,BOT.prefix))
@autowater.command("add","")
def autowater_add(bot,channel,userhost,*args):
user = userhost.user.strip("~")
nick = userhost.nick
if user in watered:
say(channel,"{}: I'm already watching your plant. Did you mean to remove yourself? (!autowater remove)".format(nick))
else:
say(channel,"{}: I'll watch your plant for you!".format(nick))
watered[user]=0 # force a plant watering on the next message
@autowater.command("remove","")
def autowater_remove(bot,channel,userhost,*args):
user = userhost.user.strip("~")
nick = userhost.nick
if user not in watered:
say(channel,"{}: I'm not watching your plant. Did you mean to add yourself? (!autowater add)".format(nick))
else:
say(channel,"{}: Welcome back! I'll stop watching your plant since you're here.".format(nick))
del watered.value[user]
watered.save(watered.filename)
BOTANY_PLANT_WATER_TRIGGER = (1 * (24 * 3600)) # 5 days for a botany plant to die, so water every day
def autowater_listen(event):
for user in watered.value.keys():
if (int(time.time())-watered[user])>=BOTANY_PLANT_WATER_TRIGGER:
print("watering user {}".format(user))
water(user)
watered[user]=int(time.time())
def on_cmd_autowater(event):
try:
autowater(BOT,event.target if event.target.startswith("#") else event.hostmask.nick,event.hostmask,*event.parts)
except:
pass
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_autowater",on_cmd_autowater)
bot.event_manager.on("privmsg",autowater_listen)

View File

@ -9,10 +9,11 @@ def on_botlist(event):
if handler.startswith("command_"):
command = handler[len("command_"):]
if command not in "admin botlist".split(): commands.append(command)
#print(commands)
print(commands)
BOT.socket.send(IRCLine("PRIVMSG",event.target,":{}: I'm minerbot2, rewritten again! Commands include {}".format(event.hostmask.nick,", ".join(["!"+x for x in commands]))))
def on_privmsg(event):
if BOT.in_batch: return
if BOT and BOT.prefix=="!": return
if event.message in ("!botlist", "!rollcall"):
ev = Event("command_botlist",parts=[])

View File

@ -26,6 +26,8 @@ def choose(event):
s+=choice
else:
choices.append(choice)
if not choices:
say(event.target if event.target.startswith("#") else event.hostmask.nick,("{}: ".format(event.hostmask.nick) if event.target.startswith("#") else " ".strip())+f"Usage: {BOT.prefix}choose <choices>; use double quotes if a choice has spaces")
if sorted([x.lower() for x in choices])==list("dl"):
choice = "l" if "l" in choices else "L"
else:

View File

@ -10,6 +10,13 @@ def on_privmsg(event):
event_out = events.Event("command_"+parts.pop(0),parts=parts)
event_out.data.update(event.data)
BOT.event_manager(event_out)
secondary_prefix = BOT.nickname+": "
if event.message.startswith(secondary_prefix):
parts = event.message.split(" ")
parts.pop(0) # get rid of mention
event_out = events.Event("command_"+parts.pop(0),parts=parts)
event_out.data.update(event.data)
BOT.event_manager(event_out)
def register(bot):
global BOT

View File

@ -16,9 +16,9 @@ current_number = 7 if "current_num" not in milestones else milestones["current_n
last_poster = " " if "last_poster" not in milestones else milestones["last_poster"]
@plugin.group("milestone")
def milestone(bot,channel,nick,subcmd,*args):
if subcmd not in "add check dedupe list manual dist now next":
say(channel,"{}: Usage: {}milestone <add/check/dedupe/list> [args]".format(nick,bot.prefix))
def milestone(bot,channel,nick,*args):
if len(args)<1 or args[0] not in "add check dedupe list manual dist now next":
say(channel,"{}: Usage: {}milestone <add/check/dedupe/list/manual/dist/now/next> [args]".format(nick,bot.prefix))
return True
@milestone.command("dedupe")
@ -80,6 +80,7 @@ def on_milestone(event):
milestone(BOT,event.target,event.hostmask.nick,*event.parts)
def listen_counting(event):
if BOT.in_batch: return None
if not BOT: return None
bot = BOT
channel = event.target

44
plugins/fuckup.py Normal file
View File

@ -0,0 +1,44 @@
"""Fuckup counter - like MumboJumbo's spoon counter but I call them fuckups."""
from random import choice
from dictdata import DictData
from bot import IRCLine
BOT = None
def respond(event,msg,prefix_msg=True):
target = event.target
prefix = ""
if event.target.startswith("#"):
prefix = event.hostmask.nick+": "
else:
target = event.hostmask.nick
if not prefix_msg: prefix=""
BOT.socket.send(IRCLine("PRIVMSG",target,":"+prefix+msg))
fuckups = DictData("fuckups.json",v=["using badger commands in #meta because I thought it was #bots","forgetting how xfnw's bots in #chaos work","sending admin error messages to #meta unconditionally even though realistically it should be wherever the command was ran"])
fuckup_list_masters=["khuxkm!khuxkm@fuckup.club","xfnw!xfnw@foxy.noises"]
def on_fuckups(event):
if event.parts:
if event.parts[0]=="add":
if event.hostmask not in fuckup_list_masters:
respond(event,"You're not allowed to add fuckups to the fuckup counter. (bug k\x02\x03\x02huxkm to add you to the list if you need to be)")
return
fuckups.load(fuckups.filename)
fuckups.value["v"].append(" ".join(event.parts[1:]))
fuckups.save(fuckups.filename)
respond(event,"Added!")
if event.parts[0]=="random":
fuckup = random.choice(list(fuckups))
num = fuckups.index(fuckup)+1
respond(event,f"Fuckup {num!s}: {fuckup}")
return
respond(event,"k\x02\x03\x02huxkm has recorded {} fuckups".format(len(fuckups["v"])))
def privmsg(event): fuckups.save(fuckups.filename)
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_fuckups",on_fuckups)
bot.event_manager.on("privmsg",privmsg)

View File

@ -1,4 +1,4 @@
import requests
import requests, json
from urllib.parse import urlencode
from bot import IRCLine
BOT = None
@ -25,7 +25,7 @@ def on_get_stock(event):
if len(event.parts)!=1:
respond(event,"You can only request 1 symbol at a time.")
return
symbol = event.parts[0]
symbol = event.parts[0].upper()
r = requests.get("https://finnhub.io/api/v1/quote?"+urlencode(dict(symbol=symbol,token=TOKEN)))
if r.status_code!=200:
respond(event,"The finnhub API returned a status code of "+str(r.status_code)+".")
@ -35,9 +35,16 @@ def on_get_stock(event):
except:
respond(event,r.text)
return
previous_close, current_value = res["pc"], res["c"]
delta = "{:+0.2%}".format(get_delta(previous_close,current_value))
respond(event,f"{symbol} is currently valued at {current_value} ({delta} from previous close)")
if "pc" not in res:
respond(event,f"No such stock ticker {symbol}!")
return
try:
previous_close, current_value = res["pc"], res["c"]
delta = "{:+0.2%}".format(get_delta(previous_close,current_value))
respond(event,f"{symbol} is currently valued at {current_value} ({delta} from previous close)")
except:
respond(event,"Error calculating delta!")
print(json.dumps(res,indent=2))
def register(bot):
global BOT

21
plugins/jones.py Normal file
View File

@ -0,0 +1,21 @@
from bot import IRCLine
import random
BOT=None
ITEMS = ["frogs","water","chemicals"]
SENTENCES = ["They're putting {} in the {} to turn the {} gay!","They're putting {} in the {} to turn the {} into {}!"]
def jones(event):
if event.message!="!jones": return
if BOT is None: return
parts = [x for x in ITEMS]
sentence = random.choice(SENTENCES)
if "into" in sentence:
parts.append("gays")
random.shuffle(parts)
BOT.socket.send(IRCLine("PRIVMSG",(event.target if event.target.startswith("#") else event.hostmask.nick),sentence.format(*parts)))
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("privmsg",jones)

38
plugins/kdeify.py Normal file
View File

@ -0,0 +1,38 @@
import string
from bot import IRCLine
BOT = None
def respond(event,msg,prefix=True):
if not BOT: return
is_channel = event.target.startswith("#")
if prefix:
if is_channel:
prefix = f"{event.hostmask.nick}: "
else:
prefix = ""
else:
prefix = ""
target = event.target if is_channel else event.hostmask.nick
BOT.socket.send(IRCLine("PRIVMSG",[target,":"+prefix+msg]))
DELETE_PUNC = str.maketrans(string.punctuation,"\x01"*len(string.punctuation))
OVERRIDES = {"email":"kmail"}
def kdeify(s):
s = s.translate(DELETE_PUNC).replace("\x01","").lower()
if s in OVERRIDES: return OVERRIDES[s]
if s[0]=="c": return "k"+s[1:]
else: return "k"+s
def on_command_kdeify(event):
if not BOT: return None
if len(event.parts)!=1:
respond(event,f"Usage: {BOT.prefix}kdeify <word>")
return
respond(event,"You mean \"{}\"?".format(kdeify(event.parts[0])))
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_kdeify",on_command_kdeify)

View File

@ -23,7 +23,10 @@ def fixUp(text):
text = text.replace("/","~s") # escape question marks
return text
def memegen(bot,channel,nick,template,*msg):
def memegen(bot,channel,nick,template="help",*msg):
if template=="help":
say(channel,f"{nick}: Usage: {bot.prefix}memegen <template> [top text |] <bottom text>")
return
if nick=="jan6": return
if not msg: return
msg = " ".join(msg)
@ -34,16 +37,9 @@ def memegen(bot,channel,nick,template,*msg):
top = fixUp(top)
bottom = fixUp(bottom)
if "://" in template: # URL
url = "https://memegen.link/custom/{}/{}.jpg?alt={}".format(top,bottom,template)
url = "https://api.memegen.link/images/custom/{}/{}.png?background={}".format(top,bottom,template)
else:
try:
r = requests.get("https://memegen.link/{}/{}/{}".format(template,top,bottom))
r.raise_for_status()
r = r.json()
url = r["direct"]["masked"]
except requests.exceptions.HTTPError:
say(channel,"ACCESS VIOLATION: Cannot find meme format {}.".format(template))
return
url = "https://api.memegen.link/images/{}/{}/{}.png".format(template,top,bottom)
url = shorten(url)
say(channel,"{}: {}".format(nick,url))

40
plugins/pronouns.py Normal file
View File

@ -0,0 +1,40 @@
from bot import IRCLine
BOT = None
def respond(ev,msg):
target = ev.target if ev.target.startswith("#") else ev.hostmask.nick
prefix = ev.hostmask.nick+": " if ev.target.startswith("#") else ""
BOT.socket.send(IRCLine("PRIVMSG",target,prefix+msg))
from dictdata import DictData
pronouns = DictData("pronouns.json")
assocd = DictData("assoc_pronouns.json")
def on_cmd_pronouns(event):
if not BOT: return
nick = event.hostmask.nick
account = event.tags.get("account",nick)
if len(event.parts)==1:
if event.parts[0].lower() in "help set".split():
respond(event,f"Usage: {BOT.prefix}pronouns [nick] or {BOT.prefix}pronouns set <pronouns>")
return
nick = event.parts[0].lower()
account = assocd.get(nick,nick)
elif len(event.parts)>1:
if event.parts[0].lower()=="set":
pronouns[account]=" ".join(event.parts[1:])
pn = pronouns.get(account,"not set")
respond(event,f"Pronouns for {nick}: {pn}")
def assoc(event):
try:
event = event.parsed
assocd[event.hostmask.nick]=event.tags.get("account",event.hostmask.nick)
except: pass
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_pronouns",on_cmd_pronouns)
bot.event_manager.on("raw_line",assoc)

99
plugins/radio.py Normal file
View File

@ -0,0 +1,99 @@
import requests, traceback, time, subprocess
from bot import IRCLine
BOT = None
URL = "http://radio.tildeverse.org:8000/status-json.xsl"
AZURA = "https://radio.tildeverse.org/api/nowplaying/1"
cround = lambda x: x if type(x)!=float else round(x,2)
cdivmod = lambda *args: [cround(int(x) if i==0 else x) for i,x in enumerate(divmod(*args))]
def respond(event,msg,prefix=True):
if not BOT: return
is_channel = event.target.startswith("#")
if prefix:
if is_channel:
prefix = f"{event.hostmask.nick}: "
else:
prefix = ""
else:
prefix = ""
target = event.target if is_channel else event.hostmask.nick
BOT.socket.send(IRCLine("PRIVMSG",[target,":"+prefix+msg]))
def request(func, azura=False):
try:
r = requests.get(AZURA if azura else URL)
r.raise_for_status()
return func(r.json())
except:
print("Error performing request:")
traceback.print_exc()
return False
def privmsg(event):
if BOT.in_batch: return
if event.hostmask.nick!="tildebot": return
if "now playing: Stream Offline" in event.message:
try:
song = request(lambda x: x["icestats"]["source"][0]["yp_currently_playing"])
assert bool(song)
respond(event,"now playing: "+song,False)
except:
print("Error formatting response:")
traceback.print_exc()
pass
def on_command_radiopull(event):
if len(event.parts)>1:
command, args = event.parts[0], event.parts[1:]
elif len(event.parts)==1:
command = event.parts[0]
args = []
else:
command=""
args = []
if command in "np nowplaying now".split():
song = request(lambda x: [x["icestats"]["source"][0]["yp_currently_playing"],sum([source["listeners"] for source in x["icestats"]["source"]])])
if not song:
respond(event,"Cannot find currently playing song for some reason.")
return
respond(event,f"now playing: {song[0]} (for {song[1]!s} listeners)")
elif command=="dj":
is_live, name, start = request(lambda x: [x["live"].get(a) for a in "is_live streamer_name broadcast_start".split()],True)
if not is_live or (name is None and start is None):
respond(event,"Nobody is streaming at the moment (AutoDJ?)")
if start is None and name:
respond(event,f"{name} is streaming right now! (I don't know for how long though...)")
if start is not None:
ts = ""
seconds = round(time.time()-start,2)
if seconds>60:
minutes, seconds = cdivmod(seconds,60)
if minutes>60:
hours, minutes = cdivmod(minutes,60)
ts = f"{hours}h {minutes}m {seconds}s"
else:
ts = f"{minutes}m {seconds}s"
else:
ts = f"{seconds}s"
if not name:
respond(event,f"Whoever's streaming has been streaming for {ts}. (This should never be seen; consult tildebot for dj name)")
else:
respond(event,f"{name} is streaming right now! (Streaming for {ts})")
elif command=="un":
r = requests.get("https://tilderadio.org/schedule/nextdj.php")
try:
r.raise_for_status()
except:
respond(event,"Error with tilderadio API")
return
respond(event,r.text.strip())
else:
respond(event,f"Usage: {BOT.prefix}radiopull <np/nowplaying/now/dj/un>")
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("privmsg",privmsg)
bot.event_manager.on("command_radiopull",on_command_radiopull)

52
plugins/randomword.py Normal file
View File

@ -0,0 +1,52 @@
from bot import IRCLine
BOT = None
# html utility function
from bs4 import BeautifulSoup
import requests
def get_html(url):
r = requests.get(url)
r.raise_for_status()
return BeautifulSoup(r.content,"html.parser")
# end html utility function
URL = "https://randomword.com/"
def get(generator=""):
try:
soup = get_html(URL+generator)
ret = dict()
ret["word"]=soup.find("div",dict(id="random_word")).text if soup.find("div",dict(id="random_word")) else ""
ret["definition"]=soup.find("div",dict(id="random_word_definition")).text if soup.find("div",dict(id="random_word_definition")) else ""
return ret
except:
return dict(word="",definition="")
# event response utility
def respond(event,msg):
if not BOT: return
target = event.target if event.target.startswith("#") else event.hostmask.nick
BOT.socket.send(IRCLine("PRIVMSG",target,":"+(event.hostmask.nick+": " if event.target.startswith("#") else "")+msg).line)
# end event response utility
def on_cmd_randomword(event):
if not BOT: return
gen = ""
if len(event.parts)>=1: gen=event.parts[0].lower()
if gen and gen not in "noun sentence question adjective idiom verb letter paragraph vocabulary".split():
respond(event,"Invalid generator! Must be one of the generators on RandomWord.com.")
return
resp = get(gen)
if resp["word"] and resp["definition"]:
respond(event,f"{resp['word']} - {resp['definition']}")
elif resp["definition"] and not resp["word"]:
respond(event,resp["definition"])
else:
respond(event,"Something went wrong. Please try again later.")
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_randomword",on_cmd_randomword)
bot.event_manager.on("command_rw",on_cmd_randomword)

View File

@ -5,7 +5,13 @@ def on_admin_raw(event):
# normalize and send line
BOT.socket.send(IRCLine.parse_line(" ".join(event.parts)).line)
def on_admin_action(event):
target = event.parts[0]
message = " ".join(event.parts[1:])
BOT.socket.send(IRCLine("PRIVMSG",target,f":\x01ACTION {message}\x01"))
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("admin_raw",on_admin_raw)
bot.event_manager.on("admin_action",on_admin_action)

View File

@ -45,6 +45,7 @@ def on_shortenimg(event):
except:
say(target,"An error occurred!")
traceback.print_exc()
return
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"Shortened URL: "+new_url)
def register(bot):

41
plugins/tildebadge.py Normal file
View File

@ -0,0 +1,41 @@
from bot import IRCLine
import subprocess, csv, io
BOT = None
def read_csv_from_string(s):
f = io.StringIO()
f.write(s)
f.seek(0)
try:
ret = list(csv.reader(f))
except:
ret = []
f.close()
return ret
def get_leaderboard():
res = subprocess.run(["/home/khuxkm/bin/python","/home/khuxkm/code/ircv3bot/tildebadge_leaderboard.py"],stdout=subprocess.PIPE)
return read_csv_from_string(res.stdout.decode("ascii"))
def on_cmd_tildebadges(event):
# boilerplate
if not BOT: return
target = event.target if event.target.startswith("#") else event.hostmask.nick
prefix = event.hostmask.nick+": " if not event.target.startswith("#") else ""
# begin command
leaderboard = get_leaderboard()
if not leaderboard:
BOT.socket.send(IRCLine("PRIVMSG",target,prefix+"Error loading leaderboard."))
return
top_10 = leaderboard[:10]
s = "Top 10 Tildebadge owners: "
for user in top_10:
escaped = user[1][0]+"\u200b"+user[1][1:]
s+=f"{escaped} ({user[2]}, {user[3]} natural), "
s = s[:-2]
BOT.socket.send(IRCLine("PRIVMSG",target,prefix+s))
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_tildebadges",on_cmd_tildebadges)

55
plugins/unshorten.py Normal file
View File

@ -0,0 +1,55 @@
import requests,dictdata,os.path,re,traceback
from bot import IRCLine
BOT = None
def say(target,msg):
if not BOT: return
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
def unshorten(url):
r = requests.get(url,allow_redirects=False)
return r.headers["Location"]
UNSHORTEN_URL = dictdata.DictData("UNSHORTEN_urls.json")
URL = re.compile(r"((?:https?://)?[^\s/$.?#]\.[^\s]*)")
from urllib.parse import urlparse
shorteners = "0x0.st ttm.sh bit.ly".split()
def is_shortened(url):
return urlparse(url).hostname in shorteners
def on_privmsg(event):
matches = [url for url in URL.findall(event.message) if is_shortened(url)]
if not matches: return
target = event.target if event.target.startswith("#") else event.hostmask.nick
url = matches[-1]
UNSHORTEN_URL[target]=url
def on_unshorten(event):
if not BOT: return
url = None
target = event.target if event.target.startswith("#") else event.hostmask.nick
if len(event.parts)>1:
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"Usage: "+BOT.prefix+"unshorten [url]")
return
elif len(event.parts)==1:
matches = [url for url in URL.findall(event.message) if is_shortened(url)]
if len(matches)>0: url=matches[-1]
elif len(event.parts)==0:
if target not in UNSHORTEN_URL:
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"I haven't seen a URL here to unshorten.")
return
url=UNSHORTEN_URL[target]
try:
new_url = unshorten(url)
except:
say(target,"Unable to unshorten the URL")
traceback.print_exc()
return
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"Unshortened URL: "+new_url)
def register(bot):
global BOT
BOT = bot
bot.event_manager.on("privmsg",on_privmsg)
bot.event_manager.on("command_unshorten",on_unshorten)

18
plugins/uwu_plugin.py Normal file
View File

@ -0,0 +1,18 @@
from bot import IRCLine
import importlib
import uwu
importlib.reload(uwu)
BOT = None
def on_uwu(event):
if not BOT: return
text = uwu.translate(" ".join(event.parts))
target = event.target if event.target.startswith("#") else event.hostmask.nick
prefix = ""
if target.startswith("#"): prefix=event.hostmask.nick+": "
BOT.socket.send(IRCLine("PRIVMSG",target,":"+prefix+text))
def register(bot):
global BOT
BOT = bot
bot.event_manager.on("command_uwu",on_uwu)

49
plugins/xkcd.py Normal file
View File

@ -0,0 +1,49 @@
import traceback
from bot import IRCLine
BOT = None
# json utility function
import requests
def get_json(url):
r = requests.get(url)
r.raise_for_status()
return r.json()
# end json utility function
URL = "https://xkcd.com/"
def get(number=""):
try:
ret = get_json(URL+number+"/info.0.json")
return ret
except:
return dict(title="Connection Error",num=0,year="1970",month="1",day="1")
# event response utility
def respond(event,msg):
if not BOT: return
target = event.target if event.target.startswith("#") else event.hostmask.nick
BOT.socket.send(IRCLine("PRIVMSG",target,":"+(event.hostmask.nick+": " if event.target.startswith("#") else "")+msg).line)
# end event response utility
def on_cmd_xkcd(event):
if not BOT: return
num = ""
if len(event.parts)>=1: num=event.parts[0].lower()
if num and not num.isdigit():
respond(event,"Invalid comic number! Please use a comic number (or omit for the latest comic), like so: !xkcd 1")
return
resp = get(num)
resp["month"]=resp["month"].zfill(2)
resp["day"]=resp["day"].zfill(2)
try:
respond(event,f"{resp['title']} ({resp['year']}-{resp['month']}-{resp['day']}) - https://xkcd.com/{resp['num']!s}")
except:
traceback.print_exc()
respond(event,str(resp))
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_xkcd",on_cmd_xkcd)

129
spambot.py Normal file
View File

@ -0,0 +1,129 @@
import socket, sys, time
from select import select
class Socket:
def __init__(self,server):
self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.connect(server)
self.sock.setblocking(0)
self._read_buffer=b""
def read(self, timeout=None):
try:
ready=select([self.sock],[],[],timeout)
if ready[0]:
data=self.sock.recv(4096)
if not data:
return []
else:
return []
except: return traceback.print_exc()
data = self._read_buffer+data
self._read_buffer=b""
lines = [line.strip(b"\r") for line in data.split(b"\n")]
if lines[-1]:
self._read_buffer=lines[-1]
lines.pop(-1)
lines = [line.decode("utf-8") for line in lines]
return lines
def send(self,line):
self.sock.send(line.encode("utf-8"))
def close(self):
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
def unescape(value):
return value.replace(r"\:",";").replace(r"\s"," ").replace(r"\\","\\").replace(r"\r","\r").replace(r"\n","\n")
def escape(value):
return value.replace(";",r"\:").replace(" ",r"\s").replace("\\",r"\\").replace("\r",r"\r").replace("\n",r"\n")
MISSING = None
class IRCLine:
def __init__(self,command,*params,tags=dict(),hostmask=""):
self.command=command
if len(params)==0:
self.params=[]
elif len(params)==1 and type(params[0]) in (list,tuple):
self.params=list(params[0])
else:
self.params=list(params)
self.tags=tags
self.hostmask=hostmask if hostmask else None
@property
def line(self):
prefix=""
if len(list(self.tags.keys()))>0:
tagc = len(list(self.tags.keys()))
prefix+="@"
for i,tag in enumerate(self.tags.keys()):
prefix+=tag
if self.tags[tag] is not MISSING:
prefix+="="+escape(str(self.tags[tag]))
if (i+1)<tagc:
prefix+=";"
prefix+=" "
if self.hostmask:
prefix+=":{} ".format(self.hostmask)
return prefix+" ".join([self.command]+self.params)+"\r\n"
@classmethod
def parse_line(cls,line):
parts = line.split()
tags = dict()
if parts[0].startswith("@"):
taglist = parts.pop(0)[1:].split(";")
for tag in taglist:
if "=" in tag:
key, value = tag.split("=",1)
tags[key]=unescape(value)
else:
tags[tag]=MISSING
hostmask=None
if parts[0].startswith(":"):
hostmask=parts.pop(0)[1:]
i=len(parts)-1
while i>0 and not parts[i].startswith(":"): i-=1
if i!=0: parts[i:]=[" ".join(parts[i:])]
return cls(*parts,tags=tags,hostmask=hostmask)
def encode(self,*args,**kwargs):
# clearly, if we're here, I'm an idiot and am trying to send an
# IRCLine object down the tube. just do it.
return self.line.encode(*args,**kwargs)
channel="#chaos"
nick="m"
server="localhost"
port=6667
timeout=10
message="RIP jmw2020 2020-2020"
privmsg = IRCLine("PRIVMSG",channel,":"+message).line
sock = Socket((server,port))
sock.send(IRCLine("NICK",nick).line)
sock.send(IRCLine("USER",nick,"8","*",":"+nick).line)
running=True
while running:
lines = sock.read()
if lines and any(["376" in x for x in lines]):
running = False
sock.send(IRCLine("JOIN",channel).line)
running=True
while running:
read=False
for line in sock.read(timeout):
read=True
line = IRCLine.parse_line(line)
if line.command=="PING":
newline = IRCLine("PONG",line.params)
sock.send(newline.line)
if line.command=="PRIVMSG":
if line.params[-1].startswith(":setmessage ") and line.hostmask.endswith("khuxkm@sudoers.tilde.team"):
privmsg=IRCLine("PRIVMSG",channel,":"+line.params[-1][len(":setmessage "):]).line
if line.params[-1].startswith(":settimeout ") and line.hostmask.endswith("khuxkm@sudoers.tilde.team"):
try:
timeout=int(line.params[-1][len(":settimeout "):])
except: pass
sock.send(privmsg)
if not read: time.sleep(timeout)

55
tasks.py Normal file
View File

@ -0,0 +1,55 @@
import sched,time,string,json
from threading import Thread
from sys import exit
class TaskPool:
def __init__(self,**kwargs):
self.base_state = kwargs
self.base_state["task_pool"] = self
self.scheduler = sched.scheduler(time.time,time.sleep)
self.thread = Thread(target=self.worker,args=(self,))
self.coroutines = []
self.states = {}
self.killswitch = False
def periodical(self,scheduler,interval,action,index,state=dict()):
if self.killswitch:
return
self.states[index] = action(state,self.base_state)
if not self.killswitch:
scheduler.enter(interval,1,self.periodical,(scheduler,interval,action,index,self.states[index]))
def worker(self,tasks):
for c,coro in enumerate(tasks.coroutines):
interval = coro["interval"]
action = coro["action"]
state = coro.get("state",dict())
tasks.periodical(tasks.scheduler,interval,action,c,state)
tasks.scheduler.run()
exit(0)
def run(self):
if self.thread.is_alive(): return # don't set up an already set-up thread
self.thread.daemon = True
self.thread.start()
def stop(self):
list(map(self.scheduler.cancel, self.scheduler.queue))
self.killswitch = True # kill any lingering tasks
def add_coroutine(self,action,interval,state=dict(),name=None):
if name is None:
name = string.ascii_letters[len(self.coroutines)]
self.coroutines.append(dict(action=action,interval=interval,state=state,name=name))
def save_state(self, index):
with open("state.{}.json".format(self.coroutines[index]["name"]),"w") as f:
json.dump(self.states[index],f)
def load_state(self, index):
try:
with open("state.{}.json".format(self.coroutines[index]["name"])) as f:
self.states[index] = json.load(f)
self.coroutines[index]["state"] = self.states[index]
except:
print("state.{}.json not found or couldn't be opened; using default".format(self.coroutines[index]["name"]))

25
uwu.py Normal file
View File

@ -0,0 +1,25 @@
SUBSTITUTES = dict(
l="w",
r="w",
L="W",
R="W"
)
def translate(s,th_to_f=False):
out = ""
for i, c in enumerate(s):
if c in SUBSTITUTES:
out+=SUBSTITUTES[c]
else:
if c=="t": # special case it
if s[i-1]=="t":
continue
elif (i+1)<len(s) and s[i+1]=="t": # tt => dd
out+="dd"
elif th_to_f and s[i+1]=="h": # th => f
out+="f"
else: out+=c
elif th_to_f and c=="h" and s[i-1]=="t": continue
elif c=="o" and s[i-1] in "mn": out+="yo"
else: out+=c
return out.replace("ove ","uv ")+" uwu"