Added thread pool and thread pool manager (among other minor changes)

This commit is contained in:
aewens 2019-05-01 02:34:26 +02:00
parent fec911b36a
commit 42f32fce15
21 changed files with 605 additions and 544 deletions

4
.gitignore vendored
View File

@ -1,7 +1,11 @@
# Application-specific
tags
scratch
scratch.py
*.bak.py
*.db
archive/
.vscode/
# ---> Python
# Byte-compiled / optimized / DLL files

1
abots/db/__init__.py Normal file
View File

@ -0,0 +1 @@
from abots.db.sqlite import SQLite

98
abots/db/sqlite.py Normal file
View File

@ -0,0 +1,98 @@
from abots.helpers import eprint
from os import stat, remove
from os.path import dirname, basename, isfile, isdir, join as path_join
from shutil import copyfile
from time import strftime
from operator import itemgetter
from sqlite3 import connect, Error as SQLiteError
class Quill:
def __init__(self, db_file):
self.db_file = db_file
self.path = dirname(self.db_file)
self.filename = basename(self.db_file)
self.connection = None
self.cursor = None
self.memory = False
self._load()
def _load(self):
try:
self.connection = connect(db_file)
except SQLiteError as e:
eprint(e)
self.connection = connect(":memory:")
self.memory = True
self.cursor = self.connection.cursor()
def _unload(self):
self.conn.close()
# Must be called from `backup`
def _remove_old_backups(self, backup_dir, backups):
contents = listdir(backup_dir)
files = [(f, stat(f).st_mtime) for f in contents if isfile(f)]
# Sort by mtime
files.sort(key=itemgetter(1))
remove_files = files[:-backups]
for rfile in remove_files:
remove(rfile)
def backup(self, backup_dir=None, backups=1):
if backup_dir is None:
backup_dir =
if not isdir(backup_dir):
eprint("ERROR: Backup directory does not exist")
return None
suffix = strftime("-%Y%m%d-%H%M%S")
backup_name = f"{self.filename} {suffix}"
backup_file = path_join(backup_dir, backup_name)
# Lock database
self.cursor.execute("begin immediate")
copyfile(self.db_file, backup_file)
# Unlock database
self.connection.rollback()
self._remove_old_backups(backup_dir, backups)
def execute(self, executor, commit=True, *args):
if "?" in executor:
result = self.cursor.execute(executor, *args)
else:
result = self.cursor.execute(executor)
if commit:
self.connection.commit()
return result
def fetch(self, executor, *args):
return self.execute(executor, commit=False, *args).fetchall()
def create_table(self, name, fields):
_fields = list()
for field in fields:
name = field.get("name", None)
kind = field.get("kind", None)
attr = field.get("attr", None)
if None in [name, kind, attr] or type(attr) != list:
return None
_fields.append(f"{name} {kind} {" ".join(attr)}")
field_string = ", ".join(_fields)
executor = f"CREATE TABLE {self.name}({field_string})"
self.execute(executor)
return True
def insert_values(self, table, values):
if type(values) != list:
values = [values]
values_string = ", ".join(values)
executor = f"INSERT INTO {table} VALUES({values_string})"
self.execute(executor)
return True
def select(self, table, values, where=None):
if type(values) != list:
values = [values]
values_string = ", ".join(values)
executor = f"SELECT {values_string} FROM"
if where:
executor = f"{executor} WHERE {}"

View File

@ -1,6 +1,4 @@
from abots.events.proxy import Proxy
from abots.events.every import Every
from abots.events.shared import Shared
from abots.events.processor import Processor
from abots.events.actor import Actor, Envelope, Supervisor
# from abots.events.scheduler import Scheduler
# from abots.events.scheduler import Scheduler
from abots.events.threads import ThreadPool, ThreadPoolManager

View File

@ -1,7 +1,6 @@
from abots.helpers import eprint, noop
from abots.helpers import eprint, cast
from multiprocessing import Process, Event, Queue
from traceback import print_exc
from enum import Enum
class MailCode(Enum):
@ -153,7 +152,7 @@ class Actor(Process):
self.mailbox = queue
self.valid = True
def call(self, source, method, *args, **kwargs):
def _call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
@ -163,22 +162,27 @@ class Actor(Process):
return status
def run(self):
print(f"Starting {self.name}")
# print(f"Starting {self.name}")
if not self.valid:
return None
print(f"[{self.name}]: Starting proc")
self.proc.from_actor(self.pid, self.kill_switch, self.mailbox)
# print(f"[{self.name}]: Starting proc")
exports = dict()
exports["pid"] = self.pid
exports["kill_switch"] = self.kill_switch
exports["mailbox"] = self.mailbox
exports["ledger"] = self.ledger
cast(self.proc, "from_actor", exports)
proc = Process(target=self.proc.start)
proc.start()
actions = ["reply", "deliver", "send", "send_to", "send_faux"]
print(f"[{self.name}]: Started proc")
# print(f"[{self.name}]: Started proc")
self.kill_switch.wait()
# while not self.kill_switch.is_set():
# while not self.mailbox.empty():
# envelope = Envelope(self.own_pid, self.mailbox.get())
# if not envelope.valid:
# continue
# response = self.call(self.proc, "handle", envelope.message)
# response = cast(self.proc, "handle", envelope.message)
# if response is None or len(response) != 3:
# continue
# action, args, kwargs = response
@ -188,19 +192,19 @@ class Actor(Process):
# continue
# elif action == "deliver" and not envelope.can_deliver:
# continue
# self.call(envelope, action, *args, **kwargs)
# cast(envelope, action, *args, **kwargs)
self.stop()
def stop(self, done=None, timeout=None):
print(f"Stopping {self.name}")
# print(f"Stopping {self.name}")
delay = Event()
self.call(self.proc, "stop", delay)
cast(self.proc, "stop", delay)
delay.wait(timeout)
self.kill_switch.set()
self.mailbox.close()
self.mailbox.join_thread()
self.call(done, "set")
print(f"Stopped {self.name}")
cast(done, "set")
# print(f"Stopped {self.name}")
class Supervisor:
def __init__(self, name="__ROOT__", ledger=Ledger()):
@ -226,7 +230,7 @@ class Supervisor:
return actor
def dismiss(self, child, done=None):
print(f"Dismissing {child}")
# print(f"Dismissing {child}")
if type(child) != int:
child = self.ledger.get_pid(child)
if child not in self.children:
@ -238,8 +242,8 @@ class Supervisor:
event.set()
queue.close()
queue.join_thread()
self.call(done, "set")
print(f"Dismissed {child}")
cast(done, "set")
# print(f"Dismissed {child}")
def stop(self, done=None):
for child in self.children:
@ -251,4 +255,4 @@ class Supervisor:
event.set()
queue.close()
queue.join_thread()
self.call(done, "set")
cast(done, "set")

View File

@ -1,23 +1,22 @@
from time import monotonic, sleep
from multiprocessing import Event, Process
from threading import Event, Thread
class Every:
def __init__(self, interval, function, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.interval = interval
self.function = function
self.condition = Event()
self.event = Event()
def _wrapper(self):
def _wrapper(self, *args, **kwargs):
start = monotonic()
while not self.condition.is_set():
self.function(*self.args, **self.kwargs)
while not self.event.is_set():
self.function(*args, **kwargs)
sleep(self.interval - ((monotonic() - start) % self.interval))
def start(self):
proc = Process(target=self._wrapper)
proc.start()
thread = Thread(target=self._wrapper, args=args, kwargs=kwargs)
thread.setDaemon(True)
thread.start()
def stop(self):
self.condition.set()
self.event.set()

238
abots/events/threads.py Normal file
View File

@ -0,0 +1,238 @@
from abots.helpers import eprint, cast
from abots.events import Every
from queue import Queue, Empty, Full
from threading import Thread, Event, Lock, RLock, BoundedSemaphore
"""
TODO:
- Add routine clean-up in ThreadPoolManager of un-used thread pools
"""
class ThreadPool:
def __init__(self, pool_size, timeout=None):
self.locks = list()
self.events = list()
self.queues = list()
self.workers = list()
data = dict()
for s in range(pool_size):
lock = Lock()
event = Event()
queue = Queue()
args = (s, event, queue, timeout)
worker = Thread(target=self._worker, args=args)
worker.setDaemon(True)
self.locks.append(lock)
self.events.append(event)
self.queues.append(queue)
self.workers.append(worker)
worker.start()
def _worker(self, worker_id, event, queue, timeout=None):
while not event.is_set():
try:
# print(f"[{self.tid}]: Getting task")
if timeout is not None:
job = queue.get(block=True, timeout=timeout)
else:
job = queue.get_nowait()
if len(job) != 2:
# print(f"[worker:{worker_id}]: Job is malformed")
continue
controls, task = job
if len(controls) != 3:
# print(f"[worker:{worker_id}]: Controls are malformed")
continue
done, semaphore, lock = controls
if task is None: # NOTE: Poison pill to kill worker
# print(f"[worker:{worker_id}]: Poisoned")
event.set()
done.set()
lock.release()
semaphore.release()
break
if len(task) != 3:
# print(f"[worker:{worker_id}]: Task is malformed")
done.set()
lock.release()
semaphore.release()
continue
method, args, kwargs = task
# print(f"[{self.tid}]: Running task")
try:
method(*args, **kwargs)
except Exception as e:
print(e)
finally:
done.set()
lock.release()
semaphore.release()
queue.task_done()
except Empty:
continue
# Clear out the queue
# print(f"[worker:{worker_id}]: Clearing out queue")
while True:
try:
queue.get_nowait()
queue.task_done()
except Empty:
break
def stop(self, done=None, wait=True):
# print(f"Stopping pool")
for event in self.events:
event.set()
if wait:
for worker in self.workers:
worker.join()
cast(done, "set")
# print(f"Stopped pool")
class ThreadPoolManager:
def __init__(self, pool_size, cleanup=-1, timeout=None):
self.pool_size = pool_size
self.timeout = timeout
self.cleanup = cleanup
self.pool_cursor = 0
self.worker_cursor = 0
self.pools = list()
self.locks = list()
self.events = list()
self.queues = list()
self.workers = list()
self.semaphores = list()
self._manager = Lock() # NOTE: Maybe make this an RLock?
self.stopped = Event()
self._add_pool()
if self.cleanup > 0:
self.cleaner = Every(self.cleanup, self._cleaner)
def _next_pool(self):
self.pool_cursor = (self.pool_cursor + 1) % len(self.pools)
def _next_worker(self):
self.worker_cursor = (self.worker_cursor + 1) % self.pool_size
# NOTE: This is a potential optimization for later
# if self.worker_cursor == 0:
# self._next_pool()
def _cleaner(self):
if len(self.pools) == 0 or self._manager.locked():
return # Try again later
with self._manager:
pools = list()
for pool_index in range(1, len(self.pools) - 1):
pool = self.pools[pool_index]
queues = self.queues[pool_index]
idle = any([queue.empty() for queue in queues])
if not idle:
continue
pools.append(pool)
for pool in pools:
self._stop_pool(pool)
def _stop_pool(self, pool, done=None, wait=True):
index = self.pools.index(pool)
# print(f"[manager] Stopping pool {index}")
lock = self.locks[index]
event = self.events[index]
queue = self.queues[index]
worker = self.workers[index]
semaphore = self.semaphores[index]
self.pools.remove(pool)
self.locks.remove(lock)
self.events.remove(event)
self.queues.remove(queue)
self.workers.remove(worker)
self.semaphores.remove(semaphore)
pool.stop(done, wait)
# print(f"[manager] Stopped pool {index}")
def _add_pool(self):
index = len(self.pools)
# print(f"[manager] Adding pool {index}")
pool = ThreadPool(self.pool_size, self.timeout)
self.pools.append(pool)
self.locks.append(pool.locks)
self.events.append(pool.events)
self.queues.append(pool.queues)
self.workers.append(pool.workers)
self.semaphores.append(BoundedSemaphore(self.pool_size))
return index
def add_pool(self):
with self._manager:
self._add_pool()
def wait(self):
# print("[manager] Waiting on workers in pools")
with self._manager:
for pool in self.pools:
for worker in pool.workers:
worker.join()
def stop(self, wait=True):
# print("[manager] Stopping")
if self.cleanup > 0:
self.cleaner.event.set()
with self._manager:
dones = list()
threads = list()
for pool in self.pools:
done = Event()
dones.append(done)
thread = Thread(target=pool.stop, args=(done, wait))
thread.setDaemon(True)
threads.append(thread)
thread.start()
if wait:
for thread in threads:
thread.join(self.timeout)
cast(self.stopped, "set")
# print("[manager] Stopped")
def reserve(self, method, args=tuple(), kwargs=dict(), reserve=True):
done = Event()
# print("[manager:reserve] Acquiring lock")
with self._manager:
task = method, args, kwargs
if self.pool_cursor >= len(self.pools):
self._next_pool()
pool_found = False
for p in range(len(self.pools)):
# print(f"[manager:reserve] Trying pool {self.pool_cursor}")
semaphore = self.semaphores[self.pool_cursor]
if not semaphore.acquire(False):
self._next_pool()
continue
pool_found = True
break
if not pool_found:
# print(f"[manager:reserve] Pools are full, adding new pool")
index = self._add_pool()
self.pool_cursor = index
self.worker_cursor = 0
semaphore = self.semaphores[self.pool_cursor]
semaphore.acquire()
# print(f"[manager:reserve] Using pool {self.pool_cursor}")
pool = self.pools[self.pool_cursor]
for w in range(self.pool_size):
# print(f"[manager:reserve] Trying worker {self.worker_cursor}")
lock = self.locks[self.pool_cursor][self.worker_cursor]
event =self.events[self.pool_cursor][self.worker_cursor]
queue =self.queues[self.pool_cursor][self.worker_cursor]
if event.is_set() or lock.locked():
self._next_worker()
continue
if not reserve:
lock = Lock()
# print(f"[manager:reserve] Using worker {self.worker_cursor}")
lock.acquire()
controls = done, semaphore, lock
job = controls, task
queue.put_nowait(job)
self._next_worker()
break
return done

View File

@ -1,3 +1,4 @@
from abots.helpers.hash import create_hash, md5, sha1, sha256, sha512
from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
from abots.helpers.numbers import clamp, randfloat
from abots.helpers.general import eprint, deduce, noop, cast, get_digit

View File

@ -9,18 +9,18 @@ def jots(data, readable=False):
# If readable is set, it pretty prints the JSON to be more human-readable
if readable:
kwargs["sort_keys"] = True
# kwargs["sort_keys"] = True
kwargs["indent"] = 4
kwargs["separators"] = (",", ":")
try:
return json.dumps(data, **kwargs)
return dumps(data, **kwargs)
except ValueError as e:
return None
# JSON decoder, converts a string to a python object
def jsto(data):
try:
return json.loads(data)
return loads(data)
except ValueError as e:
return None

View File

@ -1,18 +1,25 @@
from sys import stderr
from traceback import print_exc
def eprint(*args, **kwargs):
print(*args, file=stderr, **kwargs)
def noop(*args, **kwargs):
pass
def cast(source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def deduce(reference, attributes):
if type(attributes) is not list:
attributes = [attributes]
return list(map(lambda attr: getattr(reference, attr, None), attributes))
def noop(*args, **kwargs):
pass
def cast(obj, method, *args, **kwargs):
return getattr(obj, method, noop)(*args, **kwargs)
def get_digit(number, position):
return False if number - 10**position < 0 else number // 10*position % 10

10
abots/helpers/numbers.py Normal file
View File

@ -0,0 +1,10 @@
from random import random, randint
def clamp(number, low, high):
return max(low, min(number, high))
def randfloat(start, end=None):
if end is None:
end = start
start = 0
return (end - start) * random() + start

View File

@ -7,12 +7,11 @@ Socket Client
"""
from abots.helpers import eprint, noop
from abots.helpers import eprint, cast
from struct import pack, unpack
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from traceback import print_exc
class SocketClient():
def __init__(self, host, port, handler, buffer_size=4096, secure=False,
@ -28,9 +27,6 @@ class SocketClient():
self.connection = (self.host, self.port)
self.running = True
self.pid = None
self.kill_switch = None
self.mailbox = None
def _recv_bytes(self, get_bytes, decode=True):
data = "".encode()
@ -95,39 +91,28 @@ class SocketClient():
return e
return None
def from_actor(self, pid, event, queue):
self.pid = pid
self.kill_switch = event
self.mailbox = queue
def from_actor(self, imports):
cast(self.handler, "load", imports)
def start(self):
err = self._prepare()
if err is not None:
eprint(err)
return err
print("Ready!")
self.handler.initialize()
# print("Ready!")
cast(self.handler, "initialize")
while self.running:
if self.call(self.kill_switch, "is_set"):
self.stop()
break
cast(self.handler, "pre_process")
message = self._get_message()
if message is None:
continue
self.handler.message(message)
cast(self.handler, "message", message)
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
cast(self.handler, "post_process")
def stop(self, done=None):
print("Stopping client!")
# print("Stopping client!")
self.running = False
self.sock.close()
self.call(done, "set")
print("Stopped client!")
cast(done, "set")
# print("Stopped client!")

View File

@ -1,18 +1,45 @@
"""
Socket Server Handlers
Socket Client Handlers
======================
"""
from abots.helpers import cast
from time import sleep
from struct import pack, unpack
class SocketClientHandler:
def __init__(self, client):
self.client = client
self.imports = dict()
def load(self, imports):
self.imports = imports
def pre_process(self):
kill_switch = self.imports.get("kill_switch", None)
if kill_switch is None:
return False
if kill_switch.is_set():
self.client.stop()
return True
return False
def post_process(self):
kill_switch = self.imports.get("kill_switch", None)
mailbox = self.imports.get("mailbox", None)
# pid = self.imports.get("pid", None)
# ledger = self.imports.get("ledger", None)
if kill_switch is None or mailbox is None:
return
while not kill_switch.is_set():
while not mailbox.empty():
message = mailbox.get()
print("POST", message)
# Prepends message size code along with replacing variables in message
def format(self, message, *args):

View File

@ -6,15 +6,15 @@ net/SocketServer
"""
from abots.helpers import eprint, noop
from abots.events import Envelope
from abots.helpers import eprint, cast
from threading import Thread
from threading import Thread, Event
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from time import time
from ssl import wrap_socket
from traceback import print_exc
class SocketServer:
def __init__(self, host, port, handler, listeners=5, buffer_size=4096,
@ -48,25 +48,37 @@ class SocketServer:
self.clients = list()
# State variable for if the server is running or not. See `run`.
self.running = True
self.pid = None
self.kill_switch = None
self.mailbox = None
self.kill_switch = Event()
self.imports = dict()
def _new_client(self, sock, address):
sock.settimeout(60)
client_host, client_port = address
self.sockets.append(sock)
# Have handler process new client event
cast(self.handler, "open_client", client_host, client_port)
# Spawn new thread for client
event = self.kill_switch
client_thread = Thread(target=self._client_thread, args=(sock, event))
self.clients.append(client_thread)
client_thread.start()
# Logic for the client socket running in its own thread
def _client_thread(self, sock):
while self.running:
def _client_thread(self, sock, kill_switch):
while not kill_switch.is_set():
try:
message = self.get_message(sock)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
self.handler.close_client(sock)
cast(self.handler, "close_client", sock)
break
if message is None:
continue
# Each message returns a status code, exactly which code is
# determined by the handler
self.handler.message(sock, message)
cast(self.handler, "message", sock, message)
self.close_sock(sock)
return
@ -128,7 +140,7 @@ class SocketServer:
# Packages a message and sends it to socket
def send_message(self, sock, message, *args):
formatted = self.handler.format(message, *args)
formatted = cast(self.handler, "format", message, *args)
try:
sock.send(formatted)
# The socket can either be broken or no longer open at all
@ -143,10 +155,8 @@ class SocketServer:
if not_server and not_client:
self.send_message(sock, client_message, *args)
def from_actor(self, pid, event, queue):
self.pid = None
self.kill_switch = event
self.mailbox = queue
def from_actor(self, imports):
cast(self.handler, "load", imports)
# The Process function for running the socket server logic loop
def start(self):
@ -154,53 +164,29 @@ class SocketServer:
if err is not None:
eprint(err)
return err
print("Server ready!")
while self.running:
if self.call(self.kill_switch, "is_set"):
self.stop()
# print("Server ready!")
while not self.kill_switch.is_set():
broken = cast(self.handler, "pre_process")
if broken:
break
try:
# Accept new socket client
client_sock, client_address = self.sock.accept()
client_sock.settimeout(60)
self._new_client(client_sock, client_address)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
continue
# Collect the metadata of the client socket
client_host, client_port = client_address
# Define metadata for client
self.sockets.append(client_sock)
# Have handler process new client event
self.handler.open_client(client_host, client_port)
# Spawn new thread for client
args = (client_sock,)
client_thread = Thread(target=self._client_thread, args=args)
self.clients.append(client_thread)
client_thread.start()
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
# cast(self.handler, "post_process")
# Stop the socket server
def stop(self, done=None):
print("Stopping server!")
self.handler.close()
def stop(self, done=None, join=False):
cast(self.handler, "close")
for sock in self.sockets:
if sock != self.sock:
sock.close()
self.running = False
self.kill_switch.set()
self.sock.close()
for client in self.clients:
client.join()
self.call(done, "set")
print("Stopped server!")
if join:
for client in self.clients:
client.join()
cast(done, "set")

View File

@ -7,12 +7,39 @@ Socket Server Handlers
"""
from abots.helpers import cast
from time import sleep
from struct import pack, unpack
class SocketServerHandler:
def __init__(self, server):
self.server = server
self.imports = dict()
def load(self, imports):
self.imports = imports
def pre_process(self):
kill_switch = self.imports.get("kill_switch", None)
if kill_switch is None:
return False
if kill_switch.is_set():
self.server.stop()
return True
return False
def post_process(self):
kill_switch = self.imports.get("kill_switch", None)
mailbox = self.imports.get("mailbox", None)
# pid = self.imports.get("pid", None)
# ledger = self.imports.get("ledger", None)
if kill_switch is None or mailbox is None:
return
while not kill_switch.is_set():
while not mailbox.empty():
message = mailbox.get()
print("POST", message)
# Tells all clients that a node joined the socket server
def open_client(self, address, port):

View File

@ -1,426 +1,102 @@
#!env/bin/python3
import sys
sys.path.insert(0, "/center/lib")
# from abots.ui import TUI
# tui = TUI()
# ========================
# from abots.events.event import Event
# from abots.events.proc import Proc
# from abots.events import Every
# from time import time, sleep
# from multiprocessing import Queue, Process, Manager
# event_q = Queue()
# # send_q = Queue()
# timeout = 0.02
# def nag(eq):
# eq.put(time())
# def blab(eq):
# print(eq.get(block=True, timeout=timeout))
# def destroy(everys, queues):
# for every in everys:
# every.stop()
# for queue in queues:
# queue.close()
# queue.join_thread()
# nag_every = Every(10, nag, event_q)
# nag_proc = Process(target=nag_every.start)
# blab_every = Every(10, blab, event_q)
# blab_proc = Process(target=blab_every.start)
# everys = [
# nag_every,
# blab_every
# ]
# queues = [event_q]
# nag_proc.start()
# blab_proc.start()
# ========================
# from abots.events import Proxy
# class Counter(object):
# def __init__(self):
# self._value = 0
# def update(self, value):
# self._value = self._value + value
# def get(self):
# return self._value
# def __str__(self):
# return "Counter"
# proxy = Proxy(Counter)
# ========================
# from abots.events import Processor, Every, Shared
# from time import sleep
# class Simple:
# def __init__(self):
# self.counter = 0
# def handler(self, pipe, event):
# self.counter = self.counter + 1
# print(event, self.counter)
# self.pipe.notify("complex", self.counter)
# def start(self):
# print("Started")
# def stop(self):
# print("Stopped")
# class Complex:
# def __init__(self, pipe):
# self.pipe = pipe
# self.events = self.pipe.events
# self.condition = self.pipe.condition
# self.counter = 0
# self.runner = Every(10, self.handler)
# def handler(self):
# self.pipe.notify("simple", self.counter)
# events = Shared(self.events)
# while not self.condition.is_set():
# while not events.empty():
# self.counter = self.counter + 1
# event = events.safe_get()
# print(event, self.counter)
# events.done()
# self.pipe.notify("simple", self.counter)
# self.stop()
# def start(self):
# print("Started")
# self.pipe.listen("complex")
# self.runner.start()
# def stop(self):
# print("Stopped")
# if self.condition.is_set():
# self.condition.set()
# self.runner.stop()
# simple = Simple()
# processor = Processor()
# processor.start()
# processor.register(simple.handler, name="simple", simple=True)
# processor.register(Complex)
# processor.spinup_all()
# sleep(10)
# processor.stop()
# ========================
# from abots.events import Actor
# from abots.helpers import noop
# class Recaman:
# def __init__(self, steps):
# self.current = 0
# self.steps = steps
# self.seq = list()
# self.seq.append(self.current)
# def next(self):
# step = len(self.seq)
# if step == self.steps:
# return None
# backwards = self.current - step
# if backwards > 0 and backwards not in self.seq:
# self.current = backwards
# else:
# self.current = self.current + step
# self.seq.append(self.current)
# return self.current
# class Fibonacci:
# def __init__(self, steps):
# self.steps = steps
# self.seq = [0, 1]
# def next(self):
# step = len(self.seq)
# if step == self.steps:
# return None
# a, b = self.seq[-2:]
# c = a + b
# self.seq.append(c)
# return c
# class Watcher:
# def __init__(self, watching):
# self.seqs = list()
# self.done = list()
# self.names = dict()
# self.ready = False
# for watch_id, name in enumerate(watching):
# self.seqs.append(list())
# self.done.append(False)
# self.names[name] = watch_id
# def receive(self, name, entry):
# if self.ready:
# return None
# watch_id = self.names.get(name, None)
# if watch_id is None:
# return None
# if entry is None:
# self.done[watch_id] = True
# if all(self.done):
# self.ready = True
# return True
# return None
# self.seqs[watch_id].append(entry)
# return False
# class Cruncher:
# def __init__(self):
# self.seqs = None
# self.seq = list()
# self.step = 0
# def load(self, seqs):
# self.seqs = seqs
# def next(self):
# if self.seqs is None:
# return None
# steps = max(map(len, self.seqs))
# if self.step == steps:
# return None
# entries = list()
# for seq in self.seqs:
# entries.append(seq[self.step])
# state = self.average(entries)
# print(state, entries)
# self.seq.append(state)
# return state
# def average(self, entries):
# return sum(entries) / len(entries)
# def compute(self):
# if self.seqs is None:
# return None
# running = True
# while running:
# running = False if self.next() is None else True
# return self.seq
# steps = 15
# r = Recaman(steps)
# f = Fibonacci(steps)
# w = Watcher([r, f])
# c = Cruncher()
# def crunch(actor, message):
# pass
# def collect(actor, message):
# cpid, name, entry = message
# status = w.receive(name, entry)
# if status:
# procs = Manager().list()
# ac = Actor(crunch, procs)
# ar = Actor(noop, procs)
# af = Actor(noop, procs)
# aw = Actor(collect, procs)
# ar.start()
# af.start()
# aw.start()
# ac.start()
# ========================
# from multiprocessing import Process, Queue, Event, Manager
# from time import sleep
# from enum import Enum
"""
class Test(Actor):
def __init__(self, name):
super().__init__(name)
def handler(self, code, header, message):
result = message[::-1]
if code == MailCode.NO_SENDER:
return None
from_pid, to_pid = header
if code == MailCode.DELIVER:
self.deliver(from_pid, to_pid, result)
else:
self.send(to_pid, result)
"""
# def basic(pid, handler):
# print("go")
# kill_switch, mailbox = supervisor.export(pid)
# while not kill_switch.is_set():
# while not mailbox.empty():
# sleep(1)
# handler = dict()
# handler[MailCode.SENDER] = lambda s, r, m: supervisor.send(pid, r, m[::-1])
# handler[MailCode.NO_SENDER] = lambda r, m: supervisor.send(pid, r, m[::-1])
# handler[MailCode.DEFAULT] = lambda s, r, m: supervisor.send(pid, r, m[::-1])
# header, message = supervisor.receive(mailbox, handler)
# print("no")
# supervisor = Actor("__root__")
# pid1, proc1 = supervisor.spawn("a", basic, ("b"))
# pid2, proc2 = supervisor.spawn("b", basic, ("a"))
# proc1.start()
# proc2.start()
# supervisor.deliver("a", "Hello, world!")
# p1.start()
# p2.start()
# sleep(5)
# procs.stop()
# ========================
# import asyncio
# from time import time, monotonic
# begin = monotonic()
# # Awful on purpose
# def fib(n):
# if n <= 1:
# return 1
# return fib(n - 1) + fib(n - 2)
# @asyncio.coroutine
# def coroutine(n):
# print("Processing: Coroutine")
# print("Starting: Part 1")
# result1 = yield from part1(n)
# print("Starting: Part 2")
# result2 = yield from part2(n, result1)
# return (result1, result2)
# @asyncio.coroutine
# def part1(n):
# print("Processing: Part 1")
# return fib(n)
# @asyncio.coroutine
# def part2(n, p1):
# print("Processing: Part 2")
# return p1 + fib(n)
# num = 30
# # print(fib(num))
# # print(fib(num))
# loop = asyncio.get_event_loop()
# try:
# print("Starting")
# coro = coroutine(num)
# print("Looping")
# value = loop.run_until_complete(coro)
# print(f"Returning: {value}")
# finally:
# print("Closing")
# loop.close()
# print(f"{monotonic() - begin:.2f}s")
# def compute(name):
# print(f"Starting {name} compute")
# try:
# while True:
# message = (yield)
# if name in message:
# result = sum(fib(30) for _ in range(len(message)))
# print(f"{name}[compute]: {result}")
# except GeneratorExit:
# print(f"Closing {name} compute")
# def reverse(name):
# print(f"Starting {name} reverse")
# try:
# while True:
# message = (yield)
# if name in message:
# print(f"{name}[reverse]: {message[::-1]}")
# except GeneratorExit:
# print(f"Closing {name} reverse")
# def echo(name):
# print(f"Starting {name} echo")
# rn = reverse(name)
# cn = compute(name)
# next(rn)
# next(cn)
# try:
# while True:
# message = (yield)
# rn.send(message)
# cn.send(message)
# if name in message:
# print(f"{name}[echo]: {message}")
# except GeneratorExit:
# print(f"Closing {name} echo")
# e1 = echo("test")
# next(e1)
# ========================
from abots.helpers import eprint, noop
from abots.events import Supervisor, Envelope
from abots.net import (SocketServer, SocketClient, SocketServerHandler,
SocketClientHandler)
from time import sleep
supervisor = Supervisor()
# (server, *a, **k), format|message|open_client|close_client|close
server = SocketServer("localhost", 10901, handler=SocketServerHandler)
# server_proc = Process(target=server)
server_actor = supervisor.spawn("server", server)
# (client, *a, **k), format|message|initialize
client = SocketClient("localhost", 10901, handler=SocketClientHandler)
# client_proc = Process(target=client.start)
client_actor = supervisor.spawn("client", client)
# server_proc.start()
# client_proc.start()
server_actor.start()
client_actor.start()
sleep(5)
print("Stopping")
supervisor.stop()
print("Stopped")
from abots.helpers import jsto
from abots.events import ThreadPoolManager
from urllib.request import urlopen
from time import monotonic
def urlread(url):
with urlopen(url) as u:
return u.read().decode("utf-8")
def get_hn_story(item, score_threshold):
story_url = f"https://hacker-news.firebaseio.com/v0/item/{item}.json"
hn_story = jsto(urlread(story_url))
score = hn_story.get("score", 0)
if score >= score_threshold:
return item, hn_story
# return None
def gen_hn(score_threshold):
hn_start = monotonic()
# print("Gathering new stories")
hn_new_stories = "https://hacker-news.firebaseio.com/v0/topstories.json"
items = jsto(urlread(hn_new_stories))
# stories = dict()
for i, item in enumerate(items):
# print(f"[{i}] Processing story '{item}'")
get_hn_story(item, score_threshold)
elapsed = monotonic() - hn_start
# print("Done processing")
return elapsed
def gen_hn_threaded(score_threshold):
hn_start = monotonic()
# print("Gathering new stories")
hn_new_stories = "https://hacker-news.firebaseio.com/v0/topstories.json"
items = jsto(urlread(hn_new_stories))
# stories = dict()
for i, item in enumerate(items):
# print(f"[{i}] Processing story '{item}'")
Thread(target=get_hn_story, args=(item, score_threshold)).start()
elapsed = monotonic() - hn_start
# print("Done processing")
return elapsed
# score_threshold = 300
# print("Starting normal")
# elapsed1 = gen_hn(score_threshold)
# print("Starting threaded")
# elapsed2 = gen_hn_threaded(score_threshold)
# print(f"First: {elapsed1:0.2f}s\tSecond: {elapsed2:0.2f}s")
# Awful on purpose
def fib(n):
if n <= 1:
return 1
return fib(n - 1) + fib(n - 2)
def normal(n, size):
fib_start = monotonic()
for s in range(size):
fib(n)
elapsed = monotonic() - fib_start
return elapsed
def threaded(n, size, manager):
fib_start = monotonic()
tasks = list()
while len(tasks) < size:
args = (n,)
task = manager.reserve(fib, args)
tasks.append(task)
for task in tasks:
task.wait()
elapsed = monotonic() - fib_start
return elapsed
print("Loading manager")
test_size = 24
pool_size = int(test_size / 2)
pre_start = monotonic()
manager = ThreadPoolManager(pool_size)
loading = monotonic() - pre_start
print(f"Loading: {loading:0.2f}s")
n = 25
print("Starting normal")
normal_time = normal(n, test_size)
print("Starting threaded")
threaded_time = threaded(n, test_size, manager)
print(f"Normal: {normal_time:0.2f}s\tThreaded: {threaded_time:0.2f}s")
print("Stopping manager")
post_start = monotonic()
manager.stop()
manager.stopped.wait()
stopping = monotonic() - post_start
print(f"Stopping: {stopping:0.2f}s")
result_time = loading + threaded_time + stopping
print(f"\n[RESULTS] Normal: {normal_time:0.2f}s\tThreaded: {result_time:0.2f}s")