Revamp bot
Moves minerbot2 to the framework I use for the badger and reminder bots.
This commit is contained in:
commit
ff75ac01b5
|
@ -0,0 +1,118 @@
|
|||
|
||||
# Created by https://www.gitignore.io/api/python
|
||||
# Edit at https://www.gitignore.io/?templates=python
|
||||
|
||||
### Python ###
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
pip-wheel-metadata/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
target/
|
||||
|
||||
# pyenv
|
||||
.python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# celery beat schedule file
|
||||
celerybeat-schedule
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# Mr Developer
|
||||
.mr.developer.cfg
|
||||
.project
|
||||
.pydevproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# End of https://www.gitignore.io/api/python
|
||||
|
||||
# NickServ password (be sure to chmod to 600)
|
||||
.password
|
||||
|
||||
# data files
|
||||
*.json
|
||||
*.psv
|
||||
.finnhub_token
|
|
@ -0,0 +1,22 @@
|
|||
# badger
|
||||
|
||||
The Twitch Plays Pokemon badge system, ported to tildeverse IRC!
|
||||
|
||||
## How it works
|
||||
|
||||
Every 20 to 50 messages, the person talking will recieve a random badge. These badges have a distribution like so:
|
||||
|
||||
|Badge Name |Chance of pull|
|
||||
|-----------|--------------|
|
||||
|Berrybadge |65.00% |
|
||||
|Firebadge |15.00% |
|
||||
|Rockbadge |10.00% |
|
||||
|Waterbadge |5.00% |
|
||||
|Shadybadge |2.00% |
|
||||
|Musicbadge |1.90% |
|
||||
|Sportsbadge|1.00% |
|
||||
|Tildebadge |0.10% |
|
||||
|
||||
You can transmute 3 or more badges. When you do this, it will create one, hopefully rarer badge and take the badges you put in away.
|
||||
|
||||
For technical documentation on how transmutation works, refer to TPP docs [here](https://twitchplayspokemon.tv/transmutation_calculations).
|
|
@ -0,0 +1,165 @@
|
|||
import impmod, socket, traceback, events, time, os
|
||||
from irc.client import NickMask
|
||||
class Socket:
|
||||
def __init__(self,server):
|
||||
self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
|
||||
self.sock.connect(server)
|
||||
self._read_buffer=b""
|
||||
def read(self):
|
||||
try:
|
||||
data=self.sock.recv(4096)
|
||||
if not data:
|
||||
return None
|
||||
except: return traceback.print_exc()
|
||||
data = self._read_buffer+data
|
||||
self._read_buffer=b""
|
||||
lines = [line.strip(b"\r") for line in data.split(b"\n")]
|
||||
if lines[-1]:
|
||||
self._read_buffer=lines[-1]
|
||||
lines.pop(-1)
|
||||
lines = [line.decode("utf-8") for line in lines]
|
||||
return lines
|
||||
def send(self,line):
|
||||
self.sock.send(line.encode("utf-8"))
|
||||
def close(self):
|
||||
self.sock.shutdown(socket.SHUT_RDWR)
|
||||
self.sock.close()
|
||||
|
||||
def unescape(value):
|
||||
return value.replace(r"\:",";").replace(r"\s"," ").replace(r"\\","\\").replace(r"\r","\r").replace(r"\n","\n")
|
||||
|
||||
def escape(value):
|
||||
return value.replace(";",r"\:").replace(" ",r"\s").replace("\\",r"\\").replace("\r",r"\r").replace("\n",r"\n")
|
||||
|
||||
MISSING = None
|
||||
|
||||
class IRCLine:
|
||||
def __init__(self,command,*params,tags=dict(),hostmask=""):
|
||||
self.command=command
|
||||
if len(params)==0:
|
||||
self.params=[]
|
||||
elif len(params)==1 and type(params[0]) in (list,tuple):
|
||||
self.params=list(params[0])
|
||||
else:
|
||||
self.params=list(params)
|
||||
self.tags=tags
|
||||
self.hostmask=NickMask(hostmask) if hostmask else None
|
||||
@property
|
||||
def line(self):
|
||||
prefix=""
|
||||
if len(list(self.tags.keys()))>0:
|
||||
tagc = len(list(self.tags.keys()))
|
||||
prefix+="@"
|
||||
for i,tag in enumerate(self.tags.keys()):
|
||||
prefix+=tag
|
||||
if self.tags[tag] is not MISSING:
|
||||
prefix+="="+escape(str(self.tags[tag]))
|
||||
if (i+1)<tagc:
|
||||
prefix+=";"
|
||||
prefix+=" "
|
||||
if self.hostmask:
|
||||
prefix+=":{} ".format(self.hostmask)
|
||||
return prefix+" ".join([self.command]+self.params)+"\r\n"
|
||||
@classmethod
|
||||
def parse_line(cls,line):
|
||||
parts = line.split()
|
||||
tags = dict()
|
||||
if parts[0].startswith("@"):
|
||||
taglist = parts.pop(0)[1:].split(";")
|
||||
for tag in taglist:
|
||||
if "=" in tag:
|
||||
key, value = tag.split("=",1)
|
||||
tags[key]=unescape(value)
|
||||
else:
|
||||
tags[tag]=MISSING
|
||||
hostmask=None
|
||||
if parts[0].startswith(":"):
|
||||
hostmask=parts.pop(0)[1:]
|
||||
i=len(parts)-1
|
||||
while i>0 and not parts[i].startswith(":"): i-=1
|
||||
if i!=0: parts[i:]=[" ".join(parts[i:])]
|
||||
return cls(*parts,tags=tags,hostmask=hostmask)
|
||||
def encode(self,*args,**kwargs):
|
||||
# clearly, if we're here, I'm an idiot and am trying to send an
|
||||
# IRCLine object down the tube. just do it.
|
||||
return self.line.encode(*args,**kwargs)
|
||||
|
||||
PLUGIN_MODULES={}
|
||||
class IRCBot:
|
||||
def __init__(self,nickname,username,realname="IRCBot",server=("localhost",6667),channels=["#bots"]):
|
||||
self.nickname=nickname
|
||||
self.username=username
|
||||
self.realname=realname
|
||||
self.server=server
|
||||
self.channels=channels
|
||||
self.event_manager=events.EventManager()
|
||||
def load_modules(self):
|
||||
self.event_manager.clear()
|
||||
for name in os.listdir("plugins"):
|
||||
if name.endswith(".py"):
|
||||
self.load_module(name[:-3],os.path.join("plugins",name))
|
||||
def load_module(self,modname,path):
|
||||
try:
|
||||
if modname in PLUGIN_MODULES:
|
||||
print("{} already imported, reloading".format(modname))
|
||||
PLUGIN_MODULES[modname].reload()
|
||||
else:
|
||||
try:
|
||||
print("importing {}".format(modname))
|
||||
PLUGIN_MODULES[modname]=impmod.Module(modname,path)
|
||||
except:
|
||||
print("Unable to load plugin {}".format(modname))
|
||||
traceback.print_exc()
|
||||
register_func = getattr(PLUGIN_MODULES[modname].module,"register",None)
|
||||
if not register_func:
|
||||
print(f"Plugin {modname} has no register function!")
|
||||
print("Remember, if porting plugins from a minerbot-based architecture,")
|
||||
print("you have to add a register function to use the new system.")
|
||||
return
|
||||
register_func(self)
|
||||
except:
|
||||
traceback.print_exc()
|
||||
pass
|
||||
def handle_line(self,line):
|
||||
if type(line)!=IRCLine: line = IRCLine.parse_line(line)
|
||||
self.event_manager(events.Event("raw_line",text=line.line,parsed=line))
|
||||
if line.command=="PING":
|
||||
line.command="PONG"
|
||||
self.socket.send(line.line)
|
||||
return
|
||||
if line.hostmask is None: return
|
||||
if line.hostmask.nick==self.nickname:
|
||||
return
|
||||
if line.command in "PRIVMSG NOTICE".split():
|
||||
target = line.params[0]
|
||||
message = line.params[1][1:]
|
||||
self.event_manager(events.Event(line.command.lower(),target=target,message=message,tags=line.tags,hostmask=line.hostmask))
|
||||
elif line.command == "TAGMSG":
|
||||
self.event_manager(events.Event("tagmsg",hostmask=line.hostmask,tags=line.tags,target=line.params[0]))
|
||||
elif line.command == "INVITE":
|
||||
self.event_manager(events.Event("invite",to=line.params[1][1:],hostmask=line.hostmask))
|
||||
elif line.command == "PING":
|
||||
self.socket.send(IRCLine("PONG",line.params).line)
|
||||
def start(self):
|
||||
self.socket = Socket(self.server)
|
||||
self.socket.send("NICK {}\r\n".format(self.nickname))
|
||||
self.socket.send("USER {} * * :{}\r\n".format(self.username,self.realname))
|
||||
time.sleep(2) # give the server some time to record my username
|
||||
self.event_manager(events.Event("connection_established"))
|
||||
for channel in self.channels:
|
||||
self.socket.send(f"JOIN {channel}\r\n")
|
||||
time.sleep(1)
|
||||
self.socket.send("CAP REQ account-tag\r\n")
|
||||
self.running=True
|
||||
while self.running:
|
||||
lines = self.socket.read()
|
||||
if lines:
|
||||
for line in lines:
|
||||
self.handle_line(line)
|
||||
self.socket.close()
|
||||
del self.socket
|
||||
|
||||
if __name__=="__main__":
|
||||
bot = IRCBot("minerbot2","minerbot2",channels=["#khuxkm"])
|
||||
bot.load_modules()
|
||||
bot.start()
|
|
@ -0,0 +1,17 @@
|
|||
import plugin
|
||||
class DictData(plugin.JSONData):
|
||||
def __init__(self,filename,**kwargs):
|
||||
plugin.JSONData.__init__(self,kwargs)
|
||||
self.filename = filename
|
||||
self.load(self.filename)
|
||||
def __getitem__(self,k):
|
||||
self.load(self.filename)
|
||||
return self.value[k]
|
||||
def __setitem__(self,k,v):
|
||||
self.value[k]=v
|
||||
self.save(self.filename)
|
||||
def __contains__(self,k):
|
||||
return k in self.value
|
||||
def get(self,k,default=None):
|
||||
self.load(self.filename)
|
||||
return self.value.get(k,default)
|
|
@ -0,0 +1,22 @@
|
|||
from collections import defaultdict
|
||||
|
||||
class Event:
|
||||
def __init__(self,name,**kwargs):
|
||||
self.data=kwargs
|
||||
self.name=name
|
||||
def __getitem__(self,k):
|
||||
return self.data[k]
|
||||
def __getattr__(self,k):
|
||||
if k in self.data: return self.data[k]
|
||||
|
||||
class EventManager:
|
||||
def __init__(self):
|
||||
self.handlers=defaultdict(list)
|
||||
def clear(self):
|
||||
self.__init__()
|
||||
def on(self,event,func):
|
||||
self.handlers[event].append(func)
|
||||
def __call__(self,event_obj):
|
||||
print(event_obj.name,event_obj.data)
|
||||
handlers = self.handlers[event_obj.name]
|
||||
for handler in handlers: handler(event_obj)
|
|
@ -0,0 +1,19 @@
|
|||
import importlib, importlib.util, sys
|
||||
|
||||
class Module:
|
||||
"""A module. Stores module object, spec object, and handles reloading."""
|
||||
def __init__(self,modname,path=None):
|
||||
if path is None:
|
||||
path = modname+".py"
|
||||
self.modname, self.path = modname, path
|
||||
self.spec = importlib.util.spec_from_file_location(modname,path)
|
||||
self.module = importlib.util.module_from_spec(self.spec)
|
||||
self.spec.loader.exec_module(self.module)
|
||||
def reload(self):
|
||||
if self.modname not in sys.modules:
|
||||
sys.modules[self.modname]=self.module
|
||||
# Alright, this needs some explaining.
|
||||
# When you do importlib.reload, it does some juju magic shist to find the spec and calls importlib._bootstrap._exec.
|
||||
# When you dynamically import a module it won't do its magic correctly and it'll error.
|
||||
# Luckily, we can skip all the juju magic since we can just store the spec.
|
||||
importlib._bootstrap._exec(self.spec,self.module)
|
|
@ -0,0 +1,78 @@
|
|||
import json, traceback
|
||||
help = {}
|
||||
class CommandGroup:
|
||||
def __init__(self,f,default="help"):
|
||||
self.base = f
|
||||
self.subcmds = {}
|
||||
self.subcmds_help = {}
|
||||
self.default = default
|
||||
def command(self,name,help=""):
|
||||
self.subcmds_help[name]=help
|
||||
def _register_subcmd(f):
|
||||
self.subcmds[name]=f
|
||||
return f
|
||||
return _register_subcmd
|
||||
def __call__(self,bot,channel,nick,subcmd,*args):
|
||||
print("Calling base")
|
||||
if self.base(bot,channel,nick,subcmd,*args):
|
||||
return
|
||||
print("Calling subcommand {}".format(subcmd if subcmd in self.subcmds else self.default))
|
||||
if subcmd in self.subcmds:
|
||||
return self.subcmds[subcmd](bot,channel,nick,subcmd,*args)
|
||||
else:
|
||||
return self.subcmds[self.default](bot,channel,nick,subcmd,*args)
|
||||
|
||||
class Data:
|
||||
"""A class for plugin data."""
|
||||
def __init__(self,value):
|
||||
self.value = value
|
||||
def serialize(self):
|
||||
return self.value
|
||||
def deserialize(self,value):
|
||||
self.value = value
|
||||
def save(self,filename):
|
||||
with open(filename,"w") as f:
|
||||
f.write(self.serialize())
|
||||
def load(self,filename):
|
||||
try:
|
||||
with open(filename) as f:
|
||||
self.deserialize(f.read())
|
||||
except:
|
||||
print("Error loading data from {!r}:".format(filename))
|
||||
traceback.print_exc()
|
||||
pass # You should've initialized this with a sane default, so just keep the default on error
|
||||
|
||||
class JSONData(Data):
|
||||
"""Data, but can be serialized to JSON (and should be)."""
|
||||
def serialize(self):
|
||||
return json.dumps(self.value)
|
||||
def deserialize(self,value):
|
||||
self.value = json.loads(value)
|
||||
|
||||
def clear():
|
||||
cmds.clear()
|
||||
help.clear()
|
||||
listeners.clear()
|
||||
|
||||
#def command(name,helptext="No help available for this command."):
|
||||
# def _register_cmd(func):
|
||||
# cmds[name]=func
|
||||
# help[name]=helptext
|
||||
# return func
|
||||
# return _register_cmd
|
||||
|
||||
def group(name,helptext=""):
|
||||
def _register_group(f):
|
||||
gr = CommandGroup(f)
|
||||
return gr
|
||||
return _register_group
|
||||
|
||||
def listener(name):
|
||||
def _register_cmd(func):
|
||||
listeners[name]=func
|
||||
return func
|
||||
return _register_cmd
|
||||
|
||||
def alias(name,target):
|
||||
cmds[name]=cmds[target]
|
||||
help[name]=help[target]
|
|
@ -0,0 +1,44 @@
|
|||
from events import Event
|
||||
from bot import IRCLine
|
||||
ADMIN_HOSTMASKS = [x+"!khuxkm@sudoers.tilde.team" for x in "khuxkm khuxkm|lounge".split()]
|
||||
BOT = None
|
||||
|
||||
def admin(event):
|
||||
if BOT is None: return
|
||||
if event.hostmask not in ADMIN_HOSTMASKS:
|
||||
BOT.socket.send(IRCLine("PRIVMSG","#meta","You're not the boss of me! (hostmask {!s})".format(event.hostmask)).line)
|
||||
return
|
||||
if len(event.parts)==0: return
|
||||
if event.parts[0]=="reload":
|
||||
BOT.load_modules()
|
||||
elif event.parts[0]=="quit":
|
||||
BOT.socket.send(IRCLine("QUIT",[":goodbye"]).line)
|
||||
BOT.running=False
|
||||
elif event.parts[0]=="msg":
|
||||
target = event.parts[1]
|
||||
message = ":"+" ".join(event.parts[2:])
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,message).line)
|
||||
else:
|
||||
event_out = Event("admin_"+event.parts[0])
|
||||
event_out.data.update(event.data)
|
||||
event_out.data["parts"]=event.parts[1:]
|
||||
BOT.event_manager(event_out)
|
||||
|
||||
def on_invite(event):
|
||||
if BOT is None: return
|
||||
if event.hostmask not in ADMIN_HOSTMASKS: return
|
||||
BOT.socket.send(IRCLine("JOIN",[event.to]).line)
|
||||
|
||||
PASSWORD="whoops"
|
||||
try:
|
||||
with open(".password") as f: PASSWORD=f.read().strip()
|
||||
except: pass
|
||||
def login(event):
|
||||
BOT.socket.send(IRCLine("NS","IDENTIFY",PASSWORD).line)
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_admin",admin)
|
||||
bot.event_manager.on("invite",on_invite)
|
||||
bot.event_manager.on("connection_established",login)
|
|
@ -0,0 +1,28 @@
|
|||
import words, plugin, random
|
||||
from titlecase import titlecase
|
||||
from bot import IRCLine
|
||||
words.loadDict("words")
|
||||
|
||||
BOT = None
|
||||
|
||||
def say(target, msg):
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
def backronym(event):
|
||||
if not BOT: return None
|
||||
channel = event.target
|
||||
nick = event.hostmask.nick
|
||||
word = event.parts[0]
|
||||
result = []
|
||||
for char in word:
|
||||
fwords = words.getWords("^{}.*".format(char))
|
||||
if not fwords:
|
||||
say(channel,nick+": Don't be a meanie! (No words start with '{}'!)".format(char))
|
||||
else:
|
||||
result.append(random.choice(fwords))
|
||||
say(channel,nick+": "+titlecase(" ".join(result)))
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT = bot
|
||||
bot.event_manager.on("command_backronym",backronym)
|
|
@ -0,0 +1,26 @@
|
|||
from bot import IRCLine
|
||||
from events import Event
|
||||
BOT = None
|
||||
|
||||
def on_botlist(event):
|
||||
if not BOT: return None
|
||||
commands = []
|
||||
for handler in BOT.event_manager.handlers.keys():
|
||||
if handler.startswith("command_"):
|
||||
command = handler[len("command_"):]
|
||||
if command not in "admin botlist".split(): commands.append(command)
|
||||
#print(commands)
|
||||
BOT.socket.send(IRCLine("PRIVMSG",event.target,":{}: I'm minerbot2, rewritten again! Commands include {}".format(event.hostmask.nick,", ".join(["!"+x for x in commands]))))
|
||||
|
||||
def on_privmsg(event):
|
||||
if BOT and BOT.prefix=="!": return
|
||||
if event.message in ("!botlist", "!rollcall"):
|
||||
ev = Event("command_botlist",parts=[])
|
||||
ev.data.update(event.data)
|
||||
on_botlist(ev)
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_botlist",on_botlist)
|
||||
bot.event_manager.on("privmsg",on_privmsg)
|
|
@ -0,0 +1,49 @@
|
|||
import time, requests, traceback, random
|
||||
from bot import IRCLine
|
||||
BOT = None
|
||||
|
||||
def time_secs():
|
||||
return time.time()
|
||||
|
||||
USER_AGENT = "minerbot2 on tilde.chat IRC (ran by /u/kd2bwz2)"
|
||||
|
||||
def grab_posts():
|
||||
r = requests.get("https://reddit.com/r/eyebleach/.json",headers={"User-Agent":USER_AGENT})
|
||||
if r.status_code==200: return [x["data"] for x in r.json()["data"]["children"]]
|
||||
try:
|
||||
r.raise_for_status()
|
||||
except: traceback.print_exc() # print status code error
|
||||
return [] # return empty list
|
||||
|
||||
LAST_GRAB = time_secs()
|
||||
_posts = grab_posts()
|
||||
|
||||
def posts():
|
||||
global LAST_GRAB,_posts
|
||||
if (time_secs()-LAST_GRAB)>=(5*60): # cache for 5 minutes
|
||||
LAST_GRAB = time_secs()
|
||||
_posts = grab_posts()
|
||||
return _posts
|
||||
|
||||
def respond(event,msg):
|
||||
target = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
prefix = ""
|
||||
if event.target.startswith("#"): prefix=event.hostmask.nick+": "
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+prefix+msg))
|
||||
|
||||
RNG = random.SystemRandom()
|
||||
|
||||
def on_cheerup(event):
|
||||
if not BOT: return
|
||||
try:
|
||||
post = RNG.choice(posts())
|
||||
respond(event,f"{post['title']} - {post['url']}")
|
||||
except:
|
||||
traceback.print_exc()
|
||||
respond(event,"Something went wrong!")
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_cheerup",on_cheerup)
|
||||
bot.event_manager.on("command_cute",on_cheerup)
|
|
@ -0,0 +1,38 @@
|
|||
import plugin, random
|
||||
from bot import IRCLine
|
||||
|
||||
BOT = None
|
||||
|
||||
def say(target,msg):
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
RNG = random.SystemRandom()
|
||||
|
||||
RESPONSES = ["I dunno, I think I'll go with \"{}\".","Hmm, it's a hard choice, but \"{}\".","Hmm... \"{}\". Hands down."]
|
||||
|
||||
def choose(event):
|
||||
choices = []
|
||||
s = ""
|
||||
quote = False
|
||||
for choice in event.parts:
|
||||
if quote:
|
||||
s+=" "+choice
|
||||
if choice.endswith('"'):
|
||||
quote = False
|
||||
choices.append(s.strip('"'))
|
||||
s = ""
|
||||
elif choice.startswith('"'):
|
||||
quote = True
|
||||
s+=choice
|
||||
else:
|
||||
choices.append(choice)
|
||||
if sorted([x.lower() for x in choices])==list("dl"):
|
||||
choice = "l" if "l" in choices else "L"
|
||||
else:
|
||||
choice = RNG.choice(choices)
|
||||
say(event.target if event.target.startswith("#") else event.hostmask.nick,("{}: ".format(event.hostmask.nick) if event.target.startswith("#") else " ".strip())+"{}".format(RNG.choice(RESPONSES).format(choice)))
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_choose",choose)
|
|
@ -0,0 +1,18 @@
|
|||
import events
|
||||
BOT=None
|
||||
|
||||
def on_privmsg(event):
|
||||
if BOT is None: return
|
||||
prefix = BOT.prefix if event.target.startswith("#") else ""
|
||||
if event.message.startswith(prefix):
|
||||
parts = event.message.split(" ")
|
||||
parts[0]=parts[0][len(prefix):]
|
||||
event_out = events.Event("command_"+parts.pop(0),parts=parts)
|
||||
event_out.data.update(event.data)
|
||||
BOT.event_manager(event_out)
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.prefix="!"
|
||||
bot.event_manager.on("privmsg",on_privmsg)
|
|
@ -0,0 +1,115 @@
|
|||
import plugin, re, pluralslib, os.path
|
||||
from dictdata import DictData
|
||||
import importlib
|
||||
from bot import IRCLine
|
||||
importlib.reload(plugin)
|
||||
|
||||
BOT = None
|
||||
|
||||
def say(target,msg):
|
||||
if not BOT: return None
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
milestones = DictData("counting_milestones.json")
|
||||
milestones_stack = [] if "stack" not in milestones else milestones["stack"]
|
||||
current_number = 7 if "current_num" not in milestones else milestones["current_num"]
|
||||
last_poster = " " if "last_poster" not in milestones else milestones["last_poster"]
|
||||
|
||||
@plugin.group("milestone")
|
||||
def milestone(bot,channel,nick,subcmd,*args):
|
||||
if subcmd not in "add check dedupe list manual dist now next":
|
||||
say(channel,"{}: Usage: {}milestone <add/check/dedupe/list> [args]".format(nick,bot.prefix))
|
||||
return True
|
||||
|
||||
@milestone.command("dedupe")
|
||||
def dedupe(bot, channel, nick, subcmd, *args):
|
||||
milestones["stack"]=sorted(list(set(milestones["stack"])))
|
||||
milestones["stack"]=[x for x in milestones["stack"] if x>current_number]
|
||||
|
||||
@milestone.command("add")
|
||||
def add(bot,channel,nick,subcmd,*args):
|
||||
if len(args)<1:
|
||||
say(channel,"{}: Usage: {}milestone add <number> [number number...]".format(nick,bot.prefix))
|
||||
return
|
||||
try:
|
||||
milestones_stack.extend([int(x) for x in args])
|
||||
milestones_stack.sort()
|
||||
while milestones_stack[0]<current_number:
|
||||
n = milestones_stack.pop(0)
|
||||
say(channel,"{}: Throwing out {!s} (already achieved)".format(nick,n))
|
||||
milestones["stack"]=milestones_stack
|
||||
dedupe(bot,channel,nick,"dedupe")
|
||||
say(channel,"{}: Added {}.".format(nick,", ".join([x for x in args if int(x)>current_number])))
|
||||
except:
|
||||
say(channel,"ACCESS VIOLATION: Numbers must be integers")
|
||||
|
||||
@milestone.command("dist")
|
||||
def dist(bot,channel,nick,subcmd,*args):
|
||||
try:
|
||||
number_to_check = int(args[0])
|
||||
except ValueError:
|
||||
say(channel,"ACCESS VIOLATION: Invalid number "+args[0])
|
||||
d = number_to_check-current_number
|
||||
if len(args)>1 and args[1]=="milestone":
|
||||
say(channel,"{}: The next milestone ({!s}) will be hit in {}".format(nick,number_to_check,pluralslib.plural(d,"number")))
|
||||
else:
|
||||
say(channel,"{}: {!s} will be hit in {}".format(nick,number_to_check,pluralslib.plural(d,"number")))
|
||||
|
||||
@milestone.command("check")
|
||||
@milestone.command("next")
|
||||
def check(bot,channel,nick,subcmd,*args):
|
||||
if len(args)!=0:
|
||||
say(channel,"{}: Usage: {}milestone {}".format(nick,bot.prefix,subcmd))
|
||||
return
|
||||
number_to_check = milestones_stack[0] if len(milestones_stack)>0 else None
|
||||
if number_to_check is None:
|
||||
say(channel,"{}: Please add a milestone by using {}milestone add <number>".format(nick,bot.prefix))
|
||||
return
|
||||
dist(bot,channel,nick,"dist",str(number_to_check),"milestone")
|
||||
|
||||
@milestone.command("list")
|
||||
def list_milestones(bot,channel,nick,subcmd,*args):
|
||||
say(channel,nick+": Upcoming milestones: {}".format(", ".join([str(x) for x in milestones["stack"][:10]])))
|
||||
|
||||
@milestone.command("now")
|
||||
def now(bot,channel,nick,subcmd,*args):
|
||||
say(channel,nick+": The correct next number is `{}`.".format(current_number+1))
|
||||
|
||||
def on_milestone(event):
|
||||
if not BOT: return None
|
||||
milestone(BOT,event.target,event.hostmask.nick,*event.parts)
|
||||
|
||||
def listen_counting(event):
|
||||
if not BOT: return None
|
||||
bot = BOT
|
||||
channel = event.target
|
||||
nick = event.hostmask.nick
|
||||
msg = event.message
|
||||
if channel!="#counting" or (re.match("^\d+$",msg) is None): # if not in #counting or if not a number
|
||||
return
|
||||
global current_number, last_poster
|
||||
if int(msg)!=(current_number+1):
|
||||
say("#counting-meta","ERROR: {} attempted to enter {} (should be {})".format(nick,msg,str(current_number+1)))
|
||||
say(nick,"Hey, just wanted to let you know that you just put the wrong number in. Try putting {} in.".format(str(current_number+1)))
|
||||
return
|
||||
if last_poster==nick:
|
||||
say("#counting-meta","ERROR: {} went twice".format(nick))
|
||||
say(nick,"Hey, just wanted to let you know that you can't go twice in #counting!")
|
||||
return
|
||||
current_number=int(msg)
|
||||
milestones["current_num"]=current_number
|
||||
last_poster=nick
|
||||
milestones["last_poster"]=last_poster
|
||||
if current_number in milestones_stack:
|
||||
milestones_stack.remove(current_number)
|
||||
milestones["stack"]=milestones_stack
|
||||
milestones[current_number]=nick
|
||||
say("#counting-meta","CONGRATS {} ON THE {} GET!".format(nick,current_number))
|
||||
# if zipcode.has_zipcode(current_number) and current_number>10000:
|
||||
# plugin.cmds["zipcode"](bot,"#counting-meta","automated-trigger",str(current_number),"quiet")
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT = bot
|
||||
bot.event_manager.on("command_milestone",on_milestone)
|
||||
bot.event_manager.on("privmsg",listen_counting)
|
|
@ -0,0 +1,44 @@
|
|||
import plugin
|
||||
from bot import IRCLine
|
||||
|
||||
BOT = None
|
||||
def say(target,msg):
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
class DefaultDict(dict):
|
||||
def __init__(self,f,takes_args=True):
|
||||
self.f = f
|
||||
self.takes_args = takes_args
|
||||
def __get__(self,k):
|
||||
if k not in self:
|
||||
if self.takes_args:
|
||||
self[k]=self.f(k)
|
||||
else:
|
||||
self[k]=self.f()
|
||||
return self[k]
|
||||
|
||||
PREFIX = "regional_indicator_"
|
||||
|
||||
mapping = dict()
|
||||
for letter in "abcdefghijklmnopqrstuvwxyz":
|
||||
mapping[letter]=":"+PREFIX+letter+":"
|
||||
mapping["a"]=":a:"
|
||||
mapping["b"]=":b:"
|
||||
mapping[" "]=" "
|
||||
|
||||
def emojitext(event):
|
||||
if not BOT: return None
|
||||
channel = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
words = event.parts
|
||||
nick = event.hostmask.nick
|
||||
try:
|
||||
text = " ".join(words)
|
||||
say(nick," ".join([mapping[c] for c in text]).replace(":a: :b:",":ab:"))
|
||||
if channel!=nick: say(channel,nick+": PMed!")
|
||||
except Exception as e:
|
||||
say(nick,"ACCESS VIOLATON: "+e.__class__.__name__+": "+e.args[0])
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT = bot
|
||||
bot.event_manager.on("command_emojitext",emojitext)
|
|
@ -0,0 +1,45 @@
|
|||
import requests
|
||||
from urllib.parse import urlencode
|
||||
from bot import IRCLine
|
||||
BOT = None
|
||||
|
||||
def respond(event,msg):
|
||||
is_pub = event.target.startswith("#")
|
||||
BOT.socket.send(IRCLine("PRIVMSG",event.target if is_pub else event.hostmask.nick,":"+(event.hostmask.nick+": " if is_pub else "")+msg))
|
||||
|
||||
def get_delta(fv,tv):
|
||||
d = (tv-fv)
|
||||
d = d/fv
|
||||
return d
|
||||
|
||||
TOKEN=""
|
||||
try:
|
||||
with open(".finnhub_token") as f: TOKEN=f.read().strip()
|
||||
except: print("Token not found!")
|
||||
|
||||
def on_get_stock(event):
|
||||
if not BOT: return
|
||||
if not TOKEN:
|
||||
respond(event,"khuxkm is an idiot and forgot to supply an API token. I can't do anything about that!")
|
||||
return
|
||||
if len(event.parts)!=1:
|
||||
respond(event,"You can only request 1 symbol at a time.")
|
||||
return
|
||||
symbol = event.parts[0]
|
||||
r = requests.get("https://finnhub.io/api/v1/quote?"+urlencode(dict(symbol=symbol,token=TOKEN)))
|
||||
if r.status_code!=200:
|
||||
respond(event,"The finnhub API returned a status code of "+str(r.status_code)+".")
|
||||
return
|
||||
try:
|
||||
res = r.json()
|
||||
except:
|
||||
respond(event,r.text)
|
||||
return
|
||||
previous_close, current_value = res["pc"], res["c"]
|
||||
delta = "{:+0.2%}".format(get_delta(previous_close,current_value))
|
||||
respond(event,f"{symbol} is currently valued at {current_value} ({delta} from previous close)")
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_getStock",on_get_stock)
|
|
@ -0,0 +1,21 @@
|
|||
import plugin, requests
|
||||
from bot import IRCLine
|
||||
BOT = None
|
||||
|
||||
def say(target,msg):
|
||||
if not BOT: return
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
def get_gibi():
|
||||
r = requests.get("https://khuxkm.tilde.team/gibi/gibi.php")
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def gibi(event):
|
||||
g = get_gibi()
|
||||
say(event.target if event.target.startswith("#") else event.hostmask.nick,("{}: ".format(event.hostmask.nick) if event.target.startswith("#") else " ".strip())+"Good idea: {i[good]}; Bad idea: {i[bad]} (Source: {i[source]})".format(i=g))
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_gibi",gibi)
|
|
@ -0,0 +1,61 @@
|
|||
import requests,plugin,traceback,sys
|
||||
from bot import IRCLine
|
||||
|
||||
BOT = None
|
||||
def say(target,msg):
|
||||
if not BOT: return
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
def shorten(url):
|
||||
r = requests.post("https://ttm.sh",data=dict(url=url))
|
||||
r.raise_for_status()
|
||||
return r.text.strip()
|
||||
|
||||
def fixUp(text):
|
||||
if not text: return "_" # escape empty string
|
||||
text = text.replace("_","__") # escape underscores
|
||||
text = text.replace(" ","_") # escape spaces
|
||||
text = text.replace("-","--") # escape dashes
|
||||
text = text.replace("''",'"') # escape double quote
|
||||
text = text.replace("?","~q") # escape question marks
|
||||
text = text.replace("%","~p") # escape question marks
|
||||
text = text.replace("#","~h") # escape question marks
|
||||
text = text.replace("/","~s") # escape question marks
|
||||
return text
|
||||
|
||||
def memegen(bot,channel,nick,template,*msg):
|
||||
if nick=="jan6": return
|
||||
if not msg: return
|
||||
msg = " ".join(msg)
|
||||
if "|" in msg:
|
||||
top, bottom = msg.split("|")
|
||||
else:
|
||||
top, bottom = "", msg
|
||||
top = fixUp(top)
|
||||
bottom = fixUp(bottom)
|
||||
if "://" in template: # URL
|
||||
url = "https://memegen.link/custom/{}/{}.jpg?alt={}".format(top,bottom,template)
|
||||
else:
|
||||
try:
|
||||
r = requests.get("https://memegen.link/{}/{}/{}".format(template,top,bottom))
|
||||
r.raise_for_status()
|
||||
r = r.json()
|
||||
url = r["direct"]["masked"]
|
||||
except requests.exceptions.HTTPError:
|
||||
say(channel,"ACCESS VIOLATION: Cannot find meme format {}.".format(template))
|
||||
return
|
||||
url = shorten(url)
|
||||
say(channel,"{}: {}".format(nick,url))
|
||||
|
||||
def on_memegen(event):
|
||||
if not BOT: return
|
||||
channel = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
try:
|
||||
memegen(BOT,channel,event.hostmask.nick,*event.parts)
|
||||
except Exception as e:
|
||||
say(channel,traceback.format_exception_only(*sys.exc_info()[:2])[0])
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT = bot
|
||||
bot.event_manager.on("command_memegen",on_memegen)
|
|
@ -0,0 +1,39 @@
|
|||
import plugin, dictdata, hashlib
|
||||
from bot import IRCLine
|
||||
BOT = None
|
||||
|
||||
sha256 = lambda s: hashlib.sha256(s.encode("utf-8")).hexdigest()
|
||||
|
||||
fixed_opinions = dictdata.DictData("fixed_opinions.json")
|
||||
|
||||
BOLD = b"\x02".decode("ascii")
|
||||
RESET = b"\x0f".decode("ascii")
|
||||
OPINIONS = ["suck","neat","cool","bad"]
|
||||
|
||||
def suckify(s):
|
||||
return BOLD+" ".join(list(s))+RESET
|
||||
|
||||
def chunkify(o,s):
|
||||
ret = []
|
||||
for i in range(0,len(o),s):
|
||||
ret.append(o[i:i+s])
|
||||
return ret
|
||||
|
||||
def get_opinion(s):
|
||||
if s in fixed_opinions: return fixed_opinions[s]
|
||||
hash = chunkify(sha256(s),2)
|
||||
h = int(hash[len(s)%len(hash)],16)
|
||||
return OPINIONS[h%len(OPINIONS)]
|
||||
|
||||
def opinion(event):
|
||||
if not BOT: return None
|
||||
channel = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
nick = event.hostmask.nick
|
||||
args = event.parts
|
||||
if not args: return
|
||||
BOT.socket.send(IRCLine("PRIVMSG",channel,":"+("{}: ".format(nick) if channel!=nick else " ".strip())+"{} {}".format(" ".join(args),suckify(get_opinion(" ".join(args))))))
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT = bot
|
||||
bot.event_manager.on("command_opinion",opinion)
|
|
@ -0,0 +1,11 @@
|
|||
from bot import IRCLine
|
||||
BOT = None
|
||||
|
||||
def on_admin_raw(event):
|
||||
# normalize and send line
|
||||
BOT.socket.send(IRCLine.parse_line(" ".join(event.parts)).line)
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("admin_raw",on_admin_raw)
|
|
@ -0,0 +1,54 @@
|
|||
import requests,dictdata,os.path,re,traceback
|
||||
from bot import IRCLine
|
||||
|
||||
BOT = None
|
||||
def say(target,msg):
|
||||
if not BOT: return
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
def shorten(url):
|
||||
r = requests.post("https://ttm.sh",data=dict(url=url))
|
||||
r.raise_for_status()
|
||||
return r.text.strip()
|
||||
|
||||
IMAGE_URL = dictdata.DictData("image_urls.json")
|
||||
URL = re.compile(r"((?:https?|ftp)://[^\s/$.?#].[^\s]*)")
|
||||
|
||||
image_exts = ".png .jpg .jpeg .gif".split()
|
||||
def is_image(url):
|
||||
return os.path.splitext(os.path.basename(url))[1] in image_exts
|
||||
|
||||
def on_privmsg(event):
|
||||
matches = [url for url in URL.findall(event.message) if is_image(url)]
|
||||
if not matches: return
|
||||
target = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
url = matches[-1]
|
||||
IMAGE_URL[target]=url
|
||||
|
||||
def on_shortenimg(event):
|
||||
if not BOT: return
|
||||
url = None
|
||||
target = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
if len(event.parts)>1:
|
||||
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"Usage: "+BOT.prefix+"shortenimg [url]")
|
||||
return
|
||||
elif len(event.parts)==1:
|
||||
matches = [url for url in URL.findall(event.message) if is_image(url)]
|
||||
if len(matches)>0: url=matches[-1]
|
||||
elif len(event.parts)==0:
|
||||
if target not in IMAGE_URL:
|
||||
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"I haven't seen an image here to shorten.")
|
||||
return
|
||||
url=IMAGE_URL[target]
|
||||
try:
|
||||
new_url = shorten(url)
|
||||
except:
|
||||
say(target,"An error occurred!")
|
||||
traceback.print_exc()
|
||||
say(target,(event.hostmask.nick+": " if target!=event.hostmask.nick else '')+"Shortened URL: "+new_url)
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT = bot
|
||||
bot.event_manager.on("privmsg",on_privmsg)
|
||||
bot.event_manager.on("command_shortenimg",on_shortenimg)
|
|
@ -0,0 +1,67 @@
|
|||
import plugin, time, requests
|
||||
from jsonpath import jsonpath
|
||||
from pluralslib import plural
|
||||
from bot import IRCLine
|
||||
BOT = None
|
||||
|
||||
def say(target,msg):
|
||||
if not BOT: return None
|
||||
BOT.socket.send(IRCLine("PRIVMSG",target,":"+msg))
|
||||
|
||||
def query(jpath,reducer=lambda x: x[0]):
|
||||
def _query(stats):
|
||||
return reducer(jsonpath(stats,jpath))
|
||||
return _query
|
||||
|
||||
QUERIES = dict(
|
||||
users = query("$.usercount"),
|
||||
channels = query("$.channelcount")
|
||||
)
|
||||
|
||||
NOUNS = dict(
|
||||
users="user",
|
||||
channels="channel"
|
||||
)
|
||||
|
||||
def time_secs():
|
||||
return time.time()
|
||||
|
||||
def grab_stats():
|
||||
r = requests.get("https://tilde.chat/stats.json")
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
LAST_GRAB = time_secs()
|
||||
_stats = grab_stats()
|
||||
|
||||
def stats():
|
||||
global LAST_GRAB,_stats
|
||||
if (time_secs()-LAST_GRAB)>=(5*60): # cache for 5 minutes
|
||||
LAST_GRAB = time_secs()
|
||||
_stats = grab_stats()
|
||||
return _stats
|
||||
|
||||
USAGE_STR = "<{}>".format("/".join(QUERIES.keys()))
|
||||
|
||||
def stat(bot,channel,nick,subcmd):
|
||||
if subcmd not in QUERIES.keys():
|
||||
say(channel,("{}: ".format(nick) if channel.startswith("#") else " ".strip())+"Usage: !stats {}".format(USAGE_STR))
|
||||
return
|
||||
st = stats()
|
||||
res = plural(QUERIES.get(subcmd,lambda x: 0)(st),NOUNS.get(subcmd,"fuck"))
|
||||
say(channel,": ".join(([nick] if channel.startswith("#") else [])+[res]))
|
||||
|
||||
def on_stats(event):
|
||||
if not BOT: return None
|
||||
channel = event.target if event.target.startswith("#") else event.hostmask.nick
|
||||
try:
|
||||
stat(BOT,channel,event.hostmask.nick,*event.parts)
|
||||
except TypeError:
|
||||
say(channel,"Usage: !stats <{}>".format("/".join(QUERIES.keys())))
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
|
||||
def register(bot):
|
||||
global BOT
|
||||
BOT=bot
|
||||
bot.event_manager.on("command_stats",on_stats)
|
|
@ -0,0 +1,10 @@
|
|||
def are(amount):
|
||||
if amount == 1:
|
||||
return 'is'
|
||||
else:
|
||||
return 'are'
|
||||
def plural(amount, base, plural='s', singular=''):
|
||||
if amount == 1:
|
||||
return str(amount) + ' ' + base + singular
|
||||
else:
|
||||
return str(amount) + ' ' + base + plural
|
|
@ -0,0 +1,17 @@
|
|||
import random,re
|
||||
|
||||
WORDS = []
|
||||
|
||||
def _isASCII(s):
|
||||
for c in s:
|
||||
if ord(c) not in range(128):
|
||||
return False
|
||||
return True
|
||||
|
||||
def loadDict(name="words"):
|
||||
global WORDS
|
||||
with open("/usr/share/dict/"+name) as f:
|
||||
WORDS = [l.strip() for l in f if l.strip() and _isASCII(l.strip())]
|
||||
|
||||
def getWords(pattern=".*"):
|
||||
return list(filter(lambda x: (re.match(pattern,x) is not None),WORDS))
|
Loading…
Reference in New Issue