abots/test_abots.py

58 lines
1.7 KiB
Python
Executable File

#!env/bin/python3
#from abots.net import SocketServer, SocketClient
#
#host = "localhost"
#port = 10401
#timeout = 3
#
#server = SocketServer(host, port, timeout=timeout)
#client = SocketClient(host, port, timeout=timeout)
#
#server.start()
#server.ready.wait()
#client.start()
from abots.events import ThreadMarshal
from abots.helpers import generator
from queue import Queue, Empty
from threading import Thread, Event
def worker(queue, event, timeout):
    """Run jobs pulled from *queue* until *event* is set.

    Each job is expected to be a 2-item sequence ``(done, task)``:

    * ``done`` -- an object exposing ``set()`` (e.g. ``threading.Event``),
      signalled once the job has been handled.
    * ``task`` -- a 3-item sequence ``(method, args, kwargs)`` where
      ``method`` is callable, ``args`` is a tuple and ``kwargs`` is a dict.

    The return value of ``method`` is discarded. Exceptions raised by
    ``method`` are printed and otherwise ignored so the worker keeps running.

    Args:
        queue: Queue supplying jobs; ``task_done()`` is called for every job
            that was dequeued, including malformed ones.
        event: Stop flag; the loop exits once ``event.is_set()`` is true.
        timeout: Seconds to block on ``queue.get`` before re-checking *event*.

    Raises:
        AssertionError: If a job or its task does not have the expected shape.
            The job is still marked done on the queue (and ``done`` is set if
            it was valid) before the error propagates, so waiters do not hang.
    """
    while not event.is_set():
        try:
            job = queue.get(timeout=timeout)
        except Empty:
            # Nothing arrived within the timeout; loop to re-check the stop flag.
            continue

        done = None  # only bound once the completion event has been validated
        try:
            # NOTE: asserts are kept for backward compatibility with callers
            # expecting AssertionError; they are stripped under ``python -O``.
            assert len(job) == 2, "Expected two parameters"
            candidate, task = job
            assert hasattr(candidate, "set"), "Expected event"
            done = candidate
            assert len(task) == 3, "Expected three parameters"
            method, args, kwargs = task
            assert callable(method), "Expected function"
            assert isinstance(args, tuple), "Expected tuple"
            assert isinstance(kwargs, dict), "Expected dict"
            try:
                method(*args, **kwargs)
            except Exception as e:
                # Best-effort execution: report the failure and carry on.
                print(e)
        finally:
            # Always release waiters and balance the queue's unfinished-task
            # counter, even when validation fails; otherwise queue.join() and
            # done.wait() would block forever.
            if done is not None:
                done.set()
            queue.task_done()
def manager(queue, event, timeout):
workers = list()
current = 0
while not event.is_set():
try:
task = queue.get(timeout=timeout)
except Empty:
continue
assert len(task) == 2, "Expected two parameters"
action, payload = task
assert isinstance(action, str), "Expected str"
if action == "new":
assert callable(payload), "Expected function"
workers.append((payload))
elif action == "reserve" and len(workers) > 0:
worker = workers[current]