Fixed bugs in events and net, got sockets working

This commit is contained in:
aewens 2019-05-09 20:41:12 +02:00
parent 8e29b9f257
commit e55717f601
14 changed files with 286 additions and 275 deletions

View File

@ -1,3 +1,3 @@
from abots.events.every import Every
from abots.events.threads import ThreadPool, ThreadMarshal
from abots.events.threads import ThreadPool, ThreadMarshal, acquire_timeout
from abots.events.duodecimer import Duodecimer, Cron

View File

@ -3,7 +3,7 @@ from abots.helpers import cast, utc_now
from queue import Queue, Empty
from collections import defaultdict
from threading import Thread
from threading import Thread, Event
"""
@ -67,19 +67,27 @@ class Cron:
return int(f"{when.hour:02}{when.minute:02}")
@staticmethod
def get_weekday(when=utc_now()):
def get_weekday(when=None):
if when is None:
when = utc_now()
return when.weekday()
@staticmethod
def get_day(when=utc_now()):
def get_day(when=None):
if when is None:
when = utc_now()
return when.day
@staticmethod
def get_month(when=utc_now()):
def get_month(when=None):
if when is None:
when = utc_now()
return when.month
@staticmethod
def get_year(when=utc_now()):
def get_year(when=None):
if when is None:
when = utc_now()
return when.year
@staticmethod
@ -100,8 +108,6 @@ class Cron:
if hour >= 24:
hour = hour % 24
return int(f"{hour:02}{minute:02}")
class Duodecimer:
def __init__(self):
@ -137,11 +143,14 @@ class Duodecimer:
state = list()
while True:
try:
task = queue.get_nowait()
if len(task) != task_size:
job = queue.get_nowait()
if len(job) != 2:
continue
cancel, task = job
if cancel.is_set() or len(task) != task_size:
# print(f"[worker:{worker_id}]: Task is malformed")
continue
state.append(task)
state.append(job)
queue.task_done()
except Empty:
break
@ -157,9 +166,16 @@ class Duodecimer:
def _timer(self, state, queue):
state = self._process_queue(state, queue, 3)
marshal = ThreadMarshal(len(state), destroy=True)
for task in state:
cancelled = list()
for job in state:
# print(f"[worker:{worker_id}]: Running task")
cancel, task = job
if cancel.is_set():
cancelled.append(job)
continue
marshal.reserve(*task)
for job in cancelled:
state.remove(job)
return state
def _cron(self, state, queue):
@ -168,7 +184,11 @@ class Duodecimer:
state["tasks"] = self._process_queue(state["tasks"], queue, 4)
previous = state["previous"]
removing = list()
for task in state["tasks"]:
for job in state["tasks"]:
cancel, task = job
if cancel.is_set():
removing.append(job)
continue
cron, target, args, kwargs = task
assert isinstance(cron.triggers, dict), "Expected dict"
triggered = list()
@ -204,16 +224,14 @@ class Duodecimer:
if timer not in self.queues.keys():
return None
task = method, args, kwargs
self.queues[timer].put(task)
cancel = Event()
job = cancel, task
self.queues[timer].put(job)
return cancel
def schedule(self, cron, target, args=tuple(), kwargs=dict()):
task = cron, target, args, kwargs
self.queues["cron"].put(task)
"""
duodecimer = Duodecimer()
task_id = duodecimer.every(10, "minutes", task)
duodecimer.cancel(task_id)
"""
cancel = Event()
job = cancel, task
self.queues["cron"].put(job)
return cancel

View File

@ -3,11 +3,21 @@ from abots.events import Every
from queue import Queue, Empty, Full
from threading import Thread, Event, Lock, RLock, BoundedSemaphore
from contextlib import contextmanager
"""
TODO:
"""
@contextmanager
def acquire_timeout(lock, timeout=-1):
if timeout is None:
timeout = -1
result = lock.acquire(timeout=timeout)
yield result
if result:
lock.release()
class ThreadPool:
def __init__(self, pool_size, timeout=None):
self.locks = list()

View File

@ -3,4 +3,5 @@ from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
from abots.helpers.numbers import clamp, randfloat
from abots.helpers.general import eprint, deduce, noop, cast, get_digit, obtain
from abots.helpers.general import utc_now, utc_now_timestamp
from abots.helpers.logging import Logger
from abots.helpers.logging import Logger
from abots.helpers.black_magic import infinitedict

View File

@ -0,0 +1,5 @@
from collections import defaultdict
def infinitedict():
d = lambda: defaultdict(d)
return defaultdict(d)

View File

@ -12,22 +12,21 @@ def create_hash(algorithm, seed=None, random_bytes=32, use_bin=False):
h.update(urandom(random_bytes))
else:
h.update(seed.encode("utf-8"))
if use_bin:
return h.digest()
return h.hexdigest()
def md5(*args, **kwargs):
create_hash("md5", *args, **kwargs)
return create_hash("md5", *args, **kwargs)
def sha1(*args, **kwargs):
create_hash("sha1", *args, **kwargs)
return create_hash("sha1", *args, **kwargs)
def sha256(*args, **kwargs):
create_hash("sha256", *args, **kwargs)
return create_hash("sha256", *args, **kwargs)
def sha512(*args, **kwargs):
create_hash("sha512", *args, **kwargs)
return create_hash("sha512", *args, **kwargs)
def pbkdf2(algo, pswd, salt=None, cycles=100000, key_len=None, use_bin=False):
if algorithm not in algorithms_available:

View File

@ -1,4 +1,2 @@
from abots.net.socket_server import SocketServer
from abots.net.socket_client import SocketClient
from abots.net.socket_server_handler import SocketServerHandler
from abots.net.socket_client_handler import SocketClientHandler
from abots.net.socket_client import SocketClient

View File

@ -7,20 +7,24 @@ Socket Client
"""
from abots.helpers import eprint, cast
from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp
from struct import pack, unpack
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from threading import Thread, Event
from queue import Queue, Empty
class SocketClient(Thread):
def __init__(self, host, port, buffer_size=4096, secure=False,
timeout=None):
super().__init__()
class SocketClient():
def __init__(self, host, port, handler, buffer_size=4096, secure=False,
*args, **kwargs):
self.host = host
self.port = port
self.buffer_size = buffer_size
self.secure = secure
self.handler = handler(self, *args, **kwargs)
self.timeout = timeout
self.sock = socket(AF_INET, SOCK_STREAM)
if self.secure:
self.sock = wrap_socket(self.sock, **kwargs)
@ -28,6 +32,48 @@ class SocketClient():
self.connection = (self.host, self.port)
self.running = True
self.kill_switch = Event()
self.ready = Event()
self.stopped = Event()
self._inbox = Queue()
self._events = Queue()
self._outbox = Queue()
self.queues = dict()
self.queues["inbox"] = self._inbox
self.queues["outbox"] = self._outbox
self.queues["events"] = self._events
def _send_event(self, message):
self._events.put(jots(message))
def _prepare(self):
self.sock.setblocking(False)
self.sock.settimeout(1)
try:
self.sock.connect(self.connection)
except OSError as e:
return e
return None
def _obtain(self, queue, timeout=False):
if timeout is False:
timeout = self.timeout
while True:
try:
if timeout is not None:
yield queue.get(timeout=timeout)
else:
yield queue.get_nowait()
queue.task_done()
except Empty:
break
def _queue_thread(self, inbox, timeout):
while not self.kill_switch.is_set():
for message in self._obtain(inbox, timeout):
self.send_message(message)
def _recv_bytes(self, get_bytes, decode=True):
data = "".encode()
attempts = 0
@ -82,37 +128,37 @@ class SocketClient():
except OSError:
self.stop()
def _prepare(self):
self.sock.setblocking(False)
self.sock.settimeout(1)
try:
self.sock.connect(self.connection)
except OSError as e:
return e
return None
def recv(self):
return [letter for letter in self._obtain(self._outbox)]
def from_actor(self, imports):
cast(self.handler, "load", imports)
def send(self, message):
self._inbox.put(message)
def start(self):
def run(self):
err = self._prepare()
if err is not None:
eprint(err)
return err
# print("Ready!")
cast(self.handler, "initialize")
queue_args = self._inbox, self.timeout
Thread(target=self._queue_thread, args=queue_args).start()
print("Client ready!")
self.ready.set()
while self.running:
cast(self.handler, "pre_process")
message = self._get_message()
if message is None:
continue
cast(self.handler, "message", message)
cast(self.handler, "post_process")
self._outbox.put(message)
def stop(self, done=None):
# print("Stopping client!")
event = dict()
event["name"] = "closing"
event["data"] = dict()
event["data"]["when"] = utc_now_timestamp()
self._send_event(event)
self.kill_switch.set()
self.running = False
self.sock.close()
self.stopped.set()
cast(done, "set")
# print("Stopped client!")

View File

@ -1,63 +0,0 @@
"""
Socket Client Handlers
======================
"""
from abots.helpers import cast
from time import sleep
from struct import pack, unpack
class SocketClientHandler:
def __init__(self, client):
self.client = client
self.imports = dict()
def load(self, imports):
self.imports = imports
def pre_process(self):
kill_switch = self.imports.get("kill_switch", None)
if kill_switch is None:
return False
if kill_switch.is_set():
self.client.stop()
return True
return False
def post_process(self):
kill_switch = self.imports.get("kill_switch", None)
mailbox = self.imports.get("mailbox", None)
# pid = self.imports.get("pid", None)
# ledger = self.imports.get("ledger", None)
if kill_switch is None or mailbox is None:
return
while not kill_switch.is_set():
while not mailbox.empty():
message = mailbox.get()
print("POST", message)
# Prepends message size code along with replacing variables in message
def format(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
# Puts message size at the front of the message
prefixed = pack(">I", len(formatted)) + formatted.encode()
return prefixed
def initialize(self):
self.client.send_message("PING")
def message(self, message):
print(f"DEBUG: {message}")
if message == "PONG":
sleep(1)
self.client.send_message("PING")

View File

@ -3,22 +3,26 @@
net/SocketServer
================
TODO:
* Add logging to broken pipe exceptions
"""
from abots.events import Envelope
from abots.helpers import eprint, cast
from abots.helpers import eprint, cast, sha256, utc_now_timestamp
from abots.helpers import jsto, jots
from threading import Thread, Event
from threading import Thread, Event, Lock
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from time import time
from ssl import wrap_socket
from queue import Queue, Empty
class SocketServer:
def __init__(self, host, port, handler, listeners=5, buffer_size=4096,
secure=False, *args, **kwargs):
class SocketServer(Thread):
def __init__(self, host, port, listeners=5, buffer_size=4096,
secure=False, timeout=None):
super().__init__()
# The connection information for server, the clients will use this to
# connect to the server
@ -35,7 +39,16 @@ class SocketServer:
# Determines if SSL wrapper is used
self.secure = secure
self.handler = handler(self, *args, **kwargs)
# Timeout set on queues
self.timeout = timeout
self._inbox = Queue()
self._events = Queue()
self._outbox = Queue()
self.queues = dict()
self.queues["inbox"] = self._inbox
self.queues["outbox"] = self._outbox
self.queues["events"] = self._events
# Sets up the socket itself
self.sock = socket(AF_INET, SOCK_STREAM)
@ -46,42 +59,83 @@ class SocketServer:
# List of all sockets involved (both client and server)
self.sockets = list()
self.clients = list()
self.uuids = dict()
# State variable for if the server is running or not. See `run`.
self.kill_switch = Event()
self.imports = dict()
self.ready = Event()
self.stopped = Event()
def _send_event(self, message):
self._events.put(jots(message))
def _new_client(self, sock, address):
sock.settimeout(60)
client_host, client_port = address
self.sockets.append(sock)
# Have handler process new client event
cast(self.handler, "open_client", client_host, client_port)
# Spawn new thread for client
event = self.kill_switch
client_thread = Thread(target=self._client_thread, args=(sock, event))
client_kill = Event()
client_uuid = sha256()
self.uuids[client_uuid] = dict()
self.uuids[client_uuid]["sock"] = sock
self.uuids[client_uuid]["kill"] = client_kill
event = dict()
event["name"] = "new_client"
event["data"] = dict()
event["data"]["host"] = client_host
event["data"]["port"] = client_port
event["data"]["uuid"] = client_uuid
self._send_event(event)
client_args = sock, client_kill, client_uuid
client_thread = Thread(target=self._client_thread, args=client_args)
self.clients.append(client_thread)
client_thread.start()
# Logic for the client socket running in its own thread
def _client_thread(self, sock, kill_switch):
def _client_thread(self, sock, kill_switch, uuid):
while not kill_switch.is_set():
try:
message = self.get_message(sock)
message = self.get_message(uuid)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
cast(self.handler, "close_client", sock)
eprint(e)
break
if message is None:
continue
# Each message returns a status code, exactly which code is
# determined by the handler
cast(self.handler, "message", sock, message)
self.close_sock(sock)
# Send message and uuid of sender to outbox queue
letter = uuid, message
self._outbox.put(letter)
self.close_sock(uuid)
return
def _obtain(self, queue, timeout=False):
if timeout is False:
timeout = self.timeout
while True:
try:
if timeout is not None:
yield queue.get(timeout=timeout)
else:
yield queue.get_nowait()
queue.task_done()
except Empty:
break
def _queue_thread(self, inbox, timeout):
while not self.kill_switch.is_set():
for letter in self._obtain(inbox, timeout):
if len(letter) != 2:
continue
uuid, message = letter
if uuid == "cast":
self.broadcast_message(uuid, message)
else:
self.send_message(uuid, message)
# Prepares socket server before starting it
def _prepare(self):
self.sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
@ -97,11 +151,31 @@ class SocketServer:
self.sockets.append(self.sock)
return None
def _sock_from_uuid(self, uuid):
return self.uuids.get(uuid, dict()).get("sock", None)
def _package_message(self, message, *args):
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
packaged = pack(">I", len(formatted)) + formatted.encode()
return packaged
# Closes a connected socket and removes it from the sockets list
def close_sock(self, sock):
if sock in self.sockets:
def close_sock(self, uuid):
event = dict()
event["name"] = "close_client"
event["data"] = dict()
event["data"]["uuid"] = uuid
self._send_event(event)
if uuid in list(self.uuids):
sock = self.uuids[uuid]["sock"]
kill = self.uuids[uuid]["kill"]
kill.set()
del self.uuids[uuid]
self.sockets.remove(sock)
sock.close()
sock.close()
# Receives specified number of bytes from a socket
# sock - one of the sockets in sockets
@ -130,45 +204,53 @@ class SocketServer:
data = data + packet
return data.decode() if decode else data
# Get message from socket
def get_message(self, sock):
# Get message from socket
def get_message(self, uuid):
sock = self._sock_from_uuid(uuid)
if sock is None:
return None
raw_message_size = self.receive_bytes(sock, 4, False)
if raw_message_size is None:
if raw_message_size is None or len(raw_message_size) != 4:
return None
message_size = unpack(">I", raw_message_size)[0]
return self.receive_bytes(sock, message_size)
# Packages a message and sends it to socket
def send_message(self, sock, message, *args):
formatted = cast(self.handler, "format", message, *args)
def send_message(self, uuid, message, *args):
sock = self._sock_from_uuid(uuid)
if sock is None:
return None
formatted = self._package_message(message)
try:
sock.send(formatted)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
self.close_sock(sock)
return
# Like send_message, but sends to all sockets but the server and the sender
def broadcast_message(self, client_sock, client_message, *args):
for sock in self.sockets:
not_server = sock != self.sock
not_client = sock != client_sock
if not_server and not_client:
self.send_message(sock, client_message, *args)
def broadcast_message(self, client_uuid, message, *args):
for uuid in list(self.uuids):
if uuid != client_uuid:
self.send_message(uuid, message, *args)
def from_actor(self, imports):
cast(self.handler, "load", imports)
def recv(self):
return [letter for letter in self._obtain(self._outbox)]
# The Process function for running the socket server logic loop
def start(self):
def send(self, uuid, message):
letter = uuid, message
self._inbox.put(letter)
# The function for running the socket server logic loop
def run(self):
err = self._prepare()
if err is not None:
eprint(err)
return err
# print("Server ready!")
queue_args = self._inbox, self.timeout
Thread(target=self._queue_thread, args=queue_args).start()
print("Server ready!")
self.ready.set()
while not self.kill_switch.is_set():
broken = cast(self.handler, "pre_process")
if broken:
break
try:
# Accept new socket client
client_sock, client_address = self.sock.accept()
@ -176,17 +258,20 @@ class SocketServer:
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
continue
# cast(self.handler, "post_process")
# Stop the socket server
def stop(self, done=None, join=False):
cast(self.handler, "close")
for sock in self.sockets:
if sock != self.sock:
sock.close()
event = dict()
event["name"] = "closing"
event["data"] = dict()
event["data"]["when"] = utc_now_timestamp()
self._send_event(event)
for uuid in list(self.uuids):
self.close_sock(uuid)
self.kill_switch.set()
self.sock.close()
if join:
for client in self.clients:
client.join()
client.join(self.timeout)
self.stopped.set()
cast(done, "set")

View File

@ -1,72 +0,0 @@
"""
Socket Server Handlers
======================
"""
from abots.helpers import cast
from time import sleep
from struct import pack, unpack
class SocketServerHandler:
def __init__(self, server):
self.server = server
self.imports = dict()
def load(self, imports):
self.imports = imports
def pre_process(self):
kill_switch = self.imports.get("kill_switch", None)
if kill_switch is None:
return False
if kill_switch.is_set():
self.server.stop()
return True
return False
def post_process(self):
kill_switch = self.imports.get("kill_switch", None)
mailbox = self.imports.get("mailbox", None)
# pid = self.imports.get("pid", None)
# ledger = self.imports.get("ledger", None)
if kill_switch is None or mailbox is None:
return
while not kill_switch.is_set():
while not mailbox.empty():
message = mailbox.get()
print("POST", message)
# Tells all clients that a node joined the socket server
def open_client(self, address, port):
pass
# Informs the other clients a client left and closes that client's socket
def close_client(self, sock):
pass
def close(self):
pass
# Format a message before sending to client(s)
# Prepends message size code along with replacing variables in message
def format(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
# Puts message size at the front of the message
prefixed = pack(">I", len(formatted)) + formatted.encode()
return prefixed
def message(self, sock, message):
print(f"DEBUG: {message}")
if message == "PING":
sleep(1)
self.server.send_message(sock, "PONG")

View File

@ -1,13 +1,17 @@
#!env/bin/python3
import sys
sys.path.insert(0, "/center/lib")
from os.path import dirname, realpath
source = "/".join(dirname(realpath(__file__)).split("/")[:-1])
sys.path.insert(0, source)
from abots.net import SocketClient
host = "127.0.0.1"
port = 10701
timeout = 3
client = SocketClient(host, port)
client = SocketClient(host, port, timeout=timeout)
client.daemon = True
client.start()

View File

@ -1,13 +1,17 @@
#!env/bin/python3
import sys
sys.path.insert(0, "/center/lib")
from os.path import dirname, realpath
source = "/".join(dirname(realpath(__file__)).split("/")[:-1])
sys.path.insert(0, source)
from abots.net import SocketServer
host = "127.0.0.1"
port = 10701
timeout = 3
server = SocketServer(host, port)
server = SocketServer(host, port, timeout=timeout)
server.daemon = True
server.start()

View File

@ -1,37 +1,13 @@
#!env/bin/python3
from abots.events import Duodecimer, Cron
from abots.helpers import eprint, cast, Logger
from abots.net import SocketServer, SocketClient
from abots.events import ThreadMarshal
from queue import Queue, Empty
from threading import Event, Lock
from time import sleep, time
pool_size = 2
marshal = ThreadMarshal(2)
server = SocketServer("localhost", 10401, timeout=3)
client = SocketClient("localhost", 10401, timeout=3)
intervals = ["5s", "30s", "1m"]
def fib(n):
if n <= 1:
return 1
return fib(n - 1) + fib(n - 2)
def fibber(n, log):
log.debug("Starting fibber")
result = fib(10 + n)
when = str(int(time()))[-3:]
log.debug(f"Timer {intervals[n % 3]} :{when}: {result}")
timers = Duodecimer()
timers.start()
log = Logger("test_abots", settings={"disabled": ["file"]})
log.start()
for i in range(15):
timers.assign(intervals[i % 3], fib, (10 + i,))
triggers = dict()
triggers["time"] = Cron.next_minutes(1)
cron = Cron(False, triggers)
timers.schedule(cron, fibber, (15, log))
log.info("Done submitting")
sleep(60 * 3 + 5)
log.info("Stopping...")
log.stop()
timers.stop()
marshal.reserve(server.start)
server.ready.wait()
marshal.reserve(client.start)