Skip to content

Commit

Permalink
Refactor notify_link_state method to be event driven
Browse files Browse the repository at this point in the history
`notify_link_state()` method is doing a busy loop. Instead we can use
synchronization primitives to make the code event driven and remove
unnecessary sleep.
  • Loading branch information
tomasz-grz committed Feb 12, 2025
1 parent 36bd540 commit 1051cc3
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 32 deletions.
25 changes: 18 additions & 7 deletions nat-lab/tests/telio.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Runtime:
_error_events: List[ErrorEvent]
_started_tasks: List[str]
_stopped_tasks: List[str]
_peer_update_event: asyncio.Event
allowed_pub_keys: Set[str]

def __init__(self) -> None:
Expand All @@ -62,6 +63,7 @@ def __init__(self) -> None:
self._error_events = []
self._started_tasks = []
self._stopped_tasks = []
self._peer_update_event = asyncio.Event()
self.allowed_pub_keys = set()

async def handle_output_line(self, line) -> bool:
Expand All @@ -85,6 +87,8 @@ def _handle_task_information(self, line) -> bool:
def handle_event(self, event: Event):
if isinstance(event, Event.NODE):
self._handle_node_event(event.body)
# signal that there is a new event
self._peer_update_event.set()
elif isinstance(event, Event.RELAY):
self._handle_derp_event(event.body)
elif isinstance(event, Event.ERROR):
Expand Down Expand Up @@ -131,12 +135,13 @@ async def notify_peer_state(
await asyncio.sleep(0.1)

async def notify_link_state(self, public_key: str, states: List[LinkState]) -> None:
"""Busy wait until the last link_state event matches one of the `states` for `public_key`."""
"""Wait until a new link_state event matching one of the `states` for `public_key` is available."""
while True:
peer = self.get_peer_info(public_key)
if peer and peer.link_state in states:
return
await asyncio.sleep(0.1)
self._peer_update_event.clear()
await self._peer_update_event.wait()

async def notify_peer_event(
self,
Expand Down Expand Up @@ -271,13 +276,13 @@ async def wait_for_state_peer(
timeout,
)

async def wait_for_link_state(
async def wait_for_new_link_state(
self,
public_key: str,
state: List[LinkState],
timeout: Optional[float] = None,
) -> None:
"""Wait until the last link_state event matches one of the `states` for `public_key`."""
"""Wait until a new link_state event matching one of the `states` for `public_key` is available."""
await asyncio.wait_for(
self._runtime.notify_link_state(public_key, state), timeout
)
Expand All @@ -299,6 +304,9 @@ async def wait_for_event_peer(
def get_link_state_events(self, public_key: str) -> List[Optional[LinkState]]:
return self._runtime.get_link_state_events(public_key)

def get_last_link_state_event(self, public_key: str) -> Optional[LinkState]:
return self._runtime.get_link_state_events(public_key)[-1]

async def wait_for_state_derp(
self, server_ip: str, states: List[RelayState], timeout: Optional[float] = None
) -> None:
Expand Down Expand Up @@ -569,14 +577,14 @@ async def wait_for_state_peer(
link_state,
)

async def wait_for_link_state(
async def wait_for_new_link_state(
self,
public_key: str,
states: List[LinkState],
timeout: Optional[float] = None,
) -> None:
"""Wait until the last link_state event matches one of the `states` for `public_key`."""
await self.get_events().wait_for_link_state(
"""Wait until a new link_state event matching one of the `states` for `public_key` is available."""
await self.get_events().wait_for_new_link_state(
public_key,
states,
timeout,
Expand Down Expand Up @@ -607,6 +615,9 @@ async def wait_for_event_peer(
def get_link_state_events(self, public_key: str) -> List[Optional[LinkState]]:
return self.get_events().get_link_state_events(public_key)

def get_last_link_state_event(self, public_key: str) -> Optional[LinkState]:
return self.get_events().get_last_link_state_event(public_key)

async def wait_for_state_derp(
self, derp_ip, states: List[RelayState], timeout: Optional[float] = None
) -> None:
Expand Down
57 changes: 32 additions & 25 deletions nat-lab/tests/test_events_link_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,20 +155,20 @@ async def test_event_link_state_peers_idle_all_time(
alpha, beta = env.nodes
client_alpha, client_beta = env.clients

await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
await client_alpha.wait_for_new_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_new_link_state(alpha.public_key, [LinkState.UP])

# Expect no link event while peers are idle
with pytest.raises(asyncio.TimeoutError):
await wait_for_any_with_timeout(
[
asyncio.create_task(
client_alpha.wait_for_link_state(
client_alpha.wait_for_new_link_state(
beta.public_key, [LinkState.DOWN]
)
),
asyncio.create_task(
client_beta.wait_for_link_state(
client_beta.wait_for_new_link_state(
alpha.public_key, [LinkState.DOWN]
)
),
Expand All @@ -190,15 +190,22 @@ async def test_event_link_state_peers_exchanging_data_for_a_long_time(
conn.connection for conn in env.connections
]

await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
await client_alpha.wait_for_new_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_new_link_state(alpha.public_key, [LinkState.UP])

for _ in range(0, 40):
await asyncio.sleep(1)
await ping(connection_alpha, beta.ip_addresses[0])
await ping(connection_beta, alpha.ip_addresses[0])
await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])

assert (
client_alpha.get_link_state_events(beta.public_key).count(LinkState.DOWN)
== 1
)
assert (
client_beta.get_link_state_events(alpha.public_key).count(LinkState.DOWN)
== 1
)


@pytest.mark.asyncio
Expand All @@ -214,8 +221,8 @@ async def test_event_link_state_peers_exchanging_data_then_idling_then_resume(
conn.connection for conn in env.connections
]

await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
await client_alpha.wait_for_new_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_new_link_state(alpha.public_key, [LinkState.UP])

await ping(connection_alpha, beta.ip_addresses[0])
await ping(connection_beta, alpha.ip_addresses[0])
Expand All @@ -225,12 +232,12 @@ async def test_event_link_state_peers_exchanging_data_then_idling_then_resume(
await wait_for_any_with_timeout(
[
asyncio.create_task(
client_alpha.wait_for_link_state(
client_alpha.wait_for_new_link_state(
beta.public_key, [LinkState.DOWN]
)
),
asyncio.create_task(
client_beta.wait_for_link_state(
client_beta.wait_for_new_link_state(
alpha.public_key, [LinkState.DOWN]
)
),
Expand All @@ -246,12 +253,12 @@ async def test_event_link_state_peers_exchanging_data_then_idling_then_resume(
await wait_for_any_with_timeout(
[
asyncio.create_task(
client_alpha.wait_for_link_state(
client_alpha.wait_for_new_link_state(
beta.public_key, [LinkState.DOWN]
)
),
asyncio.create_task(
client_beta.wait_for_link_state(
client_beta.wait_for_new_link_state(
alpha.public_key, [LinkState.DOWN]
)
),
Expand All @@ -260,8 +267,8 @@ async def test_event_link_state_peers_exchanging_data_then_idling_then_resume(
)

# Expect the links are still UP
await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
assert client_alpha.get_last_link_state_event(beta.public_key) == LinkState.UP
assert client_beta.get_last_link_state_event(alpha.public_key) == LinkState.UP


@pytest.mark.asyncio
Expand All @@ -277,8 +284,8 @@ async def test_event_link_state_peer_goes_offline(
conn.connection for conn in env.connections
]

await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
await client_alpha.wait_for_new_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_new_link_state(alpha.public_key, [LinkState.UP])

await ping(connection_alpha, beta.ip_addresses[0])
await ping(connection_beta, alpha.ip_addresses[0])
Expand All @@ -290,16 +297,16 @@ async def test_event_link_state_peer_goes_offline(

# Expect the link to still be UP for the fist 10 seconds
with pytest.raises(asyncio.TimeoutError):
await client_alpha.wait_for_link_state(
await client_alpha.wait_for_new_link_state(
beta.public_key, [LinkState.DOWN], 10
)

# Expect the link down event
# It should arrive in 11-15 seconds after the link is cut and ping mod disabled
# And 22-25 seconds if the ping mod is enabled
await client_alpha.wait_for_link_state(beta.public_key, [LinkState.DOWN])
await client_alpha.wait_for_new_link_state(beta.public_key, [LinkState.DOWN])
# Although the beta device has been stopped, it should still see alpha as up
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
assert client_beta.get_last_link_state_event(alpha.public_key) == LinkState.UP


@pytest.mark.asyncio
Expand Down Expand Up @@ -377,8 +384,8 @@ async def test_event_link_state_peer_doesnt_respond(
conn.connection for conn in env.connections
]

await client_alpha.wait_for_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_link_state(alpha.public_key, [LinkState.UP])
await client_alpha.wait_for_new_link_state(beta.public_key, [LinkState.UP])
await client_beta.wait_for_new_link_state(alpha.public_key, [LinkState.UP])

async with ICMP_control(connection_beta):
with pytest.raises(asyncio.TimeoutError):
Expand All @@ -390,12 +397,12 @@ async def test_event_link_state_peer_doesnt_respond(
await wait_for_any_with_timeout(
[
asyncio.create_task(
client_alpha.wait_for_link_state(
client_alpha.wait_for_new_link_state(
beta.public_key, [LinkState.DOWN]
)
),
asyncio.create_task(
client_beta.wait_for_link_state(
client_beta.wait_for_new_link_state(
alpha.public_key, [LinkState.DOWN]
)
),
Expand Down

0 comments on commit 1051cc3

Please sign in to comment.