mirror of https://github.com/jesopo/irctokens
Compare commits
10 Commits
Author | SHA1 | Date |
---|---|---|
jesopo | 859fa20862 | |
jesopo | 14d38192cf | |
jesopo | 93f1079ae7 | |
jesopo | d8fa394a30 | |
jesopo | 2094648a51 | |
jesopo | 6a679c7650 | |
jesopo | 12322b97c6 | |
jesopo | e2f7c1eb84 | |
jesopo | b9a2b6c1de | |
jesopo | e0c97963df |
|
@ -85,4 +85,4 @@ while True:
|
|||
|
||||
## contact
|
||||
|
||||
Come say hi at [#irctokens on irc.tilde.chat](https://web.tilde.chat/?join=%23irctokens)
|
||||
Come say hi at `#irctokens` on irc.libera.chat
|
||||
|
|
|
@ -16,7 +16,7 @@ def format(
|
|||
tags_str = []
|
||||
for key in sorted(tags.keys()):
|
||||
if tags[key]:
|
||||
value = tags[key] or ""
|
||||
value = tags[key]
|
||||
tags_str.append(f"{key}={_escape_tag(value)}")
|
||||
else:
|
||||
tags_str.append(key)
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional, Union
|
||||
from .const import TAG_ESCAPED, TAG_UNESCAPED
|
||||
from .hostmask import Hostmask, hostmask
|
||||
from .formatting import format as format_
|
||||
|
@ -66,17 +66,14 @@ def _unescape_tag(value: str) -> str:
|
|||
unescaped += current
|
||||
return unescaped
|
||||
|
||||
def _tokenise(
|
||||
tags_s: Optional[str],
|
||||
line: str
|
||||
) -> Line:
|
||||
|
||||
def _tokenise(line: str) -> Line:
|
||||
tags: Optional[Dict[str, str]] = None
|
||||
if not tags_s is None:
|
||||
if line[0] == "@":
|
||||
tags_s, _, line = line.partition(" ")
|
||||
tags = {}
|
||||
for part in tags_s[1:].split(";"):
|
||||
key, _, value = part.partition("=")
|
||||
tags[key] = _unescape_tag(value)
|
||||
tags[key] = _unescape_tag(value)
|
||||
|
||||
line, trailing_sep, trailing = line.partition(" :")
|
||||
params = list(filter(bool, line.split(" ")))
|
||||
|
@ -85,6 +82,8 @@ def _tokenise(
|
|||
if params[0][0] == ":":
|
||||
source = params.pop(0)[1:]
|
||||
|
||||
if not params:
|
||||
raise ValueError("Cannot tokenise command-less line")
|
||||
command = params.pop(0).upper()
|
||||
|
||||
if trailing_sep:
|
||||
|
@ -92,33 +91,28 @@ def _tokenise(
|
|||
|
||||
return Line(tags, source, command, params)
|
||||
|
||||
def tokenise_b(
|
||||
line_b: bytes,
|
||||
def tokenise(
|
||||
line: Union[str, bytes],
|
||||
encoding: str="utf8",
|
||||
fallback: str="latin-1"
|
||||
) -> Line:
|
||||
|
||||
if b"\x00" in line_b:
|
||||
line_b, _ = line_b.split(b"\x00", 1)
|
||||
|
||||
tags: Optional[str] = None
|
||||
if line_b[0] == ord(b"@"):
|
||||
tags_b, _, line_b = line_b.partition(b" ")
|
||||
tags = tags_b.decode("utf8")
|
||||
|
||||
try:
|
||||
line = line_b.decode(encoding)
|
||||
except UnicodeDecodeError:
|
||||
line = line_b.decode(fallback)
|
||||
|
||||
return _tokenise(tags, line)
|
||||
|
||||
def tokenise(line: str) -> Line:
|
||||
if "\x00" in line:
|
||||
line, _ = line.split("\x00", 1)
|
||||
|
||||
if line[0] == "@":
|
||||
tags, _, line = line.partition(" ")
|
||||
return _tokenise(tags, line)
|
||||
dline: str = ""
|
||||
if isinstance(line, bytes):
|
||||
if line[0] == ord(b"@"):
|
||||
tags_b, sep, line = line.partition(b" ")
|
||||
dline += (tags_b+sep).decode("utf8")
|
||||
try:
|
||||
dline += line.decode(encoding)
|
||||
except UnicodeDecodeError:
|
||||
dline += line.decode(fallback)
|
||||
else:
|
||||
return _tokenise(None, line)
|
||||
dline = line
|
||||
|
||||
for badchar in set(dline) & {"\x00", "\r", "\n"}:
|
||||
badindex = dline.find(badchar)
|
||||
if not badindex == -1:
|
||||
# truncate before this bad character
|
||||
dline = dline[:badindex]
|
||||
|
||||
return _tokenise(dline)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
from typing import List, Optional
|
||||
from .line import Line, tokenise_b
|
||||
from .line import Line, tokenise
|
||||
|
||||
class StatefulDecoder(object):
|
||||
def __init__(self, encoding: str="utf8", fallback: str="latin-1"):
|
||||
|
@ -23,7 +23,7 @@ class StatefulDecoder(object):
|
|||
|
||||
lines: List[Line] = []
|
||||
for line in lines_b:
|
||||
lines.append(tokenise_b(line, self._encoding, self._fallback))
|
||||
lines.append(tokenise(line, self._encoding, self._fallback))
|
||||
return lines
|
||||
|
||||
class StatefulEncoder(object):
|
||||
|
|
|
@ -71,8 +71,35 @@ class TokenTestAll(unittest.TestCase):
|
|||
self.assertEqual(line.command, "PRIVMSG")
|
||||
self.assertEqual(line.params, ["#channel", "hello world"])
|
||||
|
||||
class TokenTestNul(unittest.TestCase):
|
||||
def test(self):
|
||||
class TokenTestTruncate(unittest.TestCase):
|
||||
def test_null(self):
|
||||
line = irctokens.tokenise(
|
||||
":nick!user@host PRIVMSG #channel :hello\x00 world")
|
||||
self.assertEqual(line.params, ["#channel", "hello"])
|
||||
|
||||
def test_cr(self):
|
||||
line = irctokens.tokenise(
|
||||
":nick!user@host PRIVMSG #channel :hello\r world")
|
||||
self.assertEqual(line.params, ["#channel", "hello"])
|
||||
|
||||
def test_lf(self):
|
||||
line = irctokens.tokenise(
|
||||
":nick!user@host PRIVMSG #channel :hello\n world")
|
||||
self.assertEqual(line.params, ["#channel", "hello"])
|
||||
|
||||
class TokenTestNoCommand(unittest.TestCase):
|
||||
def test(self):
|
||||
def _test1():
|
||||
line = irctokens.tokenise(":n!u@h")
|
||||
def _test2():
|
||||
line = irctokens.tokenise("@tag=1 :n!u@h")
|
||||
|
||||
self.assertRaises(ValueError, _test1)
|
||||
self.assertRaises(ValueError, _test2)
|
||||
|
||||
class TokenTestBytes(unittest.TestCase):
|
||||
def test(self):
|
||||
_str = irctokens.tokenise("@a=1 :n!u@h PRIVMSG #chan :hello word")
|
||||
_bytes = irctokens.tokenise(b"@a=1 :n!u@h PRIVMSG #chan :hello word")
|
||||
|
||||
self.assertEqual(_str, _bytes)
|
||||
|
|
Loading…
Reference in New Issue