mirror of https://github.com/jesopo/ircstates
Compare commits
150 Commits
Author | SHA1 | Date |
---|---|---|
jesopo | ab30dbe658 | |
jesopo | d370c67373 | |
jesopo | 0a844bd90d | |
jesopo | 3dc56da30e | |
jesopo | c21545e2c2 | |
jesopo | 275b2c7f3d | |
jesopo | 1333228fd1 | |
jesopo | 9166d82359 | |
jesopo | ea9c0c2d1f | |
jesopo | bc7c4d75a8 | |
jesopo | a389c6f3cb | |
jesopo | 83215e996b | |
jesopo | e0cbaa4519 | |
jesopo | e3884c7505 | |
jesopo | 1e187db35f | |
jesopo | 3a95bf4bca | |
jesopo | 5c50167d96 | |
jesopo | e5a7871fd9 | |
jesopo | 3565259791 | |
jesopo | 8c16b73414 | |
jesopo | 806c6e4bf3 | |
jesopo | d865ea3253 | |
jesopo | 038c59659f | |
jesopo | 22552c5e3d | |
jesopo | ddcacabfda | |
jesopo | cb8aa4495a | |
jesopo | 3136d2b85c | |
jesopo | 4d14d67d4b | |
jesopo | 566b8ec8cd | |
jesopo | 9b407b666d | |
jesopo | a69fd01766 | |
jesopo | 1f8dfe700f | |
jesopo | 202cf8227b | |
jesopo | 2552e1cb54 | |
jesopo | 17957798bb | |
jesopo | f44bbe41e4 | |
jesopo | f253159873 | |
jesopo | cfdcc8d7e7 | |
jesopo | f51f1b689e | |
jesopo | eb216e9abf | |
jesopo | 58f83ad3de | |
jesopo | b4f91148eb | |
jesopo | 2b6d2bf7af | |
jesopo | 412f829cb4 | |
jesopo | 4849010938 | |
jesopo | 76e29d7bad | |
jesopo | 5a85e53485 | |
jesopo | e062b7b71f | |
jesopo | c841d1d6dd | |
jesopo | 07ed0bf13c | |
jesopo | ca9abfc34b | |
jesopo | 66d6bba298 | |
jesopo | ee5b0ceb4f | |
jesopo | dabb59d05f | |
jesopo | 5165573133 | |
jesopo | bf0f2fdc9f | |
jesopo | 2c1468295e | |
jesopo | c27c48af54 | |
jesopo | bf16308455 | |
jesopo | 8a31f0190d | |
jesopo | 2fb81e7aef | |
jesopo | 87f85ba57c | |
jesopo | 9806a6407b | |
jesopo | 01d8b8d111 | |
jesopo | 875e912896 | |
jesopo | 50a63ce12c | |
jesopo | 74490f616a | |
jesopo | adacb19c77 | |
jesopo | d76f50dac0 | |
jesopo | 98823298e6 | |
jesopo | 20d2f1a1db | |
jesopo | 114688e266 | |
jesopo | 7a87b7b448 | |
jesopo | c32b4bdd62 | |
jesopo | 83b31b6b2b | |
jesopo | ea421f09af | |
jesopo | 8b91dc09e3 | |
jesopo | 37227b6463 | |
jesopo | 40ec25de2b | |
jesopo | e1286f16c6 | |
jesopo | fdcf216255 | |
jesopo | 46a1d2bda8 | |
jesopo | c75a62f5d8 | |
jesopo | 3290c33106 | |
jesopo | 85794909d0 | |
jesopo | 40839c1755 | |
jesopo | 251d588ee8 | |
jesopo | 122fe23da6 | |
jesopo | 4dbf2c1981 | |
jesopo | c68c59a534 | |
jesopo | 6f79d97967 | |
jesopo | 92c883ded2 | |
jesopo | 80ef9a9edb | |
jesopo | 6ee5c75790 | |
jesopo | 191e8fdba3 | |
jesopo | 274b76ba56 | |
jesopo | bd3fd12a84 | |
jesopo | 232aa3bb61 | |
jesopo | e7d14e6f67 | |
jesopo | d212fbc11c | |
jesopo | 5878f946e8 | |
jesopo | a434f19b9e | |
jesopo | 56d047de8d | |
jesopo | 271cadf666 | |
jesopo | a1e5c07dbc | |
jesopo | 178f08d5b0 | |
jesopo | 2355b42fbb | |
jesopo | d0a3aed19f | |
jesopo | 5c5c6fca2b | |
jesopo | f74294b7bb | |
jesopo | 39694beff4 | |
jesopo | 8e4bbeb790 | |
jesopo | 0818400221 | |
jesopo | 8fbf66fe71 | |
jesopo | fb4045f2f8 | |
jesopo | 500859e9c0 | |
jesopo | 09acc74412 | |
jesopo | e02a535cd0 | |
jesopo | febc891c45 | |
jesopo | 31ab106e25 | |
jesopo | 909f72ece6 | |
jesopo | 5a8ac14896 | |
jesopo | db9c6d48d1 | |
jesopo | 79ee3d8874 | |
jesopo | 959b288b3c | |
jesopo | b587936c7f | |
jesopo | 985d800982 | |
jesopo | 6eb429d6d5 | |
jesopo | ba2410c539 | |
jesopo | 548bd722a8 | |
jesopo | c0339c11cb | |
jesopo | 0d9f8dd268 | |
jesopo | df3aba9521 | |
jesopo | dc878df2f4 | |
jesopo | 0ed79f18f5 | |
jesopo | 09ea845d2d | |
jesopo | e62e22d663 | |
jesopo | 872cb84c0e | |
jesopo | 21a6c4c47d | |
jesopo | 32e7ba230e | |
jesopo | 3fdbe04e0f | |
jesopo | 25408c7e5e | |
jesopo | 517737a921 | |
jesopo | f705f20e94 | |
jesopo | f9034d6a1b | |
jesopo | 05644d3ff1 | |
jesopo | c902c1026c | |
jesopo | fe505ce4d2 | |
jesopo | 0257a88f87 | |
jesopo | 90def4aba5 |
|
@ -1,14 +1,13 @@
|
|||
language: python
|
||||
cache: pip
|
||||
python:
|
||||
- "3.6"
|
||||
- "3.7"
|
||||
- "3.8"
|
||||
- "3.8-dev"
|
||||
- "3.9"
|
||||
install:
|
||||
- pip3 install mypy -r requirements.txt
|
||||
script:
|
||||
- pip3 install mypy types-cachetools -r requirements-dev.txt
|
||||
before_script:
|
||||
- pip3 freeze
|
||||
- mypy ircstates
|
||||
script:
|
||||
- python3 -m unittest test
|
||||
|
||||
|
|
53
README.md
53
README.md
|
@ -13,44 +13,50 @@ additional arbitrary functionality on top of it.
|
|||
|
||||
## usage
|
||||
|
||||
### simple
|
||||
|
||||
```python
|
||||
import ircstates
|
||||
|
||||
server = ircstates.Server("freenode")
|
||||
lines = server.recv(b":server 001 nick :hello world!\r\n")
|
||||
lines += server.recv(b":nick JOIN #chan\r\n")
|
||||
for line in lines:
|
||||
server.parse_tokens(line)
|
||||
|
||||
chan = server.channels["#chan"]
|
||||
```
|
||||
|
||||
### socket to state
|
||||
|
||||
```python
|
||||
import ircstates, irctokens, socket
|
||||
|
||||
NICK = "nickname"
|
||||
CHAN = "#chan"
|
||||
HOST = "127.0.0.1"
|
||||
POST = 6667
|
||||
PORT = 6667
|
||||
|
||||
server = ircstates.Server("freenode")
|
||||
sock = socket.socket()
|
||||
encoder = irctokens.StatefulEncoder()
|
||||
|
||||
sock.connect((HOST, POST))
|
||||
|
||||
def _send(raw):
|
||||
tokens = irctokens.tokenise(raw)
|
||||
encoder.push(tokens)
|
||||
sock.connect((HOST, PORT))
|
||||
def _send(raw: str):
|
||||
sock.sendall(f"{raw}\r\n".encode("utf8"))
|
||||
|
||||
_send("USER test 0 * test")
|
||||
_send(f"NICK {NICK}")
|
||||
|
||||
while True:
|
||||
while encoder.pending():
|
||||
sent_lines = encoder.pop(sock.send(encoder.pending()))
|
||||
for line in sent_lines:
|
||||
print(f"> {line.format()}")
|
||||
|
||||
recv_lines = server.recv(sock.recv(1024))
|
||||
recv_data = sock.recv(1024)
|
||||
recv_lines = server.recv(recv_data)
|
||||
for line in recv_lines:
|
||||
server.parse_tokens(line)
|
||||
print(f"< {line.format()}")
|
||||
|
||||
# user defined behaviors...
|
||||
if line.command == "PING":
|
||||
_send(f"PONG :{line.params[0]}")
|
||||
|
||||
if line.command == "001" and not CHAN in server.channels:
|
||||
_send(f"JOIN {CHAN}")
|
||||
```
|
||||
|
||||
### get a user's channels
|
||||
|
@ -60,8 +66,8 @@ while True:
|
|||
>>> user = server.users["nickname"]
|
||||
>>> user
|
||||
User(nickname='nickname')
|
||||
>>> server.user_channels[user]
|
||||
{Channel(name='#chan')}
|
||||
>>> user.channels
|
||||
{'#chan'}
|
||||
```
|
||||
|
||||
### get a channel's users
|
||||
|
@ -71,21 +77,20 @@ User(nickname='nickname')
|
|||
>>> channel = server.channels["#chan"]
|
||||
>>> channel
|
||||
Channel(name='#chan')
|
||||
>>> server.channel_users[channel]
|
||||
{User(nickname='nickname'): ChannelUser(user='nickname', channel='#chan', modes='ov')}
|
||||
>>> channel.users
|
||||
{'jess': ChannelUser(#chan jess)}
|
||||
```
|
||||
|
||||
### get a user's modes in channel
|
||||
```python
|
||||
>>> user = server.users["nickname"]
|
||||
>>> channel = server.channels["#chan"]
|
||||
>>> channel_user = server.channel_users[channel][user]
|
||||
>>> channel_user = channel.users["nickname"]
|
||||
>>> channel_user
|
||||
ChannelUser(user='nickname', channel='#chan', modes='ov')
|
||||
ChannelUser(#chan jess +ov)
|
||||
>>> channel_user.modes
|
||||
{'o', 'v'}
|
||||
```
|
||||
|
||||
## contact
|
||||
|
||||
Come say hi at [#irctokens on irc.tilde.chat](https://web.tilde.chat/?join=%23irctokens)
|
||||
Come say hi at `#irctokens` on irc.libera.chat
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
from .server import Server
|
||||
from .server import Server, ServerDisconnectedException
|
||||
from .user import User
|
||||
from .channel import Channel
|
||||
from .channel_user import Channel
|
||||
from .casemap import casefold
|
||||
from .channel_user import ChannelUser
|
||||
from .casemap import casefold, CaseMap
|
||||
from .emit import *
|
||||
from .numerics import NUMERIC_NAMES, NUMERIC_NUMBERS
|
||||
|
|
|
@ -1,18 +1,21 @@
|
|||
import string
|
||||
from typing import List
|
||||
from enum import Enum
|
||||
from string import ascii_lowercase, ascii_uppercase
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
ASCII_UPPER = list(string.ascii_uppercase)
|
||||
ASCII_LOWER = list(string.ascii_lowercase)
|
||||
RFC1459_UPPER = ASCII_UPPER+list("[]~\\")
|
||||
RFC1459_LOWER = ASCII_LOWER+list("{}^|")
|
||||
class CaseMap(Enum):
|
||||
ASCII = "ascii"
|
||||
RFC1459 = "rfc1459"
|
||||
|
||||
def _replace(s: str, upper: List[str], lower: List[str]):
|
||||
for i, char in enumerate(upper):
|
||||
s = s.replace(char, lower[i])
|
||||
return s
|
||||
|
||||
def casefold(mapping: str, s: str):
|
||||
if mapping == "rfc1459":
|
||||
return _replace(s, RFC1459_UPPER, RFC1459_LOWER)
|
||||
elif mapping == "ascii":
|
||||
return _replace(s, ASCII_UPPER, ASCII_LOWER)
|
||||
CASEMAPS: Dict[CaseMap, Dict[int, Optional[int]]] = {
|
||||
CaseMap.ASCII: str.maketrans(
|
||||
r"ABCDEFGHIJKLMNOPQRSTUVWXYZ",
|
||||
r"abcdefghijklmnopqrstuvwxyz"
|
||||
),
|
||||
CaseMap.RFC1459: str.maketrans(
|
||||
r"ABCDEFGHIJKLMNOPQRSTUVWXYZ\[]^",
|
||||
r"abcdefghijklmnopqrstuvwxyz|{}~"
|
||||
)
|
||||
}
|
||||
def casefold(casemap_name: CaseMap, s: str):
|
||||
casemap = CASEMAPS[casemap_name]
|
||||
return s.translate(casemap)
|
||||
|
|
|
@ -1,32 +1,53 @@
|
|||
from datetime import datetime
|
||||
from typing import Dict, List, Optional, Set
|
||||
from .named import Named
|
||||
from typing import Dict, List, Optional, Set
|
||||
from pendulum import DateTime
|
||||
|
||||
class Channel(Named):
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
from .channel_user import ChannelUser
|
||||
from .names import Name
|
||||
|
||||
class Channel(object):
|
||||
def __init__(self, name: Name):
|
||||
self._name = name
|
||||
|
||||
self.users: Dict[str, ChannelUser] = {}
|
||||
|
||||
self.topic: Optional[str] = None
|
||||
self.topic_setter: Optional[str] = None
|
||||
self.topic_time: Optional[datetime] = None
|
||||
self.topic_time: Optional[DateTime] = None
|
||||
|
||||
self.created: Optional[datetime] = None
|
||||
self.created: Optional[DateTime] = None
|
||||
|
||||
self.list_modes: Dict[str, List[str]] = {}
|
||||
self.modes: Dict[str, Optional[str]] = {}
|
||||
self._list_modes_temp: Dict[str, List[str]] = {}
|
||||
self.list_modes: Dict[str, List[str]] = {}
|
||||
self.modes: Dict[str, Optional[str]] = {}
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"Channel(name={self.name!r})"
|
||||
|
||||
def get_name(self) -> Name:
|
||||
return self._name
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self._name.normal
|
||||
@property
|
||||
def name_lower(self) -> str:
|
||||
return self._name.folded
|
||||
|
||||
def change_name(self,
|
||||
normal: str,
|
||||
folded: str):
|
||||
self._name.normal = normal
|
||||
self._name.folded = folded
|
||||
|
||||
def add_mode(self,
|
||||
char: str,
|
||||
param: Optional[str],
|
||||
list_mode: bool):
|
||||
if list_mode:
|
||||
if not char in self.list_modes:
|
||||
self.list_modes[char] = []
|
||||
if not param in self.list_modes[char]:
|
||||
self.list_modes[char].append(param or "")
|
||||
if param is not None:
|
||||
if not char in self.list_modes:
|
||||
self.list_modes[char] = []
|
||||
if not param in self.list_modes[char]:
|
||||
self.list_modes[char].append(param)
|
||||
else:
|
||||
self.modes[char] = param
|
||||
|
||||
|
@ -34,9 +55,8 @@ class Channel(Named):
|
|||
char: str,
|
||||
param: Optional[str]):
|
||||
if char in self.list_modes:
|
||||
if param in self.list_modes[char]:
|
||||
if (param is not None and
|
||||
param in self.list_modes[char]):
|
||||
self.list_modes[char].remove(param)
|
||||
if not self.list_modes[char]:
|
||||
del self.list_modes[char]
|
||||
elif char in self.modes:
|
||||
del self.modes[char]
|
||||
|
|
|
@ -1,16 +1,31 @@
|
|||
from typing import List
|
||||
from .user import User
|
||||
from .channel import Channel
|
||||
from typing import List, Optional, Set
|
||||
from .names import Name
|
||||
from pendulum import DateTime, now
|
||||
|
||||
class ChannelUser(object):
|
||||
def __init__(self,
|
||||
channel: Channel,
|
||||
user: User):
|
||||
self.channel = channel
|
||||
self.user = user
|
||||
self.modes: List[str] = []
|
||||
nickname: Name,
|
||||
channel_name: Name):
|
||||
self._nickname = nickname
|
||||
self._channel_name = channel_name
|
||||
|
||||
self.modes: Set[str] = set()
|
||||
self.since = now("utc")
|
||||
self.joined: Optional[DateTime] = None
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (f"ChannelUser(user={self.user.nickname!r},"
|
||||
f" channel={self.channel.name!r},"
|
||||
f" modes={''.join(self.modes)!r})")
|
||||
outs: List[str] = [self.channel, self.nickname]
|
||||
if self.modes:
|
||||
outs.append(f"+{''.join(self.modes)}")
|
||||
return f"ChannelUser({' '.join(outs)})"
|
||||
|
||||
@property
|
||||
def nickname(self) -> str:
|
||||
return self._nickname.normal
|
||||
@property
|
||||
def nickname_lower(self) -> str:
|
||||
return self._nickname.folded
|
||||
|
||||
@property
|
||||
def channel(self) -> str:
|
||||
return self._channel_name.normal
|
||||
|
|
|
@ -1,8 +1,28 @@
|
|||
from typing import Dict, List, Optional
|
||||
from .tokens import ChanModes, Prefix
|
||||
from ..casemap import CaseMap
|
||||
|
||||
CASEMAPPINGS = ["rfc1459", "ascii"]
|
||||
|
||||
def _parse_escapes(s: str):
|
||||
idx = 0
|
||||
out = ""
|
||||
|
||||
while idx < (len(s)):
|
||||
if s[idx] == "\\":
|
||||
if s[idx+1:]:
|
||||
if (s[idx+1] == "x" and
|
||||
len(s[idx+2:]) >= 2):
|
||||
out += chr(int(s[idx+2:idx+4], 16))
|
||||
idx += 4
|
||||
else:
|
||||
out += s[idx+1]
|
||||
idx += 2
|
||||
else:
|
||||
out += s[idx]
|
||||
idx += 1
|
||||
return out
|
||||
|
||||
class ISupport(object):
|
||||
raw: Dict[str, Optional[str]]
|
||||
|
||||
|
@ -12,7 +32,7 @@ class ISupport(object):
|
|||
prefix = Prefix(["o", "v"], ["@", "+"])
|
||||
|
||||
modes: int = 3 # -1 if "no limit"
|
||||
casemapping: str = "rfc1459"
|
||||
casemapping: CaseMap = CaseMap.RFC1459
|
||||
chantypes: List[str] = ["#"]
|
||||
statusmsg: List[str] = []
|
||||
|
||||
|
@ -23,13 +43,15 @@ class ISupport(object):
|
|||
monitor: Optional[int] = None # -1 if "no limit"
|
||||
watch: Optional[int] = None # -1 if "no limit"
|
||||
whox: bool = False
|
||||
nicklen: int = 9 # from RFC1459
|
||||
|
||||
def __init__(self):
|
||||
self.raw = {}
|
||||
|
||||
def tokens(self, tokens: List[str]):
|
||||
def from_tokens(self, tokens: List[str]):
|
||||
for token in tokens:
|
||||
key, sep, value = token.partition("=")
|
||||
value = _parse_escapes(value)
|
||||
self.raw[key] = value if sep else None
|
||||
|
||||
if key == "NETWORK":
|
||||
|
@ -54,8 +76,7 @@ class ISupport(object):
|
|||
self.watch = int(value) if value else -1
|
||||
|
||||
elif key == "CASEMAPPING":
|
||||
if value in CASEMAPPINGS:
|
||||
self.casemapping = value
|
||||
self.casemapping = CaseMap(value)
|
||||
|
||||
elif key == "CHANTYPES":
|
||||
self.chantypes = list(value)
|
||||
|
@ -69,3 +90,6 @@ class ISupport(object):
|
|||
|
||||
elif key == "WHOX":
|
||||
self.whox = True
|
||||
|
||||
elif key == "NICKLEN":
|
||||
self.nicklen = int(value)
|
||||
|
|
|
@ -2,15 +2,14 @@ from typing import List, Optional
|
|||
|
||||
class ChanModes(object):
|
||||
def __init__(self,
|
||||
list_modes: List[str],
|
||||
setting_b_modes: List[str],
|
||||
setting_c_modes: List[str],
|
||||
setting_d_modes: List[str]):
|
||||
|
||||
self.list_modes = list_modes
|
||||
self.setting_b_modes = setting_b_modes
|
||||
self.setting_c_modes = setting_c_modes
|
||||
self.setting_d_modes = setting_d_modes
|
||||
a_modes: List[str],
|
||||
b_modes: List[str],
|
||||
c_modes: List[str],
|
||||
d_modes: List[str]):
|
||||
self.a_modes = a_modes
|
||||
self.b_modes = b_modes
|
||||
self.c_modes = c_modes
|
||||
self.d_modes = d_modes
|
||||
|
||||
class Prefix(object):
|
||||
def __init__(self,
|
||||
|
|
|
@ -1,4 +0,0 @@
|
|||
from typing import Optional
|
||||
|
||||
class Named(object):
|
||||
name: str
|
|
@ -0,0 +1,7 @@
|
|||
|
||||
class Name(object):
|
||||
def __init__(self,
|
||||
normal: str,
|
||||
folded: str):
|
||||
self.normal = normal
|
||||
self.folded = folded
|
|
@ -1,34 +1,83 @@
|
|||
NUMERIC_NUMBERS = {}
|
||||
NUMERIC_NAMES = {}
|
||||
|
||||
def _numeric(number: str, name: str):
|
||||
NUMERIC_NUMBERS[number] = name
|
||||
NUMERIC_NAMES[name] = number
|
||||
RPL_WELCOME = "001"
|
||||
RPL_ISUPPORT = "005"
|
||||
RPL_MOTD = "372"
|
||||
RPL_MOTDSTART = "375"
|
||||
RPL_ENDOFMOTD = "376"
|
||||
ERR_NOMOTD = "422"
|
||||
RPL_UMODEIS = "221"
|
||||
RPL_VISIBLEHOST = "396"
|
||||
RPL_TRYAGAIN = "263"
|
||||
RPL_YOUREOPER = "381"
|
||||
|
||||
_numeric("001", "RPL_WELCOME")
|
||||
_numeric("005", "RPL_ISUPPORT")
|
||||
_numeric("221", "RPL_UMODEIS")
|
||||
ERR_NOSUCHNICK = "401"
|
||||
ERR_NOSUCHSERVER = "402"
|
||||
|
||||
_numeric("311", "RPL_WHOISUSER")
|
||||
RPL_CHANNELMODEIS = "324"
|
||||
RPL_CREATIONTIME = "329"
|
||||
RPL_TOPIC = "332"
|
||||
RPL_TOPICWHOTIME = "333"
|
||||
|
||||
_numeric("324", "RPL_CHANNELMODEIS")
|
||||
_numeric("329", "RPL_CREATIONTIME")
|
||||
_numeric("332", "RPL_TOPIC")
|
||||
_numeric("333", "RPL_TOPICWHOTIME")
|
||||
RPL_WHOREPLY = "352"
|
||||
RPL_WHOSPCRPL = "354"
|
||||
RPL_ENDOFWHO = "315"
|
||||
RPL_NAMREPLY = "353"
|
||||
RPL_ENDOFNAMES = "366"
|
||||
|
||||
_numeric("352", "RPL_WHOREPLY")
|
||||
_numeric("353", "RPL_NAMREPLY")
|
||||
_numeric("366", "RPL_ENDOFNAMES")
|
||||
RPL_WHOWASUSER = "314"
|
||||
RPL_ENDOFWHOWAS = "369"
|
||||
|
||||
_numeric("372", "RPL_MOTD")
|
||||
_numeric("375", "RPL_MOTDSTART")
|
||||
_numeric("396", "RPL_VISIBLEHOST")
|
||||
RPL_BANLIST = "367"
|
||||
RPL_ENDOFBANLIST = "368"
|
||||
RPL_QUIETLIST = "728"
|
||||
RPL_ENDOFQUIETLIST = "729"
|
||||
|
||||
_numeric("903", "RPL_SASLSUCCESS")
|
||||
_numeric("904", "ERR_SASLFAIL")
|
||||
_numeric("905", "ERR_SASLTOOLONG")
|
||||
_numeric("906", "ERR_SASLABORTED")
|
||||
_numeric("907", "ERR_SASLALREADY")
|
||||
_numeric("908", "RPL_SASLMECHS")
|
||||
RPL_LOGGEDIN = "900"
|
||||
RPL_LOGGEDOUT = "901"
|
||||
RPL_SASLSUCCESS = "903"
|
||||
ERR_SASLFAIL = "904"
|
||||
ERR_SASLTOOLONG = "905"
|
||||
ERR_SASLABORTED = "906"
|
||||
ERR_SASLALREADY = "907"
|
||||
RPL_SASLMECHS = "908"
|
||||
|
||||
RPL_WHOISUSER = "311"
|
||||
RPL_WHOISSERVER = "312"
|
||||
RPL_WHOISOPERATOR = "313"
|
||||
RPL_WHOISIDLE = "317"
|
||||
RPL_WHOISCHANNELS = "319"
|
||||
RPL_WHOISACCOUNT = "330"
|
||||
RPL_WHOISHOST = "378"
|
||||
RPL_WHOISMODES = "379"
|
||||
RPL_WHOISSECURE = "671"
|
||||
RPL_AWAY = "301"
|
||||
RPL_ENDOFWHOIS = "318"
|
||||
|
||||
ERR_ERRONEUSNICKNAME = "432"
|
||||
ERR_NICKNAMEINUSE = "433"
|
||||
ERR_BANNICKCHANGE = "435"
|
||||
ERR_UNAVAILRESOURCE = "437"
|
||||
ERR_NICKTOOFAST = "438"
|
||||
ERR_CANTCHANGENICK = "447"
|
||||
|
||||
ERR_NOSUCHCHANNEL = "403"
|
||||
ERR_TOOMANYCHANNELS = "405"
|
||||
ERR_USERONCHANNEL = "443"
|
||||
ERR_LINKCHANNEL = "470"
|
||||
ERR_BADCHANNAME = "479"
|
||||
ERR_BADCHANNEL = "926"
|
||||
|
||||
|
||||
ERR_BANNEDFROMCHAN = "474"
|
||||
ERR_INVITEONLYCHAN = "473"
|
||||
ERR_BADCHANNELKEY = "475"
|
||||
ERR_CHANNELISFULL = "471"
|
||||
ERR_NEEDREGGEDNICK = "477"
|
||||
ERR_THROTTLE = "480"
|
||||
|
||||
RPL_LOGOFF = "601"
|
||||
RPL_MONONLINE = "730"
|
||||
RPL_MONOFFLINE = "731"
|
||||
|
||||
RPL_RSACHALLENGE2 = "740"
|
||||
RPL_ENDOFRSACHALLENGE2 = "741"
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
from ipaddress import ip_address
|
||||
from typing import Callable, Dict, List, Optional, Set, Tuple
|
||||
from datetime import datetime
|
||||
from irctokens import build, Hostmask, Line, StatefulDecoder, StatefulEncoder
|
||||
from irctokens import Line, build, Hostmask, StatefulDecoder, StatefulEncoder
|
||||
from irctokens import hostmask as hostmask_
|
||||
from pendulum import from_timestamp, now
|
||||
|
||||
from .named import Named
|
||||
from .user import User
|
||||
from .channel import Channel
|
||||
from .channel_user import ChannelUser
|
||||
from .isupport import ISupport
|
||||
from .decorators import handler_decorator
|
||||
from .casemap import casefold
|
||||
from .names import Name
|
||||
from .emit import *
|
||||
from .numerics import NUMERIC_NUMBERS
|
||||
from .numerics import *
|
||||
|
||||
LINE_HANDLERS: Dict[str, List[Callable[["Server", Line], Emit]]] = {}
|
||||
line_handler = handler_decorator(LINE_HANDLERS)
|
||||
|
@ -20,7 +22,10 @@ class ServerException(Exception):
|
|||
class ServerDisconnectedException(ServerException):
|
||||
pass
|
||||
|
||||
class Server(Named):
|
||||
WHO_TYPE = "735" # randomly generated
|
||||
TYPE_EMIT = Optional[Emit]
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, name: str):
|
||||
self.name = name
|
||||
|
||||
|
@ -30,18 +35,18 @@ class Server(Named):
|
|||
self.hostname: Optional[str] = None
|
||||
self.realname: Optional[str] = None
|
||||
self.account: Optional[str] = None
|
||||
self.server: Optional[str] = None
|
||||
self.away: Optional[str] = None
|
||||
self.ip: Optional[str] = None
|
||||
|
||||
self.registered = False
|
||||
self.modes: List[str] = []
|
||||
self.modes: Set[str] = set()
|
||||
self.motd: List[str] = []
|
||||
|
||||
self._decoder = StatefulDecoder()
|
||||
|
||||
self.users: Dict[str, User] = {}
|
||||
self.channels: Dict[str, Channel] = {}
|
||||
self.user_channels: Dict[User, Set[Channel]] = {}
|
||||
self.channel_users: Dict[Channel, Dict[User, ChannelUser]] = {}
|
||||
self.users: Dict[str, User] = {}
|
||||
self.channels: Dict[str, Channel] = {}
|
||||
|
||||
self.isupport = ISupport()
|
||||
|
||||
|
@ -53,66 +58,71 @@ class Server(Named):
|
|||
def __repr__(self) -> str:
|
||||
return f"Server(name={self.name!r})"
|
||||
|
||||
def recv(self, data: bytes) -> List[Tuple[Line, List[Emit]]]:
|
||||
def recv(self, data: bytes) -> List[Line]:
|
||||
lines = self._decoder.push(data)
|
||||
if lines is None:
|
||||
raise ServerDisconnectedException()
|
||||
emits: List[List[Emit]] = []
|
||||
for line in lines:
|
||||
emits.append(self.parse_tokens(line))
|
||||
return list(zip(lines, emits))
|
||||
return lines
|
||||
|
||||
def parse_tokens(self, line: Line):
|
||||
emits: List[Emit] = []
|
||||
commands = [line.command]
|
||||
if (line.command.isdigit() and
|
||||
len(line.command) == 3 and
|
||||
line.command in NUMERIC_NUMBERS):
|
||||
commands.append(NUMERIC_NUMBERS[line.command])
|
||||
|
||||
for command in commands:
|
||||
if command in LINE_HANDLERS:
|
||||
for callback in LINE_HANDLERS[command]:
|
||||
emit = callback(self, line)
|
||||
def parse_tokens(self, line: Line) -> TYPE_EMIT:
|
||||
ret_emit: TYPE_EMIT = None
|
||||
if line.command in LINE_HANDLERS:
|
||||
for callback in LINE_HANDLERS[line.command]:
|
||||
emit = callback(self, line)
|
||||
if emit is not None and ret_emit is None:
|
||||
emit.command = line.command
|
||||
emits.append(emit)
|
||||
return emits
|
||||
ret_emit = emit
|
||||
return ret_emit
|
||||
|
||||
def casefold(self, s1: str):
|
||||
return casefold(self.isupport.casemapping, s1)
|
||||
def casefold_equals(self, s1: str, s2: str):
|
||||
return self.casefold(s1) == self.casefold(s2)
|
||||
def is_me(self, nickname: str):
|
||||
return self.casefold(nickname) == self.nickname_lower
|
||||
|
||||
def has_user(self, nickname: str) -> bool:
|
||||
return self.casefold(nickname) in self.users
|
||||
def create_user(self, nickname: str, nickname_lower: str):
|
||||
return User(nickname, nickname_lower)
|
||||
def _add_user(self, nickname: str, nickname_lower: str):
|
||||
user = self.create_user(nickname, nickname_lower)
|
||||
user = self.create_user(Name(nickname, nickname_lower))
|
||||
self.users[nickname_lower] = user
|
||||
|
||||
def is_channel(self, target: str) -> bool:
|
||||
return target[:1] in self.isupport.chantypes
|
||||
def has_channel(self, name: str) -> bool:
|
||||
return self.casefold(name) in self.channels
|
||||
def create_channel(self, name: str) -> Channel:
|
||||
return Channel(name)
|
||||
def get_channel(self, name: str) -> Optional[Channel]:
|
||||
return self.channels.get(self.casefold(name), None)
|
||||
|
||||
def _user_join(self, channel: Channel, user: User) -> ChannelUser:
|
||||
channel_user = ChannelUser(channel, user)
|
||||
if not user in self.user_channels:
|
||||
self.user_channels[user] = set([])
|
||||
def create_user(self, nickname: Name) -> User:
|
||||
return User(nickname)
|
||||
|
||||
self.user_channels[user].add(channel)
|
||||
self.channel_users[channel][user] = channel_user
|
||||
def create_channel(self, name: Name) -> Channel:
|
||||
return Channel(name)
|
||||
|
||||
def _user_join(self, channel: Channel, user: User) -> ChannelUser:
|
||||
channel_user = ChannelUser(
|
||||
user.get_name(),
|
||||
channel.get_name())
|
||||
|
||||
user.channels.add(self.casefold(channel.name))
|
||||
channel.users[user.nickname_lower] = channel_user
|
||||
return channel_user
|
||||
|
||||
def prepare_whox(self, target: str) -> Line:
|
||||
return build("WHO", [target, f"n%afhinrstu,{WHO_TYPE}"])
|
||||
|
||||
def _self_hostmask(self, hostmask: Hostmask):
|
||||
self.nickname = hostmask.nickname
|
||||
if hostmask.username:
|
||||
self.username = hostmask.username
|
||||
if hostmask.hostname:
|
||||
self.hostname = hostmask.hostname
|
||||
|
||||
def _emit(self) -> Emit:
|
||||
return Emit()
|
||||
|
||||
@line_handler("RPL_WELCOME")
|
||||
@line_handler(RPL_WELCOME)
|
||||
# first message reliably sent to us after registration is complete
|
||||
def _handle_welcome(self, line: Line) -> Emit:
|
||||
self.nickname = line.params[0]
|
||||
|
@ -120,20 +130,20 @@ class Server(Named):
|
|||
self.registered = True
|
||||
return self._emit()
|
||||
|
||||
@line_handler("RPL_ISUPPORT")
|
||||
@line_handler(RPL_ISUPPORT)
|
||||
# https://defs.ircdocs.horse/defs/isupport.html
|
||||
def _handle_ISUPPORT(self, line: Line) -> Emit:
|
||||
self.isupport.tokens(line.params[1:-1])
|
||||
self.isupport.from_tokens(line.params[1:-1])
|
||||
return self._emit()
|
||||
|
||||
@line_handler("RPL_MOTDSTART")
|
||||
@line_handler(RPL_MOTDSTART)
|
||||
# start of MOTD
|
||||
def _handle_motd_start(self, line: Line) -> Emit:
|
||||
self.motd.clear()
|
||||
return self._emit()
|
||||
@line_handler("RPL_MOTDSTART")
|
||||
@line_handler(RPL_MOTDSTART)
|
||||
# start of MOTD
|
||||
@line_handler("RPL_MOTD")
|
||||
@line_handler(RPL_MOTD)
|
||||
# line of MOTD
|
||||
def _handle_motd_line(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -144,23 +154,28 @@ class Server(Named):
|
|||
|
||||
@line_handler("NICK")
|
||||
def _handle_NICK(self, line: Line) -> Emit:
|
||||
new_nickname = line.params[0]
|
||||
nickname_lower = self.casefold(line.hostmask.nickname)
|
||||
new_nickname = line.params[0]
|
||||
new_nickname_lower = self.casefold(new_nickname)
|
||||
nickname_lower = self.casefold(line.hostmask.nickname)
|
||||
|
||||
emit = self._emit()
|
||||
|
||||
if nickname_lower in self.users:
|
||||
user = self.users.pop(nickname_lower)
|
||||
emit.user = user
|
||||
|
||||
new_nickname_lower = self.casefold(new_nickname)
|
||||
user._set_nickname(new_nickname, new_nickname_lower)
|
||||
user.change_nickname(new_nickname, new_nickname_lower)
|
||||
self.users[new_nickname_lower] = user
|
||||
|
||||
for channel_lower in user.channels:
|
||||
channel = self.channels[channel_lower]
|
||||
channel_user = channel.users.pop(nickname_lower)
|
||||
channel.users[user.nickname_lower] = channel_user
|
||||
|
||||
if nickname_lower == self.nickname_lower:
|
||||
emit.self = True
|
||||
|
||||
self.nickname = new_nickname
|
||||
self.nickname_lower = self.casefold(new_nickname)
|
||||
self.nickname = new_nickname
|
||||
self.nickname_lower = new_nickname_lower
|
||||
return emit
|
||||
|
||||
@line_handler("JOIN")
|
||||
|
@ -177,13 +192,16 @@ class Server(Named):
|
|||
if nickname_lower == self.nickname_lower:
|
||||
emit.self = True
|
||||
if not channel_lower in self.channels:
|
||||
channel = self.create_channel(line.params[0])
|
||||
channel = self.create_channel(
|
||||
Name(line.params[0], channel_lower)
|
||||
)
|
||||
#TODO: put this somewhere better
|
||||
for mode in self.isupport.chanmodes.a_modes:
|
||||
channel.list_modes[mode] = []
|
||||
|
||||
self.channels[channel_lower] = channel
|
||||
self.channel_users[channel] = {}
|
||||
if line.hostmask.username:
|
||||
self.username = line.hostmask.username
|
||||
if line.hostmask.hostname:
|
||||
self.hostname = line.hostmask.hostname
|
||||
|
||||
self._self_hostmask(line.hostmask)
|
||||
if extended:
|
||||
self.account = account
|
||||
self.realname = realname
|
||||
|
@ -204,7 +222,8 @@ class Server(Named):
|
|||
user.account = account
|
||||
user.realname = realname
|
||||
|
||||
self._user_join(channel, user)
|
||||
channel_user = self._user_join(channel, user)
|
||||
channel_user.joined = now("utc")
|
||||
return emit
|
||||
|
||||
def _user_part(self, line: Line,
|
||||
|
@ -225,22 +244,20 @@ class Server(Named):
|
|||
nickname_lower = self.casefold(nickname)
|
||||
if nickname_lower in self.users:
|
||||
user = self.users[nickname_lower]
|
||||
user = user
|
||||
self.user_channels[user].remove(channel)
|
||||
if not self.user_channels[user]:
|
||||
|
||||
user.channels.remove(channel.name_lower)
|
||||
del channel.users[user.nickname_lower]
|
||||
if not user.channels:
|
||||
del self.users[nickname_lower]
|
||||
del self.user_channels[user]
|
||||
del self.channel_users[channel][user]
|
||||
|
||||
if nickname_lower == self.nickname_lower:
|
||||
del self.channels[channel_lower]
|
||||
channel_users = self.channel_users.pop(channel)
|
||||
|
||||
for user, cuser in channel_users.items():
|
||||
self.user_channels[user].remove(channel)
|
||||
if not self.user_channels[user]:
|
||||
del self.user_channels[user]
|
||||
del self.users[self.casefold(user.nickname)]
|
||||
for key, cuser in channel.users.items():
|
||||
ruser = self.users[key]
|
||||
ruser.channels.remove(channel.name_lower)
|
||||
if not ruser.channels:
|
||||
del self.users[ruser.nickname_lower]
|
||||
|
||||
return emit, user
|
||||
|
||||
|
@ -270,16 +287,15 @@ class Server(Named):
|
|||
if kicker_lower in self.users:
|
||||
emit.user_source = self.users[kicker_lower]
|
||||
else:
|
||||
emit.user_source = self.create_user(line.hostmask.nickname,
|
||||
kicker_lower)
|
||||
emit.user_source = self.create_user(
|
||||
Name(line.hostmask.nickname, kicker_lower)
|
||||
)
|
||||
|
||||
return emit
|
||||
|
||||
def _self_quit(self):
|
||||
self.users.clear()
|
||||
self.channels.clear()
|
||||
self.user_channels.clear()
|
||||
self.channel_users.clear()
|
||||
|
||||
@line_handler("QUIT")
|
||||
def _handle_quit(self, line: Line) -> Emit:
|
||||
|
@ -296,9 +312,9 @@ class Server(Named):
|
|||
if nickname_lower in self.users:
|
||||
user = self.users.pop(nickname_lower)
|
||||
emit.user = user
|
||||
for channel in self.user_channels[user]:
|
||||
del self.channel_users[channel][user]
|
||||
del self.user_channels[user]
|
||||
for channel_lower in user.channels:
|
||||
channel = self.channels[channel_lower]
|
||||
del channel.users[user.nickname_lower]
|
||||
return emit
|
||||
|
||||
@line_handler("ERROR")
|
||||
|
@ -306,7 +322,7 @@ class Server(Named):
|
|||
self._self_quit()
|
||||
return self._emit()
|
||||
|
||||
@line_handler("RPL_NAMREPLY")
|
||||
@line_handler(RPL_NAMREPLY)
|
||||
# channel's user list, "NAMES #channel" response (and on-join)
|
||||
def _handle_names(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -327,30 +343,31 @@ class Server(Named):
|
|||
else:
|
||||
break
|
||||
|
||||
hostmask = Hostmask.from_source(nickname[len(modes):])
|
||||
hostmask = hostmask_(nickname[len(modes):])
|
||||
nickname_lower = self.casefold(hostmask.nickname)
|
||||
if not nickname_lower in self.users:
|
||||
self._add_user(hostmask.nickname, nickname_lower)
|
||||
user = self.users[nickname_lower]
|
||||
users.append(user)
|
||||
channel_user = self._user_join(channel, user)
|
||||
|
||||
if not nickname_lower in channel.users:
|
||||
channel_user = self._user_join(channel, user)
|
||||
else:
|
||||
channel_user = channel.users[nickname_lower]
|
||||
|
||||
if hostmask.username:
|
||||
user.username = hostmask.username
|
||||
if nickname_lower == self.nickname_lower:
|
||||
self.username = hostmask.username
|
||||
if hostmask.hostname:
|
||||
user.hostname = hostmask.hostname
|
||||
if nickname_lower == self.nickname_lower:
|
||||
self.hostname = hostmask.hostname
|
||||
|
||||
if nickname_lower == self.nickname_lower:
|
||||
self._self_hostmask(hostmask)
|
||||
|
||||
for mode in modes:
|
||||
if not mode in channel_user.modes:
|
||||
channel_user.modes.append(mode)
|
||||
channel_user.modes.add(mode)
|
||||
return emit
|
||||
|
||||
@line_handler("RPL_CREATIONTIME")
|
||||
@line_handler(RPL_CREATIONTIME)
|
||||
# channel creation time, "MODE #channel" response (and on-join)
|
||||
def _handle_creation_time(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -358,7 +375,7 @@ class Server(Named):
|
|||
if channel_lower in self.channels:
|
||||
channel = self.channels[channel_lower]
|
||||
emit.channel = channel
|
||||
channel.created = datetime.fromtimestamp(int(line.params[2]))
|
||||
channel.created = from_timestamp(int(line.params[2]))
|
||||
return emit
|
||||
|
||||
@line_handler("TOPIC")
|
||||
|
@ -369,11 +386,11 @@ class Server(Named):
|
|||
channel = self.channels[channel_lower]
|
||||
emit.channel = channel
|
||||
channel.topic = line.params[1]
|
||||
channel.topic_setter = str(line.hostmask)
|
||||
channel.topic_time = datetime.utcnow()
|
||||
channel.topic_setter = line.source
|
||||
channel.topic_time = now("utc")
|
||||
return emit
|
||||
|
||||
@line_handler("RPL_TOPIC")
|
||||
@line_handler(RPL_TOPIC)
|
||||
# topic text, "TOPIC #channel" response (and on-join)
|
||||
def _handle_topic_num(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -383,7 +400,7 @@ class Server(Named):
|
|||
emit.channel = channel
|
||||
self.channels[channel_lower].topic = line.params[2]
|
||||
return emit
|
||||
@line_handler("RPL_TOPICWHOTIME")
|
||||
@line_handler(RPL_TOPICWHOTIME)
|
||||
# topic setby, "TOPIC #channel" response (and on-join)
|
||||
def _handle_topic_time(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -392,38 +409,55 @@ class Server(Named):
|
|||
channel = self.channels[channel_lower]
|
||||
emit.channel = channel
|
||||
channel.topic_setter = line.params[2]
|
||||
channel.topic_time = datetime.fromtimestamp(int(line.params[3]))
|
||||
channel.topic_time = from_timestamp(int(line.params[3]))
|
||||
return emit
|
||||
|
||||
def _channel_modes(self,
|
||||
channel: Channel,
|
||||
modes: List[Tuple[bool, str]],
|
||||
params: List[str]):
|
||||
for add, char in modes:
|
||||
list_mode = char in self.isupport.chanmodes.list_modes
|
||||
if char in self.isupport.prefix.modes:
|
||||
nickname_lower = self.casefold(params.pop(0))
|
||||
modes: List[str],
|
||||
params: List[str]
|
||||
) -> List[Tuple[str, Optional[str]]]:
|
||||
tokens: List[Tuple[str, Optional[str]]] = []
|
||||
|
||||
for mode in modes:
|
||||
add = mode[0] == "+"
|
||||
char = mode[1]
|
||||
arg: Optional[str] = None
|
||||
|
||||
if char in self.isupport.prefix.modes: # a user's status
|
||||
arg = params.pop(0)
|
||||
nickname_lower = self.casefold(arg)
|
||||
|
||||
if nickname_lower in self.users:
|
||||
user = self.users[nickname_lower]
|
||||
channel_user = self.channel_users[channel][user]
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
if add:
|
||||
if not char in channel_user.modes:
|
||||
channel_user.modes.append(char)
|
||||
elif char in channel_user.modes:
|
||||
channel_user.modes.remove(char)
|
||||
elif add and (
|
||||
list_mode or
|
||||
char in self.isupport.chanmodes.setting_b_modes or
|
||||
char in self.isupport.chanmodes.setting_c_modes):
|
||||
channel.add_mode(char, params.pop(0), list_mode)
|
||||
elif not add and (
|
||||
list_mode or
|
||||
char in self.isupport.chanmodes.setting_b_modes):
|
||||
channel.remove_mode(char, params.pop(0))
|
||||
elif add:
|
||||
channel.add_mode(char, None, False)
|
||||
channel_user.modes.add(char)
|
||||
else:
|
||||
channel_user.modes.discard(char)
|
||||
else:
|
||||
channel.remove_mode(char, None)
|
||||
has_arg = False
|
||||
is_list = False
|
||||
if char in self.isupport.chanmodes.a_modes:
|
||||
has_arg = True
|
||||
is_list = True
|
||||
elif add:
|
||||
has_arg = char in (self.isupport.chanmodes.b_modes+
|
||||
self.isupport.chanmodes.c_modes)
|
||||
else: # remove
|
||||
has_arg = char in self.isupport.chanmodes.b_modes
|
||||
|
||||
if has_arg:
|
||||
arg = params.pop(0)
|
||||
|
||||
if add:
|
||||
channel.add_mode(char, arg, is_list)
|
||||
else:
|
||||
channel.remove_mode(char, arg)
|
||||
|
||||
tokens.append((mode, arg))
|
||||
|
||||
return tokens
|
||||
|
||||
@line_handler("MODE")
|
||||
def _handle_MODE(self, line: Line) -> Emit:
|
||||
|
@ -432,33 +466,42 @@ class Server(Named):
|
|||
modes_str = line.params[1]
|
||||
params = line.params[2:].copy()
|
||||
|
||||
modifier = True
|
||||
modes: List[Tuple[bool, str]] = []
|
||||
modifier = "+"
|
||||
modes: List[str] = []
|
||||
|
||||
for c in list(modes_str):
|
||||
if c == "+":
|
||||
modifier = True
|
||||
elif c == "-":
|
||||
modifier = False
|
||||
if c in ["+", "-"]:
|
||||
modifier = c
|
||||
else:
|
||||
modes.append((modifier, c))
|
||||
modes.append(f"{modifier}{c}")
|
||||
|
||||
target_lower = self.casefold(target)
|
||||
if target_lower == self.nickname_lower:
|
||||
emit.self_target = True
|
||||
for add, char in modes:
|
||||
emit.tokens = modes
|
||||
|
||||
for mode in modes:
|
||||
add = mode[0] == "+"
|
||||
char = mode[1]
|
||||
if add:
|
||||
if not char in self.modes:
|
||||
self.modes.append(char)
|
||||
elif char in self.modes:
|
||||
self.modes.remove(char)
|
||||
self.modes.add(char)
|
||||
else:
|
||||
self.modes.discard(char)
|
||||
elif target_lower in self.channels:
|
||||
channel = self.channels[self.casefold(target)]
|
||||
emit.channel = channel
|
||||
self._channel_modes(channel, modes, params)
|
||||
ctokens = self._channel_modes(channel, modes, params)
|
||||
|
||||
ctokens_str: List[str] = []
|
||||
for mode, arg in ctokens:
|
||||
if arg is not None:
|
||||
ctokens_str.append(f"{mode} {arg}")
|
||||
else:
|
||||
ctokens_str.append(mode)
|
||||
emit.tokens = ctokens_str
|
||||
return emit
|
||||
|
||||
@line_handler("RPL_CHANNELMODEIS")
|
||||
@line_handler(RPL_CHANNELMODEIS)
|
||||
# channel modes, "MODE #channel" response (sometimes on-join?)
|
||||
def _handle_channelmodeis(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -466,17 +509,73 @@ class Server(Named):
|
|||
if channel_lower in self.channels:
|
||||
channel = self.channels[channel_lower]
|
||||
emit.channel = channel
|
||||
modes = [(True, char) for char in line.params[2].lstrip("+")]
|
||||
modes = [f"+{char}" for char in line.params[2].lstrip("+")]
|
||||
params = line.params[3:]
|
||||
self._channel_modes(channel, modes, params)
|
||||
return emit
|
||||
|
||||
@line_handler("RPL_UMODEIS")
|
||||
@line_handler(RPL_UMODEIS)
|
||||
# our own user modes, "MODE nickname" response (sometimes on-connect?)
|
||||
def _handle_umodeis(self, line: Line) -> Emit:
|
||||
for char in line.params[1].lstrip("+"):
|
||||
if not char in self.modes:
|
||||
self.modes.append(char)
|
||||
self.modes.add(char)
|
||||
return self._emit()
|
||||
|
||||
def _mode_list(self,
|
||||
channel_name: str,
|
||||
mode: str,
|
||||
mask: str):
|
||||
channel_lower = self.casefold(channel_name)
|
||||
if channel_lower in self.channels:
|
||||
channel = self.channels[channel_lower]
|
||||
if not mode in channel._list_modes_temp:
|
||||
channel._list_modes_temp[mode] = []
|
||||
channel._list_modes_temp[mode].append(mask)
|
||||
def _mode_list_end(self,
|
||||
channel_name: str,
|
||||
mode: str):
|
||||
channel_lower = self.casefold(channel_name)
|
||||
if channel_lower in self.channels:
|
||||
channel = self.channels[channel_lower]
|
||||
if mode in channel._list_modes_temp:
|
||||
mlist = channel._list_modes_temp.pop(mode)
|
||||
channel.list_modes[mode] = mlist
|
||||
|
||||
@line_handler(RPL_BANLIST)
|
||||
def _handle_banlist(self, line: Line) -> Emit:
|
||||
channel = line.params[1]
|
||||
mask = line.params[2]
|
||||
|
||||
if len(line.params) > 3:
|
||||
# parse these out but we're not storing them yet
|
||||
set_by = line.params[3]
|
||||
set_at = int(line.params[4])
|
||||
|
||||
self._mode_list(channel, "b", mask)
|
||||
return self._emit()
|
||||
|
||||
@line_handler(RPL_ENDOFBANLIST)
|
||||
def _handle_banlist_end(self, line: Line) -> Emit:
|
||||
channel = line.params[1]
|
||||
self._mode_list_end(channel, "b")
|
||||
return self._emit()
|
||||
|
||||
@line_handler(RPL_QUIETLIST)
|
||||
def _handle_quietlist(self, line: Line) -> Emit:
|
||||
channel = line.params[1]
|
||||
mode = line.params[2]
|
||||
mask = line.params[3]
|
||||
set_by = line.params[4]
|
||||
set_at = int(line.params[5])
|
||||
|
||||
self._mode_list(channel, mode, mask)
|
||||
return self._emit()
|
||||
|
||||
@line_handler(RPL_ENDOFQUIETLIST)
|
||||
def _handle_quietlist_end(self, line: Line) -> Emit:
|
||||
channel = line.params[1]
|
||||
mode = line.params[2]
|
||||
self._mode_list_end(channel, mode)
|
||||
return self._emit()
|
||||
|
||||
@line_handler("PRIVMSG")
|
||||
|
@ -484,6 +583,10 @@ class Server(Named):
|
|||
@line_handler("TAGMSG")
|
||||
def _handle_message(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
# this does not visually spark joy
|
||||
if not line.source:
|
||||
return emit
|
||||
|
||||
message = line.params[1] if line.params[1:] else None
|
||||
if not message is None:
|
||||
emit.text = message
|
||||
|
@ -491,15 +594,14 @@ class Server(Named):
|
|||
nickname_lower = self.casefold(line.hostmask.nickname)
|
||||
if nickname_lower == self.nickname_lower:
|
||||
emit.self_source = True
|
||||
if line.hostmask.username:
|
||||
self.username = line.hostmask.username
|
||||
if line.hostmask.hostname:
|
||||
self.hostname = line.hostmask.hostname
|
||||
self._self_hostmask(line.hostmask)
|
||||
|
||||
if nickname_lower in self.users:
|
||||
user = self.users[nickname_lower]
|
||||
else:
|
||||
user = self.create_user(line.hostmask.nickname, nickname_lower)
|
||||
user = self.create_user(
|
||||
Name(line.hostmask.nickname, nickname_lower)
|
||||
)
|
||||
emit.user = user
|
||||
|
||||
if line.hostmask.username:
|
||||
|
@ -526,7 +628,7 @@ class Server(Named):
|
|||
emit.self_target = True
|
||||
return emit
|
||||
|
||||
@line_handler("RPL_VISIBLEHOST")
|
||||
@line_handler(RPL_VISIBLEHOST)
|
||||
# our own hostname, sometimes username@hostname, when it changes
|
||||
def _handle_visiblehost(self, line: Line) -> Emit:
|
||||
username, _, hostname = line.params[1].rpartition("@")
|
||||
|
@ -535,7 +637,7 @@ class Server(Named):
|
|||
self.username = username
|
||||
return self._emit()
|
||||
|
||||
@line_handler("RPL_WHOREPLY")
|
||||
@line_handler(RPL_WHOREPLY)
|
||||
# WHO line, "WHO #channel|nickname" response
|
||||
def _handle_who(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -543,14 +645,22 @@ class Server(Named):
|
|||
nickname = line.params[5]
|
||||
username = line.params[2]
|
||||
hostname = line.params[3]
|
||||
status = line.params[6]
|
||||
away = "" if "G" in status else None
|
||||
realname = line.params[7].split(" ", 1)[1]
|
||||
|
||||
server: Optional[str] = None
|
||||
if not line.params[4] == "*":
|
||||
server = line.params[4]
|
||||
|
||||
nickname_lower = self.casefold(line.params[5])
|
||||
if nickname_lower == self.nickname_lower:
|
||||
emit.self = True
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
self.realname = realname
|
||||
self.server = server
|
||||
self.away = away
|
||||
|
||||
if nickname_lower in self.users:
|
||||
user = self.users[nickname_lower]
|
||||
|
@ -558,9 +668,62 @@ class Server(Named):
|
|||
user.username = username
|
||||
user.hostname = hostname
|
||||
user.realname = realname
|
||||
user.server = server
|
||||
user.away = away
|
||||
return emit
|
||||
|
||||
@line_handler("RPL_WHOISUSER")
|
||||
@line_handler(RPL_WHOSPCRPL)
|
||||
# WHOX line, "WHO #channel|nickname" response; only listen for our "type"
|
||||
def _handle_whox(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
if line.params[1] == WHO_TYPE and len(line.params) == 10:
|
||||
nickname_lower = self.casefold(line.params[6])
|
||||
username = line.params[2]
|
||||
hostname = line.params[4]
|
||||
status = line.params[7]
|
||||
away = "" if "G" in status else None
|
||||
realname = line.params[9]
|
||||
|
||||
account = ""
|
||||
if not line.params[8] == "0":
|
||||
account = line.params[8]
|
||||
|
||||
server: Optional[str] = None
|
||||
if not line.params[5] == "*":
|
||||
server = line.params[5]
|
||||
ip: Optional[str] = None
|
||||
if not line.params[3] == "255.255.255.255":
|
||||
try:
|
||||
ip = ip_address(line.params[3]).compressed
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
if nickname_lower in self.users:
|
||||
user = self.users[nickname_lower]
|
||||
emit.user = user
|
||||
user.username = username
|
||||
user.hostname = hostname
|
||||
user.realname = realname
|
||||
user.account = account
|
||||
user.server = server
|
||||
user.away = away
|
||||
if ip is not None:
|
||||
user.ip = ip
|
||||
|
||||
if nickname_lower == self.nickname_lower:
|
||||
emit.self = True
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
self.realname = realname
|
||||
self.account = account
|
||||
self.server = server
|
||||
self.away = away
|
||||
if ip is not None:
|
||||
self.ip = ip
|
||||
|
||||
return emit
|
||||
|
||||
@line_handler(RPL_WHOISUSER)
|
||||
# WHOIS "user" line, one of "WHOIS nickname" response lines
|
||||
def _handle_whoisuser(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -617,6 +780,39 @@ class Server(Named):
|
|||
user.realname = realname
|
||||
return emit
|
||||
|
||||
@line_handler("RENAME")
|
||||
def _handle_RENAME(self, line: Line) -> Emit:
|
||||
source_fold = self.casefold(line.params[0])
|
||||
rename = line.params[1]
|
||||
rename_fold = self.casefold(rename)
|
||||
|
||||
if source_fold in self.channels:
|
||||
channel = self.channels.pop(source_fold)
|
||||
|
||||
channel.change_name(rename, rename_fold)
|
||||
for nickname in channel.users.keys():
|
||||
user = self.users[nickname]
|
||||
user.channels.remove(source_fold)
|
||||
user.channels.add(rename_fold)
|
||||
|
||||
self.channels[rename_fold] = channel
|
||||
return self._emit()
|
||||
|
||||
@line_handler(RPL_AWAY)
|
||||
# sent in response to a command directed at a user who is marked as away
|
||||
def _handle_RPL_AWAY(self, line: Line) -> Emit:
|
||||
nickname = line.params[1]
|
||||
nickname_lower = self.casefold(nickname)
|
||||
reason = line.params[2]
|
||||
|
||||
if nickname_lower == self.nickname_lower:
|
||||
self.away = reason
|
||||
if nickname_lower in self.users:
|
||||
user = self.users[nickname_lower]
|
||||
user.away = reason
|
||||
|
||||
return self._emit()
|
||||
|
||||
@line_handler("AWAY")
|
||||
def _handle_AWAY(self, line: Line) -> Emit:
|
||||
emit = self._emit()
|
||||
|
@ -690,3 +886,22 @@ class Server(Named):
|
|||
key in self.available_caps):
|
||||
self.agreed_caps.append(key)
|
||||
return emit
|
||||
|
||||
@line_handler(RPL_LOGGEDIN)
|
||||
def _handle_loggedin(self, line: Line) -> Emit:
|
||||
hostmask_str = line.params[1]
|
||||
hostmask = hostmask_(hostmask_str)
|
||||
account = line.params[2]
|
||||
|
||||
self.account = account
|
||||
self._self_hostmask(hostmask)
|
||||
return self._emit()
|
||||
|
||||
@line_handler(RPL_LOGGEDOUT)
|
||||
def _handle_loggedout(self, line: Line) -> Emit:
|
||||
hostmask_str = line.params[1]
|
||||
hostmask = hostmask_(hostmask_str)
|
||||
|
||||
self.account = None
|
||||
self._self_hostmask(hostmask)
|
||||
return self._emit()
|
||||
|
|
|
@ -1,22 +1,48 @@
|
|||
from typing import Optional, Set
|
||||
from .named import Named
|
||||
from .names import Name
|
||||
|
||||
class User(Named):
|
||||
nickname: str
|
||||
nickname_lower: str
|
||||
class User(object):
|
||||
def __init__(self, nickname: Name):
|
||||
self._nickname = nickname
|
||||
|
||||
def __init__(self, nickname: str, nickname_lower: str):
|
||||
self._set_nickname(nickname, nickname_lower)
|
||||
self.username: Optional[str] = None
|
||||
self.hostname: Optional[str] = None
|
||||
self.realname: Optional[str] = None
|
||||
self.account: Optional[str] = None
|
||||
self.server: Optional[str] = None
|
||||
self.away: Optional[str] = None
|
||||
self.ip: Optional[str] = None
|
||||
self.channels: Set[str] = set([])
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"User(nickname={self.nickname!r})"
|
||||
|
||||
def _set_nickname(self, nickname: str,
|
||||
nickname_lower: str):
|
||||
self.nickname = nickname
|
||||
self.nickname_lower = nickname_lower
|
||||
def get_name(self) -> Name:
|
||||
return self._nickname
|
||||
@property
|
||||
def nickname(self) -> str:
|
||||
return self._nickname.normal
|
||||
@property
|
||||
def nickname_lower(self) -> str:
|
||||
return self._nickname.folded
|
||||
|
||||
def change_nickname(self,
|
||||
normal: str,
|
||||
folded: str):
|
||||
self._nickname.normal = normal
|
||||
self._nickname.folded = folded
|
||||
|
||||
def hostmask(self) -> str:
|
||||
hostmask = self.nickname
|
||||
if self.username is not None:
|
||||
hostmask += f"!{self.username}"
|
||||
if self.hostname is not None:
|
||||
hostmask += f"@{self.hostname}"
|
||||
return hostmask
|
||||
|
||||
def userhost(self) -> Optional[str]:
|
||||
if (self.username is not None and
|
||||
self.hostname is not None):
|
||||
return f"{self.username}@{self.hostname}"
|
||||
else:
|
||||
return None
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
-r requirements.txt
|
||||
freezegun ~=1.1.0
|
|
@ -1 +1,2 @@
|
|||
irctokens ==0.9.2
|
||||
irctokens ~=2.0.2
|
||||
pendulum ~=2.1.0
|
||||
|
|
2
setup.py
2
setup.py
|
@ -26,6 +26,6 @@ setuptools.setup(
|
|||
"Operating System :: Microsoft :: Windows",
|
||||
"Topic :: Communications :: Chat :: Internet Relay Chat"
|
||||
],
|
||||
python_requires='>=3.6',
|
||||
python_requires='>=3.7',
|
||||
install_requires=install_requires
|
||||
)
|
||||
|
|
|
@ -6,3 +6,5 @@ from .cap import *
|
|||
from .isupport import *
|
||||
from .casemap import *
|
||||
from .emit import *
|
||||
from .who import *
|
||||
from .sasl import *
|
||||
|
|
|
@ -3,17 +3,17 @@ import ircstates, irctokens
|
|||
|
||||
class CaseMapTestMethod(unittest.TestCase):
|
||||
def test_rfc1459(self):
|
||||
lower = ircstates.casefold("rfc1459", "ÀTEST[]~\\")
|
||||
self.assertEqual(lower, "Àtest{}^|")
|
||||
lower = ircstates.casefold(ircstates.CaseMap.RFC1459, "ÀTEST[]^\\")
|
||||
self.assertEqual(lower, "Àtest{}~|")
|
||||
|
||||
def test_ascii(self):
|
||||
lower = ircstates.casefold("ascii", "ÀTEST[]~\\")
|
||||
lower = ircstates.casefold(ircstates.CaseMap.ASCII, "ÀTEST[]~\\")
|
||||
self.assertEqual(lower, "Àtest[]~\\")
|
||||
|
||||
class CaseMapTestCommands(unittest.TestCase):
|
||||
def test_join(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":Nickname JOIN #Chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":Other JOIN #Chan"))
|
||||
self.assertIn("nickname", server.users)
|
||||
|
@ -21,17 +21,14 @@ class CaseMapTestCommands(unittest.TestCase):
|
|||
self.assertIn("other", server.users)
|
||||
self.assertNotIn("Other", server.users)
|
||||
self.assertIn("#chan", server.channels)
|
||||
self.assertNotIn("#Chan", server.channels)
|
||||
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.name, "#Chan")
|
||||
user1 = server.users["nickname"]
|
||||
user2 = server.users["other"]
|
||||
self.assertIn(user1, server.channel_users[channel])
|
||||
self.assertIn(user2, server.channel_users[channel])
|
||||
self.assertEqual(len(server.channel_users[channel]), 2)
|
||||
|
||||
def test_nick(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
user = server.users["nickname"]
|
||||
server.parse_tokens(irctokens.tokenise(":nickname NICK NewNickname"))
|
||||
|
|
159
test/channel.py
159
test/channel.py
|
@ -1,145 +1,164 @@
|
|||
import unittest
|
||||
from datetime import datetime
|
||||
import pendulum
|
||||
import ircstates, irctokens
|
||||
from freezegun import freeze_time
|
||||
|
||||
class ChannelTestJoin(unittest.TestCase):
|
||||
def test_self_join(self):
|
||||
dt = pendulum.datetime(2021, 9, 6, 2, 55, 22)
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
with freeze_time("2021-09-06 02:55:22"):
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
|
||||
self.assertIn("#chan", server.channels)
|
||||
self.assertIn("nickname", server.users)
|
||||
self.assertEqual(len(server.users), 1)
|
||||
self.assertEqual(len(server.channels), 1)
|
||||
|
||||
self.assertIn(server.channels["#chan"], server.channel_users)
|
||||
self.assertIn(server.users["nickname"], server.user_channels)
|
||||
self.assertEqual(len(server.user_channels), 1)
|
||||
self.assertEqual(len(server.channel_users), 1)
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
self.assertIn(user.nickname_lower, channel.users)
|
||||
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
self.assertEqual(user.channels, set([channel.name_lower]))
|
||||
self.assertEqual(channel_user.since, dt)
|
||||
self.assertEqual(channel_user.joined, dt)
|
||||
|
||||
def test_other_join(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
self.assertEqual(len(server.users), 2)
|
||||
self.assertIn("other", server.users)
|
||||
self.assertEqual(len(server.user_channels), 2)
|
||||
self.assertEqual(len(server.channel_users), 1)
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(len(channel.users), 2)
|
||||
|
||||
user = server.users["other"]
|
||||
self.assertEqual(user.channels, set([channel.name_lower]))
|
||||
|
||||
class ChannelTestPart(unittest.TestCase):
|
||||
def test_self_part(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname PART #chan"))
|
||||
self.assertEqual(len(server.users), 0)
|
||||
self.assertEqual(len(server.channels), 0)
|
||||
self.assertEqual(len(server.user_channels), 0)
|
||||
self.assertEqual(len(server.channel_users), 0)
|
||||
|
||||
def test_other_part(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other PART #chan"))
|
||||
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
|
||||
self.assertEqual(len(server.users), 1)
|
||||
self.assertEqual(len(server.channels), 1)
|
||||
self.assertIn(user, server.user_channels)
|
||||
self.assertEqual(len(server.user_channels[user]), 1)
|
||||
self.assertIn(channel, server.channel_users)
|
||||
self.assertEqual(len(server.channel_users), 1)
|
||||
self.assertIn(user, server.channel_users[channel])
|
||||
self.assertEqual(len(server.user_channels), 1)
|
||||
self.assertEqual(server.users, {"nickname": user})
|
||||
self.assertEqual(server.channels, {"#chan": channel})
|
||||
self.assertEqual(user.channels, set([channel.name_lower]))
|
||||
self.assertEqual(channel.users, {"nickname": channel_user})
|
||||
|
||||
class ChannelTestKick(unittest.TestCase):
|
||||
def test_self_kick(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise(":nickname KICK #chan nickname"))
|
||||
self.assertEqual(len(server.users), 0)
|
||||
self.assertEqual(len(server.channels), 0)
|
||||
self.assertEqual(len(server.user_channels), 0)
|
||||
self.assertEqual(len(server.channel_users), 0)
|
||||
|
||||
def test_other_kick(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname KICK #chan other"))
|
||||
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
|
||||
self.assertEqual(len(server.users), 1)
|
||||
self.assertEqual(len(server.channels), 1)
|
||||
self.assertIn(user, server.user_channels)
|
||||
self.assertEqual(len(server.user_channels[user]), 1)
|
||||
self.assertIn(channel, server.channel_users)
|
||||
self.assertEqual(len(server.channel_users), 1)
|
||||
self.assertIn(user, server.channel_users[channel])
|
||||
self.assertEqual(len(server.user_channels), 1)
|
||||
self.assertEqual(user.channels, set([channel.name_lower]))
|
||||
self.assertEqual(channel.users, {user.nickname_lower: channel_user})
|
||||
|
||||
class ChannelTestTopic(unittest.TestCase):
|
||||
def test_text(self):
|
||||
dt = pendulum.datetime(2020, 3, 12, 14, 27, 57)
|
||||
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("332 * #chan :test"))
|
||||
self.assertEqual(server.channels["#chan"].topic, "test")
|
||||
|
||||
def test_set_by_at(self):
|
||||
dt = datetime(2020, 3, 12, 14, 27, 57)
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("333 * #chan other 1584023277"))
|
||||
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.topic, "test")
|
||||
self.assertEqual(channel.topic_setter, "other")
|
||||
self.assertEqual(channel.topic_time, dt)
|
||||
|
||||
def test_topic_command(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("TOPIC #chan :hello there"))
|
||||
self.assertEqual(server.channels["#chan"].topic, "hello there")
|
||||
dt = pendulum.datetime(2021, 9, 6, 2, 43, 22)
|
||||
with freeze_time("2021-09-06 02:43:22"):
|
||||
server.parse_tokens(irctokens.tokenise(":other TOPIC #chan :hello there"))
|
||||
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.topic, "hello there")
|
||||
self.assertEqual(channel.topic_setter, "other")
|
||||
self.assertEqual(channel.topic_time, dt)
|
||||
|
||||
class ChannelTestCreation(unittest.TestCase):
|
||||
def test(self):
|
||||
dt = datetime(2020, 3, 12, 19, 38, 9)
|
||||
dt = pendulum.datetime(2020, 3, 12, 19, 38, 9)
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("329 * #chan 1584041889"))
|
||||
self.assertEqual(server.channels["#chan"].created, dt)
|
||||
|
||||
class ChannelTestNAMES(unittest.TestCase):
|
||||
def test(self):
|
||||
dt_1 = pendulum.datetime(2021, 9, 6, 2, 57, 22)
|
||||
dt_2 = pendulum.datetime(2021, 9, 6, 2, 58, 22)
|
||||
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("353 * * #chan :nickname @+other"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
with freeze_time("2021-09-06 02:57:22"):
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
with freeze_time("2021-09-06 02:58:22"):
|
||||
server.parse_tokens(irctokens.tokenise("353 * * #chan :nickname @+other"))
|
||||
|
||||
self.assertIn("nickname", server.users)
|
||||
self.assertIn("other", server.users)
|
||||
|
||||
user = server.users["other"]
|
||||
self.assertIn(user, server.user_channels)
|
||||
channel = server.channels["#chan"]
|
||||
self.assertIn(user, server.channel_users[channel])
|
||||
channel_user = server.channel_users[channel][user]
|
||||
self.assertEqual(channel_user.modes, ["o", "v"])
|
||||
channel_user_1 = channel.users[server.nickname_lower]
|
||||
channel_user_2 = channel.users[user.nickname_lower]
|
||||
|
||||
self.assertEqual(channel.users, {
|
||||
server.nickname_lower: channel_user_1,
|
||||
user.nickname_lower: channel_user_2
|
||||
})
|
||||
self.assertEqual(user.channels, set([channel.name_lower]))
|
||||
self.assertEqual(channel_user_2.modes, {"o", "v"})
|
||||
|
||||
self.assertEqual(channel_user_1.since, dt_1)
|
||||
self.assertEqual(channel_user_2.since, dt_2)
|
||||
|
||||
def test_userhost_in_names(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
"353 * * #chan :nickname!user@host other!user2@host2"))
|
||||
|
@ -148,3 +167,37 @@ class ChannelTestNAMES(unittest.TestCase):
|
|||
user = server.users["other"]
|
||||
self.assertEqual(user.username, "user2")
|
||||
self.assertEqual(user.hostname, "host2")
|
||||
|
||||
class ChannelNICKAfterJoin(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
server.parse_tokens(irctokens.tokenise(":nickname NICK Nickname2"))
|
||||
|
||||
self.assertEqual(channel.users, {user.nickname_lower: channel_user})
|
||||
self.assertEqual(channel_user.nickname, "Nickname2")
|
||||
self.assertEqual(channel_user.nickname_lower, "nickname2")
|
||||
|
||||
class ChannelRENAME(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":nickname RENAME #chan #chan2 *"))
|
||||
|
||||
self.assertEqual(channel.name, "#chan2")
|
||||
self.assertEqual(set(channel.users.keys()), {"nickname", "other"})
|
||||
self.assertEqual(user.channels, {"#chan2"})
|
||||
self.assertNotIn("#chan", server.channels)
|
||||
self.assertIn("#chan2", server.channels)
|
||||
self.assertEqual(len(server.channels), 1)
|
||||
|
|
121
test/emit.py
121
test/emit.py
|
@ -4,72 +4,103 @@ import ircstates, irctokens
|
|||
class EmitTest(unittest.TestCase):
|
||||
def test_join(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
emits = server.parse_tokens(
|
||||
irctokens.tokenise(":nickname JOIN #chan"))[0]
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":nickname JOIN #chan"))
|
||||
|
||||
self.assertEqual(emits.command, "JOIN")
|
||||
self.assertEqual(emits.self, True)
|
||||
self.assertEqual(emits.user, server.users["nickname"])
|
||||
self.assertEqual(emits.channel, server.channels["#chan"])
|
||||
self.assertEqual(emit.command, "JOIN")
|
||||
self.assertEqual(emit.self, True)
|
||||
self.assertEqual(emit.user, server.users["nickname"])
|
||||
self.assertEqual(emit.channel, server.channels["#chan"])
|
||||
|
||||
emits = server.parse_tokens(
|
||||
irctokens.tokenise(":other JOIN #chan"))[0]
|
||||
self.assertEqual(emits.command, "JOIN")
|
||||
self.assertEqual(emits.self, None)
|
||||
self.assertEqual(emits.user, server.users["other"])
|
||||
self.assertEqual(emits.channel, server.channels["#chan"])
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":other JOIN #chan"))
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "JOIN")
|
||||
self.assertEqual(emit.self, None)
|
||||
self.assertEqual(emit.user, server.users["other"])
|
||||
self.assertEqual(emit.channel, server.channels["#chan"])
|
||||
|
||||
def test_privmsg(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
emits = server.parse_tokens(
|
||||
irctokens.tokenise(":nickname PRIVMSG #chan :hello"))[0]
|
||||
self.assertEqual(emits.command, "PRIVMSG")
|
||||
self.assertEqual(emits.text, "hello")
|
||||
self.assertEqual(emits.self_source, True)
|
||||
self.assertEqual(emits.user, server.users["nickname"])
|
||||
self.assertEqual(emits.channel, server.channels["#chan"])
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":nickname PRIVMSG #chan :hello"))
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "PRIVMSG")
|
||||
self.assertEqual(emit.text, "hello")
|
||||
self.assertEqual(emit.self_source, True)
|
||||
self.assertEqual(emit.user, server.users["nickname"])
|
||||
self.assertEqual(emit.channel, server.channels["#chan"])
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
emits = server.parse_tokens(
|
||||
irctokens.tokenise(":other PRIVMSG #chan :hello2"))[0]
|
||||
self.assertEqual(emits.command, "PRIVMSG")
|
||||
self.assertEqual(emits.text, "hello2")
|
||||
self.assertEqual(emits.self_source, None)
|
||||
self.assertEqual(emits.user, server.users["other"])
|
||||
self.assertEqual(emits.channel, server.channels["#chan"])
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":other PRIVMSG #chan :hello2"))
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "PRIVMSG")
|
||||
self.assertEqual(emit.text, "hello2")
|
||||
self.assertEqual(emit.self_source, None)
|
||||
self.assertEqual(emit.user, server.users["other"])
|
||||
self.assertEqual(emit.channel, server.channels["#chan"])
|
||||
|
||||
def test_privmsg_nojoin(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
|
||||
emits = server.parse_tokens(
|
||||
irctokens.tokenise(":other PRIVMSG #chan :hello"))[0]
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":other PRIVMSG #chan :hello"))
|
||||
|
||||
self.assertEqual(emits.command, "PRIVMSG")
|
||||
self.assertEqual(emits.text, "hello")
|
||||
self.assertEqual(emits.self_source, None)
|
||||
self.assertIsNotNone(emits.user)
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "PRIVMSG")
|
||||
self.assertEqual(emit.text, "hello")
|
||||
self.assertEqual(emit.self_source, None)
|
||||
self.assertIsNotNone(emit.user)
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(emits.channel, channel)
|
||||
self.assertEqual(emit.channel, channel)
|
||||
|
||||
def test_kick(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
user_other = server.users["other"]
|
||||
emits = server.parse_tokens(
|
||||
irctokens.tokenise(":nickname KICK #chan other :reason"))[0]
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":nickname KICK #chan other :reason"))
|
||||
|
||||
self.assertEqual(emits.command, "KICK")
|
||||
self.assertEqual(emits.text, "reason")
|
||||
self.assertEqual(emits.self_source, True)
|
||||
self.assertEqual(emits.user_source, user)
|
||||
self.assertEqual(emits.user_target, user_other)
|
||||
self.assertEqual(emits.channel, channel)
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "KICK")
|
||||
self.assertEqual(emit.text, "reason")
|
||||
self.assertEqual(emit.self_source, True)
|
||||
self.assertEqual(emit.user_source, user)
|
||||
self.assertEqual(emit.user_target, user_other)
|
||||
self.assertEqual(emit.channel, channel)
|
||||
|
||||
def test_mode_self(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise("MODE nickname x+i-i+wi-wi"))
|
||||
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "MODE")
|
||||
self.assertTrue(emit.self_target)
|
||||
self.assertEqual(emit.tokens,
|
||||
["+x", "+i", "-i", "+w", "+i", "-w", "-i"])
|
||||
|
||||
def test_mode_channel(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
channel = server.channels["#chan"]
|
||||
emit = server.parse_tokens(
|
||||
irctokens.tokenise(":server MODE #chan +im-m+b-k asd!*@* key"))
|
||||
|
||||
self.assertIsNotNone(emit)
|
||||
self.assertEqual(emit.command, "MODE")
|
||||
self.assertEqual(emit.channel, channel)
|
||||
self.assertEqual(emit.tokens,
|
||||
["+i", "+m", "-m", "+b asd!*@*", "-k key"])
|
||||
|
|
|
@ -2,23 +2,29 @@ import unittest
|
|||
import ircstates, irctokens
|
||||
|
||||
class ISUPPORTTest(unittest.TestCase):
|
||||
def test_escape(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(r"005 * TEST=a\x20b\\x20\x2 *"))
|
||||
self.assertEqual(server.isupport.raw["TEST"], r"a b\x20x2")
|
||||
|
||||
def test_chanmodes(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
self.assertEqual(server.isupport.chanmodes.list_modes, ["b"])
|
||||
self.assertEqual(server.isupport.chanmodes.setting_b_modes, ["k"])
|
||||
self.assertEqual(server.isupport.chanmodes.setting_c_modes, ["l"])
|
||||
self.assertEqual(server.isupport.chanmodes.setting_d_modes,
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.chanmodes.a_modes, ["b"])
|
||||
self.assertEqual(server.isupport.chanmodes.b_modes, ["k"])
|
||||
self.assertEqual(server.isupport.chanmodes.c_modes, ["l"])
|
||||
self.assertEqual(server.isupport.chanmodes.d_modes,
|
||||
["i", "m", "n", "p", "s", "t"])
|
||||
server.parse_tokens(irctokens.tokenise("005 * CHANMODES=a,b,c,d *"))
|
||||
self.assertEqual(server.isupport.chanmodes.list_modes, ["a"])
|
||||
self.assertEqual(server.isupport.chanmodes.setting_b_modes, ["b"])
|
||||
self.assertEqual(server.isupport.chanmodes.setting_c_modes, ["c"])
|
||||
self.assertEqual(server.isupport.chanmodes.setting_d_modes, ["d"])
|
||||
self.assertEqual(server.isupport.chanmodes.a_modes, ["a"])
|
||||
self.assertEqual(server.isupport.chanmodes.b_modes, ["b"])
|
||||
self.assertEqual(server.isupport.chanmodes.c_modes, ["c"])
|
||||
self.assertEqual(server.isupport.chanmodes.d_modes, ["d"])
|
||||
|
||||
def test_prefix(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.prefix.modes, ["o", "v"])
|
||||
self.assertEqual(server.isupport.prefix.prefixes, ["@", "+"])
|
||||
|
||||
|
@ -38,62 +44,37 @@ class ISUPPORTTest(unittest.TestCase):
|
|||
|
||||
def test_chantypes(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.chantypes, ["#"])
|
||||
server.parse_tokens(irctokens.tokenise("005 * CHANTYPES=#& *"))
|
||||
self.assertEqual(server.isupport.chantypes, ["#", "&"])
|
||||
|
||||
def test_modes(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.modes, 3)
|
||||
server.parse_tokens(irctokens.tokenise("005 * MODES *"))
|
||||
self.assertEqual(server.isupport.modes, -1)
|
||||
server.parse_tokens(irctokens.tokenise("005 * MODES=5 *"))
|
||||
self.assertEqual(server.isupport.modes, 5)
|
||||
|
||||
def test_rfc1459(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
self.assertEqual(server.isupport.casemapping, "rfc1459")
|
||||
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=rfc1459 *"))
|
||||
self.assertEqual(server.isupport.casemapping, "rfc1459")
|
||||
lower = server.casefold("ÀTEST[]~\\")
|
||||
self.assertEqual(lower, "Àtest{}^|")
|
||||
|
||||
def test_ascii(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=ascii *"))
|
||||
self.assertEqual(server.isupport.casemapping, "ascii")
|
||||
lower = server.casefold("ÀTEST[]~\\")
|
||||
self.assertEqual(lower, "Àtest[]~\\")
|
||||
|
||||
def test_fallback_to_rfc1459(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=asd *"))
|
||||
self.assertEqual(server.isupport.casemapping, "rfc1459")
|
||||
lower = server.casefold("ÀTEST[]~\\")
|
||||
self.assertEqual(lower, "Àtest{}^|")
|
||||
|
||||
def test_network(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertIsNone(server.isupport.network)
|
||||
server.parse_tokens(irctokens.tokenise("005 * NETWORK=testnet *"))
|
||||
self.assertEqual(server.isupport.network, "testnet")
|
||||
|
||||
def test_statusmsg(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.statusmsg, [])
|
||||
server.parse_tokens(irctokens.tokenise("005 * STATUSMSG=&@ *"))
|
||||
self.assertEqual(server.isupport.statusmsg, ["&", "@"])
|
||||
|
||||
def test_callerid(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertIsNone(server.isupport.callerid)
|
||||
server.parse_tokens(irctokens.tokenise("005 * CALLERID=U *"))
|
||||
self.assertEqual(server.isupport.callerid, "U")
|
||||
|
@ -102,7 +83,7 @@ class ISUPPORTTest(unittest.TestCase):
|
|||
|
||||
def test_excepts(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertIsNone(server.isupport.excepts)
|
||||
server.parse_tokens(irctokens.tokenise("005 * EXCEPTS=U *"))
|
||||
self.assertEqual(server.isupport.excepts, "U")
|
||||
|
@ -111,7 +92,7 @@ class ISUPPORTTest(unittest.TestCase):
|
|||
|
||||
def test_invex(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertIsNone(server.isupport.invex)
|
||||
server.parse_tokens(irctokens.tokenise("005 * INVEX=U *"))
|
||||
self.assertEqual(server.isupport.invex, "U")
|
||||
|
@ -120,14 +101,14 @@ class ISUPPORTTest(unittest.TestCase):
|
|||
|
||||
def test_whox(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertFalse(server.isupport.whox)
|
||||
server.parse_tokens(irctokens.tokenise("005 * WHOX *"))
|
||||
self.assertTrue(server.isupport.whox)
|
||||
|
||||
def test_monitor(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertIsNone(server.isupport.monitor)
|
||||
server.parse_tokens(irctokens.tokenise("005 * MONITOR=123 *"))
|
||||
self.assertEqual(server.isupport.monitor, 123)
|
||||
|
@ -136,9 +117,36 @@ class ISUPPORTTest(unittest.TestCase):
|
|||
|
||||
def test_watch(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertIsNone(server.isupport.watch)
|
||||
server.parse_tokens(irctokens.tokenise("005 * WATCH=123 *"))
|
||||
self.assertEqual(server.isupport.watch, 123)
|
||||
server.parse_tokens(irctokens.tokenise("005 * WATCH *"))
|
||||
self.assertEqual(server.isupport.watch, -1)
|
||||
|
||||
def test_nicklen(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.nicklen, 9)
|
||||
server.parse_tokens(irctokens.tokenise("005 * NICKLEN=16 *"))
|
||||
self.assertEqual(server.isupport.nicklen, 16)
|
||||
|
||||
class ISupportTestCasemapping(unittest.TestCase):
|
||||
def test_rfc1459(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
self.assertEqual(server.isupport.casemapping, ircstates.CaseMap.RFC1459)
|
||||
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=rfc1459 *"))
|
||||
self.assertEqual(server.isupport.casemapping, ircstates.CaseMap.RFC1459)
|
||||
|
||||
def test_ascii(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=ascii *"))
|
||||
self.assertEqual(server.isupport.casemapping, ircstates.CaseMap.ASCII)
|
||||
|
||||
def test_unknown(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
with self.assertRaises(ValueError):
|
||||
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=asd *"))
|
||||
|
|
102
test/mode.py
102
test/mode.py
|
@ -5,32 +5,32 @@ import ircstates, irctokens
|
|||
class ModeTestUMode(unittest.TestCase):
|
||||
def test_add(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE nickname +i"))
|
||||
self.assertEqual(server.modes, ["i"])
|
||||
self.assertEqual(server.modes, {"i"})
|
||||
|
||||
def test_remove(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE nickname +i"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE nickname -i"))
|
||||
self.assertEqual(server.modes, [])
|
||||
self.assertEqual(server.modes, set())
|
||||
|
||||
class ModeTestChannelPrefix(unittest.TestCase):
|
||||
def test_add(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("MODE #chan +ov nickname nickname"))
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
channel_user = server.channel_users[channel][user]
|
||||
self.assertEqual(channel_user.modes, ["o", "v"])
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
self.assertEqual(channel_user.modes, {"o", "v"})
|
||||
|
||||
def test_remove(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("MODE #chan +ov nickname nickname"))
|
||||
|
@ -38,31 +38,75 @@ class ModeTestChannelPrefix(unittest.TestCase):
|
|||
irctokens.tokenise("MODE #chan -ov nickname nickname"))
|
||||
user = server.users["nickname"]
|
||||
channel = server.channels["#chan"]
|
||||
channel_user = server.channel_users[channel][user]
|
||||
self.assertEqual(channel_user.modes, [])
|
||||
channel_user = channel.users[user.nickname_lower]
|
||||
self.assertEqual(channel_user.modes, set())
|
||||
|
||||
class ModeTestChannelList(unittest.TestCase):
|
||||
def test_add(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +b asd!*@*"))
|
||||
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.list_modes, {"b": []})
|
||||
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +b asd!*@*"))
|
||||
self.assertEqual(channel.list_modes, {"b": ["asd!*@*"]})
|
||||
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan -b asd!*@*"))
|
||||
self.assertEqual(channel.list_modes, {"b": []})
|
||||
|
||||
def test_remove(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +b asd!*@*"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +b dsa!*@*"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan -b asd!*@*"))
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.list_modes, {})
|
||||
self.assertEqual(channel.list_modes, {"b": ["dsa!*@*"]})
|
||||
|
||||
def test_banlist(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("367 * #chan *!*@host setby 1594477713"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("367 * #chan $a:account setby 1594477713"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("367 * #chan r:my*gecos"))
|
||||
server.parse_tokens(irctokens.tokenise("368 * #chan *"))
|
||||
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(
|
||||
channel.list_modes["b"],
|
||||
["*!*@host", "$a:account", "r:my*gecos"]
|
||||
)
|
||||
|
||||
def test_quietlist(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("728 * #chan q q!*@host setby 1594477713"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("728 * #chan q $a:qaccount setby 1594477713"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("728 * #chan q r:q*my*gecos setby 1594477713"))
|
||||
server.parse_tokens(irctokens.tokenise("729 * #chan q *"))
|
||||
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(
|
||||
channel.list_modes["q"],
|
||||
["q!*@host", "$a:qaccount", "r:q*my*gecos"]
|
||||
)
|
||||
|
||||
|
||||
class ModeTestChannelTypeB(unittest.TestCase):
|
||||
def test_add(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +k password"))
|
||||
channel = server.channels["#chan"]
|
||||
|
@ -70,17 +114,17 @@ class ModeTestChannelTypeB(unittest.TestCase):
|
|||
|
||||
def test_remove(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +k password"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan -k *"))
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.list_modes, {})
|
||||
self.assertEqual(channel.modes, {})
|
||||
|
||||
class ModeTestChannelTypeC(unittest.TestCase):
|
||||
def test_add(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +l 100"))
|
||||
channel = server.channels["#chan"]
|
||||
|
@ -88,17 +132,17 @@ class ModeTestChannelTypeC(unittest.TestCase):
|
|||
|
||||
def test_remove(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +l 100"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan -l"))
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.list_modes, {})
|
||||
self.assertEqual(channel.modes, {})
|
||||
|
||||
class ModeTestChannelTypeD(unittest.TestCase):
|
||||
def test_add(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +i"))
|
||||
channel = server.channels["#chan"]
|
||||
|
@ -106,17 +150,17 @@ class ModeTestChannelTypeD(unittest.TestCase):
|
|||
|
||||
def test_remove(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan +i"))
|
||||
server.parse_tokens(irctokens.tokenise("MODE #chan -i"))
|
||||
channel = server.channels["#chan"]
|
||||
self.assertEqual(channel.list_modes, {})
|
||||
self.assertEqual(channel.modes, {})
|
||||
|
||||
class ModeTestChannelNumeric(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise("324 * #chan +bkli *!*@* pass 10"))
|
||||
|
@ -126,7 +170,7 @@ class ModeTestChannelNumeric(unittest.TestCase):
|
|||
|
||||
def test_without_plus(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("324 * #chan il 10"))
|
||||
channel = server.channels["#chan"]
|
||||
|
@ -135,12 +179,12 @@ class ModeTestChannelNumeric(unittest.TestCase):
|
|||
class ModeTestUserNumeric(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("221 * +iw"))
|
||||
self.assertEqual(server.modes, ["i", "w"])
|
||||
self.assertEqual(server.modes, {"i", "w"})
|
||||
|
||||
def test_without_plus(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("221 * iw"))
|
||||
self.assertEqual(server.modes, ["i", "w"])
|
||||
self.assertEqual(server.modes, {"i", "w"})
|
||||
|
|
|
@ -4,7 +4,7 @@ import ircstates, irctokens
|
|||
class MOTDTest(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("375 * :start of motd"))
|
||||
server.parse_tokens(irctokens.tokenise("372 * :first line of motd"))
|
||||
server.parse_tokens(irctokens.tokenise("372 * :second line of motd"))
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
import unittest
|
||||
import ircstates, irctokens
|
||||
|
||||
class SASLTestAccount(unittest.TestCase):
|
||||
def test_loggedin(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("900 * nick!user@host account *"))
|
||||
|
||||
self.assertEqual(server.nickname, "nick")
|
||||
self.assertEqual(server.username, "user")
|
||||
self.assertEqual(server.hostname, "host")
|
||||
self.assertEqual(server.account, "account")
|
||||
|
||||
def test_loggedout(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("900 * nick!user@host account *"))
|
||||
server.parse_tokens(irctokens.tokenise("901 * nick1!user1@host1 *"))
|
||||
|
||||
self.assertEqual(server.nickname, "nick1")
|
||||
self.assertEqual(server.username, "user1")
|
||||
self.assertEqual(server.hostname, "host1")
|
||||
self.assertEqual(server.account, None)
|
118
test/user.py
118
test/user.py
|
@ -4,55 +4,71 @@ import ircstates, irctokens
|
|||
class UserTestNicknameChange(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname NICK nickname2"))
|
||||
self.assertEqual(server.nickname, "nickname2")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname NICK Nickname2"))
|
||||
self.assertEqual(server.nickname, "Nickname2")
|
||||
self.assertEqual(server.nickname_lower, "nickname2")
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":nickname2 JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
self.assertIn("other", server.users)
|
||||
server.parse_tokens(irctokens.tokenise(":other NICK other2"))
|
||||
user = server.users["other"]
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":other NICK Other2"))
|
||||
self.assertNotIn("other", server.users)
|
||||
self.assertIn("other2", server.users)
|
||||
|
||||
self.assertEqual(user.nickname, "Other2")
|
||||
self.assertEqual(user.nickname_lower, "other2")
|
||||
|
||||
class UserTestHostmaskJoin(unittest.TestCase):
|
||||
def test_both(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise(":nickname!user@host JOIN #chan"))
|
||||
self.assertEqual(server.username, "user")
|
||||
self.assertEqual(server.hostname, "host")
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":other!user@host JOIN #chan"))
|
||||
user = server.users["other"]
|
||||
self.assertEqual(user.username, "user")
|
||||
self.assertEqual(user.hostname, "host")
|
||||
self.assertEqual(user.userhost(), "user@host")
|
||||
self.assertEqual(user.hostmask(), "other!user@host")
|
||||
|
||||
def test_user(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname!user JOIN #chan"))
|
||||
self.assertEqual(server.username, "user")
|
||||
self.assertIsNone(server.hostname)
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":other!user JOIN #chan"))
|
||||
user = server.users["other"]
|
||||
self.assertEqual(user.username, "user")
|
||||
self.assertIsNone(user.hostname)
|
||||
self.assertIsNone(user.userhost())
|
||||
self.assertEqual(user.hostmask(), "other!user")
|
||||
|
||||
def test_host(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname@host JOIN #chan"))
|
||||
self.assertIsNone(server.username)
|
||||
self.assertEqual(server.hostname, "host")
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":other@host JOIN #chan"))
|
||||
user = server.users["other"]
|
||||
self.assertIsNone(user.username)
|
||||
self.assertEqual(user.hostname, "host")
|
||||
self.assertIsNone(user.userhost())
|
||||
self.assertEqual(user.hostmask(), "other@host")
|
||||
|
||||
class UserTestExtendedJoin(unittest.TestCase):
|
||||
def test_without_extended_join(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
self.assertIsNone(server.account)
|
||||
self.assertIsNone(server.realname)
|
||||
|
@ -63,7 +79,7 @@ class UserTestExtendedJoin(unittest.TestCase):
|
|||
|
||||
def test_with_account(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan acc :realname"))
|
||||
self.assertEqual(server.account, "acc")
|
||||
self.assertEqual(server.realname, "realname")
|
||||
|
@ -74,7 +90,7 @@ class UserTestExtendedJoin(unittest.TestCase):
|
|||
|
||||
def test_without_account(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan * :realname"))
|
||||
self.assertEqual(server.account, "")
|
||||
self.assertEqual(server.realname, "realname")
|
||||
|
@ -86,7 +102,7 @@ class UserTestExtendedJoin(unittest.TestCase):
|
|||
class UserTestAccountNotify(unittest.TestCase):
|
||||
def test_with_account(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname ACCOUNT acc"))
|
||||
self.assertEqual(server.account, "acc")
|
||||
|
@ -97,7 +113,7 @@ class UserTestAccountNotify(unittest.TestCase):
|
|||
|
||||
def test_without_account(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname ACCOUNT *"))
|
||||
self.assertEqual(server.account, "")
|
||||
|
@ -109,7 +125,7 @@ class UserTestAccountNotify(unittest.TestCase):
|
|||
class UserTestHostmaskPRIVMSG(unittest.TestCase):
|
||||
def test_both(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise(":nickname!user@host PRIVMSG #chan :hi"))
|
||||
|
@ -124,7 +140,7 @@ class UserTestHostmaskPRIVMSG(unittest.TestCase):
|
|||
|
||||
def test_user(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise(":nickname!user PRIVMSG #chan :hi"))
|
||||
|
@ -139,7 +155,7 @@ class UserTestHostmaskPRIVMSG(unittest.TestCase):
|
|||
|
||||
def test_host(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise(":nickname@host PRIVMSG #chan :hi"))
|
||||
|
@ -155,22 +171,22 @@ class UserTestHostmaskPRIVMSG(unittest.TestCase):
|
|||
class UserTestVisibleHost(unittest.TestCase):
|
||||
def test_without_username(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("396 * hostname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("396 * hostname *"))
|
||||
self.assertIsNone(server.username)
|
||||
self.assertEqual(server.hostname, "hostname")
|
||||
|
||||
def test_with_username(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("396 * username@hostname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise("396 * username@hostname *"))
|
||||
self.assertEqual(server.username, "username")
|
||||
self.assertEqual(server.hostname, "hostname")
|
||||
|
||||
class UserTestWHO(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
server.parse_tokens(
|
||||
|
@ -189,7 +205,7 @@ class UserTestWHO(unittest.TestCase):
|
|||
class UserTestCHGHOST(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(
|
||||
irctokens.tokenise(":nickname!user@host JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname CHGHOST u h"))
|
||||
|
@ -205,7 +221,7 @@ class UserTestCHGHOST(unittest.TestCase):
|
|||
class UserTestWHOIS(unittest.TestCase):
|
||||
def test_user_line(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise("311 * nickname u h * :r"))
|
||||
self.assertEqual(server.username, "u")
|
||||
|
@ -213,44 +229,56 @@ class UserTestWHOIS(unittest.TestCase):
|
|||
self.assertEqual(server.realname, "r")
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other CHGHOST u2 h2"))
|
||||
server.parse_tokens(irctokens.tokenise("311 * other u2 h2 * :r2"))
|
||||
user = server.users["other"]
|
||||
self.assertEqual(user.username, "u2")
|
||||
self.assertEqual(user.hostname, "h2")
|
||||
self.assertEqual(user.realname, "r2")
|
||||
|
||||
class UserTestAWAY(unittest.TestCase):
|
||||
def test_set(self):
|
||||
class UserTestAway(unittest.TestCase):
|
||||
def test_verb_set(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
user = server.users["other"]
|
||||
self.assertIsNone(server.away)
|
||||
self.assertIsNone(user.away)
|
||||
server.parse_tokens(irctokens.tokenise(":nickname AWAY :bye bye"))
|
||||
server.parse_tokens(irctokens.tokenise(":other AWAY :ik ga weg"))
|
||||
self.assertEqual(server.away, "bye bye")
|
||||
self.assertEqual(user.away, "ik ga weg")
|
||||
|
||||
def test_unset(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname AWAY :ik ga weg"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname AWAY"))
|
||||
server.parse_tokens(irctokens.tokenise(":other AWAY :ik ga weg"))
|
||||
server.parse_tokens(irctokens.tokenise(":other AWAY"))
|
||||
user = server.users["other"]
|
||||
user = server.users["nickname"]
|
||||
self.assertIsNone(server.away)
|
||||
self.assertIsNone(user.away)
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(":nickname AWAY :ik ga weg"))
|
||||
self.assertEqual(server.away, "ik ga weg")
|
||||
self.assertEqual(user.away, "ik ga weg")
|
||||
|
||||
def test_verb_unset(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
user = server.users["nickname"]
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
":nickname AWAY :let's blow this popsicle stand"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname AWAY"))
|
||||
self.assertIsNone(server.away)
|
||||
self.assertIsNone(user.away)
|
||||
|
||||
def test_numeric(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
user = server.users["nickname"]
|
||||
|
||||
self.assertIsNone(server.away)
|
||||
self.assertIsNone(user.away)
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
"301 * nickname :i saw something shiny"))
|
||||
self.assertEqual(server.away, "i saw something shiny")
|
||||
self.assertEqual(user.away, "i saw something shiny")
|
||||
|
||||
class UserTestSETNAME(unittest.TestCase):
|
||||
def test(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname"))
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
|
||||
user = server.users["other"]
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
import unittest
|
||||
import ircstates, irctokens
|
||||
from ircstates.server import WHO_TYPE
|
||||
|
||||
class WHOTest(unittest.TestCase):
|
||||
def test_who(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
user = server.users["nickname"]
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
"352 * #chan user host server nickname * :0 real"))
|
||||
|
||||
self.assertEqual(user.username, "user")
|
||||
self.assertEqual(user.hostname, "host")
|
||||
self.assertEqual(user.realname, "real")
|
||||
self.assertEqual(user.account, None)
|
||||
self.assertEqual(user.server, "server")
|
||||
self.assertIsNone(user.away)
|
||||
|
||||
self.assertEqual(server.username, user.username)
|
||||
self.assertEqual(server.hostname, user.hostname)
|
||||
self.assertEqual(server.realname, user.realname)
|
||||
self.assertEqual(server.server, user.server)
|
||||
self.assertIsNone(server.away)
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
"352 * #chan user host server nickname G* :0 real"))
|
||||
self.assertEqual(user.away, "")
|
||||
self.assertEqual(server.away, "")
|
||||
|
||||
def test_whox(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
user = server.users["nickname"]
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
f"354 * {WHO_TYPE} user 1.2.3.4 host server nickname * account :real"))
|
||||
|
||||
self.assertEqual(user.username, "user")
|
||||
self.assertEqual(user.hostname, "host")
|
||||
self.assertEqual(user.realname, "real")
|
||||
self.assertEqual(user.account, "account")
|
||||
self.assertEqual(user.server, "server")
|
||||
self.assertIsNone(user.away)
|
||||
self.assertEqual(user.ip, "1.2.3.4")
|
||||
|
||||
self.assertEqual(server.username, user.username)
|
||||
self.assertEqual(server.hostname, user.hostname)
|
||||
self.assertEqual(server.realname, user.realname)
|
||||
self.assertEqual(server.account, user.account)
|
||||
self.assertEqual(server.server, user.server)
|
||||
self.assertIsNone(server.away)
|
||||
self.assertEqual(server.ip, user.ip)
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
f"354 * {WHO_TYPE} user realip host server nickname G account :real"))
|
||||
self.assertEqual(user.away, "")
|
||||
self.assertEqual(server.away, "")
|
||||
|
||||
def test_whox_no_account(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
|
||||
user = server.users["nickname"]
|
||||
user.account = "account"
|
||||
server.account = "account"
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
f"354 * {WHO_TYPE} user realip host server nickname * 0 :real"))
|
||||
|
||||
self.assertEqual(user.account, "")
|
||||
self.assertEqual(server.account, user.account)
|
||||
|
||||
def test_whox_ipv6(self):
|
||||
server = ircstates.Server("test")
|
||||
server.parse_tokens(irctokens.tokenise("001 nickname *"))
|
||||
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
|
||||
|
||||
user = server.users["nickname"]
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
f"354 * {WHO_TYPE} user 0::1 host server nickname * 0 :real"))
|
||||
self.assertEqual(user.ip, "::1")
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
f"354 * {WHO_TYPE} user 00::2 host server nickname * 0 :real"))
|
||||
self.assertEqual(user.ip, "::2")
|
||||
|
||||
server.parse_tokens(irctokens.tokenise(
|
||||
f"354 * {WHO_TYPE} user fd00:0:0:0::1 host server nickname * 0 :real"))
|
||||
self.assertEqual(user.ip, "fd00::1")
|
Loading…
Reference in New Issue