diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 diff --git a/LICENSE b/LICENSE old mode 100644 new mode 100755 diff --git a/README.md b/README.md old mode 100644 new mode 100755 diff --git a/TODO b/TODO old mode 100644 new mode 100755 index db18124..aed4a75 --- a/TODO +++ b/TODO @@ -1,22 +1,34 @@ -* net - * socket_server - * Add comments - * Abstract default handler out to another file - * Send heartbeat - * Better handle clients disconnecting - * Remove lookup, use clients instead - * Add alias system for clients to replace using fd - * Add support for cryptography - * socket_client - * Add comments - * Abstract default handler out to another file - * Respond to heartbeat - * Respond to alias from server - * Add support for cryptography +## net -* Add crypto set -* In crypto add GPG, Diffie-Hellman, and symmetric & asymmetric crypto -* Add helpers set -* In helpers add JSON encoding / decoding -* Add db set -* In db add sqlite wrappers +### socket_server + +- [ ] Add comments +- [ ] Abstract default handler out to another file +- [ ] Send heartbeat +- [ ] Better handle clients disconnecting +- [ ] Remove lookup, use clients instead +- [ ] Add alias system for clients to replace using fd +- [ ] Add support for cryptography + +### socket_client + +- [ ] Add comments +- [ ] Abstract default handler out to another file +- [ ] Respond to heartbeat +- [ ] Respond to alias from server +- [ ] Add support for cryptography + +## crypto + +- [ ] Add crypto set +- [ ] In crypto add GPG, Diffie-Hellman, and symmetric & asymmetric crypto + +## helpers + +- [ ] Add helpers set +- [ ] In helpers add JSON encoding / decoding + +## db + +- [ ] Add db set +- [ ] In db add sqlite wrappers diff --git a/abots/net/__init__.py b/abots/net/__init__.py old mode 100644 new mode 100755 index 240cfea..6a0af77 --- a/abots/net/__init__.py +++ b/abots/net/__init__.py @@ -1,2 +1,3 @@ from abots.net.socket_server import SocketServer from abots.net.socket_client import SocketClient +from abots.net.socket_server_handler import SocketServerHandler \ No newline at end of file diff --git a/abots/net/socket_client.py b/abots/net/socket_client.py old mode 100644 new mode 100755 index ca58a41..1e4091d --- a/abots/net/socket_client.py +++ b/abots/net/socket_client.py @@ -1,5 +1,5 @@ from struct import pack, unpack -from multiprocessing import Process, JoinableQueue, Queue +from multiprocessing import Process, Queue, JoinableQueue from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR class SocketClient(Process): @@ -70,11 +70,11 @@ class SocketClient(Process): except OSError: self.stop() - def _gather_messages(self): - data = self._get_message() - if data is None: - return None - self.outbox.put(self.handler(data)) + def _process_inbox(self): + while not self.inbox.empty(): + message, args = self.inbox.get() + self._send_message(message, *args) + self.inbox.task_done() def _prepare(self): self.sock.setblocking(False) @@ -101,14 +101,10 @@ class SocketClient(Process): return err print("Ready!") while self.running: - self._gather_messages() - while not self.inbox.empty(): - message, args = self.inbox.get() - self._send_message(message, *args) - self.inbox.task_done() - self._gather_messages() - # response = input("> ") - # self._send_message(response) + data = self._get_message() + if data is not None: + self.outbox.put(self.handler(data)) + self._process_inbox() def stop(self): self.running = False diff --git a/abots/net/socket_server.py b/abots/net/socket_server.py old mode 100644 new mode 100755 index 758dd41..0fbc39b --- a/abots/net/socket_server.py +++ b/abots/net/socket_server.py @@ -1,12 +1,16 @@ +from abots.net.socket_server_handler import SocketServerHandler + from threading import Thread from struct import pack, unpack from select import select from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR +from multiprocessing import Process, Queue, JoinableQueue -class SocketServer(Thread): +class SocketServer(Process): def __init__(self, host, port, listeners=5, buffer_size=4096, - max_message_size=26214400, end_of_line="\r\n", handler=None): - Thread.__init__(self) + max_message_size=26214400, end_of_line="\r\n", inbox=JoinableQueue(), + outbox=Queue(), handler=None): + Process.__init__(self) self.host = host self.port = port @@ -14,7 +18,9 @@ class SocketServer(Thread): self.buffer_size = buffer_size self.max_message_size = max_message_size self.end_of_line = end_of_line - self.handler = self._handler if handler is None else handler + self.inbox = inbox + self.outbox = outbox + self.handler = SocketServerHandler if handler is None else handler self.sock = socket(AF_INET, SOCK_STREAM) self.sock_fd = -1 self.lookup = list() @@ -22,53 +28,9 @@ class SocketServer(Thread): self.clients = dict() self.running = True - def _handler(self, sock, message): - print("RAW", message) - if message == "STOP": - self.broadcast(self.sock, "STOP") - self.stop() - return -1 - if message == "QUIT": - client_fd = self.get_client_fd(sock) - if client_fd is None: - return 0 - client_address = [a for fd, a in self.lookup if fd == client_fd][0] - client_name = "{}:{}".format(*client_address) - self.broadcast(self.sock, "LEAVE {}".format(client_name)) - self._close_sock(sock) - return 1 - elif message == "LIST": - fds = list() #list(map(str, self.clients.keys())) - client_fd = self.get_client_fd(sock) - for fd in self.clients.keys(): - if fd == self.sock_fd: - fds.append("*{}".format(fd)) - elif fd == client_fd: - fds.append("+{}".format(fd)) - else: - fds.append(str(fd)) - self.send(sock, ",".join(fds)) - return 1 - elif message[:5] == "SEND ": - params = message[5:].split(" ", 1) - if len(params) < 2: - return 0 - fd, response = params - client_sock = self.clients.get(int(fd), dict()).get("sock", None) - if client_sock is None: - return 0 - self.send(client_sock, response) - return 1 - elif message[:6] == "BCAST ": - response = message[6:] - self.broadcast(sock, response) - return 1 - else: - return 2 - def _close_sock(self, sock): self.sockets.remove(sock) - fd = self.get_client_fd(sock) + fd = self._get_client_fd(sock) if fd is not None: del self.clients[fd] sock.close() @@ -115,7 +77,7 @@ class SocketServer(Thread): message_size = unpack(">I", raw_message_size)[0] return message_size - def get(self, sock): + def _get_message(self, sock): message_size = self._get_message_size(sock) if message_size is None: return None @@ -127,7 +89,7 @@ class SocketServer(Thread): self._close_sock(sock) return None - def send(self, sock, message, *args): + def _send_message(self, sock, message, *args): packaged = self._package_message(message, *args) try: sock.send(packaged) @@ -136,7 +98,14 @@ class SocketServer(Thread): except OSError: self._close_sock(sock) - def get_client_fd(self, client_sock): + def _broadcast_message(self, client_sock, client_message, *args): + for sock in self.sockets: + not_server = sock != self.sock + not_client = sock != client_sock + if not_server and not_client: + self._send_message(sock, client_message, *args) + + def _get_client_fd(self, client_sock): try: return client_sock.fileno() except OSError: @@ -146,12 +115,25 @@ class SocketServer(Thread): return fd return None - def broadcast(self, client_sock, client_message, *args): - for sock in self.sockets: - not_server = sock != self.sock - not_client = sock != client_sock - if not_server and not_client: - self.send(sock, client_message, *args) + def _process_inbox(self): + while not self.inbox.empty(): + data = self.inbox.get() + mode = data[0] + if mode == "SEND": + client, message, args = data[1:] + self._send_message(message, *args) + elif mode == "BCAST": + message, args = data[1:] + self._broadcast_message(self.sock, message, *args) + self.inbox.task_done() + + def _client_thread(self, sock): + while self.running: + message = self._get_message(sock) + if message is None: + continue + status = self.handler(self, sock, message) + self.outbox.put((status, message)) def _prepare(self): self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) @@ -171,44 +153,57 @@ class SocketServer(Thread): self.clients[self.sock_fd]["sock"] = self.sock return None - def start(self): + def send(self, client, message, *args): + self.inbox.put(("SEND", client, message, args)) + + def broadcast(self, message, *args): + self.inbox.put(("BCAST", message, args)) + + def results(self): + messages = list() + while not self.outbox.empty(): + messages.append(self.outbox.get()) + return messages + + def run(self): err = self._prepare() if err is not None: print(err) return err print("Server ready!") while self.running: + # try: + # selection = select(self.sockets, list(), list(), 5) + # read_socks, write_socks, err_socks = selection + # except OSError as e: + # print("Error", e) + # continue + # for sock in read_socks: + # if sock == self.sock: try: - selection = select(self.sockets, list(), list(), 5) - read_socks, write_socks, err_socks = selection - except OSError as e: - print("Error", e) + client_sock, client_address = self.sock.accept() + client_sock.settimeout(60) + except OSError: continue - for sock in read_socks: - if sock == self.sock: - try: - client_sock, client_address = self.sock.accept() - client_sock.settimeout(60) - except OSError: - continue - client_name = "{}:{}".format(*client_address) - client_host, client_port = client_address - client_fd = client_sock.fileno() - self.lookup.append((client_fd, client_sock)) - self.sockets.append(client_sock) - self.clients[client_fd] = dict() - self.clients[client_fd]["host"] = client_host - self.clients[client_fd]["port"] = client_port - self.clients[client_fd]["sock"] = client_sock - joined = "ENTER {}".format(client_name) - print(joined) - self.broadcast(client_sock, joined) - else: - message = self.get(sock) - if message is None: - continue - status = self.handler(sock, message) - print(status, message) + client_name = "{}:{}".format(*client_address) + client_host, client_port = client_address + client_fd = client_sock.fileno() + self.lookup.append((client_fd, client_sock)) + self.sockets.append(client_sock) + self.clients[client_fd] = dict() + self.clients[client_fd]["host"] = client_host + self.clients[client_fd]["port"] = client_port + self.clients[client_fd]["sock"] = client_sock + joined = "ENTER {}".format(client_name) + self.outbox.put((1, joined)) + self._broadcast_message(client_sock, joined) + Thread(target=self._client_thread, args=(client_sock,)).start() + # else: + # message = self._get_message(sock) + # if message is None: + # continue + # status = self.handler(self, sock, message) + # self.outbox.put((status, message)) def stop(self): self.running = False diff --git a/abots/net/socket_server_handler.py b/abots/net/socket_server_handler.py new file mode 100755 index 0000000..823897b --- /dev/null +++ b/abots/net/socket_server_handler.py @@ -0,0 +1,43 @@ +def SocketServerHandler(server, sock, message): + print("RAW:", message) + if message == "STOP": + server._broadcast_message(server.sock, "STOP") + server.stop() + return -1 + if message == "QUIT": + client_fd = server._get_client_fd(sock) + if client_fd is None: + return 0 + client_address = [a for fd, a in server.lookup if fd == client_fd][0] + client_name = "{}:{}".format(*client_address) + server._broadcast_message(server.sock, "LEAVE {}".format(client_name)) + server._close_sock(sock) + return 1 + elif message == "LIST": + fds = list() #list(map(str, server.clients.keys())) + client_fd = server._get_client_fd(sock) + for fd in server.clients.keys(): + if fd == server.sock_fd: + fds.append("*{}".format(fd)) + elif fd == client_fd: + fds.append("+{}".format(fd)) + else: + fds.append(str(fd)) + server._send_message(sock, ",".join(fds)) + return 1 + elif message[:5] == "SEND ": + params = message[5:].split(" ", 1) + if len(params) < 2: + return 0 + fd, response = params + client_sock = server.clients.get(int(fd), dict()).get("sock", None) + if client_sock is None: + return 0 + server._send_message(client_sock, response) + return 1 + elif message[:6] == "BCAST ": + response = message[6:] + server._broadcast_message(sock, response) + return 1 + else: + return 2 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt old mode 100644 new mode 100755 diff --git a/test/client.py b/test/client.py index a884a17..390a7a0 100755 --- a/test/client.py +++ b/test/client.py @@ -4,10 +4,10 @@ import sys sys.path.insert(0, "/center/lib") from abots.net import SocketClient -from multiprocessing import JoinableQueue, Queue -inbox = JoinableQueue() -outbox = Queue() -client = SocketClient("127.0.0.1", 10701, inbox=inbox, outbox=outbox) -client.send("LIST") +host = "127.0.0.1" +port = 10701 + +client = SocketClient(host, port) +client.daemon = True client.start() \ No newline at end of file diff --git a/test/server.py b/test/server.py index 9728428..92b34e8 100755 --- a/test/server.py +++ b/test/server.py @@ -3,18 +3,11 @@ import sys sys.path.insert(0, "/center/lib") -from abots.net import SocketServer, SocketClient -from multiprocessing import Process -from time import sleep +from abots.net import SocketServer host = "127.0.0.1" port = 10701 server = SocketServer(host, port) -server.start() -# pserver = Process(target=lambda x: x.start(), args=(server,)) -# pserver.start() -# sleep(1) -# if type(input("Start client: ")) is str: -# client = SocketClient(host, port) -# client.start() +server.daemon = True +server.start() \ No newline at end of file diff --git a/test_abots.py b/test_abots.py old mode 100644 new mode 100755