format with isort and black

This commit is contained in:
A_D 2022-01-30 20:14:08 +02:00
parent 14d38192cf
commit 717dabbc40
No known key found for this signature in database
GPG Key ID: 4BE9EB7DF45076C4
14 changed files with 143 additions and 82 deletions

View File

@ -1,4 +1,3 @@
from .line import Line, build, tokenise
from .hostmask import Hostmask, hostmask
from .line import Line, build, tokenise
from .stateful import StatefulDecoder, StatefulEncoder

View File

@ -1,2 +1,2 @@
TAG_UNESCAPED = ["\\", " ", ";", "\r", "\n"]
TAG_ESCAPED = ["\\\\", "\\s", "\\:", "\\r", "\\n"]
TAG_UNESCAPED = ["\\", " ", ";", "\r", "\n"]
TAG_ESCAPED = ["\\\\", "\\s", "\\:", "\\r", "\\n"]

View File

@ -1,16 +1,20 @@
from typing import Dict, List, Optional
from .const import TAG_ESCAPED, TAG_UNESCAPED
def _escape_tag(value: str):
for i, char in enumerate(TAG_UNESCAPED):
value = value.replace(char, TAG_ESCAPED[i])
return value
def format(
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str]):
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str],
):
outs: List[str] = []
if tags:
tags_str = []
@ -36,9 +40,7 @@ def format(
raise ValueError("non last params cannot start with colon")
outs.extend(params)
if (not last or
" " in last or
last.startswith(":")):
if not last or " " in last or last.startswith(":"):
last = f":{last}"
outs.append(last)
return " ".join(outs)

View File

@ -1,10 +1,14 @@
from typing import Optional
class Hostmask(object):
def __init__(self, source: str,
nickname: str,
username: Optional[str],
hostname: Optional[str]):
def __init__(
self,
source: str,
nickname: str,
username: Optional[str],
hostname: Optional[str],
):
self._source = source
self.nickname = nickname
self.username = username
@ -12,19 +16,18 @@ class Hostmask(object):
def __str__(self) -> str:
return self._source
def __repr__(self) -> str:
return f"Hostmask({self._source})"
def __eq__(self, other) -> bool:
if isinstance(other, Hostmask):
return str(self) == str(other)
else:
return False
def hostmask(source: str) -> Hostmask:
username, _, hostname = source.partition("@")
nickname, _, username = username.partition("!")
return Hostmask(
source,
nickname,
username or None,
hostname or None)
return Hostmask(source, nickname, username or None, hostname or None)

View File

@ -1,29 +1,37 @@
from typing import Dict, List, Optional, Union
from .const import TAG_ESCAPED, TAG_UNESCAPED
from .hostmask import Hostmask, hostmask
from typing import Dict, List, Optional, Union
from .const import TAG_ESCAPED, TAG_UNESCAPED
from .formatting import format as format_
from .hostmask import Hostmask, hostmask
class Line(object):
def __init__(self,
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str]):
self.tags = tags
self.source = source
def __init__(
self,
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str],
):
self.tags = tags
self.source = source
self.command = command
self.params = params
self.params = params
def __eq__(self, other) -> bool:
if isinstance(other, Line):
return self.format() == other.format()
else:
return False
def __repr__(self) -> str:
return (f"Line(tag={self.tags!r}, source={self.source!r}"
f", command={self.command!r}, params={self.params!r})")
return (
f"Line(tag={self.tags!r}, source={self.source!r}"
f", command={self.command!r}, params={self.params!r})"
)
_hostmask: Optional[Hostmask] = None
@property
def hostmask(self) -> Hostmask:
if self.source is not None:
@ -38,17 +46,20 @@ class Line(object):
def with_source(self, source: str) -> "Line":
return Line(self.tags, source, self.command, self.params)
def copy(self) -> "Line":
return Line(self.tags, self.source, self.command, self.params)
def build(
command: str,
params: List[str]=[],
source: Optional[str]=None,
tags: Optional[Dict[str, str]]=None
) -> Line:
command: str,
params: List[str] = [],
source: Optional[str] = None,
tags: Optional[Dict[str, str]] = None,
) -> Line:
return Line(tags, source, command, params)
def _unescape_tag(value: str) -> str:
unescaped, escaped = "", list(value)
while escaped:
@ -56,7 +67,7 @@ def _unescape_tag(value: str) -> str:
if current == "\\":
if escaped:
next = escaped.pop(0)
duo = current+next
duo = current + next
if duo in TAG_ESCAPED:
index = TAG_ESCAPED.index(duo)
unescaped += TAG_UNESCAPED[index]
@ -66,6 +77,7 @@ def _unescape_tag(value: str) -> str:
unescaped += current
return unescaped
def _tokenise(line: str) -> Line:
tags: Optional[Dict[str, str]] = None
if line[0] == "@":
@ -73,7 +85,7 @@ def _tokenise(line: str) -> Line:
tags = {}
for part in tags_s[1:].split(";"):
key, _, value = part.partition("=")
tags[key] = _unescape_tag(value)
tags[key] = _unescape_tag(value)
line, trailing_sep, trailing = line.partition(" :")
params = list(filter(bool, line.split(" ")))
@ -91,17 +103,16 @@ def _tokenise(line: str) -> Line:
return Line(tags, source, command, params)
def tokenise(
line: Union[str, bytes],
encoding: str="utf8",
fallback: str="latin-1"
) -> Line:
line: Union[str, bytes], encoding: str = "utf8", fallback: str = "latin-1"
) -> Line:
dline: str = ""
if isinstance(line, bytes):
if line[0] == ord(b"@"):
tags_b, sep, line = line.partition(b" ")
dline += (tags_b+sep).decode("utf8")
dline += (tags_b + sep).decode("utf8")
try:
dline += line.decode(encoding)
except UnicodeDecodeError:

View File

@ -1,8 +1,10 @@
from typing import List, Optional
from .line import Line, tokenise
from .line import Line, tokenise
class StatefulDecoder(object):
def __init__(self, encoding: str="utf8", fallback: str="latin-1"):
def __init__(self, encoding: str = "utf8", fallback: str = "latin-1"):
self._encoding = encoding
self._fallback = fallback
self.clear()
@ -26,8 +28,9 @@ class StatefulDecoder(object):
lines.append(tokenise(line, self._encoding, self._fallback))
return lines
class StatefulEncoder(object):
def __init__(self, encoding: str="utf8"):
def __init__(self, encoding: str = "utf8"):
self._encoding = encoding
self.clear()

View File

@ -22,7 +22,7 @@ setuptools.setup(
"Operating System :: OS Independent",
"Operating System :: POSIX",
"Operating System :: Microsoft :: Windows",
"Topic :: Communications :: Chat :: Internet Relay Chat"
"Topic :: Communications :: Chat :: Internet Relay Chat",
],
python_requires='>=3.6'
python_requires=">=3.6",
)

View File

@ -1,6 +1,6 @@
from .tokenise import *
from .format import *
from .format import *
from .hostmask import *
from .parser_tests import *
from .stateful_decode import *
from .stateful_encode import *
from .hostmask import *
from .parser_tests import *
from .tokenise import *

View File

@ -1,10 +1,13 @@
import unittest
import irctokens
class FormatTestTags(unittest.TestCase):
def test(self):
line = irctokens.build("PRIVMSG", ["#channel", "hello"],
tags={"id": "\\" + " " + ";" + "\r\n"}).format()
line = irctokens.build(
"PRIVMSG", ["#channel", "hello"], tags={"id": "\\" + " " + ";" + "\r\n"}
).format()
self.assertEqual(line, "@id=\\\\\\s\\:\\r\\n PRIVMSG #channel hello")
def test_missing(self):
@ -12,29 +15,36 @@ class FormatTestTags(unittest.TestCase):
self.assertEqual(line, "PRIVMSG #channel hello")
def test_none_value(self):
line = irctokens.build("PRIVMSG", ["#channel", "hello"],
tags={"a": None}).format()
line = irctokens.build(
"PRIVMSG", ["#channel", "hello"], tags={"a": None}
).format()
self.assertEqual(line, "@a PRIVMSG #channel hello")
def test_empty_value(self):
line = irctokens.build("PRIVMSG", ["#channel", "hello"], tags={"a": ""}
).format()
line = irctokens.build(
"PRIVMSG", ["#channel", "hello"], tags={"a": ""}
).format()
self.assertEqual(line, "@a PRIVMSG #channel hello")
class FormatTestSource(unittest.TestCase):
def test(self):
line = irctokens.build("PRIVMSG", ["#channel", "hello"],
source="nick!user@host").format()
line = irctokens.build(
"PRIVMSG", ["#channel", "hello"], source="nick!user@host"
).format()
self.assertEqual(line, ":nick!user@host PRIVMSG #channel hello")
class FormatTestCommand(unittest.TestCase):
def test_lowercase(self):
line = irctokens.build("privmsg").format()
self.assertEqual(line, "privmsg")
def test_uppercase(self):
line = irctokens.build("PRIVMSG").format()
self.assertEqual(line, "PRIVMSG")
class FormatTestTrailing(unittest.TestCase):
def test_space(self):
line = irctokens.build("PRIVMSG", ["#channel", "hello world"]).format()
@ -48,13 +58,16 @@ class FormatTestTrailing(unittest.TestCase):
line = irctokens.build("PRIVMSG", ["#channel", ":helloworld"]).format()
self.assertEqual(line, "PRIVMSG #channel ::helloworld")
class FormatTestInvalidParam(unittest.TestCase):
def test_non_last_space(self):
def _inner():
irctokens.build("USER", ["user", "0 *", "real name"]).format()
self.assertRaises(ValueError, _inner)
def test_non_last_colon(self):
def _inner():
irctokens.build("PRIVMSG", [":#channel", "hello"]).format()
self.assertRaises(ValueError, _inner)

View File

@ -1,6 +1,8 @@
import unittest
import irctokens
class HostmaskTest(unittest.TestCase):
def test_all(self):
hostmask = irctokens.hostmask("nick!user@host")
@ -36,6 +38,8 @@ class HostmaskTest(unittest.TestCase):
def test_none_source(self):
line = irctokens.tokenise("PRIVMSG #channel hello")
def _hostmask():
line.hostmask
self.assertRaises(ValueError, _hostmask)

View File

@ -1,13 +1,17 @@
import os.path, unittest
import os.path
import unittest
import yaml
import irctokens
# run test cases sourced from:
# https://github.com/ircdocs/parser-tests
dir = os.path.dirname(os.path.realpath(__file__))
dir = os.path.dirname(os.path.realpath(__file__))
data_dir = os.path.join(dir, "_data")
class ParserTestsSplit(unittest.TestCase):
def test_split(self):
data_path = os.path.join(data_dir, "msg-split.yaml")
@ -20,10 +24,10 @@ class ParserTestsSplit(unittest.TestCase):
tokens = irctokens.tokenise(input)
self.assertEqual(tokens.tags, atoms.get("tags", None))
self.assertEqual(tokens.source, atoms.get("source", None))
self.assertEqual(tokens.tags, atoms.get("tags", None))
self.assertEqual(tokens.source, atoms.get("source", None))
self.assertEqual(tokens.command, atoms["verb"].upper())
self.assertEqual(tokens.params, atoms.get("params", []))
self.assertEqual(tokens.params, atoms.get("params", []))
def test_join(self):
data_path = os.path.join(data_dir, "msg-join.yaml")
@ -31,13 +35,14 @@ class ParserTestsSplit(unittest.TestCase):
tests = yaml.safe_load(data_file.read())["tests"]
for test in tests:
atoms = test["atoms"]
atoms = test["atoms"]
matches = test["matches"]
line = irctokens.build(
atoms["verb"],
atoms.get("params", []),
source=atoms.get("source", None),
tags=atoms.get("tags", None)).format()
tags=atoms.get("tags", None),
).format()
self.assertIn(line, matches)

View File

@ -1,6 +1,8 @@
import unittest
import irctokens
class DecodeTestPartial(unittest.TestCase):
def test(self):
d = irctokens.StatefulDecoder()
@ -12,11 +14,11 @@ class DecodeTestPartial(unittest.TestCase):
line = irctokens.tokenise("PRIVMSG #channel hello")
self.assertEqual(lines, [line])
class DecodeTestMultiple(unittest.TestCase):
def test(self):
d = irctokens.StatefulDecoder()
lines = d.push(b"PRIVMSG #channel1 hello\r\n"
b"PRIVMSG #channel2 hello\r\n")
lines = d.push(b"PRIVMSG #channel1 hello\r\n" b"PRIVMSG #channel2 hello\r\n")
self.assertEqual(len(lines), 2)
line1 = irctokens.tokenise("PRIVMSG #channel1 hello")
@ -24,12 +26,14 @@ class DecodeTestMultiple(unittest.TestCase):
self.assertEqual(lines[0], line1)
self.assertEqual(lines[1], line2)
class DecodeTestEncoding(unittest.TestCase):
def test(self):
d = irctokens.StatefulDecoder(encoding="iso-8859-2")
lines = d.push("PRIVMSG #channel :hello Č\r\n".encode("iso-8859-2"))
line = irctokens.tokenise("PRIVMSG #channel :hello Č")
self.assertEqual(lines[0], line)
def test_fallback(self):
d = irctokens.StatefulDecoder(fallback="latin-1")
lines = d.push("PRIVMSG #channel hélló\r\n".encode("latin-1"))
@ -37,6 +41,7 @@ class DecodeTestEncoding(unittest.TestCase):
line = irctokens.tokenise("PRIVMSG #channel hélló")
self.assertEqual(lines[0], line)
class DecodeTestEmpty(unittest.TestCase):
def test_immediate(self):
d = irctokens.StatefulDecoder()
@ -49,6 +54,7 @@ class DecodeTestEmpty(unittest.TestCase):
lines = d.push(b"")
self.assertIsNone(lines)
class DecodeTestClear(unittest.TestCase):
def test(self):
d = irctokens.StatefulDecoder()
@ -56,11 +62,12 @@ class DecodeTestClear(unittest.TestCase):
d.clear()
self.assertEqual(d.pending(), b"")
class DecodeTestTagEncodingMismatch(unittest.TestCase):
def test(self):
d = irctokens.StatefulDecoder()
d.push("@asd=á ".encode("utf8"))
lines = d.push("PRIVMSG #chan :á\r\n".encode("latin-1"))
self.assertEqual(lines[0].params[1], "á")
self.assertEqual(lines[0].params[1], "á")
self.assertEqual(lines[0].tags["asd"], "á")

View File

@ -1,6 +1,8 @@
import unittest
import irctokens
class EncodeTestPush(unittest.TestCase):
def test(self):
e = irctokens.StatefulEncoder()
@ -8,6 +10,7 @@ class EncodeTestPush(unittest.TestCase):
e.push(line)
self.assertEqual(e.pending(), b"PRIVMSG #channel hello\r\n")
class EncodeTestPop(unittest.TestCase):
def test_partial(self):
e = irctokens.StatefulEncoder()
@ -32,6 +35,7 @@ class EncodeTestPop(unittest.TestCase):
lines = e.pop(1)
self.assertEqual(len(lines), 0)
class EncodeTestClear(unittest.TestCase):
def test(self):
e = irctokens.StatefulEncoder()
@ -39,9 +43,11 @@ class EncodeTestClear(unittest.TestCase):
e.clear()
self.assertEqual(e.pending(), b"")
class EncodeTestEncoding(unittest.TestCase):
def test(self):
e = irctokens.StatefulEncoder(encoding="iso-8859-2")
e.push(irctokens.tokenise("PRIVMSG #channel :hello Č"))
self.assertEqual(e.pending(),
"PRIVMSG #channel :hello Č\r\n".encode("iso-8859-2"))
self.assertEqual(
e.pending(), "PRIVMSG #channel :hello Č\r\n".encode("iso-8859-2")
)

View File

@ -1,6 +1,8 @@
import unittest
import irctokens
class TokenTestTags(unittest.TestCase):
def test_missing(self):
line = irctokens.tokenise("PRIVMSG #channel")
@ -26,6 +28,7 @@ class TokenTestTags(unittest.TestCase):
line = irctokens.tokenise("@id=1\\ PRIVMSG #channel")
self.assertEqual(line.tags["id"], "1")
class TokenTestSource(unittest.TestCase):
def test_without_tags(self):
line = irctokens.tokenise(":nick!user@host PRIVMSG #channel")
@ -43,11 +46,13 @@ class TokenTestSource(unittest.TestCase):
line = irctokens.tokenise("@id=123 PRIVMSG #channel")
self.assertIsNone(line.source)
class TokenTestCommand(unittest.TestCase):
def test_lowercase(self):
line = irctokens.tokenise("privmsg #channel")
self.assertEqual(line.command, "PRIVMSG")
class TokenTestParams(unittest.TestCase):
def test_trailing(self):
line = irctokens.tokenise("PRIVMSG #channel :hello world")
@ -62,44 +67,47 @@ class TokenTestParams(unittest.TestCase):
self.assertEqual(line.command, "PRIVMSG")
self.assertEqual(line.params, [])
class TokenTestAll(unittest.TestCase):
def test_all(self):
line = irctokens.tokenise(
"@id=123 :nick!user@host PRIVMSG #channel :hello world")
"@id=123 :nick!user@host PRIVMSG #channel :hello world"
)
self.assertEqual(line.tags, {"id": "123"})
self.assertEqual(line.source, "nick!user@host")
self.assertEqual(line.command, "PRIVMSG")
self.assertEqual(line.params, ["#channel", "hello world"])
class TokenTestTruncate(unittest.TestCase):
def test_null(self):
line = irctokens.tokenise(
":nick!user@host PRIVMSG #channel :hello\x00 world")
line = irctokens.tokenise(":nick!user@host PRIVMSG #channel :hello\x00 world")
self.assertEqual(line.params, ["#channel", "hello"])
def test_cr(self):
line = irctokens.tokenise(
":nick!user@host PRIVMSG #channel :hello\r world")
line = irctokens.tokenise(":nick!user@host PRIVMSG #channel :hello\r world")
self.assertEqual(line.params, ["#channel", "hello"])
def test_lf(self):
line = irctokens.tokenise(
":nick!user@host PRIVMSG #channel :hello\n world")
line = irctokens.tokenise(":nick!user@host PRIVMSG #channel :hello\n world")
self.assertEqual(line.params, ["#channel", "hello"])
class TokenTestNoCommand(unittest.TestCase):
def test(self):
def _test1():
line = irctokens.tokenise(":n!u@h")
def _test2():
line = irctokens.tokenise("@tag=1 :n!u@h")
self.assertRaises(ValueError, _test1)
self.assertRaises(ValueError, _test2)
class TokenTestBytes(unittest.TestCase):
def test(self):
_str = irctokens.tokenise("@a=1 :n!u@h PRIVMSG #chan :hello word")
_str = irctokens.tokenise("@a=1 :n!u@h PRIVMSG #chan :hello word")
_bytes = irctokens.tokenise(b"@a=1 :n!u@h PRIVMSG #chan :hello word")
self.assertEqual(_str, _bytes)