merge tokenise() and tokenise_b() - using Union[str,bytes]

jesopo 2020-09-30 20:00:36 +00:00
parent b9a2b6c1de
commit e2f7c1eb84
3 changed files with 31 additions and 34 deletions
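The upshot of the merge: tokenise() now accepts either str or bytes, and the encoding/fallback parameters are only consulted for bytes input. A minimal sketch of the unified call surface (the line content here is illustrative, not taken from this diff):

import irctokens

# str and bytes now go through the same entry point
line_s = irctokens.tokenise("@a=1 :n!u@h PRIVMSG #chan :hello")
line_b = irctokens.tokenise(b"@a=1 :n!u@h PRIVMSG #chan :hello")
assert line_s == line_b

# encoding/fallback only apply when bytes are passed in
line_f = irctokens.tokenise(
    b"PRIVMSG #chan :caf\xe9",  # not valid utf8, decodes via latin-1
    encoding="utf8", fallback="latin-1")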

View File

@@ -1,4 +1,4 @@
-from typing import Dict, List, Optional
+from typing import Dict, List, Optional, Union
 from .const import TAG_ESCAPED, TAG_UNESCAPED
 from .hostmask import Hostmask, hostmask
 from .formatting import format as format_
@@ -66,17 +66,14 @@ def _unescape_tag(value: str) -> str:
         unescaped += current
     return unescaped
 
-def _tokenise(
-        tags_s: Optional[str],
-        line: str
-        ) -> Line:
+def _tokenise(line: str) -> Line:
     tags: Optional[Dict[str, str]] = None
-    if not tags_s is None:
+    if line[0] == "@":
+        tags_s, _, line = line.partition(" ")
         tags = {}
         for part in tags_s[1:].split(";"):
             key, _, value = part.partition("=")
             tags[key] = _unescape_tag(value)
 
     line, trailing_sep, trailing = line.partition(" :")
     params = list(filter(bool, line.split(" ")))
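Worth noting in the hunk above: _tokenise() no longer receives a pre-split tags_s argument; it detects a leading "@" itself and partitions the IRCv3 tags section off the front of the line. An illustrative trace of that path (input invented for the example):

line = "@id=123;msg=hello\\sworld :n!u@h PRIVMSG #chan :hi"
tags_s, _, line = line.partition(" ")
# tags_s == "@id=123;msg=hello\\sworld"
# line   == ":n!u@h PRIVMSG #chan :hi"
for part in tags_s[1:].split(";"):
    key, _, value = part.partition("=")
    # value is still escaped here; _unescape_tag turns the
    # escaped "\s" into a space, so msg becomes "hello world"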
@@ -94,33 +91,26 @@ def _tokenise(
     return Line(tags, source, command, params)
 
-def tokenise_b(
-        line_b: bytes,
+def tokenise(
+        line: Union[str, bytes],
         encoding: str="utf8",
         fallback: str="latin-1"
         ) -> Line:
-    if b"\x00" in line_b:
-        line_b, _ = line_b.split(b"\x00", 1)
-
     tags: Optional[str] = None
-    if line_b[0] == ord(b"@"):
-        tags_b, _, line_b = line_b.partition(b" ")
-        tags = tags_b.decode("utf8")
-
-    try:
-        line = line_b.decode(encoding)
-    except UnicodeDecodeError:
-        line = line_b.decode(fallback)
-
-    return _tokenise(tags, line)
-
-def tokenise(line: str) -> Line:
-    if "\x00" in line:
-        line, _ = line.split("\x00", 1)
-
-    if line[0] == "@":
-        tags, _, line = line.partition(" ")
-        return _tokenise(tags, line)
+    dline: str = ""
+    if isinstance(line, bytes):
+        if line[0] == ord(b"@"):
+            tags_b, sep, line = line.partition(b" ")
+            dline += (tags_b+sep).decode("utf8")
+        try:
+            dline += line.decode(encoding)
+        except UnicodeDecodeError:
+            dline += line.decode(fallback)
     else:
-        return _tokenise(None, line)
+        dline = line
+
+    if "\x00" in dline:
+        dline, _ = dline.split("\x00", 1)
+
+    return _tokenise(dline)
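One behavioural detail in the new tokenise(): the tags section of a bytes line is always decoded as utf8, while the remainder is tried with encoding and retried with fallback. A standalone sketch of just that decode step (decode_irc is a hypothetical helper mirroring the bytes branch above, not part of the library):

def decode_irc(line: bytes,
               encoding: str="utf8",
               fallback: str="latin-1") -> str:
    dline = ""
    if line[0] == ord(b"@"):
        # the tags section is decoded as utf8 unconditionally
        tags_b, sep, line = line.partition(b" ")
        dline += (tags_b + sep).decode("utf8")
    try:
        dline += line.decode(encoding)
    except UnicodeDecodeError:
        dline += line.decode(fallback)
    return dline

decode_irc(b"@a=1 PRIVMSG #chan :caf\xe9")  # -> '@a=1 PRIVMSG #chan :café'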

View File

@@ -1,5 +1,5 @@
 from typing import List, Optional
-from .line import Line, tokenise_b
+from .line import Line, tokenise
 
 class StatefulDecoder(object):
     def __init__(self, encoding: str="utf8", fallback: str="latin-1"):
@@ -23,7 +23,7 @@ class StatefulDecoder(object):
         lines: List[Line] = []
         for line in lines_b:
-            lines.append(tokenise_b(line, self._encoding, self._fallback))
+            lines.append(tokenise(line, self._encoding, self._fallback))
         return lines
 
 class StatefulEncoder(object):
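With tokenise_b gone, the stateful decoder hands raw socket bytes straight to the unified tokenise(). Assuming the decoder's usual push() entry point (the method name is not visible in this hunk), usage looks roughly like:

import irctokens

decoder = irctokens.StatefulDecoder(encoding="utf8", fallback="latin-1")
# push() buffers partial lines and returns the complete ones (assumed API)
lines = decoder.push(b"PRIVMSG #chan :caf\xe9\r\nPING :token\r\n")
for line in lines or []:
    print(line.command, line.params)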

View File

@@ -86,3 +86,10 @@ class TokenTestNoCommand(unittest.TestCase):
         self.assertRaises(ValueError, _test1)
         self.assertRaises(ValueError, _test2)
+
+class TokenTestBytes(unittest.TestCase):
+    def test(self):
+        _str   = irctokens.tokenise( "@a=1 :n!u@h PRIVMSG #chan :hello word")
+        _bytes = irctokens.tokenise(b"@a=1 :n!u@h PRIVMSG #chan :hello word")
+
+        self.assertEqual(_str, _bytes)
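The new test covers the happy path where the bytes form is valid utf8. A hypothetical companion test for the fallback path could look like this (not part of the commit):

import unittest
import irctokens

class TokenTestBytesFallback(unittest.TestCase):
    def test(self):
        # b"\xe9" is not valid utf8, so decoding falls back to latin-1
        _str   = irctokens.tokenise( "PRIVMSG #chan :café")
        _bytes = irctokens.tokenise(b"PRIVMSG #chan :caf\xe9")
        self.assertEqual(_str, _bytes)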