Skip to content

Commit

Permalink
Get rid of deprecate warning for apply the custom event loop policy
Browse files Browse the repository at this point in the history
  • Loading branch information
unkcpz committed Jan 13, 2025
1 parent 99699e1 commit b9488ed
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
5 changes: 2 additions & 3 deletions src/plumpy/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@


def create_running_loop():
poly = asyncio.get_event_loop_policy()
# print(poly)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

Expand Down Expand Up @@ -67,7 +65,8 @@ def set_event_loop_policy() -> None:

def reset_event_loop_policy() -> None:
"""Reset the event loop policy to the default."""
loop = get_event_loop()

loop = asyncio.get_event_loop()

cls = loop.__class__

Expand Down
12 changes: 12 additions & 0 deletions tests/rmq/test_process_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def sync_controller(thread_communicator: rmq.RmqThreadCommunicator):

class TestRemoteProcessController:
@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_pause(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand All @@ -56,6 +57,7 @@ async def test_pause(self, thread_communicator, async_controller):
assert proc.paused

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_play(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand All @@ -74,6 +76,7 @@ async def test_play(self, thread_communicator, async_controller):
await async_controller.kill_process(proc.pid)

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_kill(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the event loop
Expand All @@ -87,6 +90,7 @@ async def test_kill(self, thread_communicator, async_controller):
assert proc.state == plumpy.ProcessState.KILLED

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_status(self, thread_communicator, async_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand All @@ -100,6 +104,7 @@ async def test_status(self, thread_communicator, async_controller):
# make sure proc reach the final state
await async_controller.kill_process(proc.pid)

@pytest.mark.usefixtures('custom_event_loop_policy')
def test_broadcast(self, thread_communicator):
messages = []

Expand All @@ -122,6 +127,7 @@ def on_broadcast_receive(**msg):

class TestRemoteProcessThreadController:
@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_pause(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)

Expand All @@ -136,6 +142,7 @@ async def test_pause(self, thread_communicator, sync_controller):
assert proc.paused

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_pause_all(self, thread_communicator, sync_controller):
"""Test pausing all processes on a communicator"""
procs = []
Expand All @@ -147,6 +154,7 @@ async def test_pause_all(self, thread_communicator, sync_controller):
await utils.wait_util(lambda: all([proc.paused for proc in procs]))

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_play_all(self, thread_communicator, sync_controller):
"""Test pausing all processes on a communicator"""
procs = []
Expand All @@ -161,6 +169,7 @@ async def test_play_all(self, thread_communicator, sync_controller):
await utils.wait_util(lambda: all([not proc.paused for proc in procs]))

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_play(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
assert proc.pause()
Expand All @@ -175,6 +184,7 @@ async def test_play(self, thread_communicator, sync_controller):
assert proc.state == plumpy.ProcessState.CREATED

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_kill(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)

Expand All @@ -189,6 +199,7 @@ async def test_kill(self, thread_communicator, sync_controller):
assert proc.state == plumpy.ProcessState.KILLED

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_kill_all(self, thread_communicator, sync_controller):
"""Test pausing all processes on a communicator"""
procs = []
Expand All @@ -200,6 +211,7 @@ async def test_kill_all(self, thread_communicator, sync_controller):
assert all([proc.state == plumpy.ProcessState.KILLED for proc in procs])

@pytest.mark.asyncio
@pytest.mark.usefixtures('custom_event_loop_policy')
async def test_status(self, thread_communicator, sync_controller):
proc = utils.WaitForSignalProcess(communicator=thread_communicator)
# Run the process in the background
Expand Down
7 changes: 6 additions & 1 deletion tests/test_communications.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def subscriber():
"""Return an instance of `Subscriber`."""
return Subscriber()


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_add_rpc_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.add_rpc_subscriber` method."""
assert loop_communicator.add_rpc_subscriber(subscriber) is not None
Expand All @@ -45,12 +45,14 @@ def test_add_rpc_subscriber(loop_communicator, subscriber):
assert loop_communicator.add_rpc_subscriber(subscriber, identifier) == identifier


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_remove_rpc_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.remove_rpc_subscriber` method."""
identifier = loop_communicator.add_rpc_subscriber(subscriber)
loop_communicator.remove_rpc_subscriber(identifier)


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_add_broadcast_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.add_broadcast_subscriber` method."""
assert loop_communicator.add_broadcast_subscriber(subscriber) is not None
Expand All @@ -59,17 +61,20 @@ def test_add_broadcast_subscriber(loop_communicator, subscriber):
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier) == identifier


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_remove_broadcast_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.remove_broadcast_subscriber` method."""
identifier = loop_communicator.add_broadcast_subscriber(subscriber)
loop_communicator.remove_broadcast_subscriber(identifier)


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_add_task_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.add_task_subscriber` method."""
assert loop_communicator.add_task_subscriber(subscriber) is not None


@pytest.mark.usefixtures('custom_event_loop_policy')
def test_remove_task_subscriber(loop_communicator, subscriber):
"""Test the `LoopCommunicator.remove_task_subscriber` method."""
identifier = loop_communicator.add_task_subscriber(subscriber)
Expand Down

0 comments on commit b9488ed

Please sign in to comment.