mirror of https://github.com/jesopo/ircstates
908 lines
31 KiB
Python
908 lines
31 KiB
Python
from ipaddress import ip_address
|
|
from typing import Callable, Dict, List, Optional, Set, Tuple
|
|
from irctokens import Line, build, Hostmask, StatefulDecoder, StatefulEncoder
|
|
from irctokens import hostmask as hostmask_
|
|
from pendulum import from_timestamp, now
|
|
|
|
from .user import User
|
|
from .channel import Channel
|
|
from .channel_user import ChannelUser
|
|
from .isupport import ISupport
|
|
from .decorators import handler_decorator
|
|
from .casemap import casefold
|
|
from .names import Name
|
|
from .emit import *
|
|
from .numerics import *
|
|
|
|
# Registry of handler callables keyed by IRC command / numeric string;
# Server.parse_tokens() looks handlers up here by line.command.
LINE_HANDLERS: Dict[str, List[Callable[["Server", Line], Emit]]] = {}
# Decorator built around the registry above (presumably registers the
# decorated method under the given command -- confirm in .decorators).
line_handler = handler_decorator(LINE_HANDLERS)
|
|
|
|
class ServerException(Exception):
    """Base class for errors raised by Server."""
|
|
class ServerDisconnectedException(ServerException):
    """Raised by Server.recv() when the decoder returns None."""
|
|
|
|
# Token appended to the WHOX query built by Server.prepare_whox(); the WHOX
# reply handler only processes replies whose second parameter equals it.
WHO_TYPE = "735" # randomly generated
# Return type of Server.parse_tokens(): an Emit, or None when no handler
# produced one.
TYPE_EMIT = Optional[Emit]
|
|
|
|
class Server(object):
|
|
def __init__(self, name: str):
    """Set up empty connection state for one IRC server.

    :param name: label for this server; shown by ``__repr__``
    """
    self.name = name

    # our own identity, updated by the handlers as the server tells us more
    self.nickname = ""
    self.nickname_lower = ""  # casefolded nickname, compared in is_me()
    self.username: Optional[str] = None
    self.hostname: Optional[str] = None
    self.realname: Optional[str] = None
    self.account: Optional[str] = None
    self.server: Optional[str] = None
    self.away: Optional[str] = None
    self.ip: Optional[str] = None

    self.registered = False  # set to True by the RPL_WELCOME handler
    self.modes: Set[str] = set()  # our own user modes
    self.motd: List[str] = []  # MOTD text lines collected so far

    # turns the bytes passed to recv() into Line objects
    self._decoder = StatefulDecoder()

    # users and channels we know about, keyed by casefolded name
    self.users: Dict[str, User] = {}
    self.channels: Dict[str, Channel] = {}

    self.isupport = ISupport()

    # capability negotiation state, maintained by the CAP handler
    self.has_cap: bool = False  # True once any CAP line has been handled
    self._temp_caps: Dict[str, str] = {}  # accumulates a multiline CAP LS
    self.available_caps: Dict[str, str] = {}
    self.agreed_caps: List[str] = []
|
|
|
|
def __repr__(self) -> str:
|
|
return f"Server(name={self.name!r})"
|
|
|
|
def recv(self, data: bytes) -> List[Line]:
|
|
lines = self._decoder.push(data)
|
|
if lines is None:
|
|
raise ServerDisconnectedException()
|
|
return lines
|
|
|
|
def parse_tokens(self, line: Line) -> TYPE_EMIT:
    """Dispatch *line* to every handler registered for its command.

    All registered handlers are called. The first non-``None`` result has
    its ``command`` set to ``line.command`` and is the value returned.

    :returns: that first result, or ``None`` if there was none
    """
    first: TYPE_EMIT = None
    for handler in LINE_HANDLERS.get(line.command, []):
        result = handler(self, line)
        if first is None and result is not None:
            result.command = line.command
            first = result
    return first
|
|
|
|
def casefold(self, s1: str):
    """Casefold *s1* using the casemapping held in ``self.isupport``."""
    mapping = self.isupport.casemapping
    return casefold(mapping, s1)
|
|
def casefold_equals(self, s1: str, s2: str):
|
|
return self.casefold(s1) == self.casefold(s2)
|
|
def is_me(self, nickname: str):
|
|
return self.casefold(nickname) == self.nickname_lower
|
|
|
|
def has_user(self, nickname: str) -> bool:
|
|
return self.casefold(nickname) in self.users
|
|
def _add_user(self, nickname: str, nickname_lower: str):
    """Create a user via ``create_user`` and store it under *nickname_lower*."""
    name = Name(nickname, nickname_lower)
    self.users[nickname_lower] = self.create_user(name)
|
|
|
|
def is_channel(self, target: str) -> bool:
|
|
return target[:1] in self.isupport.chantypes
|
|
def has_channel(self, name: str) -> bool:
|
|
return self.casefold(name) in self.channels
|
|
def get_channel(self, name: str) -> Optional[Channel]:
|
|
return self.channels.get(self.casefold(name), None)
|
|
|
|
def create_user(self, nickname: Name) -> User:
    """Build the User object for *nickname*; does not store it anywhere."""
    user = User(nickname)
    return user
|
|
|
|
def create_channel(self, name: Name) -> Channel:
    """Build the Channel object for *name*; does not store it anywhere."""
    channel = Channel(name)
    return channel
|
|
|
|
def _user_join(self, channel: Channel, user: User) -> ChannelUser:
    """Record *user* as a member of *channel*.

    Adds the casefolded channel name to ``user.channels`` and stores a new
    ChannelUser in ``channel.users`` under the user's casefolded nickname.

    :returns: the newly created ChannelUser
    """
    membership = ChannelUser(user.get_name(), channel.get_name())

    channel_key = self.casefold(channel.name)
    user.channels.add(channel_key)
    channel.users[user.nickname_lower] = membership
    return membership
|
|
|
|
def prepare_whox(self, target: str) -> Line:
    """Build the WHO line (WHOX field selector plus WHO_TYPE) for *target*."""
    fields = f"n%afhinrstu,{WHO_TYPE}"
    return build("WHO", [target, fields])
|
|
|
|
def _self_hostmask(self, hostmask: Hostmask):
|
|
self.nickname = hostmask.nickname
|
|
if hostmask.username:
|
|
self.username = hostmask.username
|
|
if hostmask.hostname:
|
|
self.hostname = hostmask.hostname
|
|
|
|
def _emit(self) -> Emit:
    """Return a fresh, empty Emit for a handler to fill in."""
    emit = Emit()
    return emit
|
|
|
|
@line_handler(RPL_WELCOME)
# first message reliably sent to us after registration is complete
def _handle_welcome(self, line: Line) -> Emit:
    """Take our nickname from the first parameter and mark us registered."""
    nickname = line.params[0]
    self.nickname = nickname
    self.nickname_lower = self.casefold(nickname)
    self.registered = True
    return self._emit()
|
|
|
|
@line_handler(RPL_ISUPPORT)
# https://defs.ircdocs.horse/defs/isupport.html
def _handle_ISUPPORT(self, line: Line) -> Emit:
    """Feed the ISUPPORT tokens of this line into ``self.isupport``."""
    # everything between the first and the last parameter is a token
    tokens = line.params[1:-1]
    self.isupport.from_tokens(tokens)
    return self._emit()
|
|
|
|
@line_handler(RPL_MOTDSTART)
# start of MOTD
def _handle_motd_start(self, line: Line) -> Emit:
    """Discard any previously collected MOTD lines."""
    del self.motd[:]
    return self._emit()
|
|
@line_handler(RPL_MOTDSTART)
# start of MOTD
@line_handler(RPL_MOTD)
# line of MOTD
def _handle_motd_line(self, line: Line) -> Emit:
    """Append the text of a MOTD (or MOTD-start) line to ``self.motd``."""
    emit = self._emit()
    motd_text = line.params[1]
    self.motd.append(motd_text)
    emit.text = motd_text
    return emit
|
|
|
|
@line_handler("NICK")
def _handle_NICK(self, line: Line) -> Emit:
    """Apply a nickname change to the user table, channels and ourselves."""
    renamed_to = line.params[0]
    renamed_to_lower = self.casefold(renamed_to)
    old_lower = self.casefold(line.hostmask.nickname)

    emit = self._emit()

    user = self.users.pop(old_lower, None)
    if user is not None:
        emit.user = user
        user.change_nickname(renamed_to, renamed_to_lower)
        self.users[renamed_to_lower] = user

        # move the membership entry to the new key in every shared channel
        for channel_key in user.channels:
            members = self.channels[channel_key].users
            members[user.nickname_lower] = members.pop(old_lower)

    if old_lower == self.nickname_lower:
        emit.self = True

        self.nickname = renamed_to
        self.nickname_lower = renamed_to_lower
    return emit
|
|
|
|
@line_handler("JOIN")
def _handle_JOIN(self, line: Line) -> Emit:
    """Track a JOIN: create the channel if we joined, then add the user.

    A three-parameter JOIN is treated as carrying account and realname.
    """
    extended = len(line.params) == 3
    if extended:
        account = line.params[1].strip("*")
        realname = line.params[2]
    else:
        account = None
        realname = None

    emit = self._emit()

    channel_name = line.params[0]
    channel_lower = self.casefold(channel_name)
    source = line.hostmask
    nickname_lower = self.casefold(source.nickname)

    if nickname_lower == self.nickname_lower:
        emit.self = True
        if channel_lower not in self.channels:
            channel = self.create_channel(Name(channel_name, channel_lower))
            #TODO: put this somewhere better
            for list_mode in self.isupport.chanmodes.a_modes:
                channel.list_modes[list_mode] = []

            self.channels[channel_lower] = channel

        self._self_hostmask(source)
        if extended:
            self.account = account
            self.realname = realname

    # joins to channels we are not tracking only produce the bare emit
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    if nickname_lower not in self.users:
        self._add_user(source.nickname, nickname_lower)

    user = self.users[nickname_lower]
    emit.user = user
    if source.username:
        user.username = source.username
    if source.hostname:
        user.hostname = source.hostname
    if extended:
        user.account = account
        user.realname = realname

    channel_user = self._user_join(channel, user)
    channel_user.joined = now("utc")
    return emit
|
|
|
|
def _user_part(self, line: Line,
|
|
nickname: str,
|
|
channel_name: str,
|
|
reason_i: int) -> Tuple[Emit, Optional[User]]:
|
|
emit = self._emit()
|
|
channel_lower = self.casefold(channel_name)
|
|
reason = line.params[reason_i] if line.params[reason_i:] else None
|
|
if not reason is None:
|
|
emit.text = reason
|
|
|
|
user: Optional[User] = None
|
|
if channel_lower in self.channels:
|
|
channel = self.channels[channel_lower]
|
|
emit.channel = channel
|
|
|
|
nickname_lower = self.casefold(nickname)
|
|
if nickname_lower in self.users:
|
|
user = self.users[nickname_lower]
|
|
|
|
user.channels.remove(channel.name_lower)
|
|
del channel.users[user.nickname_lower]
|
|
if not user.channels:
|
|
del self.users[nickname_lower]
|
|
|
|
if nickname_lower == self.nickname_lower:
|
|
del self.channels[channel_lower]
|
|
|
|
for key, cuser in channel.users.items():
|
|
ruser = self.users[key]
|
|
ruser.channels.remove(channel.name_lower)
|
|
if not ruser.channels:
|
|
del self.users[ruser.nickname_lower]
|
|
|
|
return emit, user
|
|
|
|
@line_handler("PART")
def _handle_PART(self, line: Line) -> Emit:
    """Handle a PART by removing the sender from the named channel."""
    emit, user = self._user_part(
        line, line.hostmask.nickname, line.params[0], 1)
    if user is None:
        return emit

    emit.user = user
    if user.nickname_lower == self.nickname_lower:
        emit.self = True
    return emit
|
|
@line_handler("KICK")
def _handle_KICK(self, line: Line) -> Emit:
    """Handle a KICK: remove the target and record who kicked whom.

    Source information is only attached when the kicked user was known.
    """
    emit, kicked = self._user_part(
        line, line.params[1], line.params[0], 2)
    if kicked is None:
        return emit

    emit.user_target = kicked

    if kicked.nickname_lower == self.nickname_lower:
        emit.self = True

    kicker = line.hostmask.nickname
    kicker_lower = self.casefold(kicker)
    if kicker_lower == self.nickname_lower:
        emit.self_source = True

    if kicker_lower in self.users:
        emit.user_source = self.users[kicker_lower]
    else:
        # kicker is not tracked: hand out a throwaway User, not stored
        emit.user_source = self.create_user(Name(kicker, kicker_lower))

    return emit
|
|
|
|
def _self_quit(self):
|
|
self.users.clear()
|
|
self.channels.clear()
|
|
|
|
@line_handler("QUIT")
def _handle_quit(self, line: Line) -> Emit:
    """Handle a QUIT: wipe all state if it is ours, else drop that user."""
    emit = self._emit()
    nickname_lower = self.casefold(line.hostmask.nickname)
    if line.params:
        emit.text = line.params[0]

    if nickname_lower == self.nickname_lower:
        emit.self = True
        self._self_quit()
    elif nickname_lower in self.users:
        user = self.users.pop(nickname_lower)
        emit.user = user
        # remove the user's membership from every channel they were in
        for channel_lower in user.channels:
            members = self.channels[channel_lower].users
            del members[user.nickname_lower]
    return emit
|
|
|
|
@line_handler("ERROR")
def _handle_ERROR(self, line: Line) -> Emit:
    """Treat ERROR like our own quit: clear all users and channels."""
    self._self_quit()
    emit = self._emit()
    return emit
|
|
|
|
@line_handler(RPL_NAMREPLY)
# channel's user list, "NAMES #channel" response (and on-join)
def _handle_names(self, line: Line) -> Emit:
    """Add every listed nickname to the channel, with its prefix modes."""
    emit = self._emit()
    channel_lower = self.casefold(line.params[2])
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    entries = [entry for entry in line.params[3].split(" ") if entry]
    users: List[User] = []
    emit.users = users

    for entry in entries:
        # leading characters that map to a mode are status prefixes
        modes = ""
        for char in entry:
            mode = self.isupport.prefix.from_prefix(char)
            if not mode:
                break
            modes += mode

        hostmask = hostmask_(entry[len(modes):])
        nickname_lower = self.casefold(hostmask.nickname)
        if nickname_lower not in self.users:
            self._add_user(hostmask.nickname, nickname_lower)
        user = self.users[nickname_lower]
        users.append(user)

        if nickname_lower in channel.users:
            channel_user = channel.users[nickname_lower]
        else:
            channel_user = self._user_join(channel, user)

        if hostmask.username:
            user.username = hostmask.username
        if hostmask.hostname:
            user.hostname = hostmask.hostname

        if nickname_lower == self.nickname_lower:
            self._self_hostmask(hostmask)

        for mode in modes:
            channel_user.modes.add(mode)
    return emit
|
|
|
|
@line_handler(RPL_CREATIONTIME)
# channel creation time, "MODE #channel" response (and on-join)
def _handle_creation_time(self, line: Line) -> Emit:
    """Store the channel's creation timestamp if the channel is tracked."""
    emit = self._emit()
    channel_lower = self.casefold(line.params[1])
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    created_at = int(line.params[2])
    channel.created = from_timestamp(created_at)
    return emit
|
|
|
|
@line_handler("TOPIC")
def _handle_TOPIC(self, line: Line) -> Emit:
    """Record a live topic change: text, setter (line source) and time."""
    emit = self._emit()
    channel_lower = self.casefold(line.params[0])
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    channel.topic = line.params[1]
    channel.topic_setter = line.source
    channel.topic_time = now("utc")
    return emit
|
|
|
|
@line_handler(RPL_TOPIC)
# topic text, "TOPIC #channel" response (and on-join)
def _handle_topic_num(self, line: Line) -> Emit:
    """Store the topic text reported for a tracked channel."""
    emit = self._emit()
    channel_lower = self.casefold(line.params[1])
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    channel.topic = line.params[2]
    return emit
|
|
@line_handler(RPL_TOPICWHOTIME)
# topic setby, "TOPIC #channel" response (and on-join)
def _handle_topic_time(self, line: Line) -> Emit:
    """Store who set the topic of a tracked channel, and when."""
    emit = self._emit()
    channel_lower = self.casefold(line.params[1])
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    channel.topic_setter = line.params[2]
    set_at = int(line.params[3])
    channel.topic_time = from_timestamp(set_at)
    return emit
|
|
|
|
def _channel_modes(self,
|
|
channel: Channel,
|
|
modes: List[str],
|
|
params: List[str]
|
|
) -> List[Tuple[str, Optional[str]]]:
|
|
tokens: List[Tuple[str, Optional[str]]] = []
|
|
|
|
for mode in modes:
|
|
add = mode[0] == "+"
|
|
char = mode[1]
|
|
arg: Optional[str] = None
|
|
|
|
if char in self.isupport.prefix.modes: # a user's status
|
|
arg = params.pop(0)
|
|
nickname_lower = self.casefold(arg)
|
|
|
|
if nickname_lower in self.users:
|
|
user = self.users[nickname_lower]
|
|
channel_user = channel.users[user.nickname_lower]
|
|
if add:
|
|
channel_user.modes.add(char)
|
|
else:
|
|
channel_user.modes.discard(char)
|
|
else:
|
|
has_arg = False
|
|
is_list = False
|
|
if char in self.isupport.chanmodes.a_modes:
|
|
has_arg = True
|
|
is_list = True
|
|
elif add:
|
|
has_arg = char in (self.isupport.chanmodes.b_modes+
|
|
self.isupport.chanmodes.c_modes)
|
|
else: # remove
|
|
has_arg = char in self.isupport.chanmodes.b_modes
|
|
|
|
if has_arg:
|
|
arg = params.pop(0)
|
|
|
|
if add:
|
|
channel.add_mode(char, arg, is_list)
|
|
else:
|
|
channel.remove_mode(char, arg)
|
|
|
|
tokens.append((mode, arg))
|
|
|
|
return tokens
|
|
|
|
@line_handler("MODE")
def _handle_MODE(self, line: Line) -> Emit:
    """Apply a MODE line to our own user modes or to a tracked channel."""
    emit = self._emit()
    target = line.params[0]
    modes_str = line.params[1]
    params = line.params[2:]

    # expand e.g. "+ab-c" into ["+a", "+b", "-c"]
    modifier = "+"
    modes: List[str] = []
    for c in modes_str:
        if c in ("+", "-"):
            modifier = c
        else:
            modes.append(f"{modifier}{c}")

    target_lower = self.casefold(target)
    if target_lower == self.nickname_lower:
        emit.self_target = True
        emit.tokens = modes

        for mode in modes:
            char = mode[1]
            if mode[0] == "+":
                self.modes.add(char)
            else:
                self.modes.discard(char)
    elif target_lower in self.channels:
        channel = self.channels[target_lower]
        emit.channel = channel
        ctokens = self._channel_modes(channel, modes, params)

        emit.tokens = [
            mode if arg is None else f"{mode} {arg}"
            for mode, arg in ctokens
        ]
    return emit
|
|
|
|
@line_handler(RPL_CHANNELMODEIS)
# channel modes, "MODE #channel" response (sometimes on-join?)
def _handle_channelmodeis(self, line: Line) -> Emit:
    """Apply the reported channel modes (all as additions) to the channel."""
    emit = self._emit()
    channel_lower = self.casefold(line.params[1])
    if channel_lower not in self.channels:
        return emit

    channel = self.channels[channel_lower]
    emit.channel = channel
    mode_chars = line.params[2].lstrip("+")
    modes = ["+" + char for char in mode_chars]
    params = line.params[3:]
    self._channel_modes(channel, modes, params)
    return emit
|
|
|
|
@line_handler(RPL_UMODEIS)
# our own user modes, "MODE nickname" response (sometimes on-connect?)
def _handle_umodeis(self, line: Line) -> Emit:
    """Add every reported user mode character to ``self.modes``."""
    mode_chars = line.params[1].lstrip("+")
    # existing modes are kept; the reported ones are only added
    self.modes.update(mode_chars)
    return self._emit()
|
|
|
|
def _mode_list(self,
|
|
channel_name: str,
|
|
mode: str,
|
|
mask: str):
|
|
channel_lower = self.casefold(channel_name)
|
|
if channel_lower in self.channels:
|
|
channel = self.channels[channel_lower]
|
|
if not mode in channel._list_modes_temp:
|
|
channel._list_modes_temp[mode] = []
|
|
channel._list_modes_temp[mode].append(mask)
|
|
def _mode_list_end(self,
|
|
channel_name: str,
|
|
mode: str):
|
|
channel_lower = self.casefold(channel_name)
|
|
if channel_lower in self.channels:
|
|
channel = self.channels[channel_lower]
|
|
if mode in channel._list_modes_temp:
|
|
mlist = channel._list_modes_temp.pop(mode)
|
|
channel.list_modes[mode] = mlist
|
|
|
|
@line_handler(RPL_BANLIST)
def _handle_banlist(self, line: Line) -> Emit:
    """Buffer one ban-list entry for the named channel.

    Only the channel and mask are used. Any further parameters (setter,
    timestamp) are not stored, so they are deliberately left unparsed: a
    reply with a missing or non-numeric timestamp must not raise here.
    """
    channel = line.params[1]
    mask = line.params[2]

    self._mode_list(channel, "b", mask)
    return self._emit()
|
|
|
|
@line_handler(RPL_ENDOFBANLIST)
def _handle_banlist_end(self, line: Line) -> Emit:
    """Finish a ban list: publish the buffered "b" entries for the channel."""
    channel_name = line.params[1]
    self._mode_list_end(channel_name, "b")
    return self._emit()
|
|
|
|
@line_handler(RPL_QUIETLIST)
def _handle_quietlist(self, line: Line) -> Emit:
    """Buffer one quiet-list entry for the named channel and mode.

    Only channel, mode and mask are used. The trailing setter/timestamp
    parameters are not stored, so they are deliberately left unparsed: a
    reply that omits them or carries a non-numeric timestamp must not
    raise here.
    """
    channel = line.params[1]
    mode = line.params[2]
    mask = line.params[3]

    self._mode_list(channel, mode, mask)
    return self._emit()
|
|
|
|
@line_handler(RPL_ENDOFQUIETLIST)
def _handle_quietlist_end(self, line: Line) -> Emit:
    """Finish a quiet list: publish the buffered entries for its mode."""
    channel_name, mode = line.params[1], line.params[2]
    self._mode_list_end(channel_name, mode)
    return self._emit()
|
|
|
|
@line_handler("PRIVMSG")
@line_handler("NOTICE")
@line_handler("TAGMSG")
def _handle_message(self, line: Line) -> Emit:
    """Describe a message: its text, sender and target.

    Lines without a source yield an empty emit. Senders we do not track
    get a throwaway User that is not stored in ``self.users``.
    """
    emit = self._emit()
    # this does not visually spark joy
    if not line.source:
        return emit

    if line.params[1:]:
        emit.text = line.params[1]

    sender = line.hostmask
    nickname_lower = self.casefold(sender.nickname)
    if nickname_lower == self.nickname_lower:
        emit.self_source = True
        self._self_hostmask(sender)

    user = self.users.get(nickname_lower)
    if user is None:
        user = self.create_user(Name(sender.nickname, nickname_lower))
    emit.user = user

    if sender.username:
        user.username = sender.username
    if sender.hostname:
        user.hostname = sender.hostname

    # strip leading STATUSMSG prefixes to find the real target; the
    # stripped prefixes are collected but not used further here
    target_raw = line.params[0]
    target = target_raw
    statusmsg = []
    while target and target[0] in self.isupport.statusmsg:
        statusmsg.append(target[0])
        target = target[1:]
    emit.target = target_raw

    target_lower = self.casefold(target)
    if self.is_channel(target):
        if target_lower in self.channels:
            emit.channel = self.channels[target_lower]
    elif target_lower == self.nickname_lower:
        emit.self_target = True
    return emit
|
|
|
|
@line_handler(RPL_VISIBLEHOST)
# our own hostname, sometimes username@hostname, when it changes
def _handle_visiblehost(self, line: Line) -> Emit:
    """Update our hostname, and our username when one precedes an "@"."""
    visible = line.params[1]
    username, _sep, hostname = visible.rpartition("@")
    if username:
        self.username = username
    self.hostname = hostname
    return self._emit()
|
|
|
|
@line_handler(RPL_WHOREPLY)
# WHO line, "WHO #channel|nickname" response
def _handle_who(self, line: Line) -> Emit:
    """Update ourselves and/or a tracked user from one WHO reply line.

    The last parameter is split at its first space and the remainder is
    taken as the realname. A last parameter without any space yields an
    empty realname instead of raising IndexError.
    """
    emit = self._emit()
    emit.target = line.params[1]
    nickname = line.params[5]
    username = line.params[2]
    hostname = line.params[3]
    status = line.params[6]
    # "G" in the status flags marks the user as away (reason unknown)
    away = "" if "G" in status else None
    _, _, realname = line.params[7].partition(" ")

    # "*" stands for an undisclosed server
    server: Optional[str] = None
    if not line.params[4] == "*":
        server = line.params[4]

    nickname_lower = self.casefold(nickname)
    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.username = username
        self.hostname = hostname
        self.realname = realname
        self.server = server
        self.away = away

    if nickname_lower in self.users:
        user = self.users[nickname_lower]
        emit.user = user
        user.username = username
        user.hostname = hostname
        user.realname = realname
        user.server = server
        user.away = away
    return emit
|
|
|
|
@line_handler(RPL_WHOSPCRPL)
# WHOX line, "WHO #channel|nickname" response; only listen for our "type"
def _handle_whox(self, line: Line) -> Emit:
    """Update a tracked user and/or ourselves from one WHOX reply line.

    Replies that do not carry WHO_TYPE, or that do not have exactly ten
    parameters, are ignored and yield an empty emit.
    """
    emit = self._emit()
    if not (line.params[1] == WHO_TYPE and len(line.params) == 10):
        return emit

    nickname_lower = self.casefold(line.params[6])
    username = line.params[2]
    hostname = line.params[4]
    status = line.params[7]
    away = "" if "G" in status else None
    realname = line.params[9]

    # "0" means not logged in
    account = "" if line.params[8] == "0" else line.params[8]

    # "*" stands for an undisclosed server
    server: Optional[str] = None if line.params[5] == "*" else line.params[5]

    # 255.255.255.255 and unparsable values are treated as "no IP"
    ip: Optional[str] = None
    raw_ip = line.params[3]
    if raw_ip != "255.255.255.255":
        try:
            ip = ip_address(raw_ip).compressed
        except ValueError:
            pass

    if nickname_lower in self.users:
        user = self.users[nickname_lower]
        emit.user = user
        user.username = username
        user.hostname = hostname
        user.realname = realname
        user.account = account
        user.server = server
        user.away = away
        if ip is not None:
            user.ip = ip

    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.username = username
        self.hostname = hostname
        self.realname = realname
        self.account = account
        self.server = server
        self.away = away
        if ip is not None:
            self.ip = ip

    return emit
|
|
|
|
@line_handler(RPL_WHOISUSER)
# WHOIS "user" line, one of "WHOIS nickname" response lines
def _handle_whoisuser(self, line: Line) -> Emit:
    """Update username/hostname/realname for ourselves or a tracked user."""
    emit = self._emit()
    nickname, username, hostname = line.params[1:4]
    realname = line.params[5]

    nickname_lower = self.casefold(nickname)
    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.username = username
        self.hostname = hostname
        self.realname = realname

    user = self.users.get(nickname_lower)
    if user is not None:
        emit.user = user
        user.username = username
        user.hostname = hostname
        user.realname = realname
    return emit
|
|
|
|
@line_handler("CHGHOST")
def _handle_CHGHOST(self, line: Line) -> Emit:
    """Apply a username/hostname change to ourselves or a tracked user."""
    emit = self._emit()
    username = line.params[0]
    hostname = line.params[1]
    nickname_lower = self.casefold(line.hostmask.nickname)

    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.username = username
        self.hostname = hostname

    user = self.users.get(nickname_lower)
    if user is not None:
        emit.user = user
        user.username = username
        user.hostname = hostname
    return emit
|
|
|
|
@line_handler("SETNAME")
def _handle_SETNAME(self, line: Line) -> Emit:
    """Apply a realname change to ourselves or a tracked user."""
    emit = self._emit()
    realname = line.params[0]
    nickname_lower = self.casefold(line.hostmask.nickname)

    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.realname = realname

    user = self.users.get(nickname_lower)
    if user is not None:
        emit.user = user
        user.realname = realname
    return emit
|
|
|
|
@line_handler("RENAME")
def _handle_RENAME(self, line: Line) -> Emit:
    """Re-key a tracked channel, and its members' channel sets, on rename."""
    old_key = self.casefold(line.params[0])
    new_name = line.params[1]
    new_key = self.casefold(new_name)

    if old_key in self.channels:
        channel = self.channels.pop(old_key)

        channel.change_name(new_name, new_key)
        for member_key in channel.users:
            member_channels = self.users[member_key].channels
            member_channels.remove(old_key)
            member_channels.add(new_key)

        self.channels[new_key] = channel
    return self._emit()
|
|
|
|
@line_handler(RPL_AWAY)
# sent in response to a command directed at a user who is marked as away
def _handle_RPL_AWAY(self, line: Line) -> Emit:
    """Store the away reason for ourselves and/or the tracked user."""
    nickname_lower = self.casefold(line.params[1])
    reason = line.params[2]

    if nickname_lower == self.nickname_lower:
        self.away = reason

    user = self.users.get(nickname_lower)
    if user is not None:
        user.away = reason

    return self._emit()
|
|
|
|
@line_handler("AWAY")
def _handle_AWAY(self, line: Line) -> Emit:
    """Set or clear (no parameter) the away message of the sender."""
    emit = self._emit()
    away = None
    if line.params:
        away = line.params[0]
    nickname_lower = self.casefold(line.hostmask.nickname)

    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.away = away

    user = self.users.get(nickname_lower)
    if user is not None:
        emit.user = user
        user.away = away
    return emit
|
|
|
|
@line_handler("ACCOUNT")
def _handle_ACCOUNT(self, line: Line) -> Emit:
    """Record the sender's account; a bare "*" becomes the empty string."""
    emit = self._emit()
    account = line.params[0].strip("*")
    nickname_lower = self.casefold(line.hostmask.nickname)

    if nickname_lower == self.nickname_lower:
        emit.self = True
        self.account = account

    user = self.users.get(nickname_lower)
    if user is not None:
        emit.user = user
        user.account = account
    return emit
|
|
|
|
@line_handler("CAP")
def _handle_CAP(self, line: Line) -> Emit:
    """Track capability negotiation (LS / NEW / DEL / ACK subcommands).

    The emit carries the subcommand, the raw capability tokens and
    whether this line finished the (possibly multiline) reply.
    """
    self.has_cap = True
    subcommand = line.params[1].upper()
    # a "*" before the capability list marks a continued (multiline) reply
    multiline = line.params[2] == "*"
    caps = line.params[3 if multiline else 2]

    tokens_str: List[str] = [cap for cap in caps.split(" ") if cap]
    tokens: Dict[str, str] = {}
    for cap in tokens_str:
        key, _, value = cap.partition("=")
        tokens[key] = value

    emit = self._emit()
    emit.subcommand = subcommand
    emit.finished = not multiline
    emit.tokens = tokens_str

    if subcommand == "LS":
        self._temp_caps.update(tokens)
        if not multiline:
            # last line of the listing: publish what was accumulated
            self.available_caps = self._temp_caps.copy()
            self._temp_caps.clear()
    elif subcommand == "NEW":
        self.available_caps.update(tokens)
    elif subcommand == "DEL":
        for key in tokens:
            if key in self.available_caps:
                del self.available_caps[key]
                if key in self.agreed_caps:
                    self.agreed_caps.remove(key)
    elif subcommand == "ACK":
        for key in tokens:
            if key.startswith("-"):
                # "-cap" acknowledges that a capability was disabled
                disabled = key[1:]
                if disabled in self.agreed_caps:
                    self.agreed_caps.remove(disabled)
            elif (key not in self.agreed_caps and
                    key in self.available_caps):
                self.agreed_caps.append(key)
    return emit
|
|
|
|
@line_handler(RPL_LOGGEDIN)
def _handle_loggedin(self, line: Line) -> Emit:
    """Record our account name and refresh our hostmask on login."""
    parsed = hostmask_(line.params[1])
    account = line.params[2]

    self.account = account
    self._self_hostmask(parsed)
    return self._emit()
|
|
|
|
@line_handler(RPL_LOGGEDOUT)
def _handle_loggedout(self, line: Line) -> Emit:
    """Clear our account and refresh our hostmask on logout."""
    parsed = hostmask_(line.params[1])

    self.account = None
    self._self_hostmask(parsed)
    return self._emit()
|