#!/usr/bin/env python3 import asyncio import random # xfnw was too lazy to import any more, so he went with a star-import # :< # well, I'm also lazy, but I am IN NO WAY letting my LSP scream at me for # all the unknown symbols, I just use an alias instead. import irctokens as It import ircrobots as Ir import ircrobots.server as S import config as cfg ## LOGGING ## f_log = None def log(type_: str, msg: str, also_print: bool = False): """Log a message, optionally echoed to stdout, with a type prefix.""" f_log.write(f"[{type_}] {msg}\n") if also_print: print(msg) def manual_log(type_: str, msg: str, *args, **kwargs): """Manually log a message with given type.""" log("manual", f"({type_}) {msg}", *args, **kwargs) ## UTILS ## # Coming soon ## BOT ## class Server(Ir.Server): # overwrite connect so i can put try except blocks there async def connect(self, transport: S.ITCPTransport, params: Ir.ConnectionParams): try: await S.sts_transmute(params) await S.resume_transmute(params) reader, writer = await transport.connect( params.host, params.port, tls=params.tls, bindhost=params.bindhost, ) self._reader = reader self._writer = writer self.params = params await self.handshake() except: manual_log("ERROR", "connection with {} failed, disconnecting".format(self.name), also_print=True) self.disconnected = True def auto_log(self, msg: str, *args, **kwargs): """Log an automated message with server name as info.""" log(self.name, msg, *args, **kwargs) async def line_read(self, line: It.line): self.auto_log(f"< {line.format()}") if line.command == "001": manual_log("status", f"connected to {self.name} :D", True) self.chans = cfg.SERVERS[self.name]["chans"] self.chans_actual = [] for c in self.chans: await self.send(It.build("JOIN", [c])) manual_log("status", f"joined {self.name} {c} through config", True) self.chans_actual.append(c) elif line.command == "PRIVMSG" and line.params[0] == self.nickname: self.auto_log(f"< {line.format()}") line.params.pop(0) nick = line.source.split("!")[0] text = line.params[0] if text.startswith("!") and nick in cfg.ADMINS: args = text[1:].split(" ") asyncio.create_task(self.ac(self.name, args)) return elif line.command == "PRIVMSG" and line.params[0] in self.chans_actual: chan = line.params.pop(0) me = False if "\1ACTION" in line.params[0]: me = True text = line.params[0].replace("\1ACTION", "").replace("\1", "") nick = line.source.split("!")[0] if me: text = "* " + nick + text if ( nick == self.nickname or (line.tags and "batch" in line.tags) or "\x0f\x0f\x0f\x0f" in text ): return if ( nick.lower() in self.users and self.users[nick.lower()].account in cfg.ADMINS ): if ( text[: len(self.nickname) + 3].lower() == f"{self.nickname}: !".lower() ): args = text[len(self.nickname) + 3 :].split(" ") if args[0] == "connect" and len(args) > 4: await self.bot.add_server( args[1], Ir.ConnectionParams(cfg.NICKNAME, args[2], args[3]), ) for c in self.chans_actual: await self.send( It.build( "PRIVMSG", [c, "Connected to {} :3".format(args[1])] ) ) return asyncio.create_task(self.ac(self.name, args)) return for npn in cfg.NOPING: offset = 1 for loc in find_all_indexes(text.lower(), npn.lower()): text = text[: loc + offset] + "\u200b" + text[loc + offset :] offset += 1 for server in self.bot.servers: hide = False if len(cfg.SERVERS[self.name]["chans"]) == 0: hide = True else: for c in cfg.SERVERS[self.name]["chans"]: if c.endswith(chan) and cfg.SERVERS[server].get("hidechan") == True: hide = True break asyncio.create_task( self.bot.servers[server].bc(self.name, chan, nick, text, hide) ) if line.command == "INVITE": await self.send(It.build("JOIN", [line.params[1]])) manual_log("status", f"joined {self.name} {line.params[1]} through invite", True) # TODO: add to relay chans if needed self.chans.append(line.params[1]) self.chans_actual.append(line.params[1]) async def line_send(self, line: It.Line): self.auto_log(f"> {line.format()}") async def bc(self, name, chan, nick, msg, hide): """Handle a relay operation.""" manual_log("relay", f"{self.name}{chan}@{nick}{'(hidechan)' if hide else ''}: {msg}", True) if self.disconnected or "chans" not in list(dir(self)): return if name == self.name and len(cfg.SERVERS[name]["chans"]) == 1: return for c in self.chans_actual: s = f"\x0f\x0f\x0f\x0f<{nick[:1]}\u200b{nick[1:]}/{name}" if not hide: s += f"{chan}" s += f"> {msg}" await self.send(It.build("PRIVMSG", [c, s])) async def ac(self, name, args): """Handle an admin command""" manual_log("admin-command", f"{self.name}: {args}", True) if self.disconnected or "chans" not in list(dir(self)): return nargs = [] isComb = False for arg in args: if arg[0] == ":": isComb = True nargs.append(arg[1:]) continue if isComb: nargs[-1] += " " + arg else: nargs.append(arg) await self.send(It.build(nargs[0], nargs[1:])) # TODO: loop over chans class Bot(Ir.Bot): def create_server(self, name: str): return Server(self, name) def find_all_indexes(input_str, search_str): l1 = [] length = len(input_str) index = 0 while index < length: i = input_str.find(search_str, index) if i == -1: return l1 l1.append(i) index = i + 1 return l1 async def main(): bot = Bot() for name, s in cfg.SERVERS.items(): params = Ir.ConnectionParams(cfg.NICKNAME, **s["connection"]) await bot.add_server(name, params) await bot.run() if __name__ == "__main__": f_log = open(cfg.log_file, "w+") asyncio.run(main())