Added first stable version of actor model, among other test components with it

This commit is contained in:
aewens 2019-04-12 22:57:22 +02:00
parent 0955c338e7
commit fec911b36a
19 changed files with 1101 additions and 658 deletions

2
.gitignore vendored
View File

@ -1,5 +1,7 @@
# Application-specific
scratch
*.bak.py
archive/
# ---> Python
# Byte-compiled / optimized / DLL files

View File

@ -2,4 +2,5 @@ from abots.events.proxy import Proxy
from abots.events.every import Every
from abots.events.shared import Shared
from abots.events.processor import Processor
from abots.events.actor import Actor, Envelope, Supervisor
# from abots.events.scheduler import Scheduler

254
abots/events/actor.py Normal file
View File

@ -0,0 +1,254 @@
from abots.helpers import eprint, noop
from multiprocessing import Process, Event, Queue
from traceback import print_exc
from enum import Enum
class MailCode(Enum):
NORMAL = 2
URGENT = 3
CRITICAL = 5
SENDER = 7
NO_SENDER = 11
class Ledger:
def __init__(self, entries=list(), pids=dict()):
self.entries = entries
self.pids = pids
def _size(self):
return len(self.entries)
def get_pid(self, name):
return self.pids.get(name, None)
def get_by_pid(self, pid):
if pid >= self._size():
return None
entry = self.entries[pid]
event, queue = entry
if event.is_set():
return None
return event, queue
def get(self, name):
pid = self.get_pid(name)
if pid is None:
return None
return self.get_by_pid(pid)
def register(self, name):
if self.get_pid(name) is not None:
return None
pid = self._size()
event = Event()
queue = Queue()
self.entries.append((event, queue))
self.pids[name] = pid
return pid
class Envelope:
def __init__(self, pid, ledger, envelope=None):
self.pid = pid
self.ledger = ledger
self.code = None
self.header = None
self.message = None
self.sender = None
self.valid = True
self.can_reply = False
self.can_deliver = False
if envelope is not None:
self.allowed = self.parse(envelope)
def parse(self, envelope):
if len(envelope) != 3:
self.valid = False
return None
code, header, message = envelope
self.code = code
self.header = header
self.message = message
self.sender = self.code % MailCode.SENDER == 0 and len(self.header) == 2
if self.sender:
self.can_reply = True
from_pid, to_pid = header
if to_pid != self.pid:
self.can_deliver = True
allowed = dict()
allowed["reply"] = self.can_reply
allowed["deliver"] = self.can_deliver
return allowed
def get_code(self, urgent, critical, sender=True):
code = MailCode.SENDER if sender else MailCode.NO_SENDER
if critical:
code = code * MailCode.CRITICAL
elif urgent:
code = code * MailCode.URGENT
else:
code = code * MailCode.NORMAL
return code
def compose(self, target, code, header, message):
if type(target) != int:
entry = self.ledger.get(target)
else:
entry = self.ledger.get_by_pid(target)
if entry is None:
return None
event, queue = entry
envelope = code, header, message
queue.put(envelope)
return True
def send(self, to_pid, message, urgent=False, critical=False):
code = self.get_code(urgent, critical)
header = self.pid, to_pid
return self.compose(to_pid, code, header, message)
def send_to(self, to_pid, message, urgent=False, critical=False):
code = self.get_code(urgent, critical, sender=False)
return self.compose(to_pid, code, to_pid, message)
def send_faux(self, faux_from_pid, faux_to_pid, to_pid, message,
urgent=False, critical=False):
code = self.get_code(urgent, critical)
header = faux_from_pid, to_pid
return self.compose(faux_to_pid, code, header, message)
def reply(self, message, urgent=False, critical=False,
deliver=False):
if not self.can_reply or (deliver and not self.can_deliver):
return None
from_pid, to_pid = self.header
code = self.get_code(urgent, critical)
if deliver:
return self.compose(to_pid, code, header, message)
else:
header = to_pid, from_pid
return self.compose(from_pid, code, header, message)
def deliver(self, message, urgent=False, critical=False):
return self.reply(message, urgent, critical, deliver=True)
class Actor(Process):
def __init__(self, name, proc, pid, ledger):
super().__init__()
self.name = name
self.proc = proc
self.own_pid = pid
self.ledger = ledger
self.valid = False
self.kill_switch = None
self.mailbox = None
entry = self.ledger.get_by_pid(self.own_pid)
if entry is not None:
event, queue = entry
self.kill_switch = event
self.mailbox = queue
self.valid = True
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def run(self):
print(f"Starting {self.name}")
if not self.valid:
return None
print(f"[{self.name}]: Starting proc")
self.proc.from_actor(self.pid, self.kill_switch, self.mailbox)
proc = Process(target=self.proc.start)
proc.start()
actions = ["reply", "deliver", "send", "send_to", "send_faux"]
print(f"[{self.name}]: Started proc")
self.kill_switch.wait()
# while not self.kill_switch.is_set():
# while not self.mailbox.empty():
# envelope = Envelope(self.own_pid, self.mailbox.get())
# if not envelope.valid:
# continue
# response = self.call(self.proc, "handle", envelope.message)
# if response is None or len(response) != 3:
# continue
# action, args, kwargs = response
# if action not in actions:
# continue
# elif action == "reply" and not envelope.can_reply:
# continue
# elif action == "deliver" and not envelope.can_deliver:
# continue
# self.call(envelope, action, *args, **kwargs)
self.stop()
def stop(self, done=None, timeout=None):
print(f"Stopping {self.name}")
delay = Event()
self.call(self.proc, "stop", delay)
delay.wait(timeout)
self.kill_switch.set()
self.mailbox.close()
self.mailbox.join_thread()
self.call(done, "set")
print(f"Stopped {self.name}")
class Supervisor:
def __init__(self, name="__ROOT__", ledger=Ledger()):
self.ledger = ledger
self.children = set()
self.pid = self.ledger.register(name)
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def spawn(self, name, proc):
pid = self.ledger.register(name)
if pid is None:
return None
self.children.add(pid)
actor = Actor(name, proc, pid, self.ledger)
return actor
def dismiss(self, child, done=None):
print(f"Dismissing {child}")
if type(child) != int:
child = self.ledger.get_pid(child)
if child not in self.children:
return None
entry = self.ledger.get_by_pid(child)
if entry is None:
return None
event, queue = entry
event.set()
queue.close()
queue.join_thread()
self.call(done, "set")
print(f"Dismissed {child}")
def stop(self, done=None):
for child in self.children:
self.dismiss(child)
entry = self.ledger.get_by_pid(self.pid)
if entry is None:
return None
event, queue = entry
event.set()
queue.close()
queue.join_thread()
self.call(done, "set")

View File

@ -1,54 +0,0 @@
from abots.events.pipe import Pipe
from abots.events.module import Module
from time import time
from multiprocessing import Process, Manager
# Queue, JoinableQueue
class Dispatcher:
def __init__(self, handler=None):
self.modules = dict()
self.handler = handler
manager = Manager()
self.extenstions = manager.dict()
self.notifications = manager.dict()
def _error(self, name, data):
if self.handler is None:
return None
elif getattr(self.handler, "error", None):
return None
self.handler.error(name, data)
def _is_module_init(self, module, destroy=False):
if not proc:
return False
instance = getattr(module, "instance", None)
return not instance if destroy else instance
def _start(self, mod, handler=None, *args, **kwargs):
module = self.modules.get(mod, None)
if self._is_module_init(module):
self._error("_start", mod)
return None
events = None
kwargs = dict()
if handler is not None:
kwargs["handler"] = handler
events = Pipe(module, self.extensions, self.notifications, **kwargs)
module.instance = module.logic(events)
if getattr(module.instance, "create", None) is not None:
return module.instance.create(*args, **kwargs)
def start(self, module):
pass
def extend(self, name, logic):
self.extensions[name] = logic
def register(self, module, logic):
if self.modules.get(module, None) is None:
self._error("register", module)
return None
self.modules[name] = Module(logic)

View File

@ -1,26 +1,35 @@
"""
events\Pipe
events/Pipe
===========
You may notice from the lack of imports used here that Pipe is purely an
abstraction layer that allows a (Raw)Proc to interact with another (Raw)Proc
through the Processor's notification/event system. It also allows a (Raw)Proc
to stop itself or use one of the extensions added to the processor.
This is purely an abstraction layer that allows a (Simple)Proc to interact with
another (Simple)Proc through the Processor's notification/event system. It also
allows a (Simple)Proc to stop itself or use one of the extensions added to the
processor.
"""
from abots.events import Shared
class Pipe:
def __init__(self, exports):
# This is only here to use existing extensions, not to add new ones
self.extensions = exports.get("extensions", None)
# This is only here to append new notifications, not to read them
self.notifications = exports.get("notifications", None)
self.notifications = Shared(exports.get("notifications", None))
#
# This is used to append new listeners to subscribe to
self.listeners = exports.get("listeners", None)
# This is used to stop the proc the pipe is connected to
self.condition = exports.get("condition", None)
# This is used to stop the proc the pipe is connected to
self.events = exports.get("events", None)
# This is an optional handler used for logging
self.handler = exports.get("handler", None)
# While this one is not used now, it will be very useful for debugging
@ -28,17 +37,20 @@ class Pipe:
# Is set to False if any of the needed exports are missing
self.stable = True
if self.exenstions is None:
self._error("events\Pipe.__init__", "extensions")
if self.extensions is None:
self._error("events/Pipe.__init__", "extensions")
self.stable = False
if self.notifications is None:
self._error("events\Pipe.__init__", "notifications")
self._error("events/Pipe.__init__", "notifications")
self.stable = False
if self.listeners is None:
self._error("events\Pipe.__init__", "listeners")
self._error("events/Pipe.__init__", "listeners")
self.stable = False
if self.condition is None:
self._error("events\Pipe.__init__", "condition")
self._error("events/Pipe.__init__", "condition")
self.stable = False
if self.events is None:
self._error("events/Pipe.__init__", "events")
self.stable = False
# Works if, and only if, handler has an error method to call
@ -49,28 +61,38 @@ class Pipe:
return None
self.handler.error(name, data)
# Informs the associated proc to stop running
def stop(self):
if self.condition.is_set():
self.condition.set()
# Dispatches a new notification to the processor
def notify(self, name, data):
return False if not self.stable
self.notifications.safe_put(name, data)
def listen(self, notification, callback):
return False if not self.stable
if self.listeners.get(notification, None) is not None:
self._error("events\Pipe.listen", notification)
if not self.stable:
return False
self.listeners[notification] = callback
print(f"Notifying {name}: {data}")
return self.notifications.safe_put((name, data))
# Informs the pipe to add a new notification listener
def listen(self, notification):
if not self.stable:
return False
if notification in self.listeners:
self._error("events/Pipe.listen", notification)
return False
self.listeners.append(notification)
# Informs the pipe to remove a new notification listener
def silence(self, notification):
return False if not self.stable
if self.listeners.get(notification, None) is not None:
self._error("events\Pipe.listen", notification)
if not self.stable:
return False
self.listeners[notification] = callback
if notification in self.listeners:
self._error("events/Pipe.listen", notification)
return False
self.listeners.remove(notification)
# Utilizes one of the extensions loaded by the processor
def use(self, extension):
return False if not self.stable
if not self.stable:
return False
return self.extensions.get(extension, None)

View File

@ -1,54 +1,51 @@
"""
events\RawProc & events\Proc
events/SimpleProc & events/Proc
============================
This is designed to be used run by events\Processor to allow it to control
whatever class is sent to it. The class will be spawned inside of events\Proxy
This is designed to be used run by events/Processor to allow it to control
whatever class is sent to it. The class will be spawned inside of events/Proxy
so that operate better in a multiprocessing environment. As well, they will be
provided an event\Shared for an event queue as well as a multiprocessing event
provided an event/Shared for an event queue as well as a multiprocessing event
to trigger the stop condition.
"""
from abots.events.pipe import Pipe
from abots.events.shared import Shared
from abots.events import Every, Proxy
from abots.events import Every, Proxy, Shared
from abots.helpers import eprint
from multiprocessing import Event, Process, Manager
from multiprocessing import Process
from traceback import print_exc
from time import sleep
# The logic in Proc will have full control of how to process events
class Proc(Process):
def __init__(self, logic, exports, *args, **kwargs):
super().__init__(self, *args, **kwargs)
def __init__(self, name, logic, exports, *args, **kwargs):
super().__init__(*args, **kwargs)
# Mainly used for SimpleProc
self.name = name
# This is an optional handler used for logging
self.handler = exports.get("handler", None)
self.handler = exports.get("handler")
# How long to wait for condition before timing out
self.timeout = exports.get("timeout", None)
self.timeout = exports.get("timeout")
# This is just a wrapper around multiprocessing.Queue
self.events = Shared()
# This is just a wrapper around multiprocessing.manager.Queue
self.events = Shared(exports.get("events"))
# This is used to indicate the stop condition for the process
self.condition = Event()
self.condition = exports.get("condition")
# The notifications that the process will listen to
self.listeners = Manger().dict()
# Pass along multiprocessing controls to exports for pipe
exports["events"] = self.events
exports["condition"] = self.condition
exports["listeners"] = self.listeners
self.listeners = exports.get("listeners")
# Controls the events sent to/received by the Processor
self.pipe = Pipe(exports)
# All actions needed for logic can be leveraged through the pipe
self.logic = logic(self.pipe)
# Mainly used for setting self.logic in run, isolated from SimpleProc
self._logic = logic
# Works if, and only if, handler has an error method to call
def _error(self, name, data):
@ -63,34 +60,28 @@ class Proc(Process):
def call(self, method, *args, **kwargs):
logic_method = getattr(self.logic, method, None)
if logic_method is None:
self._error("event\RawProc:call", method)
self._error("event/Proc:call", method)
return None
return logic_method(*args, **kwargs)
# This is launched with `start` because of the Process inheritance
def run(self):
# In case the method errors out, catch it here to prevent crashing
try:
status = self.call("start")
return logic_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
self._error("event\RawProc:run", status)
return None
return status
self._error("event/Proc:call!", status)
return status
# This is launched with `start` because of the Process inheritance
def run(self):
# All actions needed for logic can be leveraged through the pipe
self.logic = self._logic(self.pipe)
return self.call("start")
# Cleans up the multiprocessing components
def stop(self):
# Stop is called here so it can choose what to do with the queue
# Once it is done, it should set the condition so the events can close
try:
status = self.call("stop")
except Exception:
status = print_exc()
eprint(status)
self._error("event\RawProc:stop", status)
# If the condition was not already set, do it now
if self.condition.is_set():
self.condition.set()
status = self.call("stop")
# Wait until logic is done with the events, then close the queue
self.condition.wait(timeout=self.timeout)
self.events.safe_close()
@ -98,27 +89,30 @@ class Proc(Process):
# A simplified version where logic will only process the event given
class SimpleProc(Proc):
def __init__(self, logic, exports, *args, **kwargs):
def __init__(self, name, logic, exports, *args, **kwargs):
# While timeout is still used, it is used only in `run` here
super().__init__(self, logic, exports, *args, **kwargs)
super().__init__(name, logic, exports, *args, **kwargs)
# Since logic can be a function in Proc, it is not initalized here
self.logic = logic
# By default, adds a listener from the proc's name
self.pipe.listen(self.name)
# This is launched with `start` because of the Process inheritance
def run(self):
while not self.condition.is_set():
while not self.events.empty():
# The pipe is sent so logic can use it to react to the event
self.logic(self.pipe, events.safe_get())
self.events.safe_apply(self.logic, self.pipe)
# timeout is used here to add a delay in processing, if desired
if self.timeout is not None:
sleep(self.timeout)
self.stop()
# Cleans up the multiprocessing components
def stop(self):
# If the condition was not already set, do it now
if self.condition.is_set():
# This will probably never be called, but better safe than sorry
self.condition.set()
self.events.safe_close()

View File

@ -1,24 +1,55 @@
from abots.events.proc import Proc
"""
events/Processor
================
This is where all of the magic happens. The processor will allow you to spin up
a bunch of processes and allow them to communicate over a shared event queue.
It does all of this using a handful of multiprocessing abstractions included in
the "events" sub-module.
"""
from abots.events.proc import Proc, SimpleProc
from abots.events.shared import Shared
from abots.events import Every, Proxy
from abots.helpers import ctos
from time import sleep
from multiprocessing import Event, Process, Manager
from multiprocessing import Process, Manager
class Processor(Process):
def __init__(self, handler=None, timeout=None):
super().__init__(self)
manager = Manager()
super().__init__()
# This provides the means to create shareable dictionaries
self.manager = Manager()
# This is an optional handler used for logging
self.handler = handler
# Dictates whether the notifications need a timeout for get/put actions
self.timeout = timeout
self.procs = manager.dict()
self.active = manager.dict()
self.extenstions = manager.dict()
self.notifications = Shared()
self.condition = Event()
# All of the procs handled by the processor
self.procs = dict()
# All of the procs that are actively running
self.active = dict()
# Any extensions that have been loaded into the processor
self.extensions = self.manager.dict()
# The shared event queue for notifications for the procs
self.notifications = Shared(self.manager.Queue())
# The stop condition to halt the processor and all of its processes
self.condition = self.manager.Event()
# Used later to export settings inherited by the procs
self.exports = self._export()
# Works if, and only if, handler has an error method to call
def _error(self, name, data):
if self.handler is None:
return None
@ -27,100 +58,145 @@ class Processor(Process):
self.handler.error(name, data)
return True
# Creates a dictionary of the shared settings to pass to procs
def _export(self):
exports = dict()
exports["notifications"] = self.notifications
exports["extensions"] = self.extenstions
exports["notifications"] = self.notifications.queue
exports["extensions"] = self.extensions
if self.handler is not None:
exports["handler"] = self.handler
return exports
# Determine if a process exists, is formatted correctly, and active
def _check_process(self, name):
proc = self.procs.get(name, None)
if proc is None or type(proc) is not dict:
self._error("events\Processor._check_process", name)
if proc is None:
self._error("events/Processor._check_process", name)
return None, None
active = self.active.get(name, None)
return proc, active
# Wrapper for calling actions on all procs
def _all(self, action_name, **kwargs):
statuses = dict()
action = getattr(self, action_name, None)
if action is None:
self._error("events\Processor._all", action_name)
self._error("events/Processor._all", action_name)
return None
for proc in self.procs:
proc_kwargs = proc_kwargs = kwargs.get(proc, dict())
statuses[proc] = action(proc, **proc_kwargs)
for proc_name in self.procs.keys():
proc_kwargs = kwargs.get(proc_name, dict())
# Save the status of the action for any error reporting
statuses[proc_name] = action(proc_name, **proc_kwargs)
return statuses
# Add the extension through a Proxy to work across procs
def extend(self, name, logic, *args, **kwargs):
self.extensions[name] = Proxy(logic).make(*args, **kwargs)
# The loop that handles distributing notifications to their procs
def run(self):
while not self.condition.is_set():
while not self.notifications.empty():
notification = self.notifications.safe_get()
name, data = notification
for proc_name, proc in self.active.items():
if proc.condition.is_set():
print("Notification:", notification)
print(self.active)
# We do not care about the proc's name here, only values needed
for name, info in self.active.items():
print(info)
# The proc has been stopped, tell processor to spin it down
if info["condition"].is_set():
self.spindown()
continue
for event_name in proc.listeners:
if name == event_name:
proc.events.safe_put(notification, self.timeout)
for event_name in info["listeners"]:
# name, data = notification
if event_name != notification[0]:
continue
Shared(info["events"]).safe_put(notification, self.timeout)
# Clean up everything that needs to be stopped
def stop(self):
# Determine the max time procs will take to spin down
timeout = 0 if self.timeout is None else self.timeout
wait = timeout * len(self.procs)
# Spin down all of the procs, then sets the stop condition
self.spindown_all()
self.notifications.safe_close()
def extend(self, name, logic, *args, **kwargs):
self.extensions[name] = Proxy(logic).make(*args, **kwargs)
# Give procs enough time to spin down
self.condition.wait(wait)
# Close the notification loop, return all of the entries not processed
return Shared(self.notifications).safe_close()
# Informs the processor of the proc, does not start it
def register(self, proc, name=None, simple=False):
if name is None:
# Class-to-String helper function, extracts name of class
name = ctos(proc)
if name in self.procs:
self._error("events\Processor.register", name)
self._error("events/Processor.register", name)
return None
self.procs[name] = dict()
self.procs[name]["logic"] = proc
self.procs[name]["simple"] = simple
proc_info = dict()
proc_info["logic"] = proc
proc_info["simple"] = simple
self.procs[name] = proc_info
return True
# Starts the proc and loads into into self.active
def spinup(self, name, timeout=None, *args, **kwargs):
proc, active = self._check_process(self, name)
print(f"Spinning up {name}")
proc, active = self._check_process(name)
if not proc or active:
return None
simple = proc.get("simple", False)
logic = proc.get("logic", None)
if logic is None:
self._error("events\Processor.start", name)
self._error("events/Processor.start", name)
return None
# These are defined here so that proc objects do not need to be shared
listeners = self.manager.list()
condition = self.manager.Event()
events = self.manager.Queue()
exports = self.exports.copy()
exports["proc"] = name
exports["listeners"] = listeners
exports["condition"] = condition
exports["events"] = events
if timeout is not None:
exports["timeout"] = timeout
proxy_args = (logic, exports, *args, **kwargs)
proxy_args = (name, logic, exports, *args)
if simple:
procedure = Proxy(SimpleProc).make(*proxy_args)
# procedure = Proxy(SimpleProc).make(*proxy_args, **kwargs)
procedure = SimpleProc(*proxy_args, **kwargs)
else:
procedure = Proxy(Proc).make(*proxy_args)
# procedure = Proxy(Proc).make(*proxy_args, **kwargs)
procedure = Proc(*proxy_args, **kwargs)
procedure.start()
self.active[name] = procedure
active_info = dict()
active_info["listeners"] = listeners
active_info["condition"] = condition
active_info["events"] = events
print(active_info)
self.active[name] = active_info
print(self.active)
return True
# Stops the proc and removes it from self.active
def spindown(self, name):
proc, active = self._check_process(self, name)
print(f"Spinning down {name}")
proc, active = self._check_process(name)
if not proc or not active:
return None
active.stop()
condition = active.get("condition", None)
if condition is None:
self._error("events/Processor.spindown", active)
condition.set()
del self.active[name]
return True
# Start all of the procs
def spinup_all(self, **kwargs):
return self._all("spinup")
return self._all("spinup", **kwargs)
# Stop all of the procs
def spindown_all(self, **kwargs):
statuses = self._all("spindown")
self.condition.set()
return statuses
return self._all("spindown", **kwargs)

View File

@ -1,6 +1,6 @@
"""
events\Proxy
events/Proxy
============
This is used to act as a proxy for sending classes / objects between other
@ -17,33 +17,21 @@ class ProxyManager(BaseManager):
pass
class Proxy:
def __init__(self, modules):
self.modules = modules if type(modules) is list else [modules]
def __init__(self, module):
self.module = module
self.name = ctos(self.module)
self._create_proxy_manager()
# Gets the module as-is, which should be a class to be called
def get(self, module):
return getattr(self._proxy_manager, module, None)
def get(self):
return getattr(self._proxy_manager, self.name, None)
# Returns the module as an object using the provided arguments
def obj(self, module, *args, **kwargs):
return self.get(module)(*args, **kwargs)
# Lists all of the modules stored in the proxy manager
def gather(self):
return [pm for pm in dir(self._proxy_manager) if pm[0].isupper()]
# If there is only one module, it makes it into a proxy object
# Makes the proxy module into a proxy object
def make(self, *args, **kwargs):
gather = self.gather
if len(gather) != 1:
return None
return self.obj(gather[0], *args, **kwargs)
return self.get()(*args, **kwargs)
def _create_proxy_manager(self):
for module in self.modules:
module_name = ctos(module)
ProxyManager.register(module_name, module)
ProxyManager.register(self.name, self.module)
proxy_manager = ProxyManager()
proxy_manager.start()
self._proxy_manager = proxy_manager

View File

@ -1,33 +1,56 @@
"""
events/Shared
=============
This is a simple wrapper around the multiprocessing manager queue for safe
usage of its functions when considering dangling entries during closing or
empty/full events preventing getting / putting entries.
"""
from queue import Empty, Full
from multiprocessing import get_context
import multiprocessing.queues import Queue as MPQueue
class Shared(MPQueue):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, ctx=get_context())
class Shared:
def __init__(self, queue):
self.queue = queue
def empty(self):
return self.queue.empty()
def done(self):
return self.queue.task_done()
def safe_get(self, timeout=None):
try:
if timeout is None:
return self.get(block=False)
return self.queue.get(block=False)
else:
return self.get(block=True, timeout=timeout)
return self.queue.get(block=True, timeout=timeout)
except Empty:
return None
def safe_put(self, entry, timeout=None):
try:
self.put(entry, block=False, timeout=timeout)
self.queue.put(entry, block=False, timeout=timeout)
return True
except Full:
return False
def safe_apply(self, action, timeout=None, *args, **kwargs):
entry = self.safe_get(timeout)
result = action(entry, *args, **kwargs)
self.done()
return result
def gather(self, timeout=None):
while not self.empty():
yield self.safe_get(timeout)
while not self.queue.empty():
# Use yield to allow generators to gather entries with timeouts
yield self.queue.safe_get(timeout)
def safe_close(self):
closed = sum(1 for entry in self.gather())
self.close()
self.join_thread()
return closed
# Gather all leftover messages still in the queue
gathered = (entry for entry in self.gather())
# self.queue.join()
return gathered

View File

@ -1,3 +1,3 @@
from abots.helpers.hash import create_hash, md5, sha1, sha256, sha512
from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
from abots.helpers.general import eprint
from abots.helpers.general import eprint, deduce, noop, cast, get_digit

View File

@ -1,6 +1,7 @@
from json import dumps, loads
from codecs import encode as c_encode, decode as c_decode
from base64 import b64encode, b64decode
from re import compile as regex
# JSON encoder, converts a python object to a string
def jots(data, readable=False):

View File

@ -1,4 +1,18 @@
from sys import stderr
def eprint(*args, **kwargs):
print(*args, file=stderr, **kwargs)
print(*args, file=stderr, **kwargs)
def deduce(reference, attributes):
if type(attributes) is not list:
attributes = [attributes]
return list(map(lambda attr: getattr(reference, attr, None), attributes))
def noop(*args, **kwargs):
pass
def cast(obj, method, *args, **kwargs):
return getattr(obj, method, noop)(*args, **kwargs)
def get_digit(number, position):
return False if number - 10**position < 0 else number // 10*position % 10

View File

@ -1,6 +1,8 @@
from abots.helpers.general import eprint
from os import urandom
from binascii import hexlify
from hashlib import algorithms_available, pbkdf2_hmac, script, new as new_hash
from hashlib import algorithms_available, pbkdf2_hmac, new as new_hash
def create_hash(algorithm, seed=None, random_bytes=32, use_bin=False):
if algorithm not in algorithms_available:
@ -33,10 +35,31 @@ def pbkdf2(algo, pswd, salt=None, cycles=100000, key_len=None, use_bin=False):
if type(pswd) is str:
pswd = pswd.encode("utf-8")
if salt is None:
salt = os.urandom(16)
salt = urandom(16)
elif type(salt) is str:
salt = salt.encode("utf-8")
derived_key = pbkdf2_hmac(algo, pswd, salt, cycles, key_len)
if use_bin:
return derived_key
return hexlify(derived_key).decode("utf-8")
return derived_key, salt
return hexlify(derived_key).decode("utf-8"), salt.decode("utf-8")
# n = increase as general computing performance increases, min=14
# r = increase in case of breakthroughs in memory technology, min=8
# p = increase in case of breakthroughs in CPU technology, min=1
def scrypt(password, salt=None, n=14, r=8, p=1, use_bin=False):
try:
from hashlib import scrypt as _scrypt
except ImportError:
eprint("Error: Missing OpenSSL 1.1+ for hashlib.scrypt")
return None
if type(password) is str:
password = password.encode("utf-8")
if salt is None:
salt = urandom(16)
elif type(salt) is str:
salt = salt.encode("utf-8")
# N = 2**n, allows simple increments to the value to get desired effect
derived_key = _scrypt(password, salt, 2**n, r, p)
if use_bin:
return derived_key, salt
return hexlify(derived_key).decode("utf-8"), salt.decode("utf-8")

View File

@ -7,60 +7,59 @@ Socket Client
"""
from abots.net.socket_client_handler import SocketClientHandler as handler
from abots.helpers import eprint, noop
from struct import pack, unpack
from multiprocessing import Process, Queue, JoinableQueue
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from traceback import print_exc
class SocketClient(Process):
def __init__(self, host, port, buffer_size=4096, end_of_line="\r\n",
secure=False, inbox=JoinableQueue(), outbox=Queue(), handler=handler,
**kwargs):
super().__init__(self)
class SocketClient():
def __init__(self, host, port, handler, buffer_size=4096, secure=False,
*args, **kwargs):
self.host = host
self.port = port
self.buffer_size = buffer_size
self.end_of_line = end_of_line
self.secure = secure
self.inbox = inbox
self.outbox = outbox
self.handler = handler(self)
self.handler = handler(self, *args, **kwargs)
self.sock = socket(AF_INET, SOCK_STREAM)
if self.secure:
self.sock = wrap_socket(self.sock, **kwargs)
self.connection = (self.host, self.port)
self.running = True
self.error = None
self.pid = None
self.kill_switch = None
self.mailbox = None
def _recv_bytes(self, get_bytes, decode=True):
data = "".encode()
eol = self.end_of_line.encode()
attempts = 0
while len(data) < get_bytes:
# Automatically break loop to prevent infinite loop
# Allow at least twice the needed iterations to occur exiting loop
if attempts > 2 * (get_bytes / self.buffer_size):
break
else:
attempts = attempts + 1
bufsize = get_bytes - len(data)
# Force bufsize to cap out at buffer_size
if bufsize > self.buffer_size:
bufsize = self.buffer_size
try:
packet = self.sock.recv(bufsize)
except OSError:
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
return None
length = len(data) + len(packet)
checker = packet if length < get_bytes else packet[:-2]
if eol in checker:
packet = packet.split(eol)[0] + eol
return data + packet
data = data + packet
return data.decode() if decode else data
def _package_message(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args) + self.end_of_line
formatted = message.format(*args)
else:
formatted = message + self.end_of_line
formatted = message
packaged = pack(">I", len(formatted)) + formatted.encode()
return packaged
@ -76,23 +75,17 @@ class SocketClient(Process):
if message_size is None:
return None
try:
return self._recv_bytes(message_size).strip(self.end_of_line)
return self._recv_bytes(message_size)
except OSError:
return None
def _send_message(self, message, *args):
def send_message(self, message, *args):
packaged = self._package_message(message, *args)
try:
self.sock.send(packaged)
except OSError:
self.stop()
def _process_inbox(self):
while not self.inbox.empty():
message, args = self.inbox.get()
self._send_message(message, *args)
self.inbox.task_done()
def _prepare(self):
self.sock.setblocking(False)
self.sock.settimeout(1)
@ -102,28 +95,39 @@ class SocketClient(Process):
return e
return None
def send(self, message, *args):
self.inbox.put((message, args))
def from_actor(self, pid, event, queue):
self.pid = pid
self.kill_switch = event
self.mailbox = queue
def results(self):
messages = list()
while not self.outbox.empty():
messages.append(self.outbox.get())
return messages
def run(self):
def start(self):
err = self._prepare()
if err is not None:
print(err)
eprint(err)
return err
# print("Ready!")
print("Ready!")
self.handler.initialize()
while self.running:
data = self._get_message()
if data is not None:
self.outbox.put(self.handler(data))
self._process_inbox()
if self.call(self.kill_switch, "is_set"):
self.stop()
break
message = self._get_message()
if message is None:
continue
self.handler.message(message)
def stop(self):
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def stop(self, done=None):
print("Stopping client!")
self.running = False
self.sock.close()
self.terminate()
self.call(done, "set")
print("Stopped client!")

View File

@ -0,0 +1,36 @@
"""
Socket Server Handlers
======================
"""
from time import sleep
from struct import pack, unpack
class SocketClientHandler:
def __init__(self, client):
self.client = client
# Prepends message size code along with replacing variables in message
def format(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
# Puts message size at the front of the message
prefixed = pack(">I", len(formatted)) + formatted.encode()
return prefixed
def initialize(self):
self.client.send_message("PING")
def message(self, message):
print(f"DEBUG: {message}")
if message == "PONG":
sleep(1)
self.client.send_message("PING")

View File

@ -1,33 +1,24 @@
"""
net\SocketServer
net/SocketServer
================
The intent behind this script is to provide a simple interface to start up a
TCP socket server in the background, run each of the clients in their own
thread, provide a simple system to handle server events, and provide simple
functions to send/receive messages from the server.
"""
from abots.net.socket_server_handler import SocketServerHandler as handler
from abots.helpers import eprint, noop
from threading import Thread
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process, Queue, JoinableQueue
from time import time
from ssl import wrap_socket
from traceback import print_exc
# Inherits Process so that server can be run as a daemon
class SocketServer(Process):
# There are many parameters here, but that is so that any constant used can
# be easily tweaked and not remain hard-coded without an easy way to change
def __init__(self, host, port, listeners=5, buffer_size=4096, secure=False,
timeout=0.02, max_message_size=-1, end_of_line="\r\n", heartbeat=60,
inbox=JoinableQueue(), outbox=Queue(), handler=handler, **kwargs):
super().__init__(self)
class SocketServer:
def __init__(self, host, port, handler, listeners=5, buffer_size=4096,
secure=False, *args, **kwargs):
# The connection information for server, the clients will use this to
# connect to the server
@ -41,103 +32,43 @@ class SocketServer(Process):
# Size of buffer pulled by `receive_bytes` when not specified
self.buffer_size = buffer_size
# Timeout for the socket server, in term of seconds
self.timeout = timeout
# If max_message_size is -1, it allows any message size
self.max_message_size = max_message_size
# Which character(s) will terminate a message
self.end_of_line = end_of_line
# Determines if SSL wrapper is used
self.secure = secure
# How often a heartbeat will be sent to a client
self.heartbeat = heartbeat
# Queues used for sending messages and receiving results using `send`
# and `results`
self.inbox = inbox
self.outbox = outbox
# An object that determines how the server reacts to events, will use
# net\SocketServerHandler if none are specified. Use it as a model for
# how other handlers should look / work.
self.handler = handler(self)
self.handler = handler(self, *args, **kwargs)
# Sets up the socket itself
self.sock = socket(AF_INET, SOCK_STREAM)
if self.secure:
# Note: kwargs is used here to specify any SSL parameters desired
self.sock = wrap_socket(self.sock, **kwargs)
# Will later be set to the file descriptor of the socket on the server
# See `_prepare`
self.sock_fd = -1
# Will later be set to the alias used for the socket on the server
# See `_prepare`
self.sock_alias = None
# List of all sockets involved (both client and server)
self.sockets = list()
# Maps metadata about the clients
self.clients = dict()
self.clients = list()
# State variable for if the server is running or not. See `run`.
self.running = True
# Sends all messages queued in inbox
def _process_inbox(self):
while not self.inbox.empty():
# In the format" mode, message, args
data = self.inbox.get()
mode = data[0]
# Send to one socket
if mode == self.handler.send_verb:
client, message, args = data[1:]
self.send_message(message, *args)
# Broadcast to sockets
elif mode == self.handler.broadcast_verb:
message, args = data[1:]
self.broadcast_message(self.sock, message, *args)
self.inbox.task_done()
self.pid = None
self.kill_switch = None
self.mailbox = None
# Logic for the client socket running in its own thread
def _client_thread(self, sock, alias):
last = time()
client = self.clients[alias]
def _client_thread(self, sock):
while self.running:
now = time()
# Run heartbeat after defined time elapses
# This will probably drift somewhat, but this is fine here
if now - last >= self.heartbeat:
# If the client missed last heartbeat, close client
if not client["alive"]:
self.handler.close_client(alias)
break
# The handler must set this to True, this is how a missed
# heartbeat is checked later on
client["alive"] = False
last = now
self.handler.send_heartbeat(alias)
try:
message = self.handler.get_message(sock)
message = self.get_message(sock)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
# In this case, the socket most likely died before the
# heartbeat caught it
self.handler.close_client(alias)
self.handler.close_client(sock)
break
if message is None:
continue
# Each message returns a status code, exactly which code is
# determined by the handler
status = self.handler.message(sock, message)
# Send status and message received to the outbox queue
self.outbox.put((status, message))
self.handler.message(sock, message)
self.close_sock(sock)
return
# Prepares socket server before starting it
def _prepare(self):
@ -148,48 +79,16 @@ class SocketServer(Process):
except (BrokenPipeError, OSError) as e:
# This usually means that the port is already in use
return e
self.sock.settimeout(self.timeout)
self.sock.setblocking(0)
self.sock.settimeout(0)
self.sock.listen(self.listeners)
# Gets the file descriptor of the socket, which is a fallback for a
# unique identifier for the sockets when an alias does not work
self.sock_fd = self.sock.fileno()
sock_address = self.sock.getsockname()
sock_host, sock_port = sock_address
# This may change later, but for now aliases start at @0 and continue
# on from there numerically
self.sock_alias = "@{}".format(len(self.sockets))
self.sockets.append(self.sock)
# Set metadata about the socket server, the fd and alias are both set
# here to make obtaining the other metadata possible with less lookups
self.clients[self.sock_fd] = dict()
self.clients[self.sock_fd]["fd"] = self.sock_fd
self.clients[self.sock_fd]["host"] = sock_host
self.clients[self.sock_fd]["port"] = sock_port
self.clients[self.sock_fd]["sock"] = self.sock
self.clients[self.sock_fd]["alias"] = self.sock_alias
# Here the alias is just a pointer to the same data, or at least acts
# like a pointer given how Python handles dictionaries referencing the
# same data
self.clients[self.sock_alias] = self.clients[self.sock_fd]
return None
# Closes a connected socket and removes it from the server metadata
def close_sock(self, alias):
client = self.clients.get(alias, None)
if client is None:
return None
sock = client["sock"]
fd = client["fd"]
self.sockets.remove(sock)
if fd is not None:
# While the alias is a pointer, you need to delete both
# individually to truly remove the socket from `clients`
del self.clients[fd]
del self.clients[alias]
# Closes a connected socket and removes it from the sockets list
def close_sock(self, sock):
if sock in self.sockets:
self.sockets.remove(sock)
sock.close()
# Receives specified number of bytes from a socket
@ -198,25 +97,14 @@ class SocketServer(Process):
# decode - flag if the returned data is binary-to-string decoded
def receive_bytes(self, sock, get_bytes, decode=True):
data = "".encode()
eol = self.end_of_line.encode()
# Auto-fail if requested bytes is greater than allowed by server
if self.max_message_size > 0 and get_bytes > self.max_message_size:
return None
attempts = 0
while len(data) < get_bytes:
# Automatically break loop to prevent infinite loop
if self.max_message_size > 0:
if attempts > self.max_message_size / self.buffer_size:
break
else:
attempts = attempts + 1
# Allow at least twice the needed iterations to occur exiting loop
if attempts > 2 * (get_bytes / self.buffer_size):
break
else:
# With max_message_size not set, allow at least twice the
# needed iterations to occur before breaking loop
if attempts > 2 * (get_bytes / self.buffer_size):
break
else:
attempts = attempts + 1
attempts = attempts + 1
bufsize = get_bytes - len(data)
# Force bufsize to cap out at buffer_size
@ -227,26 +115,25 @@ class SocketServer(Process):
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
return None
length = len(data) + len(packet)
checker = packet if length < get_bytes else packet[:-2]
# Automatically stop reading message if EOL character sent
if eol in checker:
packet = packet.split(eol)[0] + eol
return data + packet
data = data + packet
return data.decode() if decode else data
# Get message from socket
def get_message(self, sock):
raw_message_size = self.receive_bytes(sock, 4, False)
if raw_message_size is None:
return None
message_size = unpack(">I", raw_message_size)[0]
return self.receive_bytes(sock, message_size)
# Packages a message and sends it to socket
def send_message(self, sock, message, *args):
formatted = self.handler.format_message(message, *args)
formatted = self.handler.format(message, *args)
try:
sock.send(formatted)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
alias = self.get_client_alias_by_sock(sock)
if alias is not None:
self.close_sock(alias)
self.close_sock(sock)
# Like send_message, but sends to all sockets but the server and the sender
def broadcast_message(self, client_sock, client_message, *args):
@ -256,66 +143,22 @@ class SocketServer(Process):
if not_server and not_client:
self.send_message(sock, client_message, *args)
# Obtains file descriptor of the socket
def get_client_fd(self, client_sock):
try:
# First, try the easy route of just pulling it directly
return client_sock.fileno()
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
# Otherwise, the socket is probably dead and we can try finding it
# using brute-force. This sometimes works
for fd, sock in self.sockets:
if sock != client_sock:
continue
return fd
# If the brute-force option does not work, I cannot think of a good
# way to get the fd aside from passing it along everywhere that
# sock is also used, which would be extremely tedios. However, if
# you have the alias you can skip this entirely and just pull the
# fd from `clients` using the alias
return None
# I realize the function name here is long, but for the few times I use
# this it makes it clear exactly what magic is going on
def get_client_alias_by_sock(self, client_sock):
client_fd = self.get_client_fd(client_sock)
if client_fd is None:
return None
return self.clients.get(client_fd, dict()).get("alias", None)
# Externally called function to send a message to a client
def send(self, client, message, *args):
# This queue will be read by `_process_inbox` during the next loop
self.inbox.put((self.handler.send_verb, client, message, args))
# Externally called function to broadcast a message to all clients
def broadcast(self, message, *args):
# This queue will be read by `_process_inbox` during the next loop
self.inbox.put((self.handler.broadcast_verb, message, args))
# Externally called function to iterates over the outbox queue and returns
# them as a list in FIFO order
def results(self, remove_status=False):
messages = list()
while not self.outbox.empty():
result = self.outbox.get()
# For when you do not care about the status codes
if remove_status:
status, message = result
messages.append(message)
else:
messages.append(result)
return messages
def from_actor(self, pid, event, queue):
self.pid = None
self.kill_switch = event
self.mailbox = queue
# The Process function for running the socket server logic loop
def run(self):
def start(self):
err = self._prepare()
if err is not None:
print(err)
eprint(err)
return err
# print("Server ready!")
print("Server ready!")
while self.running:
if self.call(self.kill_switch, "is_set"):
self.stop()
break
try:
# Accept new socket client
client_sock, client_address = self.sock.accept()
@ -325,42 +168,39 @@ class SocketServer(Process):
continue
# Collect the metadata of the client socket
client_name = "{}:{}".format(*client_address)
client_host, client_port = client_address
client_fd = client_sock.fileno()
client_alias = "@{}".format(len(self.sockets))
# Define metadata for client
self.sockets.append(client_sock)
self.clients[client_fd] = dict()
self.clients[client_fd]["fd"] = client_fd
self.clients[client_fd]["host"] = client_host
self.clients[client_fd]["port"] = client_port
self.clients[client_fd]["sock"] = client_sock
self.clients[client_fd]["alias"] = client_alias
self.clients[client_fd]["alive"] = True
# The alias is just a key that points to the same metadata
self.clients[client_alias] = self.clients[client_fd]
# Have handler process new client event
status = self.handler.open_client(client_alias)
# Send status and message received to the outbox queue
self.outbox.put((status, message))
self.handler.open_client(client_host, client_port)
# Spawn new thread for client
args = (client_sock, client_alias)
Thread(target=self._client_thread, args=args).start()
# Process messages waiting in inbox queue
# This is done at the end in case for some weird reason a message
# is sent to the new client in the middle of processing this data
# it eliminates the chance of a race condition.
self._process_inbox()
args = (client_sock,)
client_thread = Thread(target=self._client_thread, args=args)
self.clients.append(client_thread)
client_thread.start()
def call(self, source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
# Stop the socket server
def stop(self):
self.handler.close_server()
def stop(self, done=None):
print("Stopping server!")
self.handler.close()
for sock in self.sockets:
if sock != self.sock:
sock.close()
self.running = False
self.sock.close()
self.sock.close()
for client in self.clients:
client.join()
self.call(done, "set")
print("Stopped server!")

View File

@ -3,164 +3,43 @@
Socket Server Handlers
======================
The socket server was made to be versitile where the handler can be swapped out
in favor of another handler. This handler is the default provided if one is not
passed to get the basic client-server relationship working for either starting
with something simple, testing, and/or providing a template to build other
handlers from.
"""
from time import sleep
from struct import pack, unpack
class SocketServerHandler:
def __init__(self, server):
self.server = server
# These are used when processing inbox messages
self.send_verb = "SEND"
self.broadcast_verb = "CAST"
self.close_verb = "STOP"
# Tells all clients that a node joined the socket server
def open_client(self, alias):
alias = message[5:]
client = self.server.clients.get(alias, None)
if client is None:
return 1
self.server.broadcast_message(client, message)
return 0
def open_client(self, address, port):
pass
# Informs the other clients a client left and closes that client's socket
def close_client(self, alias):
client = self.server.clients.get(alias, None)
if client is None:
return 1
message = "LEFT {}".format(alias)
self.server.broadcast_message(self.server.sock, message)
self.server.close_sock(alias)
return 0
def close_client(self, sock):
pass
# Lets the clients know the server is intentionally closing
def close_server(self):
self.server.broadcast_message(self.server.sock, self.close_verb)
return -1
# Sends a heartbeat to the client to detect if it is still responding
def send_heartbeat(self, alias):
client = self.server.clients.get(alias, None)
if client is None:
return 1
sock = client.get("sock", None)
if sock is None:
return 1
self.server.send_message(sock, "PING")
return 0
def close(self):
pass
# Format a message before sending to client(s)
# Prepends message size code along with replacing variables in message
def format_message(self, message, *args):
def format(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args) + self.server.end_of_line
formatted = message.format(*args)
else:
formatted = message + self.server.end_of_line
formatted = message
# Puts message size at the front of the message
prefixed = pack(">I", len(formatted)) + formatted.encode()
return prefixed
# Get message from socket with `format_message` in mind
def get_message(self, sock):
raw_message_size = self.server.receive_bytes(sock, 4, False)
if raw_message_size is None:
return None
message_size = unpack(">I", raw_message_size)[0]
if self.max_message_size > 0 and message_size > self.max_message_size:
return None
eol = self.server.end_of_line
return self.server.receive_bytes(sock, message_size).strip(eol)
# Takes the server object, the client socket, and a message to process
# Each message returns a status code:
# -1 : Going offline
# 0 : Success
# 1 : Failure
# 2 : Invalid
def message(self, sock, message):
# print("DEBUG:", message)
send = self.send_verb + " "
cast = self.broadcast_verb + " "
send_size = len(send)
cast_size = len(cast)
# React to heartbeat from client
if message == "PONG":
client_fd = self.server.get_client_fd(sock)
client = self.server.clients.get(client_fd, dict())
client_alive = client.get("alive", None)
if client_aliave is None:
return 1
elif client_alive:
return 1
# Setting this to True is what tells the server the heartbeat worked
client["alive"] = True
return 0
# Tell the clients to stop before server itself stops
elif message == self.close_verb:
status = self.close_server()
self.server.stop()
return status
# Informs the other clients one left and closes that client's socket
elif message == "QUIT":
client_alias = self.server.get_client_alias_by_sock(sock)
if client_alias is None:
return 1
return self.close_client(client_alias)
# Lists all client alises, puts itself first and the server second
elif message == "LIST":
aliases = list()
client_alias = self.server.get_client_alias_by_sock(sock)
if client_alias is None:
return 1
self.server_alias = self.server.sock_alias
for alias in self.server.clients.keys():
# We need to skip ints since the file descriptors are also keys
if type(alias) is int:
continue
# The server and sending client are skipped to retain ordering
elif alias == self.server.sock_alias:
continue
elif alias == client_alias:
continue
else:
aliases.append(alias)
listed = ",".join([client_alias, self.server_alias] + aliases)
self.server.send_message(sock, listed)
return 0
# Sends a message to the client with the specified client (via an alias)
elif message[:send_size] == send:
params = message[(send_size + 1):].split(" ", 1)
if len(params) < 2:
return 1
alias, response = params
client = self.server.clients.get(alias, dict())
client_sock = client.get("sock", None)
if client_sock is None:
return 1
self.server.send_message(client_sock, response)
return 0
# Broadcasts a message to all other clients
elif message[:cast_size] == cast:
response = message[(cast_size + 1):]
self.server.broadcast_message(sock, response)
return 0
# All other commands are invalid
else:
return 2
print(f"DEBUG: {message}")
if message == "PING":
sleep(1)
self.server.send_message(sock, "PONG")

View File

@ -1,6 +1,6 @@
"""
ui\TUI: Text User Interface
ui/TUI: Text User Interface
===========================
The curses library is one of those things I always wanted to use, but never got

View File

@ -3,29 +3,11 @@
import sys
sys.path.insert(0, "/center/lib")
# from abots.net import SocketServer, SocketClient
# from abots.ui import TUI
# tui = TUI()
# from abots.events import Proxy
# class Counter(object):
# def __init__(self):
# self._value = 0
# def update(self, value):
# self._value = self._value + value
# def get_value(self):
# return self._value
# def __str__(self):
# return "Counter"
# modules = list()
# modules.append(Counter)
# proxy = Proxy(modules)
# ========================
# from abots.events.event import Event
# from abots.events.proc import Proc
@ -67,20 +49,378 @@ sys.path.insert(0, "/center/lib")
# nag_proc.start()
# blab_proc.start()
from abots.events import Processor
# ========================
class Test:
def __init__(self, condition, events):
self.counter = 0
self.condition = condition
self.events = events
# from abots.events import Proxy
def handler(self, event):
print(event)
self.counter = self.counter + 1
# class Counter(object):
# def __init__(self):
# self._value = 0
# def update(self, value):
# self._value = self._value + value
def start(self):
print("Started")
# def get(self):
# return self._value
def stop(self):
print("Stopped")
# def __str__(self):
# return "Counter"
# proxy = Proxy(Counter)
# ========================
# from abots.events import Processor, Every, Shared
# from time import sleep
# class Simple:
# def __init__(self):
# self.counter = 0
# def handler(self, pipe, event):
# self.counter = self.counter + 1
# print(event, self.counter)
# self.pipe.notify("complex", self.counter)
# def start(self):
# print("Started")
# def stop(self):
# print("Stopped")
# class Complex:
# def __init__(self, pipe):
# self.pipe = pipe
# self.events = self.pipe.events
# self.condition = self.pipe.condition
# self.counter = 0
# self.runner = Every(10, self.handler)
# def handler(self):
# self.pipe.notify("simple", self.counter)
# events = Shared(self.events)
# while not self.condition.is_set():
# while not events.empty():
# self.counter = self.counter + 1
# event = events.safe_get()
# print(event, self.counter)
# events.done()
# self.pipe.notify("simple", self.counter)
# self.stop()
# def start(self):
# print("Started")
# self.pipe.listen("complex")
# self.runner.start()
# def stop(self):
# print("Stopped")
# if self.condition.is_set():
# self.condition.set()
# self.runner.stop()
# simple = Simple()
# processor = Processor()
# processor.start()
# processor.register(simple.handler, name="simple", simple=True)
# processor.register(Complex)
# processor.spinup_all()
# sleep(10)
# processor.stop()
# ========================
# from abots.events import Actor
# from abots.helpers import noop
# class Recaman:
# def __init__(self, steps):
# self.current = 0
# self.steps = steps
# self.seq = list()
# self.seq.append(self.current)
# def next(self):
# step = len(self.seq)
# if step == self.steps:
# return None
# backwards = self.current - step
# if backwards > 0 and backwards not in self.seq:
# self.current = backwards
# else:
# self.current = self.current + step
# self.seq.append(self.current)
# return self.current
# class Fibonacci:
# def __init__(self, steps):
# self.steps = steps
# self.seq = [0, 1]
# def next(self):
# step = len(self.seq)
# if step == self.steps:
# return None
# a, b = self.seq[-2:]
# c = a + b
# self.seq.append(c)
# return c
# class Watcher:
# def __init__(self, watching):
# self.seqs = list()
# self.done = list()
# self.names = dict()
# self.ready = False
# for watch_id, name in enumerate(watching):
# self.seqs.append(list())
# self.done.append(False)
# self.names[name] = watch_id
# def receive(self, name, entry):
# if self.ready:
# return None
# watch_id = self.names.get(name, None)
# if watch_id is None:
# return None
# if entry is None:
# self.done[watch_id] = True
# if all(self.done):
# self.ready = True
# return True
# return None
# self.seqs[watch_id].append(entry)
# return False
# class Cruncher:
# def __init__(self):
# self.seqs = None
# self.seq = list()
# self.step = 0
# def load(self, seqs):
# self.seqs = seqs
# def next(self):
# if self.seqs is None:
# return None
# steps = max(map(len, self.seqs))
# if self.step == steps:
# return None
# entries = list()
# for seq in self.seqs:
# entries.append(seq[self.step])
# state = self.average(entries)
# print(state, entries)
# self.seq.append(state)
# return state
# def average(self, entries):
# return sum(entries) / len(entries)
# def compute(self):
# if self.seqs is None:
# return None
# running = True
# while running:
# running = False if self.next() is None else True
# return self.seq
# steps = 15
# r = Recaman(steps)
# f = Fibonacci(steps)
# w = Watcher([r, f])
# c = Cruncher()
# def crunch(actor, message):
# pass
# def collect(actor, message):
# cpid, name, entry = message
# status = w.receive(name, entry)
# if status:
# procs = Manager().list()
# ac = Actor(crunch, procs)
# ar = Actor(noop, procs)
# af = Actor(noop, procs)
# aw = Actor(collect, procs)
# ar.start()
# af.start()
# aw.start()
# ac.start()
# ========================
# from multiprocessing import Process, Queue, Event, Manager
# from time import sleep
# from enum import Enum
"""
class Test(Actor):
def __init__(self, name):
super().__init__(name)
def handler(self, code, header, message):
result = message[::-1]
if code == MailCode.NO_SENDER:
return None
from_pid, to_pid = header
if code == MailCode.DELIVER:
self.deliver(from_pid, to_pid, result)
else:
self.send(to_pid, result)
"""
# def basic(pid, handler):
# print("go")
# kill_switch, mailbox = supervisor.export(pid)
# while not kill_switch.is_set():
# while not mailbox.empty():
# sleep(1)
# handler = dict()
# handler[MailCode.SENDER] = lambda s, r, m: supervisor.send(pid, r, m[::-1])
# handler[MailCode.NO_SENDER] = lambda r, m: supervisor.send(pid, r, m[::-1])
# handler[MailCode.DEFAULT] = lambda s, r, m: supervisor.send(pid, r, m[::-1])
# header, message = supervisor.receive(mailbox, handler)
# print("no")
# supervisor = Actor("__root__")
# pid1, proc1 = supervisor.spawn("a", basic, ("b"))
# pid2, proc2 = supervisor.spawn("b", basic, ("a"))
# proc1.start()
# proc2.start()
# supervisor.deliver("a", "Hello, world!")
# p1.start()
# p2.start()
# sleep(5)
# procs.stop()
# ========================
# import asyncio
# from time import time, monotonic
# begin = monotonic()
# # Awful on purpose
# def fib(n):
# if n <= 1:
# return 1
# return fib(n - 1) + fib(n - 2)
# @asyncio.coroutine
# def coroutine(n):
# print("Processing: Coroutine")
# print("Starting: Part 1")
# result1 = yield from part1(n)
# print("Starting: Part 2")
# result2 = yield from part2(n, result1)
# return (result1, result2)
# @asyncio.coroutine
# def part1(n):
# print("Processing: Part 1")
# return fib(n)
# @asyncio.coroutine
# def part2(n, p1):
# print("Processing: Part 2")
# return p1 + fib(n)
# num = 30
# # print(fib(num))
# # print(fib(num))
# loop = asyncio.get_event_loop()
# try:
# print("Starting")
# coro = coroutine(num)
# print("Looping")
# value = loop.run_until_complete(coro)
# print(f"Returning: {value}")
# finally:
# print("Closing")
# loop.close()
# print(f"{monotonic() - begin:.2f}s")
# def compute(name):
# print(f"Starting {name} compute")
# try:
# while True:
# message = (yield)
# if name in message:
# result = sum(fib(30) for _ in range(len(message)))
# print(f"{name}[compute]: {result}")
# except GeneratorExit:
# print(f"Closing {name} compute")
# def reverse(name):
# print(f"Starting {name} reverse")
# try:
# while True:
# message = (yield)
# if name in message:
# print(f"{name}[reverse]: {message[::-1]}")
# except GeneratorExit:
# print(f"Closing {name} reverse")
# def echo(name):
# print(f"Starting {name} echo")
# rn = reverse(name)
# cn = compute(name)
# next(rn)
# next(cn)
# try:
# while True:
# message = (yield)
# rn.send(message)
# cn.send(message)
# if name in message:
# print(f"{name}[echo]: {message}")
# except GeneratorExit:
# print(f"Closing {name} echo")
# e1 = echo("test")
# next(e1)
# ========================
from abots.helpers import eprint, noop
from abots.events import Supervisor, Envelope
from abots.net import (SocketServer, SocketClient, SocketServerHandler,
SocketClientHandler)
from time import sleep
supervisor = Supervisor()
# (server, *a, **k), format|message|open_client|close_client|close
server = SocketServer("localhost", 10901, handler=SocketServerHandler)
# server_proc = Process(target=server)
server_actor = supervisor.spawn("server", server)
# (client, *a, **k), format|message|initialize
client = SocketClient("localhost", 10901, handler=SocketClientHandler)
# client_proc = Process(target=client.start)
client_actor = supervisor.spawn("client", client)
# server_proc.start()
# client_proc.start()
server_actor.start()
client_actor.start()
sleep(5)
print("Stopping")
supervisor.stop()
print("Stopped")