relay/main.py

189 lines
6.1 KiB
Python

#!/usr/bin/env python3
import asyncio, random
from irctokens import build, Line
from ircrobots import Bot as BaseBot
from ircrobots import Server as BaseServer
from ircrobots import ConnectionParams
# xfnw was too lazy to import any more, so...
from ircrobots.server import *
from config import *
class Server(BaseServer):
# overwrite connect so i can put try except blocks there
async def connect(self, transport: ITCPTransport, params: ConnectionParams):
try:
await sts_transmute(params)
await resume_transmute(params)
reader, writer = await transport.connect(
params.host,
params.port,
tls=params.tls,
bindhost=params.bindhost,
)
self._reader = reader
self._writer = writer
self.params = params
await self.handshake()
except:
print("connection with {} failed, disconnecting".format(self.name))
self.disconnected = True
async def line_read(self, line: Line):
print(f"{self.name} < {line.format()}")
if line.command == "001":
print(f"connected to {self.name}")
self.chans = SERVERS[self.name]["chans"]
self.chans_actual = []
for c in self.chans:
# for details on the '!' see config
await self.send(build("JOIN", [c.strip("!")]))
self.chans_actual.append(c.strip("!"))
if line.command == "PRIVMSG" and line.params[0] == self.nickname:
line.params.pop(0)
nick = line.source.split("!")[0]
text = line.params[0]
if text.startswith("!") and nick in ADMINS:
args = text[1:].split(" ")
asyncio.create_task(self.ac(self.name, args))
return
if line.command == "PRIVMSG" and line.params[0] in self.chans_actual:
chan = line.params.pop(0)
me = False
if "\1ACTION" in line.params[0]:
me = True
text = line.params[0].replace("\1ACTION", "").replace("\1", "")
nick = line.source.split("!")[0]
if me:
text = "* " + nick + text
if (
nick == self.nickname
or (line.tags and "batch" in line.tags)
or "\x0f\x0f\x0f\x0f" in text
):
return
if (
nick.lower() in self.users
and self.users[nick.lower()].account in ADMINS
):
if (
text[: len(self.nickname) + 3].lower()
== f"{self.nickname}: !".lower()
):
args = text[len(self.nickname) + 3 :].split(" ")
if args[0] == "connect" and len(args) > 4:
await self.bot.add_server(
args[1],
ConnectionParams(NICKNAME, args[2], args[3]),
)
for c in self.chans_actual:
await self.send(
build(
"PRIVMSG", [c, "Connected to {} :3".format(args[1])]
)
)
return
asyncio.create_task(self.ac(self.name, args))
return
for npn in NOPING:
offset = 1
for loc in find_all_indexes(text.lower(), npn.lower()):
text = text[: loc + offset] + "\u200b" + text[loc + offset :]
offset += 1
for server in self.bot.servers:
hide = False
if len(SERVERS[self.name]["chans"]) == 0:
hide = True
else:
for c in SERVERS[self.name]["chans"]:
if c.endswith(chan) and SERVERS[server].get("hidechan") == True:
hide = True
break
asyncio.create_task(
self.bot.servers[server].bc(self.name, chan, nick, text, hide)
)
if line.command == "INVITE":
await self.send(build("JOIN", [line.params[1]]))
# TODO: add to relay chans if needed
self.chans.append(line.params[1])
self.chans_actual.append(line.params[1])
async def line_send(self, line: Line):
print(f"{self.name} > {line.format()}")
async def bc(self, name, chan, nick, msg, hide):
if self.disconnected or "chans" not in list(dir(self)):
return
if name == self.name and len(SERVERS[name]["chans"]) == 1:
return
for c in self.chans_actual:
# if c != chan:
s = f"\x0f\x0f\x0f\x0f<{nick[:1]}\u200b{nick[1:]}/{name}"
if not hide:
s += f"{chan}"
s += f"> {msg}"
await self.send(build("PRIVMSG", [c, s]))
async def ac(self, name, args):
if self.disconnected or "chans" not in list(dir(self)):
return
nargs = []
isComb = False
for arg in args:
if arg[0] == ":":
isComb = True
nargs.append(arg[1:])
continue
if isComb:
nargs[-1] += " " + arg
else:
nargs.append(arg)
print("nargs:", nargs)
await self.send(build(nargs[0], nargs[1:])) # TODO: loop over chans
class Bot(BaseBot):
def create_server(self, name: str):
return Server(self, name)
def find_all_indexes(input_str, search_str):
l1 = []
length = len(input_str)
index = 0
while index < length:
i = input_str.find(search_str, index)
if i == -1:
return l1
l1.append(i)
index = i + 1
return l1
async def main():
bot = Bot()
for name, s in SERVERS.items():
params = ConnectionParams(NICKNAME, *s["connection"])
await bot.add_server(name, params)
await bot.run()
if __name__ == "__main__":
asyncio.run(main())