commit 596ed1a1642083e001be11b859bd0b13f1b9d52b Author: khuxkm fbexl Date: Sat Feb 22 17:46:07 2020 -0500 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a25b8a --- /dev/null +++ b/.gitignore @@ -0,0 +1,116 @@ + +# Created by https://www.gitignore.io/api/python +# Edit at https://www.gitignore.io/?templates=python + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# End of https://www.gitignore.io/api/python + +# NickServ password (be sure to chmod to 600) +.password + +# data files +*.json diff --git a/badge.py b/badge.py new file mode 100644 index 0000000..4e044a6 --- /dev/null +++ b/badge.py @@ -0,0 +1,115 @@ +import random as _random +from collections import defaultdict +from math import tan, atan, pi, log +import json + +def quantile(x,m,p): + c=1-m + if x<=0.5: # x <= 1/2 + return m*((2*x)**(1/(m*p-1))) + else: # x > 1/2 + return 1-c*((2-2*x)**(1/(c*p-1))) + +def random(m,p): + return quantile(1-_random.random(),m,p) + +class Badge: + def __init__(self,name,normal=True): + self.name=name + self.normal=normal + def to_json(self): + return dict(name=self.name,normal=self.normal) + @classmethod + def from_json(self,d): + if type(d)==str: d=json.loads(d) + return self(d["name"],d["normal"]) + def __str__(self): + return "{} (normally generated: {!s})".format(self.name,self.normal) + def __repr__(self): + return f"<{self!s}>" + +W = 0.2 + +def calculate_rarities(badges): + existing = len(badges) + normals = len([x for x in badges if x.normal]) + existing_c = defaultdict(int) + normal_c = defaultdict(int) + for badge in badges: + existing_c[badge.name]+=1 + if badge.normal: normal_c[badge.name]+=1 + r = dict() + for k in existing_c: + r[k]=[existing_c[k]/existing,normal_c[k]/normals] + r[k].append((r[k][0]*W)+(r[k][1]*(1-W))) + return r + +class BadgePopulation: + def __init__(self): + self.badges = dict() + def give_badge(self,to,name,normal=True): + if to not in self.badges: + self.badges[to]=[] + self.badges[to].append(Badge(name,normal)) + self.badges[to].sort(key=lambda x: x.name+("1" if x.normal else "0")) + @property + def population(self): + ret = [] + for to in self.badges: ret.extend(self.badges[to]) + return ret + def rarity(self,name): + rarities = calculate_rarities(self.population) + return rarities[name][2] # effective rarity + def transmute(self,user,*badge_names): + badge_names = list(badge_names) + for badge_name in badge_names: + found=False + for badge in self.badges[user]: + if badge.name==badge_name: found=True + if not found: + raise Exception(f"User {user} does not have a {badge_name}!") + rarities = calculate_rarities(self.population) + badges = [rarities[name][2] for name in badge_names] + badges.insert(0,1-_random.random()) + N = (((1-W)/len([x for x in self.population if x.normal]))+(W/len(self.population)))**-1 + for badge_name in badge_names: + taken = False + for badge in self.badges[user]: + if not taken and badge.name==badge_name: + taken=True + self.badges[user].remove(badge) + m=1-(2/pi)*atan(sum(map(lambda x: tan((pi/2)*(1-x)),badges))) + p=((log(1+m*N)+log(m*N)-log(2))/(m*log(m*N))) + out = random(m,p) + if out not in [rarities[k][2] for k in rarities]: + L=0 + H=1 + for badge in rarities: + if rarities[badge][2]out + if abs(rarities[badge][2]-out)0: + tagc = len(list(self.tags.keys())) + prefix+="@" + for i,tag in enumerate(self.tags.keys()): + prefix+=tag + if self.tags[tag] is not MISSING: + prefix+="="+escape(str(self.tags[tag])) + if (i+1)0 and not parts[i].startswith(":"): i-=1 + if i!=0: parts[i:]=[" ".join(parts[i:])] + return cls(*parts,tags=tags,hostmask=hostmask) + def encode(self,*args,**kwargs): + # clearly, if we're here, I'm an idiot and am trying to send an + # IRCLine object down the tube. just do it. + return self.line.encode(*args,**kwargs) + +PLUGIN_MODULES={} +class IRCBot: + def __init__(self,nickname,username,realname="IRCBot",server=("localhost",6667),channels=["#bots"]): + self.nickname=nickname + self.username=username + self.realname=realname + self.server=server + self.channels=channels + self.event_manager=events.EventManager() + def load_modules(self): + self.event_manager.clear() + for name in os.listdir("plugins"): + if name.endswith(".py"): + self.load_module(name[:-3],os.path.join("plugins",name)) + def load_module(self,modname,path): + try: + if modname in PLUGIN_MODULES: + print("{} already imported, reloading".format(modname)) + PLUGIN_MODULES[modname].reload() + else: + try: + print("importing {}".format(modname)) + PLUGIN_MODULES[modname]=impmod.Module(modname,path) + except: + print("Unable to load plugin {}".format(modname)) + traceback.print_exc() + register_func = getattr(PLUGIN_MODULES[modname].module,"register",None) + if not register_func: + print(f"Plugin {modname} has no register function!") + print("Remember, if porting plugins from a minerbot-based architecture,") + print("you have to add a register function to use the new system.") + return + register_func(self) + except: + traceback.print_exc() + pass + def handle_line(self,line): + if type(line)!=IRCLine: line = IRCLine.parse_line(line) + self.event_manager(events.Event("raw_line",text=line.line,parsed=line)) + if line.command=="PING": + line.command="PONG" + self.socket.send(line.line) + return + if line.hostmask is None: return + if line.hostmask.nick==self.nickname: + return + if line.command in "PRIVMSG NOTICE".split(): + target = line.params[0] + message = line.params[1][1:] + self.event_manager(events.Event(line.command.lower(),target=target,message=message,tags=line.tags,hostmask=line.hostmask)) + elif line.command == "TAGMSG": + self.event_manager(events.Event("tagmsg",hostmask=line.hostmask,tags=line.tags,target=line.params[0])) + elif line.command == "INVITE": + self.event_manager(events.Event("invite",to=line.params[1][1:],hostmask=line.hostmask)) + elif line.command == "PING": + self.socket.send(IRCLine("PONG",line.params).line) + def start(self): + self.socket = Socket(self.server) + self.socket.send("NICK {}\r\n".format(self.nickname)) + self.socket.send("USER {} * * :{}\r\n".format(self.username,self.realname)) + time.sleep(2) # give the server some time to record my username + self.event_manager(events.Event("connection_established")) + for channel in self.channels: + self.socket.send(f"JOIN {channel}\r\n") + time.sleep(1) + self.socket.send("CAP REQ account-tag\r\n") + self.running=True + while self.running: + lines = self.socket.read() + if lines: + for line in lines: + self.handle_line(line) + self.socket.close() + del self.socket + +if __name__=="__main__": + bot = IRCBot("badger","badger",channels=["#khuxkm"]) + bot.load_modules() + bot.start() diff --git a/events.py b/events.py new file mode 100644 index 0000000..6432638 --- /dev/null +++ b/events.py @@ -0,0 +1,22 @@ +from collections import defaultdict + +class Event: + def __init__(self,name,**kwargs): + self.data=kwargs + self.name=name + def __getitem__(self,k): + return self.data[k] + def __getattr__(self,k): + if k in self.data: return self.data[k] + +class EventManager: + def __init__(self): + self.handlers=defaultdict(list) + def clear(self): + self.__init__() + def on(self,event,func): + self.handlers[event].append(func) + def __call__(self,event_obj): + print(event_obj.name,event_obj.data) + handlers = self.handlers[event_obj.name] + for handler in handlers: handler(event_obj) diff --git a/impmod.py b/impmod.py new file mode 100644 index 0000000..db430cb --- /dev/null +++ b/impmod.py @@ -0,0 +1,19 @@ +import importlib, importlib.util, sys + +class Module: + """A module. Stores module object, spec object, and handles reloading.""" + def __init__(self,modname,path=None): + if path is None: + path = modname+".py" + self.modname, self.path = modname, path + self.spec = importlib.util.spec_from_file_location(modname,path) + self.module = importlib.util.module_from_spec(self.spec) + self.spec.loader.exec_module(self.module) + def reload(self): + if self.modname not in sys.modules: + sys.modules[self.modname]=self.module + # Alright, this needs some explaining. + # When you do importlib.reload, it does some juju magic shist to find the spec and calls importlib._bootstrap._exec. + # When you dynamically import a module it won't do its magic correctly and it'll error. + # Luckily, we can skip all the juju magic since we can just store the spec. + importlib._bootstrap._exec(self.spec,self.module) diff --git a/plugin.py b/plugin.py new file mode 100644 index 0000000..9753de7 --- /dev/null +++ b/plugin.py @@ -0,0 +1,50 @@ +import json, traceback + +class Data: + """A class for plugin data.""" + def __init__(self,value): + self.value = value + def serialize(self): + return self.value + def deserialize(self,value): + self.value = value + def save(self,filename): + with open(filename,"w") as f: + f.write(self.serialize()) + def load(self,filename): + try: + with open(filename) as f: + self.deserialize(f.read()) + except: + print("Error loading data from {!r}:".format(filename)) + traceback.print_exc() + pass # You should've initialized this with a sane default, so just keep the default on error + def __str__(self): + return str(self.value) + def __repr__(self): + return repr(self.value) + +class JSONData(Data): + """Data, but can be serialized to JSON (and should be).""" + def serialize(self): + return json.dumps(self.value) + def deserialize(self,value): + self.value = json.loads(value) + +class DictData(JSONData): + def __init__(self,filename): + JSONData.__init__(self,{}) + self.filename=filename + def __getitem__(self,k): + self.load() + return self.value[k] + def __setitem__(self,k,v): + self.value[k]=v + self.save() + def load(self): + super(DictData,self).load(self.filename) + def save(self): + super(DictData,self).save(self.filename) + def get(self,*args,**kwargs): + self.load() + return self.value.get(*args,**kwargs) diff --git a/plugins/admin.py b/plugins/admin.py new file mode 100644 index 0000000..8e791e8 --- /dev/null +++ b/plugins/admin.py @@ -0,0 +1,41 @@ +from events import Event +from bot import IRCLine +ADMIN_HOSTMASKS = [x+"!khuxkm@sudoers.tilde.team" for x in "khuxkm khuxkm|lounge".split()] +BOT = None + +def admin(event): + if BOT is None: return + if event.hostmask not in ADMIN_HOSTMASKS: return + if len(event.parts)==0: return + if event.parts[0]=="reload": + BOT.load_modules() + elif event.parts[0]=="quit": + BOT.socket.send(IRCLine("QUIT",[":goodbye"]).line) + BOT.running=False + elif event.parts[0]=="msg": + target = event.parts[1] + message = ":"+" ".join(event.parts[2:]) + BOT.socket.send(IRCLine("PRIVMSG",target,message).line) + else: + event_out = Event("admin_"+event.parts[0]) + event_out.data.update(event.data) + event_out.data["parts"]=event.parts[1:] + BOT.event_manager(event_out) + +def on_invite(event): + if BOT is None: return + if event.hostmask not in ADMIN_HOSTMASKS: return + BOT.socket.send(IRCLine("JOIN",[event.to]).line) + +PASSWORD="whoops" +try: + with open(".password") as f: PASSWORD=f.read().strip() +def login(event): + BOT.socket.send(IRCLine("NS","IDENTIFY",PASSWORD).line) + +def register(bot): + global BOT + BOT=bot + bot.event_manager.on("command_admin",admin) + bot.event_manager.on("invite",on_invite) + bot.event_manager.on("connection_established",login) diff --git a/plugins/badge_plugin.py b/plugins/badge_plugin.py new file mode 100644 index 0000000..f4ac7b0 --- /dev/null +++ b/plugins/badge_plugin.py @@ -0,0 +1,102 @@ +import plugin, badge, random, json, traceback +from bot import IRCLine +from collections import Counter +BOT = None + +class BadgePopData(plugin.Data): + def serialize(self): + return json.dumps(self.value.to_json()) + def deserialize(self,s): + self.value = badge.BadgePopulation.from_json(s) + +population = BadgePopData(badge.BadgePopulation()) +population.load("badges.json") + +timeouts = plugin.DictData("timeouts.json") + +badge_weights = {'Berrybadge': 0.65, 'Rockbadge': 0.1, 'Waterbadge': 0.05, 'Firebadge': 0.15, 'Tildebadge': 0.001, 'Shadybadge': 0.02, 'Musicbadge': 0.019, 'Sportsbadge': 0.01} + +def privmsg(target,message): + return IRCLine("PRIVMSG",target,":"+message).line + +def respond(event,message): + if BOT is None: return + BOT.socket.send(privmsg(event.target if event.target.startswith("#") else event.hostmask.nick,(event.hostmask.nick+": " if event.target.startswith("#") else "")+message)) + +def on_privmsg(event): + if BOT is None: return + account = event.tags.get("account",None) + if account is None: return + if not event.target.startswith("#"): return + if timeouts.get(event.target,0)==0: + badge_to_give = random.choices(list(badge_weights.keys()),list(badge_weights.values()))[0] + if account not in population.value.badges: + BOT.socket.send(privmsg(event.hostmask.nick,f"Hey, you've got a badge! Badges are a nice way to show how active you are in channels like {event.target}. Type 'help' to see what you can do with these badges.")) + population.value.give_badge(account,badge_to_give) + population.save("badges.json") + timeouts[event.target]=random.randint(20,50) + else: + timeouts[event.target]-=1 + if event.message=="!botlist": + respond(event,"Hi! I'm the badger! I give out badges randomly. "+("Commands you can use include 'listbadges' and 'transmute'." if account is not None else "To get started, log in to a services account! (/msg NickServ help)")) + +def on_cmd_help(event): + if BOT is None: return + account = event.tags.get("account",None) + if len(event.parts)==0: + respond(event,"Hi! I'm the badger! I give out badges randomly. "+("Commands you can use include 'listbadges' and 'transmute'." if account is not None else "To get started, log in to a services account! (/msg NickServ help)")) + return None + if account is None: return + if event.parts[0]=="listbadges": + respond(event,"Lists the badges in your possession. Usage: listbadges") + elif event.parts[0]=="transmute": + respond(event,"Transmutes 3 or more badges into one, possibly rarer, badge. Usage: transmute [badge four...]") + +def on_cmd_listbadges(event): + if BOT is None: return + account = event.tags.get("account",None) + if account is None: return + counts = Counter([x.name for x in population.value.badges.get(account,"")]) + ret = [] + for item in counts.items(): + ret.append("{} (x{!s})".format(*item)) + if len(counts.items())==0: + respond(event,"You don't have any badges yet! Just stay active in the channel and you'll get one eventually.") + else: + respond(event,"You have: "+", ".join(ret)) + +def on_cmd_transmute(event): + if BOT is None: return + account = event.tags.get("account",None) + if account is None: return + if len(event.parts)<3: + respond(event,"You must insert at least 3 badges for use in transmutation.") + return + try: + badge_result = population.value.transmute(account,*event.parts) + except: + traceback.print_exc() + respond(event,"You must have at least one (1) of each badge you wish to use in the transmutation.") + return + respond(event,"You put in the {!s} badges above, and out pops a {}!".format(len(event.parts),badge_result)) + population.value.give_badge(account,badge_result,False) + population.save("badges.json") + +def admin_givebadge(event): + print(event.name,event.data) + try: + account, badge = event.parts[:2] + normal=True + if len(event.parts)==3 and event.parts[2].lower() in ("n","no","f","false"): normal=False + population.value.give_badge(account,badge,normal) + population.save("badges.json") + except: pass + +def register(bot): + global BOT + BOT=bot + bot.event_manager.on("privmsg",on_privmsg) + bot.event_manager.on("command_help",on_cmd_help) + bot.event_manager.on("command_listbadges",on_cmd_listbadges) + bot.event_manager.on("command_transmute",on_cmd_transmute) + bot.event_manager.on("admin_givebadge",admin_givebadge) diff --git a/plugins/commands.py b/plugins/commands.py new file mode 100644 index 0000000..c67c549 --- /dev/null +++ b/plugins/commands.py @@ -0,0 +1,18 @@ +import events +BOT=None + +def on_privmsg(event): + if BOT is None: return + prefix = BOT.prefix if event.target.startswith("#") else "" + if event.message.startswith(prefix): + parts = event.message.split(" ") + parts[0]=parts[0][len(prefix):] + event_out = events.Event("command_"+parts.pop(0),parts=parts) + event_out.data.update(event.data) + BOT.event_manager(event_out) + +def register(bot): + global BOT + BOT=bot + bot.prefix="]" + bot.event_manager.on("privmsg",on_privmsg)