split tokenise.py to line.py and hostmask.py, rename hostmask parse

Line.hostmake now throws ValueError if source was None
This commit is contained in:
jesopo 2020-07-03 23:07:04 +01:00
parent 0b13a2ca02
commit c84509618f
7 changed files with 168 additions and 159 deletions

View File

@ -1,3 +1,4 @@
from .tokenise import tokenise
from .objects import build, Line, Hostmask
from .stateful import StatefulDecoder, StatefulEncoder
from .line import Line, build, tokenise
from .hostmask import Hostmask, hostmask
from .stateful import StatefulDecoder, StatefulEncoder

30
irctokens/hostmask.py Normal file
View File

@ -0,0 +1,30 @@
from typing import Optional
class Hostmask(object):
def __init__(self, source: str,
nickname: str,
username: Optional[str],
hostname: Optional[str]):
self._source = source
self.nickname = nickname
self.username = username
self.hostname = hostname
def __str__(self) -> str:
return self._source
def __repr__(self) -> str:
return f"Hostmask({self._source})"
def __eq__(self, other) -> bool:
if isinstance(other, Hostmask):
return str(self) == str(other)
else:
return False
def hostmask(source: str) -> Hostmask:
username, _, hostname = source.partition("@")
nickname, _, username = username.partition("!")
return Hostmask(
source,
nickname,
username or None,
hostname or None)

124
irctokens/line.py Normal file
View File

@ -0,0 +1,124 @@
from typing import Dict, List, Optional
from .const import TAG_ESCAPED, TAG_UNESCAPED
from .hostmask import Hostmask, hostmask
from .formatting import format as format_
class Line(object):
def __init__(self,
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str]):
self.tags = tags
self.source = source
self.command = command
self.params = params
def __eq__(self, other) -> bool:
if isinstance(other, Line):
return self.format() == other.format()
else:
return False
def __repr__(self) -> str:
return (f"Line(tag={self.tags!r}, source={self.source!r}"
f", command={self.command!r}, params={self.params!r})")
_hostmask: Optional[Hostmask] = None
@property
def hostmask(self) -> Hostmask:
if self.source is not None:
if self._hostmask is None:
self._hostmask = hostmask(self.source)
return self._hostmask
else:
raise ValueError("cannot parse hostmask from null source")
def format(self) -> str:
return format_(self.tags, self.source, self.command, self.params)
def with_source(self, source: str) -> "Line":
return Line(self.tags, source, self.command, self.params)
def copy(self) -> "Line":
return Line(self.tags, self.source, self.command, self.params)
def build(
command: str,
params: List[str]=[],
source: Optional[str]=None,
tags: Optional[Dict[str, str]]=None
) -> Line:
return Line(tags, source, command, params)
def _unescape_tag(value: str) -> str:
unescaped, escaped = "", list(value)
while escaped:
current = escaped.pop(0)
if current == "\\":
if escaped:
next = escaped.pop(0)
duo = current+next
if duo in TAG_ESCAPED:
index = TAG_ESCAPED.index(duo)
unescaped += TAG_UNESCAPED[index]
else:
unescaped += next
else:
unescaped += current
return unescaped
def _tokenise(
tags_s: Optional[str],
line: str
) -> Line:
tags: Optional[Dict[str, str]] = None
if not tags_s is None:
tags = {}
for part in tags_s[1:].split(";"):
key, _, value = part.partition("=")
tags[key] = _unescape_tag(value)
line, trailing_sep, trailing = line.partition(" :")
params = list(filter(bool, line.split(" ")))
source: Optional[str] = None
if params[0][0] == ":":
source = params.pop(0)[1:]
command = params.pop(0).upper()
if trailing_sep:
params.append(trailing)
return Line(tags, source, command, params)
def tokenise_b(
line_b: bytes,
encoding: str="utf8",
fallback: str="latin-1"
) -> Line:
if b"\x00" in line_b:
line_b, _ = line_b.split(b"\x00", 1)
tags: Optional[str] = None
if line_b[0] == ord(b"@"):
tags_b, _, line_b = line_b.partition(b" ")
tags = tags_b.decode("utf8")
try:
line = line_b.decode(encoding)
except UnicodeDecodeError:
line = line_b.decode(fallback)
return _tokenise(tags, line)
def tokenise(line: str) -> Line:
if "\x00" in line:
line, _ = line.split("\x00", 1)
if line[0] == "@":
tags, _, line = line.partition(" ")
return _tokenise(tags, line)
else:
return _tokenise(None, line)

View File

@ -1,72 +0,0 @@
from typing import Dict, List, Optional
from .formatting import format as format_
class Hostmask(object):
def __init__(self, source: str,
nickname: str,
username: Optional[str],
hostname: Optional[str]):
self._source = source
self.nickname = nickname
self.username = username
self.hostname = hostname
def __str__(self) -> str:
return self._source
def __repr__(self) -> str:
return (f"Hostmask(nick={self.nickname!r}, user={self.username!r}"
f", host={self.hostname!r})")
def __eq__(self, other) -> bool:
if isinstance(other, Hostmask):
return str(self) == str(other)
else:
return False
@staticmethod
def from_source(source: str):
username, _, hostname = source.partition("@")
nickname, _, username = username.partition("!")
return Hostmask(source, nickname, username or None, hostname or None)
class Line(object):
def __init__(self,
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str]):
self.tags = tags
self.source = source
self.command = command
self.params = params
def __eq__(self, other) -> bool:
if isinstance(other, Line):
return self.format() == other.format()
else:
return False
def __repr__(self) -> str:
return (f"Line(tag={self.tags!r}, source={self.source!r}"
f", command={self.command!r}, params={self.params!r})")
_hostmask: Optional[Hostmask] = None
@property
def hostmask(self):
if self.source and not self._hostmask:
self._hostmask = Hostmask.from_source(self.source)
return self._hostmask
def format(self) -> str:
return format_(self.tags, self.source, self.command, self.params)
def with_source(self, source: str) -> "Line":
return Line(self.tags, source, self.command, self.params)
def copy(self) -> "Line":
return Line(self.tags, self.source, self.command, self.params)
def build(
command: str,
params: List[str]=[],
source: Optional[str]=None,
tags: Optional[Dict[str, str]]=None
) -> Line:
return Line(tags, source, command, params)

View File

@ -1,6 +1,5 @@
from typing import List, Optional
from .objects import Line
from .tokenise import tokenise_b
from typing import List, Optional
from .line import Line, tokenise_b
class StatefulDecoder(object):
def __init__(self, encoding: str="utf8", fallback: str="latin-1"):

View File

@ -1,75 +0,0 @@
from typing import Dict, Optional
from .objects import Line
from .const import TAG_ESCAPED, TAG_UNESCAPED
def _unescape_tag(value: str):
unescaped, escaped = "", list(value)
while escaped:
current = escaped.pop(0)
if current == "\\":
if escaped:
next = escaped.pop(0)
duo = current+next
if duo in TAG_ESCAPED:
index = TAG_ESCAPED.index(duo)
unescaped += TAG_UNESCAPED[index]
else:
unescaped += next
else:
unescaped += current
return unescaped
def _tokenise(
tags_s: Optional[str],
line: str) -> Line:
tags: Optional[Dict[str, str]] = None
if not tags_s is None:
tags = {}
for part in tags_s[1:].split(";"):
key, _, value = part.partition("=")
tags[key] = _unescape_tag(value)
line, trailing_sep, trailing = line.partition(" :")
params = list(filter(bool, line.split(" ")))
source: Optional[str] = None
if params[0][0] == ":":
source = params.pop(0)[1:]
command = params.pop(0).upper()
if trailing_sep:
params.append(trailing)
return Line(tags, source, command, params)
def tokenise_b(
line_b: bytes,
encoding: str="utf8",
fallback: str="latin-1") -> Line:
if b"\x00" in line_b:
line_b, _ = line_b.split(b"\x00", 1)
tags: Optional[str] = None
if line_b[0] == ord(b"@"):
tags_b, _, line_b = line_b.partition(b" ")
tags = tags_b.decode("utf8")
try:
line = line_b.decode(encoding)
except UnicodeDecodeError:
line = line_b.decode(fallback)
return _tokenise(tags, line)
def tokenise(line: str) -> Line:
if "\x00" in line:
line, _ = line.split("\x00", 1)
if line[0] == "@":
tags, _, line = line.partition(" ")
return _tokenise(tags, line)
else:
return _tokenise(None, line)

View File

@ -3,32 +3,32 @@ import irctokens
class HostmaskTest(unittest.TestCase):
def test_all(self):
hostmask = irctokens.Hostmask.from_source("nick!user@host")
hostmask = irctokens.hostmask("nick!user@host")
self.assertEqual(hostmask.nickname, "nick")
self.assertEqual(hostmask.username, "user")
self.assertEqual(hostmask.hostname, "host")
def test_no_hostname(self):
hostmask = irctokens.Hostmask.from_source("nick!user")
hostmask = irctokens.hostmask("nick!user")
self.assertEqual(hostmask.nickname, "nick")
self.assertEqual(hostmask.username, "user")
self.assertIsNone(hostmask.hostname)
def test_no_ident(self):
hostmask = irctokens.Hostmask.from_source("nick@host")
hostmask = irctokens.hostmask("nick@host")
self.assertEqual(hostmask.nickname, "nick")
self.assertIsNone(hostmask.username)
self.assertEqual(hostmask.hostname, "host")
def test_only_nickname(self):
hostmask = irctokens.Hostmask.from_source("nick")
hostmask = irctokens.hostmask("nick")
self.assertEqual(hostmask.nickname, "nick")
self.assertIsNone(hostmask.username)
self.assertIsNone(hostmask.hostname)
def test_line(self):
line = irctokens.tokenise(":nick!user@host PRIVMSG #channel hello")
hostmask = irctokens.Hostmask.from_source("nick!user@host")
hostmask = irctokens.hostmask("nick!user@host")
self.assertEqual(line.hostmask, hostmask)
self.assertEqual(line.hostmask.nickname, "nick")
self.assertEqual(line.hostmask.username, "user")
@ -36,4 +36,6 @@ class HostmaskTest(unittest.TestCase):
def test_none_source(self):
line = irctokens.tokenise("PRIVMSG #channel hello")
self.assertIsNone(line.hostmask)
def _hostmask():
line.hostmask
self.assertRaises(ValueError, _hostmask)