From e2f7c1eb845320204f0ddff1a85f3a231567661d Mon Sep 17 00:00:00 2001 From: jesopo Date: Wed, 30 Sep 2020 20:00:36 +0000 Subject: [PATCH] merge tokenise() and tokenise_b() - using Union[str,bytes] --- irctokens/line.py | 54 ++++++++++++++++++------------------------- irctokens/stateful.py | 4 ++-- test/tokenise.py | 7 ++++++ 3 files changed, 31 insertions(+), 34 deletions(-) diff --git a/irctokens/line.py b/irctokens/line.py index 55d0522..1c887dd 100644 --- a/irctokens/line.py +++ b/irctokens/line.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from .const import TAG_ESCAPED, TAG_UNESCAPED from .hostmask import Hostmask, hostmask from .formatting import format as format_ @@ -66,17 +66,14 @@ def _unescape_tag(value: str) -> str: unescaped += current return unescaped -def _tokenise( - tags_s: Optional[str], - line: str - ) -> Line: - +def _tokenise(line: str) -> Line: tags: Optional[Dict[str, str]] = None - if not tags_s is None: + if line[0] == "@": + tags_s, _, line = line.partition(" ") tags = {} for part in tags_s[1:].split(";"): key, _, value = part.partition("=") - tags[key] = _unescape_tag(value) + tags[key] = _unescape_tag(value) line, trailing_sep, trailing = line.partition(" :") params = list(filter(bool, line.split(" "))) @@ -94,33 +91,26 @@ def _tokenise( return Line(tags, source, command, params) -def tokenise_b( - line_b: bytes, +def tokenise( + line: Union[str, bytes], encoding: str="utf8", fallback: str="latin-1" ) -> Line: - if b"\x00" in line_b: - line_b, _ = line_b.split(b"\x00", 1) - tags: Optional[str] = None - if line_b[0] == ord(b"@"): - tags_b, _, line_b = line_b.partition(b" ") - tags = tags_b.decode("utf8") - - try: - line = line_b.decode(encoding) - except UnicodeDecodeError: - line = line_b.decode(fallback) - - return _tokenise(tags, line) - -def tokenise(line: str) -> Line: - if "\x00" in line: - line, _ = line.split("\x00", 1) - - if line[0] == "@": - tags, _, line = 
line.partition(" ") - return _tokenise(tags, line) + dline: str = "" + if isinstance(line, bytes): + if line[0] == ord(b"@"): + tags_b, sep, line = line.partition(b" ") + dline += (tags_b+sep).decode("utf8") + try: + dline += line.decode(encoding) + except UnicodeDecodeError: + dline += line.decode(fallback) else: - return _tokenise(None, line) + dline = line + + if "\x00" in dline: + dline, _ = dline.split("\x00", 1) + + return _tokenise(dline) diff --git a/irctokens/stateful.py b/irctokens/stateful.py index e0de75d..b038518 100644 --- a/irctokens/stateful.py +++ b/irctokens/stateful.py @@ -1,5 +1,5 @@ from typing import List, Optional -from .line import Line, tokenise_b +from .line import Line, tokenise class StatefulDecoder(object): def __init__(self, encoding: str="utf8", fallback: str="latin-1"): @@ -23,7 +23,7 @@ class StatefulDecoder(object): lines: List[Line] = [] for line in lines_b: - lines.append(tokenise_b(line, self._encoding, self._fallback)) + lines.append(tokenise(line, self._encoding, self._fallback)) return lines class StatefulEncoder(object): diff --git a/test/tokenise.py b/test/tokenise.py index f3588c4..5de1370 100644 --- a/test/tokenise.py +++ b/test/tokenise.py @@ -86,3 +86,10 @@ class TokenTestNoCommand(unittest.TestCase): self.assertRaises(ValueError, _test1) self.assertRaises(ValueError, _test2) + +class TokenTestBytes(unittest.TestCase): + def test(self): + _str = irctokens.tokenise("@a=1 :n!u@h PRIVMSG #chan :hello world") + _bytes = irctokens.tokenise(b"@a=1 :n!u@h PRIVMSG #chan :hello world") + + self.assertEqual(_str, _bytes)