Added most of events system

This commit is contained in:
aewens 2019-03-27 21:52:05 +01:00
parent 987ac1e5b4
commit 0955c338e7
18 changed files with 806 additions and 49 deletions

5
abots/events/__init__.py Normal file
View File

@ -0,0 +1,5 @@
from abots.events.proxy import Proxy
from abots.events.every import Every
from abots.events.shared import Shared
from abots.events.processor import Processor
# from abots.events.scheduler import Scheduler

View File

@ -0,0 +1,54 @@
from abots.events.pipe import Pipe
from abots.events.module import Module
from time import time
from multiprocessing import Process, Manager
# Queue, JoinableQueue
class Dispatcher:
def __init__(self, handler=None):
self.modules = dict()
self.handler = handler
manager = Manager()
self.extenstions = manager.dict()
self.notifications = manager.dict()
def _error(self, name, data):
if self.handler is None:
return None
elif getattr(self.handler, "error", None):
return None
self.handler.error(name, data)
def _is_module_init(self, module, destroy=False):
if not proc:
return False
instance = getattr(module, "instance", None)
return not instance if destroy else instance
def _start(self, mod, handler=None, *args, **kwargs):
module = self.modules.get(mod, None)
if self._is_module_init(module):
self._error("_start", mod)
return None
events = None
kwargs = dict()
if handler is not None:
kwargs["handler"] = handler
events = Pipe(module, self.extensions, self.notifications, **kwargs)
module.instance = module.logic(events)
if getattr(module.instance, "create", None) is not None:
return module.instance.create(*args, **kwargs)
def start(self, module):
pass
def extend(self, name, logic):
self.extensions[name] = logic
def register(self, module, logic):
if self.modules.get(module, None) is None:
self._error("register", module)
return None
self.modules[name] = Module(logic)

23
abots/events/every.py Normal file
View File

@ -0,0 +1,23 @@
from time import monotonic, sleep
from multiprocessing import Event, Process
class Every:
def __init__(self, interval, function, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.interval = interval
self.function = function
self.condition = Event()
def _wrapper(self):
start = monotonic()
while not self.condition.is_set():
self.function(*self.args, **self.kwargs)
sleep(self.interval - ((monotonic() - start) % self.interval))
def start(self):
proc = Process(target=self._wrapper)
proc.start()
def stop(self):
self.condition.set()

76
abots/events/pipe.py Normal file
View File

@ -0,0 +1,76 @@
"""
events\Pipe
===========
You may notice from the lack of imports used here that Pipe is purely an
abstraction layer that allows a (Raw)Proc to interact with another (Raw)Proc
through the Processor's notification/event system. It also allows a (Raw)Proc
to stop itself or use one of the extensions added to the processor.
"""
class Pipe:
def __init__(self, exports):
# This is only here to use existing extensions, not to add new ones
self.extensions = exports.get("extensions", None)
# This is only here to append new notifications, not to read them
self.notifications = exports.get("notifications", None)
#
self.listeners = exports.get("listeners", None)
self.condition = exports.get("condition", None)
self.handler = exports.get("handler", None)
# While this one is not used now, it will be very useful for debugging
self.proc = exports.get("proc", None)
# Is set to False if any of the needed exports are missing
self.stable = True
if self.exenstions is None:
self._error("events\Pipe.__init__", "extensions")
self.stable = False
if self.notifications is None:
self._error("events\Pipe.__init__", "notifications")
self.stable = False
if self.listeners is None:
self._error("events\Pipe.__init__", "listeners")
self.stable = False
if self.condition is None:
self._error("events\Pipe.__init__", "condition")
self.stable = False
# Works if, and only if, handler has an error method to call
def _error(self, name, data):
if self.handler is None:
return None
elif getattr(self.handler, "error", None):
return None
self.handler.error(name, data)
def stop(self):
if self.condition.is_set():
self.condition.set()
def notify(self, name, data):
return False if not self.stable
self.notifications.safe_put(name, data)
def listen(self, notification, callback):
return False if not self.stable
if self.listeners.get(notification, None) is not None:
self._error("events\Pipe.listen", notification)
return False
self.listeners[notification] = callback
def silence(self, notification):
return False if not self.stable
if self.listeners.get(notification, None) is not None:
self._error("events\Pipe.listen", notification)
return False
self.listeners[notification] = callback
def use(self, extension):
return False if not self.stable
return self.extensions.get(extension, None)

124
abots/events/proc.py Normal file
View File

@ -0,0 +1,124 @@
"""
events\RawProc & events\Proc
============================
This is designed to be used run by events\Processor to allow it to control
whatever class is sent to it. The class will be spawned inside of events\Proxy
so that operate better in a multiprocessing environment. As well, they will be
provided an event\Shared for an event queue as well as a multiprocessing event
to trigger the stop condition.
"""
from abots.events.pipe import Pipe
from abots.events.shared import Shared
from abots.events import Every, Proxy
from abots.helpers import eprint
from multiprocessing import Event, Process, Manager
from traceback import print_exc
from time import sleep
# The logic in Proc will have full control of how to process events
class Proc(Process):
def __init__(self, logic, exports, *args, **kwargs):
super().__init__(self, *args, **kwargs)
# This is an optional handler used for logging
self.handler = exports.get("handler", None)
# How long to wait for condition before timing out
self.timeout = exports.get("timeout", None)
# This is just a wrapper around multiprocessing.Queue
self.events = Shared()
# This is used to indicate the stop condition for the process
self.condition = Event()
# The notifications that the process will listen to
self.listeners = Manger().dict()
# Pass along multiprocessing controls to exports for pipe
exports["events"] = self.events
exports["condition"] = self.condition
exports["listeners"] = self.listeners
# Controls the events sent to/received by the Processor
self.pipe = Pipe(exports)
# All actions needed for logic can be leveraged through the pipe
self.logic = logic(self.pipe)
# Works if, and only if, handler has an error method to call
def _error(self, name, data):
if self.handler is None:
return None
elif getattr(self.handler, "error", None):
return None
self.handler.error(name, data)
return True
# Checks if method exists in logic, and if so calls it
def call(self, method, *args, **kwargs):
logic_method = getattr(self.logic, method, None)
if logic_method is None:
self._error("event\RawProc:call", method)
return None
return logic_method(*args, **kwargs)
# This is launched with `start` because of the Process inheritance
def run(self):
try:
status = self.call("start")
except Exception:
status = print_exc()
eprint(status)
self._error("event\RawProc:run", status)
return None
return status
# Cleans up the multiprocessing components
def stop(self):
# Stop is called here so it can choose what to do with the queue
# Once it is done, it should set the condition so the events can close
try:
status = self.call("stop")
except Exception:
status = print_exc()
eprint(status)
self._error("event\RawProc:stop", status)
# If the condition was not already set, do it now
if self.condition.is_set():
self.condition.set()
# Wait until logic is done with the events, then close the queue
self.condition.wait(timeout=self.timeout)
self.events.safe_close()
return status
# A simplified version where logic will only process the event given
class SimpleProc(Proc):
def __init__(self, logic, exports, *args, **kwargs):
# While timeout is still used, it is used only in `run` here
super().__init__(self, logic, exports, *args, **kwargs)
# Since logic can be a function in Proc, it is not initalized here
self.logic = logic
# This is launched with `start` because of the Process inheritance
def run(self):
while not self.condition.is_set():
while not self.events.empty():
# The pipe is sent so logic can use it to react to the event
self.logic(self.pipe, events.safe_get())
# timeout is used here to add a delay in processing, if desired
if self.timeout is not None:
sleep(self.timeout)
# Cleans up the multiprocessing components
def stop(self):
# If the condition was not already set, do it now
if self.condition.is_set():
# This will probably never be called, but better safe than sorry
self.condition.set()
self.events.safe_close()

126
abots/events/processor.py Normal file
View File

@ -0,0 +1,126 @@
from abots.events.proc import Proc
from abots.events.shared import Shared
from abots.events import Every, Proxy
from abots.helpers import ctos
from time import sleep
from multiprocessing import Event, Process, Manager
class Processor(Process):
def __init__(self, handler=None, timeout=None):
super().__init__(self)
manager = Manager()
self.handler = handler
self.timeout = timeout
self.procs = manager.dict()
self.active = manager.dict()
self.extenstions = manager.dict()
self.notifications = Shared()
self.condition = Event()
self.exports = self._export()
def _error(self, name, data):
if self.handler is None:
return None
elif getattr(self.handler, "error", None):
return None
self.handler.error(name, data)
return True
def _export(self):
exports = dict()
exports["notifications"] = self.notifications
exports["extensions"] = self.extenstions
if self.handler is not None:
exports["handler"] = self.handler
return exports
def _check_process(self, name):
proc = self.procs.get(name, None)
if proc is None or type(proc) is not dict:
self._error("events\Processor._check_process", name)
return None, None
active = self.active.get(name, None)
return proc, active
def _all(self, action_name, **kwargs):
statuses = dict()
action = getattr(self, action_name, None)
if action is None:
self._error("events\Processor._all", action_name)
return None
for proc in self.procs:
proc_kwargs = proc_kwargs = kwargs.get(proc, dict())
statuses[proc] = action(proc, **proc_kwargs)
return statuses
def run(self):
while not self.condition.is_set():
while not self.notifications.empty():
notification = self.notifications.safe_get()
name, data = notification
for proc_name, proc in self.active.items():
if proc.condition.is_set():
self.spindown()
continue
for event_name in proc.listeners:
if name == event_name:
proc.events.safe_put(notification, self.timeout)
def stop(self):
self.spindown_all()
self.notifications.safe_close()
def extend(self, name, logic, *args, **kwargs):
self.extensions[name] = Proxy(logic).make(*args, **kwargs)
def register(self, proc, name=None, simple=False):
if name is None:
name = ctos(proc)
if name in self.procs:
self._error("events\Processor.register", name)
return None
self.procs[name] = dict()
self.procs[name]["logic"] = proc
self.procs[name]["simple"] = simple
return True
def spinup(self, name, timeout=None, *args, **kwargs):
proc, active = self._check_process(self, name)
if not proc or active:
return None
simple = proc.get("simple", False)
logic = proc.get("logic", None)
if logic is None:
self._error("events\Processor.start", name)
return None
exports = self.exports.copy()
exports["proc"] = name
if timeout is not None:
exports["timeout"] = timeout
proxy_args = (logic, exports, *args, **kwargs)
if simple:
procedure = Proxy(SimpleProc).make(*proxy_args)
else:
procedure = Proxy(Proc).make(*proxy_args)
procedure.start()
self.active[name] = procedure
return True
def spindown(self, name):
proc, active = self._check_process(self, name)
if not proc or not active:
return None
active.stop()
del self.active[name]
return True
def spinup_all(self, **kwargs):
return self._all("spinup")
def spindown_all(self, **kwargs):
statuses = self._all("spindown")
self.condition.set()
return statuses

49
abots/events/proxy.py Normal file
View File

@ -0,0 +1,49 @@
"""
events\Proxy
============
This is used to act as a proxy for sending classes / objects between other
processes, ideally without running into race conditions.
"""
from abots.helpers import ctos
from re import compile as regex
from multiprocessing.managers import BaseManager
class ProxyManager(BaseManager):
pass
class Proxy:
def __init__(self, modules):
self.modules = modules if type(modules) is list else [modules]
self._create_proxy_manager()
# Gets the module as-is, which should be a class to be called
def get(self, module):
return getattr(self._proxy_manager, module, None)
# Returns the module as an object using the provided arguments
def obj(self, module, *args, **kwargs):
return self.get(module)(*args, **kwargs)
# Lists all of the modules stored in the proxy manager
def gather(self):
return [pm for pm in dir(self._proxy_manager) if pm[0].isupper()]
# If there is only one module, it makes it into a proxy object
def make(self, *args, **kwargs):
gather = self.gather
if len(gather) != 1:
return None
return self.obj(gather[0], *args, **kwargs)
def _create_proxy_manager(self):
for module in self.modules:
module_name = ctos(module)
ProxyManager.register(module_name, module)
proxy_manager = ProxyManager()
proxy_manager.start()
self._proxy_manager = proxy_manager

View File

33
abots/events/shared.py Normal file
View File

@ -0,0 +1,33 @@
from queue import Empty, Full
from multiprocessing import get_context
import multiprocessing.queues import Queue as MPQueue
class Shared(MPQueue):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, ctx=get_context())
def safe_get(self, timeout=None):
try:
if timeout is None:
return self.get(block=False)
else:
return self.get(block=True, timeout=timeout)
except Empty:
return None
def safe_put(self, entry, timeout=None):
try:
self.put(entry, block=False, timeout=timeout)
return True
except Full:
return False
def gather(self, timeout=None):
while not self.empty():
yield self.safe_get(timeout)
def safe_close(self):
closed = sum(1 for entry in self.gather())
self.close()
self.join_thread()
return closed

View File

@ -1 +1,3 @@
from abots.helpers.json import jots, jsto
from abots.helpers.hash import create_hash, md5, sha1, sha256, sha512
from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
from abots.helpers.general import eprint

67
abots/helpers/encode.py Executable file
View File

@ -0,0 +1,67 @@
from json import dumps, loads
from codecs import encode as c_encode, decode as c_decode
from base64 import b64encode, b64decode
# JSON encoder, converts a python object to a string
def jots(data, readable=False):
kwargs = dict()
# If readable is set, it pretty prints the JSON to be more human-readable
if readable:
kwargs["sort_keys"] = True
kwargs["indent"] = 4
kwargs["separators"] = (",", ":")
try:
return json.dumps(data, **kwargs)
except ValueError as e:
return None
# JSON decoder, converts a string to a python object
def jsto(data):
try:
return json.loads(data)
except ValueError as e:
return None
# Encodes data to base64
def b64e(data, altchars=None, url=False, use_bin=False):
if type(data) is str:
data = data.encode("utf-8")
if altchars is None and url:
altchars = "-_"
base64_data = b64encode(hex_data, altchars)
if use_bin:
return base64_data
return base64_data.decode("utf-8")
# Decodes data from base64
def b64d(base64_data, altchars=None, url=False, use_bin=False):
if type(base64_data) is str:
base64_data = base64_data.encode("utf-8")
if altchars is None and url:
altchars = "-_"
data = b64decode(base64_data, altchars)
if use_bin:
return data
return data.decode("utf-8")
# Converts hex to base64 encoding
def h2b64(hex_data, altchars=None, url=False, use_bin=False):
if type(hex_data) is str:
hex_data = c_decode(hex_data, "hex")
return b64e(hex_data, altchars, url, use_bin)
# Decodes base64 and converts to hex
def b642h(base64_data, altchars=None, url=False, use_bin=False):
base64_decoded = b64d(base64_data, altchars, url, use_bin)
hex_data = c_encode(base64_decoded, "hex")
if use_bin:
return hex_data
return hex_data.decode("utf-8")
# str(ClassName) -> "<class '__main__.ClassName'>"
# This function extracts the class name from the str output
def ctos(_class):
pattern = regex(r"[<'>]")
cleaned = pattern.sub("", str(_class))
return cleaned.split("class ", 1)[1].split(".")[-1]

4
abots/helpers/general.py Normal file
View File

@ -0,0 +1,4 @@
from sys import stderr
def eprint(*args, **kwargs):
print(*args, file=stderr, **kwargs)

42
abots/helpers/hash.py Normal file
View File

@ -0,0 +1,42 @@
from os import urandom
from binascii import hexlify
from hashlib import algorithms_available, pbkdf2_hmac, script, new as new_hash
def create_hash(algorithm, seed=None, random_bytes=32, use_bin=False):
if algorithm not in algorithms_available:
return None
h = new_hash(algorithm)
if seed is None:
h.update(urandom(random_bytes))
else:
h.update(seed.encode("utf-8"))
if use_bin:
return h.digest()
return h.hexdigest()
def md5(*args, **kwargs):
create_hash("md5", *args, **kwargs)
def sha1(*args, **kwargs):
create_hash("sha1", *args, **kwargs)
def sha256(*args, **kwargs):
create_hash("sha256", *args, **kwargs)
def sha512(*args, **kwargs):
create_hash("sha512", *args, **kwargs)
def pbkdf2(algo, pswd, salt=None, cycles=100000, key_len=None, use_bin=False):
if algorithm not in algorithms_available:
return None
if type(pswd) is str:
pswd = pswd.encode("utf-8")
if salt is None:
salt = os.urandom(16)
elif type(salt) is str:
salt = salt.encode("utf-8")
derived_key = pbkdf2_hmac(algo, pswd, salt, cycles, key_len)
if use_bin:
return derived_key
return hexlify(derived_key).decode("utf-8")

View File

@ -1,22 +0,0 @@
from json import dumps, loads
# JSON encoder, converts a python object to a string
def jots(self, data, readable=False):
kwargs = dict()
# If readable is set, it pretty prints the JSON to be more human-readable
if readable:
kwargs["sort_keys"] = True
kwargs["indent"] = 4
kwargs["separators"] = (",", ":")
try:
return json.dumps(data, **kwargs)
except ValueError as e:
return None
# JSON decoder, converts a string to a python object
def jsto(self, data):
try:
return json.loads(data)
except ValueError as e:
return None

View File

@ -1,10 +1,3 @@
from abots.net.socket_client_handler import SocketClientHandler as handler
from struct import pack, unpack
from multiprocessing import Process, Queue, JoinableQueue
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
"""
Socket Client
@ -14,11 +7,18 @@ Socket Client
"""
from abots.net.socket_client_handler import SocketClientHandler as handler
from struct import pack, unpack
from multiprocessing import Process, Queue, JoinableQueue
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
class SocketClient(Process):
def __init__(self, host, port, buffer_size=4096, end_of_line="\r\n",
secure=False, inbox=JoinableQueue(), outbox=Queue(), handler=handler,
**kwargs):
Process.__init__(self)
super().__init__(self)
self.host = host
self.port = port

View File

@ -1,13 +1,3 @@
from abots.net.socket_server_handler import SocketServerHandler as handler
from threading import Thread
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process, Queue, JoinableQueue
from time import time
from ssl import wrap_socket
"""
net\SocketServer
@ -20,14 +10,24 @@ functions to send/receive messages from the server.
"""
from abots.net.socket_server_handler import SocketServerHandler as handler
from threading import Thread
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process, Queue, JoinableQueue
from time import time
from ssl import wrap_socket
# Inherits Process so that server can be run as a daemon
class SocketServer(Process):
# There are many parameters here, but that is so that any constant used can
# be easily tweaked and not remain hard-coded without an easy way to change
def __init__(self, host, port, listeners=5, buffer_size=4096, secure=False,
max_message_size=-1, end_of_line="\r\n", heartbeat=60,
timeout=0.02, max_message_size=-1, end_of_line="\r\n", heartbeat=60,
inbox=JoinableQueue(), outbox=Queue(), handler=handler, **kwargs):
Process.__init__(self)
super().__init__(self)
# The connection information for server, the clients will use this to
# connect to the server
@ -41,6 +41,9 @@ class SocketServer(Process):
# Size of buffer pulled by `receive_bytes` when not specified
self.buffer_size = buffer_size
# Timeout for the socket server, in term of seconds
self.timeout = timeout
# If max_message_size is -1, it allows any message size
self.max_message_size = max_message_size
@ -145,6 +148,7 @@ class SocketServer(Process):
except (BrokenPipeError, OSError) as e:
# This usually means that the port is already in use
return e
self.sock.settimeout(self.timeout)
self.sock.listen(self.listeners)
# Gets the file descriptor of the socket, which is a fallback for a

View File

@ -1,5 +1,3 @@
from curses import initscr, noecho, cbreak
"""
ui\TUI: Text User Interface
@ -12,7 +10,99 @@ nice framework I can re-use without needing to pull out a manual for curses to
figure out exactly what everything does (which is what I will be doing during
the duration of writing this script).
I would like to note here that while I usually avoid using `import <module>` in
favor of `from <module> import <component>`, I am making an exception here for
curses due to how excessive and overkill that would be.
"""
import curses
from time import sleep
class TUI:
pass
def __init__(self):
# self.screen = None
self.windows = dict()
self.running = True
# This removes the need for `initialize` and `cleanup`
curses.wrapper(self.run)
# Puts terminal into correct modes and state
def initialize(self):
self.screen = curses.initscr()
# So that input will be read and not just echo'd onto the screen
curses.noecho()
# Will read keys as they are typed without needing Enter pressed
curses.cbreak()
# Converts special keys into curses variables
self.screen.keypad(True)
self.running = True
# Returns terminal back to how it was beforehand
def cleanup(self):
self.running = False
curses.nocbreak()
self.screen.keypad(False)
curses.echo()
curses.endwin()
# Stop loop running in `run`
def stop(self):
self.running = False
# Easily check if getch matches a letter or curses.KEY_* variable
def is_key(self, getch, test):
if type(test) is str:
return getch == ord(test)
return getch == test
# Curses uses the format: height, width, y, x or y, x
def create_window(start_x, start_y, width=None, height=None):
window = dict()
# Curses logic run inside wrapper to initialize/cleanup automatically
def run(self, screen):
win = dict()
win["width"] = 80
win["height"] = 24
win["start_x"] = 10
win["start_y"] = 10
win["window"] = screen.subwin(win["height"], win["width"],
win["start_y"], win["start_x"])
window = win["window"]
# # Curses logic run inside wrapper to initialize/cleanup automatically
# def run(self, screen):
# # Make getch non-blocking and clear the screen
# screen.nodelay(True)
# screen.clear()
# width = 4
# count = 0
# direction = 1
# while self.running:
# # Get user input
# char = screen.getch()
# # Flush the user input and clear the screen
# curses.flushinp()
# screen.clear()
# if self.is_key(char, "q"):
# screen.addstr("Closing...")
# self.stop()
# break
# elif self.is_key(char, curses.KEY_UP):
# width = width + 1
# screen.addstr("#" * count)
# count = count + direction
# if count == width:
# direction = -1
# elif count == 0:
# direction = 1
# sleep(0.1)

View File

@ -1,6 +1,86 @@
#!env/bin/python3
# import sys
# sys.path.insert(0, "/center/lib")
import sys
sys.path.insert(0, "/center/lib")
#from abots.net import SocketServer, SocketClient
# from abots.net import SocketServer, SocketClient
# from abots.ui import TUI
# tui = TUI()
# from abots.events import Proxy
# class Counter(object):
# def __init__(self):
# self._value = 0
# def update(self, value):
# self._value = self._value + value
# def get_value(self):
# return self._value
# def __str__(self):
# return "Counter"
# modules = list()
# modules.append(Counter)
# proxy = Proxy(modules)
# from abots.events.event import Event
# from abots.events.proc import Proc
# from abots.events import Every
# from time import time, sleep
# from multiprocessing import Queue, Process, Manager
# event_q = Queue()
# # send_q = Queue()
# timeout = 0.02
# def nag(eq):
# eq.put(time())
# def blab(eq):
# print(eq.get(block=True, timeout=timeout))
# def destroy(everys, queues):
# for every in everys:
# every.stop()
# for queue in queues:
# queue.close()
# queue.join_thread()
# nag_every = Every(10, nag, event_q)
# nag_proc = Process(target=nag_every.start)
# blab_every = Every(10, blab, event_q)
# blab_proc = Process(target=blab_every.start)
# everys = [
# nag_every,
# blab_every
# ]
# queues = [event_q]
# nag_proc.start()
# blab_proc.start()
from abots.events import Processor
class Test:
def __init__(self, condition, events):
self.counter = 0
self.condition = condition
self.events = events
def handler(self, event):
print(event)
self.counter = self.counter + 1
def start(self):
print("Started")
def stop(self):
print("Stopped")