forked from aewens/abots
Compare commits
36 Commits
Author | SHA1 | Date |
---|---|---|
aewens | 52b270f499 | |
aewens | 3d072b5d5d | |
aewens | 3e2745e43f | |
aewens | 5f1f95b791 | |
aewens | 4da6a8d6ad | |
aewens | 20ff476e24 | |
aewens | 02af7e58a3 | |
aewens | 92b1cdb0d3 | |
aewens | ec16929aad | |
aewens | 52489a4ac9 | |
aewens | b9c222c496 | |
aewens | 333545c4e5 | |
aewens | 3d8fe8b327 | |
aewens | 951285c0a4 | |
aewens | ae8ebf9451 | |
aewens | 09c8958c3d | |
aewens | d271de75dc | |
aewens | 075bebc8c8 | |
aewens | 4ddbc1c915 | |
aewens | 4961e119b3 | |
aewens | b0e85842e3 | |
aewens | dff547bd97 | |
aewens | 80d47ecae8 | |
aewens | 665469ffff | |
aewens | f4fc9d2d01 | |
aewens | c462ab1be3 | |
aewens | e55717f601 | |
aewens | 8e29b9f257 | |
aewens | 15eddbcb82 | |
aewens | 2612fa149f | |
aewens | 883bebc20d | |
aewens | 263ee511fa | |
aewens | d25b16bc46 | |
aewens | 42f32fce15 | |
aewens | fec911b36a | |
aewens | 0955c338e7 |
|
@ -1,5 +1,15 @@
|
|||
# Application-specific
|
||||
TODO
|
||||
tags
|
||||
scratch
|
||||
scratch.py
|
||||
*.swp
|
||||
*.bak.py
|
||||
*.db
|
||||
*.log
|
||||
*.vim
|
||||
archive/
|
||||
.vscode/
|
||||
|
||||
# ---> Python
|
||||
# Byte-compiled / optimized / DLL files
|
||||
|
|
62
TODO
62
TODO
|
@ -1,62 +0,0 @@
|
|||
## net
|
||||
|
||||
### socket_server
|
||||
|
||||
- [x] Add comments
|
||||
- [x] Abstract default handler out to another file
|
||||
- [x] Send heartbeat
|
||||
- [x] Better handle clients disconnecting
|
||||
- [x] Remove lookup, use clients instead
|
||||
- [x] Add alias system for clients to replace using fd
|
||||
- [ ] Add support for cryptography
|
||||
- [ ] Add data retention verbs to default handler
|
||||
- [x] Abstract length prefix to message into handler
|
||||
- [x] Fix comments to be more helpful
|
||||
|
||||
### socket_client
|
||||
|
||||
- [ ] Add comments
|
||||
- [ ] Abstract default handler out to another file
|
||||
- [ ] Respond to heartbeat
|
||||
- [ ] Respond to alias from server
|
||||
- [ ] Add support for cryptography
|
||||
- [ ] Add data retention model to default handler
|
||||
- [ ] Abstract length prefix to message into handler
|
||||
|
||||
### socket_to_websocket
|
||||
|
||||
- [ ] Bridges socket_client and websocket_client together
|
||||
|
||||
## crypto
|
||||
|
||||
- [ ] Add crypto set
|
||||
- [ ] Add GPG wrapper functions
|
||||
- [ ] Add Diffie-Hellman functions
|
||||
- [ ] Add symmetric & asymmetric crypto functions
|
||||
|
||||
## helpers
|
||||
|
||||
- [x] Add helpers set
|
||||
- [x] In helpers add JSON encoding / decoding
|
||||
- [ ] Add helper for generating / reading Twitter's snowflake ID format
|
||||
- [ ] Add helpers for running shell commands and getting outputs
|
||||
- [ ] Add wrapper functions to reading / using git repositories
|
||||
|
||||
## db
|
||||
|
||||
- [ ] Add db set
|
||||
- [ ] In db add sqlite wrappers to create, modify, and delete tables
|
||||
- [ ] In db add sqlite wrappers to query, add, edit, and remove entries
|
||||
|
||||
## ui
|
||||
|
||||
- [x] Add ui set
|
||||
- [ ] Add framework for curses module
|
||||
|
||||
|
||||
## web
|
||||
|
||||
- [ ] Add web set
|
||||
- [ ] Add websocket server compatible with socket_server handlers
|
||||
- [ ] Add websocket client compatible with socket_client handlers
|
||||
- [ ] Add Flask that integrates websocket server and databases from db
|
|
@ -0,0 +1,2 @@
|
|||
from abots.db.sqlite import SQLite
|
||||
from abots.db.memoru import Memoru
|
|
@ -0,0 +1,9 @@
|
|||
from abots.helpers import infinitedict
|
||||
|
||||
class Memoru:
|
||||
def __init__(self, location=None):
|
||||
self._location = Path(location) if location is not None else None
|
||||
self_data = infinitedict()
|
||||
if self._location is not None:
|
||||
invalid_location = f"Location does not exist: {self._location}"
|
||||
assert self._location.exists(), invalid_location
|
|
@ -0,0 +1,193 @@
|
|||
from abots.helpers import eprint
|
||||
|
||||
from os import stat, remove
|
||||
from os.path import dirname, basename, isfile, isdir, join as path_join
|
||||
from shutil import copyfile
|
||||
from time import strftime
|
||||
from operator import itemgetter
|
||||
from contextlib import closing, contextmanager
|
||||
from threading import Lock
|
||||
from sqlite3 import connect, register_converter, PARSE_DECLTYPES
|
||||
from sqlite3 import ProgrammingError, Error as SQLiteError
|
||||
|
||||
register_converter("BOOLEAN", lambda value: bool(int(value)))
|
||||
|
||||
class SQLite:
|
||||
def __init__(self, db_file, lock=Lock()):
|
||||
self.db_file = db_file
|
||||
self.path = dirname(self.db_file)
|
||||
self.filename = basename(self.db_file)
|
||||
self.connection = None
|
||||
self.cursor = None
|
||||
self.memory = False
|
||||
self.lock = lock
|
||||
self._load()
|
||||
|
||||
def _load(self):
|
||||
settings = dict()
|
||||
settings["check_same_thread"] = False
|
||||
settings["isolation_level"] = "DEFERRED"
|
||||
settings["detect_types"] = PARSE_DECLTYPES
|
||||
try:
|
||||
self.connection = connect(self.db_file, **settings)
|
||||
self.connection.execute("PRAGMA journal_mode = WAL")
|
||||
self.connection.execute("PRAGMA foreign_keys = ON")
|
||||
except SQLiteError as e:
|
||||
eprint(e)
|
||||
self.connection = connect(":memory:")
|
||||
self.memory = True
|
||||
# self.cursor = self.connection.cursor()
|
||||
|
||||
def _unload(self):
|
||||
self.connection.close()
|
||||
|
||||
# Must be called from `backup`
|
||||
def _remove_old_backups(self, backup_dir, backups):
|
||||
contents = listdir(backup_dir)
|
||||
files = [(f, stat(f).st_mtime) for f in contents if isfile(f)]
|
||||
# Sort by mtime
|
||||
files.sort(key=itemgetter(1))
|
||||
remove_files = files[:-backups]
|
||||
for rfile in remove_files:
|
||||
remove(rfile)
|
||||
|
||||
def _convert(self, values):
|
||||
convert = lambda value: value if not None else "NULL"
|
||||
return (convert(value) for value in values)
|
||||
|
||||
def stop(self):
|
||||
self._unload()
|
||||
|
||||
def backup(self, backup_dir=None, backups=1):
|
||||
if backup_dir is None:
|
||||
backup_dir = self.path
|
||||
if not isdir(backup_dir):
|
||||
eprint("ERROR: Backup directory does not exist")
|
||||
return None
|
||||
suffix = strftime("-%Y%m%d-%H%M%S")
|
||||
backup_name = f"{self.filename} {suffix}"
|
||||
backup_file = path_join(backup_dir, backup_name)
|
||||
# Lock database
|
||||
with closing(self.connection.cursor()) as cursor:
|
||||
cursor.execute("begin immediate")
|
||||
copyfile(self.db_file, backup_file)
|
||||
# Unlock database
|
||||
self.conenction.rollback()
|
||||
self._remove_old_backups(backup_dir, backups)
|
||||
|
||||
def execute(self, executor, values=tuple(), fetch=False, commit=False):
|
||||
try:
|
||||
with closing(self.connection.cursor()) as cursor:
|
||||
args = (executor, values) if "?" in executor else (executor,)
|
||||
if commit:
|
||||
with self.transaction():
|
||||
result = cursor.execute(*args)
|
||||
else:
|
||||
result = cursor.execute(*args)
|
||||
|
||||
if not fetch:
|
||||
return True
|
||||
return cursor.fetchall()
|
||||
except ProgrammingError as e:
|
||||
eprint(e)
|
||||
return False
|
||||
|
||||
def fetch(self, executor, values=tuple(), commit=False):
|
||||
return self.execute(executor, values, fetch=True, commit=commit)
|
||||
|
||||
"""
|
||||
e.g. create_table("test", ["id INTEGER PRIMARY KEY", "value TEXT"])
|
||||
"""
|
||||
def create_table(self, name, fields, commit=False):
|
||||
fields_string = ",".join(fields)
|
||||
executor = f"CREATE TABLE {name} ({fields_string})"
|
||||
return self.execute(executor, commit=commit)
|
||||
|
||||
"""
|
||||
e.g. insert("test", {"value": "abc"})
|
||||
"""
|
||||
def insert(self, table, insertion, commit=False):
|
||||
assert isinstance(insertion, dict), "Expected dict"
|
||||
keys = ",".join(insertion.keys())
|
||||
places = ",".join(["?"] * len(insertion))
|
||||
values = tuple(self._convert(insertion.values()))
|
||||
executor = f"INSERT INTO {table} ({keys}) VALUES({places})"
|
||||
return self.execute(executor, values, commit=commit)
|
||||
|
||||
def update(self, table, modification, where, commit=False):
|
||||
assert isinstance(modification, dict), "Expected dict"
|
||||
assert isinstance(where, tuple), "Expected tuple"
|
||||
assert len(where) == 2, "Expected length of '2'"
|
||||
assert isinstance(where[0], str), "Expected str"
|
||||
assert isinstance(where[1], tuple), "Expected tuple"
|
||||
keys = ",".join(f"{key} = ?" for key in modification.keys())
|
||||
mod_values = tuple(self._convert(modification.values()))
|
||||
where_query = where[0]
|
||||
where_values = where[1]
|
||||
values = mod_values + where_values
|
||||
executor = f"UPDATE {table} SET {keys} WHERE {where_query}"
|
||||
return self.execute(executor, values, commit=commit)
|
||||
|
||||
def delete(self, table, where, commit=False):
|
||||
assert isinstance(where, tuple), "Expected tuple"
|
||||
assert len(where) == 2, "Expected length of '2'"
|
||||
assert isinstance(where[0], str), "Expected str"
|
||||
assert isinstance(where[1], tuple), "Expected tuple"
|
||||
where_query = where[0]
|
||||
where_values = where[1]
|
||||
values = where_values
|
||||
executor = f"DELETE FROM {table} WHERE {where_query}"
|
||||
return self.execute(executor, values, commit=commit)
|
||||
|
||||
def lookup(self, table, search, where=None):
|
||||
if type(search) != list:
|
||||
search = [search]
|
||||
keys = ",".join(search)
|
||||
executor = f"SELECT {keys} FROM {table}"
|
||||
if where:
|
||||
assert isinstance(where, tuple), "Expected tuple"
|
||||
assert len(where) == 2, "Expected length of '2'"
|
||||
assert isinstance(where[0], str), "Expected str"
|
||||
assert isinstance(where[1], tuple), "Expected tuple"
|
||||
where_query = where[0]
|
||||
where_values = where[1]
|
||||
executor = f"{executor} WHERE {where_query}"
|
||||
results = self.fetch(executor, where_values)
|
||||
else:
|
||||
results = self.fetch(executor)
|
||||
|
||||
found = list()
|
||||
for result in results:
|
||||
remap = dict()
|
||||
for index, term in enumerate(search):
|
||||
remap[term] = result[index]
|
||||
|
||||
found.append(remap)
|
||||
|
||||
return found
|
||||
|
||||
def lookup_one(self, table, search, where=None):
|
||||
result = self.lookup(table, search, where)
|
||||
return result[0] if len(result) > 0 else None
|
||||
|
||||
def lookup_all(self, table, where=None):
|
||||
return self.lookup(table, ["*"], where)
|
||||
|
||||
def get_all_tables(self):
|
||||
executor = "SELECT name FROM sqlite_master WHERE type='table'"
|
||||
tables = self.fetch(executor)
|
||||
return (table[0] for table in tables)
|
||||
|
||||
def drop_table(self, name, commit=False):
|
||||
executor = f"DROP TABLE IF EXISTS {name}"
|
||||
return self.execute(executor, commit=commit)
|
||||
|
||||
@contextmanager
|
||||
def transaction(self):
|
||||
with self.lock:
|
||||
try:
|
||||
yield
|
||||
self.connection.commit()
|
||||
except:
|
||||
self.connection.rollback()
|
||||
raise
|
|
@ -0,0 +1,4 @@
|
|||
from abots.events.every import Every
|
||||
from abots.events.threads import ThreadPool, ThreadMarshal, acquire_timeout
|
||||
from abots.events.duodecimer import Duodecimer, Cron
|
||||
from abots.events.coroutines import CoroEvent
|
|
@ -0,0 +1,35 @@
|
|||
class CoroEvent:
|
||||
def __init__(self):
|
||||
self._set = False
|
||||
self._targets = list()
|
||||
self._events = self._loop()
|
||||
self._events.__next__()
|
||||
|
||||
def _loop(self):
|
||||
try:
|
||||
while True:
|
||||
action = (yield)
|
||||
if action == "set":
|
||||
self._set = True
|
||||
for info in self._targets:
|
||||
target, event = info
|
||||
target.send(event)
|
||||
elif action == "clear":
|
||||
self._set = False
|
||||
except GeneratorExit:
|
||||
pass
|
||||
|
||||
def is_set(self):
|
||||
return self._set
|
||||
|
||||
def set(self):
|
||||
self._events.send("set")
|
||||
|
||||
def clear(self):
|
||||
self._events.send("clear")
|
||||
|
||||
def wait(self, target, event):
|
||||
self._targets.append((target, event))
|
||||
|
||||
def close(self):
|
||||
self._events.close()
|
|
@ -0,0 +1,237 @@
|
|||
from abots.events import Every, ThreadMarshal
|
||||
from abots.helpers import cast, utc_now
|
||||
|
||||
from queue import Queue, Empty
|
||||
from collections import defaultdict
|
||||
from threading import Thread, Event
|
||||
|
||||
"""
|
||||
|
||||
TODO:
|
||||
* Add way to export tasks to pickle file on stop?
|
||||
|
||||
Duodecimer: "duodec-" meaning twelve and the "-imer" for timer, or twelve timers
|
||||
|
||||
Will be a scheduler running 12 threads of the increments of time:
|
||||
* 1 second
|
||||
* 30 seconds
|
||||
* 1 minute
|
||||
* 5 minutes
|
||||
* 10 minutes
|
||||
* 15 minutes
|
||||
* 30 minutes
|
||||
* 1 hour
|
||||
* 1 day
|
||||
* 1 week
|
||||
* 1 fortnight (2 weeks)
|
||||
* cron
|
||||
|
||||
The cron being a thread that triggers tasks when a date/time trigger occurs
|
||||
(with 1 minute precision, but run outside the 1 minute thread). Triggers by:
|
||||
* Specific times in specific dates
|
||||
* Specific times every day
|
||||
* Change in the week day
|
||||
* Change in the month
|
||||
* Change in the year
|
||||
|
||||
"""
|
||||
|
||||
minutes_in_seconds = 60
|
||||
hour_in_seconds = 60 * minutes_in_seconds
|
||||
day_in_seconds = 24 * hour_in_seconds
|
||||
week_in_seconds = 7 * day_in_seconds
|
||||
fortnight_in_seconds = 2 * week_in_seconds
|
||||
|
||||
class Cron:
|
||||
def __init__(self, repeat, triggers):
|
||||
self.repeat = repeat
|
||||
self.triggers = triggers
|
||||
# self.start = utc_now()
|
||||
# self.date = self.get_date(self.start)
|
||||
# self.time = self.get_time(self.start)
|
||||
# self.weekday = self.get_weekday(self.start)
|
||||
# self.day = self.get_day(self.start)
|
||||
# self.month = self.get_month(self.start)
|
||||
# self.year = self.get_year(self.start)
|
||||
|
||||
@staticmethod
|
||||
def get_date(when=None):
|
||||
if when is None:
|
||||
when = utc_now()
|
||||
return int(f"{when.year}{when.month:02}{when.day:02}")
|
||||
|
||||
@staticmethod
|
||||
def get_time(when=None):
|
||||
if when is None:
|
||||
when = utc_now()
|
||||
return int(f"{when.hour:02}{when.minute:02}")
|
||||
|
||||
@staticmethod
|
||||
def get_weekday(when=None):
|
||||
if when is None:
|
||||
when = utc_now()
|
||||
return when.weekday()
|
||||
|
||||
@staticmethod
|
||||
def get_day(when=None):
|
||||
if when is None:
|
||||
when = utc_now()
|
||||
return when.day
|
||||
|
||||
@staticmethod
|
||||
def get_month(when=None):
|
||||
if when is None:
|
||||
when = utc_now()
|
||||
return when.month
|
||||
|
||||
@staticmethod
|
||||
def get_year(when=None):
|
||||
if when is None:
|
||||
when = utc_now()
|
||||
return when.year
|
||||
|
||||
@staticmethod
|
||||
def next_minutes(minutes):
|
||||
now = utc_now()
|
||||
hour = now.hour
|
||||
minute = now.minute + minutes
|
||||
if minute >= 60:
|
||||
hour = 0 if hour == 23 else hour + 1
|
||||
minute = minute % 60
|
||||
return int(f"{hour:02}{minute:02}")
|
||||
|
||||
@staticmethod
|
||||
def next_hours(hours):
|
||||
now = utc_now()
|
||||
minute = now.minute
|
||||
hour = now.hour + hours
|
||||
if hour >= 24:
|
||||
hour = hour % 24
|
||||
return int(f"{hour:02}{minute:02}")
|
||||
|
||||
class Duodecimer:
|
||||
def __init__(self):
|
||||
self.queues = dict()
|
||||
self.timers = dict()
|
||||
self._intervals = dict()
|
||||
self._intervals["5s"] = 5
|
||||
self._intervals["30s"] = 30
|
||||
self._intervals["1m"] = minutes_in_seconds
|
||||
self._intervals["5m"] = 5 * minutes_in_seconds
|
||||
self._intervals["10m"] = 10 * minutes_in_seconds
|
||||
self._intervals["15m"] = 15 * minutes_in_seconds
|
||||
self._intervals["30m"] = 30 * minutes_in_seconds
|
||||
self._intervals["1h"] = hour_in_seconds
|
||||
self._intervals["1d"] = day_in_seconds
|
||||
self._intervals["1w"] = week_in_seconds
|
||||
self._intervals["1f"] = fortnight_in_seconds
|
||||
self._load()
|
||||
|
||||
def _load(self):
|
||||
for name, interval in self._intervals.items():
|
||||
queue = Queue()
|
||||
timer = Every(interval, self._timer, queue)
|
||||
self.queues[name] = queue
|
||||
self.timers[name] = timer
|
||||
cron_queue = Queue()
|
||||
cron_timer = Every(minutes_in_seconds, self._cron, cron_queue)
|
||||
self.queues["cron"] = cron_queue
|
||||
self.timers["cron"] = cron_timer
|
||||
|
||||
def _process_queue(self, state, queue, task_size):
|
||||
if state is None:
|
||||
state = list()
|
||||
while True:
|
||||
try:
|
||||
job = queue.get_nowait()
|
||||
if len(job) != 2:
|
||||
continue
|
||||
cancel, task = job
|
||||
if cancel.is_set() or len(task) != task_size:
|
||||
# print(f"[worker:{worker_id}]: Task is malformed")
|
||||
continue
|
||||
state.append(job)
|
||||
queue.task_done()
|
||||
except Empty:
|
||||
break
|
||||
return state
|
||||
|
||||
def _process_trigger(self, unit, previous, value):
|
||||
now = cast(Cron, f"get_{unit}", utc_now())
|
||||
if value == "change" and previous is not None:
|
||||
return cast(Cron, f"get_{unit}", previous) != now
|
||||
else:
|
||||
return now == value
|
||||
|
||||
def _timer(self, state, queue):
|
||||
state = self._process_queue(state, queue, 3)
|
||||
marshal = ThreadMarshal(len(state), destroy=True)
|
||||
cancelled = list()
|
||||
for job in state:
|
||||
# print(f"[worker:{worker_id}]: Running task")
|
||||
cancel, task = job
|
||||
if cancel.is_set():
|
||||
cancelled.append(job)
|
||||
continue
|
||||
marshal.reserve(*task)
|
||||
for job in cancelled:
|
||||
state.remove(job)
|
||||
return state
|
||||
|
||||
def _cron(self, state, queue):
|
||||
if state is None:
|
||||
state = defaultdict(lambda: None)
|
||||
state["tasks"] = self._process_queue(state["tasks"], queue, 4)
|
||||
previous = state["previous"]
|
||||
removing = list()
|
||||
for job in state["tasks"]:
|
||||
cancel, task = job
|
||||
if cancel.is_set():
|
||||
removing.append(job)
|
||||
continue
|
||||
cron, target, args, kwargs = task
|
||||
assert isinstance(cron.triggers, dict), "Expected dict"
|
||||
triggered = list()
|
||||
for trigger, value in cron.triggers.items():
|
||||
if trigger == "date":
|
||||
triggered.append(Cron.get_date() >= value)
|
||||
elif trigger == "time":
|
||||
triggered.append(Cron.get_time() >= value)
|
||||
elif trigger in ["weekday", "day", "month", "year"]:
|
||||
processed = self._proccess_trigger(trigger, previous, value)
|
||||
triggered.append(processed)
|
||||
else:
|
||||
continue
|
||||
if all(triggered):
|
||||
thread = Thread(target=target, args=args, kwargs=kwargs)
|
||||
thread.start()
|
||||
if not cron.repeat:
|
||||
removing.append(task)
|
||||
for task in removing:
|
||||
state["tasks"].remove(task)
|
||||
state["previous"] = utc_now()
|
||||
return state
|
||||
|
||||
def start(self):
|
||||
for name, timer in self.timers.items():
|
||||
timer.start()
|
||||
|
||||
def stop(self):
|
||||
for name, timer in self.timers.items():
|
||||
timer.stop()
|
||||
|
||||
def assign(self, timer, method, args=tuple(), kwargs=dict()):
|
||||
if timer not in self.queues.keys():
|
||||
return None
|
||||
task = method, args, kwargs
|
||||
cancel = Event()
|
||||
job = cancel, task
|
||||
self.queues[timer].put(job)
|
||||
return cancel
|
||||
|
||||
def schedule(self, cron, target, args=tuple(), kwargs=dict()):
|
||||
task = cron, target, args, kwargs
|
||||
cancel = Event()
|
||||
job = cancel, task
|
||||
self.queues["cron"].put(job)
|
||||
return cancel
|
|
@ -0,0 +1,28 @@
|
|||
from time import monotonic, sleep
|
||||
from threading import Event, Thread
|
||||
|
||||
class Every:
|
||||
def __init__(self, interval, function, *args, **kwargs):
|
||||
self.interval = interval
|
||||
self.function = function
|
||||
self.args = args
|
||||
self.kwargs = kwargs
|
||||
self.event = Event()
|
||||
|
||||
def _wrapper(self, *args, **kwargs):
|
||||
start = monotonic()
|
||||
state = None
|
||||
while not self.event.is_set():
|
||||
state = self.function(state, *args, **kwargs)
|
||||
sleep(self.interval - ((monotonic() - start) % self.interval))
|
||||
|
||||
def start(self):
|
||||
args = self.args
|
||||
kwargs = self.kwargs
|
||||
thread = Thread(target=self._wrapper, args=args, kwargs=kwargs)
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
return thread
|
||||
|
||||
def stop(self):
|
||||
self.event.set()
|
|
@ -0,0 +1,315 @@
|
|||
from abots.helpers import eprint, cast
|
||||
from abots.events import Every
|
||||
|
||||
from queue import Queue, Empty, Full
|
||||
from threading import Thread, Event, Lock, RLock, BoundedSemaphore
|
||||
from contextlib import contextmanager
|
||||
|
||||
"""
|
||||
TODO:
|
||||
"""
|
||||
|
||||
@contextmanager
|
||||
def acquire_timeout(lock, timeout=-1):
|
||||
if timeout is None:
|
||||
timeout = -1
|
||||
result = lock.acquire(timeout=timeout)
|
||||
yield result
|
||||
if result:
|
||||
lock.release()
|
||||
|
||||
class ThreadPool:
|
||||
def __init__(self, pool_size, timeout=None):
|
||||
self.locks = list()
|
||||
self.events = list()
|
||||
self.queues = list()
|
||||
self.workers = list()
|
||||
data = dict()
|
||||
for s in range(pool_size):
|
||||
lock = Lock()
|
||||
event = Event()
|
||||
queue = Queue()
|
||||
args = (s, event, queue, timeout)
|
||||
worker = Thread(target=self._worker, args=args)
|
||||
worker.setDaemon(True)
|
||||
self.locks.append(lock)
|
||||
self.events.append(event)
|
||||
self.queues.append(queue)
|
||||
self.workers.append(worker)
|
||||
worker.start()
|
||||
|
||||
def _exec_controls(self, controls):
|
||||
for action, methods in controls.items():
|
||||
for method in methods:
|
||||
cast(method, action)
|
||||
|
||||
def _worker(self, worker_id, event, queue, timeout=None):
|
||||
while not event.is_set():
|
||||
try:
|
||||
# NOTE: This is really spammy, use only in case of emergencies
|
||||
# print(f"[worker:{worker_id}]: Getting task")
|
||||
if timeout is not None:
|
||||
job = queue.get(block=True, timeout=timeout)
|
||||
else:
|
||||
job = queue.get_nowait()
|
||||
if len(job) != 2:
|
||||
# print(f"[worker:{worker_id}]: Job is malformed")
|
||||
continue
|
||||
controls, task = job
|
||||
if type(controls) != dict:
|
||||
# print(f"[worker:{worker_id}]: Controls are malformed")
|
||||
continue
|
||||
if task is None: # NOTE: Poison pill to kill worker
|
||||
# print(f"[worker:{worker_id}]: Poisoned")
|
||||
event.set()
|
||||
self._exec_controls(controls)
|
||||
break
|
||||
if len(task) != 3:
|
||||
# print(f"[worker:{worker_id}]: Task is malformed")
|
||||
self._exec_controls(controls)
|
||||
continue
|
||||
method, args, kwargs = task
|
||||
# print(f"[worker:{worker_id}]: Running task")
|
||||
try:
|
||||
method(*args, **kwargs)
|
||||
except Exception as e:
|
||||
eprint(e)
|
||||
finally:
|
||||
# print(f"[worker:{worker_id}]: Task complete")
|
||||
self._exec_controls(controls)
|
||||
queue.task_done()
|
||||
except Empty:
|
||||
continue
|
||||
# Clear out the queue
|
||||
# print(f"[worker:{worker_id}]: Clearing out queue")
|
||||
while True:
|
||||
try:
|
||||
queue.get_nowait()
|
||||
queue.task_done()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
def stop(self, done=None, wait=True):
|
||||
# print(f"Stopping pool")
|
||||
for event in self.events:
|
||||
event.set()
|
||||
if wait:
|
||||
for worker in self.workers:
|
||||
worker.join()
|
||||
cast(done, "set")
|
||||
# print(f"Stopped pool")
|
||||
|
||||
class ThreadMarshal:
|
||||
def __init__(self, pool_size, monitor=1, cleanup=True, timeout=None,
|
||||
destroy=False):
|
||||
self.pool_size = pool_size
|
||||
self.monitor_interval = monitor
|
||||
self.cleanup = cleanup
|
||||
self.timeout = timeout
|
||||
self.destroy = destroy
|
||||
self.stopped = Event()
|
||||
self._manager = Lock() # NOTE: Maybe make this an RLock?
|
||||
self._load_presets()
|
||||
self._add_pool()
|
||||
|
||||
if self.monitor_interval > 0:
|
||||
self.monitor = Every(self.monitor_interval, self._monitor)
|
||||
self.monitor.start()
|
||||
|
||||
def _next_pool(self):
|
||||
self._pool_cursor = (self._pool_cursor + 1) % len(self.pools)
|
||||
|
||||
def _next_worker(self):
|
||||
self._worker_cursor = (self._worker_cursor + 1) % self.pool_size
|
||||
# NOTE: This is a potential optimization for later
|
||||
# if self._worker_cursor == 0:
|
||||
# self._next_pool()
|
||||
|
||||
def _load_presets(self):
|
||||
self._pool_cursor = 0
|
||||
self._worker_cursor = 0
|
||||
self.pools = list()
|
||||
self.locks = list()
|
||||
self.events = list()
|
||||
self.queues = list()
|
||||
self.workers = list()
|
||||
self.semaphores = list()
|
||||
|
||||
def _get_idle_pools(self):
|
||||
idle_pools = list()
|
||||
if len(self.pools) == 1:
|
||||
return idle_pools
|
||||
for index, queues in enumerate(self.queues):
|
||||
if index == 0:
|
||||
continue
|
||||
queues_empty = [queue.empty() for queue in queues]
|
||||
idle = all(queues_empty)
|
||||
if not idle:
|
||||
continue
|
||||
print(f"[manager] Pool {index} is idle")
|
||||
idle_pools.append(self.pools[index])
|
||||
return idle_pools
|
||||
|
||||
def _monitor(self, state):
|
||||
# print("[manager] Cleaning pools")
|
||||
if self._manager.locked():
|
||||
return # Try again later
|
||||
with self._manager:
|
||||
idle_pools = self._get_idle_pools()
|
||||
if self.destroy and len(idle_pools) == len(self.pools):
|
||||
self.stop()
|
||||
elif self.cleanup and len(idle_pools) > 0:
|
||||
cleaning = Event()
|
||||
self._cleaner(idle_pools, cleaning)
|
||||
cleaning.wait()
|
||||
return None
|
||||
|
||||
def _cleaner(self, idle_pools, done=None):
|
||||
print("[manager] Cleaning pools")
|
||||
for pool in idle_pools:
|
||||
self._stop_pool(pool, wait=done is not None)
|
||||
print("[manager] Pools are cleaned")
|
||||
cast(done, "set")
|
||||
|
||||
def _stop_pool(self, pool, done=None, wait=True):
|
||||
index = self.pools.index(pool)
|
||||
# print(f"[manager] Stopping pool {index}")
|
||||
lock = self.locks[index]
|
||||
event = self.events[index]
|
||||
queue = self.queues[index]
|
||||
worker = self.workers[index]
|
||||
semaphore = self.semaphores[index]
|
||||
self.pools.remove(pool)
|
||||
self.locks.remove(lock)
|
||||
self.events.remove(event)
|
||||
self.queues.remove(queue)
|
||||
self.workers.remove(worker)
|
||||
self.semaphores.remove(semaphore)
|
||||
pool.stop(done, wait)
|
||||
# print(f"[manager] Stopped pool {index}")
|
||||
|
||||
def _run(self, task, controls, reserve, coordinates):
|
||||
pool_index, worker_index = coordinates
|
||||
# print(f"[manager:reserve] Trying worker {self.worker_index}")
|
||||
lock = self.locks[pool_index][worker_index]
|
||||
event =self.events[pool_index][worker_index]
|
||||
queue =self.queues[pool_index][worker_index]
|
||||
if event.is_set() or lock.locked():
|
||||
return False
|
||||
if not reserve:
|
||||
lock = Lock()
|
||||
# print(f"[manager:reserve] Using worker {worker_index}")
|
||||
lock.acquire()
|
||||
release = controls.get("release", list())
|
||||
release.append(lock)
|
||||
job = controls, task
|
||||
queue.put_nowait(job)
|
||||
return True
|
||||
|
||||
def _add_pool(self):
|
||||
index = len(self.pools)
|
||||
# print(f"[manager] Adding pool {index}")
|
||||
pool = ThreadPool(self.pool_size, self.timeout)
|
||||
self.pools.append(pool)
|
||||
self.locks.append(pool.locks)
|
||||
self.events.append(pool.events)
|
||||
self.queues.append(pool.queues)
|
||||
self.workers.append(pool.workers)
|
||||
self.semaphores.append(BoundedSemaphore(self.pool_size))
|
||||
return index
|
||||
|
||||
def add_pool(self):
|
||||
with self._manager:
|
||||
self._add_pool()
|
||||
|
||||
def clean(self, done=None):
|
||||
with self._manager:
|
||||
idle_pools = self._get_idle_pools()
|
||||
if self.cleanup and len(idle_pools) > 0:
|
||||
self._cleaner(idle_pools, done)
|
||||
|
||||
def wait(self):
|
||||
# print("[manager] Waiting on workers in pools")
|
||||
with self._manager:
|
||||
for pool in self.pools:
|
||||
for worker in pool.workers:
|
||||
worker.join()
|
||||
|
||||
def stop(self, wait=True):
|
||||
# print("[manager] Stopping")
|
||||
if self.monitor_interval > 0:
|
||||
self.monitor.event.set()
|
||||
with self._manager:
|
||||
dones = list()
|
||||
threads = list()
|
||||
for pool in self.pools:
|
||||
done = Event()
|
||||
dones.append(done)
|
||||
thread = Thread(target=pool.stop, args=(done, wait))
|
||||
thread.setDaemon(True)
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
if wait:
|
||||
for thread in threads:
|
||||
thread.join(self.timeout)
|
||||
self._load_presets()
|
||||
cast(self.stopped, "set")
|
||||
# print("[manager] Stopped")
|
||||
|
||||
def run(self, task, done, reserve, coordinates):
|
||||
if len(task) != 3:
|
||||
return None
|
||||
if len(coordinates) != 2:
|
||||
return None
|
||||
method, args, kwargs = task
|
||||
if not callable(method) or type(args) != tuple or type(kwargs) != dict:
|
||||
return None
|
||||
pool_index, worker_index = coordinates
|
||||
if pool_index >= len(self.pools) or worker_index >= self.pool_size:
|
||||
return None
|
||||
with self.manager:
|
||||
semaphore = self.semaphores[pool_index]
|
||||
if not semaphore.acquire(False):
|
||||
return None
|
||||
controls = dict()
|
||||
controls["set"] = [done]
|
||||
controls["release"] = [sempahore]
|
||||
self._run(task, controls, reserve, coordinates)
|
||||
return True
|
||||
|
||||
def reserve(self, method, args=tuple(), kwargs=dict(), reserve=True):
|
||||
# print("[manager:reserve] Acquiring lock")
|
||||
done = Event()
|
||||
with self._manager:
|
||||
task = method, args, kwargs
|
||||
if self._pool_cursor >= len(self.pools):
|
||||
self._next_pool()
|
||||
pool_found = False
|
||||
for p in range(len(self.pools)):
|
||||
# print(f"[manager:reserve] Trying pool {self._pool_cursor}")
|
||||
semaphore = self.semaphores[self._pool_cursor]
|
||||
if not semaphore.acquire(False):
|
||||
self._next_pool()
|
||||
continue
|
||||
pool_found = True
|
||||
break
|
||||
if not pool_found:
|
||||
# print(f"[manager:reserve] Pools are full, adding new pool")
|
||||
index = self._add_pool()
|
||||
self._pool_cursor = index
|
||||
self._worker_cursor = 0
|
||||
semaphore = self.semaphores[self._pool_cursor]
|
||||
semaphore.acquire()
|
||||
# print(f"[manager:reserve] Using pool {self._pool_cursor}")
|
||||
pool = self.pools[self._pool_cursor]
|
||||
for w in range(self.pool_size):
|
||||
coordinates = (self._pool_cursor, self._worker_cursor)
|
||||
controls = dict()
|
||||
controls["set"] = [done]
|
||||
controls["release"] = [semaphore]
|
||||
queued = self._run(task, controls, reserve, coordinates)
|
||||
self._next_worker()
|
||||
if not queued:
|
||||
continue
|
||||
break
|
||||
return done
|
|
@ -1 +1,8 @@
|
|||
from abots.helpers.json import jots, jsto
|
||||
from abots.helpers.hash import create_hash, md5, sha1, sha256, sha512
|
||||
from abots.helpers.encode import jots, jsto, b64e, b64d, h2b64, b642h, ctos
|
||||
from abots.helpers.numbers import clamp, randfloat, isnumeric
|
||||
from abots.helpers.general import eprint, deduce, noop, cast, get_digit, obtain
|
||||
from abots.helpers.general import utc_now, utc_now_timestamp
|
||||
from abots.helpers.logging import Logger
|
||||
from abots.helpers.black_magic import infinitedict, debugger, singleton, curry
|
||||
from abots.helpers.black_magic import coroutine, generator, autoload
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
from collections import defaultdict
|
||||
from functools import wraps
|
||||
from os.path import dirname, basename, isfile, join
|
||||
from glob import glob
|
||||
from importlib import import_module
|
||||
|
||||
def infinitedict():
|
||||
d = lambda: defaultdict(d)
|
||||
return defaultdict(d)
|
||||
|
||||
def debugger(func):
|
||||
@wraps(func)
|
||||
def wrapper_debug(*args, **kwargs):
|
||||
args_repr = [repr(arg) for arg in args]
|
||||
kwargs_repr = [f"{key}={value!r}" for key, value in kwargs.items()]
|
||||
signature = ", ".join(args_repr + kwargs_repr)
|
||||
print(f"[DEBUGGER]: Calling {func.__name__}({signature})")
|
||||
result = func(*args, **kwargs)
|
||||
print(f"[DEBUGGER]: {func.__name__!r} returned {result!r}")
|
||||
return result
|
||||
return wrapper_debug
|
||||
|
||||
def coroutine(func):
|
||||
@wraps(func)
|
||||
def wrapper_coroutine(*args, **kwargs):
|
||||
coro = func(*args, **kwargs)
|
||||
coro.__next__()
|
||||
return coro
|
||||
return wrapper_coroutine
|
||||
|
||||
def generator(func):
|
||||
@wraps(func)
|
||||
def wrapper_generator(*args, **kwargs):
|
||||
try:
|
||||
while True:
|
||||
yield from func(*args, **kwargs)
|
||||
except GeneratorExit:
|
||||
pass
|
||||
return coroutine(wrapper_generator)
|
||||
|
||||
def singleton(cls):
|
||||
@wraps(cls)
|
||||
def wrapper_singleton(*args, **kwargs):
|
||||
if not wrapper_singleton.instance:
|
||||
wrapper_singleton.instance = cls(*args, **kwargs)
|
||||
return wrapper_singleton.instance
|
||||
wrapper_singleton.instance = None
|
||||
return wrapper_singleton
|
||||
|
||||
def curry(func, argc=None):
|
||||
if argc is None:
|
||||
argc = func.func_code.co_argcount
|
||||
@wraps(func)
|
||||
def wrapper_curry(*args):
|
||||
if len(args) == argc:
|
||||
return func(*args)
|
||||
def curried(*c_args):
|
||||
return func(*(args + c_args))
|
||||
return curry(curried, argc - len(args))
|
||||
return wrapper_curry
|
||||
|
||||
# This is used to automatically import the files in this directory
|
||||
# Essentially, it is a auto-loader for a plugin system
|
||||
# NOTE: Do as I say, not as I do. You should probably never do this
|
||||
def autoload(location, context, package, prefix=""):
|
||||
level = -(len(package.split(".")) + 1)
|
||||
for module in glob(join(dirname(location), "*.py")):
|
||||
if not isfile(module) or module.endswith("__init__.py"):
|
||||
continue
|
||||
# Equivalent of doing "import <package>.<module>"
|
||||
plugin = import_module(f".{basename(module)[:level]}", package)
|
||||
funcs = [f for f in dir(plugin) if f[0] != "_"]
|
||||
for func in funcs:
|
||||
# Translates the above to "from <package>.<module> import *"
|
||||
plugin_func = getattr(plugin, func)
|
||||
# To reduce conflicts in global, the prefix is used here
|
||||
# These should not be used directly and just fire off decorators
|
||||
context[f"{prefix}{func}"] = plugin_func
|
|
@ -0,0 +1,71 @@
|
|||
from json import dump, dumps, loads
|
||||
from codecs import encode as c_encode, decode as c_decode
|
||||
from base64 import b64encode, b64decode
|
||||
from re import compile as regex
|
||||
|
||||
# JSON encoder, converts a python object to a string
|
||||
def jots(data, readable=False, dest=None):
|
||||
kwargs = dict()
|
||||
|
||||
# If readable is set, it pretty prints the JSON to be more human-readable
|
||||
if readable:
|
||||
# kwargs["sort_keys"] = True
|
||||
kwargs["indent"] = 4
|
||||
kwargs["separators"] = (",", ":")
|
||||
try:
|
||||
if dest:
|
||||
return dump(data, dest, ensure_ascii=False, **kwargs)
|
||||
return dumps(data, ensure_ascii=False, **kwargs)
|
||||
except ValueError as e:
|
||||
return None
|
||||
|
||||
# JSON decoder, converts a string to a python object
|
||||
def jsto(data):
|
||||
try:
|
||||
return loads(data)
|
||||
except ValueError as e:
|
||||
print(e)
|
||||
return None
|
||||
|
||||
# Encodes data to base64
|
||||
def b64e(data, altchars=None, url=False, use_bin=False):
|
||||
if type(data) is str:
|
||||
data = data.encode("utf-8")
|
||||
if altchars is None and url:
|
||||
altchars = "-_"
|
||||
base64_data = b64encode(hex_data, altchars)
|
||||
if use_bin:
|
||||
return base64_data
|
||||
return base64_data.decode("utf-8")
|
||||
|
||||
# Decodes data from base64
|
||||
def b64d(base64_data, altchars=None, url=False, use_bin=False):
|
||||
if type(base64_data) is str:
|
||||
base64_data = base64_data.encode("utf-8")
|
||||
if altchars is None and url:
|
||||
altchars = "-_"
|
||||
data = b64decode(base64_data, altchars)
|
||||
if use_bin:
|
||||
return data
|
||||
return data.decode("utf-8")
|
||||
|
||||
# Converts hex to base64 encoding
|
||||
def h2b64(hex_data, altchars=None, url=False, use_bin=False):
|
||||
if type(hex_data) is str:
|
||||
hex_data = c_decode(hex_data, "hex")
|
||||
return b64e(hex_data, altchars, url, use_bin)
|
||||
|
||||
# Decodes base64 and converts to hex
|
||||
def b642h(base64_data, altchars=None, url=False, use_bin=False):
|
||||
base64_decoded = b64d(base64_data, altchars, url, use_bin)
|
||||
hex_data = c_encode(base64_decoded, "hex")
|
||||
if use_bin:
|
||||
return hex_data
|
||||
return hex_data.decode("utf-8")
|
||||
|
||||
# str(ClassName) -> "<class '__main__.ClassName'>"
|
||||
# This function extracts the class name from the str output
|
||||
def ctos(_class):
|
||||
pattern = regex(r"[<'>]")
|
||||
cleaned = pattern.sub("", str(_class))
|
||||
return cleaned.split("class ", 1)[1].split(".")[-1]
|
|
@ -0,0 +1,35 @@
|
|||
from sys import stderr
|
||||
from traceback import print_exc
|
||||
from datetime import datetime, timezone
|
||||
|
||||
def eprint(*args, **kwargs):
|
||||
print(*args, file=stderr, **kwargs)
|
||||
|
||||
def noop(*args, **kwargs):
|
||||
pass
|
||||
|
||||
def obtain(source, attribute):
|
||||
return getattr(source, attribute, None)
|
||||
|
||||
def cast(source, method, *args, **kwargs):
|
||||
source_method = getattr(source, method, noop)
|
||||
try:
|
||||
return source_method(*args, **kwargs)
|
||||
except Exception:
|
||||
status = print_exc()
|
||||
eprint(status)
|
||||
return status
|
||||
|
||||
def deduce(reference, attributes):
|
||||
if type(attributes) is not list:
|
||||
attributes = [attributes]
|
||||
return list(map(lambda attr: getattr(reference, attr, None), attributes))
|
||||
|
||||
def get_digit(number, position):
|
||||
return False if number - 10**position < 0 else number // 10*position % 10
|
||||
|
||||
def utc_now():
|
||||
return datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
|
||||
def utc_now_timestamp():
|
||||
return utc_now().timestamp()
|
|
@ -0,0 +1,64 @@
|
|||
from abots.helpers.general import eprint
|
||||
|
||||
from os import urandom
|
||||
from binascii import hexlify
|
||||
from hashlib import algorithms_available, pbkdf2_hmac, new as new_hash
|
||||
|
||||
def create_hash(algorithm, seed=None, random_bytes=32, use_bin=False):
|
||||
if algorithm not in algorithms_available:
|
||||
return None
|
||||
h = new_hash(algorithm)
|
||||
if seed is None:
|
||||
h.update(urandom(random_bytes))
|
||||
else:
|
||||
h.update(seed.encode("utf-8"))
|
||||
if use_bin:
|
||||
return h.digest()
|
||||
return h.hexdigest()
|
||||
|
||||
def md5(*args, **kwargs):
|
||||
return create_hash("md5", *args, **kwargs)
|
||||
|
||||
def sha1(*args, **kwargs):
|
||||
return create_hash("sha1", *args, **kwargs)
|
||||
|
||||
def sha256(*args, **kwargs):
|
||||
return create_hash("sha256", *args, **kwargs)
|
||||
|
||||
def sha512(*args, **kwargs):
|
||||
return create_hash("sha512", *args, **kwargs)
|
||||
|
||||
def pbkdf2(algo, pswd, salt=None, cycles=100000, key_len=None, use_bin=False):
|
||||
if algorithm not in algorithms_available:
|
||||
return None
|
||||
if type(pswd) is str:
|
||||
pswd = pswd.encode("utf-8")
|
||||
if salt is None:
|
||||
salt = urandom(16)
|
||||
elif type(salt) is str:
|
||||
salt = salt.encode("utf-8")
|
||||
derived_key = pbkdf2_hmac(algo, pswd, salt, cycles, key_len)
|
||||
if use_bin:
|
||||
return derived_key, salt
|
||||
return hexlify(derived_key).decode("utf-8"), salt.decode("utf-8")
|
||||
|
||||
# n = increase as general computing performance increases, min=14
|
||||
# r = increase in case of breakthroughs in memory technology, min=8
|
||||
# p = increase in case of breakthroughs in CPU technology, min=1
|
||||
def scrypt(password, salt=None, n=14, r=8, p=1, use_bin=False):
|
||||
try:
|
||||
from hashlib import scrypt as _scrypt
|
||||
except ImportError:
|
||||
eprint("Error: Missing OpenSSL 1.1+ for hashlib.scrypt")
|
||||
return None
|
||||
if type(password) is str:
|
||||
password = password.encode("utf-8")
|
||||
if salt is None:
|
||||
salt = urandom(16)
|
||||
elif type(salt) is str:
|
||||
salt = salt.encode("utf-8")
|
||||
# N = 2**n, allows simple increments to the value to get desired effect
|
||||
derived_key = _scrypt(password, salt, 2**n, r, p)
|
||||
if use_bin:
|
||||
return derived_key, salt
|
||||
return hexlify(derived_key).decode("utf-8"), salt.decode("utf-8")
|
|
@ -1,22 +0,0 @@
|
|||
from json import dumps, loads
|
||||
|
||||
# JSON encoder, converts a python object to a string
|
||||
def jots(self, data, readable=False):
|
||||
kwargs = dict()
|
||||
|
||||
# If readable is set, it pretty prints the JSON to be more human-readable
|
||||
if readable:
|
||||
kwargs["sort_keys"] = True
|
||||
kwargs["indent"] = 4
|
||||
kwargs["separators"] = (",", ":")
|
||||
try:
|
||||
return json.dumps(data, **kwargs)
|
||||
except ValueError as e:
|
||||
return None
|
||||
|
||||
# JSON decoder, converts a string to a python object
|
||||
def jsto(self, data):
|
||||
try:
|
||||
return json.loads(data)
|
||||
except ValueError as e:
|
||||
return None
|
|
@ -0,0 +1,100 @@
|
|||
from logging import getLogger, Formatter, DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||
from logging import StreamHandler
|
||||
from logging.handlers import TimedRotatingFileHandler
|
||||
from logging.handlers import QueueListener, QueueHandler
|
||||
from queue import Queue
|
||||
|
||||
def get_level(level):
|
||||
levels = dict()
|
||||
levels["debug"] = DEBUG
|
||||
levels["info"] = INFO
|
||||
levels["warning"] = WARNING
|
||||
levels["error"] = ERROR
|
||||
levels["critical"] = CRITICAL
|
||||
return levels.get(level, DEBUG)
|
||||
|
||||
def get_formatter(formatter, date_format):
|
||||
if formatter is None:
|
||||
formatter = u"%(asctime)s - %(name)s - %(threadName)s"
|
||||
formatter = formatter + u" - %(levelname)s - %(message)s"
|
||||
if date_format is None:
|
||||
date_format = "%Y-%m-%d %H:%M:%S"
|
||||
return Formatter(formatter)
|
||||
|
||||
class Stream(StreamHandler):
|
||||
def __init__(self, level=None, formatter=None, date_format=None):
|
||||
super().__init__()
|
||||
self.setLevel(get_level(level))
|
||||
self.setFormatter(get_formatter(formatter, date_format))
|
||||
|
||||
class Rotated(TimedRotatingFileHandler):
|
||||
def __init__(self, filename, when, interval, backupCount,
|
||||
level=None, formatter=None, date_format=None, **kwargs):
|
||||
super().__init__(filename, when, interval, backupCount, encoding="utf8",
|
||||
**kwargs)
|
||||
self.setLevel(get_level(level))
|
||||
self.setFormatter(get_formatter(formatter, date_format))
|
||||
|
||||
class Logger:
|
||||
def __init__(self, name, level=None, settings=dict()):
|
||||
self.name = name
|
||||
self.core = getLogger(name)
|
||||
self.core.setLevel(get_level(level))
|
||||
self._load(settings)
|
||||
|
||||
def _load(self, settings):
|
||||
disabled = settings.get("disabled", list())
|
||||
handlers = list()
|
||||
if "stream" not in disabled:
|
||||
stream = settings.get("stream", dict())
|
||||
stream_level = stream.get("level", None)
|
||||
stream_formatter = stream.get("formatter", None)
|
||||
stream_date_format = stream.get("date_format", None)
|
||||
stream_args = stream_level, stream_formatter, stream_date_format
|
||||
stream_handler = Stream(*stream_args)
|
||||
handlers.append(stream_handler)
|
||||
|
||||
if "file" not in disabled:
|
||||
rotated = settings.get("file", dict())
|
||||
rotated_filename = rotated.get("filename", f"{self.name}.log")
|
||||
rotated_when = rotated.get("when", "midnight")
|
||||
rotated_interval = rotated.get("interval", 1)
|
||||
rotated_backup_count = rotated.get("backup_count", 5)
|
||||
rotated_level = rotated.get("level", None)
|
||||
rotated_formatter = rotated.get("formatter", None)
|
||||
rotated_date_format = rotated.get("date_format", None)
|
||||
rotated_args = (rotated_filename, rotated_when, rotated_interval,
|
||||
rotated_backup_count, rotated_level, rotated_formatter,
|
||||
rotated_date_format)
|
||||
rotated_handler = Rotated(*rotated_args)
|
||||
handlers.append(rotated_handler)
|
||||
|
||||
self.queue = Queue()
|
||||
self.queue_handler = QueueHandler(self.queue)
|
||||
|
||||
args = tuple(handlers)
|
||||
kwargs = dict()
|
||||
kwargs["respect_handler_level"] = True
|
||||
self.listener = QueueListener(self.queue, *args, **kwargs)
|
||||
self.core.addHandler(self.queue_handler)
|
||||
|
||||
def start(self):
|
||||
self.listener.start()
|
||||
|
||||
def stop(self):
|
||||
self.listener.stop()
|
||||
|
||||
def debug(self, message):
|
||||
self.core.debug(message)
|
||||
|
||||
def info(self, message):
|
||||
self.core.info(message)
|
||||
|
||||
def warning(self, message):
|
||||
self.core.warning(message)
|
||||
|
||||
def error(self, message):
|
||||
self.core.error(message)
|
||||
|
||||
def critical(self, message):
|
||||
self.core.critical(message)
|
|
@ -0,0 +1,13 @@
|
|||
from random import random, randint
|
||||
|
||||
def clamp(number, low, high):
|
||||
return max(low, min(number, high))
|
||||
|
||||
def randfloat(start, end=None):
|
||||
if end is None:
|
||||
end = start
|
||||
start = 0
|
||||
return (end - start) * random() + start
|
||||
|
||||
def isnumeric(test):
|
||||
return test.replace(".", "", 1).isdigit()
|
|
@ -1,4 +1,4 @@
|
|||
from abots.net.socket_server import SocketServer
|
||||
from abots.net.socket_client import SocketClient
|
||||
from abots.net.socket_server_handler import SocketServerHandler
|
||||
from abots.net.socket_client_handler import SocketClientHandler
|
||||
from abots.net.prefix_socket_server import PrefixSocketServer
|
||||
from abots.net.prefix_socket_client import PrefixSocketClient
|
||||
|
|
|
@ -0,0 +1,72 @@
|
|||
"""
|
||||
|
||||
Prefix Socket Client
|
||||
====================
|
||||
|
||||
Formatted to read and write prefix headers to socket data
|
||||
|
||||
"""
|
||||
|
||||
from abots.net import SocketClient
|
||||
|
||||
from struct import pack, unpack
|
||||
from socket import timeout as sock_timeout
|
||||
|
||||
class PrefixSocketClient(SocketClient):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def _format_message(self, message, *args):
|
||||
if len(args) > 0:
|
||||
formatted = message.format(*args)
|
||||
else:
|
||||
formatted = message
|
||||
try:
|
||||
encoded = formatted.encode("utf8")
|
||||
except UnicodeDecodeError:
|
||||
try:
|
||||
encoded = formatted.encode("iso-8859-1")
|
||||
except UnicodeDecodeError:
|
||||
encoded = "".encode()
|
||||
size = pack(">I", len(encoded))
|
||||
packaged = size + encoded
|
||||
return packaged
|
||||
|
||||
def _recv_bytes(self, get_bytes, decode=True):
|
||||
data = "".encode()
|
||||
attempts = 0
|
||||
while len(data) < get_bytes:
|
||||
# Automatically break loop to prevent infinite loop
|
||||
# Allow at least twice the needed iterations to occur exiting loop
|
||||
if attempts > 2 * (get_bytes / self.buffer_size):
|
||||
break
|
||||
else:
|
||||
attempts = attempts + 1
|
||||
bufsize = get_bytes - len(data)
|
||||
|
||||
# Force bufsize to cap out at buffer_size
|
||||
if bufsize > self.buffer_size:
|
||||
bufsize = self.buffer_size
|
||||
try:
|
||||
packet = self.sock.recv(bufsize)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
if not isinstance(e, sock_timeout):
|
||||
self._attempt_reconnect()
|
||||
return None
|
||||
data = data + packet
|
||||
return data.decode() if decode else data
|
||||
|
||||
def _get_message_size(self):
|
||||
raw_message_size = self._recv_bytes(4, False)
|
||||
if not raw_message_size:
|
||||
return None
|
||||
message_size = unpack(">I", raw_message_size)[0]
|
||||
return message_size
|
||||
|
||||
def _get_message(self):
|
||||
message_size = self._get_message_size()
|
||||
if message_size is None:
|
||||
return
|
||||
result = self._recv_bytes(message_size)
|
||||
self._outbox.put(result)
|
|
@ -0,0 +1,45 @@
|
|||
"""
|
||||
|
||||
net/PrefixSocketServer
|
||||
================
|
||||
|
||||
TODO:
|
||||
* Add logging to broken pipe exceptions
|
||||
|
||||
"""
|
||||
|
||||
from abots.net import SocketServer
|
||||
|
||||
class PrefixSocketServer(SocketServer):
|
||||
def __init__(self, host, port, listeners=5, buffer_size=4096,
|
||||
secure=False, timeout=None, daemon=False):
|
||||
args = host, port, listeners, buffer_size, secure, timeout, daemon
|
||||
super().__init__(*args)
|
||||
|
||||
def _package_message(self, message, *args):
|
||||
if len(args) > 0:
|
||||
formatted = message.format(*args)
|
||||
else:
|
||||
formatted = message
|
||||
try:
|
||||
encoded = formatted.encode("utf8")
|
||||
except UnicodeDecodeError:
|
||||
try:
|
||||
encoded = formatted.encode("iso-8859-1")
|
||||
except UnicodeDecodeError:
|
||||
encoded = "".encode()
|
||||
size = pack(">I", len(encoded))
|
||||
packaged = size + encoded
|
||||
return packaged
|
||||
|
||||
# Packages a message and sends it to socket
|
||||
def send_message(self, uuid, message, *args):
|
||||
sock = self._sock_from_uuid(uuid)
|
||||
if sock is None:
|
||||
return None
|
||||
formatted = self._package_message(message)
|
||||
try:
|
||||
sock.send(formatted)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
return
|
|
@ -1,10 +1,3 @@
|
|||
from abots.net.socket_client_handler import SocketClientHandler as handler
|
||||
|
||||
from struct import pack, unpack
|
||||
from multiprocessing import Process, Queue, JoinableQueue
|
||||
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from ssl import wrap_socket
|
||||
|
||||
"""
|
||||
|
||||
Socket Client
|
||||
|
@ -14,116 +7,201 @@ Socket Client
|
|||
|
||||
"""
|
||||
|
||||
class SocketClient(Process):
|
||||
def __init__(self, host, port, buffer_size=4096, end_of_line="\r\n",
|
||||
secure=False, inbox=JoinableQueue(), outbox=Queue(), handler=handler,
|
||||
**kwargs):
|
||||
Process.__init__(self)
|
||||
from abots.helpers import eprint, cast, jots, jsto, utc_now_timestamp, coroutine
|
||||
|
||||
from struct import pack, unpack
|
||||
from socket import socket, timeout as sock_timeout
|
||||
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from ssl import wrap_socket
|
||||
from threading import Thread, Event
|
||||
from queue import Queue, Empty
|
||||
from time import sleep
|
||||
from random import randint
|
||||
|
||||
class SocketClient(Thread):
|
||||
def __init__(self, host, port, buffer_size=4096, secure=False,
|
||||
timeout=None, daemon=False, reconnects=10, decode=True):
|
||||
super().__init__()
|
||||
self.setDaemon(daemon)
|
||||
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.buffer_size = buffer_size
|
||||
self.end_of_line = end_of_line
|
||||
self.secure = secure
|
||||
self.inbox = inbox
|
||||
self.outbox = outbox
|
||||
self.handler = handler(self)
|
||||
self.timeout = timeout
|
||||
self.reconnects = reconnects
|
||||
self.decode = decode
|
||||
self.sock = socket(AF_INET, SOCK_STREAM)
|
||||
if self.secure:
|
||||
self.sock = wrap_socket(self.sock, **kwargs)
|
||||
self.sock = wrap_socket(self.sock)
|
||||
|
||||
self.connection = (self.host, self.port)
|
||||
self.running = True
|
||||
self.error = None
|
||||
|
||||
def _recv_bytes(self, get_bytes, decode=True):
|
||||
data = "".encode()
|
||||
eol = self.end_of_line.encode()
|
||||
while len(data) < get_bytes:
|
||||
bufsize = get_bytes - len(data)
|
||||
if bufsize > self.buffer_size:
|
||||
bufsize = self.buffer_size
|
||||
try:
|
||||
packet = self.sock.recv(bufsize)
|
||||
except OSError:
|
||||
return None
|
||||
length = len(data) + len(packet)
|
||||
checker = packet if length < get_bytes else packet[:-2]
|
||||
if eol in checker:
|
||||
packet = packet.split(eol)[0] + eol
|
||||
return data + packet
|
||||
data = data + packet
|
||||
return data.decode() if decode else data
|
||||
self.kill_switch = Event()
|
||||
self.ready = Event()
|
||||
self.stopped = Event()
|
||||
self.broken = Event()
|
||||
self.reconnecting = Event()
|
||||
|
||||
def _package_message(self, message, *args):
|
||||
formatted = None
|
||||
if len(args) > 0:
|
||||
formatted = message.format(*args) + self.end_of_line
|
||||
else:
|
||||
formatted = message + self.end_of_line
|
||||
packaged = pack(">I", len(formatted)) + formatted.encode()
|
||||
return packaged
|
||||
self._inbox = Queue()
|
||||
self._events = Queue()
|
||||
self._outbox = Queue()
|
||||
self.queues = dict()
|
||||
self.queues["inbox"] = self._inbox
|
||||
self.queues["outbox"] = self._outbox
|
||||
self.queues["events"] = self._events
|
||||
|
||||
def _get_message_size(self):
|
||||
raw_message_size = self._recv_bytes(4, False)
|
||||
if not raw_message_size:
|
||||
return None
|
||||
message_size = unpack(">I", raw_message_size)[0]
|
||||
return message_size
|
||||
self._bridge = None
|
||||
|
||||
def _get_message(self):
|
||||
message_size = self._get_message_size()
|
||||
if message_size is None:
|
||||
return None
|
||||
try:
|
||||
return self._recv_bytes(message_size).strip(self.end_of_line)
|
||||
except OSError:
|
||||
return None
|
||||
|
||||
def _send_message(self, message, *args):
|
||||
packaged = self._package_message(message, *args)
|
||||
try:
|
||||
self.sock.send(packaged)
|
||||
except OSError:
|
||||
self.stop()
|
||||
|
||||
def _process_inbox(self):
|
||||
while not self.inbox.empty():
|
||||
message, args = self.inbox.get()
|
||||
self._send_message(message, *args)
|
||||
self.inbox.task_done()
|
||||
def _send_event(self, message):
|
||||
self._events.put(jots(message))
|
||||
cast(self._bridge, "send", ("events", message))
|
||||
|
||||
def _prepare(self):
|
||||
self.sock.setblocking(False)
|
||||
self.sock.settimeout(1)
|
||||
try:
|
||||
self.sock.connect(self.connection)
|
||||
except OSError as e:
|
||||
return e
|
||||
return None
|
||||
except Exception as e:
|
||||
return True, e
|
||||
return False, None
|
||||
|
||||
def send(self, message, *args):
|
||||
self.inbox.put((message, args))
|
||||
def _obtain(self, queue_name, timeout=False):
|
||||
queue = self.queues[queue_name]
|
||||
if timeout is False:
|
||||
timeout = self.timeout
|
||||
while True:
|
||||
try:
|
||||
#if timeout is not None:
|
||||
# message = queue.get(timeout=timeout)
|
||||
#else:
|
||||
# message = queue.get_nowait()
|
||||
message = queue.get_nowait()
|
||||
yield message
|
||||
#cast(self._bridge, "send", (queue_name, message))
|
||||
queue.task_done()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
def results(self):
|
||||
messages = list()
|
||||
while not self.outbox.empty():
|
||||
messages.append(self.outbox.get())
|
||||
return messages
|
||||
def _queue_thread(self, inbox, timeout):
|
||||
while not self.kill_switch.is_set():
|
||||
for message in self._obtain("inbox", timeout):
|
||||
if self.broken.is_set():
|
||||
self.reconnecting.wait()
|
||||
self.send_message(message)
|
||||
|
||||
def _get_message(self):
|
||||
try:
|
||||
packet = self.sock.recv(self.buffer_size)
|
||||
result = packet.decode() if self.decode else packet
|
||||
self._outbox.put(result)
|
||||
cast(self._bridge, "send", ("outbox", result))
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
pass
|
||||
|
||||
def _format_message(self, message, *args):
|
||||
if len(args) > 0:
|
||||
formatted = message.format(*args)
|
||||
else:
|
||||
formatted = message
|
||||
return formatted.encode()
|
||||
|
||||
def _attempt_reconnect(self):
|
||||
if self.kill_switch.is_set():
|
||||
return
|
||||
print("BROKEN!")
|
||||
self.reconnecting.clear()
|
||||
self.broken.set()
|
||||
event = dict()
|
||||
event["name"] = "socket-down"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
attempts = 0
|
||||
while attempts <= self.reconnects or not self.kill_switch.is_set():
|
||||
# Need to be run to prevent ConnectionAbortedError
|
||||
self.sock.__init__()
|
||||
err, report = self._prepare()
|
||||
if not err:
|
||||
self.reconnecting.set()
|
||||
self.broken.clear()
|
||||
event = dict()
|
||||
event["name"] = "socket-up"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
return
|
||||
# Exponential backoff
|
||||
attempts = attempts + 1
|
||||
max_delay = (2**attempts) - 1
|
||||
delay = randint(0, max_delay)
|
||||
sleep(delay)
|
||||
self.stop()
|
||||
|
||||
def send_message(self, message, *args):
|
||||
formatted = self._format_message(message, *args)
|
||||
try:
|
||||
self.sock.send(formatted)
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
if not isinstance(e, sock_timeout):
|
||||
self._attempt_reconnect()
|
||||
self._attempt_reconnect()
|
||||
|
||||
def recv(self):
|
||||
yield from self._obtain("outbox")
|
||||
|
||||
def check(self):
|
||||
for message in self.recv():
|
||||
if message is not None and len(message) > 0:
|
||||
print(message)
|
||||
|
||||
def send(self, message):
|
||||
self._inbox.put(message)
|
||||
cast(self._bridge, "send", ("inbox", message))
|
||||
|
||||
def connect(self, bridge):
|
||||
self._bridge = bridge()
|
||||
|
||||
@coroutine
|
||||
def bridge(self, inbox, outbox, events):
|
||||
try:
|
||||
while True:
|
||||
task = (yield)
|
||||
source, message = task
|
||||
if source == "inbox":
|
||||
outbox.puts(message)
|
||||
elif source == "outbox":
|
||||
inbox.puts(message)
|
||||
elif source == "events":
|
||||
events.put(message)
|
||||
except GeneratorExit:
|
||||
pass
|
||||
|
||||
def run(self):
|
||||
err = self._prepare()
|
||||
if err is not None:
|
||||
print(err)
|
||||
return err
|
||||
# print("Ready!")
|
||||
err, report = self._prepare()
|
||||
if err:
|
||||
eprint(report)
|
||||
return report
|
||||
queue_args = self._inbox, self.timeout
|
||||
Thread(target=self._queue_thread, args=queue_args).start()
|
||||
print("Client ready!")
|
||||
self.ready.set()
|
||||
while self.running:
|
||||
data = self._get_message()
|
||||
if data is not None:
|
||||
self.outbox.put(self.handler(data))
|
||||
self._process_inbox()
|
||||
if self.broken.is_set():
|
||||
self.reconnecting.wait()
|
||||
self._get_message()
|
||||
|
||||
def stop(self):
|
||||
def stop(self, done=None):
|
||||
# print("Stopping client!")
|
||||
self.kill_switch.set()
|
||||
event = dict()
|
||||
event["name"] = "closing"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
self.running = False
|
||||
self.sock.close()
|
||||
self.terminate()
|
||||
self.stopped.set()
|
||||
cast(done, "set")
|
||||
# print("Stopped client!")
|
||||
|
|
|
@ -1,33 +1,29 @@
|
|||
from abots.net.socket_server_handler import SocketServerHandler as handler
|
||||
"""
|
||||
|
||||
from threading import Thread
|
||||
net/SocketServer
|
||||
================
|
||||
|
||||
TODO:
|
||||
* Add logging to broken pipe exceptions
|
||||
|
||||
"""
|
||||
|
||||
from abots.helpers import eprint, cast, sha256, utc_now_timestamp
|
||||
from abots.helpers import jsto, jots
|
||||
|
||||
from threading import Thread, Event, Lock
|
||||
from struct import pack, unpack
|
||||
from select import select
|
||||
from socket import socket, AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
||||
from multiprocessing import Process, Queue, JoinableQueue
|
||||
from time import time
|
||||
from ssl import wrap_socket
|
||||
from queue import Queue, Empty
|
||||
|
||||
"""
|
||||
|
||||
net\SocketServer
|
||||
================
|
||||
|
||||
The intent behind this script is to provide a simple interface to start up a
|
||||
TCP socket server in the background, run each of the clients in their own
|
||||
thread, provide a simple system to handle server events, and provide simple
|
||||
functions to send/receive messages from the server.
|
||||
|
||||
"""
|
||||
|
||||
# Inherits Process so that server can be run as a daemon
|
||||
class SocketServer(Process):
|
||||
# There are many parameters here, but that is so that any constant used can
|
||||
# be easily tweaked and not remain hard-coded without an easy way to change
|
||||
def __init__(self, host, port, listeners=5, buffer_size=4096, secure=False,
|
||||
max_message_size=-1, end_of_line="\r\n", heartbeat=60,
|
||||
inbox=JoinableQueue(), outbox=Queue(), handler=handler, **kwargs):
|
||||
Process.__init__(self)
|
||||
class SocketServer(Thread):
|
||||
def __init__(self, host, port, listeners=5, buffer_size=4096,
|
||||
secure=False, timeout=None, daemon=False):
|
||||
super().__init__()
|
||||
self.setDaemon(daemon)
|
||||
|
||||
# The connection information for server, the clients will use this to
|
||||
# connect to the server
|
||||
|
@ -41,27 +37,19 @@ class SocketServer(Process):
|
|||
# Size of buffer pulled by `receive_bytes` when not specified
|
||||
self.buffer_size = buffer_size
|
||||
|
||||
# If max_message_size is -1, it allows any message size
|
||||
self.max_message_size = max_message_size
|
||||
|
||||
# Which character(s) will terminate a message
|
||||
self.end_of_line = end_of_line
|
||||
|
||||
# Determines if SSL wrapper is used
|
||||
self.secure = secure
|
||||
|
||||
# How often a heartbeat will be sent to a client
|
||||
self.heartbeat = heartbeat
|
||||
# Timeout set on queues
|
||||
self.timeout = timeout
|
||||
|
||||
# Queues used for sending messages and receiving results using `send`
|
||||
# and `results`
|
||||
self.inbox = inbox
|
||||
self.outbox = outbox
|
||||
|
||||
# An object that determines how the server reacts to events, will use
|
||||
# net\SocketServerHandler if none are specified. Use it as a model for
|
||||
# how other handlers should look / work.
|
||||
self.handler = handler(self)
|
||||
self._inbox = Queue()
|
||||
self._events = Queue()
|
||||
self._outbox = Queue()
|
||||
self.queues = dict()
|
||||
self.queues["inbox"] = self._inbox
|
||||
self.queues["outbox"] = self._outbox
|
||||
self.queues["events"] = self._events
|
||||
|
||||
# Sets up the socket itself
|
||||
self.sock = socket(AF_INET, SOCK_STREAM)
|
||||
|
@ -69,72 +57,81 @@ class SocketServer(Process):
|
|||
# Note: kwargs is used here to specify any SSL parameters desired
|
||||
self.sock = wrap_socket(self.sock, **kwargs)
|
||||
|
||||
# Will later be set to the file descriptor of the socket on the server
|
||||
# See `_prepare`
|
||||
self.sock_fd = -1
|
||||
|
||||
# Will later be set to the alias used for the socket on the server
|
||||
# See `_prepare`
|
||||
self.sock_alias = None
|
||||
|
||||
# List of all sockets involved (both client and server)
|
||||
self.sockets = list()
|
||||
|
||||
# Maps metadata about the clients
|
||||
self.clients = dict()
|
||||
self.clients = list()
|
||||
self.uuids = dict()
|
||||
|
||||
# State variable for if the server is running or not. See `run`.
|
||||
self.running = True
|
||||
self.kill_switch = Event()
|
||||
self.ready = Event()
|
||||
self.stopped = Event()
|
||||
|
||||
# Sends all messages queued in inbox
|
||||
def _process_inbox(self):
|
||||
while not self.inbox.empty():
|
||||
# In the format" mode, message, args
|
||||
data = self.inbox.get()
|
||||
mode = data[0]
|
||||
# Send to one socket
|
||||
if mode == self.handler.send_verb:
|
||||
client, message, args = data[1:]
|
||||
self.send_message(message, *args)
|
||||
# Broadcast to sockets
|
||||
elif mode == self.handler.broadcast_verb:
|
||||
message, args = data[1:]
|
||||
self.broadcast_message(self.sock, message, *args)
|
||||
self.inbox.task_done()
|
||||
def _send_event(self, message):
|
||||
self._events.put(jots(message))
|
||||
|
||||
def _new_client(self, sock, address):
|
||||
sock.settimeout(60)
|
||||
client_host, client_port = address
|
||||
self.sockets.append(sock)
|
||||
|
||||
client_kill = Event()
|
||||
client_uuid = sha256()
|
||||
self.uuids[client_uuid] = dict()
|
||||
self.uuids[client_uuid]["sock"] = sock
|
||||
self.uuids[client_uuid]["kill"] = client_kill
|
||||
|
||||
event = dict()
|
||||
event["name"] = "new_client"
|
||||
event["data"] = dict()
|
||||
event["data"]["host"] = client_host
|
||||
event["data"]["port"] = client_port
|
||||
event["data"]["uuid"] = client_uuid
|
||||
self._send_event(event)
|
||||
|
||||
client_args = sock, client_kill, client_uuid
|
||||
client_thread = Thread(target=self._client_thread, args=client_args)
|
||||
self.clients.append(client_thread)
|
||||
client_thread.start()
|
||||
|
||||
# Logic for the client socket running in its own thread
|
||||
def _client_thread(self, sock, alias):
|
||||
last = time()
|
||||
client = self.clients[alias]
|
||||
while self.running:
|
||||
now = time()
|
||||
# Run heartbeat after defined time elapses
|
||||
# This will probably drift somewhat, but this is fine here
|
||||
if now - last >= self.heartbeat:
|
||||
# If the client missed last heartbeat, close client
|
||||
if not client["alive"]:
|
||||
self.handler.close_client(alias)
|
||||
break
|
||||
# The handler must set this to True, this is how a missed
|
||||
# heartbeat is checked later on
|
||||
client["alive"] = False
|
||||
last = now
|
||||
self.handler.send_heartbeat(alias)
|
||||
def _client_thread(self, sock, kill_switch, uuid):
|
||||
while not kill_switch.is_set():
|
||||
try:
|
||||
message = self.handler.get_message(sock)
|
||||
message = self.get_message(uuid)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
# In this case, the socket most likely died before the
|
||||
# heartbeat caught it
|
||||
self.handler.close_client(alias)
|
||||
eprint(e)
|
||||
break
|
||||
if message is None:
|
||||
continue
|
||||
# Each message returns a status code, exactly which code is
|
||||
# determined by the handler
|
||||
status = self.handler.message(sock, message)
|
||||
# Send status and message received to the outbox queue
|
||||
self.outbox.put((status, message))
|
||||
# Send message and uuid of sender to outbox queue
|
||||
letter = uuid, message
|
||||
self._outbox.put(letter)
|
||||
|
||||
def _queue_thread(self, inbox, timeout):
|
||||
while not self.kill_switch.is_set():
|
||||
for letter in self._obtain(inbox, timeout):
|
||||
if len(letter) != 2:
|
||||
continue
|
||||
uuid, message = letter
|
||||
if uuid == "cast":
|
||||
self.broadcast_message(uuid, message)
|
||||
else:
|
||||
self.send_message(uuid, message)
|
||||
|
||||
def _obtain(self, queue, timeout=False):
|
||||
if timeout is False:
|
||||
timeout = self.timeout
|
||||
while True:
|
||||
try:
|
||||
if timeout is not None:
|
||||
yield queue.get(timeout=timeout)
|
||||
else:
|
||||
yield queue.get_nowait()
|
||||
queue.task_done()
|
||||
except Empty:
|
||||
break
|
||||
|
||||
# Prepares socket server before starting it
|
||||
def _prepare(self):
|
||||
|
@ -145,48 +142,29 @@ class SocketServer(Process):
|
|||
except (BrokenPipeError, OSError) as e:
|
||||
# This usually means that the port is already in use
|
||||
return e
|
||||
self.sock.setblocking(0)
|
||||
self.sock.settimeout(0)
|
||||
self.sock.listen(self.listeners)
|
||||
|
||||
# Gets the file descriptor of the socket, which is a fallback for a
|
||||
# unique identifier for the sockets when an alias does not work
|
||||
self.sock_fd = self.sock.fileno()
|
||||
sock_address = self.sock.getsockname()
|
||||
sock_host, sock_port = sock_address
|
||||
|
||||
# This may change later, but for now aliases start at @0 and continue
|
||||
# on from there numerically
|
||||
self.sock_alias = "@{}".format(len(self.sockets))
|
||||
self.sockets.append(self.sock)
|
||||
|
||||
# Set metadata about the socket server, the fd and alias are both set
|
||||
# here to make obtaining the other metadata possible with less lookups
|
||||
self.clients[self.sock_fd] = dict()
|
||||
self.clients[self.sock_fd]["fd"] = self.sock_fd
|
||||
self.clients[self.sock_fd]["host"] = sock_host
|
||||
self.clients[self.sock_fd]["port"] = sock_port
|
||||
self.clients[self.sock_fd]["sock"] = self.sock
|
||||
self.clients[self.sock_fd]["alias"] = self.sock_alias
|
||||
|
||||
# Here the alias is just a pointer to the same data, or at least acts
|
||||
# like a pointer given how Python handles dictionaries referencing the
|
||||
# same data
|
||||
self.clients[self.sock_alias] = self.clients[self.sock_fd]
|
||||
return None
|
||||
|
||||
# Closes a connected socket and removes it from the server metadata
|
||||
def close_sock(self, alias):
|
||||
client = self.clients.get(alias, None)
|
||||
if client is None:
|
||||
return None
|
||||
sock = client["sock"]
|
||||
fd = client["fd"]
|
||||
self.sockets.remove(sock)
|
||||
if fd is not None:
|
||||
# While the alias is a pointer, you need to delete both
|
||||
# individually to truly remove the socket from `clients`
|
||||
del self.clients[fd]
|
||||
del self.clients[alias]
|
||||
sock.close()
|
||||
def _sock_from_uuid(self, uuid):
|
||||
return self.uuids.get(uuid, dict()).get("sock", None)
|
||||
|
||||
# Closes a connected socket and removes it from the sockets list
|
||||
def close_sock(self, uuid):
|
||||
event = dict()
|
||||
event["name"] = "close_client"
|
||||
event["data"] = dict()
|
||||
event["data"]["uuid"] = uuid
|
||||
self._send_event(event)
|
||||
if uuid in list(self.uuids):
|
||||
sock = self.uuids[uuid]["sock"]
|
||||
kill = self.uuids[uuid]["kill"]
|
||||
kill.set()
|
||||
del self.uuids[uuid]
|
||||
self.sockets.remove(sock)
|
||||
sock.close()
|
||||
|
||||
# Receives specified number of bytes from a socket
|
||||
# sock - one of the sockets in sockets
|
||||
|
@ -194,25 +172,14 @@ class SocketServer(Process):
|
|||
# decode - flag if the returned data is binary-to-string decoded
|
||||
def receive_bytes(self, sock, get_bytes, decode=True):
|
||||
data = "".encode()
|
||||
eol = self.end_of_line.encode()
|
||||
# Auto-fail if requested bytes is greater than allowed by server
|
||||
if self.max_message_size > 0 and get_bytes > self.max_message_size:
|
||||
return None
|
||||
attempts = 0
|
||||
while len(data) < get_bytes:
|
||||
# Automatically break loop to prevent infinite loop
|
||||
if self.max_message_size > 0:
|
||||
if attempts > self.max_message_size / self.buffer_size:
|
||||
break
|
||||
else:
|
||||
attempts = attempts + 1
|
||||
# Allow at least twice the needed iterations to occur exiting loop
|
||||
if attempts > 2 * (get_bytes / self.buffer_size):
|
||||
break
|
||||
else:
|
||||
# With max_message_size not set, allow at least twice the
|
||||
# needed iterations to occur before breaking loop
|
||||
if attempts > 2 * (get_bytes / self.buffer_size):
|
||||
break
|
||||
else:
|
||||
attempts = attempts + 1
|
||||
attempts = attempts + 1
|
||||
bufsize = get_bytes - len(data)
|
||||
|
||||
# Force bufsize to cap out at buffer_size
|
||||
|
@ -223,140 +190,83 @@ class SocketServer(Process):
|
|||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
return None
|
||||
length = len(data) + len(packet)
|
||||
checker = packet if length < get_bytes else packet[:-2]
|
||||
|
||||
# Automatically stop reading message if EOL character sent
|
||||
if eol in checker:
|
||||
packet = packet.split(eol)[0] + eol
|
||||
return data + packet
|
||||
data = data + packet
|
||||
return data.decode() if decode else data
|
||||
|
||||
# Get message from socket
|
||||
def get_message(self, uuid):
|
||||
sock = self._sock_from_uuid(uuid)
|
||||
if sock is None:
|
||||
return None
|
||||
raw_message_size = self.receive_bytes(sock, 4, False)
|
||||
if raw_message_size is None or len(raw_message_size) != 4:
|
||||
return None
|
||||
message_size = unpack(">I", raw_message_size)[0]
|
||||
return self.receive_bytes(sock, message_size)
|
||||
|
||||
# Packages a message and sends it to socket
|
||||
def send_message(self, sock, message, *args):
|
||||
formatted = self.handler.format_message(message, *args)
|
||||
def send_message(self, uuid, message, *args):
|
||||
sock = self._sock_from_uuid(uuid)
|
||||
if sock is None:
|
||||
return None
|
||||
try:
|
||||
sock.send(formatted)
|
||||
sock.send(message)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
alias = self.get_client_alias_by_sock(sock)
|
||||
if alias is not None:
|
||||
self.close_sock(alias)
|
||||
return
|
||||
|
||||
# Like send_message, but sends to all sockets but the server and the sender
|
||||
def broadcast_message(self, client_sock, client_message, *args):
|
||||
for sock in self.sockets:
|
||||
not_server = sock != self.sock
|
||||
not_client = sock != client_sock
|
||||
if not_server and not_client:
|
||||
self.send_message(sock, client_message, *args)
|
||||
def broadcast_message(self, client_uuid, message, *args):
|
||||
for uuid in list(self.uuids):
|
||||
if uuid != client_uuid:
|
||||
self.send_message(uuid, message, *args)
|
||||
|
||||
# Obtains file descriptor of the socket
|
||||
def get_client_fd(self, client_sock):
|
||||
try:
|
||||
# First, try the easy route of just pulling it directly
|
||||
return client_sock.fileno()
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
# Otherwise, the socket is probably dead and we can try finding it
|
||||
# using brute-force. This sometimes works
|
||||
for fd, sock in self.sockets:
|
||||
if sock != client_sock:
|
||||
continue
|
||||
return fd
|
||||
# If the brute-force option does not work, I cannot think of a good
|
||||
# way to get the fd aside from passing it along everywhere that
|
||||
# sock is also used, which would be extremely tedios. However, if
|
||||
# you have the alias you can skip this entirely and just pull the
|
||||
# fd from `clients` using the alias
|
||||
return None
|
||||
def recv(self):
|
||||
#return [letter for letter in self._obtain(self._outbox)]
|
||||
yield from self._obtain(self._outbox)
|
||||
|
||||
# I realize the function name here is long, but for the few times I use
|
||||
# this it makes it clear exactly what magic is going on
|
||||
def get_client_alias_by_sock(self, client_sock):
|
||||
client_fd = self.get_client_fd(client_sock)
|
||||
if client_fd is None:
|
||||
return None
|
||||
return self.clients.get(client_fd, dict()).get("alias", None)
|
||||
def send(self, uuid, message):
|
||||
letter = uuid, message
|
||||
self._inbox.put(letter)
|
||||
|
||||
# Externally called function to send a message to a client
|
||||
def send(self, client, message, *args):
|
||||
# This queue will be read by `_process_inbox` during the next loop
|
||||
self.inbox.put((self.handler.send_verb, client, message, args))
|
||||
def check(self):
|
||||
for letter in self.recv():
|
||||
if letter is not None and len(letter) > 0:
|
||||
print(letter)
|
||||
|
||||
# Externally called function to broadcast a message to all clients
|
||||
def broadcast(self, message, *args):
|
||||
# This queue will be read by `_process_inbox` during the next loop
|
||||
self.inbox.put((self.handler.broadcast_verb, message, args))
|
||||
|
||||
# Externally called function to iterates over the outbox queue and returns
|
||||
# them as a list in FIFO order
|
||||
def results(self, remove_status=False):
|
||||
messages = list()
|
||||
while not self.outbox.empty():
|
||||
result = self.outbox.get()
|
||||
# For when you do not care about the status codes
|
||||
if remove_status:
|
||||
status, message = result
|
||||
messages.append(message)
|
||||
else:
|
||||
messages.append(result)
|
||||
return messages
|
||||
|
||||
# The Process function for running the socket server logic loop
|
||||
# The function for running the socket server logic loop
|
||||
def run(self):
|
||||
err = self._prepare()
|
||||
if err is not None:
|
||||
print(err)
|
||||
eprint(err)
|
||||
return err
|
||||
# print("Server ready!")
|
||||
while self.running:
|
||||
queue_args = self._inbox, self.timeout
|
||||
Thread(target=self._queue_thread, args=queue_args).start()
|
||||
print("Server ready!")
|
||||
self.ready.set()
|
||||
while not self.kill_switch.is_set():
|
||||
try:
|
||||
# Accept new socket client
|
||||
client_sock, client_address = self.sock.accept()
|
||||
client_sock.settimeout(60)
|
||||
# print(client_address)
|
||||
self._new_client(client_sock, client_address)
|
||||
# The socket can either be broken or no longer open at all
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
continue
|
||||
|
||||
# Collect the metadata of the client socket
|
||||
client_name = "{}:{}".format(*client_address)
|
||||
client_host, client_port = client_address
|
||||
client_fd = client_sock.fileno()
|
||||
client_alias = "@{}".format(len(self.sockets))
|
||||
|
||||
# Define metadata for client
|
||||
self.sockets.append(client_sock)
|
||||
self.clients[client_fd] = dict()
|
||||
self.clients[client_fd]["fd"] = client_fd
|
||||
self.clients[client_fd]["host"] = client_host
|
||||
self.clients[client_fd]["port"] = client_port
|
||||
self.clients[client_fd]["sock"] = client_sock
|
||||
self.clients[client_fd]["alias"] = client_alias
|
||||
self.clients[client_fd]["alive"] = True
|
||||
|
||||
# The alias is just a key that points to the same metadata
|
||||
self.clients[client_alias] = self.clients[client_fd]
|
||||
|
||||
# Have handler process new client event
|
||||
status = self.handler.open_client(client_alias)
|
||||
|
||||
# Send status and message received to the outbox queue
|
||||
self.outbox.put((status, message))
|
||||
|
||||
# Spawn new thread for client
|
||||
args = (client_sock, client_alias)
|
||||
Thread(target=self._client_thread, args=args).start()
|
||||
|
||||
# Process messages waiting in inbox queue
|
||||
# This is done at the end in case for some weird reason a message
|
||||
# is sent to the new client in the middle of processing this data
|
||||
# it eliminates the chance of a race condition.
|
||||
self._process_inbox()
|
||||
|
||||
# Stop the socket server
|
||||
def stop(self):
|
||||
self.handler.close_server()
|
||||
self.running = False
|
||||
self.sock.close()
|
||||
def stop(self, done=None, join=False):
|
||||
event = dict()
|
||||
event["name"] = "closing"
|
||||
event["data"] = dict()
|
||||
event["data"]["when"] = utc_now_timestamp()
|
||||
self._send_event(event)
|
||||
for uuid in list(self.uuids):
|
||||
self.close_sock(uuid)
|
||||
self.kill_switch.set()
|
||||
self.sock.close()
|
||||
if join:
|
||||
for client in self.clients:
|
||||
client.join(self.timeout)
|
||||
self.stopped.set()
|
||||
cast(done, "set")
|
||||
|
|
|
@ -1,166 +0,0 @@
|
|||
"""
|
||||
|
||||
Socket Server Handlers
|
||||
======================
|
||||
|
||||
The socket server was made to be versitile where the handler can be swapped out
|
||||
in favor of another handler. This handler is the default provided if one is not
|
||||
passed to get the basic client-server relationship working for either starting
|
||||
with something simple, testing, and/or providing a template to build other
|
||||
handlers from.
|
||||
|
||||
"""
|
||||
|
||||
class SocketServerHandler:
|
||||
def __init__(self, server):
|
||||
self.server = server
|
||||
|
||||
# These are used when processing inbox messages
|
||||
self.send_verb = "SEND"
|
||||
self.broadcast_verb = "CAST"
|
||||
|
||||
self.close_verb = "STOP"
|
||||
|
||||
# Tells all clients that a node joined the socket server
|
||||
def open_client(self, alias):
|
||||
alias = message[5:]
|
||||
client = self.server.clients.get(alias, None)
|
||||
if client is None:
|
||||
return 1
|
||||
self.server.broadcast_message(client, message)
|
||||
return 0
|
||||
|
||||
# Informs the other clients a client left and closes that client's socket
|
||||
def close_client(self, alias):
|
||||
client = self.server.clients.get(alias, None)
|
||||
if client is None:
|
||||
return 1
|
||||
message = "LEFT {}".format(alias)
|
||||
self.server.broadcast_message(self.server.sock, message)
|
||||
self.server.close_sock(alias)
|
||||
return 0
|
||||
|
||||
# Lets the clients know the server is intentionally closing
|
||||
def close_server(self):
|
||||
self.server.broadcast_message(self.server.sock, self.close_verb)
|
||||
return -1
|
||||
|
||||
# Sends a heartbeat to the client to detect if it is still responding
|
||||
def send_heartbeat(self, alias):
|
||||
client = self.server.clients.get(alias, None)
|
||||
if client is None:
|
||||
return 1
|
||||
sock = client.get("sock", None)
|
||||
if sock is None:
|
||||
return 1
|
||||
self.server.send_message(sock, "PING")
|
||||
return 0
|
||||
|
||||
# Format a message before sending to client(s)
|
||||
# Prepends message size code along with replacing variables in message
|
||||
def format_message(self, message, *args):
|
||||
formatted = None
|
||||
if len(args) > 0:
|
||||
formatted = message.format(*args) + self.server.end_of_line
|
||||
else:
|
||||
formatted = message + self.server.end_of_line
|
||||
|
||||
# Puts message size at the front of the message
|
||||
prefixed = pack(">I", len(formatted)) + formatted.encode()
|
||||
return prefixed
|
||||
|
||||
# Get message from socket with `format_message` in mind
|
||||
def get_message(self, sock):
|
||||
raw_message_size = self.server.receive_bytes(sock, 4, False)
|
||||
if raw_message_size is None:
|
||||
return None
|
||||
message_size = unpack(">I", raw_message_size)[0]
|
||||
if self.max_message_size > 0 and message_size > self.max_message_size:
|
||||
return None
|
||||
eol = self.server.end_of_line
|
||||
return self.server.receive_bytes(sock, message_size).strip(eol)
|
||||
|
||||
# Takes the server object, the client socket, and a message to process
|
||||
# Each message returns a status code:
|
||||
# -1 : Going offline
|
||||
# 0 : Success
|
||||
# 1 : Failure
|
||||
# 2 : Invalid
|
||||
def message(self, sock, message):
|
||||
# print("DEBUG:", message)
|
||||
|
||||
send = self.send_verb + " "
|
||||
cast = self.broadcast_verb + " "
|
||||
send_size = len(send)
|
||||
cast_size = len(cast)
|
||||
|
||||
# React to heartbeat from client
|
||||
if message == "PONG":
|
||||
client_fd = self.server.get_client_fd(sock)
|
||||
client = self.server.clients.get(client_fd, dict())
|
||||
client_alive = client.get("alive", None)
|
||||
if client_aliave is None:
|
||||
return 1
|
||||
elif client_alive:
|
||||
return 1
|
||||
# Setting this to True is what tells the server the heartbeat worked
|
||||
client["alive"] = True
|
||||
return 0
|
||||
|
||||
# Tell the clients to stop before server itself stops
|
||||
elif message == self.close_verb:
|
||||
status = self.close_server()
|
||||
self.server.stop()
|
||||
return status
|
||||
|
||||
# Informs the other clients one left and closes that client's socket
|
||||
elif message == "QUIT":
|
||||
client_alias = self.server.get_client_alias_by_sock(sock)
|
||||
if client_alias is None:
|
||||
return 1
|
||||
return self.close_client(client_alias)
|
||||
|
||||
# Lists all client alises, puts itself first and the server second
|
||||
elif message == "LIST":
|
||||
aliases = list()
|
||||
client_alias = self.server.get_client_alias_by_sock(sock)
|
||||
if client_alias is None:
|
||||
return 1
|
||||
self.server_alias = self.server.sock_alias
|
||||
for alias in self.server.clients.keys():
|
||||
# We need to skip ints since the file descriptors are also keys
|
||||
if type(alias) is int:
|
||||
continue
|
||||
# The server and sending client are skipped to retain ordering
|
||||
elif alias == self.server.sock_alias:
|
||||
continue
|
||||
elif alias == client_alias:
|
||||
continue
|
||||
else:
|
||||
aliases.append(alias)
|
||||
listed = ",".join([client_alias, self.server_alias] + aliases)
|
||||
self.server.send_message(sock, listed)
|
||||
return 0
|
||||
|
||||
# Sends a message to the client with the specified client (via an alias)
|
||||
elif message[:send_size] == send:
|
||||
params = message[(send_size + 1):].split(" ", 1)
|
||||
if len(params) < 2:
|
||||
return 1
|
||||
alias, response = params
|
||||
client = self.server.clients.get(alias, dict())
|
||||
client_sock = client.get("sock", None)
|
||||
if client_sock is None:
|
||||
return 1
|
||||
self.server.send_message(client_sock, response)
|
||||
return 0
|
||||
|
||||
# Broadcasts a message to all other clients
|
||||
elif message[:cast_size] == cast:
|
||||
response = message[(cast_size + 1):]
|
||||
self.server.broadcast_message(sock, response)
|
||||
return 0
|
||||
|
||||
# All other commands are invalid
|
||||
else:
|
||||
return 2
|
|
@ -0,0 +1 @@
|
|||
from abots.tools.pixels import make_pixels, get_random_pixels, show_pixels
|
|
@ -0,0 +1,18 @@
|
|||
from numpy import zeros, uint8
|
||||
from PIL import Image
|
||||
from random import randint
|
||||
|
||||
def make_pixels(width, height):
|
||||
return zeros((height, width, 3), dtype=uint8)
|
||||
|
||||
def get_random_pixels(width, height):
|
||||
pixels = make_pixels(width, height)
|
||||
for y in range(height):
|
||||
for x in range(width):
|
||||
pixels[y, x] = [randint(0, 255), randint(0, 255), randint(0, 255)]
|
||||
return pixels
|
||||
|
||||
def show_pixels(pixels):
|
||||
image = Image.fromarray(pixels)
|
||||
image.show()
|
||||
|
|
@ -1,8 +1,6 @@
|
|||
from curses import initscr, noecho, cbreak
|
||||
|
||||
"""
|
||||
|
||||
ui\TUI: Text User Interface
|
||||
ui/TUI: Text User Interface
|
||||
===========================
|
||||
|
||||
The curses library is one of those things I always wanted to use, but never got
|
||||
|
@ -12,7 +10,99 @@ nice framework I can re-use without needing to pull out a manual for curses to
|
|||
figure out exactly what everything does (which is what I will be doing during
|
||||
the duration of writing this script).
|
||||
|
||||
I would like to note here that while I usually avoid using `import <module>` in
|
||||
favor of `from <module> import <component>`, I am making an exception here for
|
||||
curses due to how excessive and overkill that would be.
|
||||
|
||||
"""
|
||||
|
||||
import curses
|
||||
from time import sleep
|
||||
|
||||
class TUI:
|
||||
pass
|
||||
def __init__(self):
|
||||
# self.screen = None
|
||||
self.windows = dict()
|
||||
self.running = True
|
||||
|
||||
# This removes the need for `initialize` and `cleanup`
|
||||
curses.wrapper(self.run)
|
||||
|
||||
# Puts terminal into correct modes and state
|
||||
def initialize(self):
|
||||
self.screen = curses.initscr()
|
||||
# So that input will be read and not just echo'd onto the screen
|
||||
curses.noecho()
|
||||
# Will read keys as they are typed without needing Enter pressed
|
||||
curses.cbreak()
|
||||
# Converts special keys into curses variables
|
||||
self.screen.keypad(True)
|
||||
self.running = True
|
||||
|
||||
# Returns terminal back to how it was beforehand
|
||||
def cleanup(self):
|
||||
self.running = False
|
||||
curses.nocbreak()
|
||||
self.screen.keypad(False)
|
||||
curses.echo()
|
||||
curses.endwin()
|
||||
|
||||
# Stop loop running in `run`
|
||||
def stop(self):
|
||||
self.running = False
|
||||
|
||||
# Easily check if getch matches a letter or curses.KEY_* variable
|
||||
def is_key(self, getch, test):
|
||||
if type(test) is str:
|
||||
return getch == ord(test)
|
||||
return getch == test
|
||||
|
||||
# Curses uses the format: height, width, y, x or y, x
|
||||
def create_window(start_x, start_y, width=None, height=None):
|
||||
window = dict()
|
||||
|
||||
|
||||
# Curses logic run inside wrapper to initialize/cleanup automatically
|
||||
def run(self, screen):
|
||||
|
||||
win = dict()
|
||||
win["width"] = 80
|
||||
win["height"] = 24
|
||||
win["start_x"] = 10
|
||||
win["start_y"] = 10
|
||||
win["window"] = screen.subwin(win["height"], win["width"],
|
||||
win["start_y"], win["start_x"])
|
||||
window = win["window"]
|
||||
|
||||
# # Curses logic run inside wrapper to initialize/cleanup automatically
|
||||
# def run(self, screen):
|
||||
# # Make getch non-blocking and clear the screen
|
||||
# screen.nodelay(True)
|
||||
# screen.clear()
|
||||
|
||||
# width = 4
|
||||
# count = 0
|
||||
# direction = 1
|
||||
|
||||
# while self.running:
|
||||
# # Get user input
|
||||
# char = screen.getch()
|
||||
|
||||
# # Flush the user input and clear the screen
|
||||
# curses.flushinp()
|
||||
# screen.clear()
|
||||
|
||||
# if self.is_key(char, "q"):
|
||||
# screen.addstr("Closing...")
|
||||
# self.stop()
|
||||
# break
|
||||
# elif self.is_key(char, curses.KEY_UP):
|
||||
# width = width + 1
|
||||
|
||||
# screen.addstr("#" * count)
|
||||
# count = count + direction
|
||||
# if count == width:
|
||||
# direction = -1
|
||||
# elif count == 0:
|
||||
# direction = 1
|
||||
# sleep(0.1)
|
|
@ -2,4 +2,9 @@ pytest
|
|||
python-gnupg
|
||||
cryptography
|
||||
flask
|
||||
websockets
|
||||
websockets
|
||||
tensorflow
|
||||
numpy
|
||||
matplotlib
|
||||
textblob
|
||||
Pillow
|
||||
|
|
|
@ -1,13 +1,17 @@
|
|||
#!env/bin/python3
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, "/center/lib")
|
||||
from os.path import dirname, realpath
|
||||
|
||||
source = "/".join(dirname(realpath(__file__)).split("/")[:-1])
|
||||
sys.path.insert(0, source)
|
||||
|
||||
from abots.net import SocketClient
|
||||
|
||||
host = "127.0.0.1"
|
||||
port = 10701
|
||||
timeout = 3
|
||||
|
||||
client = SocketClient(host, port)
|
||||
client = SocketClient(host, port, timeout=timeout)
|
||||
client.daemon = True
|
||||
client.start()
|
|
@ -1,13 +1,17 @@
|
|||
#!env/bin/python3
|
||||
|
||||
import sys
|
||||
sys.path.insert(0, "/center/lib")
|
||||
from os.path import dirname, realpath
|
||||
|
||||
source = "/".join(dirname(realpath(__file__)).split("/")[:-1])
|
||||
sys.path.insert(0, source)
|
||||
|
||||
from abots.net import SocketServer
|
||||
|
||||
host = "127.0.0.1"
|
||||
port = 10701
|
||||
timeout = 3
|
||||
|
||||
server = SocketServer(host, port)
|
||||
server = SocketServer(host, port, timeout=timeout)
|
||||
server.daemon = True
|
||||
server.start()
|
|
@ -1,6 +1,57 @@
|
|||
#!env/bin/python3
|
||||
|
||||
# import sys
|
||||
# sys.path.insert(0, "/center/lib")
|
||||
|
||||
#from abots.net import SocketServer, SocketClient
|
||||
#
|
||||
#host = "localhost"
|
||||
#port = 10401
|
||||
#timeout = 3
|
||||
#
|
||||
#server = SocketServer(host, port, timeout=timeout)
|
||||
#client = SocketClient(host, port, timeout=timeout)
|
||||
#
|
||||
#server.start()
|
||||
#server.ready.wait()
|
||||
#client.start()
|
||||
from abots.events import ThreadMarshal
|
||||
from abots.helpers import generator
|
||||
from queue import Queue, Empty
|
||||
from threading import Thread, Event
|
||||
|
||||
def worker(queue, event, timeout):
|
||||
while not event.is_set():
|
||||
try:
|
||||
job = queue.get(timeout=timeout)
|
||||
except Empty:
|
||||
continue
|
||||
assert len(job) == 2, "Expected two parameters"
|
||||
done, task = job
|
||||
assert hasattr(done, "set"), "Expected event"
|
||||
assert len(task) == 3, "Expected three parameters"
|
||||
method, args, kwargs = task
|
||||
assert callable(method), "Expected function"
|
||||
assert isinstance(args, tuple), "Expected tuple"
|
||||
assert isinstance(kwargs, dict), "Expected dict"
|
||||
try:
|
||||
result = method(*args, **kwargs)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
done.set()
|
||||
queue.task_done()
|
||||
|
||||
def manager(queue, event, timeout):
|
||||
workers = list()
|
||||
current = 0
|
||||
while not event.is_set():
|
||||
try:
|
||||
task = queue.get(timeout=timeout)
|
||||
except Empty:
|
||||
continue
|
||||
assert len(task) == 2, "Expected two parameters"
|
||||
action, payload = task
|
||||
assert isinstance(action, str), "Expected str"
|
||||
if action == "new":
|
||||
assert callable(payload), "Expected function"
|
||||
workers.append((payload))
|
||||
elif action == "reserve" and len(workers) > 0:
|
||||
worker = workers[current]
|
||||
|
||||
|
|
Loading…
Reference in New Issue