a better solution to cyclical imports

This commit is contained in:
jesopo 2020-04-21 17:23:37 +01:00
parent c5ed9fb605
commit f1f6bb0d61
6 changed files with 70 additions and 64 deletions

View File

@ -1,3 +1,4 @@
from .protocol import build, format, tokenise from .tokenise import tokenise
from .stateful import StatefulDecoder, StatefulEncoder from .formatting import format
from .objects import Hostmask, Line from .objects import build, Line, Hostmask
from .stateful import StatefulDecoder, StatefulEncoder

2
irctokens/const.py Normal file
View File

@ -0,0 +1,2 @@
TAG_UNESCAPED = ["\\", " ", ";", "\r", "\n"]
TAG_ESCAPED = ["\\\\", "\\s", "\\:", "\\r", "\\n"]

44
irctokens/formatting.py Normal file
View File

@ -0,0 +1,44 @@
from typing import Dict, List, Optional
from .const import TAG_ESCAPED, TAG_UNESCAPED
def _escape_tag(value: str):
for i, char in enumerate(TAG_UNESCAPED):
value = value.replace(char, TAG_ESCAPED[i])
return value
def format(
tags: Optional[Dict[str, str]],
source: Optional[str],
command: str,
params: List[str]):
outs: List[str] = []
if tags:
tags_str = []
for key in sorted(tags.keys()):
if tags[key]:
value = tags[key] or ""
tags_str.append(f"{key}={_escape_tag(value)}")
else:
tags_str.append(key)
outs.append(f"@{';'.join(tags_str)}")
if source is not None:
outs.append(f":{source}")
outs.append(command)
params = params.copy()
if params:
last = params.pop(-1)
for param in params:
if " " in param:
raise ValueError("non last params cannot have spaces")
elif param.startswith(":"):
raise ValueError("non last params cannot start with colon")
outs.extend(params)
if (not last or
" " in last or
last.startswith(":")):
last = f":{last}"
outs.append(last)
return " ".join(outs)

View File

@ -1,4 +1,5 @@
from typing import Callable, Dict, List, Optional from typing import Dict, List, Optional
from .formatting import format as format_
class Hostmask(object): class Hostmask(object):
def __init__(self, source: str, def __init__(self, source: str,
@ -32,13 +33,11 @@ class Line(object):
tags: Optional[Dict[str, str]], tags: Optional[Dict[str, str]],
source: Optional[str], source: Optional[str],
command: str, command: str,
params: List[str], params: List[str]):
format: Callable[["Line"], str]):
self.tags = tags self.tags = tags
self.source = source self.source = source
self.command = command self.command = command
self.params = params self.params = params
self._format = format
def __eq__(self, other) -> bool: def __eq__(self, other) -> bool:
if isinstance(other, Line): if isinstance(other, Line):
@ -57,11 +56,17 @@ class Line(object):
return self._hostmask return self._hostmask
def format(self) -> str: def format(self) -> str:
return self._format(self) return format_(self.tags, self.source, self.command, self.params)
def with_source(self, source: str) -> "Line": def with_source(self, source: str) -> "Line":
return Line(self.tags, source, self.command, self.params, self._format) return Line(self.tags, source, self.command, self.params)
def copy(self) -> "Line": def copy(self) -> "Line":
return Line(self.tags, self.source, self.command, self.params, return Line(self.tags, self.source, self.command, self.params)
self._format)
def build(
command: str,
params: List[str]=[],
source: Optional[str]=None,
tags: Optional[Dict[str, str]]=None
) -> Line:
return Line(tags, source, command, params)

View File

@ -1,5 +1,6 @@
from typing import List, Optional from typing import List, Optional
from .protocol import Line, tokenise_b from .objects import Line
from .tokenise import tokenise_b
class StatefulDecoder(object): class StatefulDecoder(object):
def __init__(self, encoding: str="utf8", fallback: str="latin-1"): def __init__(self, encoding: str="utf8", fallback: str="latin-1"):

View File

@ -1,8 +1,6 @@
from typing import Dict, List, Optional from typing import Dict, Optional
from .objects import Hostmask, Line from .objects import Line
from .const import TAG_ESCAPED, TAG_UNESCAPED
TAG_UNESCAPED = ["\\", " ", ";", "\r", "\n"]
TAG_ESCAPED = ["\\\\", "\\s", "\\:", "\\r", "\\n"]
def _unescape_tag(value: str): def _unescape_tag(value: str):
unescaped, escaped = "", list(value) unescaped, escaped = "", list(value)
@ -20,51 +18,6 @@ def _unescape_tag(value: str):
else: else:
unescaped += current unescaped += current
return unescaped return unescaped
def _escape_tag(value: str):
for i, char in enumerate(TAG_UNESCAPED):
value = value.replace(char, TAG_ESCAPED[i])
return value
def format(line: Line) -> str:
outs: List[str] = []
if line.tags:
tags_str = []
for key in sorted(line.tags.keys()):
if line.tags[key]:
value = line.tags[key] or ""
tags_str.append(f"{key}={_escape_tag(value)}")
else:
tags_str.append(key)
outs.append(f"@{';'.join(tags_str)}")
if line.source:
outs.append(f":{line.source}")
outs.append(line.command)
params = line.params.copy()
if line.params:
last = params.pop(-1)
for param in params:
if " " in param:
raise ValueError("non last params cannot have spaces")
elif param.startswith(":"):
raise ValueError("non last params cannot start with colon")
outs.extend(params)
if (not last or
" " in last or
last.startswith(":")):
last = f":{last}"
outs.append(last)
return " ".join(outs)
def build(
command: str,
params: List[str]=[],
source: Optional[str]=None,
tags: Optional[Dict[str, str]]=None
) -> Line:
return Line(tags, source, command, params, format)
def _tokenise(tags_s: Optional[str], line: str) -> Line: def _tokenise(tags_s: Optional[str], line: str) -> Line:
tags: Optional[Dict[str, str]] = None tags: Optional[Dict[str, str]] = None
@ -86,7 +39,7 @@ def _tokenise(tags_s: Optional[str], line: str) -> Line:
if trailing_sep: if trailing_sep:
params.append(trailing) params.append(trailing)
return build(command, params, source, tags) return Line(tags, source, command, params)
def tokenise_b(line_b: bytes, def tokenise_b(line_b: bytes,
encoding: str="utf8", encoding: str="utf8",