Initial code commit

This commit is contained in:
Robert Miles 2020-11-16 21:58:16 +00:00
commit 97b5676a71
12 changed files with 736 additions and 0 deletions

145
.gitignore vendored Normal file
View File

@ -0,0 +1,145 @@
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
pytestdebug.log
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
doc/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
pythonenv*
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# profiling data
.prof
# End of https://www.toptal.com/developers/gitignore/api/python

1
.password Normal file
View File

@ -0,0 +1 @@
lovehimandpethimandsqueezehimandcallhimgeorge

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2020 Robert 'khuxkm' Miles, https://khuxkm.tilde.team <khuxkm@tilde.team>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

181
bot.py Normal file
View File

@ -0,0 +1,181 @@
import impmod, socket, traceback, events, time, os
from irc.client import NickMask
class Socket:
def __init__(self,server):
self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.connect(server)
self._read_buffer=b""
def read(self):
try:
data=self.sock.recv(4096)
if not data:
return None
except: return traceback.print_exc()
data = self._read_buffer+data
self._read_buffer=b""
lines = [line.strip(b"\r") for line in data.split(b"\n")]
if lines[-1]:
self._read_buffer=lines[-1]
lines.pop(-1)
lines = [line.decode("utf-8") for line in lines]
# for line in lines: print(" < "+line)
return lines
def send(self,line):
# print(" > "+line)
self.sock.send(line.encode("utf-8"))
def close(self):
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
def unescape(value):
return value.replace(r"\:",";").replace(r"\s"," ").replace(r"\\","\\").replace(r"\r","\r").replace(r"\n","\n")
def escape(value):
return value.replace(";",r"\:").replace(" ",r"\s").replace("\\",r"\\").replace("\r",r"\r").replace("\n",r"\n")
MISSING = None
class IRCLine:
def __init__(self,command,*params,tags=dict(),hostmask=""):
self.command=command
if len(params)==0:
self.params=[]
elif len(params)==1 and type(params[0]) in (list,tuple):
self.params=list(params[0])
else:
self.params=list(params)
self.tags=tags
self.hostmask=NickMask(hostmask) if hostmask else None
@property
def line(self):
prefix=""
if len(list(self.tags.keys()))>0:
tagc = len(list(self.tags.keys()))
prefix+="@"
for i,tag in enumerate(self.tags.keys()):
prefix+=tag
if self.tags[tag] is not MISSING:
prefix+="="+escape(str(self.tags[tag]))
if (i+1)<tagc:
prefix+=";"
prefix+=" "
if self.hostmask:
prefix+=":{} ".format(self.hostmask)
return prefix+" ".join([self.command]+self.params)+"\r\n"
@classmethod
def parse_line(cls,line):
parts = line.split()
tags = dict()
if parts[0].startswith("@"):
taglist = parts.pop(0)[1:].split(";")
for tag in taglist:
if "=" in tag:
key, value = tag.split("=",1)
tags[key]=unescape(value)
else:
tags[tag]=MISSING
hostmask=None
if parts[0].startswith(":"):
hostmask=parts.pop(0)[1:]
i=len(parts)-1
while i>0 and not parts[i].startswith(":"): i-=1
if i!=0: parts[i:]=[" ".join(parts[i:])]
return cls(*parts,tags=tags,hostmask=hostmask)
def encode(self,*args,**kwargs):
# clearly, if we're here, I'm an idiot and am trying to send an
# IRCLine object down the tube. just do it.
return self.line.encode(*args,**kwargs)
PLUGIN_MODULES={}
class IRCBot:
def __init__(self,nickname,username,realname="IRCBot",server=("localhost",6667),channels=["#bots"]):
self.nickname=nickname
self.username=username
self.realname=realname
self.server=server
self.channels=channels
self.event_manager=events.EventManager()
def load_modules(self):
self.event_manager.clear()
for module in PLUGIN_MODULES:
teardown = getattr(PLUGIN_MODULES[module].module,"teardown",None)
if teardown is not None: teardown()
for name in os.listdir("plugins"):
if name.endswith(".py"):
self.load_module(name[:-3],os.path.join("plugins",name))
def load_module(self,modname,path):
try:
if modname in PLUGIN_MODULES:
print("[*] {} already imported, reloading".format(modname))
PLUGIN_MODULES[modname].reload()
else:
try:
print("[*] importing {}".format(modname))
PLUGIN_MODULES[modname]=impmod.Module(modname,path)
except:
print("[!] Unable to load plugin {}".format(modname))
traceback.print_exc()
return
register_func = getattr(PLUGIN_MODULES[modname].module,"register",None)
if not register_func:
print(f"[!] Plugin {modname} has no register function!")
print("[!] Remember, if porting plugins from a minerbot-based architecture,")
print("[!] you have to add a register function to use the new system.")
return
register_func(self)
except:
traceback.print_exc()
pass
def handle_line(self,line):
if type(line)!=IRCLine: line = IRCLine.parse_line(line)
self.event_manager(events.Event("raw_line",text=line.line,parsed=line))
if line.command=="PING":
line.command="PONG"
self.socket.send(line.line)
return
if line.hostmask is None: return
if line.hostmask.nick==self.nickname:
return
if line.command in "PRIVMSG NOTICE".split():
target = line.params[0]
message = line.params[1][1:]
self.event_manager(events.Event(line.command.lower(),target=target,message=message,tags=line.tags,hostmask=line.hostmask))
elif line.command == "TAGMSG":
self.event_manager(events.Event("tagmsg",hostmask=line.hostmask,tags=line.tags,target=line.params[0]))
elif line.command == "INVITE":
self.event_manager(events.Event("invite",to=line.params[1][1:],hostmask=line.hostmask))
elif line.command == "PING":
self.socket.send(IRCLine("PONG",line.params).line)
def start(self):
self.socket = Socket(self.server)
self.socket.send("NICK {}\r\n".format(self.nickname))
self.socket.send("USER {} * * :{}\r\n".format(self.username,self.realname))
connection_established = False
while not connection_established:
lines = self.socket.read()
for line in lines:
if IRCLine.parse_line(line).command=="376":
connection_established=True
self.event_manager(events.Event("connection_established"))
for channel in self.channels:
self.socket.send(f"JOIN {channel}\r\n")
time.sleep(1)
self.socket.send("CAP REQ account-tag\r\n")
self.running=True
while self.running:
lines = self.socket.read()
if lines:
for line in lines:
self.handle_line(line)
self.socket.close()
del self.socket
def stop(self):
self.running=False
for module in PLUGIN_MODULES:
teardown = getattr(PLUGIN_MODULES[module].module,"teardown",None)
if teardown is not None: teardown()
if __name__=="__main__":
bot = IRCBot("bunbot","bunbot",channels=["#bungame"])
bot.load_modules()
bot.start()

1
bungame.json Normal file
View File

@ -0,0 +1 @@
{"bun_active": true, "bun_time": 1605563567.935003, "buns": {"khuxkm|lounge": [76.96350693702698, 2.1948606967926025, 1.7122113704681396, 1.821272373199463, 2.044851064682007, 2.3677005767822266, 360.1684272289276, 114.95335626602173], "brendanjw": [963.0584230422974], "acdw": [459.92508363723755, 3.456105947494507, 32.864134550094604, 62.88563656806946, 578.2994403839111]}, "score_cache": {"khuxkm|lounge": 9.421338535523898, "brendanjw": 6.346710457471163, "acdw": 8.650267014843637}, "association": {}}

22
events.py Normal file
View File

@ -0,0 +1,22 @@
from collections import defaultdict
class Event:
def __init__(self,name,**kwargs):
self.data=kwargs
self.name=name
def __getitem__(self,k):
return self.data[k]
def __getattr__(self,k):
if k in self.data: return self.data[k]
class EventManager:
def __init__(self):
self.handlers=defaultdict(list)
def clear(self):
self.__init__()
def on(self,event,func):
self.handlers[event].append(func)
def __call__(self,event_obj):
# print(event_obj.name,event_obj.data)
handlers = self.handlers[event_obj.name]
for handler in handlers: handler(event_obj)

19
impmod.py Normal file
View File

@ -0,0 +1,19 @@
import importlib, importlib.util, sys
class Module:
"""A module. Stores module object, spec object, and handles reloading."""
def __init__(self,modname,path=None):
if path is None:
path = modname+".py"
self.modname, self.path = modname, path
self.spec = importlib.util.spec_from_file_location(modname,path)
self.module = importlib.util.module_from_spec(self.spec)
self.spec.loader.exec_module(self.module)
def reload(self):
if self.modname not in sys.modules:
sys.modules[self.modname]=self.module
# Alright, this needs some explaining.
# When you do importlib.reload, it does some juju magic shist to find the spec and calls importlib._bootstrap._exec.
# When you dynamically import a module it won't do its magic correctly and it'll error.
# Luckily, we can skip all the juju magic since we can just store the spec.
importlib._bootstrap._exec(self.spec,self.module)

54
plugin.py Normal file
View File

@ -0,0 +1,54 @@
import json, traceback
class Data:
"""A class for plugin data."""
def __init__(self,value):
self.value = value
def serialize(self):
return self.value
def deserialize(self,value):
self.value = value
def save(self,filename):
with open(filename,"w") as f:
f.write(self.serialize())
def load(self,filename):
try:
with open(filename) as f:
self.deserialize(f.read())
except:
print("Error loading data from {!r}:".format(filename))
traceback.print_exc()
pass # You should've initialized this with a sane default, so just keep the default on error
def __str__(self):
return str(self.value)
def __repr__(self):
return repr(self.value)
class JSONData(Data):
"""Data, but can be serialized to JSON (and should be)."""
def serialize(self):
return json.dumps(self.value)
def deserialize(self,value):
self.value = json.loads(value)
class DictData(JSONData):
"""JSONData, but with a dictionary interface."""
def __init__(self,filename):
JSONData.__init__(self,{})
self.filename=filename
def __getitem__(self,k):
self.load()
return self.value[k]
def __setitem__(self,k,v):
self.value[k]=v
self.save()
def __contains__(self,k):
self.load()
return k in self.value
def load(self):
super(DictData,self).load(self.filename)
def save(self):
super(DictData,self).save(self.filename)
def get(self,*args,**kwargs):
self.load()
return self.value.get(*args,**kwargs)

44
plugins/admin.py Normal file
View File

@ -0,0 +1,44 @@
from events import Event
from bot import IRCLine
ADMIN_HOSTMASKS = [x+"!khuxkm@sudoers.tilde.team" for x in "khuxkm khuxkm|lounge".split()]
BOT = None
def admin(event):
if BOT is None: return
if event.hostmask not in ADMIN_HOSTMASKS:
BOT.socket.send(IRCLine("PRIVMSG","#meta","You're not the boss of me! (hostmask {!s})".format(event.hostmask)).line)
return
if len(event.parts)==0: return
if event.parts[0]=="reload":
BOT.load_modules()
elif event.parts[0]=="quit":
BOT.socket.send(IRCLine("QUIT",[":goodbye"]).line)
BOT.stop()
elif event.parts[0]=="msg":
target = event.parts[1]
message = ":"+" ".join(event.parts[2:])
BOT.socket.send(IRCLine("PRIVMSG",target,message).line)
else:
event_out = Event("admin_"+event.parts[0])
event_out.data.update(event.data)
event_out.data["parts"]=event.parts[1:]
BOT.event_manager(event_out)
def on_invite(event):
if BOT is None: return
if event.hostmask not in ADMIN_HOSTMASKS: return
BOT.socket.send(IRCLine("JOIN",[event.to]).line)
PASSWORD="whoops"
try:
with open(".password") as f: PASSWORD=f.read().strip()
except: pass
def login(event):
BOT.socket.send(IRCLine("NS","IDENTIFY",PASSWORD).line)
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("command_admin",admin)
bot.event_manager.on("invite",on_invite)
bot.event_manager.on("connection_established",login)

175
plugins/bungame.py Normal file
View File

@ -0,0 +1,175 @@
import plugin, tasks, time, random, math
from bot import IRCLine
BOT=None
def respond(event,msg,generic=False):
if not BOT: return None
target = event.target
prefix = ""
if target.startswith("#"):
prefix += event.hostmask.nick
prefix += ": "
else:
target = event.hostmask.nick
if generic: prefix=""
BOT.socket.send(IRCLine("PRIVMSG",target,":"+prefix+msg).line)
pool = tasks.TaskPool()
bungame_data = plugin.DictData("bungame.json")
if "bun_active" not in bungame_data: bungame_data["bun_active"]=False
if "bun_time" not in bungame_data: bungame_data["bun_time"]=time.time()
if "buns" not in bungame_data: bungame_data["buns"]=dict()
if "score_cache" not in bungame_data: bungame_data["score_cache"]=dict()
if "association" not in bungame_data: bungame_data["association"]=dict()
def check_bun_active(channel):
"""Checks if a bun is active in channel.
Right now, this is a single-channel bot, but if I decide to make it a multichannel bot, this will make generalizing a lot easier."""
return bungame_data["bun_active"]
def activate_bun(channel):
"""Activates the bun in channel.
Again, just for generalizing if needed later."""
bungame_data["bun_time"]=time.time()
bungame_data["bun_active"]=True
def deactivate_bun(channel):
"""Deactivates the bun in channel.
Again, just for generalizing if needed later."""
bungame_data["bun_time"]=time.time()
bungame_data["bun_active"]=False
# Base is such that b^(1 hour in seconds) = 1000.
# This is probably too much and too easily abused but hell we'll give it a shot.
BASE = 1000**(1/(60*60))
def bun_score(time_delta):
"""Generates the score for a bun."""
return BASE**time_delta
def get_bun_time(channel):
"""Returns the bun time of the channel."""
return bungame_data["bun_time"]
def get_bun_score(channel):
"""Gives the current score of the bun in channel."""
delta = time.time()-get_bun_time(channel)
return bun_score(delta), delta
def redo_score_cache():
score_cache = dict()
for account in bungame_data["buns"].keys():
buns = bungame_data["buns"][account]
score = 0
for bun in buns:
# time delta is stored up to 4 digits precision
# really you shouldn't need more than that
score+=bun_score(bun)
score_cache[account]=score
bungame_data["score_cache"]=score_cache
def on_privmsg(event):
# don't trigger on private messages
if not event.target.startswith("#"): return
# do association first, in case an error occurs elsewhere
if "account" in event.tags: bungame_data["association"][event.hostmask.nick]=event.tags["account"]
if not check_bun_active(event.target):
if random.random()>(3/4):
activate_bun(event.target)
respond(event,"A bun hops into the room. Hop, hop, hop, little bun!",True)
def on_befriend(event):
if not event.target.startswith("#"): return
if "account" not in event.tags:
respond(event,"You need a NickServ account to participate in the bun game! (/msg NickServ help register)")
return
if not check_bun_active(event.target):
respond(event,"You missed the bun!")
return
account = event.tags["account"]
score, final_delta = get_bun_score(event.target)
deactivate_bun(event.target)
first_bun = False
# add the bun to their account and regenerate the score cache
try:
bungame_data["buns"][account].append(final_delta)
except KeyError:
bungame_data["buns"][account]=[final_delta]
bungame_data.save()
redo_score_cache()
# now tell them about it
delta_r = round(final_delta,2)
score_r = round(score,2)
if first_bun:
respond(event,f"Congratulations on your first bun! This bun has waited {delta_r} second(s), and is therefore worth {score_r} point(s)!")
else:
respond(event,f"This bun has waited {delta_r} second(s), and is therefore worth {score_r} point(s)!")
def on_peek(event):
if not event.target.startswith("#"): return
if not check_bun_active(event.target):
respond(event,"There is no bun active in this channel!")
return
score, delta = get_bun_score(event.target)
delta_r=round(delta,2)
score_r=round(score,2)
respond(event,f"If you were to befriend the bun right now, it would have waited {delta_r} second(s), and would therefore be worth {score_r} point(s).")
average = lambda l: sum(l)/len(l)
def on_stats(event):
if not event.target.startswith("#"): return
if "account" not in event.tags:
respond(event,"You need a NickServ account to participate. (/msg NickServ help register)")
return
account = event.tags["account"]
buns = bungame_data["buns"].get(account)
if not buns:
respond(event,"You haven't befriended any buns!")
return
bunc = len(buns)
avg_bunt = average(buns) # *av*era*g*e *bun* *t*ime
stat_out = "You have befriended {} bun{}. Your average befriend time is {:0.02f}, and your current score is {:0.02f}.".format(bunc,"s" if bunc!=1 else "",avg_bunt,bungame_data["score_cache"].get(account,0))
respond(event,stat_out)
def on_top10(event):
mode = "score"
if event.parts and event.parts[0] in "score count time".split():
mode = event.parts[0]
accounts = list(bungame_data["buns"].keys())
if mode == "score":
accounts.sort(key=lambda k: bungame_data["score_cache"].get(k,0),reverse=True)
accounts = [(bungame_data["association"].get(account,account),bungame_data["score_cache"][account]) for account in accounts[:10]]
elif mode == "count":
accounts.sort(key=lambda k: len(bungame_data["buns"].get(k,[])),reverse=True)
accounts = [(bungame_data["association"].get(account,account),len(bungame_data["buns"].get(account,[]))) for account in accounts[:10]]
elif mode == "time":
accounts.sort(key=lambda k: average(bungame_data["buns"].get(k,[0])),reverse=True)
accounts = [(bungame_data["association"].get(account,account),average(bungame_data["buns"].get(account,[0]))) for account in accounts[:10]]
out = f"Top 10 in {mode}: "
for account in accounts:
if mode == "count":
out += "{act[0]} ({act[1]:d})".format(act=account)
else:
out += "{act[0]} ({act[1]:0.02f})".format(act=account)
out+=", "
out=out[:-2]
respond(event,out)
def admin_redocache(event):
redo_score_cache()
respond(event,"Score cache redone!")
def register(bot):
global BOT
BOT=bot
bot.event_manager.on("privmsg",on_privmsg)
bot.event_manager.on("command_befriend",on_befriend)
bot.event_manager.on("command_bef",on_befriend)
bot.event_manager.on("command_hug",on_befriend)
bot.event_manager.on("command_peek",on_peek)
bot.event_manager.on("command_stats",on_stats)
bot.event_manager.on("command_top10",on_top10)
bot.event_manager.on("admin_redocache",admin_redocache)

18
plugins/commands.py Normal file
View File

@ -0,0 +1,18 @@
import events
BOT=None
def on_privmsg(event):
if BOT is None: return
prefix = BOT.prefix if event.target.startswith("#") else ""
if event.message.startswith(prefix):
parts = event.message.split(" ")
parts[0]=parts[0][len(prefix):]
event_out = events.Event("command_"+parts.pop(0),parts=parts)
event_out.data.update(event.data)
BOT.event_manager(event_out)
def register(bot):
global BOT
BOT=bot
bot.prefix="-"
bot.event_manager.on("privmsg",on_privmsg)

55
tasks.py Normal file
View File

@ -0,0 +1,55 @@
import sched,time,string,json
from threading import Thread
from sys import exit
class TaskPool:
def __init__(self,**kwargs):
self.base_state = kwargs
self.base_state["task_pool"] = self
self.scheduler = sched.scheduler(time.time,time.sleep)
self.thread = Thread(target=self.worker,args=(self,))
self.coroutines = []
self.states = {}
self.killswitch = False
def periodical(self,scheduler,interval,action,index,state=dict()):
if self.killswitch:
return
self.states[index] = action(state,self.base_state)
if not self.killswitch:
scheduler.enter(interval,1,self.periodical,(scheduler,interval,action,index,self.states[index]))
def worker(self,tasks):
for c,coro in enumerate(tasks.coroutines):
interval = coro["interval"]
action = coro["action"]
state = coro.get("state",dict())
tasks.periodical(tasks.scheduler,interval,action,c,state)
tasks.scheduler.run()
exit(0)
def run(self):
if self.thread.is_alive(): return # don't set up an already set-up thread
self.thread.daemon = True
self.thread.start()
def stop(self):
list(map(self.scheduler.cancel, self.scheduler.queue))
self.killswitch = True # kill any lingering tasks
def add_coroutine(self,action,interval,state=dict(),name=None):
if name is None:
name = string.ascii_letters[len(self.coroutines)]
self.coroutines.append(dict(action=action,interval=interval,state=state,name=name))
def save_state(self, index):
with open("state.{}.json".format(self.coroutines[index]["name"]),"w") as f:
json.dump(self.states[index],f)
def load_state(self, index):
try:
with open("state.{}.json".format(self.coroutines[index]["name"])) as f:
self.states[index] = json.load(f)
self.coroutines[index]["state"] = self.states[index]
except:
print("state.{}.json not found or couldn't be opened; using default".format(self.coroutines[index]["name"]))