From cc0b7d9b5efaba2a705dac5bd6a64bb61ba7e80f Mon Sep 17 00:00:00 2001 From: aewens Date: Fri, 22 Feb 2019 22:08:58 +0100 Subject: [PATCH] Initial commit --- .gitignore | 3 + LICENSE | 2 +- abots/net/__init__.py | 2 + abots/net/socket_client.py | 116 ++++++++++++++++++++ abots/net/socket_server.py | 215 +++++++++++++++++++++++++++++++++++++ requirements.txt | 3 + test/client.py | 13 +++ test/server.py | 20 ++++ test_abots.py | 6 ++ 9 files changed, 379 insertions(+), 1 deletion(-) create mode 100644 abots/net/__init__.py create mode 100644 abots/net/socket_client.py create mode 100644 abots/net/socket_server.py create mode 100644 requirements.txt create mode 100755 test/client.py create mode 100755 test/server.py create mode 100644 test_abots.py diff --git a/.gitignore b/.gitignore index e61bca2..cf536c5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Application-specific +scratch + # ---> Python # Byte-compiled / optimized / DLL files __pycache__/ diff --git a/LICENSE b/LICENSE index b2a9c51..4a43692 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) . All rights reserved. +Copyright (c) 2019 Austin Ewens. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: diff --git a/abots/net/__init__.py b/abots/net/__init__.py new file mode 100644 index 0000000..240cfea --- /dev/null +++ b/abots/net/__init__.py @@ -0,0 +1,2 @@ +from abots.net.socket_server import SocketServer +from abots.net.socket_client import SocketClient diff --git a/abots/net/socket_client.py b/abots/net/socket_client.py new file mode 100644 index 0000000..ca58a41 --- /dev/null +++ b/abots/net/socket_client.py @@ -0,0 +1,116 @@ +from struct import pack, unpack +from multiprocessing import Process, JoinableQueue, Queue +from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR + +class SocketClient(Process): + def __init__(self, host, port, buffer_size=4096, end_of_line="\r\n", + inbox=JoinableQueue(), outbox=Queue(), handler=lambda x: x): + Process.__init__(self) + + self.host = host + self.port = port + self.buffer_size = buffer_size + self.end_of_line = end_of_line + self.handler = handler + self.sock = socket(AF_INET, SOCK_STREAM) + self.connection = (self.host, self.port) + self.running = True + self.inbox = inbox + self.outbox = outbox + self.error = None + + def _recv_bytes(self, get_bytes, decode=True): + data = "".encode() + eol = self.end_of_line.encode() + while len(data) < get_bytes: + bufsize = get_bytes - len(data) + if bufsize > self.buffer_size: + bufsize = self.buffer_size + try: + packet = self.sock.recv(bufsize) + except OSError: + return None + length = len(data) + len(packet) + checker = packet if length < get_bytes else packet[:-2] + if eol in checker: + packet = packet.split(eol)[0] + eol + return data + packet + data = data + packet + return data.decode() if decode else data + + def _package_message(self, message, *args): + formatted = None + if len(args) > 0: + formatted = message.format(*args) + self.end_of_line + else: + formatted = message + self.end_of_line + packaged = pack(">I", len(formatted)) + formatted.encode() + return packaged + + def _get_message_size(self): + raw_message_size = self._recv_bytes(4, False) + if not raw_message_size: + return None + message_size = unpack(">I", raw_message_size)[0] + return message_size + + def _get_message(self): + message_size = self._get_message_size() + if message_size is None: + return None + try: + return self._recv_bytes(message_size).strip(self.end_of_line) + except OSError: + return None + + def _send_message(self, message, *args): + packaged = self._package_message(message, *args) + try: + self.sock.send(packaged) + except OSError: + self.stop() + + def _gather_messages(self): + data = self._get_message() + if data is None: + return None + self.outbox.put(self.handler(data)) + + def _prepare(self): + self.sock.setblocking(False) + self.sock.settimeout(1) + try: + self.sock.connect(self.connection) + except OSError as e: + return e + return None + + def send(self, message, *args): + self.inbox.put((message, args)) + + def results(self): + messages = list() + while not self.outbox.empty(): + messages.append(self.outbox.get()) + return messages + + def run(self): + err = self._prepare() + if err is not None: + print(err) + return err + print("Ready!") + while self.running: + self._gather_messages() + while not self.inbox.empty(): + message, args = self.inbox.get() + self._send_message(message, *args) + self.inbox.task_done() + self._gather_messages() + # response = input("> ") + # self._send_message(response) + + def stop(self): + self.running = False + self.sock.close() + self.terminate() \ No newline at end of file diff --git a/abots/net/socket_server.py b/abots/net/socket_server.py new file mode 100644 index 0000000..758dd41 --- /dev/null +++ b/abots/net/socket_server.py @@ -0,0 +1,215 @@ +from threading import Thread +from struct import pack, unpack +from select import select +from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR + +class SocketServer(Thread): + def __init__(self, host, port, listeners=5, buffer_size=4096, + max_message_size=26214400, end_of_line="\r\n", handler=None): + Thread.__init__(self) + + self.host = host + self.port = port + self.listeners = listeners + self.buffer_size = buffer_size + self.max_message_size = max_message_size + self.end_of_line = end_of_line + self.handler = self._handler if handler is None else handler + self.sock = socket(AF_INET, SOCK_STREAM) + self.sock_fd = -1 + self.lookup = list() + self.sockets = list() + self.clients = dict() + self.running = True + + def _handler(self, sock, message): + print("RAW", message) + if message == "STOP": + self.broadcast(self.sock, "STOP") + self.stop() + return -1 + if message == "QUIT": + client_fd = self.get_client_fd(sock) + if client_fd is None: + return 0 + client_address = [a for fd, a in self.lookup if fd == client_fd][0] + client_name = "{}:{}".format(*client_address) + self.broadcast(self.sock, "LEAVE {}".format(client_name)) + self._close_sock(sock) + return 1 + elif message == "LIST": + fds = list() #list(map(str, self.clients.keys())) + client_fd = self.get_client_fd(sock) + for fd in self.clients.keys(): + if fd == self.sock_fd: + fds.append("*{}".format(fd)) + elif fd == client_fd: + fds.append("+{}".format(fd)) + else: + fds.append(str(fd)) + self.send(sock, ",".join(fds)) + return 1 + elif message[:5] == "SEND ": + params = message[5:].split(" ", 1) + if len(params) < 2: + return 0 + fd, response = params + client_sock = self.clients.get(int(fd), dict()).get("sock", None) + if client_sock is None: + return 0 + self.send(client_sock, response) + return 1 + elif message[:6] == "BCAST ": + response = message[6:] + self.broadcast(sock, response) + return 1 + else: + return 2 + + def _close_sock(self, sock): + self.sockets.remove(sock) + fd = self.get_client_fd(sock) + if fd is not None: + del self.clients[fd] + sock.close() + + def _recv_bytes(self, sock, get_bytes, decode=True): + data = "".encode() + eol = self.end_of_line.encode() + if get_bytes > self.max_message_size: + return None + attempts = 0 + while len(data) < get_bytes: + if attempts > self.max_message_size / self.buffer_size: + break + else: + attempts = attempts + 1 + bufsize = get_bytes - len(data) + if bufsize > self.buffer_size: + bufsize = self.buffer_size + try: + packet = sock.recv(bufsize) + except OSError: + return None + length = len(data) + len(packet) + checker = packet if length < get_bytes else packet[:-2] + if eol in checker: + packet = packet.split(eol)[0] + eol + return data + packet + data = data + packet + return data.decode() if decode else data + + def _package_message(self, message, *args): + formatted = None + if len(args) > 0: + formatted = message.format(*args) + self.end_of_line + else: + formatted = message + self.end_of_line + packaged = pack(">I", len(formatted)) + formatted.encode() + return packaged + + def _get_message_size(self, sock): + raw_message_size = self._recv_bytes(sock, 4, False) + if not raw_message_size: + return None + message_size = unpack(">I", raw_message_size)[0] + return message_size + + def get(self, sock): + message_size = self._get_message_size(sock) + if message_size is None: + return None + elif message_size > self.max_message_size: + return None + try: + return self._recv_bytes(sock, message_size).strip(self.end_of_line) + except OSError: + self._close_sock(sock) + return None + + def send(self, sock, message, *args): + packaged = self._package_message(message, *args) + try: + sock.send(packaged) + except BrokenPipeError: + self._close_sock(sock) + except OSError: + self._close_sock(sock) + + def get_client_fd(self, client_sock): + try: + return client_sock.fileno() + except OSError: + for fd, sock in self.lookup: + if sock != client_sock: + continue + return fd + return None + + def broadcast(self, client_sock, client_message, *args): + for sock in self.sockets: + not_server = sock != self.sock + not_client = sock != client_sock + if not_server and not_client: + self.send(sock, client_message, *args) + + def _prepare(self): + self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) + try: + self.sock.bind((self.host, self.port)) + except OSError as e: + return e + self.sock.listen(self.listeners) + self.sock_fd = self.sock.fileno() + sock_address = self.sock.getsockname() + sock_host, sock_port = sock_address + self.lookup.append((self.sock_fd, sock_address)) + self.sockets.append(self.sock) + self.clients[self.sock_fd] = dict() + self.clients[self.sock_fd]["host"] = sock_host + self.clients[self.sock_fd]["port"] = sock_port + self.clients[self.sock_fd]["sock"] = self.sock + return None + + def start(self): + err = self._prepare() + if err is not None: + print(err) + return err + print("Server ready!") + while self.running: + try: + selection = select(self.sockets, list(), list(), 5) + read_socks, write_socks, err_socks = selection + except OSError as e: + print("Error", e) + continue + for sock in read_socks: + if sock == self.sock: + try: + client_sock, client_address = self.sock.accept() + client_sock.settimeout(60) + except OSError: + continue + client_name = "{}:{}".format(*client_address) + client_host, client_port = client_address + client_fd = client_sock.fileno() + self.lookup.append((client_fd, client_sock)) + self.sockets.append(client_sock) + self.clients[client_fd] = dict() + self.clients[client_fd]["host"] = client_host + self.clients[client_fd]["port"] = client_port + self.clients[client_fd]["sock"] = client_sock + joined = "ENTER {}".format(client_name) + print(joined) + self.broadcast(client_sock, joined) + else: + message = self.get(sock) + if message is None: + continue + status = self.handler(sock, message) + print(status, message) + + def stop(self): + self.running = False + self.sock.close() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..1eaa2bc --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pytest +python-gnupg +cryptography diff --git a/test/client.py b/test/client.py new file mode 100755 index 0000000..a884a17 --- /dev/null +++ b/test/client.py @@ -0,0 +1,13 @@ +#!env/bin/python3 + +import sys +sys.path.insert(0, "/center/lib") + +from abots.net import SocketClient +from multiprocessing import JoinableQueue, Queue + +inbox = JoinableQueue() +outbox = Queue() +client = SocketClient("127.0.0.1", 10701, inbox=inbox, outbox=outbox) +client.send("LIST") +client.start() \ No newline at end of file diff --git a/test/server.py b/test/server.py new file mode 100755 index 0000000..9728428 --- /dev/null +++ b/test/server.py @@ -0,0 +1,20 @@ +#!env/bin/python3 + +import sys +sys.path.insert(0, "/center/lib") + +from abots.net import SocketServer, SocketClient +from multiprocessing import Process +from time import sleep + +host = "127.0.0.1" +port = 10701 + +server = SocketServer(host, port) +server.start() +# pserver = Process(target=lambda x: x.start(), args=(server,)) +# pserver.start() +# sleep(1) +# if type(input("Start client: ")) is str: +# client = SocketClient(host, port) +# client.start() diff --git a/test_abots.py b/test_abots.py new file mode 100644 index 0000000..52bfe20 --- /dev/null +++ b/test_abots.py @@ -0,0 +1,6 @@ +#!env/bin/python3 + +# import sys +# sys.path.insert(0, "/center/lib") + +#from abots.net import SocketServer, SocketClient