implement a stateful decoder (irctokens.StatefulDecoder())

This commit is contained in:
jesopo 2020-03-11 14:50:31 +00:00
parent 934a901ba1
commit 949b10945e
7 changed files with 182 additions and 97 deletions

View File

@ -10,5 +10,4 @@ install:
- pip3 install mypy
script:
- mypy irctokens
- python3 -m unittest test.tokenise
- python3 -m unittest test.format
- python3 -m unittest test

View File

@ -13,6 +13,7 @@ where you don't expect them or not being where you expect them.
### tokenisation
```python
import irctokens
line = irctokens.tokenise(
"@id=123 :jess!~jess@hostname PRIVMSG #chat :hello there!")
@ -22,14 +23,35 @@ if line.command == "PRIVMSG":
```
### formatting
```python
import socket
import irctokens
sock = socket.socket()
sock.connect(("127.0.0.1", 6667))
line = irctokens.format("USER", ["user", "0", "*", "real name"])
to_send = "%s\r\n" % line
sock.send(to_send.encode("utf8"))
>>> import irctokens
>>> irctokens.format("USER", ["user", "0", "*", "real name"])
'USER user 0 * :real name'
```
### stateful
```python
import irctokens, socket
d = irctokens.StatefulDecoder()
s = socket.socket()
s.connect(("127.0.0.1", 6667))
def _send(line):
s.send(f"{line}\r\n".encode("utf8"))
_send(irctokens.format("USER", ["username", "0", "*", "real name"]))
_send(irctokens.format("NICK", ["nickname"]))
while True:
lines = d.push(s.recv(1024))
for line in lines:
if line.command == "PING":
to_send = irctokens.format("PONG", [line.params[0]])
_send(to_send)
elif line.command == "001":
to_send = irctokens.format("JOIN", ["#test"])
_send(to_send)
```

View File

@ -1,86 +1,2 @@
import typing
TAG_ESCAPE = ["\\", " ", ";", "\r", "\n"]
TAG_UNESCAPE = ["\\\\", "\s", "\:", r"\r", r"\n"]
def _unescape_tag(value: str):
for i, char in enumerate(TAG_UNESCAPE):
value = value.replace(char, TAG_ESCAPE[i])
return value
def _escape_tag(value: str):
for i, char in enumerate(TAG_ESCAPE):
value = value.replace(char, TAG_UNESCAPE[i])
return value
class Line(object):
    """A single IRC message: optional tags and source, a command, parameters.

    Attributes are plain and public: ``tags`` (dict of tag name to value or
    ``None``), ``source`` (prefix without the leading ``:``), ``command`` and
    ``params`` (list of strings).
    """

    def __init__(self,
            tags:
            typing.Optional[typing.Dict[str, typing.Optional[str]]]=None,
            source: typing.Optional[str]=None,
            command: str="",
            params: typing.Optional[typing.List[str]]=None):
        self.tags = tags
        self.source = source
        self.command = command
        # A fresh list per instance; a shared default list would leak
        # parameters appended to one Line into every other default Line.
        self.params = [] if params is None else params

    def format(self) -> str:
        """Serialise this line to its wire form, without a line terminator.

        Tags are emitted in sorted key order, the command is upper-cased, and
        the final parameter is prefixed with ``:`` whenever it is empty,
        contains a space or itself starts with ``:``, since otherwise it
        would not parse back to the same value.
        """
        outs: typing.List[str] = []
        if self.tags:
            tags_str = []
            for key in sorted(self.tags):
                value = self.tags[key]
                if value:
                    tags_str.append("%s=%s" % (key, _escape_tag(value)))
                else:
                    tags_str.append(key)
            outs.append("@%s" % ";".join(tags_str))

        if self.source:
            outs.append(":%s" % self.source)
        outs.append(self.command.upper())

        if self.params:
            *middle, last = self.params
            outs.extend(middle)
            if not last or " " in last or last.startswith(":"):
                last = ":%s" % last
            outs.append(last)
        return " ".join(outs)
def tokenise(line: str) -> Line:
    """Parse one IRC line (without its line terminator) into a Line.

    An optional leading ``@tags`` block is split on ``;`` and each value is
    unescaped; a tag with no value (or an empty one) maps to ``None``.  An
    optional ``:source`` prefix is stored without its colon, the command is
    upper-cased, and everything after the first `` :`` becomes the final
    parameter, even when that parameter is empty.

    Raises IndexError when the line contains no command.
    """
    line_obj = Line()
    if line.startswith("@"):
        message_tags, _, line = line.partition(" ")
        tags: typing.Dict[str, typing.Optional[str]] = {}
        for part in message_tags[1:].split(";"):
            key, _, value = part.partition("=")
            tags[key] = _unescape_tag(value) if value else None
        line_obj.tags = tags
        # Extra spaces after the tag block would otherwise make the " :"
        # split below fire on the source prefix instead of the trailing.
        line = line.lstrip(" ")

    line, separator, trailing = line.partition(" :")
    params = [part for part in line.split(" ") if part]
    if params[0].startswith(":"):
        line_obj.source = params.pop(0)[1:]
    line_obj.command = params.pop(0).upper()
    # Test the separator, not the text: "CMD arg :" carries an empty but
    # present final parameter.
    if separator:
        params.append(trailing)
    line_obj.params = params
    return line_obj
def format(
        command: str,
        params: typing.Optional[typing.List[str]]=None,
        source: typing.Optional[str]=None,
        tags: typing.Optional[typing.Dict[str, typing.Optional[str]]]=None
        ) -> str:
    """Build a Line from the given parts and return its serialised form.

    ``params`` defaults to no parameters; ``None`` is used as the default
    rather than a list literal so that no list object is shared between
    calls.
    """
    return Line(
        tags, source, command, [] if params is None else params).format()
from .protocol import Line, tokenise, format
from .stateful import StatefulDecoder

92
irctokens/protocol.py Normal file
View File

@ -0,0 +1,92 @@
import typing
TAG_ESCAPE = ["\\", " ", ";", "\r", "\n"]
TAG_UNESCAPE = ["\\\\", "\s", "\:", r"\r", r"\n"]
def _unescape_tag(value: str):
for i, char in enumerate(TAG_UNESCAPE):
value = value.replace(char, TAG_ESCAPE[i])
return value
def _escape_tag(value: str):
for i, char in enumerate(TAG_ESCAPE):
value = value.replace(char, TAG_UNESCAPE[i])
return value
class Line(object):
    """A single IRC message: optional tags and source, a command, parameters.

    Attributes are plain and public: ``tags`` (dict of tag name to value or
    ``None``), ``source`` (prefix without the leading ``:``), ``command`` and
    ``params`` (list of strings).  Two lines compare equal when they
    serialise to the same string.
    """

    def __init__(self,
            tags:
            typing.Optional[typing.Dict[str, typing.Optional[str]]]=None,
            source: typing.Optional[str]=None,
            command: str="",
            params: typing.Optional[typing.List[str]]=None):
        self.tags = tags
        self.source = source
        self.command = command
        # A fresh list per instance; a shared default list would leak
        # parameters appended to one Line into every other default Line.
        self.params = [] if params is None else params

    def __eq__(self, other):
        # Defer to the other operand for foreign types; `==` still ends up
        # False and `!=` True when nothing else claims the comparison.
        if not isinstance(other, Line):
            return NotImplemented
        return self.format() == other.format()

    def format(self) -> str:
        """Serialise this line to its wire form, without a line terminator.

        Tags are emitted in sorted key order, the command is upper-cased, and
        the final parameter is prefixed with ``:`` whenever it is empty,
        contains a space or itself starts with ``:``, since otherwise it
        would not parse back to the same value.
        """
        outs: typing.List[str] = []
        if self.tags:
            tags_str = []
            for key in sorted(self.tags):
                value = self.tags[key]
                if value:
                    tags_str.append("%s=%s" % (key, _escape_tag(value)))
                else:
                    tags_str.append(key)
            outs.append("@%s" % ";".join(tags_str))

        if self.source:
            outs.append(":%s" % self.source)
        outs.append(self.command.upper())

        if self.params:
            *middle, last = self.params
            outs.extend(middle)
            if not last or " " in last or last.startswith(":"):
                last = ":%s" % last
            outs.append(last)
        return " ".join(outs)
def tokenise(line: str) -> Line:
    """Parse one IRC line (without its line terminator) into a Line.

    An optional leading ``@tags`` block is split on ``;`` and each value is
    unescaped; a tag with no value (or an empty one) maps to ``None``.  An
    optional ``:source`` prefix is stored without its colon, the command is
    upper-cased, and everything after the first `` :`` becomes the final
    parameter, even when that parameter is empty.

    Raises IndexError when the line contains no command.
    """
    line_obj = Line()
    if line.startswith("@"):
        message_tags, _, line = line.partition(" ")
        tags: typing.Dict[str, typing.Optional[str]] = {}
        for part in message_tags[1:].split(";"):
            key, _, value = part.partition("=")
            tags[key] = _unescape_tag(value) if value else None
        line_obj.tags = tags
        # Extra spaces after the tag block would otherwise make the " :"
        # split below fire on the source prefix instead of the trailing.
        line = line.lstrip(" ")

    line, separator, trailing = line.partition(" :")
    params = [part for part in line.split(" ") if part]
    if params[0].startswith(":"):
        line_obj.source = params.pop(0)[1:]
    line_obj.command = params.pop(0).upper()
    # Test the separator, not the text: "CMD arg :" carries an empty but
    # present final parameter.
    if separator:
        params.append(trailing)
    line_obj.params = params
    return line_obj
def format(
        command: str,
        params: typing.Optional[typing.List[str]]=None,
        source: typing.Optional[str]=None,
        tags: typing.Optional[typing.Dict[str, typing.Optional[str]]]=None
        ) -> str:
    """Build a Line from the given parts and return its serialised form.

    ``params`` defaults to no parameters; ``None`` is used as the default
    rather than a list literal so that no list object is shared between
    calls.
    """
    return Line(
        tags, source, command, [] if params is None else params).format()

20
irctokens/stateful.py Normal file
View File

@ -0,0 +1,20 @@
import typing
from .protocol import Line, tokenise
class StatefulDecoder(object):
    """Incrementally turn a byte stream into tokenised IRC lines.

    Bytes are accumulated across calls to push(); every newline-terminated
    line is decoded (UTF-8 first, then the fallback encoding) and tokenised,
    while an unterminated tail stays buffered for the next call.
    """

    def __init__(self, fallback: str="iso-8859-1"):
        # The fallback must be a codec name Python recognises; the previous
        # default, "iso-8859", is not one and raised LookupError on first use.
        self._fallback = fallback
        self._buffer = b""

    def push(self, data: bytes) -> typing.List[Line]:
        """Feed received bytes and return the lines they completed.

        Returns an empty list when no line has been completed yet.  Blank
        lines are skipped instead of being passed to tokenise(), which
        cannot parse an empty string.
        """
        # Everything before the last "\n" is complete; the remainder is kept
        # verbatim so nothing is stripped from a line that is still arriving.
        *complete, self._buffer = (self._buffer + data).split(b"\n")

        decoded_lines: typing.List[str] = []
        for raw in complete:
            raw = raw.strip(b"\r")
            if not raw:
                continue
            try:
                decoded = raw.decode("utf8")
            except UnicodeDecodeError:
                decoded = raw.decode(self._fallback)
            decoded_lines.append(decoded)
        return [tokenise(decoded) for decoded in decoded_lines]

View File

@ -0,0 +1,3 @@
from .tokenise import *
from .format import *
from .stateful_decode import *

33
test/stateful_decode.py Normal file
View File

@ -0,0 +1,33 @@
import unittest
import irctokens
class TestPartial(unittest.TestCase):
    """A line delivered in two chunks is emitted only once it is complete."""

    def test(self):
        decoder = irctokens.StatefulDecoder()

        # First chunk has no newline, so nothing is emitted.
        self.assertEqual(decoder.push(b"PRIVMSG "), [])

        # Second chunk terminates the line.
        completed = decoder.push(b"#channel hello\r\n")
        self.assertEqual(len(completed), 1)
        expected = irctokens.tokenise("PRIVMSG #channel hello")
        self.assertEqual(completed, [expected])
class TestMultiple(unittest.TestCase):
    """Two complete lines in a single push come back in order."""

    def test(self):
        decoder = irctokens.StatefulDecoder()
        payload = (
            b"PRIVMSG #channel1 hello\r\n"
            b"PRIVMSG #channel2 hello\r\n"
        )
        decoded = decoder.push(payload)
        self.assertEqual(len(decoded), 2)

        first = irctokens.tokenise("PRIVMSG #channel1 hello")
        second = irctokens.tokenise("PRIVMSG #channel2 hello")
        self.assertEqual(decoded[0], first)
        self.assertEqual(decoded[1], second)
class TestFallback(unittest.TestCase):
    """Bytes that are not valid UTF-8 are decoded with the fallback codec."""

    def test(self):
        decoder = irctokens.StatefulDecoder(fallback="latin-1")
        raw = "PRIVMSG #channel hélló\r\n".encode("latin-1")
        decoded = decoder.push(raw)
        self.assertEqual(len(decoded), 1)

        expected = irctokens.tokenise("PRIVMSG #channel hélló")
        self.assertEqual(decoded[0], expected)