Removing archive, .gitignore apparently did not ignore
This commit is contained in:
parent
42f32fce15
commit
d25b16bc46
|
@ -1,98 +0,0 @@
|
|||
"""
|
||||
|
||||
events/Pipe
|
||||
===========
|
||||
|
||||
This is purely an abstraction layer that allows a (Simple)Proc to interact with
|
||||
another (Simple)Proc through the Processor's notification/event system. It also
|
||||
allows a (Simple)Proc to stop itself or use one of the extensions added to the
|
||||
processor.
|
||||
|
||||
"""
|
||||
|
||||
from abots.events import Shared
|
||||
|
||||
class Pipe:
|
||||
def __init__(self, exports):
|
||||
# This is only here to use existing extensions, not to add new ones
|
||||
self.extensions = exports.get("extensions", None)
|
||||
|
||||
# This is only here to append new notifications, not to read them
|
||||
self.notifications = Shared(exports.get("notifications", None))
|
||||
|
||||
# This is used to append new listeners to subscribe to
|
||||
self.listeners = exports.get("listeners", None)
|
||||
|
||||
# This is used to stop the proc the pipe is connected to
|
||||
self.condition = exports.get("condition", None)
|
||||
|
||||
# This is used to stop the proc the pipe is connected to
|
||||
self.events = exports.get("events", None)
|
||||
|
||||
# This is an optional handler used for logging
|
||||
self.handler = exports.get("handler", None)
|
||||
|
||||
# While this one is not used now, it will be very useful for debugging
|
||||
self.proc = exports.get("proc", None)
|
||||
|
||||
# Is set to False if any of the needed exports are missing
|
||||
self.stable = True
|
||||
if self.extensions is None:
|
||||
self._error("events/Pipe.__init__", "extensions")
|
||||
self.stable = False
|
||||
if self.notifications is None:
|
||||
self._error("events/Pipe.__init__", "notifications")
|
||||
self.stable = False
|
||||
if self.listeners is None:
|
||||
self._error("events/Pipe.__init__", "listeners")
|
||||
self.stable = False
|
||||
if self.condition is None:
|
||||
self._error("events/Pipe.__init__", "condition")
|
||||
self.stable = False
|
||||
if self.events is None:
|
||||
self._error("events/Pipe.__init__", "events")
|
||||
self.stable = False
|
||||
|
||||
# Works if, and only if, handler has an error method to call
|
||||
def _error(self, name, data):
|
||||
if self.handler is None:
|
||||
return None
|
||||
elif getattr(self.handler, "error", None):
|
||||
return None
|
||||
self.handler.error(name, data)
|
||||
|
||||
# Informs the associated proc to stop running
|
||||
def stop(self):
|
||||
if self.condition.is_set():
|
||||
self.condition.set()
|
||||
|
||||
# Dispatches a new notification to the processor
|
||||
def notify(self, name, data):
|
||||
if not self.stable:
|
||||
return False
|
||||
print(f"Notifying {name}: {data}")
|
||||
return self.notifications.safe_put((name, data))
|
||||
|
||||
# Informs the pipe to add a new notification listener
|
||||
def listen(self, notification):
|
||||
if not self.stable:
|
||||
return False
|
||||
if notification in self.listeners:
|
||||
self._error("events/Pipe.listen", notification)
|
||||
return False
|
||||
self.listeners.append(notification)
|
||||
|
||||
# Informs the pipe to remove a new notification listener
|
||||
def silence(self, notification):
|
||||
if not self.stable:
|
||||
return False
|
||||
if notification in self.listeners:
|
||||
self._error("events/Pipe.listen", notification)
|
||||
return False
|
||||
self.listeners.remove(notification)
|
||||
|
||||
# Utilizes one of the extensions loaded by the processor
|
||||
def use(self, extension):
|
||||
if not self.stable:
|
||||
return False
|
||||
return self.extensions.get(extension, None)
|
|
@ -1,118 +0,0 @@
|
|||
"""
|
||||
|
||||
events/SimpleProc & events/Proc
|
||||
============================
|
||||
|
||||
This is designed to be used run by events/Processor to allow it to control
|
||||
whatever class is sent to it. The class will be spawned inside of events/Proxy
|
||||
so that operate better in a multiprocessing environment. As well, they will be
|
||||
provided an event/Shared for an event queue as well as a multiprocessing event
|
||||
to trigger the stop condition.
|
||||
|
||||
"""
|
||||
from abots.events.pipe import Pipe
|
||||
from abots.events import Every, Proxy, Shared
|
||||
from abots.helpers import eprint
|
||||
|
||||
from multiprocessing import Process
|
||||
from traceback import print_exc
|
||||
from time import sleep
|
||||
|
||||
# The logic in Proc will have full control of how to process events
|
||||
class Proc(Process):
|
||||
def __init__(self, name, logic, exports, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
# Mainly used for SimpleProc
|
||||
self.name = name
|
||||
|
||||
# This is an optional handler used for logging
|
||||
self.handler = exports.get("handler")
|
||||
|
||||
# How long to wait for condition before timing out
|
||||
self.timeout = exports.get("timeout")
|
||||
|
||||
# This is just a wrapper around multiprocessing.manager.Queue
|
||||
self.events = Shared(exports.get("events"))
|
||||
|
||||
# This is used to indicate the stop condition for the process
|
||||
self.condition = exports.get("condition")
|
||||
|
||||
# The notifications that the process will listen to
|
||||
self.listeners = exports.get("listeners")
|
||||
|
||||
# Controls the events sent to/received by the Processor
|
||||
self.pipe = Pipe(exports)
|
||||
|
||||
# Mainly used for setting self.logic in run, isolated from SimpleProc
|
||||
self._logic = logic
|
||||
|
||||
# Works if, and only if, handler has an error method to call
|
||||
def _error(self, name, data):
|
||||
if self.handler is None:
|
||||
return None
|
||||
elif getattr(self.handler, "error", None):
|
||||
return None
|
||||
self.handler.error(name, data)
|
||||
return True
|
||||
|
||||
# Checks if method exists in logic, and if so calls it
|
||||
def call(self, method, *args, **kwargs):
|
||||
logic_method = getattr(self.logic, method, None)
|
||||
if logic_method is None:
|
||||
self._error("event/Proc:call", method)
|
||||
return None
|
||||
# In case the method errors out, catch it here to prevent crashing
|
||||
try:
|
||||
return logic_method(*args, **kwargs)
|
||||
except Exception:
|
||||
status = print_exc()
|
||||
eprint(status)
|
||||
self._error("event/Proc:call!", status)
|
||||
return status
|
||||
|
||||
# This is launched with `start` because of the Process inheritance
|
||||
def run(self):
|
||||
# All actions needed for logic can be leveraged through the pipe
|
||||
self.logic = self._logic(self.pipe)
|
||||
return self.call("start")
|
||||
|
||||
# Cleans up the multiprocessing components
|
||||
def stop(self):
|
||||
# Stop is called here so it can choose what to do with the queue
|
||||
# Once it is done, it should set the condition so the events can close
|
||||
status = self.call("stop")
|
||||
# Wait until logic is done with the events, then close the queue
|
||||
self.condition.wait(timeout=self.timeout)
|
||||
self.events.safe_close()
|
||||
return status
|
||||
|
||||
# A simplified version where logic will only process the event given
|
||||
class SimpleProc(Proc):
|
||||
def __init__(self, name, logic, exports, *args, **kwargs):
|
||||
# While timeout is still used, it is used only in `run` here
|
||||
super().__init__(name, logic, exports, *args, **kwargs)
|
||||
|
||||
# Since logic can be a function in Proc, it is not initalized here
|
||||
self.logic = logic
|
||||
|
||||
# By default, adds a listener from the proc's name
|
||||
self.pipe.listen(self.name)
|
||||
|
||||
# This is launched with `start` because of the Process inheritance
|
||||
def run(self):
|
||||
while not self.condition.is_set():
|
||||
while not self.events.empty():
|
||||
# The pipe is sent so logic can use it to react to the event
|
||||
self.events.safe_apply(self.logic, self.pipe)
|
||||
# timeout is used here to add a delay in processing, if desired
|
||||
if self.timeout is not None:
|
||||
sleep(self.timeout)
|
||||
self.stop()
|
||||
|
||||
# Cleans up the multiprocessing components
|
||||
def stop(self):
|
||||
# If the condition was not already set, do it now
|
||||
if self.condition.is_set():
|
||||
self.condition.set()
|
||||
self.events.safe_close()
|
|
@ -1,202 +0,0 @@
|
|||
"""
|
||||
|
||||
events/Processor
|
||||
================
|
||||
|
||||
This is where all of the magic happens. The processor will allow you to spin up
|
||||
a bunch of processes and allow them to communicate over a shared event queue.
|
||||
It does all of this using a handful of multiprocessing abstractions included in
|
||||
the "events" sub-module.
|
||||
|
||||
"""
|
||||
|
||||
from abots.events.proc import Proc, SimpleProc
|
||||
from abots.events.shared import Shared
|
||||
from abots.events import Every, Proxy
|
||||
from abots.helpers import ctos
|
||||
|
||||
from time import sleep
|
||||
from multiprocessing import Process, Manager
|
||||
|
||||
class Processor(Process):
|
||||
def __init__(self, handler=None, timeout=None):
|
||||
super().__init__()
|
||||
|
||||
# This provides the means to create shareable dictionaries
|
||||
self.manager = Manager()
|
||||
|
||||
# This is an optional handler used for logging
|
||||
self.handler = handler
|
||||
|
||||
# Dictates whether the notifications need a timeout for get/put actions
|
||||
self.timeout = timeout
|
||||
|
||||
# All of the procs handled by the processor
|
||||
self.procs = dict()
|
||||
|
||||
# All of the procs that are actively running
|
||||
self.active = dict()
|
||||
|
||||
# Any extensions that have been loaded into the processor
|
||||
self.extensions = self.manager.dict()
|
||||
|
||||
# The shared event queue for notifications for the procs
|
||||
self.notifications = Shared(self.manager.Queue())
|
||||
|
||||
# The stop condition to halt the processor and all of its processes
|
||||
self.condition = self.manager.Event()
|
||||
|
||||
# Used later to export settings inherited by the procs
|
||||
self.exports = self._export()
|
||||
|
||||
# Works if, and only if, handler has an error method to call
|
||||
def _error(self, name, data):
|
||||
if self.handler is None:
|
||||
return None
|
||||
elif getattr(self.handler, "error", None):
|
||||
return None
|
||||
self.handler.error(name, data)
|
||||
return True
|
||||
|
||||
# Creates a dictionary of the shared settings to pass to procs
|
||||
def _export(self):
|
||||
exports = dict()
|
||||
exports["notifications"] = self.notifications.queue
|
||||
exports["extensions"] = self.extensions
|
||||
if self.handler is not None:
|
||||
exports["handler"] = self.handler
|
||||
return exports
|
||||
|
||||
# Determine if a process exists, is formatted correctly, and active
|
||||
def _check_process(self, name):
|
||||
proc = self.procs.get(name, None)
|
||||
if proc is None:
|
||||
self._error("events/Processor._check_process", name)
|
||||
return None, None
|
||||
active = self.active.get(name, None)
|
||||
return proc, active
|
||||
|
||||
# Wrapper for calling actions on all procs
|
||||
def _all(self, action_name, **kwargs):
|
||||
statuses = dict()
|
||||
action = getattr(self, action_name, None)
|
||||
if action is None:
|
||||
self._error("events/Processor._all", action_name)
|
||||
return None
|
||||
for proc_name in self.procs.keys():
|
||||
proc_kwargs = kwargs.get(proc_name, dict())
|
||||
# Save the status of the action for any error reporting
|
||||
statuses[proc_name] = action(proc_name, **proc_kwargs)
|
||||
return statuses
|
||||
|
||||
# Add the extension through a Proxy to work across procs
|
||||
def extend(self, name, logic, *args, **kwargs):
|
||||
self.extensions[name] = Proxy(logic).make(*args, **kwargs)
|
||||
|
||||
# The loop that handles distributing notifications to their procs
|
||||
def run(self):
|
||||
while not self.condition.is_set():
|
||||
while not self.notifications.empty():
|
||||
notification = self.notifications.safe_get()
|
||||
print("Notification:", notification)
|
||||
print(self.active)
|
||||
# We do not care about the proc's name here, only values needed
|
||||
for name, info in self.active.items():
|
||||
print(info)
|
||||
# The proc has been stopped, tell processor to spin it down
|
||||
if info["condition"].is_set():
|
||||
self.spindown()
|
||||
continue
|
||||
for event_name in info["listeners"]:
|
||||
# name, data = notification
|
||||
if event_name != notification[0]:
|
||||
continue
|
||||
Shared(info["events"]).safe_put(notification, self.timeout)
|
||||
|
||||
# Clean up everything that needs to be stopped
|
||||
def stop(self):
|
||||
# Determine the max time procs will take to spin down
|
||||
timeout = 0 if self.timeout is None else self.timeout
|
||||
wait = timeout * len(self.procs)
|
||||
# Spin down all of the procs, then sets the stop condition
|
||||
self.spindown_all()
|
||||
# Give procs enough time to spin down
|
||||
self.condition.wait(wait)
|
||||
# Close the notification loop, return all of the entries not processed
|
||||
return Shared(self.notifications).safe_close()
|
||||
|
||||
# Informs the processor of the proc, does not start it
|
||||
def register(self, proc, name=None, simple=False):
|
||||
if name is None:
|
||||
# Class-to-String helper function, extracts name of class
|
||||
name = ctos(proc)
|
||||
if name in self.procs:
|
||||
self._error("events/Processor.register", name)
|
||||
return None
|
||||
proc_info = dict()
|
||||
proc_info["logic"] = proc
|
||||
proc_info["simple"] = simple
|
||||
self.procs[name] = proc_info
|
||||
return True
|
||||
|
||||
# Starts the proc and loads into into self.active
|
||||
def spinup(self, name, timeout=None, *args, **kwargs):
|
||||
print(f"Spinning up {name}")
|
||||
proc, active = self._check_process(name)
|
||||
if not proc or active:
|
||||
return None
|
||||
simple = proc.get("simple", False)
|
||||
logic = proc.get("logic", None)
|
||||
if logic is None:
|
||||
self._error("events/Processor.start", name)
|
||||
return None
|
||||
# These are defined here so that proc objects do not need to be shared
|
||||
listeners = self.manager.list()
|
||||
condition = self.manager.Event()
|
||||
events = self.manager.Queue()
|
||||
|
||||
exports = self.exports.copy()
|
||||
exports["proc"] = name
|
||||
exports["listeners"] = listeners
|
||||
exports["condition"] = condition
|
||||
exports["events"] = events
|
||||
if timeout is not None:
|
||||
exports["timeout"] = timeout
|
||||
proxy_args = (name, logic, exports, *args)
|
||||
if simple:
|
||||
# procedure = Proxy(SimpleProc).make(*proxy_args, **kwargs)
|
||||
procedure = SimpleProc(*proxy_args, **kwargs)
|
||||
else:
|
||||
# procedure = Proxy(Proc).make(*proxy_args, **kwargs)
|
||||
procedure = Proc(*proxy_args, **kwargs)
|
||||
procedure.start()
|
||||
active_info = dict()
|
||||
active_info["listeners"] = listeners
|
||||
active_info["condition"] = condition
|
||||
active_info["events"] = events
|
||||
print(active_info)
|
||||
self.active[name] = active_info
|
||||
print(self.active)
|
||||
return True
|
||||
|
||||
# Stops the proc and removes it from self.active
|
||||
def spindown(self, name):
|
||||
print(f"Spinning down {name}")
|
||||
proc, active = self._check_process(name)
|
||||
if not proc or not active:
|
||||
return None
|
||||
condition = active.get("condition", None)
|
||||
if condition is None:
|
||||
self._error("events/Processor.spindown", active)
|
||||
condition.set()
|
||||
del self.active[name]
|
||||
return True
|
||||
|
||||
# Start all of the procs
|
||||
def spinup_all(self, **kwargs):
|
||||
return self._all("spinup", **kwargs)
|
||||
|
||||
# Stop all of the procs
|
||||
def spindown_all(self, **kwargs):
|
||||
self.condition.set()
|
||||
return self._all("spindown", **kwargs)
|
|
@ -1,37 +0,0 @@
|
|||
"""
|
||||
|
||||
events/Proxy
|
||||
============
|
||||
|
||||
This is used to act as a proxy for sending classes / objects between other
|
||||
processes, ideally without running into race conditions.
|
||||
|
||||
"""
|
||||
|
||||
from abots.helpers import ctos
|
||||
|
||||
from re import compile as regex
|
||||
from multiprocessing.managers import BaseManager
|
||||
|
||||
class ProxyManager(BaseManager):
|
||||
pass
|
||||
|
||||
class Proxy:
|
||||
def __init__(self, module):
|
||||
self.module = module
|
||||
self.name = ctos(self.module)
|
||||
self._create_proxy_manager()
|
||||
|
||||
# Gets the module as-is, which should be a class to be called
|
||||
def get(self):
|
||||
return getattr(self._proxy_manager, self.name, None)
|
||||
|
||||
# Makes the proxy module into a proxy object
|
||||
def make(self, *args, **kwargs):
|
||||
return self.get()(*args, **kwargs)
|
||||
|
||||
def _create_proxy_manager(self):
|
||||
ProxyManager.register(self.name, self.module)
|
||||
proxy_manager = ProxyManager()
|
||||
proxy_manager.start()
|
||||
self._proxy_manager = proxy_manager
|
|
@ -1,56 +0,0 @@
|
|||
"""
|
||||
|
||||
events/Shared
|
||||
=============
|
||||
|
||||
This is a simple wrapper around the multiprocessing manager queue for safe
|
||||
usage of its functions when considering dangling entries during closing or
|
||||
empty/full events preventing getting / putting entries.
|
||||
|
||||
"""
|
||||
|
||||
from queue import Empty, Full
|
||||
from multiprocessing import get_context
|
||||
|
||||
class Shared:
|
||||
def __init__(self, queue):
|
||||
self.queue = queue
|
||||
|
||||
def empty(self):
|
||||
return self.queue.empty()
|
||||
|
||||
def done(self):
|
||||
return self.queue.task_done()
|
||||
|
||||
def safe_get(self, timeout=None):
|
||||
try:
|
||||
if timeout is None:
|
||||
return self.queue.get(block=False)
|
||||
else:
|
||||
return self.queue.get(block=True, timeout=timeout)
|
||||
except Empty:
|
||||
return None
|
||||
|
||||
def safe_put(self, entry, timeout=None):
|
||||
try:
|
||||
self.queue.put(entry, block=False, timeout=timeout)
|
||||
return True
|
||||
except Full:
|
||||
return False
|
||||
|
||||
def safe_apply(self, action, timeout=None, *args, **kwargs):
|
||||
entry = self.safe_get(timeout)
|
||||
result = action(entry, *args, **kwargs)
|
||||
self.done()
|
||||
return result
|
||||
|
||||
def gather(self, timeout=None):
|
||||
while not self.queue.empty():
|
||||
# Use yield to allow generators to gather entries with timeouts
|
||||
yield self.queue.safe_get(timeout)
|
||||
|
||||
def safe_close(self):
|
||||
# Gather all leftover messages still in the queue
|
||||
gathered = (entry for entry in self.gather())
|
||||
# self.queue.join()
|
||||
return gathered
|
Loading…
Reference in New Issue