Detach SessionKeeper from direct feature
SessionKeeper was initially used only for direct
connections, so it made sense for it to be present only
when the direct feature was also enabled. However, SessionKeeper
is now also used by proxying, STUN and VPN peers,
which makes it more generic.

Without this detachment, enabling the batching feature
requires the direct feature to be enabled as well, or else
libtelio will fail to work (for example, derp disconnects
will not be detected since no keepalives will happen).

This commit detaches SessionKeeper from the direct entities
and makes it always present.

Signed-off-by: Lukas Pukenis <[email protected]>
LukasPukenis committed Nov 29, 2024
1 parent e949d77 commit e11fb52
Showing 2 changed files with 137 additions and 50 deletions.
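
To make the failure mode above concrete, here is a minimal sketch of the feature combination that previously broke, built from the nat-lab helpers touched by this change; it assumes default_features() leaves the direct feature disabled, as in the tests below:

from utils.bindings import FeatureBatching, default_features

# Batching on, direct left at its default (assumed off). Before this
# commit no SessionKeeper was started for such a config, so no
# keepalives were sent and derp disconnects went undetected.
features = default_features()
features.batching = FeatureBatching(
    direct_connection_threshold=5,
    trigger_cooldown_duration=60,
    trigger_effective_duration=10,
)

With SessionKeeper detached from the direct entities, a configuration like this keeps keepalives running for proxied, VPN and STUN peers as well.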
119 changes: 107 additions & 12 deletions nat-lab/tests/test_derp_connect.py
@@ -4,9 +4,9 @@
from config import DERP_PRIMARY, DERP_SECONDARY, DERP_TERTIARY, DERP_SERVERS
from contextlib import AsyncExitStack
from copy import deepcopy
from helpers import SetupParameters, setup_mesh_nodes
from helpers import SetupParameters, setup_mesh_nodes, Features
from typing import List
from utils.bindings import RelayState
from utils.bindings import RelayState, FeatureBatching, default_features
from utils.connection_util import ConnectionTag
from utils.ping import ping

@@ -15,13 +15,46 @@
DERP3_IP = str(DERP_TERTIARY.ipv4)


def generate_features(batching: bool):
features = default_features()
features.wireguard.persistent_keepalive.proxying = 10
features.batching = (
FeatureBatching(
# TODO: the parameter name doesn't make sense anymore since it's been
# reused for VPN, STUN, PROXY thresholds as well not only for direct. It should be renamed.
direct_connection_threshold=5,
trigger_cooldown_duration=60,
trigger_effective_duration=10,
)
if batching
else None
)
return features


@pytest.mark.asyncio
@pytest.mark.parametrize(
"setup_params",
[
[
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1),
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=False),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=False),
),
],
[
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=True),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=True),
),
],
],
)
@@ -83,9 +116,32 @@ async def test_derp_reconnect_2clients(setup_params: List[SetupParameters]) -> N
"setup_params",
[
[
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1),
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2),
SetupParameters(connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=False),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=False),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1,
features=generate_features(batching=False),
),
],
[
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=True),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=True),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1,
features=generate_features(batching=True),
),
],
],
)
@@ -224,9 +280,32 @@ async def test_derp_reconnect_3clients(setup_params: List[SetupParameters]) -> N
"setup_params",
[
[
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1),
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2),
SetupParameters(connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=False),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=False),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1,
features=generate_features(batching=False),
),
],
[
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=True),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=True),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1,
features=generate_features(batching=True),
),
],
],
)
@@ -409,8 +488,24 @@ async def test_derp_restart(setup_params: List[SetupParameters]) -> None:
"setup_params",
[
[
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1),
SetupParameters(connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=False),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=False),
),
],
[
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_1,
features=generate_features(batching=True),
),
SetupParameters(
connection_tag=ConnectionTag.DOCKER_CONE_CLIENT_2,
features=generate_features(batching=True),
),
],
],
)
68 changes: 30 additions & 38 deletions src/device.rs
@@ -249,6 +249,9 @@ pub struct MeshnetEntities {

// Starcast components for multicast support.
starcast: Option<StarcastEntities>,

// Keepalive sender
session_keeper: Arc<SessionKeeper>,
}

#[derive(Default, Debug)]
@@ -320,9 +323,7 @@ impl Entities {
}

pub fn session_keeper(&self) -> Option<&Arc<SessionKeeper>> {
self.meshnet
.left()
.and_then(|m| m.direct.as_ref().map(|d| &d.session_keeper))
self.meshnet.left().map(|m| &m.session_keeper)
}

fn endpoint_providers(&self) -> Vec<&Arc<dyn EndpointProvider>> {
@@ -368,9 +369,6 @@ pub struct DirectEntities {

// Meshnet WG Connection upgrade synchronization
upgrade_sync: Arc<UpgradeSync>,

// Keepalive sender
session_keeper: Arc<SessionKeeper>,
}

pub struct EventListeners {
@@ -1005,10 +1003,9 @@ impl MeshnetEntities {
if let Some(upnp) = direct.upnp_endpoint_provider {
stop_arc_entity!(upnp, "UpnpEndpointProvider");
}

stop_arc_entity!(direct.session_keeper, "SessionKeeper");
}

stop_arc_entity!(self.session_keeper, "SessionKeeper");
stop_arc_entity!(self.multiplexer, "Multiplexer");
stop_arc_entity!(self.derp, "Derp");

@@ -1452,30 +1449,14 @@ impl Runtime {
self.requested_state.device_config.private_key.public(),
)?);

match SessionKeeper::start(
self.entities.socket_pool.clone(),
self.features.batching.unwrap_or_default(),
self.entities
.wireguard_interface
.subscribe_to_network_activity()
.await?,
)
.map(Arc::new)
{
Ok(session_keeper) => Some(DirectEntities {
local_interfaces_endpoint_provider,
stun_endpoint_provider,
upnp_endpoint_provider,
endpoint_providers,
cross_ping_check,
upgrade_sync,
session_keeper,
}),
Err(e) => {
telio_log_warn!("Session keeper startup failed: {e:?} - direct connections will not be formed");
None
}
}
Some(DirectEntities {
local_interfaces_endpoint_provider,
stun_endpoint_provider,
upnp_endpoint_provider,
endpoint_providers,
cross_ping_check,
upgrade_sync,
})
} else {
None
};
@@ -1485,12 +1466,22 @@ impl Runtime {
None
});

let session_keeper = Arc::new(SessionKeeper::start(
self.entities.socket_pool.clone(),
self.features.batching.unwrap_or_default(),
self.entities
.wireguard_interface
.subscribe_to_network_activity()
.await?,
)?);

Ok(MeshnetEntities {
multiplexer,
derp,
proxy,
direct,
starcast,
session_keeper,
})
}

@@ -2364,12 +2355,13 @@ impl TaskRuntime for Runtime {
.await?;
direct_entities.upgrade_sync.clear_accepted_session(public_key).await;

if self.features.batching.is_none() {
// When the batcher is enabled we use session keeper for all the connections
// direct, proxy, vpn and stun. This call is guarded by the batcher feature flag
// because it can disable keepalives when we don't want to.
direct_entities.session_keeper.remove_node(&public_key).await?;
}
// TODO what to do with this
//if self.features.batching.is_none() {
// // When the batcher is enabled we use session keeper for all the connections
// // direct, proxy, vpn and stun. This call is guarded by the batcher feature flag
// // because it can disable keepalives when we don't want to.
// direct_entities.session_keeper.remove_node(&public_key).await?;
//}
} else {
telio_log_warn!("Connection downgraded while direct entities are disabled");
}
