#!/usr/bin/env python3
"""IRC relay bot: mirrors channel messages between several IRC networks.

Each configured network gets one ``Server`` connection.  A channel message
seen on one network is re-sent by every ``Server`` (see ``Server.bc``) to the
channels it has joined.

``SERVERS``, ``ADMINS``, ``NOPING`` and ``NICKNAME`` are not defined in this
file; presumably they come from the ``config`` star import below.
"""

import asyncio
import random

from irctokens import build, Line
from ircrobots import Bot as BaseBot
from ircrobots import Server as BaseServer
from ircrobots import ConnectionParams

# xfnw was too lazy to import any more, so...
from ircrobots.server import *

from config import *

# Prefix put on every relayed message; incoming text containing it is treated
# as already relayed and is not relayed again.
_RELAY_MARKER = "\x0f\x0f\x0f\x0f"

# Zero-width non-joiner, inserted inside nicks / NOPING words so that the
# relayed text no longer contains the exact word.
_ZWNJ = "\u200c"


class Server(BaseServer):
    """One IRC connection taking part in the relay."""

    # overwrite connect so i can put try except blocks there
    async def connect(self, transport: ITCPTransport, params: ConnectionParams):
        """Connect and start the handshake; on failure mark the server dead.

        Any ordinary exception raised while connecting is reported and turns
        into ``self.disconnected = True`` instead of propagating.
        """
        try:
            await sts_transmute(params)
            await resume_transmute(params)

            reader, writer = await transport.connect(
                params.host,
                params.port,
                tls=params.tls,
                tls_verify=params.tls_verify,
                bindhost=params.bindhost,
            )

            self._reader = reader
            self._writer = writer

            self.params = params
            await self.handshake()
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so task cancellation
            # and KeyboardInterrupt are not swallowed; the cause is printed so
            # a failed connection can be diagnosed.
            print(
                "connection with {} failed, disconnecting: {!r}".format(
                    self.name, exc
                )
            )
            self.disconnected = True

    async def line_read(self, line: Line):
        """Log an incoming line and dispatch it on its command."""
        print(f"{self.name} < {line.format()}")

        if line.command == "001":
            await self._relay_on_welcome()
        elif line.command == "PRIVMSG":
            target = line.params[0]
            if target == self.nickname:
                # A private message is never relayed, admin command or not.
                self._relay_on_private_message(line)
            elif target in getattr(self, "chans_actual", ()):
                # ``chans_actual`` only exists once 001 has been handled.
                await self._relay_on_channel_message(line)
        elif line.command == "INVITE":
            await self._relay_on_invite(line)

    async def _relay_on_welcome(self):
        """Handle 001: load this server's channel list and join each channel."""
        print(f"connected to {self.name}")
        # setdefault (instead of plain indexing) so that a server added at
        # runtime under a name missing from SERVERS gets an empty entry rather
        # than raising KeyError.  For configured servers this returns the very
        # same list object as before.
        self.chans = SERVERS.setdefault(self.name, {}).setdefault("chans", [])
        self.chans_actual = []
        for c in self.chans:
            # for details on the '!' see config
            await self.send(build("JOIN", [c.strip("!")]))
            self.chans_actual.append(c.strip("!"))

    def _relay_on_private_message(self, line: Line):
        """Handle a PRIVMSG addressed to the bot: ``!<raw command>`` from admins."""
        line.params.pop(0)
        nick = line.source.split("!")[0]
        text = line.params[0]
        # NOTE(review): this compares the *nick* against ADMINS, whereas the
        # in-channel command path compares the services account -- confirm
        # that nick-based trust is intended here.
        if text.startswith("!") and nick in ADMINS:
            args = text[1:].split(" ")
            asyncio.create_task(self.ac(self.name, args))

    async def _relay_on_channel_message(self, line: Line):
        """Handle a PRIVMSG to a joined channel: run admin command or relay."""
        chan = line.params.pop(0)
        raw = line.params[0]
        nick = line.source.split("!")[0]

        # CTCP ACTION ("/me") is rendered as "* nick <action text>".
        is_action = "\1ACTION" in raw
        text = raw.replace("\1ACTION", "").replace("\1", "")
        if is_action:
            text = "* " + nick + text

        # Skip our own messages, batched lines, and text that already carries
        # the relay marker.
        if (
            nick == self.nickname
            or (line.tags and "batch" in line.tags)
            or _RELAY_MARKER in text
        ):
            return

        if await self._relay_try_admin_command(nick, text):
            return

        # Break up every occurrence of a NOPING word with a zero-width
        # character.  Match positions come from the text as it was before this
        # word's insertions, so each insertion shifts the next one by one.
        for npn in NOPING:
            offset = 1
            for loc in find_all_indexes(text.lower(), npn.lower()):
                text = text[: loc + offset] + _ZWNJ + text[loc + offset :]
                offset += 1

        self._relay_broadcast(chan, nick, text)

    async def _relay_try_admin_command(self, nick: str, text: str) -> bool:
        """Run ``<botnick>: !<command>`` sent by an admin account.

        Returns True when *text* was such a command (and so must not be
        relayed), False otherwise.
        """
        if not (
            nick.lower() in self.users
            and self.users[nick.lower()].account in ADMINS
        ):
            return False

        prefix = f"{self.nickname}: !"
        if text[: len(prefix)].lower() != prefix.lower():
            return False

        args = text[len(prefix) :].split(" ")
        if args[0] == "connect" and len(args) > 4:
            # connect <name> <host> <port> <tls as 0/1>
            await self.bot.add_server(
                args[1],
                ConnectionParams(NICKNAME, args[2], args[3], bool(int(args[4]))),
            )
            for c in self.chans_actual:
                await self.send(
                    build("PRIVMSG", [c, "Connected to {} :3".format(args[1])])
                )
            return True

        # Any other command is handed to every server, in random order.
        for i in random.sample(list(self.bot.servers), len(self.bot.servers)):
            asyncio.create_task(self.bot.servers[i].ac(self.name, args))
        return True

    def _relay_broadcast(self, chan: str, nick: str, text: str):
        """Schedule ``bc`` on every server for one channel message."""
        own_chans = SERVERS[self.name]["chans"]
        listed = any(c.endswith(chan) for c in own_chans)
        for server in self.bot.servers:
            if not own_chans:
                hide = True
            else:
                # ``.get`` on SERVERS: a server added at runtime may not have
                # an entry yet.  The ``== True`` comparison is kept as it was
                # so non-bool config values behave exactly as before.
                hidechan = SERVERS.get(server, {}).get("hidechan")
                hide = listed and hidechan == True  # noqa: E712
            asyncio.create_task(
                self.bot.servers[server].bc(self.name, chan, nick, text, hide)
            )

    async def _relay_on_invite(self, line: Line):
        """Handle INVITE: join the channel and add it to the channel lists."""
        # NOTE(review): there is no check on who sent the invite.
        chan = line.params[1]
        await self.send(build("JOIN", [chan]))
        # TODO: add to relay chans if needed
        self.chans.append(chan)
        self.chans_actual.append(chan)

    async def line_send(self, line: Line):
        """Log an outgoing line."""
        print(f"{self.name} > {line.format()}")

    async def bc(self, name, chan, nick, msg, hide):
        """Send one relayed message to every channel this server has joined.

        name -- name of the server the message came from
        chan -- channel it was said in (left out of the prefix when *hide*)
        nick -- speaker; a zero-width char is inserted after its first letter
        msg  -- message text
        hide -- when true, omit *chan* from the ``<nick/name...>`` prefix

        Does nothing while this server is disconnected or has not handled 001
        yet, or when the message came from this very server and that server
        has exactly one configured channel.
        """
        if self.disconnected or not hasattr(self, "chans"):
            return
        if name == self.name and len(SERVERS[name]["chans"]) == 1:
            print("NAME", name, self.name)
            return
        print(self.name)

        s = f"{_RELAY_MARKER}<{nick[:1]}{_ZWNJ}{nick[1:]}/{name}"
        if not hide:
            s += f"{chan}"
        s += f"> {msg}"

        for c in self.chans_actual:
            # if c != chan:
            print("RELAYING", c)
            await self.send(build("PRIVMSG", [c, s]))

    async def ac(self, name, args):
        """Send an admin-supplied raw command aimed at the first joined channel.

        *args* is the command split on single spaces.  A token starting with
        ``:`` opens a trailing parameter and the tokens after it are glued
        back onto it with spaces.  The line sent is
        ``<args[0]> <first joined channel> <remaining params...>``.

        name -- name of the server the command was issued on (unused here)
        """
        if self.disconnected or not hasattr(self, "chans"):
            return

        nargs = []
        isComb = False
        for arg in args:
            print("a", arg)
            # startswith, not arg[0]: splitting on " " yields empty tokens
            # for doubled spaces, and arg[0] raised IndexError on those.
            if arg.startswith(":"):
                isComb = True
                nargs.append(arg[1:])
                continue
            if isComb:
                nargs[-1] += " " + arg
            elif arg:
                nargs.append(arg)
        print("nargs:", nargs)

        # Nothing to send without a command or without a channel to aim at.
        if not nargs or not self.chans_actual:
            return
        await self.send(
            build(nargs[0], [self.chans_actual[0]] + nargs[1:])
        )  # TODO: loop over chans


class Bot(BaseBot):
    """Bot whose connections are relay ``Server`` instances."""

    def create_server(self, name: str):
        """Return a new relay ``Server`` called *name*."""
        return Server(self, name)


def find_all_indexes(input_str, search_str):
    """Return the start index of every occurrence of *search_str*.

    Overlapping occurrences are included.  An empty *input_str* yields ``[]``.
    """
    indexes = []
    start = 0
    while start < len(input_str):
        found = input_str.find(search_str, start)
        if found == -1:
            break
        indexes.append(found)
        start = found + 1
    return indexes


async def main():
    """Connect to every server listed in ``SERVERS`` and run the bot."""
    bot = Bot()
    for name, s in SERVERS.items():
        params = ConnectionParams(NICKNAME, *s["connection"])
        await bot.add_server(name, params)
    await bot.run()


if __name__ == "__main__":
    asyncio.run(main())