"""Small asyncio demo: a push/pull event pipeline that dispatches to named hooks.

Three queues are shared through a dict:

* ``"hook"`` carries callables to register as hooks,
* ``"push"`` carries change events and hook results consumed by ``push``,
* ``"pull"`` carries change events consumed by ``pull``.

A change event is a 4-tuple ``(where, context, old_value, new_value)``; a hook
result is a 2-tuple ``(error, response)``.
"""
from asyncio import (
    FIRST_EXCEPTION,
    Queue,
    get_event_loop,  # noqa: F401 -- no longer used here; kept in case it is re-exported
    new_event_loop,
    sleep,
    wait,
)


async def push(loop, queues):
    """Forward change events from the "push" queue to the "pull" queue.

    Args:
        loop: Event loop; accepted for signature symmetry, not used here.
        queues: Mapping that must contain the "push" and "pull" queues.

    Each item read from the "push" queue must be a tuple:

    * a 2-tuple ``(error, response)`` ends the coroutine when ``error`` is
      truthy and is otherwise ignored;
    * a 4-tuple is forwarded unchanged to the "pull" queue;
    * tuples of any other length are ignored.

    Raises:
        KeyError: If ``queues`` lacks the "push" or "pull" entry.
        AssertionError: If an item is not a tuple.
    """
    print("PUSH")
    # The queues never change while running, so look them up once.
    send = queues["push"]
    receive = queues["pull"]
    while True:
        event = await send.get()
        print("PUSH", event)
        assert isinstance(event, tuple), f"Expected tuple: {event}"
        if len(event) == 2:
            error, _response = event
            if error:
                break
        elif len(event) == 4:
            await receive.put(event)
        # Give the other coroutine a chance to run between events.
        await sleep(0)


async def pull(loop, queues):
    """Dispatch change events from the "pull" queue to registered hooks.

    Args:
        loop: Event loop; accepted for signature symmetry, not used here.
        queues: Mapping that must contain the "hook", "push" and "pull" queues.

    Callables waiting on the "hook" queue are registered under their
    lower-cased ``__name__``. The hook queue is only drained at the top of
    each iteration, i.e. before waiting for the next event. Each event
    ``(where, context, old_value, new_value)`` is passed to the hook whose
    name equals ``where`` (if any); the hook's ``(error, response)`` result is
    put on the "push" queue, and a truthy ``error`` ends the coroutine.

    Raises:
        KeyError: If ``queues`` lacks one of the three entries.
        AssertionError: If a hook is not callable, or an event or a hook
            result does not have the expected tuple shape.
    """
    print("PULL")
    hook = queues["hook"]
    send = queues["push"]
    receive = queues["pull"]
    plugins = {}
    while True:
        # Register every hook that is already waiting, without blocking.
        while not hook.empty():
            plugin = await hook.get()
            print("PULL", plugin)
            assert callable(plugin), f"Expected function: {plugin}"
            plugins[plugin.__name__.lower()] = plugin
        event = await receive.get()
        print("PULL", event)
        assert isinstance(event, tuple), f"Expected tuple: {event}"
        assert len(event) == 4, f"Invalid format: {event}"
        where, context, old_value, new_value = event
        # Hook names are unique dict keys, so a direct lookup finds the match.
        plugin = plugins.get(where)
        if plugin is not None:
            result = plugin(context, old_value, new_value)
            assert isinstance(result, tuple), f"Expected tuple: {result}"
            assert len(result) == 2, f"Invalid format: {result}"
            await send.put(result)
            error, _response = result
            if error:
                return
        await sleep(0)


def test(context, old_value, new_value):
    """Sample hook: report whether the value is unchanged.

    Returns:
        ``(error, response)`` where ``error`` is always ``True`` (so the
        pipeline stops after this hook has run) and ``response`` is
        ``old_value == new_value``.
    """
    return True, old_value == new_value


# The hook has to be called ``test`` because ``pull`` dispatches on the
# function name; this flag keeps pytest from collecting it as a test case
# when the module is star-imported into a test file.
test.__test__ = False


async def main(loop):
    """Wire up the queues, run ``pull`` and ``push`` to completion, stop the loop.

    Args:
        loop: Event loop used to create the tasks; it is stopped when this
            coroutine finishes, whether normally or with an exception.

    Raises:
        Exception: Whatever ``pull`` or ``push`` raised, re-raised after the
            other task has been cancelled.
    """
    queues = {"hook": Queue(), "push": Queue(), "pull": Queue()}
    await queues["hook"].put(test)
    await queues["push"].put(("test", "change", None, True))
    tasks = [
        loop.create_task(pull(loop, queues)),
        loop.create_task(push(loop, queues)),
    ]
    try:
        # Return as soon as one task fails: otherwise the surviving task would
        # block on its queue forever and the loop would never be stopped.
        done, pending = await wait(tasks, return_when=FIRST_EXCEPTION)
        if pending:
            for task in pending:
                task.cancel()
            # Let the cancellations be processed before the loop is stopped.
            await wait(pending)
        for task in done:
            task.result()  # re-raises the failure, if any
    finally:
        loop.stop()


# new_event_loop() instead of get_event_loop(): calling the latter with no
# running loop is deprecated and raises RuntimeError on recent Python versions.
loop = new_event_loop()
try:
    _main_task = loop.create_task(main(loop))
    loop.run_forever()
    # Propagate a failure from main() instead of leaving it unretrieved.
    if _main_task.done() and not _main_task.cancelled():
        _main_task.result()
finally:
    loop.close()