Migrated event loop from gevent to asyncio

This commit is contained in:
aewens 2019-08-27 21:25:33 -05:00
parent 2e0af94286
commit bd21d1501e
1 changed files with 40 additions and 26 deletions

View File

@ -1,23 +1,12 @@
from gevent import Greenlet, spawn, sleep, joinall
from gevent.queue import Queue
from socket import socket, timeout as sock_timeout
from socket import AF_INET, SOCK_STREAM, SOL_SOCKET, SO_REUSEADDR
from ssl import wrap_socket
from asyncio import get_event_loop, sleep, wait, Queue
queues = dict()
queues["hook"] = Queue()
queues["push"] = Queue()
queues["pull"] = Queue()
def test(context, old_value, new_value):
status = old_value == new_value
return True, status
def push(queues):
async def push(loop, queues):
print("PUSH")
while all([queue for queue in queues]):
send = queues.get("push")
receive = queues.get("pull")
event = send.get()
event = await send.get()
print("PUSH", event)
assert isinstance(event, tuple), f"Expected tuple: {event}"
if len(event) == 2:
error, response = event
@ -26,22 +15,27 @@ def push(queues):
elif len(event) == 4:
where, context, old_value, new_value = event
receive.put(event)
await receive.put(event)
await sleep(0)
return
def pull(queues):
async def pull(loop, queues):
plugins = dict()
print("PULL")
while all([queue for queue in queues]):
hook = queues.get("hook")
send = queues.get("push")
receive = queues.get("pull")
while not hook.empty():
plugin = hook.get()
plugin = await hook.get()
print("PULL", plugin)
assert callable(plugin), f"Expected function: {plugin}"
plugins[plugin.__name__.lower()] = plugin
event = receive.get()
event = await receive.get()
print("PULL", event)
assert isinstance(event, tuple), f"Expected tuple: {event}"
assert len(event) == 4, f"Invalid format: {event}"
where, context, old_value, new_value = event
@ -52,7 +46,7 @@ def pull(queues):
result = plugin(context, old_value, new_value)
assert isinstance(result, tuple), f"Expected tuple: {result}"
assert len(result) == 2, f"Invalid format: {result}"
send.put(result)
await send.put(result)
error, response = result
if error:
stop = True
@ -61,11 +55,31 @@ def pull(queues):
if stop:
break
await sleep(0)
return
queues["hook"].put(test)
queues["push"].put(("test", "change", None, True))
def test(context, old_value, new_value):
status = old_value == new_value
return True, status
yin = spawn(push, queues)
yang = spawn(pull, queues)
start = lambda: joinall([yang, yin])
async def main(loop):
queues = dict()
queues["hook"] = Queue()
queues["push"] = Queue()
queues["pull"] = Queue()
await queues["hook"].put(test)
await queues["push"].put(("test", "change", None, True))
yang = loop.create_task(pull(loop, queues))
yin = loop.create_task(push(loop, queues))
await wait([yin, yang])
loop.stop()
loop = get_event_loop()
try:
loop.create_task(main(loop))
loop.run_forever()
finally:
loop.close()