handle PART, QUIT and ERROR; better channel_users handling

This commit is contained in:
jesopo 2020-03-12 15:14:09 +00:00
parent ff12e1a962
commit cc7c3585b2
4 changed files with 124 additions and 11 deletions

View File

@ -23,12 +23,11 @@ class Server(Named):
self.users: Dict[str, User] = {}
self.channels: Dict[str, Channel] = {}
self.user_channels: Dict[User, Set[Channel]] = {}
self.channel_users: Dict[Channel, Dict[User, ChannelUser]] = {}
self.isupport = ISupport()
self.user_channels: Dict[User, Set[Channel]] = {}
self.channel_users: Dict[Channel, Set[ChannelUser]] = {}
def parse_tokens(self, line: Line):
if line.command in LINE_HANDLERS:
for callback in LINE_HANDLERS[line.command]:
@ -60,10 +59,9 @@ class Server(Named):
channel_user = ChannelUser(channel, user)
if not user in self.user_channels:
self.user_channels[user] = set([])
self.channel_users[channel] = set([])
self.user_channels[user].add(channel)
self.channel_users[channel].add(channel_user)
self.channel_users[channel][user] = channel_user
return channel_user
@line_handler("PING")
@ -94,14 +92,63 @@ class Server(Named):
@line_handler("JOIN")
def handle_JOIN(self, line: Line):
channel_lower = self.casemap_lower(line.params[0])
if self.casemap_equals(line.hostmask.nickname, self.nickname):
if not channel_lower in self.channels:
channel = Channel(line.params[0])
self.channels[channel_lower] = channel
self.channel_users[channel] = {}
if channel_lower in self.channels:
channel = self.channels[channel_lower]
user = self.get_user(line.hostmask.nickname)
self.user_join(channel, user)
elif self.casemap_equals(line.hostmask.nickname, self.nickname):
if not channel_lower in self.channels:
self.channels[channel_lower] = Channel(line.params[0])
@line_handler("PART")
def handle_PART(self, line: Line):
channel_lower = self.casemap_lower(line.params[0])
if channel_lower in self.channels:
channel = self.channels[channel_lower]
if self.casemap_equals(line.hostmask.nickname, self.nickname):
del self.channels[channel_lower]
channel_users = self.channel_users.pop(channel)
for user, cuser in channel_users.items():
self.user_channels[user].remove(channel)
if not self.user_channels[user]:
del self.user_channels[user]
del self.users[self.casemap_lower(user.nickname)]
else:
nickname_lower = self.casemap_lower(line.hostmask.nickname)
if nickname_lower in self.users:
user = self.users[nickname_lower]
self.user_channels[user].remove(channel)
if not self.user_channels[user]:
del self.users[nickname_lower]
del self.user_channels[user]
del self.channel_users[channel][user]
def _self_quit(self):
self.users.clear()
self.channels.clear()
self.user_channels.clear()
self.channel_users.clear()
self.encoder.clear()
@line_handler("QUIT")
def handle_quit(self, line: Line):
if line.hostmask.nickname == self.nickname:
self._self_quit()
else:
nickname_lower = self.casemap_lower(line.hostmask.nickname)
if nickname_lower in self.users:
user = self.users.pop(nickname_lower)
for channel in self.user_channels[user]:
del self.channel_users[channel][user]
del self.user_channels[user]
@line_handler("ERROR")
def handle_ERROR(self, line: Line):
self._self_quit()
@line_handler("353") # user list, "NAMES #channel" response (and on-join)
def handle_353(self, line: Line):
@ -135,4 +182,4 @@ class Server(Named):
if channel_lower in self.channels:
channel = self.channels[channel_lower]
channel.topic_setter = line.params[2]
channel.topic_time = datetime.fromtimestamp(line.params[3])
channel.topic_time = datetime.fromtimestamp(int(line.params[3]))

View File

@ -1 +1,2 @@
from .channel import *
from .channel import *
from .nickname import *

View File

@ -1,10 +1,55 @@
import unittest
from datetime import datetime
import ircstates, irctokens
class ChannelTest(unittest.TestCase):
class ChannelTestJoin(unittest.TestCase):
def test_self_join(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
self.assertIn("#chan", server.channels)
self.assertIn("nickname", server.users)
self.assertEqual(len(server.users), 1)
self.assertEqual(len(server.channels), 1)
self.assertIn(server.channels["#chan"], server.channel_users)
self.assertIn(server.users["nickname"], server.user_channels)
self.assertEqual(len(server.user_channels), 1)
self.assertEqual(len(server.channel_users), 1)
def test_other_join(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
self.assertEqual(len(server.users), 2)
self.assertIn("other", server.users)
self.assertEqual(len(server.user_channels), 2)
self.assertEqual(len(server.channel_users), 1)
class ChannelTestPart(unittest.TestCase):
def test_self_part(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":nickname PART #chan"))
self.assertEqual(len(server.users), 0)
self.assertEqual(len(server.channels), 0)
self.assertEqual(len(server.user_channels), 0)
self.assertEqual(len(server.channel_users), 0)
class ChannelTestTopic(unittest.TestCase):
def test_text(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("332 * #chan :test"))
self.assertEqual(server.channels["#chan"].topic, "test")
def test_set_time(self):
dt = datetime(2020, 3, 12, 14, 27, 57)
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise("333 * #chan other 1584023277"))
self.assertEqual(server.channels["#chan"].topic_time, dt)

20
test/nickname.py Normal file
View File

@ -0,0 +1,20 @@
import unittest
import ircstates, irctokens
class NicknameTestChange(unittest.TestCase):
def test_self_change(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname NICK nickname2"))
self.assertEqual(server.nickname, "nickname2")
def test_other_change(self):
server = ircstates.Server("test")
server.parse_tokens(irctokens.tokenise("001 nickname"))
server.parse_tokens(irctokens.tokenise(":nickname JOIN #chan"))
server.parse_tokens(irctokens.tokenise(":other JOIN #chan"))
self.assertIn("other", server.users)
server.parse_tokens(irctokens.tokenise(":other NICK other2"))
self.assertNotIn("other", server.users)
self.assertIn("other2", server.users)