From b9488eda59df26ff9ac5a4f45d4fa288ce78e6bd Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Mon, 13 Jan 2025 12:36:19 +0100 Subject: [PATCH] Get rid of deprecate warning for apply the custom event loop policy --- src/plumpy/events.py | 5 ++--- tests/rmq/test_process_comms.py | 12 ++++++++++++ tests/test_communications.py | 7 ++++++- 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/plumpy/events.py b/src/plumpy/events.py index 6b0e1ec1..6cebffdf 100644 --- a/src/plumpy/events.py +++ b/src/plumpy/events.py @@ -23,8 +23,6 @@ def create_running_loop(): - poly = asyncio.get_event_loop_policy() - # print(poly) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -67,7 +65,8 @@ def set_event_loop_policy() -> None: def reset_event_loop_policy() -> None: """Reset the event loop policy to the default.""" - loop = get_event_loop() + + loop = asyncio.get_event_loop() cls = loop.__class__ diff --git a/tests/rmq/test_process_comms.py b/tests/rmq/test_process_comms.py index 7a03fac4..5f1cc40d 100644 --- a/tests/rmq/test_process_comms.py +++ b/tests/rmq/test_process_comms.py @@ -44,6 +44,7 @@ def sync_controller(thread_communicator: rmq.RmqThreadCommunicator): class TestRemoteProcessController: @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_pause(self, thread_communicator, async_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) # Run the process in the background @@ -56,6 +57,7 @@ async def test_pause(self, thread_communicator, async_controller): assert proc.paused @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_play(self, thread_communicator, async_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) # Run the process in the background @@ -74,6 +76,7 @@ async def test_play(self, thread_communicator, async_controller): await async_controller.kill_process(proc.pid) @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_kill(self, thread_communicator, async_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) # Run the process in the event loop @@ -87,6 +90,7 @@ async def test_kill(self, thread_communicator, async_controller): assert proc.state == plumpy.ProcessState.KILLED @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_status(self, thread_communicator, async_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) # Run the process in the background @@ -100,6 +104,7 @@ async def test_status(self, thread_communicator, async_controller): # make sure proc reach the final state await async_controller.kill_process(proc.pid) + @pytest.mark.usefixtures('custom_event_loop_policy') def test_broadcast(self, thread_communicator): messages = [] @@ -122,6 +127,7 @@ def on_broadcast_receive(**msg): class TestRemoteProcessThreadController: @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_pause(self, thread_communicator, sync_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) @@ -136,6 +142,7 @@ async def test_pause(self, thread_communicator, sync_controller): assert proc.paused @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_pause_all(self, thread_communicator, sync_controller): """Test pausing all processes on a communicator""" procs = [] @@ -147,6 +154,7 @@ async def test_pause_all(self, thread_communicator, sync_controller): await utils.wait_util(lambda: all([proc.paused for proc in procs])) @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_play_all(self, thread_communicator, sync_controller): """Test pausing all processes on a communicator""" procs = [] @@ -161,6 +169,7 @@ async def test_play_all(self, thread_communicator, sync_controller): await utils.wait_util(lambda: all([not proc.paused for proc in procs])) @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_play(self, thread_communicator, sync_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) assert proc.pause() @@ -175,6 +184,7 @@ async def test_play(self, thread_communicator, sync_controller): assert proc.state == plumpy.ProcessState.CREATED @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_kill(self, thread_communicator, sync_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) @@ -189,6 +199,7 @@ async def test_kill(self, thread_communicator, sync_controller): assert proc.state == plumpy.ProcessState.KILLED @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_kill_all(self, thread_communicator, sync_controller): """Test pausing all processes on a communicator""" procs = [] @@ -200,6 +211,7 @@ async def test_kill_all(self, thread_communicator, sync_controller): assert all([proc.state == plumpy.ProcessState.KILLED for proc in procs]) @pytest.mark.asyncio + @pytest.mark.usefixtures('custom_event_loop_policy') async def test_status(self, thread_communicator, sync_controller): proc = utils.WaitForSignalProcess(communicator=thread_communicator) # Run the process in the background diff --git a/tests/test_communications.py b/tests/test_communications.py index f7e04255..699092e7 100644 --- a/tests/test_communications.py +++ b/tests/test_communications.py @@ -36,7 +36,7 @@ def subscriber(): """Return an instance of `Subscriber`.""" return Subscriber() - +@pytest.mark.usefixtures('custom_event_loop_policy') def test_add_rpc_subscriber(loop_communicator, subscriber): """Test the `LoopCommunicator.add_rpc_subscriber` method.""" assert loop_communicator.add_rpc_subscriber(subscriber) is not None @@ -45,12 +45,14 @@ def test_add_rpc_subscriber(loop_communicator, subscriber): assert loop_communicator.add_rpc_subscriber(subscriber, identifier) == identifier +@pytest.mark.usefixtures('custom_event_loop_policy') def test_remove_rpc_subscriber(loop_communicator, subscriber): """Test the `LoopCommunicator.remove_rpc_subscriber` method.""" identifier = loop_communicator.add_rpc_subscriber(subscriber) loop_communicator.remove_rpc_subscriber(identifier) +@pytest.mark.usefixtures('custom_event_loop_policy') def test_add_broadcast_subscriber(loop_communicator, subscriber): """Test the `LoopCommunicator.add_broadcast_subscriber` method.""" assert loop_communicator.add_broadcast_subscriber(subscriber) is not None @@ -59,17 +61,20 @@ def test_add_broadcast_subscriber(loop_communicator, subscriber): assert loop_communicator.add_broadcast_subscriber(subscriber, identifier) == identifier +@pytest.mark.usefixtures('custom_event_loop_policy') def test_remove_broadcast_subscriber(loop_communicator, subscriber): """Test the `LoopCommunicator.remove_broadcast_subscriber` method.""" identifier = loop_communicator.add_broadcast_subscriber(subscriber) loop_communicator.remove_broadcast_subscriber(identifier) +@pytest.mark.usefixtures('custom_event_loop_policy') def test_add_task_subscriber(loop_communicator, subscriber): """Test the `LoopCommunicator.add_task_subscriber` method.""" assert loop_communicator.add_task_subscriber(subscriber) is not None +@pytest.mark.usefixtures('custom_event_loop_policy') def test_remove_task_subscriber(loop_communicator, subscriber): """Test the `LoopCommunicator.remove_task_subscriber` method.""" identifier = loop_communicator.add_task_subscriber(subscriber)