From b0e85842e3c5684df651c0fe16efd991a4fd4904 Mon Sep 17 00:00:00 2001 From: aewens Date: Wed, 15 May 2019 00:15:18 +0200 Subject: [PATCH] Added coroutine features and bridge/connect feature to socket clients --- abots/events/__init__.py | 1 + abots/events/async.py | 38 ++++++++++++++++++++++++++++++++++++++ abots/net/socket_client.py | 38 ++++++++++++++++++++++++++++++++------ abots/net/socket_server.py | 7 ++++--- 4 files changed, 75 insertions(+), 9 deletions(-) create mode 100644 abots/events/async.py diff --git a/abots/events/__init__.py b/abots/events/__init__.py index 6d0b8ca..e4f2784 100644 --- a/abots/events/__init__.py +++ b/abots/events/__init__.py @@ -1,3 +1,4 @@ from abots.events.every import Every from abots.events.threads import ThreadPool, ThreadMarshal, acquire_timeout from abots.events.duodecimer import Duodecimer, Cron +from abots.events.async import AsyncEvent diff --git a/abots/events/async.py b/abots/events/async.py new file mode 100644 index 0000000..01d070f --- /dev/null +++ b/abots/events/async.py @@ -0,0 +1,38 @@ +from abots.helpers import coroutine + +class AsyncEvent: + def __init__(self): + self._set = False + self._targets = list() + self._events = self._loop() + + @coroutine + def _loop(self): + try: + while True: + action = (yield) + print(action) + if action == "set": + self._set = True + for info in self._targets: + target, event = info + target.send(event) + elif action == "clear": + self._set = False + except GeneratorExit: + pass + + def is_set(self): + return self._set + + def set(self): + self._events.send("set") + + def clear(self): + self._events.send("clear") + + def wait(self, target, event): + self._targets.append((target, event)) + + def close(self): + self._events.close() diff --git a/abots/net/socket_client.py b/abots/net/socket_client.py index 7ed7dd1..0e11811 100755 --- a/abots/net/socket_client.py +++ b/abots/net/socket_client.py @@ -7,7 +7,7 @@ Socket Client """ -from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp +from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp, coroutine from struct import pack, unpack from socket import socket, timeout as sock_timeout @@ -32,7 +32,7 @@ class SocketClient(Thread): self.reconnects = reconnects self.sock = socket(AF_INET, SOCK_STREAM) if self.secure: - self.sock = wrap_socket(self.sock, **kwargs) + self.sock = wrap_socket(self.sock) self.connection = (self.host, self.port) self.running = True @@ -51,8 +51,11 @@ class SocketClient(Thread): self.queues["outbox"] = self._outbox self.queues["events"] = self._events + self._bridge = None + def _send_event(self, message): self._events.put(jots(message)) + cast(self._bridge, "send", ("events", message)) def _prepare(self): self.sock.setblocking(False) @@ -63,15 +66,18 @@ class SocketClient(Thread): return True, e return False, None - def _obtain(self, queue, timeout=False): + def _obtain(self, queue_name, timeout=False): + queue = self.queues[queue_name] if timeout is False: timeout = self.timeout while True: try: if timeout is not None: - yield queue.get(timeout=timeout) + message = queue.get(timeout=timeout) else: - yield queue.get_nowait() + message = queue.get_nowait() + yield message + cast(self._bridge, "send", (queue_name, message)) queue.task_done() except Empty: break @@ -88,6 +94,7 @@ class SocketClient(Thread): packet = self.sock.recv(self.buffer_size) result = packet.decode() if decode else packet self._outbox.put(result) + cast(self._bridge, "send", ("outbox", result)) except (BrokenPipeError, OSError) as e: pass @@ -140,10 +147,29 @@ class SocketClient(Thread): self._attempt_reconnect() def recv(self): - yield from self._obtain(self._outbox) + yield from self._obtain("outbox") def send(self, message): self._inbox.put(message) + cast(self._bridge, "send" ("inbox", message)) + + def connect(self, bridge): + self._bridge = bridge() + + @coroutine + def bridge(self, inbox, outbox, events): + try: + while True: + task = (yield) + source, message = task + if source == "inbox": + outbox.puts(message) + elif source == "outbox": + inbox.puts(message) + elif source == "events": + events.put(message) + except GeneratorExit: + pass def run(self): err, report = self._prepare() diff --git a/abots/net/socket_server.py b/abots/net/socket_server.py index ac42d5a..3e53b5c 100755 --- a/abots/net/socket_server.py +++ b/abots/net/socket_server.py @@ -231,7 +231,8 @@ class SocketServer(Thread): self.send_message(uuid, message, *args) def recv(self): - return [letter for letter in self._obtain(self._outbox)] + #return [letter for letter in self._obtain(self._outbox)] + yield from self._obtain(self._outbox) def send(self, uuid, message): letter = uuid, message @@ -245,7 +246,7 @@ class SocketServer(Thread): return err queue_args = self._inbox, self.timeout Thread(target=self._queue_thread, args=queue_args).start() - # print("Server ready!") + print("Server ready!") self.ready.set() while not self.kill_switch.is_set(): try: @@ -272,4 +273,4 @@ class SocketServer(Thread): for client in self.clients: client.join(self.timeout) self.stopped.set() - cast(done, "set") \ No newline at end of file + cast(done, "set")