relay/main.py

228 lines
7.3 KiB
Python

#!/usr/bin/env python3
import asyncio
import random
# xfnw was too lazy to import any more, so he went with a star-import
# :<
# well, I'm also lazy, but I am IN NO WAY letting my LSP scream at me for
# all the unknown symbols, I just use an alias instead.
import irctokens as It
import ircrobots as Ir
import ircrobots.server as S
import config as cfg
## LOGGING ##
f_log = None
def log(type_: str, msg: str, also_print: bool = False):
"""Log a message, optionally echoed to stdout, with a type prefix."""
f_log.write(f"[{type_}] {msg}\n")
if also_print:
print(msg)
def manual_log(type_: str, msg: str, *args, **kwargs):
"""Manually log a message with given type."""
log("manual", f"({type_}) {msg}", *args, **kwargs)
## UTILS ##
# Coming soon
## BOT ##
class Server(Ir.Server):
# overwrite connect so i can put try except blocks there
async def connect(self, transport: S.ITCPTransport, params: Ir.ConnectionParams):
try:
await S.sts_transmute(params)
await S.resume_transmute(params)
reader, writer = await transport.connect(
params.host,
params.port,
tls=params.tls,
bindhost=params.bindhost,
)
self._reader = reader
self._writer = writer
self.params = params
await self.handshake()
except:
manual_log("ERROR", "connection with {} failed, disconnecting".format(self.name), also_print=True)
self.disconnected = True
def auto_log(self, msg: str, *args, **kwargs):
"""Log an automated message with server name as info."""
log(self.name, msg, *args, **kwargs)
async def line_read(self, line: It.line):
self.auto_log(f"< {line.format()}")
if line.command == "001":
manual_log("status", f"connected to {self.name} :D", True)
self.chans = cfg.SERVERS[self.name]["chans"]
self.chans_actual = []
for c in self.chans:
await self.send(It.build("JOIN", [c]))
manual_log("status", f"joined {self.name} {c} through config", True)
self.chans_actual.append(c)
elif line.command == "PRIVMSG" and line.params[0] == self.nickname:
self.auto_log(f"< {line.format()}")
line.params.pop(0)
nick = line.source.split("!")[0]
text = line.params[0]
if text.startswith("!") and nick in cfg.ADMINS:
args = text[1:].split(" ")
asyncio.create_task(self.ac(self.name, args))
return
elif line.command == "PRIVMSG" and line.params[0] in self.chans_actual:
chan = line.params.pop(0)
me = False
if "\1ACTION" in line.params[0]:
me = True
text = line.params[0].replace("\1ACTION", "").replace("\1", "")
nick = line.source.split("!")[0]
if me:
text = "* " + nick + text
if (
nick == self.nickname
or (line.tags and "batch" in line.tags)
or "\x0f\x0f\x0f\x0f" in text
):
return
if (
nick.lower() in self.users
and self.users[nick.lower()].account in cfg.ADMINS
):
if (
text[: len(self.nickname) + 3].lower()
== f"{self.nickname}: !".lower()
):
args = text[len(self.nickname) + 3 :].split(" ")
if args[0] == "connect" and len(args) > 4:
await self.bot.add_server(
args[1],
Ir.ConnectionParams(cfg.NICKNAME, args[2], args[3]),
)
for c in self.chans_actual:
await self.send(
It.build(
"PRIVMSG", [c, "Connected to {} :3".format(args[1])]
)
)
return
asyncio.create_task(self.ac(self.name, args))
return
for npn in cfg.NOPING:
offset = 1
for loc in find_all_indexes(text.lower(), npn.lower()):
text = text[: loc + offset] + "\u200b" + text[loc + offset :]
offset += 1
for server in self.bot.servers:
hide = False
if len(cfg.SERVERS[self.name]["chans"]) == 0:
hide = True
else:
for c in cfg.SERVERS[self.name]["chans"]:
if c.endswith(chan) and cfg.SERVERS[server].get("hidechan") == True:
hide = True
break
asyncio.create_task(
self.bot.servers[server].bc(self.name, chan, nick, text, hide)
)
if line.command == "INVITE":
await self.send(It.build("JOIN", [line.params[1]]))
manual_log("status", f"joined {self.name} {line.params[1]} through invite", True)
# TODO: add to relay chans if needed
self.chans.append(line.params[1])
self.chans_actual.append(line.params[1])
async def line_send(self, line: It.Line):
self.auto_log(f"> {line.format()}")
async def bc(self, name, chan, nick, msg, hide):
"""Handle a relay operation."""
manual_log("relay", f"{self.name}{chan}@{nick}{'(hidechan)' if hide else ''}: {msg}", True)
if self.disconnected or "chans" not in list(dir(self)):
return
if name == self.name and len(cfg.SERVERS[name]["chans"]) == 1:
return
for c in self.chans_actual:
s = f"\x0f\x0f\x0f\x0f<{nick[:1]}\u200b{nick[1:]}/{name}"
if not hide:
s += f"{chan}"
s += f"> {msg}"
await self.send(It.build("PRIVMSG", [c, s]))
async def ac(self, name, args):
"""Handle an admin command"""
manual_log("admin-command", f"{self.name}: {args}", True)
if self.disconnected or "chans" not in list(dir(self)):
return
nargs = []
isComb = False
for arg in args:
if arg[0] == ":":
isComb = True
nargs.append(arg[1:])
continue
if isComb:
nargs[-1] += " " + arg
else:
nargs.append(arg)
await self.send(It.build(nargs[0], nargs[1:])) # TODO: loop over chans
class Bot(Ir.Bot):
def create_server(self, name: str):
return Server(self, name)
def find_all_indexes(input_str, search_str):
l1 = []
length = len(input_str)
index = 0
while index < length:
i = input_str.find(search_str, index)
if i == -1:
return l1
l1.append(i)
index = i + 1
return l1
async def main():
bot = Bot()
for name, s in cfg.SERVERS.items():
params = Ir.ConnectionParams(cfg.NICKNAME, **s["connection"])
await bot.add_server(name, params)
await bot.run()
if __name__ == "__main__":
f_log = open(cfg.log_file, "w+")
asyncio.run(main())