Added cleanup and monitoring threads to the thread pool manager

This commit is contained in:
aewens 2019-05-03 23:26:58 +02:00
parent 263ee511fa
commit 883bebc20d
4 changed files with 154 additions and 111 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@ TODO
tags
scratch
scratch.py
*.swp
*.bak.py
*.db
archive/

View File

@ -5,6 +5,8 @@ class Every:
def __init__(self, interval, function, *args, **kwargs):
    """Store the repeat interval (seconds), the callable to invoke on each
    tick, and the positional/keyword arguments to pass it.

    The Event is used as the cancellation flag for the repeating thread
    started later by start().
    """
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.event = Event()
def _wrapper(self, *args, **kwargs):
@ -14,6 +16,8 @@ class Every:
sleep(self.interval - ((monotonic() - start) % self.interval))
def start(self):
    """Launch the repeating wrapper on a background daemon thread.

    Returns immediately; the thread keeps firing until self.event is set.
    """
    args = self.args
    kwargs = self.kwargs
    thread = Thread(target=self._wrapper, args=args, kwargs=kwargs)
    # Thread.setDaemon() is deprecated; assign the daemon attribute
    # (must happen before start()).
    thread.daemon = True
    thread.start()

View File

@ -29,10 +29,16 @@ class ThreadPool:
self.workers.append(worker)
worker.start()
def _exec_controls(self, controls):
    """Apply every control action: `controls` maps an action name (e.g.
    "set", "release") to a list of objects that action should be invoked on.

    NOTE(review): `cast` is a project helper imported elsewhere in the file;
    presumably it calls getattr(obj, action)() and no-ops on None — confirm
    its contract before relying on it.
    """
    for action, methods in controls.items():
        for method in methods:
            cast(method, action)
def _worker(self, worker_id, event, queue, timeout=None):
while not event.is_set():
try:
# print(f"[{self.tid}]: Getting task")
# NOTE: This is really spammy, use only in case of emergencies
# print(f"[worker:{worker_id}]: Getting task")
if timeout is not None:
job = queue.get(block=True, timeout=timeout)
else:
@ -41,33 +47,27 @@ class ThreadPool:
# print(f"[worker:{worker_id}]: Job is malformed")
continue
controls, task = job
if len(controls) != 3:
if type(controls) != dict:
# print(f"[worker:{worker_id}]: Controls are malformed")
continue
done, semaphore, lock = controls
if task is None: # NOTE: Poison pill to kill worker
# print(f"[worker:{worker_id}]: Poisoned")
event.set()
done.set()
lock.release()
semaphore.release()
self._exec_controls(controls)
break
if len(task) != 3:
# print(f"[worker:{worker_id}]: Task is malformed")
done.set()
lock.release()
semaphore.release()
self._exec_controls(controls)
continue
method, args, kwargs = task
# print(f"[{self.tid}]: Running task")
# print(f"[worker:{worker_id}]: Running task")
try:
method(*args, **kwargs)
except Exception as e:
print(e)
finally:
done.set()
lock.release()
semaphore.release()
# print(f"[worker:{worker_id}]: Task complete")
self._exec_controls(controls)
queue.task_done()
except Empty:
continue
@ -91,47 +91,71 @@ class ThreadPool:
# print(f"Stopped pool")
class ThreadPoolManager:
def __init__(self, pool_size, cleanup=-1, timeout=None):
def __init__(self, pool_size, monitor=1, cleanup=True, timeout=None):
self.pool_size = pool_size
self.timeout = timeout
self.monitor_interval = monitor
self.cleanup = cleanup
self.pool_cursor = 0
self.worker_cursor = 0
self.timeout = timeout
self.stopped = Event()
self._manager = Lock() # NOTE: Maybe make this an RLock?
self._load_presets()
self._add_pool()
if self.monitor_interval > 0:
self.monitor = Every(self.monitor_interval, self._monitor)
self.monitor.start()
def _next_pool(self):
    # Advance the round-robin pool cursor, wrapping at the current
    # number of pools.
    self._pool_cursor = (self._pool_cursor + 1) % len(self.pools)
def _next_worker(self):
    # Advance the round-robin worker cursor, wrapping at the fixed
    # per-pool worker count.
    self._worker_cursor = (self._worker_cursor + 1) % self.pool_size
    # NOTE: This is a potential optimization for later
    # if self._worker_cursor == 0:
    #     self._next_pool()
def _load_presets(self):
self._pool_cursor = 0
self._worker_cursor = 0
self.pools = list()
self.locks = list()
self.events = list()
self.queues = list()
self.workers = list()
self.semaphores = list()
self._manager = Lock() # NOTE: Maybe make this an RLock?
self.stopped = Event()
self._add_pool()
if self.cleanup > 0:
self.cleaner = Every(self.cleanup, self._cleaner)
def _next_pool(self):
self.pool_cursor = (self.pool_cursor + 1) % len(self.pools)
def _get_idle_pools(self):
    """Collect the pools (excluding pool 0) whose worker queues are all
    empty.

    Pool 0 is the base pool and is never reported as idle; when it is the
    only pool there is nothing to collect at all.
    """
    idle = list()
    if len(self.pools) == 1:
        return idle
    for index in range(1, len(self.queues)):
        statuses = [queue.empty() for queue in self.queues[index]]
        if not all(statuses):
            continue
        print(f"[manager] Pool {index} is idle")
        idle.append(self.pools[index])
    return idle
def _next_worker(self):
self.worker_cursor = (self.worker_cursor + 1) % self.pool_size
# NOTE: This is a potential optimization for later
# if self.worker_cursor == 0:
# self._next_pool()
def _cleaner(self):
if len(self.pools) == 0 or self._manager.locked():
def _monitor(self):
# print("[manager] Cleaning pools")
if self._manager.locked():
return # Try again later
with self._manager:
pools = list()
for pool_index in range(1, len(self.pools) - 1):
pool = self.pools[pool_index]
queues = self.queues[pool_index]
idle = any([queue.empty() for queue in queues])
if not idle:
continue
pools.append(pool)
for pool in pools:
self._stop_pool(pool)
idle_pools = self._get_idle_pools()
if self.cleanup and len(idle_pools) > 0:
cleaning = Event()
self._cleaner(idle_pools, cleaning)
cleaning.wait()
def _cleaner(self, idle_pools, done=None):
    """Stop every pool in `idle_pools`, then signal `done` when given.

    Each stop waits for completion only when a `done` event was supplied.
    NOTE(review): `cast(done, "set")` presumably no-ops when done is None —
    confirm the `cast` helper's contract.
    """
    print("[manager] Cleaning pools")
    for pool in idle_pools:
        self._stop_pool(pool, wait=done is not None)
    print("[manager] Pools are cleaned")
    cast(done, "set")
def _stop_pool(self, pool, done=None, wait=True):
index = self.pools.index(pool)
@ -166,6 +190,12 @@ class ThreadPoolManager:
with self._manager:
self._add_pool()
def clean(self, done=None):
    """Sweep idle pools under the manager lock; optional `done` event is
    set once the sweep finishes (passed through to the cleaner).
    """
    with self._manager:
        idle = self._get_idle_pools()
        if self.cleanup and len(idle) > 0:
            self._cleaner(idle, done)
def wait(self):
# print("[manager] Waiting on workers in pools")
with self._manager:
@ -175,8 +205,8 @@ class ThreadPoolManager:
def stop(self, wait=True):
# print("[manager] Stopping")
if self.cleanup > 0:
self.cleaner.event.set()
if self.monitor_interval > 0:
self.monitor.event.set()
with self._manager:
dones = list()
threads = list()
@ -190,20 +220,60 @@ class ThreadPoolManager:
if wait:
for thread in threads:
thread.join(self.timeout)
self._load_presets()
cast(self.stopped, "set")
# print("[manager] Stopped")
def _run(self, task, controls, reserve, coordinates):
    """Queue `task` on the worker addressed by `coordinates`
    (pool_index, worker_index).

    Returns True when the job was queued, False when that worker is
    unavailable (stopped or its lock already held). When `reserve` is
    False a throwaway Lock is acquired instead of the shared worker lock,
    so the worker is not exclusively reserved. The acquired lock is added
    to controls["release"] so the worker releases it on completion.
    """
    pool_index, worker_index = coordinates
    # print(f"[manager:reserve] Trying worker {worker_index}")
    lock = self.locks[pool_index][worker_index]
    event = self.events[pool_index][worker_index]
    queue = self.queues[pool_index][worker_index]
    if event.is_set() or lock.locked():
        return False
    if not reserve:
        lock = Lock()
    # print(f"[manager:reserve] Using worker {worker_index}")
    lock.acquire()
    # setdefault ensures the lock lands in the controls dict even when the
    # caller did not pre-seed a "release" list; the previous
    # controls.get("release", list()) appended to a discarded default list,
    # leaking the lock in that case.
    controls.setdefault("release", list()).append(lock)
    job = controls, task
    queue.put_nowait(job)
    return True
def run(self, task, done, reserve, coordinates):
    """Validate and queue `task` on one specific worker.

    task: (method, args, kwargs) with callable/tuple/dict respectively;
    coordinates: (pool_index, worker_index). `done` is set and the pool
    semaphore released by the worker when the job completes.

    Returns True when queued; None when validation fails, the pool's
    semaphore has no free slot, or the targeted worker is unavailable.
    """
    if len(task) != 3:
        return None
    if len(coordinates) != 2:
        return None
    method, args, kwargs = task
    if not callable(method) or type(args) != tuple or type(kwargs) != dict:
        return None
    pool_index, worker_index = coordinates
    if pool_index >= len(self.pools) or worker_index >= self.pool_size:
        return None
    # BUG FIX: the lock attribute is self._manager (self.manager raised
    # AttributeError).
    with self._manager:
        semaphore = self.semaphores[pool_index]
        if not semaphore.acquire(False):
            return None
        controls = dict()
        controls["set"] = [done]
        # BUG FIX: was misspelled `sempahore`, a guaranteed NameError.
        controls["release"] = [semaphore]
        queued = self._run(task, controls, reserve, coordinates)
        if not queued:
            # The worker rejected the job, so no one will ever run the
            # controls — give back the semaphore slot we acquired.
            semaphore.release()
            return None
        return True
def reserve(self, method, args=tuple(), kwargs=dict(), reserve=True):
done = Event()
# print("[manager:reserve] Acquiring lock")
done = Event()
with self._manager:
task = method, args, kwargs
if self.pool_cursor >= len(self.pools):
if self._pool_cursor >= len(self.pools):
self._next_pool()
pool_found = False
for p in range(len(self.pools)):
# print(f"[manager:reserve] Trying pool {self.pool_cursor}")
semaphore = self.semaphores[self.pool_cursor]
# print(f"[manager:reserve] Trying pool {self._pool_cursor}")
semaphore = self.semaphores[self._pool_cursor]
if not semaphore.acquire(False):
self._next_pool()
continue
@ -212,27 +282,20 @@ class ThreadPoolManager:
if not pool_found:
# print(f"[manager:reserve] Pools are full, adding new pool")
index = self._add_pool()
self.pool_cursor = index
self.worker_cursor = 0
semaphore = self.semaphores[self.pool_cursor]
self._pool_cursor = index
self._worker_cursor = 0
semaphore = self.semaphores[self._pool_cursor]
semaphore.acquire()
# print(f"[manager:reserve] Using pool {self.pool_cursor}")
pool = self.pools[self.pool_cursor]
# print(f"[manager:reserve] Using pool {self._pool_cursor}")
pool = self.pools[self._pool_cursor]
for w in range(self.pool_size):
# print(f"[manager:reserve] Trying worker {self.worker_cursor}")
lock = self.locks[self.pool_cursor][self.worker_cursor]
event =self.events[self.pool_cursor][self.worker_cursor]
queue =self.queues[self.pool_cursor][self.worker_cursor]
if event.is_set() or lock.locked():
self._next_worker()
continue
if not reserve:
lock = Lock()
# print(f"[manager:reserve] Using worker {self.worker_cursor}")
lock.acquire()
controls = done, semaphore, lock
job = controls, task
queue.put_nowait(job)
coordinates = (self._pool_cursor, self._worker_cursor)
controls = dict()
controls["set"] = [done]
controls["release"] = [semaphore]
queued = self._run(task, controls, reserve, coordinates)
self._next_worker()
if not queued:
continue
break
return done

View File

@ -4,7 +4,8 @@ from abots.helpers import jsto
from abots.events import ThreadPoolManager
from urllib.request import urlopen
from time import monotonic
from threading import Event
from time import monotonic, sleep
def urlread(url):
with urlopen(url) as u:
@ -20,76 +21,50 @@ def get_hn_story(item, score_threshold):
def gen_hn(score_threshold):
hn_start = monotonic()
# print("Gathering new stories")
print("Gathering new stories")
hn_new_stories = "https://hacker-news.firebaseio.com/v0/topstories.json"
items = jsto(urlread(hn_new_stories))
# stories = dict()
print(f"Gathered {len(items)} stories")
for i, item in enumerate(items):
# print(f"[{i}] Processing story '{item}'")
get_hn_story(item, score_threshold)
elapsed = monotonic() - hn_start
# print("Done processing")
print("Done processing")
return elapsed
def gen_hn_threaded(score_threshold):
def gen_hn_threaded(score_threshold, manager):
hn_start = monotonic()
# print("Gathering new stories")
print("Gathering new stories")
hn_new_stories = "https://hacker-news.firebaseio.com/v0/topstories.json"
items = jsto(urlread(hn_new_stories))
# stories = dict()
tasks = list()
print(f"Gathered {len(items)} stories")
for i, item in enumerate(items):
# print(f"[{i}] Processing story '{item}'")
Thread(target=get_hn_story, args=(item, score_threshold)).start()
elapsed = monotonic() - hn_start
# print("Done processing")
return elapsed
# score_threshold = 300
# print("Starting normal")
# elapsed1 = gen_hn(score_threshold)
# print("Starting threaded")
# elapsed2 = gen_hn_threaded(score_threshold)
# print(f"First: {elapsed1:0.2f}s\tSecond: {elapsed2:0.2f}s")
# Awful on purpose
def fib(n):
    """Naive doubly-recursive Fibonacci (fib(0) == fib(1) == 1).

    Deliberately exponential — used as a CPU-bound benchmark workload.
    """
    return 1 if n <= 1 else fib(n - 1) + fib(n - 2)
def normal(n, size):
    """Run fib(n) sequentially `size` times; return elapsed seconds."""
    started = monotonic()
    for _ in range(size):
        fib(n)
    return monotonic() - started
def threaded(n, size, manager):
fib_start = monotonic()
tasks = list()
while len(tasks) < size:
args = (n,)
task = manager.reserve(fib, args)
task = manager.reserve(get_hn_story, (item, score_threshold))
tasks.append(task)
for task in tasks:
task.wait()
elapsed = monotonic() - fib_start
elapsed = monotonic() - hn_start
print("Done processing")
return elapsed
print("Loading manager")
test_size = 24
pool_size = int(test_size / 2)
pool_size = 12
test_size = pool_size * 3
pre_start = monotonic()
manager = ThreadPoolManager(pool_size)
loading = monotonic() - pre_start
print(f"Loading: {loading:0.2f}s")
n = 25
score_threshold = 300
print("Starting normal")
normal_time = normal(n, test_size)
normal_time = gen_hn(score_threshold)
print(f"Normal: {normal_time:0.2f}s")
print("Starting threaded")
threaded_time = threaded(n, test_size, manager)
print(f"Normal: {normal_time:0.2f}s\tThreaded: {threaded_time:0.2f}s")
threaded_time = gen_hn_threaded(score_threshold, manager)
print(f"Threaded: {threaded_time:0.2f}s")
print("Stopping manager")
post_start = monotonic()