133 lines
3.8 KiB
Python
133 lines
3.8 KiB
Python
from abots.helpers import generator, coroutine, infinitedict
|
|
|
|
from asyncio import get_event_loop, sleep, wait, Queue
|
|
from socket import socket, timeout as sock_timeout
|
|
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
|
|
from ssl import wrap_socket
|
|
from time import sleep as nap
|
|
|
|
async def handle_client(loop, client):
|
|
while True:
|
|
try:
|
|
request = await loop.sock_recv(client, 256)
|
|
except Exception as e:
|
|
continue
|
|
|
|
if request:
|
|
message = request.decode()
|
|
print(message)
|
|
|
|
await sleep(0)
|
|
|
|
client.close()
|
|
|
|
async def run_server(loop, server):
|
|
while True:
|
|
client, _ = await loop.sock_accept(server)
|
|
loop.create_task(handle_client(loop, client))
|
|
await sleep(0)
|
|
|
|
async def push(loop, queues):
|
|
while all([queue for queue in queues]):
|
|
send = queues.get("push")
|
|
receive = queues.get("pull")
|
|
event = await send.get()
|
|
print("PUSH", event)
|
|
assert isinstance(event, tuple), f"Expected tuple: {event}"
|
|
if len(event) == 2:
|
|
error, response = event
|
|
if error:
|
|
break
|
|
|
|
elif len(event) == 4:
|
|
where, context, old_value, new_value = event
|
|
await receive.put(event)
|
|
|
|
await sleep(0)
|
|
|
|
return
|
|
|
|
async def pull(loop, queues):
|
|
plugins = dict()
|
|
print("PULL")
|
|
while all([queue for queue in queues]):
|
|
hook = queues.get("hook")
|
|
send = queues.get("push")
|
|
receive = queues.get("pull")
|
|
while not hook.empty():
|
|
plugin = await hook.get()
|
|
print("PULL", plugin)
|
|
assert callable(plugin), f"Expected function: {plugin}"
|
|
plugins[plugin.__name__.lower()] = plugin
|
|
|
|
event = await receive.get()
|
|
print("PULL", event)
|
|
assert isinstance(event, tuple), f"Expected tuple: {event}"
|
|
assert len(event) == 4, f"Invalid format: {event}"
|
|
where, context, old_value, new_value = event
|
|
|
|
stop = False
|
|
for plugin_name, plugin in plugins.items():
|
|
if plugin_name == where:
|
|
result = plugin(context, old_value, new_value)
|
|
assert isinstance(result, tuple), f"Expected tuple: {result}"
|
|
assert len(result) == 2, f"Invalid format: {result}"
|
|
await send.put(result)
|
|
error, response = result
|
|
if error:
|
|
stop = True
|
|
break
|
|
|
|
if stop:
|
|
break
|
|
|
|
await sleep(0)
|
|
|
|
return
|
|
|
|
def test(context, old_value, new_value):
|
|
status = old_value == new_value
|
|
return True, status
|
|
|
|
async def main(loop):
|
|
tasks = list()
|
|
queues = dict()
|
|
queues["hook"] = Queue()
|
|
queues["push"] = Queue()
|
|
queues["pull"] = Queue()
|
|
|
|
server = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
|
server.setblocking(False)
|
|
server.settimeout(1)
|
|
server.connect(("", 6697))
|
|
|
|
await queues["hook"].put(test)
|
|
await queues["push"].put(("test", "change", None, True))
|
|
|
|
tasks.append(loop.create_task(handle_client(loop, server)))
|
|
tasks.append(loop.create_task(pull(loop, queues)))
|
|
tasks.append(loop.create_task(push(loop, queues)))
|
|
await wait(tasks)
|
|
loop.stop()
|
|
|
|
loop = get_event_loop()
|
|
try:
|
|
#loop.create_task(main(loop))
|
|
#server = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
|
server = socket(AF_INET, SOCK_STREAM)
|
|
server.setblocking(False)
|
|
#server.settimeout(1)
|
|
server.bind(("localhost", 10201))
|
|
server.listen(5)
|
|
loop.create_task(run_server(loop, server))
|
|
loop.run_forever()
|
|
finally:
|
|
loop.close()
|
|
|
|
#server = wrap_socket(socket(AF_INET, SOCK_STREAM))
|
|
#server.connect(("irc.tilde.chat", 6697))
|
|
#while True:
|
|
# print(server.recv(4096))
|
|
# res = input("> ")
|
|
# server.send(f"{res}\r\n".encode())
|