Added duodecimer, fixed bugs

This commit is contained in:
aewens 2019-05-07 07:00:20 +02:00
parent 15eddbcb82
commit 8e29b9f257
9 changed files with 294 additions and 357 deletions

View File

@ -1,4 +1,3 @@
from abots.events.every import Every
from abots.events.actor import Actor, Envelope, Supervisor
# from abots.events.scheduler import Scheduler
from abots.events.threads import ThreadPool, ThreadPoolManager
from abots.events.threads import ThreadPool, ThreadMarshal
from abots.events.duodecimer import Duodecimer, Cron

View File

@ -1,258 +0,0 @@
from abots.helpers import eprint, cast
from multiprocessing import Process, Event, Queue
from enum import Enum
class MailCode(Enum):
NORMAL = 2
URGENT = 3
CRITICAL = 5
SENDER = 7
NO_SENDER = 11
class Ledger:
def __init__(self, entries=list(), pids=dict()):
self.entries = entries
self.pids = pids
def _size(self):
return len(self.entries)
def get_pid(self, name):
return self.pids.get(name, None)
def get_by_pid(self, pid):
if pid >= self._size():
return None
entry = self.entries[pid]
event, queue = entry
if event.is_set():
return None
return event, queue
def get(self, name):
pid = self.get_pid(name)
if pid is None:
return None
return self.get_by_pid(pid)
def register(self, name):
if self.get_pid(name) is not None:
return None
pid = self._size()
event = Event()
queue = Queue()
self.entries.append((event, queue))
self.pids[name] = pid
return pid
class Envelope:
def __init__(self, pid, ledger, envelope=None):
self.pid = pid
self.ledger = ledger
self.code = None
self.header = None
self.message = None
self.sender = None
self.valid = True
self.can_reply = False
self.can_deliver = False
if envelope is not None:
self.allowed = self.parse(envelope)
def parse(self, envelope):
if len(envelope) != 3:
self.valid = False
return None
code, header, message = envelope
self.code = code
self.header = header
self.message = message
self.sender = self.code % MailCode.SENDER == 0 and len(self.header) == 2
if self.sender:
self.can_reply = True
from_pid, to_pid = header
if to_pid != self.pid:
self.can_deliver = True
allowed = dict()
allowed["reply"] = self.can_reply
allowed["deliver"] = self.can_deliver
return allowed
def get_code(self, urgent, critical, sender=True):
code = MailCode.SENDER if sender else MailCode.NO_SENDER
if critical:
code = code * MailCode.CRITICAL
elif urgent:
code = code * MailCode.URGENT
else:
code = code * MailCode.NORMAL
return code
def compose(self, target, code, header, message):
if type(target) != int:
entry = self.ledger.get(target)
else:
entry = self.ledger.get_by_pid(target)
if entry is None:
return None
event, queue = entry
envelope = code, header, message
queue.put(envelope)
return True
def send(self, to_pid, message, urgent=False, critical=False):
code = self.get_code(urgent, critical)
header = self.pid, to_pid
return self.compose(to_pid, code, header, message)
def send_to(self, to_pid, message, urgent=False, critical=False):
code = self.get_code(urgent, critical, sender=False)
return self.compose(to_pid, code, to_pid, message)
def send_faux(self, faux_from_pid, faux_to_pid, to_pid, message,
urgent=False, critical=False):
code = self.get_code(urgent, critical)
header = faux_from_pid, to_pid
return self.compose(faux_to_pid, code, header, message)
def reply(self, message, urgent=False, critical=False,
deliver=False):
if not self.can_reply or (deliver and not self.can_deliver):
return None
from_pid, to_pid = self.header
code = self.get_code(urgent, critical)
if deliver:
return self.compose(to_pid, code, header, message)
else:
header = to_pid, from_pid
return self.compose(from_pid, code, header, message)
def deliver(self, message, urgent=False, critical=False):
return self.reply(message, urgent, critical, deliver=True)
class Actor(Process):
def __init__(self, name, proc, pid, ledger):
super().__init__()
self.name = name
self.proc = proc
self.own_pid = pid
self.ledger = ledger
self.valid = False
self.kill_switch = None
self.mailbox = None
entry = self.ledger.get_by_pid(self.own_pid)
if entry is not None:
event, queue = entry
self.kill_switch = event
self.mailbox = queue
self.valid = True
def _call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def run(self):
# print(f"Starting {self.name}")
if not self.valid:
return None
# print(f"[{self.name}]: Starting proc")
exports = dict()
exports["pid"] = self.pid
exports["kill_switch"] = self.kill_switch
exports["mailbox"] = self.mailbox
exports["ledger"] = self.ledger
cast(self.proc, "from_actor", exports)
proc = Process(target=self.proc.start)
proc.start()
actions = ["reply", "deliver", "send", "send_to", "send_faux"]
# print(f"[{self.name}]: Started proc")
self.kill_switch.wait()
# while not self.kill_switch.is_set():
# while not self.mailbox.empty():
# envelope = Envelope(self.own_pid, self.mailbox.get())
# if not envelope.valid:
# continue
# response = cast(self.proc, "handle", envelope.message)
# if response is None or len(response) != 3:
# continue
# action, args, kwargs = response
# if action not in actions:
# continue
# elif action == "reply" and not envelope.can_reply:
# continue
# elif action == "deliver" and not envelope.can_deliver:
# continue
# cast(envelope, action, *args, **kwargs)
self.stop()
def stop(self, done=None, timeout=None):
# print(f"Stopping {self.name}")
delay = Event()
cast(self.proc, "stop", delay)
delay.wait(timeout)
self.kill_switch.set()
self.mailbox.close()
self.mailbox.join_thread()
cast(done, "set")
# print(f"Stopped {self.name}")
class Supervisor:
def __init__(self, name="__ROOT__", ledger=Ledger()):
self.ledger = ledger
self.children = set()
self.pid = self.ledger.register(name)
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def spawn(self, name, proc):
pid = self.ledger.register(name)
if pid is None:
return None
self.children.add(pid)
actor = Actor(name, proc, pid, self.ledger)
return actor
def dismiss(self, child, done=None):
# print(f"Dismissing {child}")
if type(child) != int:
child = self.ledger.get_pid(child)
if child not in self.children:
return None
entry = self.ledger.get_by_pid(child)
if entry is None:
return None
event, queue = entry
event.set()
queue.close()
queue.join_thread()
cast(done, "set")
# print(f"Dismissed {child}")
def stop(self, done=None):
for child in self.children:
self.dismiss(child)
entry = self.ledger.get_by_pid(self.pid)
if entry is None:
return None
event, queue = entry
event.set()
queue.close()
queue.join_thread()
cast(done, "set")

219
abots/events/duodecimer.py Normal file
View File

@ -0,0 +1,219 @@
from abots.events import Every, ThreadMarshal
from abots.helpers import cast, utc_now
from queue import Queue, Empty
from collections import defaultdict
from threading import Thread
"""
TODO:
* Add way to export tasks to pickle file on stop?
Duodecimer: "duodec-" meaning twelve and the "-imer" for timer, or twelve timers
Will be a scheduler running 12 threads of the increments of time:
* 1 second
* 30 seconds
* 1 minute
* 5 minutes
* 10 minutes
* 15 minutes
* 30 minutes
* 1 hour
* 1 day
* 1 week
* 1 fortnight (2 weeks)
* cron
The cron being a thread that triggers tasks when a date/time trigger occurs
(with 1 minute precision, but run outside the 1 minute thread). Triggers by:
* Specific times in specific dates
* Specific times every day
* Change in the week day
* Change in the month
* Change in the year
"""
minutes_in_seconds = 60
hour_in_seconds = 60 * minutes_in_seconds
day_in_seconds = 24 * hour_in_seconds
week_in_seconds = 7 * day_in_seconds
fortnight_in_seconds = 2 * week_in_seconds
class Cron:
def __init__(self, repeat, triggers):
self.repeat = repeat
self.triggers = triggers
# self.start = utc_now()
# self.date = self.get_date(self.start)
# self.time = self.get_time(self.start)
# self.weekday = self.get_weekday(self.start)
# self.day = self.get_day(self.start)
# self.month = self.get_month(self.start)
# self.year = self.get_year(self.start)
@staticmethod
def get_date(when=None):
if when is None:
when = utc_now()
return int(f"{when.year}{when.month:02}{when.day:02}")
@staticmethod
def get_time(when=None):
if when is None:
when = utc_now()
return int(f"{when.hour:02}{when.minute:02}")
@staticmethod
def get_weekday(when=utc_now()):
return when.weekday()
@staticmethod
def get_day(when=utc_now()):
return when.day
@staticmethod
def get_month(when=utc_now()):
return when.month
@staticmethod
def get_year(when=utc_now()):
return when.year
@staticmethod
def next_minutes(minutes):
now = utc_now()
hour = now.hour
minute = now.minute + minutes
if minute >= 60:
hour = 0 if hour == 23 else hour + 1
minute = minute % 60
return int(f"{hour:02}{minute:02}")
@staticmethod
def next_hours(hours):
now = utc_now()
minute = now.minute
hour = now.hour + hours
if hour >= 24:
hour = hour % 24
return int(f"{hour:02}{minute:02}")
class Duodecimer:
def __init__(self):
self.queues = dict()
self.timers = dict()
self._intervals = dict()
self._intervals["5s"] = 5
self._intervals["30s"] = 30
self._intervals["1m"] = minutes_in_seconds
self._intervals["5m"] = 5 * minutes_in_seconds
self._intervals["10m"] = 10 * minutes_in_seconds
self._intervals["15m"] = 15 * minutes_in_seconds
self._intervals["30m"] = 30 * minutes_in_seconds
self._intervals["1h"] = hour_in_seconds
self._intervals["1d"] = day_in_seconds
self._intervals["1w"] = week_in_seconds
self._intervals["1f"] = fortnight_in_seconds
self._load()
def _load(self):
for name, interval in self._intervals.items():
queue = Queue()
timer = Every(interval, self._timer, queue)
self.queues[name] = queue
self.timers[name] = timer
cron_queue = Queue()
cron_timer = Every(minutes_in_seconds, self._cron, cron_queue)
self.queues["cron"] = cron_queue
self.timers["cron"] = cron_timer
def _process_queue(self, state, queue, task_size):
if state is None:
state = list()
while True:
try:
task = queue.get_nowait()
if len(task) != task_size:
# print(f"[worker:{worker_id}]: Task is malformed")
continue
state.append(task)
queue.task_done()
except Empty:
break
return state
def _process_trigger(self, unit, previous, value):
now = cast(Cron, f"get_{unit}", utc_now())
if value == "change" and previous is not None:
return cast(Cron, f"get_{unit}", previous) != now
else:
return now == value
def _timer(self, state, queue):
state = self._process_queue(state, queue, 3)
marshal = ThreadMarshal(len(state), destroy=True)
for task in state:
# print(f"[worker:{worker_id}]: Running task")
marshal.reserve(*task)
return state
def _cron(self, state, queue):
if state is None:
state = defaultdict(lambda: None)
state["tasks"] = self._process_queue(state["tasks"], queue, 4)
previous = state["previous"]
removing = list()
for task in state["tasks"]:
cron, target, args, kwargs = task
assert isinstance(cron.triggers, dict), "Expected dict"
triggered = list()
for trigger, value in cron.triggers.items():
if trigger == "date":
triggered.append(Cron.get_date() >= value)
elif trigger == "time":
triggered.append(Cron.get_time() >= value)
elif trigger in ["weekday", "day", "month", "year"]:
processed = self._proccess_trigger(trigger, previous, value)
triggered.append(processed)
else:
continue
if all(triggered):
thread = Thread(target=target, args=args, kwargs=kwargs)
thread.start()
if not cron.repeat:
removing.append(task)
for task in removing:
state["tasks"].remove(task)
state["previous"] = utc_now()
return state
def start(self):
for name, timer in self.timers.items():
timer.start()
def stop(self):
for name, timer in self.timers.items():
timer.stop()
def assign(self, timer, method, args=tuple(), kwargs=dict()):
if timer not in self.queues.keys():
return None
task = method, args, kwargs
self.queues[timer].put(task)
def schedule(self, cron, target, args=tuple(), kwargs=dict()):
task = cron, target, args, kwargs
self.queues["cron"].put(task)
"""
duodecimer = Duodecimer()
task_id = duodecimer.every(10, "minutes", task)
duodecimer.cancel(task_id)
"""

View File

@ -11,8 +11,9 @@ class Every:
def _wrapper(self, *args, **kwargs):
start = monotonic()
state = None
while not self.event.is_set():
self.function(*args, **kwargs)
state = self.function(state, *args, **kwargs)
sleep(self.interval - ((monotonic() - start) % self.interval))
def start(self):
@ -21,6 +22,7 @@ class Every:
thread = Thread(target=self._wrapper, args=args, kwargs=kwargs)
thread.setDaemon(True)
thread.start()
return thread
def stop(self):
self.event.set()

View File

@ -6,7 +6,6 @@ from threading import Thread, Event, Lock, RLock, BoundedSemaphore
"""
TODO:
- Add routine clean-up in ThreadPoolManager of un-used thread pools
"""
class ThreadPool:
@ -90,12 +89,14 @@ class ThreadPool:
cast(done, "set")
# print(f"Stopped pool")
class ThreadPoolManager:
def __init__(self, pool_size, monitor=1, cleanup=True, timeout=None):
class ThreadMarshal:
def __init__(self, pool_size, monitor=1, cleanup=True, timeout=None,
destroy=False):
self.pool_size = pool_size
self.monitor_interval = monitor
self.cleanup = cleanup
self.timeout = timeout
self.destroy = destroy
self.stopped = Event()
self._manager = Lock() # NOTE: Maybe make this an RLock?
self._load_presets()
@ -139,16 +140,19 @@ class ThreadPoolManager:
idle_pools.append(self.pools[index])
return idle_pools
def _monitor(self):
def _monitor(self, state):
# print("[manager] Cleaning pools")
if self._manager.locked():
return # Try again later
with self._manager:
idle_pools = self._get_idle_pools()
if self.cleanup and len(idle_pools) > 0:
if self.destroy and len(idle_pools) == len(self.pools):
self.stop()
elif self.cleanup and len(idle_pools) > 0:
cleaning = Event()
self._cleaner(idle_pools, cleaning)
cleaning.wait()
return None
def _cleaner(self, idle_pools, done=None):
print("[manager] Cleaning pools")
@ -174,6 +178,24 @@ class ThreadPoolManager:
pool.stop(done, wait)
# print(f"[manager] Stopped pool {index}")
def _run(self, task, controls, reserve, coordinates):
pool_index, worker_index = coordinates
# print(f"[manager:reserve] Trying worker {self.worker_index}")
lock = self.locks[pool_index][worker_index]
event =self.events[pool_index][worker_index]
queue =self.queues[pool_index][worker_index]
if event.is_set() or lock.locked():
return False
if not reserve:
lock = Lock()
# print(f"[manager:reserve] Using worker {worker_index}")
lock.acquire()
release = controls.get("release", list())
release.append(lock)
job = controls, task
queue.put_nowait(job)
return True
def _add_pool(self):
index = len(self.pools)
# print(f"[manager] Adding pool {index}")
@ -224,24 +246,6 @@ class ThreadPoolManager:
cast(self.stopped, "set")
# print("[manager] Stopped")
def _run(self, task, controls, reserve, coordinates):
pool_index, worker_index = coordinates
# print(f"[manager:reserve] Trying worker {self.worker_index}")
lock = self.locks[pool_index][worker_index]
event =self.events[pool_index][worker_index]
queue =self.queues[pool_index][worker_index]
if event.is_set() or lock.locked():
return False
if not reserve:
lock = Lock()
# print(f"[manager:reserve] Using worker {worker_index}")
lock.acquire()
release = controls.get("release", list())
release.append(lock)
job = controls, task
queue.put_nowait(job)
return True
def run(self, task, done, reserve, coordinates):
if len(task) != 3:
return None

View File

@ -1,5 +1,6 @@
from abots.helpers.hash import create_hash, md5, sha1, sha256, sha512
from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
from abots.helpers.numbers import clamp, randfloat
from abots.helpers.general import eprint, deduce, noop, cast, get_digit
from abots.helpers.general import eprint, deduce, noop, cast, get_digit, obtain
from abots.helpers.general import utc_now, utc_now_timestamp
from abots.helpers.logging import Logger

View File

@ -1,5 +1,6 @@
from sys import stderr
from traceback import print_exc
from datetime import datetime, timezone
def eprint(*args, **kwargs):
print(*args, file=stderr, **kwargs)
@ -7,6 +8,9 @@ def eprint(*args, **kwargs):
def noop(*args, **kwargs):
pass
def obtain(source, attribute):
return getattr(source, attribute, None)
def cast(source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
@ -22,4 +26,10 @@ def deduce(reference, attributes):
return list(map(lambda attr: getattr(reference, attr, None), attributes))
def get_digit(number, position):
return False if number - 10**position < 0 else number // 10*position % 10
return False if number - 10**position < 0 else number // 10*position % 10
def utc_now():
return datetime.utcnow().replace(tzinfo=timezone.utc)
def utc_now_timestamp():
return utc_now().timestamp()

View File

@ -1,77 +1,37 @@
#!env/bin/python3
from abots.helpers import jsto
from abots.events import ThreadPoolManager
from abots.events import Duodecimer, Cron
from abots.helpers import eprint, cast, Logger
from urllib.request import urlopen
from threading import Event
from time import monotonic, sleep
from queue import Queue, Empty
from threading import Event, Lock
from time import sleep, time
def urlread(url):
with urlopen(url) as u:
return u.read().decode("utf-8")
intervals = ["5s", "30s", "1m"]
def get_hn_story(item, score_threshold):
story_url = f"https://hacker-news.firebaseio.com/v0/item/{item}.json"
hn_story = jsto(urlread(story_url))
score = hn_story.get("score", 0)
if score >= score_threshold:
return item, hn_story
# return None
def fib(n):
if n <= 1:
return 1
return fib(n - 1) + fib(n - 2)
def gen_hn(score_threshold):
hn_start = monotonic()
print("Gathering new stories")
hn_new_stories = "https://hacker-news.firebaseio.com/v0/topstories.json"
items = jsto(urlread(hn_new_stories))
# stories = dict()
print(f"Gathered {len(items)} stories")
for i, item in enumerate(items):
# print(f"[{i}] Processing story '{item}'")
get_hn_story(item, score_threshold)
elapsed = monotonic() - hn_start
print("Done processing")
return elapsed
def fibber(n, log):
log.debug("Starting fibber")
result = fib(10 + n)
when = str(int(time()))[-3:]
log.debug(f"Timer {intervals[n % 3]} :{when}: {result}")
def gen_hn_threaded(score_threshold, manager):
hn_start = monotonic()
print("Gathering new stories")
hn_new_stories = "https://hacker-news.firebaseio.com/v0/topstories.json"
items = jsto(urlread(hn_new_stories))
tasks = list()
print(f"Gathered {len(items)} stories")
for i, item in enumerate(items):
# print(f"[{i}] Processing story '{item}'")
task = manager.reserve(get_hn_story, (item, score_threshold))
tasks.append(task)
for task in tasks:
task.wait()
elapsed = monotonic() - hn_start
print("Done processing")
return elapsed
print("Loading manager")
pool_size = 12
test_size = pool_size * 3
pre_start = monotonic()
manager = ThreadPoolManager(pool_size)
loading = monotonic() - pre_start
print(f"Loading: {loading:0.2f}s")
score_threshold = 300
print("Starting normal")
normal_time = gen_hn(score_threshold)
print(f"Normal: {normal_time:0.2f}s")
print("Starting threaded")
threaded_time = gen_hn_threaded(score_threshold, manager)
print(f"Threaded: {threaded_time:0.2f}s")
print("Stopping manager")
post_start = monotonic()
manager.stop()
manager.stopped.wait()
stopping = monotonic() - post_start
print(f"Stopping: {stopping:0.2f}s")
result_time = loading + threaded_time + stopping
print(f"\n[RESULTS] Normal: {normal_time:0.2f}s\tThreaded: {result_time:0.2f}s")
timers = Duodecimer()
timers.start()
log = Logger("test_abots", settings={"disabled": ["file"]})
log.start()
for i in range(15):
timers.assign(intervals[i % 3], fib, (10 + i,))
triggers = dict()
triggers["time"] = Cron.next_minutes(1)
cron = Cron(False, triggers)
timers.schedule(cron, fibber, (15, log))
log.info("Done submitting")
sleep(60 * 3 + 5)
log.info("Stopping...")
log.stop()
timers.stop()