Unstable commit, partially works for multiple servers

This commit is contained in:
aewens 2019-05-31 22:20:26 -04:00
parent ee5927bb46
commit 2ea5a6a51c
7 changed files with 190 additions and 89 deletions

View File

@ -1,22 +1,27 @@
from .socket import IRCSocketClient
from .utils import hooks
from abots.events import CoroEvent
from abots.helpers import Logger, infinitedict, isnumeric, sha256
from abots.helpers import coroutine, generator
from abots.helpers import Logger, infinitedict, isnumeric, sha256, jsto, jots
from abots.helpers import coroutine, generator, debugger
from time import sleep
from os import remove as delete_file
from os.path import isfile
from functools import wraps
from collections import defaultdict
from threading import Thread
from queue import Queue, Empty
class Composer:
def __init__(self, logger=None):
self.logger = logger
self._state = dict()
self._listeners = dict()
self.kill_switch = CoroEvent()
self._state = infinitedict()
self._listeners = infinitedict()
self._queue = Queue()
self.kill_switch = CoroEvent()
self.hooks = hooks.use(self)
Thread(target=self.get_messages).start()
def get_state(self, key):
return self._state.get(key, None)
@ -24,22 +29,86 @@ class Composer:
def set_state(self, key, value):
self._state[key] = value
def start_irc_socket(self, host, port, timeout=None, secure=False):
def get_sock(self, sock_name):
return self._state["sockets"]["irc"][sock_name].get("sock", None)
def get_host(self, sock_name):
return self._state["sockets"]["irc"][sock_name].get("host", None)
def get_port(self, sock_name):
return self._state["sockets"]["irc"][sock_name].get("port", None)
def get_setting(self, path):
settings = self.get_state("settings")
if settings is None:
return None
keys = path.split("/")
pointer = settings
for key in keys:
pointer = pointer.get(key, None)
if pointer is None:
return None
return pointer
def load_settings(self, settings_file):
with open(settings_file, "r") as config:
settings = jsto(config.read())
assert isinstance(settings, dict), "expected dict"
irc_settings = settings.get("irc", dict())
self.set_state("settings", settings)
def load_irc_servers(self):
settings = self.get_state("settings")
if settings is None:
return None
irc_settings = settings.get("irc", None)
assert isinstance(irc_settings, dict), "expected dict"
servers = irc_settings.get("servers", None)
assert isinstance(servers, dict), "expected dict"
responses = list()
for name, server in servers.items():
#self.logger.debug((name, server))
servers[name]["require_met"] = CoroEvent()
host = server.get("host", None)
port = server.get("port", None)
timeout = server.get("timeout", None)
secure = server.get("secure", None)
response = self.add_irc_socket(name, host, port, timeout, secure)
responses.append(response)
return responses
def add_irc_socket(self, name, host, port, timeout=None, secure=False):
assert isinstance(name, str), "expected str"
assert isinstance(host, str), "expected str"
assert isinstance(port, int), "expected int"
sock = IRCSocketClient(host, port, timeout=timeout, secure=secure)
settings = dict()
settings["sock"] = sock
settings["host"] = host
settings["port"] = port
settings["timeout"] = timeout
settings["secure"] = secure
self._state["sockets"]["irc"][name] = settings
sock.start()
inbox = sock.queues.get("inbox")
outbox = sock.queues.get("outbox")
events = sock.queues.get("events")
return sock, inbox, outbox, events
self.queue_socket("irc", name)
response = dict()
response["name"] = name
response["sock"] = sock
response["inbox"] = sock.queues.get("inbox")
response["outbox"] = sock.queues.get("outbox")
response["events"] = sock.queues.get("events")
return response
def subscribe(self, hook_name):
key = sha256()
hook = self.hooks.attach(hook_name)
namespace, name = hook_name.split("/", 1)
if hook is not None:
event = CoroEvent()
cancel = self.unsubscribe(key, event)
gen_hook = generator(hook)
self._listeners[key] = gen_hook(self, cancel, event)
self._state["hooks"][namespace][name] = key
def subscribe_many(self, hook_names):
for hook_name in hook_names:
@ -54,36 +123,61 @@ class Composer:
pass
return cancel
#@debugger
def get_listeners(self, namespace="_none"):
assert isinstance(namespace, str), "Expected str"
hooks = self._state["hooks"][namespace]
keys = list(hooks.values())
#self.logger.debug(("LISTENERS", keys, self._state["hooks"]))
return ((key, self._listeners.get(key, None)) for key in keys)
@generator
def send_to_listeners(self, sock):
line = (yield)
for key in list(self._listeners):
listener = self._listeners.get(key, None)
def send_to_listeners(self):
namespace, sock_name, line = (yield)
sock = self.get_sock(sock_name)
listeners = self.get_listeners(namespace)
#self.logger.debug(("STL", sock_name, sock, listeners, line))
for key, listener in listeners:
#self.logger.debug(("KEY", sock_name, key, listener))
if listener is None:
return
continue
try:
meta = self.parse_message(line)
meta["socket"] = sock_name
listener.send((sock, meta))
except StopIteration:
cancel = self.unsubscribe(key)
cancel()
def get_messages(self, sock):
cache = ""
listeners = self.send_to_listeners(sock)
def queue_socket(self, namespace, sock_name):
self._queue.put((namespace, sock_name))
def get_messages(self):
socks = dict()
namespaces = dict()
listeners = self.send_to_listeners()
while not self.kill_switch.is_set():
for recv in sock.recv():
if recv is None or len(recv) == 0:
continue
if "\n" in recv:
lines = recv.split("\n")
listeners.send(cache + lines[0].strip("\r"))
for line in lines[1:-1]:
listeners.send(line.strip("\r"))
cache = lines[-1]
else:
cache = cache + recv
return cache
while True:
try:
namespace, name = self._queue.get_nowait()
sock = self.get_sock(name)
if sock is None:
self._queue.task_done()
continue
socks[name] = sock
namespaces[name] = namespace
#self.logger.debug(f"MSG: {namespace} {name}")
self._queue.task_done()
except Empty:
break
for name, sock in socks.items():
namespace = namespaces[name]
#self.logger.debug(("GET", namespace, name, sock))
for line in sock.recv():
if len(line) == 0:
continue
#self.logger.debug(f"[LINE]: {line}")
listeners.send((namespace, name, line))
# from section 2.3.1 of rfc1459
def parse_message(self, message):
@ -121,15 +215,19 @@ class Composer:
meta["user"] = user
meta["host"] = host
meta["command"] = command
if "capabilities" in self._state:
if "identify-msg" in self._state["capabilities"]:
if params[0] in ["+", "-"]:
meta["registered"] = params[0] == "+"
params = params[1:]
#capabilities = self.get_state("capabilities")
#if capabilities is not None:
# sock_caps = capabilities.get(sock_name, list())
# if "identify-msg" in sock_caps:
# if params[0] in ["+", "-"]:
# meta["registered"] = params[0] == "+"
# params = params[1:]
meta["params"] = params
return meta
def is_admin(self, meta, admin):
def is_admin(self, meta):
socket = meta["socket"]
admin = self.get_setting(f"irc/servers/{socket}/author")
tags = meta.get("tags", dict())
account = tags.get("account", None)
registered = meta.get("registered", None)

View File

@ -1,7 +1,7 @@
from abots.helpers import isnumeric
# from section 2.3.1 of rfc1459
def parse_message(message, caps=list()):
def parse_message(message):
meta = dict()
meta["raw"] = message
if message is None or len(message) == 0:
@ -36,10 +36,6 @@ def parse_message(message, caps=list()):
meta["user"] = user
meta["host"] = host
meta["command"] = command
if "identify-msg" in caps:
if params[0] in ["+", "-"]:
meta["registered"] = params[0] == "+"
params = params[1:]
meta["params"] = params
return meta

View File

@ -1,30 +1,26 @@
from ..utils import hooks
from abots.helpers import jsto, jots
from abots.events import CoroEvent
from abots.helpers import jots
from time import sleep
with open("settings.json", "r") as config:
irc_settings = jsto(config.read())
assert isinstance(irc_settings, dict), "Expected dict"
settings = irc_settings.get("irc", dict())
settings["require_met"] = CoroEvent()
def spindown(composer, sock, reason="Bye-bye!"):
composer.logger.critical("GOING DOWN!")
sock.send(f"QUIT {reason}")
sleep(sock.timeout)
sock.stop()
composer.kill_switch.set()
@hooks.register("debug")
@hooks.register("irc")
def log(composer, cancel, event):
sock, meta = (yield)
socket = meta["socket"]
if meta.get("command", None) == "ERROR":
composer.logger.error(meta.get("raw"))
event.set()
composer.logger.debug(meta.get("raw"))
prefix_client = composer.get_state("prefix-client")
if prefix_client is not None:
prefix_client.send(meta.get("raw"))
raw = meta.get("raw")
composer.logger.debug(f"[LOG:{socket}]: {raw}")
#prefix_client = composer.get_state("prefix-client")
#if prefix_client is not None:
# prefix_client.send(meta.get("raw"))
from .startup import *
from .basic import *

View File

@ -1,27 +1,26 @@
from . import hooks, spindown, settings, sleep
from . import hooks, spindown, sleep
author = settings["author"]
auto_join = settings["auto-join"]
@hooks.register("basic")
@hooks.register("irc")
def pivot(composer, cancel, event):
sock, meta = (yield)
socket = meta["socket"]
auto_join = composer.get_setting(f"irc/servers/{socket}/auto-join")
for channel in auto_join:
sock.send(f"JOIN {channel}")
composer.subscribe_many(["basic/quit", "basic/debug"])
composer.subscribe_many(["irc/quit", "irc/debug"])
@hooks.register("basic")
@hooks.register("irc")
def quit(composer, cancel, event):
sock, meta = (yield)
if meta.get("command", None) != "PRIVMSG":
return
source, message = composer.privmsg(meta)
if message == "!quit":
if composer.is_admin(meta, author):
if composer.is_admin(meta):
cancel()
spindown(composer, sock)
@hooks.register("basic")
@hooks.register("irc")
def debug(composer, cancel, event):
sock, meta = (yield)
if meta.get("command", None) != "PRIVMSG":

View File

@ -1,12 +1,6 @@
from . import hooks, spindown, settings, sleep
from . import hooks, spindown, sleep
name = settings["name"]
password = settings["password"]
requirements = settings["requirements"]
additional = settings["additional"]
require_met = settings["require_met"]
@hooks.register("startup")
@hooks.register("irc")
def init(composer, cancel, event):
sock, meta = (yield)
if meta.get("source", None) != "server":
@ -15,13 +9,14 @@ def init(composer, cancel, event):
return
if "hostname" not in meta.get("params", ""):
return
composer.logger.debug(f"INIT: {meta['raw']}")
cancel()
composer.subscribe("startup/caps")
composer.logger.debug(meta["raw"])
composer.subscribe("irc/caps")
#composer.logger.debug(meta["raw"])
sleep(0.01)
sock.send("CAP LS")
@hooks.register("startup")
@hooks.register("irc")
def caps(composer, cancel, event):
sock, meta = (yield)
if meta.get("source", None) != "server":
@ -29,7 +24,11 @@ def caps(composer, cancel, event):
if meta.get("command", None) != "CAP":
return
params = composer.cleanup(meta.get("params", ""))
socket = meta["socket"]
require_met = composer.get_setting(f"irc/servers/{socket}/require_met")
if "* LS" in params:
requirements = composer.get_setting("irc/requirements")
additional = composer.get_setting("irc/additional")
capabilities = params.split("* LS", 1)[1].strip()
composer.logger.debug(f"Capabilities: {capabilities}")
reqs = requirements[:]
@ -44,21 +43,24 @@ def caps(composer, cancel, event):
if not require_met.is_set():
cancel()
spindown(composer, sock, "requirements not met")
return
req_caps = " ".join(reqs + others)
composer.logger.debug(f"Requesting: {req_caps}")
composer.set_state("capabilities", req_caps)
composer.set_state(f"{socket}-capabilities", req_caps)
sock.send(f"CAP REQ :{req_caps}")
elif "* ACK" in params:
cancel()
composer.logger.debug(f"ACK!")
if require_met.is_set():
composer.subscribe("startup/identify")
socket = meta["socket"]
name = composer.get_setting(f"irc/servers/{socket}/name")
composer.subscribe("irc/identify")
sleep(0.01)
sock.send("CAP END")
sock.send(f"USER {name} - - -")
sock.send(f"NICK {name}")
@hooks.register("startup")
@hooks.register("irc")
def identify(composer, cancel, event):
sock, meta = (yield)
if meta.get("command", None) != "NOTICE":
@ -69,8 +71,10 @@ def identify(composer, cancel, event):
choose_nick = "choose a different nick"
if choose_nick not in params:
return
socket = meta["socket"]
password = composer.get_setting(f"irc/servers/{socket}/password")
sock.send(f"PRIVMSG NickServ :IDENTIFY {password}")
cancel()
sleep(0.01)
#composer.subscribe_many(["debug/log", "basic/pivot"])
composer.subscribe("basic/pivot")
#composer.subscribe_many(["irc/log", "irc/pivot"])
composer.subscribe("irc/pivot")

View File

@ -26,17 +26,21 @@ class IRCSocketClient(SocketClient):
else:
formatted = message
packaged = f"{formatted}\r\n"
#print(f"FORMAT: {packaged}")
return packaged.encode()
def recv(self):
data = self.cache
for buf in self._obtain("outbox"):
data = data + buf
if len(data) > 0:
if data[-1] != "\n":
data_buffer, separator, cache = data.rpartition("\n")
self.cache = cache
data = data_buffer + separator
else:
self.cache = ""
return data
if buf is not None and len(buf) > 0:
data = data + buf
if len(data) == 0:
return list()
#print(f"RECV: {data}")
if data[-1] != "\n":
data_buffer, separator, cache = data.rpartition("\n")
self.cache = cache
data = data_buffer + separator
else:
self.cache = ""
return (line.strip("\r") for line in data.split("\n"))

View File

@ -29,7 +29,11 @@ class Hooks:
return func(kind)
def _get(self, kind_name, name):
return self._run(kind_name, lambda k: access(k, name.split("/")))
return self._run(kind_name, lambda k: access(k, name.split("/", 1)))
def gather(self, namespace="_none"):
assert isinstance(namespace, str), "Expected str"
return self._hooks[namespace]
def attach(self, hook_name):
if "/" not in hook_name: