Compare commits

..

No commits in common. "master" and "master" have entirely different histories.

32 changed files with 615 additions and 1844 deletions

10
.gitignore vendored
View File

@ -1,15 +1,5 @@
# Application-specific
TODO
tags
scratch
scratch.py
*.swp
*.bak.py
*.db
*.log
*.vim
archive/
.vscode/
# ---> Python
# Byte-compiled / optimized / DLL files

62
TODO Executable file
View File

@ -0,0 +1,62 @@
## net
### socket_server
- [x] Add comments
- [x] Abstract default handler out to another file
- [x] Send heartbeat
- [x] Better handle clients disconnecting
- [x] Remove lookup, use clients instead
- [x] Add alias system for clients to replace using fd
- [ ] Add support for cryptography
- [ ] Add data retention verbs to default handler
- [x] Abstract length prefix to message into handler
- [x] Fix comments to be more helpful
### socket_client
- [ ] Add comments
- [ ] Abstract default handler out to another file
- [ ] Respond to heartbeat
- [ ] Respond to alias from server
- [ ] Add support for cryptography
- [ ] Add data retention model to default handler
- [ ] Abstract length prefix to message into handler
### socket_to_websocket
- [ ] Bridges socket_client and websocket_client together
## crypto
- [ ] Add crypto set
- [ ] Add GPG wrapper functions
- [ ] Add Diffie-Hellman functions
- [ ] Add symmetric & asymmetric crypto functions
## helpers
- [x] Add helpers set
- [x] In helpers add JSON encoding / decoding
- [ ] Add helper for generating / reading Twitter's snowflake ID format
- [ ] Add helpers for running shell commands and getting outputs
- [ ] Add wrapper functions to reading / using git repositories
## db
- [ ] Add db set
- [ ] In db add sqlite wrappers to create, modify, and delete tables
- [ ] In db add sqlite wrappers to query, add, edit, and remove entries
## ui
- [x] Add ui set
- [ ] Add framework for curses module
## web
- [ ] Add web set
- [ ] Add websocket server compatible with socket_server handlers
- [ ] Add websocket client compatible with socket_client handlers
- [ ] Add Flask that integrates websocket server and databases from db

View File

@ -1,2 +0,0 @@
from abots.db.sqlite import SQLite
from abots.db.memoru import Memoru

View File

@ -1,9 +0,0 @@
from abots.helpers import infinitedict
class Memoru:
def __init__(self, location=None):
self._location = Path(location) if location is not None else None
self_data = infinitedict()
if self._location is not None:
invalid_location = f"Location does not exist: {self._location}"
assert self._location.exists(), invalid_location

View File

@ -1,193 +0,0 @@
from abots.helpers import eprint
from os import stat, remove
from os.path import dirname, basename, isfile, isdir, join as path_join
from shutil import copyfile
from time import strftime
from operator import itemgetter
from contextlib import closing, contextmanager
from threading import Lock
from sqlite3 import connect, register_converter, PARSE_DECLTYPES
from sqlite3 import ProgrammingError, Error as SQLiteError
register_converter("BOOLEAN", lambda value: bool(int(value)))
class SQLite:
def __init__(self, db_file, lock=Lock()):
self.db_file = db_file
self.path = dirname(self.db_file)
self.filename = basename(self.db_file)
self.connection = None
self.cursor = None
self.memory = False
self.lock = lock
self._load()
def _load(self):
settings = dict()
settings["check_same_thread"] = False
settings["isolation_level"] = "DEFERRED"
settings["detect_types"] = PARSE_DECLTYPES
try:
self.connection = connect(self.db_file, **settings)
self.connection.execute("PRAGMA journal_mode = WAL")
self.connection.execute("PRAGMA foreign_keys = ON")
except SQLiteError as e:
eprint(e)
self.connection = connect(":memory:")
self.memory = True
# self.cursor = self.connection.cursor()
def _unload(self):
self.connection.close()
# Must be called from `backup`
def _remove_old_backups(self, backup_dir, backups):
contents = listdir(backup_dir)
files = [(f, stat(f).st_mtime) for f in contents if isfile(f)]
# Sort by mtime
files.sort(key=itemgetter(1))
remove_files = files[:-backups]
for rfile in remove_files:
remove(rfile)
def _convert(self, values):
convert = lambda value: value if not None else "NULL"
return (convert(value) for value in values)
def stop(self):
self._unload()
def backup(self, backup_dir=None, backups=1):
if backup_dir is None:
backup_dir = self.path
if not isdir(backup_dir):
eprint("ERROR: Backup directory does not exist")
return None
suffix = strftime("-%Y%m%d-%H%M%S")
backup_name = f"{self.filename} {suffix}"
backup_file = path_join(backup_dir, backup_name)
# Lock database
with closing(self.connection.cursor()) as cursor:
cursor.execute("begin immediate")
copyfile(self.db_file, backup_file)
# Unlock database
self.conenction.rollback()
self._remove_old_backups(backup_dir, backups)
def execute(self, executor, values=tuple(), fetch=False, commit=False):
try:
with closing(self.connection.cursor()) as cursor:
args = (executor, values) if "?" in executor else (executor,)
if commit:
with self.transaction():
result = cursor.execute(*args)
else:
result = cursor.execute(*args)
if not fetch:
return True
return cursor.fetchall()
except ProgrammingError as e:
eprint(e)
return False
def fetch(self, executor, values=tuple(), commit=False):
return self.execute(executor, values, fetch=True, commit=commit)
"""
e.g. create_table("test", ["id INTEGER PRIMARY KEY", "value TEXT"])
"""
def create_table(self, name, fields, commit=False):
fields_string = ",".join(fields)
executor = f"CREATE TABLE {name} ({fields_string})"
return self.execute(executor, commit=commit)
"""
e.g. insert("test", {"value": "abc"})
"""
def insert(self, table, insertion, commit=False):
assert isinstance(insertion, dict), "Expected dict"
keys = ",".join(insertion.keys())
places = ",".join(["?"] * len(insertion))
values = tuple(self._convert(insertion.values()))
executor = f"INSERT INTO {table} ({keys}) VALUES({places})"
return self.execute(executor, values, commit=commit)
def update(self, table, modification, where, commit=False):
assert isinstance(modification, dict), "Expected dict"
assert isinstance(where, tuple), "Expected tuple"
assert len(where) == 2, "Expected length of '2'"
assert isinstance(where[0], str), "Expected str"
assert isinstance(where[1], tuple), "Expected tuple"
keys = ",".join(f"{key} = ?" for key in modification.keys())
mod_values = tuple(self._convert(modification.values()))
where_query = where[0]
where_values = where[1]
values = mod_values + where_values
executor = f"UPDATE {table} SET {keys} WHERE {where_query}"
return self.execute(executor, values, commit=commit)
def delete(self, table, where, commit=False):
assert isinstance(where, tuple), "Expected tuple"
assert len(where) == 2, "Expected length of '2'"
assert isinstance(where[0], str), "Expected str"
assert isinstance(where[1], tuple), "Expected tuple"
where_query = where[0]
where_values = where[1]
values = where_values
executor = f"DELETE FROM {table} WHERE {where_query}"
return self.execute(executor, values, commit=commit)
def lookup(self, table, search, where=None):
if type(search) != list:
search = [search]
keys = ",".join(search)
executor = f"SELECT {keys} FROM {table}"
if where:
assert isinstance(where, tuple), "Expected tuple"
assert len(where) == 2, "Expected length of '2'"
assert isinstance(where[0], str), "Expected str"
assert isinstance(where[1], tuple), "Expected tuple"
where_query = where[0]
where_values = where[1]
executor = f"{executor} WHERE {where_query}"
results = self.fetch(executor, where_values)
else:
results = self.fetch(executor)
found = list()
for result in results:
remap = dict()
for index, term in enumerate(search):
remap[term] = result[index]
found.append(remap)
return found
def lookup_one(self, table, search, where=None):
result = self.lookup(table, search, where)
return result[0] if len(result) > 0 else None
def lookup_all(self, table, where=None):
return self.lookup(table, ["*"], where)
def get_all_tables(self):
executor = "SELECT name FROM sqlite_master WHERE type='table'"
tables = self.fetch(executor)
return (table[0] for table in tables)
def drop_table(self, name, commit=False):
executor = f"DROP TABLE IF EXISTS {name}"
return self.execute(executor, commit=commit)
@contextmanager
def transaction(self):
with self.lock:
try:
yield
self.connection.commit()
except:
self.connection.rollback()
raise

View File

@ -1,4 +0,0 @@
from abots.events.every import Every
from abots.events.threads import ThreadPool, ThreadMarshal, acquire_timeout
from abots.events.duodecimer import Duodecimer, Cron
from abots.events.coroutines import CoroEvent

View File

@ -1,35 +0,0 @@
class CoroEvent:
def __init__(self):
self._set = False
self._targets = list()
self._events = self._loop()
self._events.__next__()
def _loop(self):
try:
while True:
action = (yield)
if action == "set":
self._set = True
for info in self._targets:
target, event = info
target.send(event)
elif action == "clear":
self._set = False
except GeneratorExit:
pass
def is_set(self):
return self._set
def set(self):
self._events.send("set")
def clear(self):
self._events.send("clear")
def wait(self, target, event):
self._targets.append((target, event))
def close(self):
self._events.close()

View File

@ -1,237 +0,0 @@
from abots.events import Every, ThreadMarshal
from abots.helpers import cast, utc_now
from queue import Queue, Empty
from collections import defaultdict
from threading import Thread, Event
"""
TODO:
* Add way to export tasks to pickle file on stop?
Duodecimer: "duodec-" meaning twelve and the "-imer" for timer, or twelve timers
Will be a scheduler running 12 threads of the increments of time:
* 1 second
* 30 seconds
* 1 minute
* 5 minutes
* 10 minutes
* 15 minutes
* 30 minutes
* 1 hour
* 1 day
* 1 week
* 1 fortnight (2 weeks)
* cron
The cron being a thread that triggers tasks when a date/time trigger occurs
(with 1 minute precision, but run outside the 1 minute thread). Triggers by:
* Specific times in specific dates
* Specific times every day
* Change in the week day
* Change in the month
* Change in the year
"""
minutes_in_seconds = 60
hour_in_seconds = 60 * minutes_in_seconds
day_in_seconds = 24 * hour_in_seconds
week_in_seconds = 7 * day_in_seconds
fortnight_in_seconds = 2 * week_in_seconds
class Cron:
def __init__(self, repeat, triggers):
self.repeat = repeat
self.triggers = triggers
# self.start = utc_now()
# self.date = self.get_date(self.start)
# self.time = self.get_time(self.start)
# self.weekday = self.get_weekday(self.start)
# self.day = self.get_day(self.start)
# self.month = self.get_month(self.start)
# self.year = self.get_year(self.start)
@staticmethod
def get_date(when=None):
if when is None:
when = utc_now()
return int(f"{when.year}{when.month:02}{when.day:02}")
@staticmethod
def get_time(when=None):
if when is None:
when = utc_now()
return int(f"{when.hour:02}{when.minute:02}")
@staticmethod
def get_weekday(when=None):
if when is None:
when = utc_now()
return when.weekday()
@staticmethod
def get_day(when=None):
if when is None:
when = utc_now()
return when.day
@staticmethod
def get_month(when=None):
if when is None:
when = utc_now()
return when.month
@staticmethod
def get_year(when=None):
if when is None:
when = utc_now()
return when.year
@staticmethod
def next_minutes(minutes):
now = utc_now()
hour = now.hour
minute = now.minute + minutes
if minute >= 60:
hour = 0 if hour == 23 else hour + 1
minute = minute % 60
return int(f"{hour:02}{minute:02}")
@staticmethod
def next_hours(hours):
now = utc_now()
minute = now.minute
hour = now.hour + hours
if hour >= 24:
hour = hour % 24
return int(f"{hour:02}{minute:02}")
class Duodecimer:
def __init__(self):
self.queues = dict()
self.timers = dict()
self._intervals = dict()
self._intervals["5s"] = 5
self._intervals["30s"] = 30
self._intervals["1m"] = minutes_in_seconds
self._intervals["5m"] = 5 * minutes_in_seconds
self._intervals["10m"] = 10 * minutes_in_seconds
self._intervals["15m"] = 15 * minutes_in_seconds
self._intervals["30m"] = 30 * minutes_in_seconds
self._intervals["1h"] = hour_in_seconds
self._intervals["1d"] = day_in_seconds
self._intervals["1w"] = week_in_seconds
self._intervals["1f"] = fortnight_in_seconds
self._load()
def _load(self):
for name, interval in self._intervals.items():
queue = Queue()
timer = Every(interval, self._timer, queue)
self.queues[name] = queue
self.timers[name] = timer
cron_queue = Queue()
cron_timer = Every(minutes_in_seconds, self._cron, cron_queue)
self.queues["cron"] = cron_queue
self.timers["cron"] = cron_timer
def _process_queue(self, state, queue, task_size):
if state is None:
state = list()
while True:
try:
job = queue.get_nowait()
if len(job) != 2:
continue
cancel, task = job
if cancel.is_set() or len(task) != task_size:
# print(f"[worker:{worker_id}]: Task is malformed")
continue
state.append(job)
queue.task_done()
except Empty:
break
return state
def _process_trigger(self, unit, previous, value):
now = cast(Cron, f"get_{unit}", utc_now())
if value == "change" and previous is not None:
return cast(Cron, f"get_{unit}", previous) != now
else:
return now == value
def _timer(self, state, queue):
state = self._process_queue(state, queue, 3)
marshal = ThreadMarshal(len(state), destroy=True)
cancelled = list()
for job in state:
# print(f"[worker:{worker_id}]: Running task")
cancel, task = job
if cancel.is_set():
cancelled.append(job)
continue
marshal.reserve(*task)
for job in cancelled:
state.remove(job)
return state
def _cron(self, state, queue):
if state is None:
state = defaultdict(lambda: None)
state["tasks"] = self._process_queue(state["tasks"], queue, 4)
previous = state["previous"]
removing = list()
for job in state["tasks"]:
cancel, task = job
if cancel.is_set():
removing.append(job)
continue
cron, target, args, kwargs = task
assert isinstance(cron.triggers, dict), "Expected dict"
triggered = list()
for trigger, value in cron.triggers.items():
if trigger == "date":
triggered.append(Cron.get_date() >= value)
elif trigger == "time":
triggered.append(Cron.get_time() >= value)
elif trigger in ["weekday", "day", "month", "year"]:
processed = self._proccess_trigger(trigger, previous, value)
triggered.append(processed)
else:
continue
if all(triggered):
thread = Thread(target=target, args=args, kwargs=kwargs)
thread.start()
if not cron.repeat:
removing.append(task)
for task in removing:
state["tasks"].remove(task)
state["previous"] = utc_now()
return state
def start(self):
for name, timer in self.timers.items():
timer.start()
def stop(self):
for name, timer in self.timers.items():
timer.stop()
def assign(self, timer, method, args=tuple(), kwargs=dict()):
if timer not in self.queues.keys():
return None
task = method, args, kwargs
cancel = Event()
job = cancel, task
self.queues[timer].put(job)
return cancel
def schedule(self, cron, target, args=tuple(), kwargs=dict()):
task = cron, target, args, kwargs
cancel = Event()
job = cancel, task
self.queues["cron"].put(job)
return cancel

View File

@ -1,28 +0,0 @@
from time import monotonic, sleep
from threading import Event, Thread
class Every:
def __init__(self, interval, function, *args, **kwargs):
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.event = Event()
def _wrapper(self, *args, **kwargs):
start = monotonic()
state = None
while not self.event.is_set():
state = self.function(state, *args, **kwargs)
sleep(self.interval - ((monotonic() - start) % self.interval))
def start(self):
args = self.args
kwargs = self.kwargs
thread = Thread(target=self._wrapper, args=args, kwargs=kwargs)
thread.setDaemon(True)
thread.start()
return thread
def stop(self):
self.event.set()

View File

@ -1,315 +0,0 @@
from abots.helpers import eprint, cast
from abots.events import Every
from queue import Queue, Empty, Full
from threading import Thread, Event, Lock, RLock, BoundedSemaphore
from contextlib import contextmanager
"""
TODO:
"""
@contextmanager
def acquire_timeout(lock, timeout=-1):
if timeout is None:
timeout = -1
result = lock.acquire(timeout=timeout)
yield result
if result:
lock.release()
class ThreadPool:
def __init__(self, pool_size, timeout=None):
self.locks = list()
self.events = list()
self.queues = list()
self.workers = list()
data = dict()
for s in range(pool_size):
lock = Lock()
event = Event()
queue = Queue()
args = (s, event, queue, timeout)
worker = Thread(target=self._worker, args=args)
worker.setDaemon(True)
self.locks.append(lock)
self.events.append(event)
self.queues.append(queue)
self.workers.append(worker)
worker.start()
def _exec_controls(self, controls):
for action, methods in controls.items():
for method in methods:
cast(method, action)
def _worker(self, worker_id, event, queue, timeout=None):
while not event.is_set():
try:
# NOTE: This is really spammy, use only in case of emergencies
# print(f"[worker:{worker_id}]: Getting task")
if timeout is not None:
job = queue.get(block=True, timeout=timeout)
else:
job = queue.get_nowait()
if len(job) != 2:
# print(f"[worker:{worker_id}]: Job is malformed")
continue
controls, task = job
if type(controls) != dict:
# print(f"[worker:{worker_id}]: Controls are malformed")
continue
if task is None: # NOTE: Poison pill to kill worker
# print(f"[worker:{worker_id}]: Poisoned")
event.set()
self._exec_controls(controls)
break
if len(task) != 3:
# print(f"[worker:{worker_id}]: Task is malformed")
self._exec_controls(controls)
continue
method, args, kwargs = task
# print(f"[worker:{worker_id}]: Running task")
try:
method(*args, **kwargs)
except Exception as e:
eprint(e)
finally:
# print(f"[worker:{worker_id}]: Task complete")
self._exec_controls(controls)
queue.task_done()
except Empty:
continue
# Clear out the queue
# print(f"[worker:{worker_id}]: Clearing out queue")
while True:
try:
queue.get_nowait()
queue.task_done()
except Empty:
break
def stop(self, done=None, wait=True):
# print(f"Stopping pool")
for event in self.events:
event.set()
if wait:
for worker in self.workers:
worker.join()
cast(done, "set")
# print(f"Stopped pool")
class ThreadMarshal:
def __init__(self, pool_size, monitor=1, cleanup=True, timeout=None,
destroy=False):
self.pool_size = pool_size
self.monitor_interval = monitor
self.cleanup = cleanup
self.timeout = timeout
self.destroy = destroy
self.stopped = Event()
self._manager = Lock() # NOTE: Maybe make this an RLock?
self._load_presets()
self._add_pool()
if self.monitor_interval > 0:
self.monitor = Every(self.monitor_interval, self._monitor)
self.monitor.start()
def _next_pool(self):
self._pool_cursor = (self._pool_cursor + 1) % len(self.pools)
def _next_worker(self):
self._worker_cursor = (self._worker_cursor + 1) % self.pool_size
# NOTE: This is a potential optimization for later
# if self._worker_cursor == 0:
# self._next_pool()
def _load_presets(self):
self._pool_cursor = 0
self._worker_cursor = 0
self.pools = list()
self.locks = list()
self.events = list()
self.queues = list()
self.workers = list()
self.semaphores = list()
def _get_idle_pools(self):
idle_pools = list()
if len(self.pools) == 1:
return idle_pools
for index, queues in enumerate(self.queues):
if index == 0:
continue
queues_empty = [queue.empty() for queue in queues]
idle = all(queues_empty)
if not idle:
continue
print(f"[manager] Pool {index} is idle")
idle_pools.append(self.pools[index])
return idle_pools
def _monitor(self, state):
# print("[manager] Cleaning pools")
if self._manager.locked():
return # Try again later
with self._manager:
idle_pools = self._get_idle_pools()
if self.destroy and len(idle_pools) == len(self.pools):
self.stop()
elif self.cleanup and len(idle_pools) > 0:
cleaning = Event()
self._cleaner(idle_pools, cleaning)
cleaning.wait()
return None
def _cleaner(self, idle_pools, done=None):
print("[manager] Cleaning pools")
for pool in idle_pools:
self._stop_pool(pool, wait=done is not None)
print("[manager] Pools are cleaned")
cast(done, "set")
def _stop_pool(self, pool, done=None, wait=True):
index = self.pools.index(pool)
# print(f"[manager] Stopping pool {index}")
lock = self.locks[index]
event = self.events[index]
queue = self.queues[index]
worker = self.workers[index]
semaphore = self.semaphores[index]
self.pools.remove(pool)
self.locks.remove(lock)
self.events.remove(event)
self.queues.remove(queue)
self.workers.remove(worker)
self.semaphores.remove(semaphore)
pool.stop(done, wait)
# print(f"[manager] Stopped pool {index}")
def _run(self, task, controls, reserve, coordinates):
pool_index, worker_index = coordinates
# print(f"[manager:reserve] Trying worker {self.worker_index}")
lock = self.locks[pool_index][worker_index]
event =self.events[pool_index][worker_index]
queue =self.queues[pool_index][worker_index]
if event.is_set() or lock.locked():
return False
if not reserve:
lock = Lock()
# print(f"[manager:reserve] Using worker {worker_index}")
lock.acquire()
release = controls.get("release", list())
release.append(lock)
job = controls, task
queue.put_nowait(job)
return True
def _add_pool(self):
index = len(self.pools)
# print(f"[manager] Adding pool {index}")
pool = ThreadPool(self.pool_size, self.timeout)
self.pools.append(pool)
self.locks.append(pool.locks)
self.events.append(pool.events)
self.queues.append(pool.queues)
self.workers.append(pool.workers)
self.semaphores.append(BoundedSemaphore(self.pool_size))
return index
def add_pool(self):
with self._manager:
self._add_pool()
def clean(self, done=None):
with self._manager:
idle_pools = self._get_idle_pools()
if self.cleanup and len(idle_pools) > 0:
self._cleaner(idle_pools, done)
def wait(self):
# print("[manager] Waiting on workers in pools")
with self._manager:
for pool in self.pools:
for worker in pool.workers:
worker.join()
def stop(self, wait=True):
# print("[manager] Stopping")
if self.monitor_interval > 0:
self.monitor.event.set()
with self._manager:
dones = list()
threads = list()
for pool in self.pools:
done = Event()
dones.append(done)
thread = Thread(target=pool.stop, args=(done, wait))
thread.setDaemon(True)
threads.append(thread)
thread.start()
if wait:
for thread in threads:
thread.join(self.timeout)
self._load_presets()
cast(self.stopped, "set")
# print("[manager] Stopped")
def run(self, task, done, reserve, coordinates):
if len(task) != 3:
return None
if len(coordinates) != 2:
return None
method, args, kwargs = task
if not callable(method) or type(args) != tuple or type(kwargs) != dict:
return None
pool_index, worker_index = coordinates
if pool_index >= len(self.pools) or worker_index >= self.pool_size:
return None
with self.manager:
semaphore = self.semaphores[pool_index]
if not semaphore.acquire(False):
return None
controls = dict()
controls["set"] = [done]
controls["release"] = [sempahore]
self._run(task, controls, reserve, coordinates)
return True
def reserve(self, method, args=tuple(), kwargs=dict(), reserve=True):
# print("[manager:reserve] Acquiring lock")
done = Event()
with self._manager:
task = method, args, kwargs
if self._pool_cursor >= len(self.pools):
self._next_pool()
pool_found = False
for p in range(len(self.pools)):
# print(f"[manager:reserve] Trying pool {self._pool_cursor}")
semaphore = self.semaphores[self._pool_cursor]
if not semaphore.acquire(False):
self._next_pool()
continue
pool_found = True
break
if not pool_found:
# print(f"[manager:reserve] Pools are full, adding new pool")
index = self._add_pool()
self._pool_cursor = index
self._worker_cursor = 0
semaphore = self.semaphores[self._pool_cursor]
semaphore.acquire()
# print(f"[manager:reserve] Using pool {self._pool_cursor}")
pool = self.pools[self._pool_cursor]
for w in range(self.pool_size):
coordinates = (self._pool_cursor, self._worker_cursor)
controls = dict()
controls["set"] = [done]
controls["release"] = [semaphore]
queued = self._run(task, controls, reserve, coordinates)
self._next_worker()
if not queued:
continue
break
return done

View File

@ -1,8 +1 @@
from abots.helpers.hash import create_hash, md5, sha1, sha256, sha512
from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
from abots.helpers.numbers import clamp, randfloat, isnumeric
from abots.helpers.general import eprint, deduce, noop, cast, get_digit, obtain
from abots.helpers.general import utc_now, utc_now_timestamp
from abots.helpers.logging import Logger
from abots.helpers.black_magic import infinitedict, debugger, singleton, curry
from abots.helpers.black_magic import coroutine, generator, autoload
from abots.helpers.json import jots, jsto

View File

@ -1,78 +0,0 @@
from collections import defaultdict
from functools import wraps
from os.path import dirname, basename, isfile, join
from glob import glob
from importlib import import_module
def infinitedict():
d = lambda: defaultdict(d)
return defaultdict(d)
def debugger(func):
@wraps(func)
def wrapper_debug(*args, **kwargs):
args_repr = [repr(arg) for arg in args]
kwargs_repr = [f"{key}={value!r}" for key, value in kwargs.items()]
signature = ", ".join(args_repr + kwargs_repr)
print(f"[DEBUGGER]: Calling {func.__name__}({signature})")
result = func(*args, **kwargs)
print(f"[DEBUGGER]: {func.__name__!r} returned {result!r}")
return result
return wrapper_debug
def coroutine(func):
@wraps(func)
def wrapper_coroutine(*args, **kwargs):
coro = func(*args, **kwargs)
coro.__next__()
return coro
return wrapper_coroutine
def generator(func):
@wraps(func)
def wrapper_generator(*args, **kwargs):
try:
while True:
yield from func(*args, **kwargs)
except GeneratorExit:
pass
return coroutine(wrapper_generator)
def singleton(cls):
@wraps(cls)
def wrapper_singleton(*args, **kwargs):
if not wrapper_singleton.instance:
wrapper_singleton.instance = cls(*args, **kwargs)
return wrapper_singleton.instance
wrapper_singleton.instance = None
return wrapper_singleton
def curry(func, argc=None):
if argc is None:
argc = func.func_code.co_argcount
@wraps(func)
def wrapper_curry(*args):
if len(args) == argc:
return func(*args)
def curried(*c_args):
return func(*(args + c_args))
return curry(curried, argc - len(args))
return wrapper_curry
# This is used to automatically import the files in this directory
# Essentially, it is a auto-loader for a plugin system
# NOTE: Do as I say, not as I do. You should probably never do this
def autoload(location, context, package, prefix=""):
level = -(len(package.split(".")) + 1)
for module in glob(join(dirname(location), "*.py")):
if not isfile(module) or module.endswith("__init__.py"):
continue
# Equivalent of doing "import <package>.<module>"
plugin = import_module(f".{basename(module)[:level]}", package)
funcs = [f for f in dir(plugin) if f[0] != "_"]
for func in funcs:
# Translates the above to "from <package>.<module> import *"
plugin_func = getattr(plugin, func)
# To reduce conflicts in global, the prefix is used here
# These should not be used directly and just fire off decorators
context[f"{prefix}{func}"] = plugin_func

View File

@ -1,71 +0,0 @@
from json import dump, dumps, loads
from codecs import encode as c_encode, decode as c_decode
from base64 import b64encode, b64decode
from re import compile as regex
# JSON encoder, converts a python object to a string
def jots(data, readable=False, dest=None):
kwargs = dict()
# If readable is set, it pretty prints the JSON to be more human-readable
if readable:
# kwargs["sort_keys"] = True
kwargs["indent"] = 4
kwargs["separators"] = (",", ":")
try:
if dest:
return dump(data, dest, ensure_ascii=False, **kwargs)
return dumps(data, ensure_ascii=False, **kwargs)
except ValueError as e:
return None
# JSON decoder, converts a string to a python object
def jsto(data):
try:
return loads(data)
except ValueError as e:
print(e)
return None
# Encodes data to base64
def b64e(data, altchars=None, url=False, use_bin=False):
if type(data) is str:
data = data.encode("utf-8")
if altchars is None and url:
altchars = "-_"
base64_data = b64encode(hex_data, altchars)
if use_bin:
return base64_data
return base64_data.decode("utf-8")
# Decodes data from base64
def b64d(base64_data, altchars=None, url=False, use_bin=False):
if type(base64_data) is str:
base64_data = base64_data.encode("utf-8")
if altchars is None and url:
altchars = "-_"
data = b64decode(base64_data, altchars)
if use_bin:
return data
return data.decode("utf-8")
# Converts hex to base64 encoding
def h2b64(hex_data, altchars=None, url=False, use_bin=False):
if type(hex_data) is str:
hex_data = c_decode(hex_data, "hex")
return b64e(hex_data, altchars, url, use_bin)
# Decodes base64 and converts to hex
def b642h(base64_data, altchars=None, url=False, use_bin=False):
base64_decoded = b64d(base64_data, altchars, url, use_bin)
hex_data = c_encode(base64_decoded, "hex")
if use_bin:
return hex_data
return hex_data.decode("utf-8")
# str(ClassName) -> "<class '__main__.ClassName'>"
# This function extracts the class name from the str output
def ctos(_class):
pattern = regex(r"[<'>]")
cleaned = pattern.sub("", str(_class))
return cleaned.split("class ", 1)[1].split(".")[-1]

View File

@ -1,35 +0,0 @@
from sys import stderr
from traceback import print_exc
from datetime import datetime, timezone
def eprint(*args, **kwargs):
print(*args, file=stderr, **kwargs)
def noop(*args, **kwargs):
pass
def obtain(source, attribute):
return getattr(source, attribute, None)
def cast(source, method, *args, **kwargs):
source_method = getattr(source, method, noop)
try:
return source_method(*args, **kwargs)
except Exception:
status = print_exc()
eprint(status)
return status
def deduce(reference, attributes):
if type(attributes) is not list:
attributes = [attributes]
return list(map(lambda attr: getattr(reference, attr, None), attributes))
def get_digit(number, position):
return False if number - 10**position < 0 else number // 10*position % 10
def utc_now():
return datetime.utcnow().replace(tzinfo=timezone.utc)
def utc_now_timestamp():
return utc_now().timestamp()

View File

@ -1,64 +0,0 @@
from abots.helpers.general import eprint
from os import urandom
from binascii import hexlify
from hashlib import algorithms_available, pbkdf2_hmac, new as new_hash
def create_hash(algorithm, seed=None, random_bytes=32, use_bin=False):
if algorithm not in algorithms_available:
return None
h = new_hash(algorithm)
if seed is None:
h.update(urandom(random_bytes))
else:
h.update(seed.encode("utf-8"))
if use_bin:
return h.digest()
return h.hexdigest()
def md5(*args, **kwargs):
return create_hash("md5", *args, **kwargs)
def sha1(*args, **kwargs):
return create_hash("sha1", *args, **kwargs)
def sha256(*args, **kwargs):
return create_hash("sha256", *args, **kwargs)
def sha512(*args, **kwargs):
return create_hash("sha512", *args, **kwargs)
def pbkdf2(algo, pswd, salt=None, cycles=100000, key_len=None, use_bin=False):
if algorithm not in algorithms_available:
return None
if type(pswd) is str:
pswd = pswd.encode("utf-8")
if salt is None:
salt = urandom(16)
elif type(salt) is str:
salt = salt.encode("utf-8")
derived_key = pbkdf2_hmac(algo, pswd, salt, cycles, key_len)
if use_bin:
return derived_key, salt
return hexlify(derived_key).decode("utf-8"), salt.decode("utf-8")
# n = increase as general computing performance increases, min=14
# r = increase in case of breakthroughs in memory technology, min=8
# p = increase in case of breakthroughs in CPU technology, min=1
def scrypt(password, salt=None, n=14, r=8, p=1, use_bin=False):
try:
from hashlib import scrypt as _scrypt
except ImportError:
eprint("Error: Missing OpenSSL 1.1+ for hashlib.scrypt")
return None
if type(password) is str:
password = password.encode("utf-8")
if salt is None:
salt = urandom(16)
elif type(salt) is str:
salt = salt.encode("utf-8")
# N = 2**n, allows simple increments to the value to get desired effect
derived_key = _scrypt(password, salt, 2**n, r, p)
if use_bin:
return derived_key, salt
return hexlify(derived_key).decode("utf-8"), salt.decode("utf-8")

22
abots/helpers/json.py Executable file
View File

@ -0,0 +1,22 @@
from json import dumps, loads
# JSON encoder, converts a python object to a string
def jots(self, data, readable=False):
kwargs = dict()
# If readable is set, it pretty prints the JSON to be more human-readable
if readable:
kwargs["sort_keys"] = True
kwargs["indent"] = 4
kwargs["separators"] = (",", ":")
try:
return json.dumps(data, **kwargs)
except ValueError as e:
return None
# JSON decoder, converts a string to a python object
def jsto(self, data):
try:
return json.loads(data)
except ValueError as e:
return None

View File

@ -1,100 +0,0 @@
from logging import getLogger, Formatter, DEBUG, INFO, WARNING, ERROR, CRITICAL
from logging import StreamHandler
from logging.handlers import TimedRotatingFileHandler
from logging.handlers import QueueListener, QueueHandler
from queue import Queue
def get_level(level):
levels = dict()
levels["debug"] = DEBUG
levels["info"] = INFO
levels["warning"] = WARNING
levels["error"] = ERROR
levels["critical"] = CRITICAL
return levels.get(level, DEBUG)
def get_formatter(formatter, date_format):
if formatter is None:
formatter = u"%(asctime)s - %(name)s - %(threadName)s"
formatter = formatter + u" - %(levelname)s - %(message)s"
if date_format is None:
date_format = "%Y-%m-%d %H:%M:%S"
return Formatter(formatter)
class Stream(StreamHandler):
def __init__(self, level=None, formatter=None, date_format=None):
super().__init__()
self.setLevel(get_level(level))
self.setFormatter(get_formatter(formatter, date_format))
class Rotated(TimedRotatingFileHandler):
def __init__(self, filename, when, interval, backupCount,
level=None, formatter=None, date_format=None, **kwargs):
super().__init__(filename, when, interval, backupCount, encoding="utf8",
**kwargs)
self.setLevel(get_level(level))
self.setFormatter(get_formatter(formatter, date_format))
class Logger:
def __init__(self, name, level=None, settings=dict()):
self.name = name
self.core = getLogger(name)
self.core.setLevel(get_level(level))
self._load(settings)
def _load(self, settings):
disabled = settings.get("disabled", list())
handlers = list()
if "stream" not in disabled:
stream = settings.get("stream", dict())
stream_level = stream.get("level", None)
stream_formatter = stream.get("formatter", None)
stream_date_format = stream.get("date_format", None)
stream_args = stream_level, stream_formatter, stream_date_format
stream_handler = Stream(*stream_args)
handlers.append(stream_handler)
if "file" not in disabled:
rotated = settings.get("file", dict())
rotated_filename = rotated.get("filename", f"{self.name}.log")
rotated_when = rotated.get("when", "midnight")
rotated_interval = rotated.get("interval", 1)
rotated_backup_count = rotated.get("backup_count", 5)
rotated_level = rotated.get("level", None)
rotated_formatter = rotated.get("formatter", None)
rotated_date_format = rotated.get("date_format", None)
rotated_args = (rotated_filename, rotated_when, rotated_interval,
rotated_backup_count, rotated_level, rotated_formatter,
rotated_date_format)
rotated_handler = Rotated(*rotated_args)
handlers.append(rotated_handler)
self.queue = Queue()
self.queue_handler = QueueHandler(self.queue)
args = tuple(handlers)
kwargs = dict()
kwargs["respect_handler_level"] = True
self.listener = QueueListener(self.queue, *args, **kwargs)
self.core.addHandler(self.queue_handler)
def start(self):
self.listener.start()
def stop(self):
self.listener.stop()
def debug(self, message):
self.core.debug(message)
def info(self, message):
self.core.info(message)
def warning(self, message):
self.core.warning(message)
def error(self, message):
self.core.error(message)
def critical(self, message):
self.core.critical(message)

View File

@ -1,13 +0,0 @@
from random import random, randint
def clamp(number, low, high):
return max(low, min(number, high))
def randfloat(start, end=None):
if end is None:
end = start
start = 0
return (end - start) * random() + start
def isnumeric(test):
return test.replace(".", "", 1).isdigit()

View File

@ -1,4 +1,4 @@
from abots.net.socket_server import SocketServer
from abots.net.socket_client import SocketClient
from abots.net.prefix_socket_server import PrefixSocketServer
from abots.net.prefix_socket_client import PrefixSocketClient
from abots.net.socket_server_handler import SocketServerHandler
from abots.net.socket_client_handler import SocketClientHandler

View File

@ -1,72 +0,0 @@
"""
Prefix Socket Client
====================
Formatted to read and write prefix headers to socket data
"""
from abots.net import SocketClient
from struct import pack, unpack
from socket import timeout as sock_timeout
class PrefixSocketClient(SocketClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _format_message(self, message, *args):
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
try:
encoded = formatted.encode("utf8")
except UnicodeDecodeError:
try:
encoded = formatted.encode("iso-8859-1")
except UnicodeDecodeError:
encoded = "".encode()
size = pack(">I", len(encoded))
packaged = size + encoded
return packaged
def _recv_bytes(self, get_bytes, decode=True):
data = "".encode()
attempts = 0
while len(data) < get_bytes:
# Automatically break loop to prevent infinite loop
# Allow at least twice the needed iterations to occur exiting loop
if attempts > 2 * (get_bytes / self.buffer_size):
break
else:
attempts = attempts + 1
bufsize = get_bytes - len(data)
# Force bufsize to cap out at buffer_size
if bufsize > self.buffer_size:
bufsize = self.buffer_size
try:
packet = self.sock.recv(bufsize)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
if not isinstance(e, sock_timeout):
self._attempt_reconnect()
return None
data = data + packet
return data.decode() if decode else data
def _get_message_size(self):
raw_message_size = self._recv_bytes(4, False)
if not raw_message_size:
return None
message_size = unpack(">I", raw_message_size)[0]
return message_size
def _get_message(self):
message_size = self._get_message_size()
if message_size is None:
return
result = self._recv_bytes(message_size)
self._outbox.put(result)

View File

@ -1,45 +0,0 @@
"""
net/PrefixSocketServer
================
TODO:
* Add logging to broken pipe exceptions
"""
from abots.net import SocketServer
class PrefixSocketServer(SocketServer):
def __init__(self, host, port, listeners=5, buffer_size=4096,
secure=False, timeout=None, daemon=False):
args = host, port, listeners, buffer_size, secure, timeout, daemon
super().__init__(*args)
def _package_message(self, message, *args):
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
try:
encoded = formatted.encode("utf8")
except UnicodeDecodeError:
try:
encoded = formatted.encode("iso-8859-1")
except UnicodeDecodeError:
encoded = "".encode()
size = pack(">I", len(encoded))
packaged = size + encoded
return packaged
# Packages a message and sends it to socket
def send_message(self, uuid, message, *args):
sock = self._sock_from_uuid(uuid)
if sock is None:
return None
formatted = self._package_message(message)
try:
sock.send(formatted)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
return

View File

@ -1,3 +1,10 @@
from abots.net.socket_client_handler import SocketClientHandler as handler
from struct import pack, unpack
from multiprocessing import Process, Queue, JoinableQueue
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
"""
Socket Client
@ -7,201 +14,116 @@ Socket Client
"""
from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp, coroutine
from struct import pack, unpack
from socket import socket, timeout as sock_timeout
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from threading import Thread, Event
from queue import Queue, Empty
from time import sleep
from random import randint
class SocketClient(Thread):
def __init__(self, host, port, buffer_size=4096, secure=False,
timeout=None, daemon=False, reconnects=10, decode=True):
super().__init__()
self.setDaemon(daemon)
class SocketClient(Process):
def __init__(self, host, port, buffer_size=4096, end_of_line="\r\n",
secure=False, inbox=JoinableQueue(), outbox=Queue(), handler=handler,
**kwargs):
Process.__init__(self)
self.host = host
self.port = port
self.buffer_size = buffer_size
self.end_of_line = end_of_line
self.secure = secure
self.timeout = timeout
self.reconnects = reconnects
self.decode = decode
self.inbox = inbox
self.outbox = outbox
self.handler = handler(self)
self.sock = socket(AF_INET, SOCK_STREAM)
if self.secure:
self.sock = wrap_socket(self.sock)
self.sock = wrap_socket(self.sock, **kwargs)
self.connection = (self.host, self.port)
self.running = True
self.error = None
self.kill_switch = Event()
self.ready = Event()
self.stopped = Event()
self.broken = Event()
self.reconnecting = Event()
def _recv_bytes(self, get_bytes, decode=True):
data = "".encode()
eol = self.end_of_line.encode()
while len(data) < get_bytes:
bufsize = get_bytes - len(data)
if bufsize > self.buffer_size:
bufsize = self.buffer_size
try:
packet = self.sock.recv(bufsize)
except OSError:
return None
length = len(data) + len(packet)
checker = packet if length < get_bytes else packet[:-2]
if eol in checker:
packet = packet.split(eol)[0] + eol
return data + packet
data = data + packet
return data.decode() if decode else data
self._inbox = Queue()
self._events = Queue()
self._outbox = Queue()
self.queues = dict()
self.queues["inbox"] = self._inbox
self.queues["outbox"] = self._outbox
self.queues["events"] = self._events
def _package_message(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args) + self.end_of_line
else:
formatted = message + self.end_of_line
packaged = pack(">I", len(formatted)) + formatted.encode()
return packaged
self._bridge = None
def _get_message_size(self):
raw_message_size = self._recv_bytes(4, False)
if not raw_message_size:
return None
message_size = unpack(">I", raw_message_size)[0]
return message_size
def _send_event(self, message):
self._events.put(jots(message))
cast(self._bridge, "send", ("events", message))
def _get_message(self):
message_size = self._get_message_size()
if message_size is None:
return None
try:
return self._recv_bytes(message_size).strip(self.end_of_line)
except OSError:
return None
def _send_message(self, message, *args):
packaged = self._package_message(message, *args)
try:
self.sock.send(packaged)
except OSError:
self.stop()
def _process_inbox(self):
while not self.inbox.empty():
message, args = self.inbox.get()
self._send_message(message, *args)
self.inbox.task_done()
def _prepare(self):
self.sock.setblocking(False)
self.sock.settimeout(1)
try:
self.sock.connect(self.connection)
except Exception as e:
return True, e
return False, None
except OSError as e:
return e
return None
def _obtain(self, queue_name, timeout=False):
queue = self.queues[queue_name]
if timeout is False:
timeout = self.timeout
while True:
try:
#if timeout is not None:
# message = queue.get(timeout=timeout)
#else:
# message = queue.get_nowait()
message = queue.get_nowait()
yield message
#cast(self._bridge, "send", (queue_name, message))
queue.task_done()
except Empty:
break
def send(self, message, *args):
self.inbox.put((message, args))
def _queue_thread(self, inbox, timeout):
while not self.kill_switch.is_set():
for message in self._obtain("inbox", timeout):
if self.broken.is_set():
self.reconnecting.wait()
self.send_message(message)
def _get_message(self):
try:
packet = self.sock.recv(self.buffer_size)
result = packet.decode() if self.decode else packet
self._outbox.put(result)
cast(self._bridge, "send", ("outbox", result))
except (BrokenPipeError, OSError) as e:
pass
def _format_message(self, message, *args):
if len(args) > 0:
formatted = message.format(*args)
else:
formatted = message
return formatted.encode()
def _attempt_reconnect(self):
if self.kill_switch.is_set():
return
print("BROKEN!")
self.reconnecting.clear()
self.broken.set()
event = dict()
event["name"] = "socket-down"
event["data"] = dict()
event["data"]["when"] = utc_now_timestamp()
self._send_event(event)
attempts = 0
while attempts <= self.reconnects or not self.kill_switch.is_set():
# Need to be run to prevent ConnectionAbortedError
self.sock.__init__()
err, report = self._prepare()
if not err:
self.reconnecting.set()
self.broken.clear()
event = dict()
event["name"] = "socket-up"
event["data"] = dict()
event["data"]["when"] = utc_now_timestamp()
self._send_event(event)
return
# Exponential backoff
attempts = attempts + 1
max_delay = (2**attempts) - 1
delay = randint(0, max_delay)
sleep(delay)
self.stop()
def send_message(self, message, *args):
formatted = self._format_message(message, *args)
try:
self.sock.send(formatted)
except (BrokenPipeError, OSError) as e:
if not isinstance(e, sock_timeout):
self._attempt_reconnect()
self._attempt_reconnect()
def recv(self):
yield from self._obtain("outbox")
def check(self):
for message in self.recv():
if message is not None and len(message) > 0:
print(message)
def send(self, message):
self._inbox.put(message)
cast(self._bridge, "send", ("inbox", message))
def connect(self, bridge):
self._bridge = bridge()
@coroutine
def bridge(self, inbox, outbox, events):
try:
while True:
task = (yield)
source, message = task
if source == "inbox":
outbox.puts(message)
elif source == "outbox":
inbox.puts(message)
elif source == "events":
events.put(message)
except GeneratorExit:
pass
def results(self):
messages = list()
while not self.outbox.empty():
messages.append(self.outbox.get())
return messages
def run(self):
err, report = self._prepare()
if err:
eprint(report)
return report
queue_args = self._inbox, self.timeout
Thread(target=self._queue_thread, args=queue_args).start()
print("Client ready!")
self.ready.set()
err = self._prepare()
if err is not None:
print(err)
return err
# print("Ready!")
while self.running:
if self.broken.is_set():
self.reconnecting.wait()
self._get_message()
data = self._get_message()
if data is not None:
self.outbox.put(self.handler(data))
self._process_inbox()
def stop(self, done=None):
# print("Stopping client!")
self.kill_switch.set()
event = dict()
event["name"] = "closing"
event["data"] = dict()
event["data"]["when"] = utc_now_timestamp()
self._send_event(event)
def stop(self):
self.running = False
self.sock.close()
self.stopped.set()
cast(done, "set")
# print("Stopped client!")
self.terminate()

View File

View File

@ -1,29 +1,33 @@
"""
from abots.net.socket_server_handler import SocketServerHandler as handler
net/SocketServer
================
TODO:
* Add logging to broken pipe exceptions
"""
from abots.helpers import eprint, cast, sha256, utc_now_timestamp
from abots.helpers import jsto, jots
from threading import Thread, Event, Lock
from threading import Thread
from struct import pack, unpack
from select import select
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from multiprocessing import Process, Queue, JoinableQueue
from time import time
from ssl import wrap_socket
from queue import Queue, Empty
class SocketServer(Thread):
def __init__(self, host, port, listeners=5, buffer_size=4096,
secure=False, timeout=None, daemon=False):
super().__init__()
self.setDaemon(daemon)
"""
net\SocketServer
================
The intent behind this script is to provide a simple interface to start up a
TCP socket server in the background, run each of the clients in their own
thread, provide a simple system to handle server events, and provide simple
functions to send/receive messages from the server.
"""
# Inherits Process so that server can be run as a daemon
class SocketServer(Process):
# There are many parameters here, but that is so that any constant used can
# be easily tweaked and not remain hard-coded without an easy way to change
def __init__(self, host, port, listeners=5, buffer_size=4096, secure=False,
max_message_size=-1, end_of_line="\r\n", heartbeat=60,
inbox=JoinableQueue(), outbox=Queue(), handler=handler, **kwargs):
Process.__init__(self)
# The connection information for server, the clients will use this to
# connect to the server
@ -37,19 +41,27 @@ class SocketServer(Thread):
# Size of buffer pulled by `receive_bytes` when not specified
self.buffer_size = buffer_size
# If max_message_size is -1, it allows any message size
self.max_message_size = max_message_size
# Which character(s) will terminate a message
self.end_of_line = end_of_line
# Determines if SSL wrapper is used
self.secure = secure
# Timeout set on queues
self.timeout = timeout
# How often a heartbeat will be sent to a client
self.heartbeat = heartbeat
self._inbox = Queue()
self._events = Queue()
self._outbox = Queue()
self.queues = dict()
self.queues["inbox"] = self._inbox
self.queues["outbox"] = self._outbox
self.queues["events"] = self._events
# Queues used for sending messages and receiving results using `send`
# and `results`
self.inbox = inbox
self.outbox = outbox
# An object that determines how the server reacts to events, will use
# net\SocketServerHandler if none are specified. Use it as a model for
# how other handlers should look / work.
self.handler = handler(self)
# Sets up the socket itself
self.sock = socket(AF_INET, SOCK_STREAM)
@ -57,81 +69,72 @@ class SocketServer(Thread):
# Note: kwargs is used here to specify any SSL parameters desired
self.sock = wrap_socket(self.sock, **kwargs)
# Will later be set to the file descriptor of the socket on the server
# See `_prepare`
self.sock_fd = -1
# Will later be set to the alias used for the socket on the server
# See `_prepare`
self.sock_alias = None
# List of all sockets involved (both client and server)
self.sockets = list()
self.clients = list()
self.uuids = dict()
# Maps metadata about the clients
self.clients = dict()
# State variable for if the server is running or not. See `run`.
self.kill_switch = Event()
self.ready = Event()
self.stopped = Event()
self.running = True
def _send_event(self, message):
self._events.put(jots(message))
def _new_client(self, sock, address):
sock.settimeout(60)
client_host, client_port = address
self.sockets.append(sock)
client_kill = Event()
client_uuid = sha256()
self.uuids[client_uuid] = dict()
self.uuids[client_uuid]["sock"] = sock
self.uuids[client_uuid]["kill"] = client_kill
event = dict()
event["name"] = "new_client"
event["data"] = dict()
event["data"]["host"] = client_host
event["data"]["port"] = client_port
event["data"]["uuid"] = client_uuid
self._send_event(event)
client_args = sock, client_kill, client_uuid
client_thread = Thread(target=self._client_thread, args=client_args)
self.clients.append(client_thread)
client_thread.start()
# Sends all messages queued in inbox
def _process_inbox(self):
while not self.inbox.empty():
# In the format" mode, message, args
data = self.inbox.get()
mode = data[0]
# Send to one socket
if mode == self.handler.send_verb:
client, message, args = data[1:]
self.send_message(message, *args)
# Broadcast to sockets
elif mode == self.handler.broadcast_verb:
message, args = data[1:]
self.broadcast_message(self.sock, message, *args)
self.inbox.task_done()
# Logic for the client socket running in its own thread
def _client_thread(self, sock, kill_switch, uuid):
while not kill_switch.is_set():
def _client_thread(self, sock, alias):
last = time()
client = self.clients[alias]
while self.running:
now = time()
# Run heartbeat after defined time elapses
# This will probably drift somewhat, but this is fine here
if now - last >= self.heartbeat:
# If the client missed last heartbeat, close client
if not client["alive"]:
self.handler.close_client(alias)
break
# The handler must set this to True, this is how a missed
# heartbeat is checked later on
client["alive"] = False
last = now
self.handler.send_heartbeat(alias)
try:
message = self.get_message(uuid)
message = self.handler.get_message(sock)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
eprint(e)
# In this case, the socket most likely died before the
# heartbeat caught it
self.handler.close_client(alias)
break
if message is None:
continue
# Send message and uuid of sender to outbox queue
letter = uuid, message
self._outbox.put(letter)
def _queue_thread(self, inbox, timeout):
while not self.kill_switch.is_set():
for letter in self._obtain(inbox, timeout):
if len(letter) != 2:
continue
uuid, message = letter
if uuid == "cast":
self.broadcast_message(uuid, message)
else:
self.send_message(uuid, message)
def _obtain(self, queue, timeout=False):
if timeout is False:
timeout = self.timeout
while True:
try:
if timeout is not None:
yield queue.get(timeout=timeout)
else:
yield queue.get_nowait()
queue.task_done()
except Empty:
break
# Each message returns a status code, exactly which code is
# determined by the handler
status = self.handler.message(sock, message)
# Send status and message received to the outbox queue
self.outbox.put((status, message))
# Prepares socket server before starting it
def _prepare(self):
@ -142,29 +145,48 @@ class SocketServer(Thread):
except (BrokenPipeError, OSError) as e:
# This usually means that the port is already in use
return e
self.sock.setblocking(0)
self.sock.settimeout(0)
self.sock.listen(self.listeners)
# Gets the file descriptor of the socket, which is a fallback for a
# unique identifier for the sockets when an alias does not work
self.sock_fd = self.sock.fileno()
sock_address = self.sock.getsockname()
sock_host, sock_port = sock_address
# This may change later, but for now aliases start at @0 and continue
# on from there numerically
self.sock_alias = "@{}".format(len(self.sockets))
self.sockets.append(self.sock)
# Set metadata about the socket server, the fd and alias are both set
# here to make obtaining the other metadata possible with less lookups
self.clients[self.sock_fd] = dict()
self.clients[self.sock_fd]["fd"] = self.sock_fd
self.clients[self.sock_fd]["host"] = sock_host
self.clients[self.sock_fd]["port"] = sock_port
self.clients[self.sock_fd]["sock"] = self.sock
self.clients[self.sock_fd]["alias"] = self.sock_alias
# Here the alias is just a pointer to the same data, or at least acts
# like a pointer given how Python handles dictionaries referencing the
# same data
self.clients[self.sock_alias] = self.clients[self.sock_fd]
return None
def _sock_from_uuid(self, uuid):
return self.uuids.get(uuid, dict()).get("sock", None)
# Closes a connected socket and removes it from the sockets list
def close_sock(self, uuid):
event = dict()
event["name"] = "close_client"
event["data"] = dict()
event["data"]["uuid"] = uuid
self._send_event(event)
if uuid in list(self.uuids):
sock = self.uuids[uuid]["sock"]
kill = self.uuids[uuid]["kill"]
kill.set()
del self.uuids[uuid]
self.sockets.remove(sock)
sock.close()
# Closes a connected socket and removes it from the server metadata
def close_sock(self, alias):
client = self.clients.get(alias, None)
if client is None:
return None
sock = client["sock"]
fd = client["fd"]
self.sockets.remove(sock)
if fd is not None:
# While the alias is a pointer, you need to delete both
# individually to truly remove the socket from `clients`
del self.clients[fd]
del self.clients[alias]
sock.close()
# Receives specified number of bytes from a socket
# sock - one of the sockets in sockets
@ -172,14 +194,25 @@ class SocketServer(Thread):
# decode - flag if the returned data is binary-to-string decoded
def receive_bytes(self, sock, get_bytes, decode=True):
data = "".encode()
eol = self.end_of_line.encode()
# Auto-fail if requested bytes is greater than allowed by server
if self.max_message_size > 0 and get_bytes > self.max_message_size:
return None
attempts = 0
while len(data) < get_bytes:
# Automatically break loop to prevent infinite loop
# Allow at least twice the needed iterations to occur exiting loop
if attempts > 2 * (get_bytes / self.buffer_size):
break
if self.max_message_size > 0:
if attempts > self.max_message_size / self.buffer_size:
break
else:
attempts = attempts + 1
else:
attempts = attempts + 1
# With max_message_size not set, allow at least twice the
# needed iterations to occur before breaking loop
if attempts > 2 * (get_bytes / self.buffer_size):
break
else:
attempts = attempts + 1
bufsize = get_bytes - len(data)
# Force bufsize to cap out at buffer_size
@ -190,83 +223,140 @@ class SocketServer(Thread):
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
return None
length = len(data) + len(packet)
checker = packet if length < get_bytes else packet[:-2]
# Automatically stop reading message if EOL character sent
if eol in checker:
packet = packet.split(eol)[0] + eol
return data + packet
data = data + packet
return data.decode() if decode else data
# Get message from socket
def get_message(self, uuid):
sock = self._sock_from_uuid(uuid)
if sock is None:
return None
raw_message_size = self.receive_bytes(sock, 4, False)
if raw_message_size is None or len(raw_message_size) != 4:
return None
message_size = unpack(">I", raw_message_size)[0]
return self.receive_bytes(sock, message_size)
# Packages a message and sends it to socket
def send_message(self, uuid, message, *args):
sock = self._sock_from_uuid(uuid)
if sock is None:
return None
def send_message(self, sock, message, *args):
formatted = self.handler.format_message(message, *args)
try:
sock.send(message)
sock.send(formatted)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
return
alias = self.get_client_alias_by_sock(sock)
if alias is not None:
self.close_sock(alias)
# Like send_message, but sends to all sockets but the server and the sender
def broadcast_message(self, client_uuid, message, *args):
for uuid in list(self.uuids):
if uuid != client_uuid:
self.send_message(uuid, message, *args)
def broadcast_message(self, client_sock, client_message, *args):
for sock in self.sockets:
not_server = sock != self.sock
not_client = sock != client_sock
if not_server and not_client:
self.send_message(sock, client_message, *args)
def recv(self):
#return [letter for letter in self._obtain(self._outbox)]
yield from self._obtain(self._outbox)
# Obtains file descriptor of the socket
def get_client_fd(self, client_sock):
try:
# First, try the easy route of just pulling it directly
return client_sock.fileno()
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
# Otherwise, the socket is probably dead and we can try finding it
# using brute-force. This sometimes works
for fd, sock in self.sockets:
if sock != client_sock:
continue
return fd
# If the brute-force option does not work, I cannot think of a good
# way to get the fd aside from passing it along everywhere that
# sock is also used, which would be extremely tedios. However, if
# you have the alias you can skip this entirely and just pull the
# fd from `clients` using the alias
return None
def send(self, uuid, message):
letter = uuid, message
self._inbox.put(letter)
# I realize the function name here is long, but for the few times I use
# this it makes it clear exactly what magic is going on
def get_client_alias_by_sock(self, client_sock):
client_fd = self.get_client_fd(client_sock)
if client_fd is None:
return None
return self.clients.get(client_fd, dict()).get("alias", None)
def check(self):
for letter in self.recv():
if letter is not None and len(letter) > 0:
print(letter)
# Externally called function to send a message to a client
def send(self, client, message, *args):
# This queue will be read by `_process_inbox` during the next loop
self.inbox.put((self.handler.send_verb, client, message, args))
# The function for running the socket server logic loop
# Externally called function to broadcast a message to all clients
def broadcast(self, message, *args):
# This queue will be read by `_process_inbox` during the next loop
self.inbox.put((self.handler.broadcast_verb, message, args))
# Externally called function to iterates over the outbox queue and returns
# them as a list in FIFO order
def results(self, remove_status=False):
messages = list()
while not self.outbox.empty():
result = self.outbox.get()
# For when you do not care about the status codes
if remove_status:
status, message = result
messages.append(message)
else:
messages.append(result)
return messages
# The Process function for running the socket server logic loop
def run(self):
err = self._prepare()
if err is not None:
eprint(err)
print(err)
return err
queue_args = self._inbox, self.timeout
Thread(target=self._queue_thread, args=queue_args).start()
print("Server ready!")
self.ready.set()
while not self.kill_switch.is_set():
# print("Server ready!")
while self.running:
try:
# Accept new socket client
client_sock, client_address = self.sock.accept()
# print(client_address)
self._new_client(client_sock, client_address)
client_sock.settimeout(60)
# The socket can either be broken or no longer open at all
except (BrokenPipeError, OSError) as e:
continue
# Collect the metadata of the client socket
client_name = "{}:{}".format(*client_address)
client_host, client_port = client_address
client_fd = client_sock.fileno()
client_alias = "@{}".format(len(self.sockets))
# Define metadata for client
self.sockets.append(client_sock)
self.clients[client_fd] = dict()
self.clients[client_fd]["fd"] = client_fd
self.clients[client_fd]["host"] = client_host
self.clients[client_fd]["port"] = client_port
self.clients[client_fd]["sock"] = client_sock
self.clients[client_fd]["alias"] = client_alias
self.clients[client_fd]["alive"] = True
# The alias is just a key that points to the same metadata
self.clients[client_alias] = self.clients[client_fd]
# Have handler process new client event
status = self.handler.open_client(client_alias)
# Send status and message received to the outbox queue
self.outbox.put((status, message))
# Spawn new thread for client
args = (client_sock, client_alias)
Thread(target=self._client_thread, args=args).start()
# Process messages waiting in inbox queue
# This is done at the end in case for some weird reason a message
# is sent to the new client in the middle of processing this data
# it eliminates the chance of a race condition.
self._process_inbox()
# Stop the socket server
def stop(self, done=None, join=False):
event = dict()
event["name"] = "closing"
event["data"] = dict()
event["data"]["when"] = utc_now_timestamp()
self._send_event(event)
for uuid in list(self.uuids):
self.close_sock(uuid)
self.kill_switch.set()
self.sock.close()
if join:
for client in self.clients:
client.join(self.timeout)
self.stopped.set()
cast(done, "set")
def stop(self):
self.handler.close_server()
self.running = False
self.sock.close()

View File

@ -0,0 +1,166 @@
"""
Socket Server Handlers
======================
The socket server was made to be versitile where the handler can be swapped out
in favor of another handler. This handler is the default provided if one is not
passed to get the basic client-server relationship working for either starting
with something simple, testing, and/or providing a template to build other
handlers from.
"""
class SocketServerHandler:
def __init__(self, server):
self.server = server
# These are used when processing inbox messages
self.send_verb = "SEND"
self.broadcast_verb = "CAST"
self.close_verb = "STOP"
# Tells all clients that a node joined the socket server
def open_client(self, alias):
alias = message[5:]
client = self.server.clients.get(alias, None)
if client is None:
return 1
self.server.broadcast_message(client, message)
return 0
# Informs the other clients a client left and closes that client's socket
def close_client(self, alias):
client = self.server.clients.get(alias, None)
if client is None:
return 1
message = "LEFT {}".format(alias)
self.server.broadcast_message(self.server.sock, message)
self.server.close_sock(alias)
return 0
# Lets the clients know the server is intentionally closing
def close_server(self):
self.server.broadcast_message(self.server.sock, self.close_verb)
return -1
# Sends a heartbeat to the client to detect if it is still responding
def send_heartbeat(self, alias):
client = self.server.clients.get(alias, None)
if client is None:
return 1
sock = client.get("sock", None)
if sock is None:
return 1
self.server.send_message(sock, "PING")
return 0
# Format a message before sending to client(s)
# Prepends message size code along with replacing variables in message
def format_message(self, message, *args):
formatted = None
if len(args) > 0:
formatted = message.format(*args) + self.server.end_of_line
else:
formatted = message + self.server.end_of_line
# Puts message size at the front of the message
prefixed = pack(">I", len(formatted)) + formatted.encode()
return prefixed
# Get message from socket with `format_message` in mind
def get_message(self, sock):
raw_message_size = self.server.receive_bytes(sock, 4, False)
if raw_message_size is None:
return None
message_size = unpack(">I", raw_message_size)[0]
if self.max_message_size > 0 and message_size > self.max_message_size:
return None
eol = self.server.end_of_line
return self.server.receive_bytes(sock, message_size).strip(eol)
# Takes the server object, the client socket, and a message to process
# Each message returns a status code:
# -1 : Going offline
# 0 : Success
# 1 : Failure
# 2 : Invalid
def message(self, sock, message):
# print("DEBUG:", message)
send = self.send_verb + " "
cast = self.broadcast_verb + " "
send_size = len(send)
cast_size = len(cast)
# React to heartbeat from client
if message == "PONG":
client_fd = self.server.get_client_fd(sock)
client = self.server.clients.get(client_fd, dict())
client_alive = client.get("alive", None)
if client_aliave is None:
return 1
elif client_alive:
return 1
# Setting this to True is what tells the server the heartbeat worked
client["alive"] = True
return 0
# Tell the clients to stop before server itself stops
elif message == self.close_verb:
status = self.close_server()
self.server.stop()
return status
# Informs the other clients one left and closes that client's socket
elif message == "QUIT":
client_alias = self.server.get_client_alias_by_sock(sock)
if client_alias is None:
return 1
return self.close_client(client_alias)
# Lists all client alises, puts itself first and the server second
elif message == "LIST":
aliases = list()
client_alias = self.server.get_client_alias_by_sock(sock)
if client_alias is None:
return 1
self.server_alias = self.server.sock_alias
for alias in self.server.clients.keys():
# We need to skip ints since the file descriptors are also keys
if type(alias) is int:
continue
# The server and sending client are skipped to retain ordering
elif alias == self.server.sock_alias:
continue
elif alias == client_alias:
continue
else:
aliases.append(alias)
listed = ",".join([client_alias, self.server_alias] + aliases)
self.server.send_message(sock, listed)
return 0
# Sends a message to the client with the specified client (via an alias)
elif message[:send_size] == send:
params = message[(send_size + 1):].split(" ", 1)
if len(params) < 2:
return 1
alias, response = params
client = self.server.clients.get(alias, dict())
client_sock = client.get("sock", None)
if client_sock is None:
return 1
self.server.send_message(client_sock, response)
return 0
# Broadcasts a message to all other clients
elif message[:cast_size] == cast:
response = message[(cast_size + 1):]
self.server.broadcast_message(sock, response)
return 0
# All other commands are invalid
else:
return 2

View File

@ -1 +0,0 @@
from abots.tools.pixels import make_pixels, get_random_pixels, show_pixels

View File

@ -1,18 +0,0 @@
from numpy import zeros, uint8
from PIL import Image
from random import randint
def make_pixels(width, height):
return zeros((height, width, 3), dtype=uint8)
def get_random_pixels(width, height):
pixels = make_pixels(width, height)
for y in range(height):
for x in range(width):
pixels[y, x] = [randint(0, 255), randint(0, 255), randint(0, 255)]
return pixels
def show_pixels(pixels):
image = Image.fromarray(pixels)
image.show()

View File

@ -1,6 +1,8 @@
from curses import initscr, noecho, cbreak
"""
ui/TUI: Text User Interface
ui\TUI: Text User Interface
===========================
The curses library is one of those things I always wanted to use, but never got
@ -10,99 +12,7 @@ nice framework I can re-use without needing to pull out a manual for curses to
figure out exactly what everything does (which is what I will be doing during
the duration of writing this script).
I would like to note here that while I usually avoid using `import <module>` in
favor of `from <module> import <component>`, I am making an exception here for
curses due to how excessive and overkill that would be.
"""
import curses
from time import sleep
class TUI:
def __init__(self):
# self.screen = None
self.windows = dict()
self.running = True
# This removes the need for `initialize` and `cleanup`
curses.wrapper(self.run)
# Puts terminal into correct modes and state
def initialize(self):
self.screen = curses.initscr()
# So that input will be read and not just echo'd onto the screen
curses.noecho()
# Will read keys as they are typed without needing Enter pressed
curses.cbreak()
# Converts special keys into curses variables
self.screen.keypad(True)
self.running = True
# Returns terminal back to how it was beforehand
def cleanup(self):
self.running = False
curses.nocbreak()
self.screen.keypad(False)
curses.echo()
curses.endwin()
# Stop loop running in `run`
def stop(self):
self.running = False
# Easily check if getch matches a letter or curses.KEY_* variable
def is_key(self, getch, test):
if type(test) is str:
return getch == ord(test)
return getch == test
# Curses uses the format: height, width, y, x or y, x
def create_window(start_x, start_y, width=None, height=None):
window = dict()
# Curses logic run inside wrapper to initialize/cleanup automatically
def run(self, screen):
win = dict()
win["width"] = 80
win["height"] = 24
win["start_x"] = 10
win["start_y"] = 10
win["window"] = screen.subwin(win["height"], win["width"],
win["start_y"], win["start_x"])
window = win["window"]
# # Curses logic run inside wrapper to initialize/cleanup automatically
# def run(self, screen):
# # Make getch non-blocking and clear the screen
# screen.nodelay(True)
# screen.clear()
# width = 4
# count = 0
# direction = 1
# while self.running:
# # Get user input
# char = screen.getch()
# # Flush the user input and clear the screen
# curses.flushinp()
# screen.clear()
# if self.is_key(char, "q"):
# screen.addstr("Closing...")
# self.stop()
# break
# elif self.is_key(char, curses.KEY_UP):
# width = width + 1
# screen.addstr("#" * count)
# count = count + direction
# if count == width:
# direction = -1
# elif count == 0:
# direction = 1
# sleep(0.1)
pass

View File

@ -2,9 +2,4 @@ pytest
python-gnupg
cryptography
flask
websockets
tensorflow
numpy
matplotlib
textblob
Pillow
websockets

View File

@ -1,17 +1,13 @@
#!env/bin/python3
import sys
from os.path import dirname, realpath
source = "/".join(dirname(realpath(__file__)).split("/")[:-1])
sys.path.insert(0, source)
sys.path.insert(0, "/center/lib")
from abots.net import SocketClient
host = "127.0.0.1"
port = 10701
timeout = 3
client = SocketClient(host, port, timeout=timeout)
client = SocketClient(host, port)
client.daemon = True
client.start()

View File

@ -1,17 +1,13 @@
#!env/bin/python3
import sys
from os.path import dirname, realpath
source = "/".join(dirname(realpath(__file__)).split("/")[:-1])
sys.path.insert(0, source)
sys.path.insert(0, "/center/lib")
from abots.net import SocketServer
host = "127.0.0.1"
port = 10701
timeout = 3
server = SocketServer(host, port, timeout=timeout)
server = SocketServer(host, port)
server.daemon = True
server.start()

View File

@ -1,57 +1,6 @@
#!env/bin/python3
# import sys
# sys.path.insert(0, "/center/lib")
#from abots.net import SocketServer, SocketClient
#
#host = "localhost"
#port = 10401
#timeout = 3
#
#server = SocketServer(host, port, timeout=timeout)
#client = SocketClient(host, port, timeout=timeout)
#
#server.start()
#server.ready.wait()
#client.start()
from abots.events import ThreadMarshal
from abots.helpers import generator
from queue import Queue, Empty
from threading import Thread, Event
def worker(queue, event, timeout):
while not event.is_set():
try:
job = queue.get(timeout=timeout)
except Empty:
continue
assert len(job) == 2, "Expected two parameters"
done, task = job
assert hasattr(done, "set"), "Expected event"
assert len(task) == 3, "Expected three parameters"
method, args, kwargs = task
assert callable(method), "Expected function"
assert isinstance(args, tuple), "Expected tuple"
assert isinstance(kwargs, dict), "Expected dict"
try:
result = method(*args, **kwargs)
except Exception as e:
print(e)
done.set()
queue.task_done()
def manager(queue, event, timeout):
workers = list()
current = 0
while not event.is_set():
try:
task = queue.get(timeout=timeout)
except Empty:
continue
assert len(task) == 2, "Expected two parameters"
action, payload = task
assert isinstance(action, str), "Expected str"
if action == "new":
assert callable(payload), "Expected function"
workers.append((payload))
elif action == "reserve" and len(workers) > 0:
worker = workers[current]