Compare commits

...

10 Commits

Author SHA1 Message Date
jesopo 859fa20862 v2.0.2 release 2022-01-29 20:01:27 +00:00
jesopo 14d38192cf truncate on \r and \n too 2022-01-28 17:36:45 +00:00
jesopo 93f1079ae7 v2.0.1 release 2022-01-12 17:58:11 +00:00
jesopo d8fa394a30 unnecessary truthiness check (closes #4) 2022-01-02 23:50:18 +00:00
jesopo 2094648a51 freenode is dead long live libera.chat 2021-05-24 17:26:37 +00:00
jesopo 6a679c7650 superfluous old line 2020-10-01 22:00:21 +00:00
jesopo 12322b97c6 v2.0.0 release 2020-09-30 20:03:24 +00:00
jesopo e2f7c1eb84 merge tokenise() and tokenise_b() - using Union[str,bytes] 2020-09-30 20:00:36 +00:00
jesopo b9a2b6c1de raise ValueError when trying to tokenise without a command 2020-07-24 10:38:37 +00:00
jesopo e0c97963df update README.md contact section to point to freenode 2020-07-10 12:08:29 +01:00
6 changed files with 61 additions and 40 deletions
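
The most consequential commit in this range is e2f7c1eb84, which merges `tokenise()` and `tokenise_b()` into one function. A before/after sketch of the public signatures, reconstructed from the diffs below rather than copied from the repo:

```python
# before (v1.1.0): separate entry points for str and bytes input
def tokenise(line: str) -> Line: ...
def tokenise_b(line_b: bytes, encoding: str="utf8", fallback: str="latin-1") -> Line: ...

# after (v2.0.2): a single entry point accepting either type
def tokenise(line: Union[str, bytes], encoding: str="utf8", fallback: str="latin-1") -> Line: ...
```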

View File

@@ -85,4 +85,4 @@ while True:
 
 ## contact
 
-Come say hi at [#irctokens on irc.tilde.chat](https://web.tilde.chat/?join=%23irctokens)
+Come say hi at `#irctokens` on irc.libera.chat

View File

@@ -1 +1 @@
-1.1.0
+2.0.2

View File

@@ -16,7 +16,7 @@ def format(
         tags_str = []
         for key in sorted(tags.keys()):
             if tags[key]:
-                value = tags[key] or ""
+                value = tags[key]
                 tags_str.append(f"{key}={_escape_tag(value)}")
             else:
                 tags_str.append(key)
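
For context on the change above: the `or ""` removed by d8fa394a30 could never take effect, because the enclosing `if tags[key]:` already guarantees the value is truthy. A standalone sketch of the same branching, with illustrative tag values not taken from the repo:

```python
# falsy tag values ("" or None) take the else branch, so inside the
# if-branch `tags[key] or ""` always equalled tags[key] exactly
tags = {"account": "jess", "typing": ""}

for key in sorted(tags.keys()):
    if tags[key]:
        print(f"{key}={tags[key]}")   # value is known truthy here
    else:
        print(key)                    # valueless tags render as a bare key

# output:
#   account=jess
#   typing
```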

View File

@@ -1,4 +1,4 @@
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
 from .const import TAG_ESCAPED, TAG_UNESCAPED
 from .hostmask import Hostmask, hostmask
 from .formatting import format as format_
@@ -66,17 +66,14 @@ def _unescape_tag(value: str) -> str:
         unescaped += current
     return unescaped
 
-def _tokenise(
-        tags_s: Optional[str],
-        line: str
-        ) -> Line:
+def _tokenise(line: str) -> Line:
     tags: Optional[Dict[str, str]] = None
-    if not tags_s is None:
+    if line[0] == "@":
+        tags_s, _, line = line.partition(" ")
         tags = {}
         for part in tags_s[1:].split(";"):
             key, _, value = part.partition("=")
-            tags[key] = _unescape_tag(value)
+            tags[key] = _unescape_tag(value)
 
     line, trailing_sep, trailing = line.partition(" :")
     params = list(filter(bool, line.split(" ")))
@@ -85,6 +82,8 @@ def _tokenise(
     if params[0][0] == ":":
         source = params.pop(0)[1:]
 
+    if not params:
+        raise ValueError("Cannot tokenise command-less line")
     command = params.pop(0).upper()
 
     if trailing_sep:
@@ -92,33 +91,28 @@ def _tokenise(
 
     return Line(tags, source, command, params)
 
-def tokenise_b(
-        line_b: bytes,
+def tokenise(
+        line: Union[str, bytes],
         encoding: str="utf8",
         fallback: str="latin-1"
         ) -> Line:
-    if b"\x00" in line_b:
-        line_b, _ = line_b.split(b"\x00", 1)
-
-    tags: Optional[str] = None
-    if line_b[0] == ord(b"@"):
-        tags_b, _, line_b = line_b.partition(b" ")
-        tags = tags_b.decode("utf8")
-
-    try:
-        line = line_b.decode(encoding)
-    except UnicodeDecodeError:
-        line = line_b.decode(fallback)
-
-    return _tokenise(tags, line)
-
-def tokenise(line: str) -> Line:
-    if "\x00" in line:
-        line, _ = line.split("\x00", 1)
-
-    if line[0] == "@":
-        tags, _, line = line.partition(" ")
-        return _tokenise(tags, line)
-    else:
-        return _tokenise(None, line)
+    dline: str = ""
+    if isinstance(line, bytes):
+        if line[0] == ord(b"@"):
+            tags_b, sep, line = line.partition(b" ")
+            dline += (tags_b+sep).decode("utf8")
+        try:
+            dline += line.decode(encoding)
+        except UnicodeDecodeError:
+            dline += line.decode(fallback)
+    else:
+        dline = line
+
+    for badchar in set(dline) & {"\x00", "\r", "\n"}:
+        badindex = dline.find(badchar)
+        if not badindex == -1:
+            # truncate before this bad character
+            dline = dline[:badindex]
+
+    return _tokenise(dline)
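
Taken together, the merged `tokenise()` behaves roughly as follows; a sketch based on the hunks above and the tests at the bottom of this compare:

```python
import irctokens

# bytes are decoded with `encoding`, falling back to `fallback` on a
# UnicodeDecodeError; the tags section is always decoded as utf8
line = irctokens.tokenise(b"@id=123 :nick!user@host PRIVMSG #chan :hello")
print(line.command, line.params)   # PRIVMSG ['#chan', 'hello']

# \x00, \r and \n all truncate the line at the first occurrence now,
# where previously only \x00 was handled
line = irctokens.tokenise(":nick!user@host PRIVMSG #chan :hello\r\nQUIT")
print(line.params)                 # ['#chan', 'hello']

# a line without a command fails fast instead of erroring later
try:
    irctokens.tokenise("@tag=1 :n!u@h")
except ValueError as e:
    print(e)                       # Cannot tokenise command-less line
```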

View File

@@ -1,5 +1,5 @@
 from typing import List, Optional
-from .line import Line, tokenise_b
+from .line import Line, tokenise
 
 class StatefulDecoder(object):
     def __init__(self, encoding: str="utf8", fallback: str="latin-1"):
@@ -23,7 +23,7 @@ class StatefulDecoder(object):
 
         lines: List[Line] = []
         for line in lines_b:
-            lines.append(tokenise_b(line, self._encoding, self._fallback))
+            lines.append(tokenise(line, self._encoding, self._fallback))
         return lines
 
 class StatefulEncoder(object):
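
Only the call site changes here; the decoder's buffering logic is untouched. A usage sketch, assuming the rest of the `StatefulDecoder` API (its `push()` method buffering partial data) is as in v2.x:

```python
import irctokens

d = irctokens.StatefulDecoder()

# an incomplete line is buffered and push() returns no lines yet
print(d.push(b":nick!user@host PRIVMSG #chan :hel"))   # []

# once the line terminator arrives, the buffered bytes are tokenised
lines = d.push(b"lo\r\n")
print(lines[0].params)                                 # ['#chan', 'hello']
```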

View File

@@ -71,8 +71,35 @@ class TokenTestAll(unittest.TestCase):
         self.assertEqual(line.command, "PRIVMSG")
         self.assertEqual(line.params, ["#channel", "hello world"])
 
-class TokenTestNul(unittest.TestCase):
-    def test(self):
+class TokenTestTruncate(unittest.TestCase):
+    def test_null(self):
         line = irctokens.tokenise(
             ":nick!user@host PRIVMSG #channel :hello\x00 world")
         self.assertEqual(line.params, ["#channel", "hello"])
+
+    def test_cr(self):
+        line = irctokens.tokenise(
+            ":nick!user@host PRIVMSG #channel :hello\r world")
+        self.assertEqual(line.params, ["#channel", "hello"])
+
+    def test_lf(self):
+        line = irctokens.tokenise(
+            ":nick!user@host PRIVMSG #channel :hello\n world")
+        self.assertEqual(line.params, ["#channel", "hello"])
+
+class TokenTestNoCommand(unittest.TestCase):
+    def test(self):
+        def _test1():
+            line = irctokens.tokenise(":n!u@h")
+        def _test2():
+            line = irctokens.tokenise("@tag=1 :n!u@h")
+
+        self.assertRaises(ValueError, _test1)
+        self.assertRaises(ValueError, _test2)
+
+class TokenTestBytes(unittest.TestCase):
+    def test(self):
+        _str   = irctokens.tokenise("@a=1 :n!u@h PRIVMSG #chan :hello word")
+        _bytes = irctokens.tokenise(b"@a=1 :n!u@h PRIVMSG #chan :hello word")
+
+        self.assertEqual(_str, _bytes)