use "from typing import x"

This commit is contained in:
jesopo 2020-03-11 21:45:26 +00:00
parent 6cb454b755
commit 91648986c8
2 changed files with 13 additions and 14 deletions

View File

@ -1,4 +1,4 @@
import typing
from typing import Dict, List, Optional
TAG_ESCAPE = ["\\", " ", ";", "\r", "\n"]
TAG_UNESCAPE = ["\\\\", "\s", "\:", r"\r", r"\n"]
@ -33,11 +33,10 @@ class Hostmask(object):
class Line(object):
def __init__(self,
tags:
typing.Optional[typing.Dict[str, typing.Optional[str]]]=None,
source: typing.Optional[str]=None,
tags: Optional[Dict[str, Optional[str]]]=None,
source: Optional[str]=None,
command: str="",
params: typing.List[str]=None):
params: List[str]=None):
self.tags = tags
self.source = source
self.command = command
@ -52,14 +51,14 @@ class Line(object):
return (f"Line(tag={self.tags}, source={self.source}"
f", command={self.command}, params={self.params})")
_hostmask: typing.Optional[Hostmask] = None
_hostmask: Optional[Hostmask] = None
@property
def hostmask(self):
self._hostmask = self._hostmask or Hostmask(self.source)
return self._hostmask
def format(self) -> str:
outs: typing.List[str] = []
outs: List[str] = []
if self.tags:
tags_str = []
for key in sorted(self.tags.keys()):
@ -113,8 +112,8 @@ def tokenise(line: str) -> Line:
def format(
command: str,
params: typing.List[str]=[],
source: typing.Optional[str]=None,
tags: typing.Optional[typing.Dict[str, typing.Optional[str]]]=None
params: List[str]=[],
source: Optional[str]=None,
tags: Optional[Dict[str, Optional[str]]]=None
) -> str:
return Line(tags, source, command, params).format()

View File

@ -1,4 +1,4 @@
import typing
from typing import List, Optional
from .protocol import Line, tokenise
class StatefulDecoder(object):
@ -13,7 +13,7 @@ class StatefulDecoder(object):
def pending(self) -> bytes:
return self._buffer
def push(self, data: bytes) -> typing.Optional[typing.List[Line]]:
def push(self, data: bytes) -> Optional[List[Line]]:
if not data:
return None
@ -21,7 +21,7 @@ class StatefulDecoder(object):
lines = [l.strip(b"\r") for l in self._buffer.split(b"\n")]
self._buffer = lines.pop(-1)
decode_lines: typing.List[str] = []
decode_lines: List[str] = []
for line in lines:
try:
decode_lines.append(line.decode(self._encoding))
@ -36,7 +36,7 @@ class StatefulEncoder(object):
def clear(self):
self._buffer = b""
self._buffered_lines: typing.List[Line] = []
self._buffered_lines: List[Line] = []
def pending(self) -> bytes:
return self._buffer