228 lines
7.3 KiB
Python
228 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import asyncio
|
|
import random
|
|
|
|
# xfnw was too lazy to import any more, so he went with a star-import
|
|
# :<
|
|
# well, I'm also lazy, but I am IN NO WAY letting my LSP scream at me for
|
|
# all the unknown symbols, I just use an alias instead.
|
|
import irctokens as It
|
|
import ircrobots as Ir
|
|
import ircrobots.server as S
|
|
|
|
import config as cfg
|
|
|
|
|
|
## LOGGING ##
|
|
|
|
# Handle of the open log file. It stays None until the script entry point
# assigns it, so everything that logs must cope with the "not opened" state.
f_log = None


def log(type_: str, msg: str, also_print: bool = False):
    """Log a message, optionally echoed to stdout, with a type prefix.

    Args:
        type_: Tag written in square brackets in front of the message.
        msg: Text of the message.
        also_print: When true, ``msg`` (without the tag) is printed as well.
    """
    # Without this guard a call made before the log file is opened (for
    # example when the module is imported rather than run as a script)
    # would die with AttributeError on ``None.write``.
    if f_log is not None:
        f_log.write(f"[{type_}] {msg}\n")
    if also_print:
        print(msg)
|
|
|
|
def manual_log(type_: str, msg: str, *args, **kwargs):
    """Log a hand-written message under the ``manual`` tag.

    The given ``type_`` is kept as a parenthesised prefix of the message;
    any extra positional or keyword arguments are handed to ``log`` as-is.
    """
    tagged = f"({type_}) {msg}"
    log("manual", tagged, *args, **kwargs)
|
|
|
|
## UTILS ##
|
|
|
|
# Coming soon
|
|
|
|
## BOT ##
|
|
|
|
class Server(Ir.Server):
    """One IRC connection of the relay.

    Every incoming line is logged.  Channel messages are forwarded to all
    servers known to the bot (``bc``), and messages from admins that start
    with the command prefix are executed as raw IRC commands (``ac``).

    Attributes set at runtime:
        chans: Channel list for this server (taken from ``cfg.SERVERS`` on
            the 001 numeric, extended on INVITE).
        chans_actual: Channels a JOIN has been sent for on this connection.
    """

    # asyncio only keeps weak references to tasks, so a fire-and-forget task
    # can be garbage-collected before it finishes.  Hold them here until done.
    _relay_tasks: set = set()

    def _spawn(self, coro):
        """Schedule *coro* as a task and keep it referenced until it completes."""
        task = asyncio.create_task(coro)
        self._relay_tasks.add(task)
        task.add_done_callback(self._relay_tasks.discard)
        return task

    # overwrite connect so i can put try except blocks there
    async def connect(self, transport: S.ITCPTransport, params: Ir.ConnectionParams):
        """Connect and handshake; on failure log the error and mark disconnected.

        Runs the STS and resume transmutations on *params*, opens the
        transport, stores reader, writer and params on the instance and then
        starts the handshake.  Any ``Exception`` raised along the way is
        logged instead of propagating.
        """
        try:
            await S.sts_transmute(params)
            await S.resume_transmute(params)

            reader, writer = await transport.connect(
                params.host,
                params.port,
                tls=params.tls,
                bindhost=params.bindhost,
            )

            self._reader = reader
            self._writer = writer

            self.params = params
            await self.handshake()
        except Exception as exc:
            # ``Exception`` rather than a bare ``except`` so that task
            # cancellation and KeyboardInterrupt still propagate.
            manual_log(
                "ERROR",
                "connection with {} failed, disconnecting: {!r}".format(self.name, exc),
                also_print=True,
            )
            self.disconnected = True

    def auto_log(self, msg: str, *args, **kwargs):
        """Log an automated message with server name as info."""
        log(self.name, msg, *args, **kwargs)

    async def line_read(self, line: It.Line):
        """Handle one line received from the server.

        * ``001``: join the channels configured for this server.
        * private ``PRIVMSG``: run ``!``-prefixed text from admins as a command.
        * channel ``PRIVMSG``: run ``<nick>: !`` commands from admins,
          otherwise relay the message to every server of the bot.
        * ``INVITE``: join the channel and remember it.
        """
        self.auto_log(f"< {line.format()}")
        if line.command == "001":
            manual_log("status", f"connected to {self.name} :D", True)
            # Servers added at runtime through the "connect" admin command
            # have no entry in cfg.SERVERS; they start with no channels
            # instead of failing with KeyError.  For configured servers the
            # list is deliberately the very same object as in the config.
            server_cfg = cfg.SERVERS.get(self.name)
            self.chans = server_cfg["chans"] if server_cfg is not None else []
            self.chans_actual = []
            for c in self.chans:
                await self.send(It.build("JOIN", [c]))
                manual_log("status", f"joined {self.name} {c} through config", True)
                self.chans_actual.append(c)

        elif line.command == "PRIVMSG" and line.params[0] == self.nickname:
            # (the line itself was already logged at the top of this method)
            line.params.pop(0)
            nick = line.source.split("!")[0]
            text = line.params[0]
            # NOTE(review): this branch authorises by *nick*, whereas the
            # channel branch below checks the services *account*.  Anyone
            # able to use an admin's nick can send raw commands here -
            # confirm whether that is intended.
            if text.startswith("!") and nick in cfg.ADMINS:
                args = text[1:].split(" ")
                self._spawn(self.ac(self.name, args))
                return

        elif line.command == "PRIVMSG" and line.params[0] in self.chans_actual:
            chan = line.params.pop(0)
            # CTCP ACTION ("/me") messages are rendered as "* nick text".
            me = "\1ACTION" in line.params[0]
            text = line.params[0].replace("\1ACTION", "").replace("\1", "")
            nick = line.source.split("!")[0]
            if me:
                text = "* " + nick + text
            # Skip our own messages, batched lines, and anything carrying the
            # four-\x0f marker that bc() puts in front of relayed messages.
            if (
                nick == self.nickname
                or (line.tags and "batch" in line.tags)
                or "\x0f\x0f\x0f\x0f" in text
            ):
                return

            if (
                nick.lower() in self.users
                and self.users[nick.lower()].account in cfg.ADMINS
            ):
                if (
                    text[: len(self.nickname) + 3].lower()
                    == f"{self.nickname}: !".lower()
                ):
                    args = text[len(self.nickname) + 3 :].split(" ")
                    # NOTE(review): five tokens are required although only
                    # args[1..3] are used, and args[3] (the port) is passed
                    # on as a string - confirm both against ConnectionParams.
                    if args[0] == "connect" and len(args) > 4:
                        await self.bot.add_server(
                            args[1],
                            Ir.ConnectionParams(cfg.NICKNAME, args[2], args[3]),
                        )
                        for c in self.chans_actual:
                            await self.send(
                                It.build(
                                    "PRIVMSG", [c, "Connected to {} :3".format(args[1])]
                                )
                            )
                        return
                    self._spawn(self.ac(self.name, args))
                    return

            # Break up every NOPING word with a zero-width space after its
            # first character.  The indexes come from the text before any
            # insertion, hence the running offset.
            for npn in cfg.NOPING:
                offset = 1
                for loc in find_all_indexes(text.lower(), npn.lower()):
                    text = text[: loc + offset] + "\u200b" + text[loc + offset :]
                    offset += 1

            for server in self.bot.servers:
                # ``self.chans`` is the same list object as this server's
                # configured "chans" (see the 001 branch).  The target may be
                # a runtime-added server without a config entry.
                hidechan = cfg.SERVERS.get(server, {}).get("hidechan") == True  # noqa: E712
                hide = not self.chans or (
                    hidechan and any(c.endswith(chan) for c in self.chans)
                )
                self._spawn(
                    self.bot.servers[server].bc(self.name, chan, nick, text, hide)
                )

        if line.command == "INVITE":
            await self.send(It.build("JOIN", [line.params[1]]))
            manual_log("status", f"joined {self.name} {line.params[1]} through invite", True)
            # TODO: add to relay chans if needed
            # For configured servers this also extends the list held in
            # cfg.SERVERS, because ``self.chans`` aliases it.
            self.chans.append(line.params[1])
            self.chans_actual.append(line.params[1])

    async def line_send(self, line: It.Line):
        """Log every line sent to the server."""
        self.auto_log(f"> {line.format()}")

    async def bc(self, name, chan, nick, msg, hide):
        """Handle a relay operation.

        Args:
            name: Name of the server the message came from.
            chan: Channel the message was sent to on that server.
            nick: Nick of the sender.
            msg: Message text.
            hide: When true the channel name is left out of the prefix.
        """

        manual_log("relay", f"{self.name}{chan}@{nick}{'(hidechan)' if hide else ''}: {msg}", True)

        # ``chans`` only exists once the 001 numeric has been handled.
        if self.disconnected or not hasattr(self, "chans"):
            return
        # Do not echo back to the origin server when it has a single channel.
        # (``name == self.name`` here, so ``self.chans`` is the origin's list.)
        if name == self.name and len(self.chans) == 1:
            return

        # Four \x0f characters mark relayed messages (line_read drops them);
        # the zero-width space after the first letter splits the nick.
        s = f"\x0f\x0f\x0f\x0f<{nick[:1]}\u200b{nick[1:]}/{name}"
        if not hide:
            s += f"{chan}"
        s += f"> {msg}"

        for c in self.chans_actual:
            await self.send(It.build("PRIVMSG", [c, s]))

    @staticmethod
    def _parse_admin_args(args):
        """Turn space-split tokens into IRC params, honouring a ``:`` trailing param.

        The first token that starts with ``:`` opens the trailing parameter;
        every later token (including ones that start with ``:`` themselves
        or are empty) is appended to it, separated by single spaces.  Empty
        tokens before the trailing parameter are dropped.
        """
        nargs = []
        in_trailing = False
        for arg in args:
            if in_trailing:
                nargs[-1] += " " + arg
            elif arg.startswith(":"):
                in_trailing = True
                nargs.append(arg[1:])
            elif arg:
                nargs.append(arg)
        return nargs

    async def ac(self, name, args):
        """Handle an admin command

        ``args`` is the command text split on spaces; the first resulting
        parameter is the IRC command, the rest are its parameters.
        """

        manual_log("admin-command", f"{self.name}: {args}", True)

        if self.disconnected or not hasattr(self, "chans"):
            return

        nargs = self._parse_admin_args(args)
        if not nargs:
            # Nothing but the command prefix was sent.
            return

        await self.send(It.build(nargs[0], nargs[1:]))  # TODO: loop over chans
|
|
|
|
|
|
class Bot(Ir.Bot):
    """Bot whose connections are ``Server`` objects."""

    def create_server(self, name: str):
        """Build the ``Server`` instance for the connection called *name*."""
        server = Server(self, name)
        return server
|
|
|
|
|
|
def find_all_indexes(input_str, search_str):
    """Return the start index of every occurrence of *search_str* in *input_str*.

    Overlapping occurrences are included; the result is in ascending order
    and empty when there is no match.
    """
    hits = []
    end = len(input_str)
    cursor = input_str.find(search_str)
    # ``cursor < end`` only matters for an empty needle, which ``str.find``
    # reports at every position up to and including the end of the string.
    while cursor != -1 and cursor < end:
        hits.append(cursor)
        cursor = input_str.find(search_str, cursor + 1)
    return hits
|
|
|
|
|
|
async def main():
    """Register every server from ``cfg.SERVERS`` on a new bot and run it."""
    relay = Bot()
    for server_name in cfg.SERVERS:
        # Each entry's "connection" mapping supplies the remaining
        # ConnectionParams arguments after the nickname.
        connection = cfg.SERVERS[server_name]["connection"]
        await relay.add_server(
            server_name, Ir.ConnectionParams(cfg.NICKNAME, **connection)
        )

    await relay.run()
|
|
|
|
|
|
if __name__ == "__main__":
    # At module level the ``with`` target rebinds the global ``f_log`` that
    # log() writes to.  The context manager makes sure the file is flushed
    # and closed however the bot stops; UTF-8 is set explicitly because the
    # logged IRC text contains non-ASCII characters (e.g. U+200B).
    with open(cfg.log_file, "w+", encoding="utf-8") as f_log:
        asyncio.run(main())
|