Compare commits

...

150 Commits

Author SHA1 Message Date
jesopo ab30dbe658 v0.12.1 release 2023-08-17 22:42:39 +00:00
jesopo d370c67373 remove cachetools from requirements.txt 2023-08-17 22:42:00 +00:00
jesopo 0a844bd90d remove freezegun from non-dev requirements 2023-08-17 22:37:48 +00:00
jesopo 3dc56da30e add RPL_MONONLINE to numerics.py 2022-05-23 23:29:57 +00:00
jesopo c21545e2c2 extraneous space 2022-01-29 21:19:43 +00:00
jesopo 275b2c7f3d slightly neater casemap.py 2022-01-29 21:19:31 +00:00
jesopo 1333228fd1 upgrade irctokens to v2.0.2 2022-01-29 21:18:41 +00:00
jesopo 9166d82359 v0.12.0 release 2022-01-07 19:03:18 +00:00
jesopo ea9c0c2d1f str.maketrans is a much faster casefold; make casemaps an Enum 2022-01-07 18:59:37 +00:00
jesopo bc7c4d75a8 v0.11.11 release 2022-01-07 11:42:07 +00:00
jesopo a389c6f3cb remove python3.6; add python3.9 2022-01-07 11:39:30 +00:00
jesopo 83215e996b don't use --install-types. install types-cachetools specifically 2022-01-07 11:36:51 +00:00
jesopo e0cbaa4519 .travis.yml before_script `mypy --install-types` 2022-01-07 11:30:37 +00:00
jesopo e3884c7505 put a cachetools LRUCache on casefold() 2022-01-07 11:27:01 +00:00
jesopo 1e187db35f v0.11.10 release 2021-09-18 17:10:37 +00:00
jesopo 3a95bf4bca add RPL_YOUREOPER, RPL_RSACHALLENGE2, RPL_ENDOFRSACHALLENGE2 2021-09-18 16:47:28 +00:00
jesopo 5c50167d96 v0.11.9 release 2021-09-06 03:11:49 +00:00
jesopo e5a7871fd9 record when we first saw a user in a channel and optionally when they JOINed 2021-09-06 03:10:25 +00:00
jesopo 3565259791 only make a new channel_user on NAMES when we don't have one 2021-09-06 03:04:49 +00:00
jesopo 8c16b73414 test topic_setter and topic_time in TOPIC test too 2021-09-06 02:51:34 +00:00
jesopo 806c6e4bf3 combine ChannelTestTopic.test_text and test_set_by_at 2021-09-06 02:50:22 +00:00
jesopo d865ea3253 server.modes and channel_user.modes should be sets 2021-08-16 20:21:09 +00:00
jesopo 038c59659f freenode is dead long live libera.chat 2021-05-24 17:27:10 +00:00
jesopo 22552c5e3d v0.11.8 release 2021-04-10 13:50:16 +00:00
jesopo ddcacabfda parse NICKLEN from ISUPPORT 2021-04-10 13:47:44 +00:00
jesopo cb8aa4495a implement \xHH (hex) escapes in ISUPPORT token values 2021-02-28 15:27:22 +00:00
jesopo 3136d2b85c add missing return on RENAME handler 2021-02-18 14:58:14 +00:00
jesopo 4d14d67d4b support RENAME
closes #4
2021-02-16 22:10:10 +00:00
jesopo 566b8ec8cd unknown account status is None, known not-logged-in is empty string 2021-01-08 16:50:55 +00:00
jesopo 9b407b666d v0.11.7 release 2020-12-20 00:19:39 +00:00
jesopo a69fd01766 add RPL_LOGOFF (WATCH) and RPL_MONOFFLINE (MONITOR) 2020-12-20 00:15:03 +00:00
jesopo 1f8dfe700f add RPL_ENDOFMOTD and RPL_NOMOTD 2020-12-20 00:14:48 +00:00
jesopo 202cf8227b v0.11.6 release 2020-12-01 21:45:49 +00:00
jesopo 2552e1cb54 change irctokens dependency from ==2.0.0 to ~=2.0.0 2020-12-01 16:00:15 +00:00
jesopo 17957798bb v0.11.5 release 2020-11-09 03:40:47 +00:00
jesopo f44bbe41e4 don't try to parse info from :source-less PRIVMSG/NOTICE/TAGMSG 2020-11-08 20:02:34 +00:00
jesopo f253159873 v0.11.4 release 2020-10-13 15:05:52 +00:00
jesopo cfdcc8d7e7 requirements.txt: 'pendulum ==2.1.0' -> 'pendulum ~=2.1.0' 2020-10-12 22:05:28 +00:00
jesopo f51f1b689e change pendulum dep from "==2.1.0" to ">=2.1.0" 2020-10-03 23:08:34 +00:00
jesopo eb216e9abf v0.11.3 release 2020-10-03 21:22:39 +00:00
jesopo 58f83ad3de parse_tokens() was split out from recv() 2020-10-03 17:48:50 +00:00
jesopo b4f91148eb slim down README.md socket-to-state example 2020-10-03 17:37:35 +00:00
jesopo 2b6d2bf7af add a simpler example to README.md 2020-10-03 17:29:33 +00:00
jesopo 412f829cb4 POST -> PORT typo 2020-10-03 17:26:49 +00:00
jesopo 4849010938 upgrade irctokens to v2.0.0 2020-09-30 20:06:25 +00:00
jesopo 76e29d7bad v0.11.2 release 2020-09-30 09:10:56 +00:00
jesopo 5a85e53485 channel.list_modes should always have keys, even if empty
closes #1
2020-09-29 11:55:15 +00:00
jesopo e062b7b71f +kli are not list modes 2020-09-29 11:54:50 +00:00
jesopo c841d1d6dd some test numerics were missing args 2020-09-29 11:44:45 +00:00
jesopo 07ed0bf13c WHOX IP must parse correctly (and we'll compress them) 2020-09-29 11:02:07 +00:00
jesopo ca9abfc34b fix tests for casefolding now that we've swapped ^ and ~ 2020-08-17 17:00:37 +00:00
jesopo 66d6bba298 RFC2812 says []\~ is lower of {}|^, irc2 disagrees on ~ vs ^ 2020-08-17 16:56:05 +00:00
jesopo ee5b0ceb4f v0.11.1 release 2020-08-07 14:53:30 +00:00
jesopo dabb59d05f fix recv() typehinting 2020-08-07 14:53:12 +00:00
jesopo 5165573133 v0.11.0 release 2020-08-07 14:50:09 +00:00
jesopo bf0f2fdc9f recv() doesn't call parse_tokens anymore - batch lines change state 2020-08-07 14:49:20 +00:00
jesopo 2c1468295e simplify parsing channel MODE & RPL_CHANNELMODEIS 2020-08-03 21:03:45 +00:00
jesopo c27c48af54 IPs are a static connection property, dont overwrite if gone 2020-07-14 12:16:43 +00:00
jesopo bf16308455 v0.10.3 release 2020-07-13 11:34:33 +01:00
jesopo 8a31f0190d save numeric ip result from WHOX 2020-07-12 23:16:33 +01:00
jesopo 2fb81e7aef parse out RPL_BANLIST and RPL_QUIETLIST 2020-07-11 15:36:21 +01:00
jesopo 87f85ba57c update README.md contact section to point to freenode 2020-07-10 12:09:11 +01:00
jesopo 9806a6407b pull server from WHO and WHOX 2020-07-09 11:04:44 +01:00
jesopo 01d8b8d111 remove probably copy-pasted CHGHOST from UserTestWHOIS 2020-07-08 23:42:23 +01:00
jesopo 875e912896 v0.10.2 release 2020-07-08 23:19:50 +01:00
jesopo 50a63ce12c parse out RPL_AWAY 2020-07-07 14:27:40 +01:00
jesopo 74490f616a pull away state out of WHO/WHOX 2020-07-06 20:53:09 +01:00
jesopo adacb19c77 use line.source, that's what str(hostmask) does, null source throws on hostmask 2020-07-03 23:15:39 +01:00
jesopo d76f50dac0 update irctokens to v1.1.0 2020-07-03 23:13:20 +01:00
jesopo 98823298e6 v0.10.1 release 2020-07-01 17:56:07 +01:00
jesopo 20d2f1a1db v0.10.0 release 2020-06-24 10:07:30 +01:00
jesopo 114688e266 update README.md away from channel_users/user_channels 2020-06-21 22:08:58 +01:00
jesopo 7a87b7b448 add ChannelUser.__repr__ 2020-06-21 22:04:26 +01:00
jesopo c32b4bdd62 python3.6 doesn't have native dataclasses! 2020-06-21 18:47:45 +01:00
jesopo 83b31b6b2b add missing names.py file 2020-06-21 18:45:46 +01:00
jesopo ea421f09af pass around nickname/channelname as Name objects, give to ChannelUser 2020-06-21 18:43:55 +01:00
jesopo 8b91dc09e3 slightly more efficient casefolding 2020-06-21 18:23:18 +01:00
jesopo 37227b6463 remove Named, give (nick)name/_lower to User/Channel ctors 2020-06-21 18:12:37 +01:00
jesopo 40ec25de2b don't casefold twice for self NICK 2020-06-21 00:24:45 +01:00
jesopo e1286f16c6 python 3.6 doesn't have native dataclasses 2020-06-21 00:12:37 +01:00
jesopo fdcf216255 rename isupport.chanmodes groups 2020-06-21 00:08:43 +01:00
jesopo 46a1d2bda8 ISupport.tokens(List[str]) -> ISupport.from_tokens(List[str]) 2020-06-21 00:08:22 +01:00
jesopo c75a62f5d8 refactor CHANMODES logic. less redundant bool checking 2020-06-21 00:01:03 +01:00
jesopo 3290c33106 v0.9.19 release 2020-06-14 19:54:01 +01:00
jesopo 85794909d0 fix usermode change iterate typehints 2020-06-14 19:51:19 +01:00
jesopo 40839c1755 change how channel mode emit.tokens works ("+b" -> "+b mask") 2020-06-14 19:47:52 +01:00
jesopo 251d588ee8 v0.9.18 release 2020-06-07 20:39:30 +01:00
jesopo 122fe23da6 add ERR_NOSUCHSERVER 2020-06-07 20:39:11 +01:00
jesopo 4dbf2c1981 v0.9.17 release 2020-06-07 20:20:13 +01:00
jesopo c68c59a534 update irctokens to v1.0.2 2020-06-07 20:19:56 +01:00
jesopo 6f79d97967 v0.9.16 release 2020-06-07 20:07:32 +01:00
jesopo 92c883ded2 v0.9.15 release (bad pypi package) 2020-06-07 18:49:38 +01:00
jesopo 80ef9a9edb add RPL_ENDOFWHOWAS 2020-06-07 17:51:57 +01:00
jesopo 6ee5c75790 v0.9.14 release 2020-06-07 17:41:52 +01:00
jesopo 191e8fdba3 add RPL_WHOWASUSER 2020-06-07 17:34:49 +01:00
jesopo 274b76ba56 add ERR_NOSUCHNICK 2020-06-05 13:09:13 +01:00
jesopo bd3fd12a84 user.hostmask() will never be None 2020-06-04 00:14:46 +01:00
jesopo 232aa3bb61 v0.9.13 release 2020-06-03 21:31:43 +01:00
jesopo e7d14e6f67 add user.hostmask():str and user.userhost():Optional[str] 2020-06-03 21:30:05 +01:00
jesopo d212fbc11c v0.9.12 release 2020-06-02 20:36:59 +10:00
jesopo 5878f946e8 add RPL_TRYAGAIN and ERR_BADCHANNEL 2020-06-02 20:36:13 +10:00
jesopo a434f19b9e v0.9.11 release 2020-05-07 08:40:05 +10:00
jesopo 56d047de8d switch datetimes to pendulum because stdlib doesnt handle timezones well 2020-05-07 08:38:24 +10:00
jesopo 271cadf666 use utcfromtimestamp for unit test datetimes. timezones are hard 2020-05-05 22:45:34 +01:00
jesopo a1e5c07dbc parse unix timestamps as utc 2020-05-05 20:43:09 +01:00
jesopo 178f08d5b0 v0.9.10 release 2020-04-29 14:54:31 +01:00
jesopo 2355b42fbb update irctokens to v1.0.0 2020-04-29 14:52:02 +01:00
jesopo d0a3aed19f support RPL_LOGGEDIN and RPL_LOGGEDOUT (with tests) 2020-04-29 14:48:03 +01:00
jesopo 5c5c6fca2b add RPL_LOGGEDIN/RPL_LOGGEDOUT 2020-04-29 14:35:25 +01:00
jesopo f74294b7bb v0.9.9 release 2020-04-28 01:38:04 +01:00
jesopo 39694beff4 add ERR_TOOMANYCHANNELS 2020-04-28 01:37:29 +01:00
jesopo 8e4bbeb790 v0.9.8 release 2020-04-28 01:35:44 +01:00
jesopo 0818400221 add JOIN ERR_ numerics 2020-04-28 01:34:00 +01:00
jesopo 8fbf66fe71 v0.9.7 release 2020-04-28 00:29:40 +01:00
jesopo fb4045f2f8 add NICK ERR_ numerics 2020-04-28 00:25:04 +01:00
jesopo 500859e9c0 import ServerDisconnectedError to __init__.py 2020-04-28 00:24:20 +01:00
jesopo 09acc74412 v0.9.6 release 2020-04-23 14:33:15 +01:00
jesopo e02a535cd0 actually initialise `emits` 2020-04-23 14:32:23 +01:00
jesopo febc891c45 v0.9.5 release 2020-04-22 18:00:06 +01:00
jesopo 31ab106e25 only return 1 emit (or None) 2020-04-22 17:58:28 +01:00
jesopo 909f72ece6 v0.9.4 release 2020-04-21 20:39:39 +01:00
jesopo 5a8ac14896 update irctokens to v0.9.6 2020-04-21 20:38:40 +01:00
jesopo db9c6d48d1 user.channels is now Set[str], CUser has no .user/.channel now 2020-04-21 20:35:17 +01:00
jesopo 79ee3d8874 rename channel users on NICK 2020-04-21 15:36:06 +01:00
jesopo 959b288b3c store channel membership on Channel and User objects, change tests 2020-04-21 14:28:04 +01:00
jesopo b587936c7f add ERR_NOSUCHCHANNEL 2020-04-20 17:10:56 +01:00
jesopo 985d800982 add test case for WHOX without account 2020-04-20 13:54:20 +01:00
jesopo 6eb429d6d5 v0.9.3 release 2020-04-19 14:13:48 +01:00
jesopo ba2410c539 update irctokens to v0.9.5 2020-04-19 14:12:15 +01:00
jesopo 548bd722a8 v0.9.2 release 2020-04-19 02:14:15 +01:00
jesopo c0339c11cb add return type to prepare_whox() 2020-04-19 02:13:08 +01:00
jesopo 0d9f8dd268 add WHOX support, add tests for WHO and WHOX 2020-04-19 02:12:04 +01:00
jesopo df3aba9521 RPL_ENDOFBANLIST should be 368 not 367 2020-04-19 02:11:35 +01:00
jesopo dc878df2f4 remove NUMERIC_NUMBERS; i dont think we'll need it 2020-04-19 01:44:47 +01:00
jesopo 0ed79f18f5 v0.9.1 release 2020-04-18 15:11:04 +01:00
jesopo 09ea845d2d use RPL_/ERR_ consts, not magic strings 2020-04-18 15:08:54 +01:00
jesopo e62e22d663 add numerics.RPL_ENDOFWHO 2020-04-18 15:07:43 +01:00
jesopo 872cb84c0e v0.9.0 release 2020-04-17 20:59:59 +01:00
jesopo 21a6c4c47d change numerics.py in to consts, not just dict lookups 2020-04-17 20:58:17 +01:00
jesopo 32e7ba230e v0.8.9 release 2020-04-13 17:37:38 +01:00
jesopo 3fdbe04e0f update irctokens reference to v0.9.4 2020-04-13 17:35:18 +01:00
jesopo 25408c7e5e v0.8.8 release 2020-04-11 14:06:35 +01:00
jesopo 517737a921 update irctokens reference to v0.9.3 2020-04-11 14:05:14 +01:00
jesopo f705f20e94 add unit tests for MODE emits 2020-04-10 16:05:25 +01:00
jesopo f9034d6a1b put tokens on to MODE emits 2020-04-10 10:55:57 +01:00
jesopo 05644d3ff1 v0.8.7 release 2020-04-08 22:03:59 +01:00
jesopo c902c1026c add RPL_WHOISACCOUNT 2020-04-08 22:01:53 +01:00
jesopo fe505ce4d2 v0.8.6 release 2020-04-08 21:44:02 +01:00
jesopo 0257a88f87 add RPL_WHOISCHANNELS 2020-04-08 21:43:38 +01:00
jesopo 90def4aba5 add WHOIS numerics 2020-04-08 21:23:02 +01:00
27 changed files with 1141 additions and 503 deletions

View File

@ -1,14 +1,13 @@
language: python
cache: pip
python:
- "3.6"
- "3.7"
- "3.8"
- "3.8-dev"
- "3.9"
install:
- pip3 install mypy -r requirements.txt
script:
- pip3 install mypy types-cachetools -r requirements-dev.txt
before_script:
- pip3 freeze
- mypy ircstates
script:
- python3 -m unittest test

View File

@ -13,44 +13,50 @@ additional arbitrary functionality on top of it.
## usage
### simple
```python
import ircstates
server = ircstates.Server("freenode")
lines = server.recv(b":server 001 nick :hello world!\r\n")
lines += server.recv(b":nick JOIN #chan\r\n")
for line in lines:
server.parse_tokens(line)
chan = server.channels["#chan"]
```
### socket to state
```python
import ircstates, irctokens, socket
NICK = "nickname"
CHAN = "#chan"
HOST = "127.0.0.1"
POST = 6667
PORT = 6667
server = ircstates.Server("freenode")
sock = socket.socket()
encoder = irctokens.StatefulEncoder()
sock.connect((HOST, POST))
def _send(raw):
tokens = irctokens.tokenise(raw)
encoder.push(tokens)
sock.connect((HOST, PORT))
def _send(raw: str):
sock.sendall(f"{raw}\r\n".encode("utf8"))
_send("USER test 0 * test")
_send(f"NICK {NICK}")
while True:
while encoder.pending():
sent_lines = encoder.pop(sock.send(encoder.pending()))
for line in sent_lines:
print(f"> {line.format()}")
recv_lines = server.recv(sock.recv(1024))
recv_data = sock.recv(1024)
recv_lines = server.recv(recv_data)
for line in recv_lines:
server.parse_tokens(line)
print(f"< {line.format()}")
# user defined behaviors...
if line.command == "PING":
_send(f"PONG :{line.params[0]}")
if line.command == "001" and not CHAN in server.channels:
_send(f"JOIN {CHAN}")
```
### get a user's channels
@ -60,8 +66,8 @@ while True:
>>> user = server.users["nickname"]
>>> user
User(nickname='nickname')
>>> server.user_channels[user]
{Channel(name='#chan')}
>>> user.channels
{'#chan'}
```
### get a channel's users
@ -71,21 +77,20 @@ User(nickname='nickname')
>>> channel = server.channels["#chan"]
>>> channel
Channel(name='#chan')
>>> server.channel_users[channel]
{User(nickname='nickname'): ChannelUser(user='nickname', channel='#chan', modes='ov')}
>>> channel.users
{'jess': ChannelUser(#chan jess)}
```
### get a user's modes in channel
```python
>>> user = server.users["nickname"]
>>> channel = server.channels["#chan"]
>>> channel_user = server.channel_users[channel][user]
>>> channel_user = channel.users["nickname"]
>>> channel_user
ChannelUser(user='nickname', channel='#chan', modes='ov')
ChannelUser(#chan jess +ov)
>>> channel_user.modes
{'o', 'v'}
```
## contact
Come say hi at [#irctokens on irc.tilde.chat](https://web.tilde.chat/?join=%23irctokens)
Come say hi at `#irctokens` on irc.libera.chat

View File

@ -1 +1 @@
0.8.5
0.12.1

View File

@ -1,7 +1,6 @@
from .server import Server
from .server import Server, ServerDisconnectedException
from .user import User
from .channel import Channel
from .channel_user import Channel
from .casemap import casefold
from .channel_user import ChannelUser
from .casemap import casefold, CaseMap
from .emit import *
from .numerics import NUMERIC_NAMES, NUMERIC_NUMBERS

View File

@ -1,18 +1,21 @@
import string
from typing import List
from enum import Enum
from string import ascii_lowercase, ascii_uppercase
from typing import Dict, List, Optional
ASCII_UPPER = list(string.ascii_uppercase)
ASCII_LOWER = list(string.ascii_lowercase)
RFC1459_UPPER = ASCII_UPPER+list("[]~\\")
RFC1459_LOWER = ASCII_LOWER+list("{}^|")
class CaseMap(Enum):
ASCII = "ascii"
RFC1459 = "rfc1459"
def _replace(s: str, upper: List[str], lower: List[str]):
for i, char in enumerate(upper):
s = s.replace(char, lower[i])
return s
def casefold(mapping: str, s: str):
if mapping == "rfc1459":
return _replace(s, RFC1459_UPPER, RFC1459_LOWER)
elif mapping == "ascii":
return _replace(s, ASCII_UPPER, ASCII_LOWER)
CASEMAPS: Dict[CaseMap, Dict[int, Optional[int]]] = {
CaseMap.ASCII: str.maketrans(
r"ABCDEFGHIJKLMNOPQRSTUVWXYZ",
r"abcdefghijklmnopqrstuvwxyz"
),
CaseMap.RFC1459: str.maketrans(
r"ABCDEFGHIJKLMNOPQRSTUVWXYZ\[]^",
r"abcdefghijklmnopqrstuvwxyz|{}~"
)
}
def casefold(casemap_name: CaseMap, s: str):
casemap = CASEMAPS[casemap_name]
return s.translate(casemap)

View File

@ -1,32 +1,53 @@
from datetime import datetime
from typing import Dict, List, Optional, Set
from .named import Named
from typing import Dict, List, Optional, Set
from pendulum import DateTime
class Channel(Named):
def __init__(self, name: str):
self.name = name
from .channel_user import ChannelUser
from .names import Name
class Channel(object):
def __init__(self, name: Name):
self._name = name
self.users: Dict[str, ChannelUser] = {}
self.topic: Optional[str] = None
self.topic_setter: Optional[str] = None
self.topic_time: Optional[datetime] = None
self.topic_time: Optional[DateTime] = None
self.created: Optional[datetime] = None
self.created: Optional[DateTime] = None
self.list_modes: Dict[str, List[str]] = {}
self.modes: Dict[str, Optional[str]] = {}
self._list_modes_temp: Dict[str, List[str]] = {}
self.list_modes: Dict[str, List[str]] = {}
self.modes: Dict[str, Optional[str]] = {}
def __repr__(self) -> str:
return f"Channel(name={self.name!r})"
def get_name(self) -> Name:
return self._name
@property
def name(self) -> str:
return self._name.normal
@property
def name_lower(self) -> str:
return self._name.folded
def change_name(self,
normal: str,
folded: str):
self._name.normal = normal
self._name.folded = folded
def add_mode(self,
char: str,
param: Optional[str],
list_mode: bool):
if list_mode:
if not char in self.list_modes:
self.list_modes[char] = []
if not param in self.list_modes[char]:
self.list_modes[char].append(param or "")
if param is not None:
if not char in self.list_modes:
self.list_modes[char] = []
if not param in self.list_modes[char]:
self.list_modes[char].append(param)
else:
self.modes[char] = param
@ -34,9 +55,8 @@ class Channel(Named):
char: str,
param: Optional[str]):
if char in self.list_modes:
if param in self.list_modes[char]:
if (param is not None and
param in self.list_modes[char]):
self.list_modes[char].remove(param)
if not self.list_modes[char]:
del self.list_modes[char]
elif char in self.modes:
del self.modes[char]

View File

@ -1,16 +1,31 @@
from typing import List
from .user import User
from .channel import Channel
from typing import List, Optional, Set
from .names import Name
from pendulum import DateTime, now
class ChannelUser(object):
def __init__(self,
channel: Channel,
user: User):
self.channel = channel
self.user = user
self.modes: List[str] = []
nickname: Name,
channel_name: Name):
self._nickname = nickname
self._channel_name = channel_name
self.modes: Set[str] = set()
self.since = now("utc")
self.joined: Optional[DateTime] = None
def __repr__(self) -> str:
return (f"ChannelUser(user={self.user.nickname!r},"
f" channel={self.channel.name!r},"
f" modes={''.join(self.modes)!r})")
outs: List[str] = [self.channel, self.nickname]
if self.modes:
outs.append(f"+{''.join(self.modes)}")
return f"ChannelUser({' '.join(outs)})"
@property
def nickname(self) -> str:
return self._nickname.normal
@property
def nickname_lower(self) -> str:
return self._nickname.folded
@property
def channel(self) -> str:
return self._channel_name.normal

View File

@ -1,8 +1,28 @@
from typing import Dict, List, Optional
from .tokens import ChanModes, Prefix
from ..casemap import CaseMap
CASEMAPPINGS = ["rfc1459", "ascii"]
def _parse_escapes(s: str):
idx = 0
out = ""
while idx < (len(s)):
if s[idx] == "\\":
if s[idx+1:]:
if (s[idx+1] == "x" and
len(s[idx+2:]) >= 2):
out += chr(int(s[idx+2:idx+4], 16))
idx += 4
else:
out += s[idx+1]
idx += 2
else:
out += s[idx]
idx += 1
return out
class ISupport(object):
raw: Dict[str, Optional[str]]
@ -12,7 +32,7 @@ class ISupport(object):
prefix = Prefix(["o", "v"], ["@", "+"])
modes: int = 3 # -1 if "no limit"
casemapping: str = "rfc1459"
casemapping: CaseMap = CaseMap.RFC1459
chantypes: List[str] = ["#"]
statusmsg: List[str] = []
@ -23,13 +43,15 @@ class ISupport(object):
monitor: Optional[int] = None # -1 if "no limit"
watch: Optional[int] = None # -1 if "no limit"
whox: bool = False
nicklen: int = 9 # from RFC1459
def __init__(self):
self.raw = {}
def tokens(self, tokens: List[str]):
def from_tokens(self, tokens: List[str]):
for token in tokens:
key, sep, value = token.partition("=")
value = _parse_escapes(value)
self.raw[key] = value if sep else None
if key == "NETWORK":
@ -54,8 +76,7 @@ class ISupport(object):
self.watch = int(value) if value else -1
elif key == "CASEMAPPING":
if value in CASEMAPPINGS:
self.casemapping = value
self.casemapping = CaseMap(value)
elif key == "CHANTYPES":
self.chantypes = list(value)
@ -69,3 +90,6 @@ class ISupport(object):
elif key == "WHOX":
self.whox = True
elif key == "NICKLEN":
self.nicklen = int(value)

View File

@ -2,15 +2,14 @@ from typing import List, Optional
class ChanModes(object):
def __init__(self,
list_modes: List[str],
setting_b_modes: List[str],
setting_c_modes: List[str],
setting_d_modes: List[str]):
self.list_modes = list_modes
self.setting_b_modes = setting_b_modes
self.setting_c_modes = setting_c_modes
self.setting_d_modes = setting_d_modes
a_modes: List[str],
b_modes: List[str],
c_modes: List[str],
d_modes: List[str]):
self.a_modes = a_modes
self.b_modes = b_modes
self.c_modes = c_modes
self.d_modes = d_modes
class Prefix(object):
def __init__(self,

View File

@ -1,4 +0,0 @@
from typing import Optional
class Named(object):
name: str

7
ircstates/names.py Normal file
View File

@ -0,0 +1,7 @@
class Name(object):
def __init__(self,
normal: str,
folded: str):
self.normal = normal
self.folded = folded

View File

@ -1,34 +1,83 @@
NUMERIC_NUMBERS = {}
NUMERIC_NAMES = {}
def _numeric(number: str, name: str):
NUMERIC_NUMBERS[number] = name
NUMERIC_NAMES[name] = number
RPL_WELCOME = "001"
RPL_ISUPPORT = "005"
RPL_MOTD = "372"
RPL_MOTDSTART = "375"
RPL_ENDOFMOTD = "376"
ERR_NOMOTD = "422"
RPL_UMODEIS = "221"
RPL_VISIBLEHOST = "396"
RPL_TRYAGAIN = "263"
RPL_YOUREOPER = "381"
_numeric("001", "RPL_WELCOME")
_numeric("005", "RPL_ISUPPORT")
_numeric("221", "RPL_UMODEIS")
ERR_NOSUCHNICK = "401"
ERR_NOSUCHSERVER = "402"
_numeric("311", "RPL_WHOISUSER")
RPL_CHANNELMODEIS = "324"
RPL_CREATIONTIME = "329"
RPL_TOPIC = "332"
RPL_TOPICWHOTIME = "333"
_numeric("324", "RPL_CHANNELMODEIS")
_numeric("329", "RPL_CREATIONTIME")
_numeric("332", "RPL_TOPIC")
_numeric("333", "RPL_TOPICWHOTIME")
RPL_WHOREPLY = "352"
RPL_WHOSPCRPL = "354"
RPL_ENDOFWHO = "315"
RPL_NAMREPLY = "353"
RPL_ENDOFNAMES = "366"
_numeric("352", "RPL_WHOREPLY")
_numeric("353", "RPL_NAMREPLY")
_numeric("366", "RPL_ENDOFNAMES")
RPL_WHOWASUSER = "314"
RPL_ENDOFWHOWAS = "369"
_numeric("372", "RPL_MOTD")
_numeric("375", "RPL_MOTDSTART")
_numeric("396", "RPL_VISIBLEHOST")
RPL_BANLIST = "367"
RPL_ENDOFBANLIST = "368"
RPL_QUIETLIST = "728"
RPL_ENDOFQUIETLIST = "729"
_numeric("903", "RPL_SASLSUCCESS")
_numeric("904", "ERR_SASLFAIL")
_numeric("905", "ERR_SASLTOOLONG")
_numeric("906", "ERR_SASLABORTED")
_numeric("907", "ERR_SASLALREADY")
_numeric("908", "RPL_SASLMECHS")
RPL_LOGGEDIN = "900"
RPL_LOGGEDOUT = "901"
RPL_SASLSUCCESS = "903"
ERR_SASLFAIL = "904"
ERR_SASLTOOLONG = "905"
ERR_SASLABORTED = "906"
ERR_SASLALREADY = "907"
RPL_SASLMECHS = "908"
RPL_WHOISUSER = "311"
RPL_WHOISSERVER = "312"
RPL_WHOISOPERATOR = "313"
RPL_WHOISIDLE = "317"
RPL_WHOISCHANNELS = "319"
RPL_WHOISACCOUNT = "330"
RPL_WHOISHOST = "378"
RPL_WHOISMODES = "379"
RPL_WHOISSECURE = "671"
RPL_AWAY = "301"
RPL_ENDOFWHOIS = "318"
ERR_ERRONEUSNICKNAME = "432"
ERR_NICKNAMEINUSE = "433"
ERR_BANNICKCHANGE = "435"
ERR_UNAVAILRESOURCE = "437"
ERR_NICKTOOFAST = "438"
ERR_CANTCHANGENICK = "447"
ERR_NOSUCHCHANNEL = "403"
ERR_TOOMANYCHANNELS = "405"
ERR_USERONCHANNEL = "443"
ERR_LINKCHANNEL = "470"
ERR_BADCHANNAME = "479"
ERR_BADCHANNEL = "926"
ERR_BANNEDFROMCHAN = "474"
ERR_INVITEONLYCHAN = "473"
ERR_BADCHANNELKEY = "475"
ERR_CHANNELISFULL = "471"
ERR_NEEDREGGEDNICK = "477"
ERR_THROTTLE = "480"
RPL_LOGOFF = "601"
RPL_MONONLINE = "730"
RPL_MONOFFLINE = "731"
RPL_RSACHALLENGE2 = "740"
RPL_ENDOFRSACHALLENGE2 = "741"

View File

@ -1,16 +1,18 @@
from ipaddress import ip_address
from typing import Callable, Dict, List, Optional, Set, Tuple
from datetime import datetime
from irctokens import build, Hostmask, Line, StatefulDecoder, StatefulEncoder
from irctokens import Line, build, Hostmask, StatefulDecoder, StatefulEncoder
from irctokens import hostmask as hostmask_
from pendulum import from_timestamp, now
from .named import Named
from .user import User
from .channel import Channel
from .channel_user import ChannelUser
from .isupport import ISupport
from .decorators import handler_decorator
from .casemap import casefold
from .names import Name
from .emit import *
from .numerics import NUMERIC_NUMBERS
from .numerics import *
LINE_HANDLERS: Dict[str, List[Callable[["Server", Line], Emit]]] = {}
line_handler = handler_decorator(LINE_HANDLERS)
@ -20,7 +22,10 @@ class ServerException(Exception):
class ServerDisconnectedException(ServerException):
pass
class Server(Named):
WHO_TYPE = "735" # randomly generated
TYPE_EMIT = Optional[Emit]
class Server(object):
def __init__(self, name: str):
self.name = name
@ -30,18 +35,18 @@ class Server(Named):
self.hostname: Optional[str] = None
self.realname: Optional[str] = None
self.account: Optional[str] = None
self.server: Optional[str] = None
self.away: Optional[str] = None
self.ip: Optional[str] = None
self.registered = False
self.modes: List[str] = []
self.modes: Set[str] = set()
self.motd: List[str] = []
self._decoder = StatefulDecoder()
self.users: Dict[str, User] = {}
self.channels: Dict[str, Channel] = {}
self.user_channels: Dict[User, Set[Channel]] = {}
self.channel_users: Dict[Channel, Dict[User, ChannelUser]] = {}
self.users: Dict[str, User] = {}
self.channels: Dict[str, Channel] = {}
self.isupport = ISupport()
@ -53,66 +58,71 @@ class Server(Named):
def __repr__(self) -> str:
return f"Server(name={self.name!r})"
def recv(self, data: bytes) -> List[Tuple[Line, List[Emit]]]:
def recv(self, data: bytes) -> List[Line]:
lines = self._decoder.push(data)
if lines is None:
raise ServerDisconnectedException()
emits: List[List[Emit]] = []
for line in lines:
emits.append(self.parse_tokens(line))
return list(zip(lines, emits))
return lines
def parse_tokens(self, line: Line):
emits: List[Emit] = []
commands = [line.command]
if (line.command.isdigit() and
len(line.command) == 3 and
line.command in NUMERIC_NUMBERS):
commands.append(NUMERIC_NUMBERS[line.command])
for command in commands:
if command in LINE_HANDLERS:
for callback in LINE_HANDLERS[command]:
emit = callback(self, line)
def parse_tokens(self, line: Line) -> TYPE_EMIT:
ret_emit: TYPE_EMIT = None
if line.command in LINE_HANDLERS:
for callback in LINE_HANDLERS[line.command]:
emit = callback(self, line)
if emit is not None and ret_emit is None:
emit.command = line.command
emits.append(emit)
return emits
ret_emit = emit
return ret_emit
def casefold(self, s1: str):
return casefold(self.isupport.casemapping, s1)
def casefold_equals(self, s1: str, s2: str):
return self.casefold(s1) == self.casefold(s2)
def is_me(self, nickname: str):
return self.casefold(nickname) == self.nickname_lower
def has_user(self, nickname: str) -> bool:
return self.casefold(nickname) in self.users
def create_user(self, nickname: str, nickname_lower: str):
return User(nickname, nickname_lower)
def _add_user(self, nickname: str, nickname_lower: str):
user = self.create_user(nickname, nickname_lower)
user = self.create_user(Name(nickname, nickname_lower))
self.users[nickname_lower] = user
def is_channel(self, target: str) -> bool:
return target[:1] in self.isupport.chantypes
def has_channel(self, name: str) -> bool:
return self.casefold(name) in self.channels
def create_channel(self, name: str) -> Channel:
return Channel(name)
def get_channel(self, name: str) -> Optional[Channel]:
return self.channels.get(self.casefold(name), None)
def _user_join(self, channel: Channel, user: User) -> ChannelUser:
channel_user = ChannelUser(channel, user)
if not user in self.user_channels:
self.user_channels[user] = set([])
def create_user(self, nickname: Name) -> User:
return User(nickname)
self.user_channels[user].add(channel)
self.channel_users[channel][user] = channel_user
def create_channel(self, name: Name) -> Channel:
return Channel(name)
def _user_join(self, channel: Channel, user: User) -> ChannelUser:
channel_user = ChannelUser(
user.get_name(),
channel.get_name())
user.channels.add(self.casefold(channel.name))
channel.users[user.nickname_lower] = channel_user
return channel_user
def prepare_whox(self, target: str) -> Line:
return build("WHO", [target, f"n%afhinrstu,{WHO_TYPE}"])
def _self_hostmask(self, hostmask: Hostmask):
self.nickname = hostmask.nickname
if hostmask.username:
self.username = hostmask.username
if hostmask.hostname:
self.hostname = hostmask.hostname
def _emit(self) -> Emit:
return Emit()
@line_handler("RPL_WELCOME")
@line_handler(RPL_WELCOME)
# first message reliably sent to us after registration is complete
def _handle_welcome(self, line: Line) -> Emit:
self.nickname = line.params[0]
@ -120,20 +130,20 @@ class Server(Named):
self.registered = True
return self._emit()
@line_handler("RPL_ISUPPORT")
@line_handler(RPL_ISUPPORT)
# https://defs.ircdocs.horse/defs/isupport.html
def _handle_ISUPPORT(self, line: Line) -> Emit:
self.isupport.tokens(line.params[1:-1])
self.isupport.from_tokens(line.params[1:-1])
return self._emit()
@line_handler("RPL_MOTDSTART")
@line_handler(RPL_MOTDSTART)
# start of MOTD
def _handle_motd_start(self, line: Line) -> Emit:
self.motd.clear()
return self._emit()
@line_handler("RPL_MOTDSTART")
@line_handler(RPL_MOTDSTART)
# start of MOTD
@line_handler("RPL_MOTD")
@line_handler(RPL_MOTD)
# line of MOTD
def _handle_motd_line(self, line: Line) -> Emit:
emit = self._emit()
@ -144,23 +154,28 @@ class Server(Named):
@line_handler("NICK")
def _handle_NICK(self, line: Line) -> Emit:
new_nickname = line.params[0]
nickname_lower = self.casefold(line.hostmask.nickname)
new_nickname = line.params[0]
new_nickname_lower = self.casefold(new_nickname)
nickname_lower = self.casefold(line.hostmask.nickname)
emit = self._emit()
if nickname_lower in self.users:
user = self.users.pop(nickname_lower)
emit.user = user
new_nickname_lower = self.casefold(new_nickname)
user._set_nickname(new_nickname, new_nickname_lower)
user.change_nickname(new_nickname, new_nickname_lower)
self.users[new_nickname_lower] = user
for channel_lower in user.channels:
channel = self.channels[channel_lower]
channel_user = channel.users.pop(nickname_lower)
channel.users[user.nickname_lower] = channel_user
if nickname_lower == self.nickname_lower:
emit.self = True
self.nickname = new_nickname
self.nickname_lower = self.casefold(new_nickname)
self.nickname = new_nickname
self.nickname_lower = new_nickname_lower
return emit
@line_handler("JOIN")
@ -177,13 +192,16 @@ class Server(Named):
if nickname_lower == self.nickname_lower:
emit.self = True
if not channel_lower in self.channels:
channel = self.create_channel(line.params[0])
channel = self.create_channel(
Name(line.params[0], channel_lower)
)
#TODO: put this somewhere better
for mode in self.isupport.chanmodes.a_modes:
channel.list_modes[mode] = []
self.channels[channel_lower] = channel
self.channel_users[channel] = {}
if line.hostmask.username:
self.username = line.hostmask.username
if line.hostmask.hostname:
self.hostname = line.hostmask.hostname
self._self_hostmask(line.hostmask)
if extended:
self.account = account
self.realname = realname
@ -204,7 +222,8 @@ class Server(Named):
user.account = account
user.realname = realname
self._user_join(channel, user)
channel_user = self._user_join(channel, user)
channel_user.joined = now("utc")
return emit
def _user_part(self, line: Line,
@ -225,22 +244,20 @@ class Server(Named):
nickname_lower = self.casefold(nickname)
if nickname_lower in self.users:
user = self.users[nickname_lower]
user = user
self.user_channels[user].remove(channel)
if not self.user_channels[user]:
user.channels.remove(channel.name_lower)
del channel.users[user.nickname_lower]
if not user.channels:
del self.users[nickname_lower]
del self.user_channels[user]
del self.channel_users[channel][user]
if nickname_lower == self.nickname_lower:
del self.channels[channel_lower]
channel_users = self.channel_users.pop(channel)
for user, cuser in channel_users.items():
self.user_channels[user].remove(channel)
if not self.user_channels[user]:
del self.user_channels[user]
del self.users[self.casefold(user.nickname)]
for key, cuser in channel.users.items():
ruser = self.users[key]
ruser.channels.remove(channel.name_lower)
if not ruser.channels:
del self.users[ruser.nickname_lower]
return emit, user
@ -270,16 +287,15 @@ class Server(Named):
if kicker_lower in self.users:
emit.user_source = self.users[kicker_lower]
else:
emit.user_source = self.create_user(line.hostmask.nickname,
kicker_lower)
emit.user_source = self.create_user(
Name(line.hostmask.nickname, kicker_lower)
)
return emit
def _self_quit(self):
self.users.clear()
self.channels.clear()
self.user_channels.clear()
self.channel_users.clear()
@line_handler("QUIT")
def _handle_quit(self, line: Line) -> Emit:
@ -296,9 +312,9 @@ class Server(Named):
if nickname_lower in self.users:
user = self.users.pop(nickname_lower)
emit.user = user
for channel in self.user_channels[user]:
del self.channel_users[channel][user]
del self.user_channels[user]
for channel_lower in user.channels:
channel = self.channels[channel_lower]
del channel.users[user.nickname_lower]
return emit
@line_handler("ERROR")
@ -306,7 +322,7 @@ class Server(Named):
self._self_quit()
return self._emit()
@line_handler("RPL_NAMREPLY")
@line_handler(RPL_NAMREPLY)
# channel's user list, "NAMES #channel" response (and on-join)
def _handle_names(self, line: Line) -> Emit:
emit = self._emit()
@ -327,30 +343,31 @@ class Server(Named):
else:
break
hostmask = Hostmask.from_source(nickname[len(modes):])
hostmask = hostmask_(nickname[len(modes):])
nickname_lower = self.casefold(hostmask.nickname)
if not nickname_lower in self.users:
self._add_user(hostmask.nickname, nickname_lower)
user = self.users[nickname_lower]
users.append(user)
channel_user = self._user_join(channel, user)
if not nickname_lower in channel.users:
channel_user = self._user_join(channel, user)
else:
channel_user = channel.users[nickname_lower]
if hostmask.username:
user.username = hostmask.username
if nickname_lower == self.nickname_lower:
self.username = hostmask.username
if hostmask.hostname:
user.hostname = hostmask.hostname
if nickname_lower == self.nickname_lower:
self.hostname = hostmask.hostname
if nickname_lower == self.nickname_lower:
self._self_hostmask(hostmask)
for mode in modes:
if not mode in channel_user.modes:
channel_user.modes.append(mode)
channel_user.modes.add(mode)
return emit
@line_handler("RPL_CREATIONTIME")
@line_handler(RPL_CREATIONTIME)
# channel creation time, "MODE #channel" response (and on-join)
def _handle_creation_time(self, line: Line) -> Emit:
emit = self._emit()
@ -358,7 +375,7 @@ class Server(Named):
if channel_lower in self.channels:
channel = self.channels[channel_lower]
emit.channel = channel
channel.created = datetime.fromtimestamp(int(line.params[2]))
channel.created = from_timestamp(int(line.params[2]))
return emit
@line_handler("TOPIC")
@ -369,11 +386,11 @@ class Server(Named):
channel = self.channels[channel_lower]
emit.channel = channel
channel.topic = line.params[1]
channel.topic_setter = str(line.hostmask)
channel.topic_time = datetime.utcnow()
channel.topic_setter = line.source
channel.topic_time = now("utc")
return emit
@line_handler("RPL_TOPIC")
@line_handler(RPL_TOPIC)
# topic text, "TOPIC #channel" response (and on-join)
def _handle_topic_num(self, line: Line) -> Emit:
emit = self._emit()
@ -383,7 +400,7 @@ class Server(Named):
emit.channel = channel
self.channels[channel_lower].topic = line.params[2]
return emit
@line_handler("RPL_TOPICWHOTIME")
@line_handler(RPL_TOPICWHOTIME)
# topic setby, "TOPIC #channel" response (and on-join)
def _handle_topic_time(self, line: Line) -> Emit:
emit = self._emit()
@ -392,38 +409,55 @@ class Server(Named):
channel = self.channels[channel_lower]
emit.channel = channel
channel.topic_setter = line.params[2]
channel.topic_time = datetime.fromtimestamp(int(line.params[3]))
channel.topic_time = from_timestamp(int(line.params[3]))
return emit
def _channel_modes(self,
channel: Channel,
modes: List[Tuple[bool, str]],
params: List[str]):
for add, char in modes:
list_mode = char in self.isupport.chanmodes.list_modes
if char in self.isupport.prefix.modes:
nickname_lower = self.casefold(params.pop(0))
modes: List[str],
params: List[str]
) -> List[Tuple[str, Optional[str]]]:
tokens: List[Tuple[str, Optional[str]]] = []
for mode in modes:
add = mode[0] == "+"
char = mode[1]
arg: Optional[str] = None
if char in self.isupport.prefix.modes: # a user's status
arg = params.pop(0)
nickname_lower = self.casefold(arg)
if nickname_lower in self.users:
user = self.users[nickname_lower]
channel_user = self.channel_users[channel][user]
channel_user = channel.users[user.nickname_lower]
if add:
if not char in channel_user.modes:
channel_user.modes.append(char)
elif char in channel_user.modes:
channel_user.modes.remove(char)
elif add and (
list_mode or
char in self.isupport.chanmodes.setting_b_modes or
char in self.isupport.chanmodes.setting_c_modes):
channel.add_mode(char, params.pop(0), list_mode)
elif not add and (
list_mode or
char in self.isupport.chanmodes.setting_b_modes):
channel.remove_mode(char, params.pop(0))
elif add:
channel.add_mode(char, None, False)
channel_user.modes.add(char)
else:
channel_user.modes.discard(char)
else:
channel.remove_mode(char, None)
has_arg = False
is_list = False
if char in self.isupport.chanmodes.a_modes:
has_arg = True
is_list = True
elif add:
has_arg = char in (self.isupport.chanmodes.b_modes+
self.isupport.chanmodes.c_modes)
else: # remove
has_arg = char in self.isupport.chanmodes.b_modes
if has_arg:
arg = params.pop(0)
if add:
channel.add_mode(char, arg, is_list)
else:
channel.remove_mode(char, arg)
tokens.append((mode, arg))
return tokens
@line_handler("MODE")
def _handle_MODE(self, line: Line) -> Emit:
@ -432,33 +466,42 @@ class Server(Named):
modes_str = line.params[1]
params = line.params[2:].copy()
modifier = True
modes: List[Tuple[bool, str]] = []
modifier = "+"
modes: List[str] = []
for c in list(modes_str):
if c == "+":
modifier = True
elif c == "-":
modifier = False
if c in ["+", "-"]:
modifier = c
else:
modes.append((modifier, c))
modes.append(f"{modifier}{c}")
target_lower = self.casefold(target)
if target_lower == self.nickname_lower:
emit.self_target = True
for add, char in modes:
emit.tokens = modes
for mode in modes:
add = mode[0] == "+"
char = mode[1]
if add:
if not char in self.modes:
self.modes.append(char)
elif char in self.modes:
self.modes.remove(char)
self.modes.add(char)
else:
self.modes.discard(char)
elif target_lower in self.channels:
channel = self.channels[self.casefold(target)]
emit.channel = channel
self._channel_modes(channel, modes, params)
ctokens = self._channel_modes(channel, modes, params)
ctokens_str: List[str] = []
for mode, arg in ctokens:
if arg is not None:
ctokens_str.append(f"{mode} {arg}")
else:
ctokens_str.append(mode)
emit.tokens = ctokens_str
return emit
@line_handler("RPL_CHANNELMODEIS")
@line_handler(RPL_CHANNELMODEIS)
# channel modes, "MODE #channel" response (sometimes on-join?)
def _handle_channelmodeis(self, line: Line) -> Emit:
emit = self._emit()
@ -466,17 +509,73 @@ class Server(Named):
if channel_lower in self.channels:
channel = self.channels[channel_lower]
emit.channel = channel
modes = [(True, char) for char in line.params[2].lstrip("+")]
modes = [f"+{char}" for char in line.params[2].lstrip("+")]
params = line.params[3:]
self._channel_modes(channel, modes, params)
return emit
@line_handler("RPL_UMODEIS")
@line_handler(RPL_UMODEIS)
# our own user modes, "MODE nickname" response (sometimes on-connect?)
def _handle_umodeis(self, line: Line) -> Emit:
for char in line.params[1].lstrip("+"):
if not char in self.modes:
self.modes.append(char)
self.modes.add(char)
return self._emit()
def _mode_list(self,
channel_name: str,
mode: str,
mask: str):
channel_lower = self.casefold(channel_name)
if channel_lower in self.channels:
channel = self.channels[channel_lower]
if not mode in channel._list_modes_temp:
channel._list_modes_temp[mode] = []
channel._list_modes_temp[mode].append(mask)
def _mode_list_end(self,
channel_name: str,
mode: str):
channel_lower = self.casefold(channel_name)
if channel_lower in self.channels:
channel = self.channels[channel_lower]
if mode in channel._list_modes_temp:
mlist = channel._list_modes_temp.pop(mode)
channel.list_modes[mode] = mlist
@line_handler(RPL_BANLIST)
def _handle_banlist(self, line: Line) -> Emit:
channel = line.params[1]
mask = line.params[2]
if len(line.params) > 3:
# parse these out but we're not storing them yet
set_by = line.params[3]
set_at = int(line.params[4])
self._mode_list(channel, "b", mask)
return self._emit()
@line_handler(RPL_ENDOFBANLIST)
def _handle_banlist_end(self, line: Line) -> Emit:
channel = line.params[1]
self._mode_list_end(channel, "b")
return self._emit()
@line_handler(RPL_QUIETLIST)
def _handle_quietlist(self, line: Line) -> Emit:
channel = line.params[1]
mode = line.params[2]
mask = line.params[3]
set_by = line.params[4]
set_at = int(line.params[5])
self._mode_list(channel, mode, mask)
return self._emit()
@line_handler(RPL_ENDOFQUIETLIST)
def _handle_quietlist_end(self, line: Line) -> Emit:
channel = line.params[1]
mode = line.params[2]
self._mode_list_end(channel, mode)
return self._emit()
@line_handler("PRIVMSG")
@ -484,6 +583,10 @@ class Server(Named):
@line_handler("TAGMSG")
def _handle_message(self, line: Line) -> Emit:
emit = self._emit()
# this does not visually spark joy
if not line.source:
return emit
message = line.params[1] if line.params[1:] else None
if not message is None:
emit.text = message
@ -491,15 +594,14 @@ class Server(Named):
nickname_lower = self.casefold(line.hostmask.nickname)
if nickname_lower == self.nickname_lower:
emit.self_source = True
if line.hostmask.username:
self.username = line.hostmask.username
if line.hostmask.hostname:
self.hostname = line.hostmask.hostname
self._self_hostmask(line.hostmask)
if nickname_lower in self.users:
user = self.users[nickname_lower]
else:
user = self.create_user(line.hostmask.nickname, nickname_lower)
user = self.create_user(
Name(line.hostmask.nickname, nickname_lower)
)
emit.user = user
if line.hostmask.username:
@ -526,7 +628,7 @@ class Server(Named):
emit.self_target = True
return emit
@line_handler("RPL_VISIBLEHOST")
@line_handler(RPL_VISIBLEHOST)
# our own hostname, sometimes username@hostname, when it changes
def _handle_visiblehost(self, line: Line) -> Emit:
username, _, hostname = line.params[1].rpartition("@")
@ -535,7 +637,7 @@ class Server(Named):
self.username = username
return self._emit()
@line_handler("RPL_WHOREPLY")
@line_handler(RPL_WHOREPLY)
# WHO line, "WHO #channel|nickname" response
def _handle_who(self, line: Line) -> Emit:
emit = self._emit()
@ -543,14 +645,22 @@ class Server(Named):
nickname = line.params[5]
username = line.params[2]
hostname = line.params[3]
status = line.params[6]
away = "" if "G" in status else None
realname = line.params[7].split(" ", 1)[1]
server: Optional[str] = None
if not line.params[4] == "*":
server = line.params[4]
nickname_lower = self.casefold(line.params[5])
if nickname_lower == self.nickname_lower:
emit.self = True
self.username = username
self.hostname = hostname
self.realname = realname
self.server = server
self.away = away
if nickname_lower in self.users:
user = self.users[nickname_lower]
@ -558,9 +668,62 @@ class Server(Named):
user.username = username
user.hostname = hostname
user.realname = realname
user.server = server
user.away = away
return emit
@line_handler("RPL_WHOISUSER")
@line_handler(RPL_WHOSPCRPL)
# WHOX line, "WHO #channel|nickname" response; only listen for our "type"
def _handle_whox(self, line: Line) -> Emit:
emit = self._emit()
if line.params[1] == WHO_TYPE and len(line.params) == 10:
nickname_lower = self.casefold(line.params[6])
username = line.params[2]
hostname = line.params[4]
status = line.params[7]
away = "" if "G" in status else None
realname = line.params[9]
account = ""
if not line.params[8] == "0":
account = line.params[8]
server: Optional[str] = None
if not line.params[5] == "*":
server = line.params[5]
ip: Optional[str] = None
if not line.params[3] == "255.255.255.255":
try:
ip = ip_address(line.params[3]).compressed
except ValueError:
pass
if nickname_lower in self.users:
user = self.users[nickname_lower]
emit.user = user
user.username = username
user.hostname = hostname
user.realname = realname
user.account = account
user.server = server
user.away = away
if ip is not None:
user.ip = ip
if nickname_lower == self.nickname_lower:
emit.self = True
self.username = username
self.hostname = hostname
self.realname = realname
self.account = account
self.server = server
self.away = away
if ip is not None:
self.ip = ip
return emit
@line_handler(RPL_WHOISUSER)
# WHOIS "user" line, one of "WHOIS nickname" response lines
def _handle_whoisuser(self, line: Line) -> Emit:
emit = self._emit()
@ -617,6 +780,39 @@ class Server(Named):
user.realname = realname
return emit
@line_handler("RENAME")
def _handle_RENAME(self, line: Line) -> Emit:
source_fold = self.casefold(line.params[0])
rename = line.params[1]
rename_fold = self.casefold(rename)
if source_fold in self.channels:
channel = self.channels.pop(source_fold)
channel.change_name(rename, rename_fold)
for nickname in channel.users.keys():
user = self.users[nickname]
user.channels.remove(source_fold)
user.channels.add(rename_fold)
self.channels[rename_fold] = channel
return self._emit()
@line_handler(RPL_AWAY)
# sent in response to a command directed at a user who is marked as away
def _handle_RPL_AWAY(self, line: Line) -> Emit:
nickname = line.params[1]
nickname_lower = self.casefold(nickname)
reason = line.params[2]
if nickname_lower == self.nickname_lower:
self.away = reason
if nickname_lower in self.users:
user = self.users[nickname_lower]
user.away = reason
return self._emit()
@line_handler("AWAY")
def _handle_AWAY(self, line: Line) -> Emit:
emit = self._emit()
@ -690,3 +886,22 @@ class Server(Named):
key in self.available_caps):
self.agreed_caps.append(key)
return emit
@line_handler(RPL_LOGGEDIN)
def _handle_loggedin(self, line: Line) -> Emit:
hostmask_str = line.params[1]
hostmask = hostmask_(hostmask_str)
account = line.params[2]
self.account = account
self._self_hostmask(hostmask)
return self._emit()
@line_handler(RPL_LOGGEDOUT)
def _handle_loggedout(self, line: Line) -> Emit:
hostmask_str = line.params[1]
hostmask = hostmask_(hostmask_str)
self.account = None
self._self_hostmask(hostmask)
return self._emit()

View File

@ -1,22 +1,48 @@
from typing import Optional, Set
from .named import Named
from .names import Name
class User(Named):
nickname: str
nickname_lower: str
class User(object):
def __init__(self, nickname: Name):
self._nickname = nickname
def __init__(self, nickname: str, nickname_lower: str):
self._set_nickname(nickname, nickname_lower)
self.username: Optional[str] = None
self.hostname: Optional[str] = None
self.realname: Optional[str] = None
self.account: Optional[str] = None
self.server: Optional[str] = None
self.away: Optional[str] = None
self.ip: Optional[str] = None
self.channels: Set[str] = set([])
def __repr__(self) -> str:
return f"User(nickname={self.nickname!r})"
def _set_nickname(self, nickname: str,
nickname_lower: str):
self.nickname = nickname
self.nickname_lower = nickname_lower
def get_name(self) -> Name:
return self._nickname
@property
def nickname(self) -> str:
return self._nickname.normal
@property
def nickname_lower(self) -> str:
return self._nickname.folded
def change_nickname(self,
normal: str,
folded: str):
self._nickname.normal = normal
self._nickname.folded = folded
def hostmask(self) -> str:
hostmask = self.nickname
if self.username is not None:
hostmask += f"!{self.username}"
if self.hostname is not None:
hostmask += f"@{self.hostname}"
return hostmask
def userhost(self) -> Optional[str]:
if (self.username is not None and
self.hostname is not None):
return f"{self.username}@{self.hostname}"
else:
return None

2
requirements-dev.txt Normal file
View File

@ -0,0 +1,2 @@
-r requirements.txt
freezegun ~=1.1.0

View File

@ -1 +1,2 @@
irctokens ==0.9.2
irctokens ~=2.0.2
pendulum ~=2.1.0

View File

@ -26,6 +26,6 @@ setuptools.setup(
"Operating System :: Microsoft :: Windows",
"Topic :: Communications :: Chat :: Internet Relay Chat"
],
python_requires='>=3.6',
python_requires='>=3.7',
install_requires=install_requires
)

View File

@ -6,3 +6,5 @@ from .cap import *
from .isupport import *
from .casemap import *
from .emit import *
from .who import *
from .sasl import *

View File

@ -3,17 +3,17 @@ import ircstates, irctokens
class CaseMapTestMethod(unittest.TestCase):
def test_rfc1459(self):
lower = ircstates.casefold("rfc1459", "ÀTEST[]~\\")
self.assertEqual(lower, "Àtest{}^|")
lower = ircstates.casefold(ircstates.CaseMap.RFC1459, "ÀTEST[]^\\")
self.assertEqual(lower, "Àtest{}~|")
def test_ascii(self):
lower = ircstates.casefold("ascii", "ÀTEST[]~\\")
lower = ircstates.casefold(ircstates.CaseMap.ASCII, "ÀTEST[]~\\")
self.assertEqual(lower, "Àtest[]~\\")
class CaseMapTestCommands(unittest.TestCase):
def test_join(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":Nickname JOIN #Chan"))
server.parse_tokens(irctokens.tokenise(":Other JOIN #Chan"))
self.assertIn("nickname", server.users)
@ -21,17 +21,14 @@ class CaseMapTestCommands(unittest.TestCase):
self.assertIn("other", server.users)
self.assertNotIn("Other", server.users)
self.assertIn("#chan", server.channels)
self.assertNotIn("#Chan", server.channels)
channel = server.channels["#chan"]
self.assertEqual(channel.name, "#Chan")
user1 = server.users["nickname"]
user2 = server.users["other"]
self.assertIn(user1, server.channel_users[channel])
self.assertIn(user2, server.channel_users[channel])
self.assertEqual(len(server.channel_users[channel]), 2)
def test_nick(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
server.parse_tokens(irctokens.tokenise(":nickname NICK NewNickname"))

View File

@ -1,145 +1,164 @@
import unittest
from datetime import datetime
import pendulum
import ircstates, irctokens
from freezegun import freeze_time
class ChannelTestJoin(unittest.TestCase):
def test_self_join(self):
dt = pendulum.datetime(2021, 9, 6, 2, 55, 22)
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
with freeze_time("2021-09-06 02:55:22"):
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
self.assertIn("#chan", server.channels)
self.assertIn("nickname", server.users)
self.assertEqual(len(server.users), 1)
self.assertEqual(len(server.channels), 1)
self.assertIn(server.channels["#chan"], server.channel_users)
self.assertIn(server.users["nickname"], server.user_channels)
self.assertEqual(len(server.user_channels), 1)
self.assertEqual(len(server.channel_users), 1)
user = server.users["nickname"]
channel = server.channels["#chan"]
self.assertIn(user.nickname_lower, channel.users)
channel_user = channel.users[user.nickname_lower]
self.assertEqual(user.channels, set([channel.name_lower]))
self.assertEqual(channel_user.since, dt)
self.assertEqual(channel_user.joined, dt)
def test_other_join(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
self.assertEqual(len(server.users), 2)
self.assertIn("other", server.users)
self.assertEqual(len(server.user_channels), 2)
self.assertEqual(len(server.channel_users), 1)
channel = server.channels["#chan"]
self.assertEqual(len(channel.users), 2)
user = server.users["other"]
self.assertEqual(user.channels, set([channel.name_lower]))
class ChannelTestPart(unittest.TestCase):
def test_self_part(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname PART #chan"))
self.assertEqual(len(server.users), 0)
self.assertEqual(len(server.channels), 0)
self.assertEqual(len(server.user_channels), 0)
self.assertEqual(len(server.channel_users), 0)
def test_other_part(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other PART #chan"))
user = server.users["nickname"]
channel = server.channels["#chan"]
channel_user = channel.users[user.nickname_lower]
self.assertEqual(len(server.users), 1)
self.assertEqual(len(server.channels), 1)
self.assertIn(user, server.user_channels)
self.assertEqual(len(server.user_channels[user]), 1)
self.assertIn(channel, server.channel_users)
self.assertEqual(len(server.channel_users), 1)
self.assertIn(user, server.channel_users[channel])
self.assertEqual(len(server.user_channels), 1)
self.assertEqual(server.users, {"nickname": user})
self.assertEqual(server.channels, {"#chan": channel})
self.assertEqual(user.channels, set([channel.name_lower]))
self.assertEqual(channel.users, {"nickname": channel_user})
class ChannelTestKick(unittest.TestCase):
def test_self_kick(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise(":nickname KICK #chan nickname"))
self.assertEqual(len(server.users), 0)
self.assertEqual(len(server.channels), 0)
self.assertEqual(len(server.user_channels), 0)
self.assertEqual(len(server.channel_users), 0)
def test_other_kick(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname KICK #chan other"))
user = server.users["nickname"]
channel = server.channels["#chan"]
channel_user = channel.users[user.nickname_lower]
self.assertEqual(len(server.users), 1)
self.assertEqual(len(server.channels), 1)
self.assertIn(user, server.user_channels)
self.assertEqual(len(server.user_channels[user]), 1)
self.assertIn(channel, server.channel_users)
self.assertEqual(len(server.channel_users), 1)
self.assertIn(user, server.channel_users[channel])
self.assertEqual(len(server.user_channels), 1)
self.assertEqual(user.channels, set([channel.name_lower]))
self.assertEqual(channel.users, {user.nickname_lower: channel_user})
class ChannelTestTopic(unittest.TestCase):
def test_text(self):
dt = pendulum.datetime(2020, 3, 12, 14, 27, 57)
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("332 * #chan :test"))
self.assertEqual(server.channels["#chan"].topic, "test")
def test_set_by_at(self):
dt = datetime(2020, 3, 12, 14, 27, 57)
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("333 * #chan other 1584023277"))
channel = server.channels["#chan"]
self.assertEqual(channel.topic, "test")
self.assertEqual(channel.topic_setter, "other")
self.assertEqual(channel.topic_time, dt)
def test_topic_command(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("TOPIC #chan :hello there"))
self.assertEqual(server.channels["#chan"].topic, "hello there")
dt = pendulum.datetime(2021, 9, 6, 2, 43, 22)
with freeze_time("2021-09-06 02:43:22"):
server.parse_tokens(irctokens.tokenise(":other TOPIC #chan :hello there"))
channel = server.channels["#chan"]
self.assertEqual(channel.topic, "hello there")
self.assertEqual(channel.topic_setter, "other")
self.assertEqual(channel.topic_time, dt)
class ChannelTestCreation(unittest.TestCase):
def test(self):
dt = datetime(2020, 3, 12, 19, 38, 9)
dt = pendulum.datetime(2020, 3, 12, 19, 38, 9)
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("329 * #chan 1584041889"))
self.assertEqual(server.channels["#chan"].created, dt)
class ChannelTestNAMES(unittest.TestCase):
def test(self):
dt_1 = pendulum.datetime(2021, 9, 6, 2, 57, 22)
dt_2 = pendulum.datetime(2021, 9, 6, 2, 58, 22)
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("353 * * #chan :nickname @+other"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
with freeze_time("2021-09-06 02:57:22"):
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
with freeze_time("2021-09-06 02:58:22"):
server.parse_tokens(irctokens.tokenise("353 * * #chan :nickname @+other"))
self.assertIn("nickname", server.users)
self.assertIn("other", server.users)
user = server.users["other"]
self.assertIn(user, server.user_channels)
channel = server.channels["#chan"]
self.assertIn(user, server.channel_users[channel])
channel_user = server.channel_users[channel][user]
self.assertEqual(channel_user.modes, ["o", "v"])
channel_user_1 = channel.users[server.nickname_lower]
channel_user_2 = channel.users[user.nickname_lower]
self.assertEqual(channel.users, {
server.nickname_lower: channel_user_1,
user.nickname_lower: channel_user_2
})
self.assertEqual(user.channels, set([channel.name_lower]))
self.assertEqual(channel_user_2.modes, {"o", "v"})
self.assertEqual(channel_user_1.since, dt_1)
self.assertEqual(channel_user_2.since, dt_2)
def test_userhost_in_names(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(
"353 * * #chan :nickname!user@host other!user2@host2"))
@ -148,3 +167,37 @@ class ChannelTestNAMES(unittest.TestCase):
user = server.users["other"]
self.assertEqual(user.username, "user2")
self.assertEqual(user.hostname, "host2")
class ChannelNICKAfterJoin(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
channel = server.channels["#chan"]
channel_user = channel.users[user.nickname_lower]
server.parse_tokens(irctokens.tokenise(":nickname NICK Nickname2"))
self.assertEqual(channel.users, {user.nickname_lower: channel_user})
self.assertEqual(channel_user.nickname, "Nickname2")
self.assertEqual(channel_user.nickname_lower, "nickname2")
class ChannelRENAME(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
user = server.users["nickname"]
channel = server.channels["#chan"]
server.parse_tokens(irctokens.tokenise(":nickname RENAME #chan #chan2 *"))
self.assertEqual(channel.name, "#chan2")
self.assertEqual(set(channel.users.keys()), {"nickname", "other"})
self.assertEqual(user.channels, {"#chan2"})
self.assertNotIn("#chan", server.channels)
self.assertIn("#chan2", server.channels)
self.assertEqual(len(server.channels), 1)

View File

@ -4,72 +4,103 @@ import ircstates, irctokens
class EmitTest(unittest.TestCase):
def test_join(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
emits = server.parse_tokens(
irctokens.tokenise(":nickname JOIN #chan"))[0]
server.parse_tokens(irctokens.tokenise("001 nickname *"))
emit = server.parse_tokens(
irctokens.tokenise(":nickname JOIN #chan"))
self.assertEqual(emits.command, "JOIN")
self.assertEqual(emits.self, True)
self.assertEqual(emits.user, server.users["nickname"])
self.assertEqual(emits.channel, server.channels["#chan"])
self.assertEqual(emit.command, "JOIN")
self.assertEqual(emit.self, True)
self.assertEqual(emit.user, server.users["nickname"])
self.assertEqual(emit.channel, server.channels["#chan"])
emits = server.parse_tokens(
irctokens.tokenise(":other JOIN #chan"))[0]
self.assertEqual(emits.command, "JOIN")
self.assertEqual(emits.self, None)
self.assertEqual(emits.user, server.users["other"])
self.assertEqual(emits.channel, server.channels["#chan"])
emit = server.parse_tokens(
irctokens.tokenise(":other JOIN #chan"))
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "JOIN")
self.assertEqual(emit.self, None)
self.assertEqual(emit.user, server.users["other"])
self.assertEqual(emit.channel, server.channels["#chan"])
def test_privmsg(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
emits = server.parse_tokens(
irctokens.tokenise(":nickname PRIVMSG #chan :hello"))[0]
self.assertEqual(emits.command, "PRIVMSG")
self.assertEqual(emits.text, "hello")
self.assertEqual(emits.self_source, True)
self.assertEqual(emits.user, server.users["nickname"])
self.assertEqual(emits.channel, server.channels["#chan"])
emit = server.parse_tokens(
irctokens.tokenise(":nickname PRIVMSG #chan :hello"))
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "PRIVMSG")
self.assertEqual(emit.text, "hello")
self.assertEqual(emit.self_source, True)
self.assertEqual(emit.user, server.users["nickname"])
self.assertEqual(emit.channel, server.channels["#chan"])
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
emits = server.parse_tokens(
irctokens.tokenise(":other PRIVMSG #chan :hello2"))[0]
self.assertEqual(emits.command, "PRIVMSG")
self.assertEqual(emits.text, "hello2")
self.assertEqual(emits.self_source, None)
self.assertEqual(emits.user, server.users["other"])
self.assertEqual(emits.channel, server.channels["#chan"])
emit = server.parse_tokens(
irctokens.tokenise(":other PRIVMSG #chan :hello2"))
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "PRIVMSG")
self.assertEqual(emit.text, "hello2")
self.assertEqual(emit.self_source, None)
self.assertEqual(emit.user, server.users["other"])
self.assertEqual(emit.channel, server.channels["#chan"])
def test_privmsg_nojoin(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
emits = server.parse_tokens(
irctokens.tokenise(":other PRIVMSG #chan :hello"))[0]
emit = server.parse_tokens(
irctokens.tokenise(":other PRIVMSG #chan :hello"))
self.assertEqual(emits.command, "PRIVMSG")
self.assertEqual(emits.text, "hello")
self.assertEqual(emits.self_source, None)
self.assertIsNotNone(emits.user)
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "PRIVMSG")
self.assertEqual(emit.text, "hello")
self.assertEqual(emit.self_source, None)
self.assertIsNotNone(emit.user)
channel = server.channels["#chan"]
self.assertEqual(emits.channel, channel)
self.assertEqual(emit.channel, channel)
def test_kick(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
channel = server.channels["#chan"]
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
user_other = server.users["other"]
emits = server.parse_tokens(
irctokens.tokenise(":nickname KICK #chan other :reason"))[0]
emit = server.parse_tokens(
irctokens.tokenise(":nickname KICK #chan other :reason"))
self.assertEqual(emits.command, "KICK")
self.assertEqual(emits.text, "reason")
self.assertEqual(emits.self_source, True)
self.assertEqual(emits.user_source, user)
self.assertEqual(emits.user_target, user_other)
self.assertEqual(emits.channel, channel)
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "KICK")
self.assertEqual(emit.text, "reason")
self.assertEqual(emit.self_source, True)
self.assertEqual(emit.user_source, user)
self.assertEqual(emit.user_target, user_other)
self.assertEqual(emit.channel, channel)
def test_mode_self(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
emit = server.parse_tokens(
irctokens.tokenise("MODE nickname x+i-i+wi-wi"))
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "MODE")
self.assertTrue(emit.self_target)
self.assertEqual(emit.tokens,
["+x", "+i", "-i", "+w", "+i", "-w", "-i"])
def test_mode_channel(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
channel = server.channels["#chan"]
emit = server.parse_tokens(
irctokens.tokenise(":server MODE #chan +im-m+b-k asd!*@* key"))
self.assertIsNotNone(emit)
self.assertEqual(emit.command, "MODE")
self.assertEqual(emit.channel, channel)
self.assertEqual(emit.tokens,
["+i", "+m", "-m", "+b asd!*@*", "-k key"])

View File

@ -2,23 +2,29 @@ import unittest
import ircstates, irctokens
class ISUPPORTTest(unittest.TestCase):
def test_escape(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(r"005 * TEST=a\x20b\\x20\x2 *"))
self.assertEqual(server.isupport.raw["TEST"], r"a b\x20x2")
def test_chanmodes(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
self.assertEqual(server.isupport.chanmodes.list_modes, ["b"])
self.assertEqual(server.isupport.chanmodes.setting_b_modes, ["k"])
self.assertEqual(server.isupport.chanmodes.setting_c_modes, ["l"])
self.assertEqual(server.isupport.chanmodes.setting_d_modes,
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.chanmodes.a_modes, ["b"])
self.assertEqual(server.isupport.chanmodes.b_modes, ["k"])
self.assertEqual(server.isupport.chanmodes.c_modes, ["l"])
self.assertEqual(server.isupport.chanmodes.d_modes,
["i", "m", "n", "p", "s", "t"])
server.parse_tokens(irctokens.tokenise("005 * CHANMODES=a,b,c,d *"))
self.assertEqual(server.isupport.chanmodes.list_modes, ["a"])
self.assertEqual(server.isupport.chanmodes.setting_b_modes, ["b"])
self.assertEqual(server.isupport.chanmodes.setting_c_modes, ["c"])
self.assertEqual(server.isupport.chanmodes.setting_d_modes, ["d"])
self.assertEqual(server.isupport.chanmodes.a_modes, ["a"])
self.assertEqual(server.isupport.chanmodes.b_modes, ["b"])
self.assertEqual(server.isupport.chanmodes.c_modes, ["c"])
self.assertEqual(server.isupport.chanmodes.d_modes, ["d"])
def test_prefix(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.prefix.modes, ["o", "v"])
self.assertEqual(server.isupport.prefix.prefixes, ["@", "+"])
@ -38,62 +44,37 @@ class ISUPPORTTest(unittest.TestCase):
def test_chantypes(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.chantypes, ["#"])
server.parse_tokens(irctokens.tokenise("005 * CHANTYPES=#& *"))
self.assertEqual(server.isupport.chantypes, ["#", "&"])
def test_modes(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.modes, 3)
server.parse_tokens(irctokens.tokenise("005 * MODES *"))
self.assertEqual(server.isupport.modes, -1)
server.parse_tokens(irctokens.tokenise("005 * MODES=5 *"))
self.assertEqual(server.isupport.modes, 5)
def test_rfc1459(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
self.assertEqual(server.isupport.casemapping, "rfc1459")
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=rfc1459 *"))
self.assertEqual(server.isupport.casemapping, "rfc1459")
lower = server.casefold("ÀTEST[]~\\")
self.assertEqual(lower, "Àtest{}^|")
def test_ascii(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=ascii *"))
self.assertEqual(server.isupport.casemapping, "ascii")
lower = server.casefold("ÀTEST[]~\\")
self.assertEqual(lower, "Àtest[]~\\")
def test_fallback_to_rfc1459(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=asd *"))
self.assertEqual(server.isupport.casemapping, "rfc1459")
lower = server.casefold("ÀTEST[]~\\")
self.assertEqual(lower, "Àtest{}^|")
def test_network(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertIsNone(server.isupport.network)
server.parse_tokens(irctokens.tokenise("005 * NETWORK=testnet *"))
self.assertEqual(server.isupport.network, "testnet")
def test_statusmsg(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.statusmsg, [])
server.parse_tokens(irctokens.tokenise("005 * STATUSMSG=&@ *"))
self.assertEqual(server.isupport.statusmsg, ["&", "@"])
def test_callerid(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertIsNone(server.isupport.callerid)
server.parse_tokens(irctokens.tokenise("005 * CALLERID=U *"))
self.assertEqual(server.isupport.callerid, "U")
@ -102,7 +83,7 @@ class ISUPPORTTest(unittest.TestCase):
def test_excepts(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertIsNone(server.isupport.excepts)
server.parse_tokens(irctokens.tokenise("005 * EXCEPTS=U *"))
self.assertEqual(server.isupport.excepts, "U")
@ -111,7 +92,7 @@ class ISUPPORTTest(unittest.TestCase):
def test_invex(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertIsNone(server.isupport.invex)
server.parse_tokens(irctokens.tokenise("005 * INVEX=U *"))
self.assertEqual(server.isupport.invex, "U")
@ -120,14 +101,14 @@ class ISUPPORTTest(unittest.TestCase):
def test_whox(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertFalse(server.isupport.whox)
server.parse_tokens(irctokens.tokenise("005 * WHOX *"))
self.assertTrue(server.isupport.whox)
def test_monitor(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertIsNone(server.isupport.monitor)
server.parse_tokens(irctokens.tokenise("005 * MONITOR=123 *"))
self.assertEqual(server.isupport.monitor, 123)
@ -136,9 +117,36 @@ class ISUPPORTTest(unittest.TestCase):
def test_watch(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertIsNone(server.isupport.watch)
server.parse_tokens(irctokens.tokenise("005 * WATCH=123 *"))
self.assertEqual(server.isupport.watch, 123)
server.parse_tokens(irctokens.tokenise("005 * WATCH *"))
self.assertEqual(server.isupport.watch, -1)
def test_nicklen(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.nicklen, 9)
server.parse_tokens(irctokens.tokenise("005 * NICKLEN=16 *"))
self.assertEqual(server.isupport.nicklen, 16)
class ISupportTestCasemapping(unittest.TestCase):
def test_rfc1459(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
self.assertEqual(server.isupport.casemapping, ircstates.CaseMap.RFC1459)
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=rfc1459 *"))
self.assertEqual(server.isupport.casemapping, ircstates.CaseMap.RFC1459)
def test_ascii(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=ascii *"))
self.assertEqual(server.isupport.casemapping, ircstates.CaseMap.ASCII)
def test_unknown(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
with self.assertRaises(ValueError):
server.parse_tokens(irctokens.tokenise("005 * CASEMAPPING=asd *"))

View File

@ -5,32 +5,32 @@ import ircstates, irctokens
class ModeTestUMode(unittest.TestCase):
def test_add(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("MODE nickname +i"))
self.assertEqual(server.modes, ["i"])
self.assertEqual(server.modes, {"i"})
def test_remove(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("MODE nickname +i"))
server.parse_tokens(irctokens.tokenise("MODE nickname -i"))
self.assertEqual(server.modes, [])
self.assertEqual(server.modes, set())
class ModeTestChannelPrefix(unittest.TestCase):
def test_add(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise("MODE #chan +ov nickname nickname"))
user = server.users["nickname"]
channel = server.channels["#chan"]
channel_user = server.channel_users[channel][user]
self.assertEqual(channel_user.modes, ["o", "v"])
channel_user = channel.users[user.nickname_lower]
self.assertEqual(channel_user.modes, {"o", "v"})
def test_remove(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise("MODE #chan +ov nickname nickname"))
@ -38,31 +38,75 @@ class ModeTestChannelPrefix(unittest.TestCase):
irctokens.tokenise("MODE #chan -ov nickname nickname"))
user = server.users["nickname"]
channel = server.channels["#chan"]
channel_user = server.channel_users[channel][user]
self.assertEqual(channel_user.modes, [])
channel_user = channel.users[user.nickname_lower]
self.assertEqual(channel_user.modes, set())
class ModeTestChannelList(unittest.TestCase):
def test_add(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +b asd!*@*"))
channel = server.channels["#chan"]
self.assertEqual(channel.list_modes, {"b": []})
server.parse_tokens(irctokens.tokenise("MODE #chan +b asd!*@*"))
self.assertEqual(channel.list_modes, {"b": ["asd!*@*"]})
server.parse_tokens(irctokens.tokenise("MODE #chan -b asd!*@*"))
self.assertEqual(channel.list_modes, {"b": []})
def test_remove(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +b asd!*@*"))
server.parse_tokens(irctokens.tokenise("MODE #chan +b dsa!*@*"))
server.parse_tokens(irctokens.tokenise("MODE #chan -b asd!*@*"))
channel = server.channels["#chan"]
self.assertEqual(channel.list_modes, {})
self.assertEqual(channel.list_modes, {"b": ["dsa!*@*"]})
def test_banlist(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise("367 * #chan *!*@host setby 1594477713"))
server.parse_tokens(
irctokens.tokenise("367 * #chan $a:account setby 1594477713"))
server.parse_tokens(
irctokens.tokenise("367 * #chan r:my*gecos"))
server.parse_tokens(irctokens.tokenise("368 * #chan *"))
channel = server.channels["#chan"]
self.assertEqual(
channel.list_modes["b"],
["*!*@host", "$a:account", "r:my*gecos"]
)
def test_quietlist(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise("728 * #chan q q!*@host setby 1594477713"))
server.parse_tokens(
irctokens.tokenise("728 * #chan q $a:qaccount setby 1594477713"))
server.parse_tokens(
irctokens.tokenise("728 * #chan q r:q*my*gecos setby 1594477713"))
server.parse_tokens(irctokens.tokenise("729 * #chan q *"))
channel = server.channels["#chan"]
self.assertEqual(
channel.list_modes["q"],
["q!*@host", "$a:qaccount", "r:q*my*gecos"]
)
class ModeTestChannelTypeB(unittest.TestCase):
def test_add(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +k password"))
channel = server.channels["#chan"]
@ -70,17 +114,17 @@ class ModeTestChannelTypeB(unittest.TestCase):
def test_remove(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +k password"))
server.parse_tokens(irctokens.tokenise("MODE #chan -k *"))
channel = server.channels["#chan"]
self.assertEqual(channel.list_modes, {})
self.assertEqual(channel.modes, {})
class ModeTestChannelTypeC(unittest.TestCase):
def test_add(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +l 100"))
channel = server.channels["#chan"]
@ -88,17 +132,17 @@ class ModeTestChannelTypeC(unittest.TestCase):
def test_remove(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +l 100"))
server.parse_tokens(irctokens.tokenise("MODE #chan -l"))
channel = server.channels["#chan"]
self.assertEqual(channel.list_modes, {})
self.assertEqual(channel.modes, {})
class ModeTestChannelTypeD(unittest.TestCase):
def test_add(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +i"))
channel = server.channels["#chan"]
@ -106,17 +150,17 @@ class ModeTestChannelTypeD(unittest.TestCase):
def test_remove(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("MODE #chan +i"))
server.parse_tokens(irctokens.tokenise("MODE #chan -i"))
channel = server.channels["#chan"]
self.assertEqual(channel.list_modes, {})
self.assertEqual(channel.modes, {})
class ModeTestChannelNumeric(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise("324 * #chan +bkli *!*@* pass 10"))
@ -126,7 +170,7 @@ class ModeTestChannelNumeric(unittest.TestCase):
def test_without_plus(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("324 * #chan il 10"))
channel = server.channels["#chan"]
@ -135,12 +179,12 @@ class ModeTestChannelNumeric(unittest.TestCase):
class ModeTestUserNumeric(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("221 * +iw"))
self.assertEqual(server.modes, ["i", "w"])
self.assertEqual(server.modes, {"i", "w"})
def test_without_plus(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("221 * iw"))
self.assertEqual(server.modes, ["i", "w"])
self.assertEqual(server.modes, {"i", "w"})

View File

@ -4,7 +4,7 @@ import ircstates, irctokens
class MOTDTest(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("375 * :start of motd"))
server.parse_tokens(irctokens.tokenise("372 * :first line of motd"))
server.parse_tokens(irctokens.tokenise("372 * :second line of motd"))

22
test/sasl.py Normal file
View File

@ -0,0 +1,22 @@
import unittest
import ircstates, irctokens
class SASLTestAccount(unittest.TestCase):
def test_loggedin(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("900 * nick!user@host account *"))
self.assertEqual(server.nickname, "nick")
self.assertEqual(server.username, "user")
self.assertEqual(server.hostname, "host")
self.assertEqual(server.account, "account")
def test_loggedout(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("900 * nick!user@host account *"))
server.parse_tokens(irctokens.tokenise("901 * nick1!user1@host1 *"))
self.assertEqual(server.nickname, "nick1")
self.assertEqual(server.username, "user1")
self.assertEqual(server.hostname, "host1")
self.assertEqual(server.account, None)

View File

@ -4,55 +4,71 @@ import ircstates, irctokens
class UserTestNicknameChange(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname NICK nickname2"))
self.assertEqual(server.nickname, "nickname2")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname NICK Nickname2"))
self.assertEqual(server.nickname, "Nickname2")
self.assertEqual(server.nickname_lower, "nickname2")
server.parse_tokens(irctokens.tokenise(":nickname2 JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
self.assertIn("other", server.users)
server.parse_tokens(irctokens.tokenise(":other NICK other2"))
user = server.users["other"]
server.parse_tokens(irctokens.tokenise(":other NICK Other2"))
self.assertNotIn("other", server.users)
self.assertIn("other2", server.users)
self.assertEqual(user.nickname, "Other2")
self.assertEqual(user.nickname_lower, "other2")
class UserTestHostmaskJoin(unittest.TestCase):
def test_both(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(
irctokens.tokenise(":nickname!user@host JOIN #chan"))
self.assertEqual(server.username, "user")
self.assertEqual(server.hostname, "host")
server.parse_tokens(irctokens.tokenise(":other!user@host JOIN #chan"))
user = server.users["other"]
self.assertEqual(user.username, "user")
self.assertEqual(user.hostname, "host")
self.assertEqual(user.userhost(), "user@host")
self.assertEqual(user.hostmask(), "other!user@host")
def test_user(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname!user JOIN #chan"))
self.assertEqual(server.username, "user")
self.assertIsNone(server.hostname)
server.parse_tokens(irctokens.tokenise(":other!user JOIN #chan"))
user = server.users["other"]
self.assertEqual(user.username, "user")
self.assertIsNone(user.hostname)
self.assertIsNone(user.userhost())
self.assertEqual(user.hostmask(), "other!user")
def test_host(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname@host JOIN #chan"))
self.assertIsNone(server.username)
self.assertEqual(server.hostname, "host")
server.parse_tokens(irctokens.tokenise(":other@host JOIN #chan"))
user = server.users["other"]
self.assertIsNone(user.username)
self.assertEqual(user.hostname, "host")
self.assertIsNone(user.userhost())
self.assertEqual(user.hostmask(), "other@host")
class UserTestExtendedJoin(unittest.TestCase):
def test_without_extended_join(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
self.assertIsNone(server.account)
self.assertIsNone(server.realname)
@ -63,7 +79,7 @@ class UserTestExtendedJoin(unittest.TestCase):
def test_with_account(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan acc :realname"))
self.assertEqual(server.account, "acc")
self.assertEqual(server.realname, "realname")
@ -74,7 +90,7 @@ class UserTestExtendedJoin(unittest.TestCase):
def test_without_account(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan * :realname"))
self.assertEqual(server.account, "")
self.assertEqual(server.realname, "realname")
@ -86,7 +102,7 @@ class UserTestExtendedJoin(unittest.TestCase):
class UserTestAccountNotify(unittest.TestCase):
def test_with_account(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname ACCOUNT acc"))
self.assertEqual(server.account, "acc")
@ -97,7 +113,7 @@ class UserTestAccountNotify(unittest.TestCase):
def test_without_account(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname ACCOUNT *"))
self.assertEqual(server.account, "")
@ -109,7 +125,7 @@ class UserTestAccountNotify(unittest.TestCase):
class UserTestHostmaskPRIVMSG(unittest.TestCase):
def test_both(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise(":nickname!user@host PRIVMSG #chan :hi"))
@ -124,7 +140,7 @@ class UserTestHostmaskPRIVMSG(unittest.TestCase):
def test_user(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise(":nickname!user PRIVMSG #chan :hi"))
@ -139,7 +155,7 @@ class UserTestHostmaskPRIVMSG(unittest.TestCase):
def test_host(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(
irctokens.tokenise(":nickname@host PRIVMSG #chan :hi"))
@ -155,22 +171,22 @@ class UserTestHostmaskPRIVMSG(unittest.TestCase):
class UserTestVisibleHost(unittest.TestCase):
def test_without_username(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("396 * hostname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("396 * hostname *"))
self.assertIsNone(server.username)
self.assertEqual(server.hostname, "hostname")
def test_with_username(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("396 * username@hostname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise("396 * username@hostname *"))
self.assertEqual(server.username, "username")
self.assertEqual(server.hostname, "hostname")
class UserTestWHO(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
server.parse_tokens(
@ -189,7 +205,7 @@ class UserTestWHO(unittest.TestCase):
class UserTestCHGHOST(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(
irctokens.tokenise(":nickname!user@host JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname CHGHOST u h"))
@ -205,7 +221,7 @@ class UserTestCHGHOST(unittest.TestCase):
class UserTestWHOIS(unittest.TestCase):
def test_user_line(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("311 * nickname u h * :r"))
self.assertEqual(server.username, "u")
@ -213,44 +229,56 @@ class UserTestWHOIS(unittest.TestCase):
self.assertEqual(server.realname, "r")
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other CHGHOST u2 h2"))
server.parse_tokens(irctokens.tokenise("311 * other u2 h2 * :r2"))
user = server.users["other"]
self.assertEqual(user.username, "u2")
self.assertEqual(user.hostname, "h2")
self.assertEqual(user.realname, "r2")
class UserTestAWAY(unittest.TestCase):
def test_set(self):
class UserTestAway(unittest.TestCase):
def test_verb_set(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
user = server.users["other"]
self.assertIsNone(server.away)
self.assertIsNone(user.away)
server.parse_tokens(irctokens.tokenise(":nickname AWAY :bye bye"))
server.parse_tokens(irctokens.tokenise(":other AWAY :ik ga weg"))
self.assertEqual(server.away, "bye bye")
self.assertEqual(user.away, "ik ga weg")
def test_unset(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname AWAY :ik ga weg"))
server.parse_tokens(irctokens.tokenise(":nickname AWAY"))
server.parse_tokens(irctokens.tokenise(":other AWAY :ik ga weg"))
server.parse_tokens(irctokens.tokenise(":other AWAY"))
user = server.users["other"]
user = server.users["nickname"]
self.assertIsNone(server.away)
self.assertIsNone(user.away)
server.parse_tokens(irctokens.tokenise(":nickname AWAY :ik ga weg"))
self.assertEqual(server.away, "ik ga weg")
self.assertEqual(user.away, "ik ga weg")
def test_verb_unset(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
server.parse_tokens(irctokens.tokenise(
":nickname AWAY :let's blow this popsicle stand"))
server.parse_tokens(irctokens.tokenise(":nickname AWAY"))
self.assertIsNone(server.away)
self.assertIsNone(user.away)
def test_numeric(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
self.assertIsNone(server.away)
self.assertIsNone(user.away)
server.parse_tokens(irctokens.tokenise(
"301 * nickname :i saw something shiny"))
self.assertEqual(server.away, "i saw something shiny")
self.assertEqual(user.away, "i saw something shiny")
class UserTestSETNAME(unittest.TestCase):
def test(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
user = server.users["other"]

93
test/who.py Normal file
View File

@ -0,0 +1,93 @@
import unittest
import ircstates, irctokens
from ircstates.server import WHO_TYPE
class WHOTest(unittest.TestCase):
def test_who(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
server.parse_tokens(irctokens.tokenise(
"352 * #chan user host server nickname * :0 real"))
self.assertEqual(user.username, "user")
self.assertEqual(user.hostname, "host")
self.assertEqual(user.realname, "real")
self.assertEqual(user.account, None)
self.assertEqual(user.server, "server")
self.assertIsNone(user.away)
self.assertEqual(server.username, user.username)
self.assertEqual(server.hostname, user.hostname)
self.assertEqual(server.realname, user.realname)
self.assertEqual(server.server, user.server)
self.assertIsNone(server.away)
server.parse_tokens(irctokens.tokenise(
"352 * #chan user host server nickname G* :0 real"))
self.assertEqual(user.away, "")
self.assertEqual(server.away, "")
def test_whox(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
server.parse_tokens(irctokens.tokenise(
f"354 * {WHO_TYPE} user 1.2.3.4 host server nickname * account :real"))
self.assertEqual(user.username, "user")
self.assertEqual(user.hostname, "host")
self.assertEqual(user.realname, "real")
self.assertEqual(user.account, "account")
self.assertEqual(user.server, "server")
self.assertIsNone(user.away)
self.assertEqual(user.ip, "1.2.3.4")
self.assertEqual(server.username, user.username)
self.assertEqual(server.hostname, user.hostname)
self.assertEqual(server.realname, user.realname)
self.assertEqual(server.account, user.account)
self.assertEqual(server.server, user.server)
self.assertIsNone(server.away)
self.assertEqual(server.ip, user.ip)
server.parse_tokens(irctokens.tokenise(
f"354 * {WHO_TYPE} user realip host server nickname G account :real"))
self.assertEqual(user.away, "")
self.assertEqual(server.away, "")
def test_whox_no_account(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
user.account = "account"
server.account = "account"
server.parse_tokens(irctokens.tokenise(
f"354 * {WHO_TYPE} user realip host server nickname * 0 :real"))
self.assertEqual(user.account, "")
self.assertEqual(server.account, user.account)
def test_whox_ipv6(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname *"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
user = server.users["nickname"]
server.parse_tokens(irctokens.tokenise(
f"354 * {WHO_TYPE} user 0::1 host server nickname * 0 :real"))
self.assertEqual(user.ip, "::1")
server.parse_tokens(irctokens.tokenise(
f"354 * {WHO_TYPE} user 00::2 host server nickname * 0 :real"))
self.assertEqual(user.ip, "::2")
server.parse_tokens(irctokens.tokenise(
f"354 * {WHO_TYPE} user fd00:0:0:0::1 host server nickname * 0 :real"))
self.assertEqual(user.ip, "fd00::1")