From ec16929aad8105c09f0245db0f82d2641ad747db Mon Sep 17 00:00:00 2001 From: aewens Date: Wed, 29 May 2019 22:27:42 -0400 Subject: [PATCH] Made socket server more ambiguous and added prefix version --- abots/events/threads.py | 2 +- abots/helpers/__init__.py | 2 +- abots/helpers/black_magic.py | 31 ++++++++++++---- abots/net/prefix_socket_server.py | 37 +++++++++++++++++++ abots/net/socket_client.py | 7 +++- abots/net/socket_server.py | 13 +------ test_abots.py | 61 ++++++++++++++++++++++++++----- 7 files changed, 123 insertions(+), 30 deletions(-) create mode 100755 abots/net/prefix_socket_server.py diff --git a/abots/events/threads.py b/abots/events/threads.py index 18ce121..940c7bc 100644 --- a/abots/events/threads.py +++ b/abots/events/threads.py @@ -73,7 +73,7 @@ class ThreadPool: try: method(*args, **kwargs) except Exception as e: - print(e) + eprint(e) finally: # print(f"[worker:{worker_id}]: Task complete") self._exec_controls(controls) diff --git a/abots/helpers/__init__.py b/abots/helpers/__init__.py index 926995b..59da120 100755 --- a/abots/helpers/__init__.py +++ b/abots/helpers/__init__.py @@ -4,5 +4,5 @@ from abots.helpers.numbers import clamp, randfloat, isnumeric from abots.helpers.general import eprint, deduce, noop, cast, get_digit, obtain from abots.helpers.general import utc_now, utc_now_timestamp from abots.helpers.logging import Logger -from abots.helpers.black_magic import infinitedict, debugger +from abots.helpers.black_magic import infinitedict, debugger, singleton, curry from abots.helpers.black_magic import coroutine, generator diff --git a/abots/helpers/black_magic.py b/abots/helpers/black_magic.py index acb4598..3961119 100644 --- a/abots/helpers/black_magic.py +++ b/abots/helpers/black_magic.py @@ -1,4 +1,3 @@ -from abots.events import CoroEvent from collections import defaultdict from functools import wraps @@ -14,8 +13,8 @@ def debugger(func): signature = ", ".join(args_repr + kwargs_repr) print(f"[DEBUGGER]: Calling {func.__name__}({signature})") result = func(*args, **kwargs) - print(f"[DEBUGGER]: {func.__name__!r} returned {value!r}") - return value + print(f"[DEBUGGER]: {func.__name__!r} returned {result!r}") + return result return wrapper_debug def coroutine(func): @@ -28,13 +27,31 @@ def coroutine(func): def generator(func): @wraps(func) - @coroutine def wrapper_generator(*args, **kwargs): - event = CoroEvent() try: while True: - yield from func(event, *args, **kwargs) + yield from func(*args, **kwargs) except GeneratorExit: pass - return wrapper_generator + return coroutine(wrapper_generator) +def singleton(cls): + @wraps(cls) + def wrapper_singleton(*args, **kwargs): + if not wrapper_singleton.instance: + wrapper_singleton.instance = cls(*args, **kwargs) + return wrapper_singleton.instance + wrapper_singleton.instance = None + return wrapper_singleton + +def curry(func, argc=None): + if argc is None: + argc = func.func_code.co_argcount + @wraps(func) + def wrapper_curry(*args): + if len(args) == argc: + return func(*args) + def curried(*c_args): + return func(*(args + c_args)) + return curry(curried, argc - len(args)) + return wrapper_curry diff --git a/abots/net/prefix_socket_server.py b/abots/net/prefix_socket_server.py new file mode 100755 index 0000000..fd0f4ba --- /dev/null +++ b/abots/net/prefix_socket_server.py @@ -0,0 +1,37 @@ +""" + +net/PrefixSocketServer +================ + +TODO: +* Add logging to broken pipe exceptions + +""" + +from abots.net import SocketServer + +class PrefixSocketServer(SocketServer): + def __init__(self, host, port, listeners=5, buffer_size=4096, + secure=False, timeout=None, daemon=False): + args = host, port, listeners, buffer_size, secure, timeout, daemon + super().__init__(*args) + + def _package_message(self, message, *args): + if len(args) > 0: + formatted = message.format(*args) + else: + formatted = message + packaged = pack(">I", len(formatted)) + formatted.encode() + return packaged + + # Packages a message and sends it to socket + def send_message(self, uuid, message, *args): + sock = self._sock_from_uuid(uuid) + if sock is None: + return None + formatted = self._package_message(message) + try: + sock.send(formatted) + # The socket can either be broken or no longer open at all + except (BrokenPipeError, OSError) as e: + return diff --git a/abots/net/socket_client.py b/abots/net/socket_client.py index 62f86a9..785fce4 100755 --- a/abots/net/socket_client.py +++ b/abots/net/socket_client.py @@ -149,13 +149,18 @@ class SocketClient(Thread): def recv(self): yield from self._obtain("outbox") + def check(self): + for letter in self.recv(): + if letter is not None and len(letter) > 0: + print(letter) + def send(self, message): self._inbox.put(message) cast(self._bridge, "send", ("inbox", message)) def connect(self, bridge): self._bridge = bridge() - + @coroutine def bridge(self, inbox, outbox, events): try: diff --git a/abots/net/socket_server.py b/abots/net/socket_server.py index 3e53b5c..257daf8 100755 --- a/abots/net/socket_server.py +++ b/abots/net/socket_server.py @@ -56,7 +56,7 @@ class SocketServer(Thread): if self.secure: # Note: kwargs is used here to specify any SSL parameters desired self.sock = wrap_socket(self.sock, **kwargs) - + # List of all sockets involved (both client and server) self.sockets = list() self.clients = list() @@ -151,14 +151,6 @@ class SocketServer(Thread): def _sock_from_uuid(self, uuid): return self.uuids.get(uuid, dict()).get("sock", None) - def _package_message(self, message, *args): - if len(args) > 0: - formatted = message.format(*args) - else: - formatted = message - packaged = pack(">I", len(formatted)) + formatted.encode() - return packaged - # Closes a connected socket and removes it from the sockets list def close_sock(self, uuid): event = dict() @@ -217,9 +209,8 @@ class SocketServer(Thread): sock = self._sock_from_uuid(uuid) if sock is None: return None - formatted = self._package_message(message) try: - sock.send(formatted) + sock.send(message) # The socket can either be broken or no longer open at all except (BrokenPipeError, OSError) as e: return diff --git a/test_abots.py b/test_abots.py index 8f4878d..349db24 100755 --- a/test_abots.py +++ b/test_abots.py @@ -1,14 +1,57 @@ #!env/bin/python3 -from abots.net import SocketServer, SocketClient +#from abots.net import SocketServer, SocketClient +# +#host = "localhost" +#port = 10401 +#timeout = 3 +# +#server = SocketServer(host, port, timeout=timeout) +#client = SocketClient(host, port, timeout=timeout) +# +#server.start() +#server.ready.wait() +#client.start() +from abots.events import ThreadMarshal +from abots.helpers import generator +from queue import Queue, Empty +from threading import Thread, Event -host = "localhost" -port = 10401 -timeout = 3 +def worker(queue, event, timeout): + while not event.is_set(): + try: + job = queue.get(timeout=timeout) + except Empty: + continue + assert len(job) == 2, "Expected two parameters" + done, task = job + assert hasattr(done, "set"), "Expected event" + assert len(task) == 3, "Expected three parameters" + method, args, kwargs = task + assert callable(method), "Expected function" + assert isinstance(args, tuple), "Expected tuple" + assert isinstance(kwargs, dict), "Expected dict" + try: + result = method(*args, **kwargs) + except Exception as e: + print(e) + done.set() + queue.task_done() -server = SocketServer(host, port, timeout=timeout) -client = SocketClient(host, port, timeout=timeout) +def manager(queue, event, timeout): + workers = list() + current = 0 + while not event.is_set(): + try: + task = queue.get(timeout=timeout) + except Empty: + continue + assert len(task) == 2, "Expected two parameters" + action, payload = task + assert isinstance(action, str), "Expected str" + if action == "new": + assert callable(payload), "Expected function" + workers.append((payload)) + elif action == "reserve" and len(workers) > 0: + worker = workers[current] -server.start() -server.ready.wait() -client.start() \ No newline at end of file