mirror of https://github.com/jesopo/irctokens
split Line and Hostmask out to objects.py
This commit is contained in:
parent
78ecadce3d
commit
cddcc00610
|
@ -1,2 +1,3 @@
|
|||
from .protocol import Line, tokenise, build, Hostmask
|
||||
from .protocol import build, format, tokenise
|
||||
from .stateful import StatefulDecoder, StatefulEncoder
|
||||
from .objects import Hostmask, Line
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
from typing import Callable, Dict, List, Optional
|
||||
|
||||
class Hostmask(object):
|
||||
def __init__(self, source: str,
|
||||
nickname: str,
|
||||
username: Optional[str],
|
||||
hostname: Optional[str]):
|
||||
self._source = source
|
||||
self.nickname = nickname
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._source
|
||||
def __repr__(self) -> str:
|
||||
return (f"Hostmask(nick={self.nickname!r}, user={self.username!r}"
|
||||
f", host={self.hostname!r})")
|
||||
def __eq__(self, other) -> bool:
|
||||
if isinstance(other, Hostmask):
|
||||
return str(self) == str(other)
|
||||
else:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def from_source(source: str):
|
||||
username, _, hostname = source.partition("@")
|
||||
nickname, _, username = username.partition("!")
|
||||
return Hostmask(source, nickname, username or None, hostname or None)
|
||||
|
||||
class Line(object):
|
||||
def __init__(self,
|
||||
tags: Optional[Dict[str, str]],
|
||||
source: Optional[str],
|
||||
command: str,
|
||||
params: List[str],
|
||||
format: Callable[["Line"], str]):
|
||||
self.tags = tags
|
||||
self.source = source
|
||||
self.command = command
|
||||
self.params = params
|
||||
self._format = format
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if isinstance(other, Line):
|
||||
return self.format() == other.format()
|
||||
else:
|
||||
return False
|
||||
def __repr__(self) -> str:
|
||||
return (f"Line(tag={self.tags!r}, source={self.source!r}"
|
||||
f", command={self.command!r}, params={self.params!r})")
|
||||
|
||||
_hostmask: Optional[Hostmask] = None
|
||||
@property
|
||||
def hostmask(self):
|
||||
if self.source and not self._hostmask:
|
||||
self._hostmask = Hostmask.from_source(self.source)
|
||||
return self._hostmask
|
||||
|
||||
def format(self) -> str:
|
||||
return self._format(self)
|
|
@ -1,4 +1,5 @@
|
|||
from typing import Dict, List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
from .objects import Hostmask, Line
|
||||
|
||||
TAG_UNESCAPED = ["\\", " ", ";", "\r", "\n"]
|
||||
TAG_ESCAPED = ["\\\\", "\\s", "\\:", "\\r", "\\n"]
|
||||
|
@ -24,117 +25,38 @@ def _escape_tag(value: str):
|
|||
value = value.replace(char, TAG_ESCAPED[i])
|
||||
return value
|
||||
|
||||
class Hostmask(object):
|
||||
def __init__(self, source: str,
|
||||
nickname: str,
|
||||
username: Optional[str],
|
||||
hostname: Optional[str]):
|
||||
self._source = source
|
||||
self.nickname = nickname
|
||||
self.username = username
|
||||
self.hostname = hostname
|
||||
def format(line: Line) -> str:
|
||||
outs: List[str] = []
|
||||
if line.tags:
|
||||
tags_str = []
|
||||
for key in sorted(line.tags.keys()):
|
||||
if line.tags[key]:
|
||||
value = line.tags[key] or ""
|
||||
tags_str.append(f"{key}={_escape_tag(value)}")
|
||||
else:
|
||||
tags_str.append(key)
|
||||
outs.append(f"@{';'.join(tags_str)}")
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._source
|
||||
def __repr__(self) -> str:
|
||||
return (f"Hostmask(nick={self.nickname!r}, user={self.username!r}"
|
||||
f", host={self.hostname!r})")
|
||||
def __eq__(self, other) -> bool:
|
||||
if isinstance(other, Hostmask):
|
||||
return str(self) == str(other)
|
||||
else:
|
||||
return False
|
||||
if line.source:
|
||||
outs.append(f":{line.source}")
|
||||
outs.append(line.command)
|
||||
|
||||
@staticmethod
|
||||
def from_source(source: str):
|
||||
username, _, hostname = source.partition("@")
|
||||
nickname, _, username = username.partition("!")
|
||||
return Hostmask(source, nickname, username or None, hostname or None)
|
||||
params = line.params.copy()
|
||||
if line.params:
|
||||
last = params.pop(-1)
|
||||
for param in params:
|
||||
if " " in param:
|
||||
raise ValueError("non last params cannot have spaces")
|
||||
elif param.startswith(":"):
|
||||
raise ValueError("non last params cannot start with colon")
|
||||
outs.extend(params)
|
||||
|
||||
class Line(object):
|
||||
def __init__(self,
|
||||
tags: Optional[Dict[str, str]]=None,
|
||||
source: Optional[str]=None,
|
||||
command: str="",
|
||||
params: List[str]=None):
|
||||
self.tags = tags
|
||||
self.source = source
|
||||
self.command = command
|
||||
self.params = params or []
|
||||
|
||||
def __eq__(self, other) -> bool:
|
||||
if isinstance(other, Line):
|
||||
return self.format() == other.format()
|
||||
else:
|
||||
return False
|
||||
def __repr__(self) -> str:
|
||||
return (f"Line(tag={self.tags!r}, source={self.source!r}"
|
||||
f", command={self.command!r}, params={self.params!r})")
|
||||
|
||||
_hostmask: Optional[Hostmask] = None
|
||||
@property
|
||||
def hostmask(self):
|
||||
if self.source and not self._hostmask:
|
||||
self._hostmask = Hostmask.from_source(self.source)
|
||||
return self._hostmask
|
||||
|
||||
def format(self) -> str:
|
||||
outs: List[str] = []
|
||||
if self.tags:
|
||||
tags_str = []
|
||||
for key in sorted(self.tags.keys()):
|
||||
if self.tags[key]:
|
||||
value = self.tags[key] or ""
|
||||
tags_str.append(f"{key}={_escape_tag(value)}")
|
||||
else:
|
||||
tags_str.append(key)
|
||||
outs.append(f"@{';'.join(tags_str)}")
|
||||
|
||||
if self.source:
|
||||
outs.append(f":{self.source}")
|
||||
outs.append(self.command)
|
||||
|
||||
params = self.params.copy()
|
||||
if self.params:
|
||||
last = params.pop(-1)
|
||||
for param in params:
|
||||
if " " in param:
|
||||
raise ValueError("non last params cannot have spaces")
|
||||
elif param.startswith(":"):
|
||||
raise ValueError("non last params cannot start with colon")
|
||||
outs.extend(params)
|
||||
|
||||
if (not last or
|
||||
" " in last or
|
||||
last.startswith(":")):
|
||||
last = f":{last}"
|
||||
outs.append(last)
|
||||
return " ".join(outs)
|
||||
|
||||
def tokenise(line: str) -> Line:
|
||||
line_obj = Line()
|
||||
|
||||
if line[0] == "@":
|
||||
message_tags, _, line = line.partition(" ")
|
||||
tags = {}
|
||||
for part in message_tags[1:].split(";"):
|
||||
key, _, value = part.partition("=")
|
||||
tags[key] = _unescape_tag(value)
|
||||
line_obj.tags = tags
|
||||
|
||||
line, trailing_sep, trailing = line.partition(" :")
|
||||
params = list(filter(bool, line.split(" ")))
|
||||
|
||||
if params[0][0] == ":":
|
||||
line_obj.source = params.pop(0)[1:]
|
||||
|
||||
line_obj.command = params.pop(0).upper()
|
||||
|
||||
if trailing_sep:
|
||||
params.append(trailing)
|
||||
line_obj.params = params
|
||||
|
||||
return line_obj
|
||||
if (not last or
|
||||
" " in last or
|
||||
last.startswith(":")):
|
||||
last = f":{last}"
|
||||
outs.append(last)
|
||||
return " ".join(outs)
|
||||
|
||||
def build(
|
||||
command: str,
|
||||
|
@ -142,4 +64,28 @@ def build(
|
|||
source: Optional[str]=None,
|
||||
tags: Optional[Dict[str, str]]=None
|
||||
) -> Line:
|
||||
return Line(tags, source, command, params)
|
||||
return Line(tags, source, command, params, format)
|
||||
|
||||
def tokenise(line: str) -> Line:
|
||||
tags: Optional[Dict[str, str]] = None
|
||||
if line[0] == "@":
|
||||
message_tags, _, line = line.partition(" ")
|
||||
tags = {}
|
||||
for part in message_tags[1:].split(";"):
|
||||
key, _, value = part.partition("=")
|
||||
tags[key] = _unescape_tag(value)
|
||||
|
||||
line, trailing_sep, trailing = line.partition(" :")
|
||||
params = list(filter(bool, line.split(" ")))
|
||||
|
||||
source: Optional[str] = None
|
||||
if params[0][0] == ":":
|
||||
source = params.pop(0)[1:]
|
||||
|
||||
command = params.pop(0).upper()
|
||||
|
||||
if trailing_sep:
|
||||
params.append(trailing)
|
||||
|
||||
return build(command, params, source, tags)
|
||||
|
||||
|
|
Loading…
Reference in New Issue