Added coroutine features and bridge/connect feature to socket clients
This commit is contained in:
parent
dff547bd97
commit
b0e85842e3
|
@ -1,3 +1,4 @@
|
|||
from abots.events.every import Every
|
||||
from abots.events.threads import ThreadPool, ThreadMarshal, acquire_timeout
|
||||
from abots.events.duodecimer import Duodecimer, Cron
|
||||
from abots.events.async import AsyncEvent
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
from abots.helpers import coroutine
|
||||
|
||||
class AsyncEvent:
|
||||
def __init__(self):
|
||||
self._set = False
|
||||
self._targets = list()
|
||||
self._events = self._loop()
|
||||
|
||||
@coroutine
|
||||
def _loop(self):
|
||||
try:
|
||||
while True:
|
||||
action = (yield)
|
||||
print(action)
|
||||
if action == "set":
|
||||
self._set = True
|
||||
for info in self._targets:
|
||||
target, event = info
|
||||
target.send(event)
|
||||
elif action == "clear":
|
||||
self._set = False
|
||||
except GeneratorExit:
|
||||
pass
|
||||
|
||||
def is_set(self):
|
||||
return self._set
|
||||
|
||||
def set(self):
|
||||
self._events.send("set")
|
||||
|
||||
def clear(self):
|
||||
self._events.send("clear")
|
||||
|
||||
def wait(self, target, event):
|
||||
self._targets.append((target, event))
|
||||
|
||||
def close(self):
|
||||
self._events.close()
|
|
@ -7,7 +7,7 @@ Socket Client
|
|||
|
||||
"""
|
||||
|
||||
from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp
|
||||
from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp, coroutine
|
||||
|
||||
from struct import pack, unpack
|
||||
from socket import socket, timeout as sock_timeout
|
||||
|
@ -32,7 +32,7 @@ class SocketClient(Thread):
|
|||
self.reconnects = reconnects
|
||||
self.sock = socket(AF_INET, SOCK_STREAM)
|
||||
if self.secure:
|
||||
self.sock = wrap_socket(self.sock, **kwargs)
|
||||
self.sock = wrap_socket(self.sock)
|
||||
|
||||
self.connection = (self.host, self.port)
|
||||
self.running = True
|
||||
|
@ -51,8 +51,11 @@ class SocketClient(Thread):
|
|||
self.queues["outbox"] = self._outbox
|
||||
self.queues["events"] = self._events
|
||||
|
||||
self._bridge = None
|
||||
|
||||
def _send_event(self, message):
|
||||
self._events.put(jots(message))
|
||||
cast(self._bridge, "send", ("events", message))
|
||||
|
||||
def _prepare(self):
|
||||
self.sock.setblocking(False)
|
||||
|
@ -63,15 +66,18 @@ class SocketClient(Thread):
|
|||
return True, e
|
||||
return False, None
|
||||
|
||||
def _obtain(self, queue, timeout=False):
|
||||
def _obtain(self, queue_name, timeout=False):
|
||||
queue = self.queues[queue_name]
|
||||
if timeout is False:
|
||||
timeout = self.timeout
|
||||
while True:
|
||||
try:
|
||||
if timeout is not None:
|
||||
yield queue.get(timeout=timeout)
|
||||
message = queue.get(timeout=timeout)
|
||||
else:
|
||||
yield queue.get_nowait()
|
||||
message = queue.get_nowait()
|
||||
yield message
|
||||
cast(self._bridge, "send", (queue_name, message))
|
||||
queue.task_done()
|
||||
except Empty:
|
||||
break
|
||||
|
@ -88,6 +94,7 @@ class SocketClient(Thread):
|
|||
packet = self.sock.recv(self.buffer_size)
|
||||
result = packet.decode() if decode else packet
|
||||
self._outbox.put(result)
|
||||
cast(self._bridge, "send", ("outbox", result))
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
pass
|
||||
|
||||
|
@ -140,10 +147,29 @@ class SocketClient(Thread):
|
|||
self._attempt_reconnect()
|
||||
|
||||
def recv(self):
|
||||
yield from self._obtain(self._outbox)
|
||||
yield from self._obtain("outbox")
|
||||
|
||||
def send(self, message):
|
||||
self._inbox.put(message)
|
||||
cast(self._bridge, "send" ("inbox", message))
|
||||
|
||||
def connect(self, bridge):
|
||||
self._bridge = bridge()
|
||||
|
||||
@coroutine
|
||||
def bridge(self, inbox, outbox, events):
|
||||
try:
|
||||
while True:
|
||||
task = (yield)
|
||||
source, message = task
|
||||
if source == "inbox":
|
||||
outbox.puts(message)
|
||||
elif source == "outbox":
|
||||
inbox.puts(message)
|
||||
elif source == "events":
|
||||
events.put(message)
|
||||
except GeneratorExit:
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
err, report = self._prepare()
|
||||
|
|
|
@ -231,7 +231,8 @@ class SocketServer(Thread):
|
|||
self.send_message(uuid, message, *args)
|
||||
|
||||
def recv(self):
|
||||
return [letter for letter in self._obtain(self._outbox)]
|
||||
#return [letter for letter in self._obtain(self._outbox)]
|
||||
yield from self._obtain(self._outbox)
|
||||
|
||||
def send(self, uuid, message):
|
||||
letter = uuid, message
|
||||
|
@ -245,7 +246,7 @@ class SocketServer(Thread):
|
|||
return err
|
||||
queue_args = self._inbox, self.timeout
|
||||
Thread(target=self._queue_thread, args=queue_args).start()
|
||||
# print("Server ready!")
|
||||
print("Server ready!")
|
||||
self.ready.set()
|
||||
while not self.kill_switch.is_set():
|
||||
try:
|
||||
|
@ -272,4 +273,4 @@ class SocketServer(Thread):
|
|||
for client in self.clients:
|
||||
client.join(self.timeout)
|
||||
self.stopped.set()
|
||||
cast(done, "set")
|
||||
cast(done, "set")
|
||||
|
|
Loading…
Reference in New Issue