diff --git a/implementations/rust/ockam/ockam/src/remote/worker.rs b/implementations/rust/ockam/ockam/src/remote/worker.rs index 4f2a294d87d..09bc11d4cf5 100644 --- a/implementations/rust/ockam/ockam/src/remote/worker.rs +++ b/implementations/rust/ockam/ockam/src/remote/worker.rs @@ -32,7 +32,7 @@ impl Worker for RemoteRelay { ctx: &mut Context, msg: Routed, ) -> Result<()> { - if msg.msg_addr() == self.addresses.main_remote { + if msg.msg_addr() == &self.addresses.main_remote { let mut local_message = msg.into_local_message(); // Remove my address from the onward_route diff --git a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs index fc7b33f5bfb..b696bdfdfbe 100644 --- a/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/influxdb/portal.rs @@ -39,6 +39,8 @@ impl NodeManagerWorker { policy_expression, privileged, tls, + skip_handshake, + enable_nagle, } = body.tcp_outlet; let address = self .node_manager @@ -95,6 +97,8 @@ impl NodeManagerWorker { reachable_from_default_secure_channel, OutletAccessControl::WithPolicyExpression(policy_expression), privileged, + skip_handshake, + enable_nagle, ) .await { @@ -121,6 +125,8 @@ impl NodeManagerWorker { disable_tcp_fallback, privileged, tls_certificate_provider, + skip_handshake, + enable_nagle, } = body.tcp_inlet.clone(); //TODO: should be an easier way to tweak the multiaddr @@ -189,6 +195,8 @@ impl NodeManagerWorker { disable_tcp_fallback, privileged, tls_certificate_provider, + skip_handshake, + enable_nagle, ) .await { @@ -334,7 +342,8 @@ impl InfluxDBPortals for BackgroundNodeClient { policy_expression: Option, influxdb_config: InfluxDBOutletConfig, ) -> miette::Result { - let mut outlet_payload = CreateOutlet::new(to, tls, from.cloned(), true, false); + let mut outlet_payload = + CreateOutlet::new(to, tls, from.cloned(), true, false, false, false); if let Some(policy_expression) = policy_expression { outlet_payload.set_policy_expression(policy_expression); } @@ -376,6 +385,8 @@ impl InfluxDBPortals for BackgroundNodeClient { disable_tcp_fallback, false, tls_certificate_provider, + false, + false, ); let payload = CreateInfluxDBInlet::new(inlet_payload, lease_usage, lease_issuer_route); Request::post("/node/influxdb_inlet").body(payload) diff --git a/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs index 2e5b6d1b47b..d611ef5fef8 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/inlet_controller.rs @@ -141,6 +141,8 @@ impl KafkaInletController { false, false, None, + false, + false, ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/outlet_controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/outlet_controller.rs index c5d25c70e64..e4045702852 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/outlet_controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/outlet_controller.rs @@ -77,6 +77,8 @@ impl KafkaOutletController { false, OutletAccessControl::WithPolicyExpression(self.policy_expression.clone()), false, + false, + false, ) .await .map(|info| info.to)?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs index 71d2f8d7880..bcffc185474 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/models/portal.rs @@ -57,6 +57,10 @@ pub struct CreateInlet { #[n(12)] pub(crate) privileged: bool, /// TLS certificate provider route. #[n(13)] pub(crate) tls_certificate_provider: Option, + /// Skip Portal handshake for lower latency, but also lower throughput + #[n(14)] pub(crate) skip_handshake: bool, + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + #[n(15)] pub(crate) enable_nagle: bool, } impl CreateInlet { @@ -69,6 +73,8 @@ impl CreateInlet { enable_udp_puncture: bool, disable_tcp_fallback: bool, privileged: bool, + skip_handshake: bool, + enable_nagle: bool, ) -> Self { Self { listen_addr: listen, @@ -83,6 +89,8 @@ impl CreateInlet { disable_tcp_fallback, privileged, tls_certificate_provider: None, + skip_handshake, + enable_nagle, } } @@ -96,6 +104,8 @@ impl CreateInlet { enable_udp_puncture: bool, disable_tcp_fallback: bool, privileged: bool, + skip_handshake: bool, + enable_nagle: bool, ) -> Self { Self { listen_addr: listen, @@ -110,6 +120,8 @@ impl CreateInlet { disable_tcp_fallback, privileged, tls_certificate_provider: None, + skip_handshake, + enable_nagle, } } @@ -169,7 +181,11 @@ pub struct CreateOutlet { /// will be used. #[n(5)] pub policy_expression: Option, /// Use eBPF and RawSocket to access TCP packets instead of TCP data stream. - #[n(6)] pub privileged: bool + #[n(6)] pub privileged: bool, + /// Skip Portal handshake for lower latency, but also lower throughput + #[n(7)] pub skip_handshake: bool, + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + #[n(8)] pub(crate) enable_nagle: bool, } impl CreateOutlet { @@ -179,6 +195,8 @@ impl CreateOutlet { worker_addr: Option
, reachable_from_default_secure_channel: bool, privileged: bool, + skip_handshake: bool, + enable_nagle: bool, ) -> Self { Self { hostname_port, @@ -187,6 +205,8 @@ impl CreateOutlet { reachable_from_default_secure_channel, policy_expression: None, privileged, + skip_handshake, + enable_nagle, } } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs index 53fa7b2c489..5dcf38fea57 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/kafka_services.rs @@ -220,6 +220,8 @@ impl InMemoryNode { false, false, None, + false, + false, ) .await?; @@ -324,6 +326,8 @@ impl InMemoryNode { false, OutletAccessControl::WithPolicyExpression(outlet_policy_expression), false, + false, + false, ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs index c217aeb3f04..9da2e6a35a9 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/background_node_client.rs @@ -26,6 +26,8 @@ pub fn create_inlet_payload( disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: &Option, + skip_handshake: bool, + enable_nagle: bool, ) -> CreateInlet { let via_project = outlet_addr.matches(0, &[ProjectProto::CODE.into()]); let mut payload = if via_project { @@ -37,6 +39,8 @@ pub fn create_inlet_payload( enable_udp_puncture, disable_tcp_fallback, privileged, + skip_handshake, + enable_nagle, ) } else { CreateInlet::to_node( @@ -48,6 +52,8 @@ pub fn create_inlet_payload( enable_udp_puncture, disable_tcp_fallback, privileged, + skip_handshake, + enable_nagle, ) }; if let Some(e) = policy_expression.as_ref() { @@ -80,6 +86,8 @@ impl Inlets for BackgroundNodeClient { disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: &Option, + skip_handshake: bool, + enable_nagle: bool, ) -> miette::Result> { let request = { let payload = create_inlet_payload( @@ -95,6 +103,8 @@ impl Inlets for BackgroundNodeClient { disable_tcp_fallback, privileged, tls_certificate_provider, + skip_handshake, + enable_nagle, ); Request::post("/node/inlet").body(payload) }; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/in_memory_node.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/in_memory_node.rs index d7df5111bd2..a4bc8c7f611 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/in_memory_node.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/in_memory_node.rs @@ -30,6 +30,8 @@ impl InMemoryNode { disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: Option, + skip_handshake: bool, + enable_nagle: bool, ) -> Result { self.node_manager .create_inlet( @@ -48,6 +50,8 @@ impl InMemoryNode { disable_tcp_fallback, privileged, tls_certificate_provider, + skip_handshake, + enable_nagle, ) .await } diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs index 439160f1b71..def6df933c5 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/inlets_trait.rs @@ -27,6 +27,8 @@ pub trait Inlets { disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: &Option, + skip_handshake: bool, + enable_nagle: bool, ) -> miette::Result>; async fn show_inlet(&self, ctx: &Context, alias: &str) -> miette::Result>; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs index 10948eaa488..bb8fd70c0f1 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager.rs @@ -41,6 +41,8 @@ impl NodeManager { disable_tcp_fallback: bool, privileged: bool, tls_certificate_provider: Option, + skip_handshake: bool, + enable_nagle: bool, ) -> Result { debug! { %listen_address, @@ -50,6 +52,8 @@ impl NodeManager { %alias, %enable_udp_puncture, %disable_tcp_fallback, + %skip_handshake, + %enable_nagle, "creating inlet" } @@ -127,6 +131,8 @@ impl NodeManager { udp_puncture: None, additional_route: None, privileged, + skip_handshake, + enable_nagle, }; let replacer = Arc::new(Mutex::new(replacer)); diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs index 5aa912a5e4c..c2119a73458 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/node_manager_worker.rs @@ -30,6 +30,8 @@ impl NodeManagerWorker { disable_tcp_fallback, privileged, tls_certificate_provider, + skip_handshake, + enable_nagle, } = create_inlet; match self .node_manager @@ -49,6 +51,8 @@ impl NodeManagerWorker { disable_tcp_fallback, privileged, tls_certificate_provider, + skip_handshake, + enable_nagle, ) .await { diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs index b72778b388a..90f12f370bf 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_inlets/session_replacer.rs @@ -55,6 +55,8 @@ pub(super) struct InletSessionReplacer { pub(super) udp_puncture: Option, pub(super) additional_route: Option, pub(super) privileged: bool, + pub(super) skip_handshake: bool, + pub(super) enable_nagle: bool, } impl InletSessionReplacer { @@ -109,7 +111,9 @@ impl InletSessionReplacer { let (incoming_ac, outgoing_ac) = self.access_control(node_manager).await?; let options = TcpInletOptions::new() .with_incoming_access_control(incoming_ac) - .with_outgoing_access_control(outgoing_ac); + .with_outgoing_access_control(outgoing_ac) + .set_skip_handshake(self.skip_handshake) + .set_enable_nagle(self.enable_nagle); let options = if self.udp_puncture_enabled() && self.disable_tcp_fallback { options.paused() diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs index 5a53f7a6315..ff948c9a3f9 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/tcp_outlets.rs @@ -28,6 +28,8 @@ impl NodeManagerWorker { policy_expression, tls, privileged, + skip_handshake, + enable_nagle, } = create_outlet; match self @@ -40,6 +42,8 @@ impl NodeManagerWorker { reachable_from_default_secure_channel, OutletAccessControl::WithPolicyExpression(policy_expression), privileged, + skip_handshake, + enable_nagle, ) .await { @@ -99,6 +103,8 @@ impl NodeManager { reachable_from_default_secure_channel: bool, access_control: OutletAccessControl, privileged: bool, + skip_handshake: bool, + enable_nagle: bool, ) -> Result { let worker_addr = self.registry.outlets.generate_worker_addr(worker_addr); @@ -134,7 +140,9 @@ impl NodeManager { let mut options = TcpOutletOptions::new() .with_incoming_access_control(incoming_ac) .with_outgoing_access_control(outgoing_ac) - .with_tls(tls); + .with_tls(tls) + .set_skip_handshake(skip_handshake) + .set_enable_nagle(enable_nagle); if self.project_authority().is_none() { for api_transport_flow_control_id in &self.api_transport_flow_control_ids { options = options.as_consumer(api_transport_flow_control_id) @@ -241,6 +249,7 @@ impl NodeManager { #[async_trait] pub trait Outlets { + #[allow(clippy::too_many_arguments)] async fn create_outlet( &self, ctx: &Context, @@ -249,6 +258,8 @@ pub trait Outlets { from: Option<&Address>, policy_expression: Option, privileged: bool, + skip_handshake: bool, + enable_nagle: bool, ) -> miette::Result; } @@ -263,8 +274,18 @@ impl Outlets for BackgroundNodeClient { from: Option<&Address>, policy_expression: Option, privileged: bool, + skip_handshake: bool, + enable_nagle: bool, ) -> miette::Result { - let mut payload = CreateOutlet::new(to, tls, from.cloned(), true, privileged); + let mut payload = CreateOutlet::new( + to, + tls, + from.cloned(), + true, + privileged, + skip_handshake, + enable_nagle, + ); if let Some(policy_expression) = policy_expression { payload.set_policy_expression(policy_expression); } diff --git a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs index 6a0beabfe64..287f795e643 100644 --- a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs @@ -214,7 +214,10 @@ impl TestNode { } pub async fn create(runtime: Arc, listen_addr: Option<&str>) -> Self { - let (mut context, executor) = NodeBuilder::new().with_runtime(runtime).build(); + let (mut context, executor) = NodeBuilder::new() + .with_runtime(runtime) + .no_logging() + .build(); let node_manager_handle = start_manager_for_tests( &mut context, listen_addr, diff --git a/implementations/rust/ockam/ockam_api/tests/latency.rs b/implementations/rust/ockam/ockam_api/tests/latency.rs index 4df4c55379c..73836836c25 100644 --- a/implementations/rust/ockam/ockam_api/tests/latency.rs +++ b/implementations/rust/ockam/ockam_api/tests/latency.rs @@ -140,6 +140,8 @@ pub fn measure_buffer_latency_two_nodes_portal() { true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + true, ) .await?; @@ -166,6 +168,8 @@ pub fn measure_buffer_latency_two_nodes_portal() { false, false, None, + false, + true, ) .await?; @@ -208,3 +212,98 @@ pub fn measure_buffer_latency_two_nodes_portal() { result.unwrap(); drop(runtime_cloned); } + +#[ignore] +#[test] +pub fn measure_connection_latency_two_nodes_portal() { + let runtime = Arc::new(Runtime::new().unwrap()); + let runtime_cloned = runtime.clone(); + std::env::set_var("OCKAM_LOGGING", "0"); + + let result: ockam::Result<()> = runtime_cloned.block_on(async move { + let test_body = async move { + let echo_server_handle = start_tcp_echo_server().await; + + TestNode::clean().await?; + let first_node = TestNode::create(runtime.clone(), None).await; + let second_node = TestNode::create(runtime.clone(), None).await; + + let _outlet_status = second_node + .node_manager + .create_outlet( + &second_node.context, + echo_server_handle.chosen_addr.clone(), + false, + Some(Address::from_string("outlet")), + true, + OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), + false, + true, + true, + ) + .await?; + + let second_node_listen_address = second_node.listen_address().await; + + // create inlet in the first node pointing to the second one + let inlet_status = first_node + .node_manager + .create_inlet( + &first_node.context, + HostnamePort::new("127.0.0.1", 0)?, + route![], + route![], + second_node_listen_address + .multi_addr()? + .concat(&MultiAddr::from_string("/secure/api/service/outlet")?)?, + "inlet_alias".to_string(), + None, + None, + None, + true, + None, + false, + false, + false, + None, + true, + true, + ) + .await?; + + let now = Instant::now(); + + for _ in 0..1000 { + // connect to inlet_status.bind_addr and send dummy payload + let mut socket = TcpStream::connect(inlet_status.bind_addr.clone()) + .await + .unwrap(); + + socket.set_nodelay(true).unwrap(); + + let mut buffer = [0u8; 5]; + + socket.write_all(b"hello").await.unwrap(); + socket.read_exact(&mut buffer).await.unwrap(); + } + + let elapsed = now.elapsed(); + println!( + "short payload, connect + roundtrip latency: {:?}", + elapsed.div_f32(1_000f32) + ); + + first_node.context.shutdown_node().await?; + second_node.context.shutdown_node().await?; + + Ok(()) + }; + + timeout(Duration::from_secs(30), test_body) + .await + .unwrap_or_else(|_| Err(Error::new(Origin::Node, Kind::Timeout, "Test timed out"))) + }); + + result.unwrap(); + drop(runtime_cloned); +} diff --git a/implementations/rust/ockam/ockam_api/tests/portals.rs b/implementations/rust/ockam/ockam_api/tests/portals.rs index c16c0c4f695..80ebf6e1f6e 100644 --- a/implementations/rust/ockam/ockam_api/tests/portals.rs +++ b/implementations/rust/ockam/ockam_api/tests/portals.rs @@ -36,6 +36,8 @@ async fn inlet_outlet_local_successful(context: &mut Context) -> ockam::Result<( true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + false, ) .await?; @@ -60,6 +62,8 @@ async fn inlet_outlet_local_successful(context: &mut Context) -> ockam::Result<( false, false, None, + false, + false, ) .await?; @@ -112,6 +116,8 @@ fn portal_node_goes_down_reconnect() { true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + false, ) .await?; @@ -138,6 +144,8 @@ fn portal_node_goes_down_reconnect() { false, false, None, + false, + false, ) .await?; @@ -183,6 +191,8 @@ fn portal_node_goes_down_reconnect() { true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + false, ) .await?; @@ -258,6 +268,8 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + false, ) .await?; @@ -296,6 +308,8 @@ fn portal_low_bandwidth_connection_keep_working_for_60s() { false, false, None, + false, + false, ) .await?; @@ -379,6 +393,8 @@ fn portal_heavy_load_exchanged() { true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + false, ) .await?; @@ -410,6 +426,8 @@ fn portal_heavy_load_exchanged() { false, false, None, + false, + false, ) .await?; @@ -525,6 +543,8 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup true, OutletAccessControl::AccessControl((Arc::new(AllowAll), Arc::new(AllowAll))), false, + false, + false, ) .await?; @@ -563,6 +583,8 @@ fn test_portal_payload_transfer(outgoing_disruption: Disruption, incoming_disrup false, false, None, + false, + false, ) .await?; diff --git a/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs b/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs index 14ef18e9e30..b9d69ba530e 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/incoming_services/commands.rs @@ -212,6 +212,8 @@ impl AppState { false, false, &None, + false, + false, ) .await .map_err(|err| { diff --git a/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs b/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs index 41723cc7de7..46b341535a6 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/create.rs @@ -47,6 +47,8 @@ impl AppState { true, OutletAccessControl::AccessControl((Arc::new(incoming_ac), Arc::new(outgoing_ac))), false, + false, + false, ) .await { diff --git a/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/state.rs b/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/state.rs index e092e0e6dbb..5f48ea3bb52 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/state.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/shared_service/tcp_outlet/state.rs @@ -77,6 +77,8 @@ impl AppState { Arc::new(outgoing_ac), )), false, + false, + false, ) .await .map_err(|e| { diff --git a/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt b/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt index 3112d543235..be6361bfa71 100644 --- a/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt +++ b/implementations/rust/ockam/ockam_command/src/environment/static/env_info.txt @@ -55,6 +55,8 @@ UDP TCP - OCKAM_PRIVILEGED: if variable is set, all TCP Inlets/Outlets will use eBPF (overrides `--privileged` argument for `ockam tcp-inlet create` and `ockam tcp-outlet create`). - OCKAM_TCP_PORTAL_PAYLOAD_LENGTH: size of the buffer into which TCP Portal reads the TCP stream. Default value: `128 * 1024` +- OCKAM_TCP_PORTAL_SKIP_HANDSHAKE: skip Portal handshake for lower latency, but also lower throughput +- OCKAM_TCP_PORTAL_ENABLE_NAGLE: enable Nagle's algorithm for Portal TCP streams for potentially higher throughput, but higher latency Devs Usage - OCKAM: a `string` that defines the path to the ockam binary to use. diff --git a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs index a0dec99e7a2..ec21e53a137 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/inlet/create.rs @@ -153,6 +153,14 @@ pub struct CreateCommand { /// Requires `ockam-tls-certificate` credential attribute. #[arg(long, value_name = "ROUTE", hide = true)] pub tls_certificate_provider: Option, + + /// Skip Portal handshake for lower latency, but also lower throughput + #[arg(long, env = "OCKAM_TCP_PORTAL_SKIP_HANDSHAKE", value_parser = FalseyValueParser::default())] + pub skip_handshake: bool, + + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + #[arg(long, env = "OCKAM_TCP_PORTAL_ENABLE_NAGLE", value_parser = FalseyValueParser::default())] + pub enable_nagle: bool, } pub(crate) fn tcp_inlet_default_from_addr() -> SchemeHostnamePort { @@ -199,6 +207,8 @@ impl Command for CreateCommand { cmd.no_tcp_fallback, cmd.privileged, &cmd.tls_certificate_provider, + cmd.skip_handshake, + cmd.enable_nagle, ) .await?; diff --git a/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs b/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs index 7f278e4388e..6626263c261 100644 --- a/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs +++ b/implementations/rust/ockam/ockam_command/src/tcp/outlet/create.rs @@ -73,6 +73,14 @@ pub struct CreateCommand { /// If `OCKAM_PRIVILEGED` env variable is set to 1, this argument will be `true`. #[arg(long, env = "OCKAM_PRIVILEGED", value_parser = FalseyValueParser::default(), hide = true)] pub privileged: bool, + + /// Skip Portal handshake for lower latency, but also lower throughput + #[arg(long, env = "OCKAM_TCP_PORTAL_SKIP_HANDSHAKE", value_parser = FalseyValueParser::default())] + pub skip_handshake: bool, + + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + #[arg(long, env = "OCKAM_TCP_PORTAL_ENABLE_NAGLE", value_parser = FalseyValueParser::default())] + pub enable_nagle: bool, } #[async_trait] @@ -100,6 +108,8 @@ impl Command for CreateCommand { cmd.name.clone().map(Address::from).as_ref(), cmd.allow.clone(), cmd.privileged, + cmd.skip_handshake, + cmd.enable_nagle, ) .await? }; diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats index aa81ecfcf7b..0a24d93451b 100644 --- a/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/portals.bats @@ -14,193 +14,108 @@ teardown() { # ===== TESTS -@test "portals - create tcp outlet on implicit default node" { - run_success "$OCKAM" node delete --all -y - - outlet_port="$(random_port)" - run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" - assert_output --partial "/service/outlet" -} - -@test "portals - create tcp outlet" { - run_success "$OCKAM" node delete --all -y - - outlet_port="$(random_port)" - run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" --from "test-outlet" - assert_output --partial "/service/test-outlet" - - # The first outlet that is created without `--from` flag should be named `outlet` - run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" - assert_output --partial "/service/outlet" - - # After that, the next outlet should be randomly named - run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" - refute_output --partial "/service/outlet" -} - -@test "portals - tcp inlet CRUD" { - - # Create nodes for inlet/outlet pair - run_success "$OCKAM" node create n1 - run_success "$OCKAM" node create n2 - - # Create inlet/outlet pair - outlet_port="$(random_port)" - run_success $OCKAM tcp-outlet create --at /node/n1 --to "127.0.0.1:$outlet_port" - assert_output --partial "/service/outlet" - - inlet_port="$(random_port)" - run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from 127.0.0.1:$inlet_port --to /node/n1/service/outlet - run_success $OCKAM tcp-inlet create --at /node/n2 --from 6102 --to /node/n1/service/outlet - - sleep 1 - - # Check that inlet is available for deletion and delete it - run_success $OCKAM tcp-inlet show test-inlet --at /node/n2 --output json - assert_output --partial "\"alias\": \"test-inlet\"" - assert_output --partial "\"bind_addr\": \"127.0.0.1:$inlet_port\"" - - run_success $OCKAM tcp-inlet delete "test-inlet" --at /node/n2 --yes - - # Test deletion of a previously deleted TCP inlet - run_failure $OCKAM tcp-inlet delete "test-inlet" --at /node/n2 --yes - assert_output --partial "not found" -} - -@test "portals - tcp outlet CRUD" { - run_success "$OCKAM" node create n1 - - run_success "$OCKAM" node create n2 - - port_1="$(random_port)" - run_success $OCKAM tcp-outlet create --at /node/n1 --to "127.0.0.1:$port_1" - assert_output --partial "/service/outlet" - - port_2="$(random_port)" - run_success $OCKAM tcp-outlet create --at /node/n2 --to $port_2 - - run_success $OCKAM tcp-outlet show outlet --at /node/n1 - assert_output --partial "\"worker_address\": \"/service/outlet\"" - assert_output --partial "\"to\": \"127.0.0.1:$port_1\"" - - run_success $OCKAM tcp-outlet delete "outlet" --yes - - # Test deletion of a previously deleted TCP outlet - run_success $OCKAM tcp-outlet delete "outlet" --yes - assert_output --partial "[]" -} - -@test "portals - list inlets on a node" { +@test "portals - create an inlet/outlet pair and move tcp traffic through it" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" port="$(random_port)" - run_success $OCKAM tcp-inlet create tcp-inlet-2 --at /node/n2 --from $port --to /node/n1/service/outlet - sleep 1 + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet - run_success $OCKAM tcp-inlet list --at /node/n2 - assert_output --partial "tcp-inlet-2" - assert_output --partial "127.0.0.1:$port" + run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$port" } -@test "portals - list inlets on a node, using deprecated --alias flag" { +@test "portals - create an inlet/outlet, download file" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" port="$(random_port)" - run_success $OCKAM tcp-inlet create --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 - sleep 1 + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet - run_success $OCKAM tcp-inlet list --at /node/n2 - assert_output --partial "tcp-inlet-2" - assert_output --partial "127.0.0.1:$port" + file_name="$(random_str)".bin + pushd "$OCKAM_HOME_BASE" && dd if=/dev/urandom of="./.tmp/$file_name" bs=1M count=50 && popd + run_success curl -sSf --retry-all-errors --retry-delay 5 --retry 10 -m 20 -o /dev/null "http://127.0.0.1:$port/.tmp/$file_name" } -@test "portals - list inlets on a node, using deprecated --alias flag overriding name" { +@test "portals - create an inlet/outlet, upload file" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" port="$(random_port)" - run_success $OCKAM tcp-inlet create my-inlet --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 - sleep 1 - - run_success $OCKAM tcp-inlet list --at /node/n2 - assert_output --partial "tcp-inlet-2" - assert_output --partial "127.0.0.1:$port" -} - -@test "portals - list outlets on a node" { - run_success "$OCKAM" node create n1 - - port="$(random_port)" - run_success $OCKAM tcp-outlet create --at /node/n1 --to "$port" - assert_output --partial "/service/outlet" + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet - run_success $OCKAM tcp-outlet list --at /node/n1 - assert_output --partial "/service/outlet" - assert_output --partial "127.0.0.1:$port" + file_name="$(random_str)".bin + tmp_dir_name="$(random_str)" + pushd "$OCKAM_HOME_BASE/.tmp" + mkdir "$tmp_dir_name" + dd if=/dev/urandom of="./$tmp_dir_name/$file_name" bs=1M count=50 + popd + run_success curl -sS --retry-all-errors --retry-delay 5 --retry 10 -m 20 -X POST "http://127.0.0.1:$port/upload" -F "files=@$OCKAM_HOME_BASE/.tmp/$tmp_dir_name/$file_name" } -@test "portals - show a tcp inlet" { +@test "portals - create an inlet/outlet pair and move tcp traffic through it, where the outlet points to an HTTPs endpoint" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to google.com:443 port="$(random_port)" - run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from $port --to /node/n1/service/outlet - sleep 1 - - run_success $OCKAM tcp-inlet show "test-inlet" --at /node/n2 + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet - # Test if non-existing TCP inlet returns NotFound - run_failure $OCKAM tcp-inlet show "non-existing-inlet" --at /node/n2 - assert_output --partial "not found" + # This test does not pass on CI + # run_success curl --fail --head --max-time 10 "127.0.0.1:$port" } -@test "portals - show a tcp outlet" { - run_success "$OCKAM" node create n1 +@test "portals - create an inlet/outlet pair with relay through a relay and move tcp traffic through it" { + run_success "$OCKAM" node create relay + run_success "$OCKAM" node create blue + run_success "$OCKAM" tcp-outlet create --at /node/blue --to "$PYTHON_SERVER_PORT" + run_success "$OCKAM" relay create blue --at /node/relay --to /node/blue + + run_success "$OCKAM" node create green port="$(random_port)" - run_success $OCKAM tcp-outlet create --at /node/n1 --to "$port" - assert_output --partial "/service/outlet" + run_success bash -c "$OCKAM secure-channel create --from /node/green --to /node/relay/service/forward_to_blue/service/api \ + | $OCKAM tcp-inlet create --at /node/green --from $port --to -/service/outlet" - run_success $OCKAM tcp-outlet show "outlet" + run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$port" - # Test if non-existing TCP outlet returns NotFound - run_failure $OCKAM tcp-outlet show "non-existing-outlet" - assert_output --partial "not found" + run_success "$OCKAM" secure-channel list --at green + assert_output --partial "/service" } -@test "portals - create an inlet/outlet pair and move tcp traffic through it" { +@test "portals no handshake - create an inlet/outlet pair and move tcp traffic through it" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 - run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" --skip-handshake port="$(random_port)" - run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet --skip-handshake run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$port" } -@test "portals - create an inlet/outlet, download file" { +@test "portals no handshake - create an inlet/outlet, download file" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 - run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" --skip-handshake port="$(random_port)" - run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet --skip-handshake file_name="$(random_str)".bin pushd "$OCKAM_HOME_BASE" && dd if=/dev/urandom of="./.tmp/$file_name" bs=1M count=50 && popd run_success curl -sSf --retry-all-errors --retry-delay 5 --retry 10 -m 20 -o /dev/null "http://127.0.0.1:$port/.tmp/$file_name" } -@test "portals - create an inlet/outlet, upload file" { +@test "portals no handshake - create an inlet/outlet, upload file" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 - run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to "$PYTHON_SERVER_PORT" --skip-handshake port="$(random_port)" - run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet --skip-handshake file_name="$(random_str)".bin tmp_dir_name="$(random_str)" @@ -211,29 +126,29 @@ teardown() { run_success curl -sS --retry-all-errors --retry-delay 5 --retry 10 -m 20 -X POST "http://127.0.0.1:$port/upload" -F "files=@$OCKAM_HOME_BASE/.tmp/$tmp_dir_name/$file_name" } -@test "portals - create an inlet/outlet pair and move tcp traffic through it, where the outlet points to an HTTPs endpoint" { +@test "portals no handshake - create an inlet/outlet pair and move tcp traffic through it, where the outlet points to an HTTPs endpoint" { run_success "$OCKAM" node create n1 run_success "$OCKAM" node create n2 - run_success "$OCKAM" tcp-outlet create --at /node/n1 --to google.com:443 + run_success "$OCKAM" tcp-outlet create --at /node/n1 --to google.com:443 --skip-handshake port="$(random_port)" - run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet + run_success "$OCKAM" tcp-inlet create --at /node/n2 --from "$port" --to /node/n1/service/outlet --skip-handshake # This test does not pass on CI # run_success curl --fail --head --max-time 10 "127.0.0.1:$port" } -@test "portals - create an inlet/outlet pair with relay through a relay and move tcp traffic through it" { +@test "portals no handshake - create an inlet/outlet pair with relay through a relay and move tcp traffic through it" { run_success "$OCKAM" node create relay run_success "$OCKAM" node create blue - run_success "$OCKAM" tcp-outlet create --at /node/blue --to "$PYTHON_SERVER_PORT" + run_success "$OCKAM" tcp-outlet create --at /node/blue --to "$PYTHON_SERVER_PORT" --skip-handshake run_success "$OCKAM" relay create blue --at /node/relay --to /node/blue run_success "$OCKAM" node create green port="$(random_port)" run_success bash -c "$OCKAM secure-channel create --from /node/green --to /node/relay/service/forward_to_blue/service/api \ - | $OCKAM tcp-inlet create --at /node/green --from $port --to -/service/outlet" + | $OCKAM tcp-inlet create --at /node/green --from $port --to -/service/outlet --skip-handshake" run_success curl -sfI --retry-all-errors --retry-delay 5 --retry 10 -m 5 "127.0.0.1:$port" diff --git a/implementations/rust/ockam/ockam_command/tests/bats/local/portals_lifecycle.bats b/implementations/rust/ockam/ockam_command/tests/bats/local/portals_lifecycle.bats new file mode 100644 index 00000000000..90314a2b333 --- /dev/null +++ b/implementations/rust/ockam/ockam_command/tests/bats/local/portals_lifecycle.bats @@ -0,0 +1,166 @@ +#!/bin/bash + +# ===== SETUP + +setup() { + load ../load/base.bash + load_bats_ext + setup_home_dir +} + +teardown() { + teardown_home_dir +} + +# ===== TESTS + +@test "portals - create tcp outlet on implicit default node" { + outlet_port="$(random_port)" + run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" + assert_output --partial "/service/outlet" +} + +@test "portals - create tcp outlet" { + outlet_port="$(random_port)" + run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" --from "test-outlet" + assert_output --partial "/service/test-outlet" + + # The first outlet that is created without `--from` flag should be named `outlet` + run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" + assert_output --partial "/service/outlet" + + # After that, the next outlet should be randomly named + run_success $OCKAM tcp-outlet create --to "127.0.0.1:$outlet_port" + refute_output --partial "/service/outlet" +} + +@test "portals - tcp inlet CRUD" { + # Create nodes for inlet/outlet pair + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + + # Create inlet/outlet pair + outlet_port="$(random_port)" + run_success $OCKAM tcp-outlet create --at /node/n1 --to "127.0.0.1:$outlet_port" + assert_output --partial "/service/outlet" + + inlet_port="$(random_port)" + run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from 127.0.0.1:$inlet_port --to /node/n1/service/outlet + run_success $OCKAM tcp-inlet create --at /node/n2 --from 6102 --to /node/n1/service/outlet + + sleep 1 + + # Check that inlet is available for deletion and delete it + run_success $OCKAM tcp-inlet show test-inlet --at /node/n2 --output json + assert_output --partial "\"alias\": \"test-inlet\"" + assert_output --partial "\"bind_addr\": \"127.0.0.1:$inlet_port\"" + + run_success $OCKAM tcp-inlet delete "test-inlet" --at /node/n2 --yes + + # Test deletion of a previously deleted TCP inlet + run_failure $OCKAM tcp-inlet delete "test-inlet" --at /node/n2 --yes + assert_output --partial "not found" +} + +@test "portals - tcp outlet CRUD" { + run_success "$OCKAM" node create n1 + + run_success "$OCKAM" node create n2 + + port_1="$(random_port)" + run_success $OCKAM tcp-outlet create --at /node/n1 --to "127.0.0.1:$port_1" + assert_output --partial "/service/outlet" + + port_2="$(random_port)" + run_success $OCKAM tcp-outlet create --at /node/n2 --to $port_2 + + run_success $OCKAM tcp-outlet show outlet --at /node/n1 + assert_output --partial "\"worker_address\": \"/service/outlet\"" + assert_output --partial "\"to\": \"127.0.0.1:$port_1\"" + + run_success $OCKAM tcp-outlet delete "outlet" --yes + + # Test deletion of a previously deleted TCP outlet + run_success $OCKAM tcp-outlet delete "outlet" --yes + assert_output --partial "[]" +} + +@test "portals - list inlets on a node" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + + port="$(random_port)" + run_success $OCKAM tcp-inlet create tcp-inlet-2 --at /node/n2 --from $port --to /node/n1/service/outlet + sleep 1 + + run_success $OCKAM tcp-inlet list --at /node/n2 + assert_output --partial "tcp-inlet-2" + assert_output --partial "127.0.0.1:$port" +} + +@test "portals - list inlets on a node, using deprecated --alias flag" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + + port="$(random_port)" + run_success $OCKAM tcp-inlet create --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 + sleep 1 + + run_success $OCKAM tcp-inlet list --at /node/n2 + assert_output --partial "tcp-inlet-2" + assert_output --partial "127.0.0.1:$port" +} + +@test "portals - list inlets on a node, using deprecated --alias flag overriding name" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + + port="$(random_port)" + run_success $OCKAM tcp-inlet create my-inlet --at /node/n2 --from $port --to /node/n1/service/outlet --alias tcp-inlet-2 + sleep 1 + + run_success $OCKAM tcp-inlet list --at /node/n2 + assert_output --partial "tcp-inlet-2" + assert_output --partial "127.0.0.1:$port" +} + +@test "portals - list outlets on a node" { + run_success "$OCKAM" node create n1 + + port="$(random_port)" + run_success $OCKAM tcp-outlet create --at /node/n1 --to "$port" + assert_output --partial "/service/outlet" + + run_success $OCKAM tcp-outlet list --at /node/n1 + assert_output --partial "/service/outlet" + assert_output --partial "127.0.0.1:$port" +} + +@test "portals - show a tcp inlet" { + run_success "$OCKAM" node create n1 + run_success "$OCKAM" node create n2 + + port="$(random_port)" + run_success $OCKAM tcp-inlet create "test-inlet" --at /node/n2 --from $port --to /node/n1/service/outlet + sleep 1 + + run_success $OCKAM tcp-inlet show "test-inlet" --at /node/n2 + + # Test if non-existing TCP inlet returns NotFound + run_failure $OCKAM tcp-inlet show "non-existing-inlet" --at /node/n2 + assert_output --partial "not found" +} + +@test "portals - show a tcp outlet" { + run_success "$OCKAM" node create n1 + + port="$(random_port)" + run_success $OCKAM tcp-outlet create --at /node/n1 --to "$port" + assert_output --partial "/service/outlet" + + run_success $OCKAM tcp-outlet show "outlet" + + # Test if non-existing TCP outlet returns NotFound + run_failure $OCKAM tcp-outlet show "non-existing-outlet" + assert_output --partial "not found" +} diff --git a/implementations/rust/ockam/ockam_command/tests/bats/run.sh b/implementations/rust/ockam/ockam_command/tests/bats/run.sh index 7f4166a2af0..02f723edb1a 100755 --- a/implementations/rust/ockam/ockam_command/tests/bats/run.sh +++ b/implementations/rust/ockam/ockam_command/tests/bats/run.sh @@ -48,11 +48,12 @@ done if [ "$local_suite" = true ]; then echo "Running local suite..." bats "$current_directory/local" --timing -j 3 + OCKAM_TCP_PORTAL_SKIP_HANDSHAKE=1 bats "$current_directory/local/portals.bats" --timing -j 3 fi if [ "$local_as_root_suite" = true ]; then echo "Running local root suite..." - OCKAM_PRIVILEGED=1 bats "$current_directory/local/portals.bats" --timing -j 3 + OCKAM_PRIVILEGED=1 bats "$current_directory/local/portals_lifecycle.bats" "$current_directory/local/portals.bats" --timing -j 3 fi if [ -z "${ORCHESTRATOR_TESTS}" ]; then diff --git a/implementations/rust/ockam/ockam_core/src/message.rs b/implementations/rust/ockam/ockam_core/src/message.rs index ded39a1a60a..9c965f5e01e 100644 --- a/implementations/rust/ockam/ockam_core/src/message.rs +++ b/implementations/rust/ockam/ockam_core/src/message.rs @@ -214,14 +214,14 @@ impl Routed { /// Return a copy of the message address. #[inline] - pub fn msg_addr(&self) -> Address { - self.msg_addr.clone() + pub fn msg_addr(&self) -> &Address { + &self.msg_addr } /// True sender of the message #[inline] - pub fn src_addr(&self) -> Address { - self.src_addr.clone() + pub fn src_addr(&self) -> &Address { + &self.src_addr } /// Return a copy of the onward route for the wrapped message. diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs index 784de418dc5..aa7c8238c01 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/encryptor_worker.rs @@ -369,16 +369,16 @@ impl Worker for EncryptorWorker { let msg_addr = msg.msg_addr(); if self.key_exchange_only { - if msg_addr == self.addresses.encryptor_api { + if msg_addr == &self.addresses.encryptor_api { self.handle_encrypt_api(ctx, msg).await?; } else { return Err(IdentityError::UnknownChannelMsgDestination)?; } - } else if msg_addr == self.addresses.encryptor { + } else if msg_addr == &self.addresses.encryptor { self.handle_encrypt(ctx, msg).await?; - } else if msg_addr == self.addresses.encryptor_api { + } else if msg_addr == &self.addresses.encryptor_api { self.handle_encrypt_api(ctx, msg).await?; - } else if msg_addr == self.addresses.encryptor_internal { + } else if msg_addr == &self.addresses.encryptor_internal { self.handle_refresh_credentials(ctx).await?; } else { return Err(IdentityError::UnknownChannelMsgDestination)?; diff --git a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs index a14e2dc5752..010942aa99b 100644 --- a/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs +++ b/implementations/rust/ockam/ockam_identity/src/secure_channel/handshake/handshake_worker.rs @@ -332,14 +332,14 @@ impl HandshakeWorker { let msg_addr = message.msg_addr(); if self.key_exchange_only { - if msg_addr == self.addresses.decryptor_api { + if msg_addr == &self.addresses.decryptor_api { decryptor_handler.handle_decrypt_api(context, message).await } else { Err(IdentityError::UnknownChannelMsgDestination)? } - } else if msg_addr == self.addresses.decryptor_remote { + } else if msg_addr == &self.addresses.decryptor_remote { decryptor_handler.handle_decrypt(context, message).await - } else if msg_addr == self.addresses.decryptor_api { + } else if msg_addr == &self.addresses.decryptor_api { decryptor_handler.handle_decrypt_api(context, message).await } else { Err(IdentityError::UnknownChannelMsgDestination)? diff --git a/implementations/rust/ockam/ockam_node/src/error.rs b/implementations/rust/ockam/ockam_node/src/error.rs index 04319e6b778..a4a4d7954e4 100644 --- a/implementations/rust/ockam/ockam_node/src/error.rs +++ b/implementations/rust/ockam/ockam_node/src/error.rs @@ -154,6 +154,8 @@ pub enum WorkerReason { Faulty, /// The worker is otherwise corrupt and can not be recovered Corrupt, + /// Couldn't send shutdown signal + CtrlChannelError, } impl fmt::Display for WorkerReason { @@ -165,6 +167,7 @@ impl fmt::Display for WorkerReason { Self::Shutdown => "target worker is shutting down", Self::Faulty => "target worker is faulty and waiting for supervisor", Self::Corrupt => "target worker is corrupt and can not be recovered", + Self::CtrlChannelError => "target worker cannot receive shutdown signal", } ) } diff --git a/implementations/rust/ockam/ockam_node/src/router/record.rs b/implementations/rust/ockam/ockam_node/src/router/record.rs index 7c12a9c0797..c5cc70cd433 100644 --- a/implementations/rust/ockam/ockam_node/src/router/record.rs +++ b/implementations/rust/ockam/ockam_node/src/router/record.rs @@ -1,7 +1,7 @@ use crate::channel_types::{oneshot_channel, MessageSender, OneshotReceiver, OneshotSender}; -use crate::error::{NodeError, NodeReason}; +use crate::error::NodeError; use crate::relay::CtrlSignal; -use crate::WorkerShutdownPriority; +use crate::{WorkerReason, WorkerShutdownPriority}; use core::default::Default; use core::fmt::Debug; use core::sync::atomic::{AtomicUsize, Ordering}; @@ -461,7 +461,7 @@ impl AddressRecord { if !self.meta.detached && !skip_sending_stop_signal { self.ctrl_tx .send(CtrlSignal::InterruptStop) - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + .map_err(|_| NodeError::WorkerState(WorkerReason::CtrlChannelError).internal())?; } Ok(()) diff --git a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs index d3f8dbeec72..3ca8e8ce3fc 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs @@ -110,11 +110,11 @@ impl Worker for BleRouter { async fn handle_message(&mut self, ctx: &mut Context, msg: Routed) -> Result<()> { let msg_addr = msg.msg_addr(); - if msg_addr == self.main_addr { + if msg_addr == &self.main_addr { let msg = LocalMessage::decode(msg.payload())?; trace!("handle_message route: {:?}", msg.onward_route()); self.handle_route(ctx, msg).await?; - } else if msg_addr == self.api_addr { + } else if msg_addr == &self.api_addr { let msg = BleRouterMessage::decode(msg.payload())?; match msg { BleRouterMessage::Register { accepts, self_addr } => { diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs index 4a47b9d148b..51c9cc4c407 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/inlet_listener.rs @@ -177,7 +177,10 @@ impl Processor for TcpInletListenProcessor { #[instrument(skip_all, name = "TcpInletListenProcessor::process")] async fn process(&mut self, ctx: &mut Self::Context) -> Result { let (stream, socket_addr) = self.inner.accept().await.map_err(TransportError::from)?; - stream.set_nodelay(true).map_err(TransportError::from)?; + + stream + .set_nodelay(!self.options.enable_nagle) + .map_err(TransportError::from)?; let addresses = Addresses::generate(PortalType::Inlet); @@ -227,6 +230,7 @@ impl Processor for TcpInletListenProcessor { self.options.incoming_access_control.clone(), self.options.outgoing_access_control.clone(), self.options.portal_payload_length, + self.options.skip_handshake, )?; Ok(true) diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs index 4826e34420e..004dd580b68 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/interceptor.rs @@ -95,7 +95,7 @@ impl Worker for PortalOutletInterceptor { context: &mut Context, message: Routed, ) -> ockam_core::Result<()> { - let source_address = message.src_addr(); + let source_address = message.src_addr().clone(); let mut message = message.into_local_message(); // Remove our address @@ -314,7 +314,7 @@ impl Worker for PortalInterceptorWorker { match self.direction { Direction::FromInletToOutlet => { // if we receive a pong message, it means it must be from the other worker - if routed_message.src_addr() == self.other_worker_address { + if routed_message.src_addr() == &self.other_worker_address { if let Some(fixed_onward_route) = self.fixed_onward_route.as_ref() { debug!( "updating onward route from {} to {}", diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs index 069634bbd5d..c6d3e42077d 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/mod.rs @@ -4,6 +4,7 @@ mod inlet_shared_state; mod interceptor; pub mod options; mod outlet_listener; +mod outlet_listener_registry; mod portal_message; mod portal_receiver; mod portal_worker; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs index fd290284ae5..98f3971ecf9 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/options.rs @@ -18,6 +18,8 @@ pub struct TcpInletOptions { pub(crate) is_paused: bool, pub(crate) tls_certificate_provider: Option>, pub(crate) portal_payload_length: usize, + pub(crate) skip_handshake: bool, + pub(crate) enable_nagle: bool, } impl TcpInletOptions { @@ -29,9 +31,35 @@ impl TcpInletOptions { is_paused: false, tls_certificate_provider: None, portal_payload_length: read_portal_payload_length(), + skip_handshake: false, + enable_nagle: false, } } + /// Skip Portal handshake for lower latency, but also lower throughput + pub fn set_skip_handshake(mut self, skip_handshake: bool) -> Self { + self.skip_handshake = skip_handshake; + self + } + + /// Skip Portal handshake for lower latency, but also lower throughput + pub fn skip_handshake(mut self) -> Self { + self.skip_handshake = true; + self + } + + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + pub fn set_enable_nagle(mut self, enable_nagle: bool) -> Self { + self.enable_nagle = enable_nagle; + self + } + + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + pub fn enable_nagle(mut self) -> Self { + self.enable_nagle = true; + self + } + /// Set TCP inlet to paused mode after start. No unpause call [`TcpInlet::unpause`] pub fn paused(mut self) -> Self { self.is_paused = true; @@ -121,6 +149,8 @@ pub struct TcpOutletOptions { pub(crate) outgoing_access_control: Arc, pub(crate) tls: bool, pub(crate) portal_payload_length: usize, + pub(crate) skip_handshake: bool, + pub(crate) enable_nagle: bool, } impl TcpOutletOptions { @@ -132,9 +162,35 @@ impl TcpOutletOptions { outgoing_access_control: Arc::new(AllowAll), tls: false, portal_payload_length: read_portal_payload_length(), + skip_handshake: false, + enable_nagle: false, } } + /// Skip Portal handshake for lower latency, but also lower throughput + pub fn set_skip_handshake(mut self, skip_handshake: bool) -> Self { + self.skip_handshake = skip_handshake; + self + } + + /// Skip Portal handshake for lower latency, but also lower throughput + pub fn skip_handshake(mut self) -> Self { + self.skip_handshake = true; + self + } + + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + pub fn set_enable_nagle(mut self, enable_nagle: bool) -> Self { + self.enable_nagle = enable_nagle; + self + } + + /// Enable Nagle's algorithm for potentially higher throughput, but higher latency + pub fn enable_nagle(mut self) -> Self { + self.enable_nagle = true; + self + } + /// Set Incoming Access Control pub fn with_incoming_access_control_impl( mut self, diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs index 7bc2b750bed..30fb8131d29 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener.rs @@ -1,7 +1,9 @@ use crate::portal::addresses::{Addresses, PortalType}; +use crate::portal::outlet_listener_registry::{MapKey, OutletListenerRegistry}; use crate::{portal::TcpPortalWorker, PortalMessage, TcpOutletOptions, TcpRegistry}; use ockam_core::{ - async_trait, Address, DenyAll, NeutralMessage, Result, Routed, SecureChannelLocalInfo, Worker, + async_trait, route, Address, AllowAll, LocalMessage, NeutralMessage, Result, Routed, + SecureChannelLocalInfo, Worker, }; use ockam_node::{Context, WorkerBuilder}; use ockam_transport_core::{HostnamePort, TransportError}; @@ -16,6 +18,7 @@ pub(crate) struct TcpOutletListenWorker { registry: TcpRegistry, hostname_port: HostnamePort, options: TcpOutletOptions, + outlet_registry: OutletListenerRegistry, } impl TcpOutletListenWorker { @@ -25,6 +28,7 @@ impl TcpOutletListenWorker { registry, hostname_port, options, + outlet_registry: Default::default(), } } @@ -44,11 +48,33 @@ impl TcpOutletListenWorker { WorkerBuilder::new(worker) .with_address(address) .with_incoming_access_control_arc(access_control) - .with_outgoing_access_control(DenyAll) + .with_outgoing_access_control(AllowAll) .start(ctx)?; Ok(()) } + + async fn reroute_msg(ctx: &Context, sender_remote: Address, msg: LocalMessage) -> Result<()> { + let res = ctx + .forward_from_address( + LocalMessage::new() + .with_onward_route(route![sender_remote.clone()]) + .with_return_route(msg.return_route) + .with_local_info(msg.local_info) + .with_payload(msg.payload), + ctx.primary_address().clone(), + ) + .await; + + if res.is_err() { + debug!( + "Couldn't forward message from the outlet to {}", + sender_remote + ) + } + + Ok(()) + } } #[async_trait] @@ -81,34 +107,84 @@ impl Worker for TcpOutletListenWorker { let their_identifier = SecureChannelLocalInfo::find_info(msg.local_message()) .map(|l| l.their_identifier()) .ok(); - let src_addr = msg.src_addr(); + + let src_addr = msg.src_addr().clone(); let msg = msg.into_local_message(); - let return_route = msg.return_route; - let body = msg.payload; - let msg = PortalMessage::decode(&body)?; - if !matches!(msg, PortalMessage::Ping) { - return Err(TransportError::Protocol)?; + let remote_address = msg.return_route.recipient()?.clone(); + + let map_key = MapKey { + identifier: their_identifier.clone(), + remote_address, + }; + + if self.options.skip_handshake { + let sender_remote = self + .outlet_registry + .started_workers + .read() + .unwrap() + .get(&map_key) + .cloned(); + if let Some(sender_remote) = sender_remote { + return Self::reroute_msg(ctx, sender_remote.clone(), msg).await; + } + } else { + let msg = PortalMessage::decode(msg.payload())?; + + if !matches!(msg, PortalMessage::Ping) { + return Err(TransportError::Protocol)?; + } } let addresses = Addresses::generate(PortalType::Outlet); - TcpOutletOptions::setup_flow_control_for_outlet(ctx.flow_controls(), &addresses, &src_addr); - - TcpPortalWorker::start_new_outlet( - ctx, - self.registry.clone(), - self.hostname_port.clone(), - self.options.tls, - return_route.clone(), - their_identifier, - addresses.clone(), - self.options.incoming_access_control.clone(), - self.options.outgoing_access_control.clone(), - self.options.portal_payload_length, - )?; - - debug!("Created Tcp Outlet at {}", addresses.sender_remote); + if self.options.skip_handshake { + TcpPortalWorker::start_new_outlet_no_handshake( + ctx, + self.registry.clone(), + self.hostname_port.clone(), + self.options.tls, + msg.return_route.clone(), + their_identifier, + addresses.clone(), + self.options.outgoing_access_control.clone(), + self.options.portal_payload_length, + map_key.clone(), + self.outlet_registry.clone(), + )?; + + debug!("Created Tcp Outlet at {}", addresses.sender_remote); + + self.outlet_registry + .started_workers + .write() + .unwrap() + .insert(map_key.clone(), addresses.sender_remote.clone()); + + Self::reroute_msg(ctx, addresses.sender_remote, msg).await?; + } else { + TcpOutletOptions::setup_flow_control_for_outlet( + ctx.flow_controls(), + &addresses, + &src_addr, + ); + + TcpPortalWorker::start_new_outlet( + ctx, + self.registry.clone(), + self.hostname_port.clone(), + self.options.tls, + msg.return_route.clone(), + their_identifier, + addresses.clone(), + self.options.incoming_access_control.clone(), + self.options.outgoing_access_control.clone(), + self.options.portal_payload_length, + )?; + + debug!("Created Tcp Outlet at {}", addresses.sender_remote); + } Ok(()) } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener_registry.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener_registry.rs new file mode 100644 index 00000000000..0180a05268a --- /dev/null +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/outlet_listener_registry.rs @@ -0,0 +1,14 @@ +use ockam_core::compat::collections::HashMap; +use ockam_core::compat::sync::{Arc, RwLock}; +use ockam_core::{Address, LocalInfoIdentifier}; + +#[derive(Hash, Eq, PartialEq, Clone)] +pub(crate) struct MapKey { + pub(crate) identifier: Option, + pub(crate) remote_address: Address, +} + +#[derive(Default, Clone)] +pub struct OutletListenerRegistry { + pub(crate) started_workers: Arc>>, +} diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs index 53375759e24..1caae0d149a 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_receiver.rs @@ -10,7 +10,7 @@ use opentelemetry::global; use opentelemetry::trace::Tracer; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; -use tracing::{error, instrument, warn}; +use tracing::{debug, error, instrument}; /// A TCP Portal receiving message processor /// @@ -96,21 +96,28 @@ impl Processor for TcpPortalRecvPr ) .await { - warn!( + debug!( "Error notifying Tcp Portal Sender about dropped connection {}", err ); } - ctx.forward_from_address( - LocalMessage::new() - .with_tracing_context(tracing_context.clone()) - .with_onward_route(self.onward_route.clone()) - .with_return_route(route![self.addresses.sender_remote.clone()]) - .with_payload(PortalMessage::Disconnect.encode()?), - self.addresses.receiver_remote.clone(), - ) - .await?; + if let Err(err) = ctx + .forward_from_address( + LocalMessage::new() + .with_tracing_context(tracing_context.clone()) + .with_onward_route(self.onward_route.clone()) + .with_return_route(route![self.addresses.sender_remote.clone()]) + .with_payload(PortalMessage::Disconnect.encode()?), + self.addresses.receiver_remote.clone(), + ) + .await + { + debug!( + "Error notifying the other side of the portal about dropped connection {}", + err + ); + } return Ok(false); } diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs index d4db4a2d237..0b0272e0694 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_worker.rs @@ -1,15 +1,17 @@ use crate::portal::addresses::{Addresses, PortalType}; +use crate::portal::outlet_listener_registry::{MapKey, OutletListenerRegistry}; use crate::portal::portal_worker::ReadHalfMaybeTls::{ReadHalfNoTls, ReadHalfWithTls}; use crate::portal::portal_worker::WriteHalfMaybeTls::{WriteHalfNoTls, WriteHalfWithTls}; use crate::transport::{connect, connect_tls}; use crate::{portal::TcpPortalRecvProcessor, PortalInternalMessage, PortalMessage, TcpRegistry}; use ockam_core::compat::{boxed::Box, sync::Arc}; use ockam_core::{ - async_trait, AllowOnwardAddress, AllowSourceAddress, Decodable, DenyAll, IncomingAccessControl, - LocalInfoIdentifier, Mailbox, Mailboxes, OutgoingAccessControl, SecureChannelLocalInfo, + async_trait, AllowAll, AllowOnwardAddress, AllowSourceAddress, Decodable, DenyAll, + IncomingAccessControl, LocalInfoIdentifier, Mailbox, Mailboxes, OutgoingAccessControl, + SecureChannelLocalInfo, }; use ockam_core::{Any, Result, Route, Routed, Worker}; -use ockam_node::{Context, ProcessorBuilder, WorkerBuilder}; +use ockam_node::{Context, ProcessorBuilder, WorkerBuilder, WorkerShutdownPriority}; use ockam_transport_core::{HostnamePort, TransportError}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWriteExt, ReadHalf, WriteHalf}; @@ -32,6 +34,13 @@ enum State { Initialized, } +pub(crate) enum HandshakeMode { + Regular, + Skip { + map: Option<(MapKey, OutletListenerRegistry)>, + }, +} + /// A TCP Portal worker /// /// A TCP Portal worker is responsible for managing the life-cycle of @@ -53,6 +62,8 @@ pub(crate) struct TcpPortalWorker { outgoing_access_control: Arc, is_tls: bool, portal_payload_length: usize, + handshake_mode: HandshakeMode, + enable_nagle: bool, } pub(crate) enum ReadHalfMaybeTls { @@ -80,19 +91,29 @@ impl TcpPortalWorker { incoming_access_control: Arc, outgoing_access_control: Arc, // To propagate to the receiver portal_payload_length: usize, + skip_handshake: bool, ) -> Result<()> { + let handshake_mode = if skip_handshake { + HandshakeMode::Skip { map: None } + } else { + HandshakeMode::Regular + }; + Self::start( ctx, registry, hostname_port, false, State::SendPing { ping_route }, + None, their_identifier, Some(streams), addresses, incoming_access_control, outgoing_access_control, portal_payload_length, + handshake_mode, + false, ) } @@ -117,12 +138,52 @@ impl TcpPortalWorker { hostname_port, tls, State::SendPong { pong_route }, + None, their_identifier, None, addresses, incoming_access_control, outgoing_access_control, portal_payload_length, + HandshakeMode::Regular, + false, + ) + } + + /// Start a new `TcpPortalWorker` of type [`TypeName::Outlet`] + #[allow(clippy::too_many_arguments)] + #[instrument(skip_all)] + pub(super) fn start_new_outlet_no_handshake( + ctx: &Context, + registry: TcpRegistry, + hostname_port: HostnamePort, + tls: bool, + pong_route: Route, + their_identifier: Option, + addresses: Addresses, + outgoing_access_control: Arc, + portal_payload_length: usize, + map_key: MapKey, + outlet_listener_registry: OutletListenerRegistry, + ) -> Result<()> { + Self::start( + ctx, + registry, + hostname_port, + tls, + State::Initialized, + Some(pong_route), + their_identifier, + None, + addresses, + // We now only receive messages from the "outlet" address on our own node + Arc::new(AllowAll), + outgoing_access_control, + portal_payload_length, + HandshakeMode::Skip { + map: Some((map_key, outlet_listener_registry)), + }, + false, ) } @@ -135,12 +196,15 @@ impl TcpPortalWorker { hostname_port: HostnamePort, is_tls: bool, state: State, + remote_route: Option, their_identifier: Option, streams: Option<(ReadHalfMaybeTls, WriteHalfMaybeTls)>, addresses: Addresses, incoming_access_control: Arc, outgoing_access_control: Arc, portal_payload_length: usize, + handshake_mode: HandshakeMode, + enable_nagle: bool, ) -> Result<()> { let portal_type = if streams.is_some() { PortalType::Inlet @@ -166,13 +230,15 @@ impl TcpPortalWorker { read_half: rx, hostname_port, addresses: addresses.clone(), - remote_route: None, + remote_route, is_disconnecting: false, portal_type, last_received_packet_counter: u16::MAX, is_tls, outgoing_access_control: outgoing_access_control.clone(), portal_payload_length, + enable_nagle, + handshake_mode, }; let internal_mailbox = Mailbox::new( @@ -192,6 +258,7 @@ impl TcpPortalWorker { // start worker WorkerBuilder::new(worker) .with_mailboxes(Mailboxes::new(internal_mailbox, vec![remote_mailbox])) + .with_shutdown_priority(WorkerShutdownPriority::Priority4) .start(ctx)?; Ok(()) @@ -206,8 +273,11 @@ enum DisconnectionReason { } impl TcpPortalWorker { - fn clone_state(&self) -> State { - self.state.clone() + fn skip_handshake(&self) -> bool { + match &self.handshake_mode { + HandshakeMode::Regular => false, + HandshakeMode::Skip { .. } => true, + } } /// Start a `TcpPortalRecvProcessor` @@ -254,39 +324,59 @@ impl TcpPortalWorker { ProcessorBuilder::new(receiver) .with_mailboxes(Mailboxes::new(remote, vec![internal])) + .with_shutdown_priority(WorkerShutdownPriority::Priority3) .start(ctx)?; Ok(()) } #[instrument(skip_all)] - async fn notify_remote_about_disconnection(&mut self, ctx: &Context) -> Result<()> { + async fn notify_remote_about_disconnection(&mut self, ctx: &Context) { // Notify the other end - if let Some(remote_route) = self.remote_route.take() { - ctx.send_from_address( + let remote_route = if let Some(remote_route) = self.remote_route.take() { + remote_route + } else { + return; + }; + + let disconnect_msg = match PortalMessage::Disconnect.to_neutral_message() { + Ok(msg) => msg, + Err(_) => return, + }; + + if ctx + .send_from_address( remote_route, - PortalMessage::Disconnect.to_neutral_message()?, + disconnect_msg, self.addresses.sender_remote.clone(), ) - .await?; - + .await + .is_err() + { + debug!( + portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "error notifying the other side of portal that the connection is dropped", + ); + } else { debug!( portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "notified the other side of portal that the connection is dropped", ); } - - Ok(()) } #[instrument(skip_all)] - fn stop_receiver(&self, ctx: &Context) -> Result<()> { - if ctx.stop_address(&self.addresses.receiver_remote).is_ok() { - debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + fn stop_receiver(&self, ctx: &Context) { + match ctx.stop_address(&self.addresses.receiver_remote) { + Ok(_) => { + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "stopped receiver due to connection drop"); + } + Err(_) => { + debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, + "error stopping receiver due to connection drop"); + } } - - Ok(()) } #[instrument(skip_all)] @@ -307,8 +397,8 @@ impl TcpPortalWorker { // We couldn't send data to the tcp connection, let's notify the other end about dropped // connection and shut down both processor and worker DisconnectionReason::FailedTx => { - self.notify_remote_about_disconnection(ctx).await?; - self.stop_receiver(ctx)?; + self.notify_remote_about_disconnection(ctx).await; + self.stop_receiver(ctx); // Sleep, so that if connection is dropped on both sides at the same time, the other // side had time to notify us about the closure. Otherwise, the message won't be // delivered which can lead to a warning message from a secure channel (or whatever @@ -319,8 +409,8 @@ impl TcpPortalWorker { // Packets were dropped while traveling to us, let's notify the other end about dropped // connection and DisconnectionReason::InvalidCounter => { - self.notify_remote_about_disconnection(ctx).await?; - self.stop_receiver(ctx)?; + self.notify_remote_about_disconnection(ctx).await; + self.stop_receiver(ctx); self.stop_sender(ctx)?; } // We couldn't read data from the tcp connection @@ -336,7 +426,7 @@ impl TcpPortalWorker { // Other end notifies us that the tcp connection is dropped // Let's shut down both processor and worker DisconnectionReason::Remote => { - self.stop_receiver(ctx)?; + self.stop_receiver(ctx); self.stop_sender(ctx)?; } } @@ -348,10 +438,10 @@ impl TcpPortalWorker { } #[instrument(skip_all)] - async fn handle_send_ping(&self, ctx: &Context, ping_route: Route) -> Result { + async fn handle_send_ping(&mut self, ctx: &Context, ping_route: Route) -> Result { // Force creation of Outlet on the other side ctx.send_from_address( - ping_route, + ping_route.clone(), PortalMessage::Ping.to_neutral_message()?, self.addresses.sender_remote.clone(), ) @@ -359,27 +449,41 @@ impl TcpPortalWorker { debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "sent ping"); - Ok(State::ReceivePong) - } + if self.skip_handshake() { + self.remote_route = Some(ping_route.clone()); + self.start_receiver(ctx, ping_route)?; - #[instrument(skip_all)] - async fn handle_send_pong(&mut self, ctx: &Context, pong_route: Route) -> Result { - if self.write_half.is_some() { - // Should not happen - return Err(TransportError::PortalInvalidState)?; + Ok(State::Initialized) + } else { + Ok(State::ReceivePong) } + } + + async fn connect(&mut self) -> Result<()> { if self.is_tls { debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "connect to {} via TLS", &self.hostname_port); - let (rx, tx) = connect_tls(&self.hostname_port).await?; + let (rx, tx) = connect_tls(&self.hostname_port, self.enable_nagle).await?; self.write_half = Some(WriteHalfWithTls(tx)); self.read_half = Some(ReadHalfWithTls(rx)); } else { debug!(portal_type = %self.portal_type, sender_internal = %self.addresses.sender_internal, "connect to {}", self.hostname_port); - let (rx, tx) = connect(&self.hostname_port, None).await?; + let (rx, tx) = connect(&self.hostname_port, self.enable_nagle, None).await?; self.write_half = Some(WriteHalfNoTls(tx)); self.read_half = Some(ReadHalfNoTls(rx)); } + Ok(()) + } + + #[instrument(skip_all)] + async fn handle_send_pong(&mut self, ctx: &Context, pong_route: Route) -> Result { + if self.write_half.is_some() { + // Should not happen + return Err(TransportError::PortalInvalidState)?; + } + + self.connect().await?; + // Respond to Inlet before starting the processor but // after the connection has been established // to avoid a payload being sent before the pong @@ -406,16 +510,18 @@ impl Worker for TcpPortalWorker { #[instrument(skip_all, name = "TcpPortalWorker::initialize")] async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> { - let state = self.clone_state(); - - match state { + match &self.state { State::SendPing { ping_route } => { self.state = self.handle_send_ping(ctx, ping_route.clone()).await?; } State::SendPong { pong_route } => { self.state = self.handle_send_pong(ctx, pong_route.clone()).await?; } - State::ReceivePong | State::Initialized { .. } => { + State::Initialized => { + self.connect().await?; + self.start_receiver(ctx, self.remote_route.clone().unwrap())?; + } + State::ReceivePong => { return Err(TransportError::PortalInvalidState)?; } } @@ -432,6 +538,16 @@ impl Worker for TcpPortalWorker { #[instrument(skip_all, name = "TcpPortalWorker::shutdown")] async fn shutdown(&mut self, _ctx: &mut Self::Context) -> Result<()> { + if let HandshakeMode::Skip { map } = &mut self.handshake_mode { + if let Some((map_key, outlet_listener_registry)) = map.take() { + outlet_listener_registry + .started_workers + .write() + .unwrap() + .remove(&map_key); + } + } + self.registry .remove_portal_worker(&self.addresses.sender_remote); @@ -450,7 +566,6 @@ impl Worker for TcpPortalWorker { // knows what to do with the incoming message let msg = msg.into_local_message(); - let state = self.clone_state(); let mut onward_route = msg.onward_route; let recipient = onward_route.step()?; if onward_route.next().is_ok() { @@ -476,7 +591,7 @@ impl Worker for TcpPortalWorker { let return_route = msg.return_route; let payload = msg.payload; - match state { + match &self.state { State::ReceivePong => { if !remote_packet { return Err(TransportError::PortalInvalidState)?; @@ -503,7 +618,7 @@ impl Worker for TcpPortalWorker { self.start_disconnection(ctx, DisconnectionReason::Remote) .await } - PortalMessage::Ping | PortalMessage::Pong => Err(TransportError::Protocol)?, + PortalMessage::Ping | PortalMessage::Pong => Ok(()), } } else { let msg = PortalInternalMessage::decode(&payload)?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/common.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/common.rs index 28423195df2..9155369602c 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/common.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/common.rs @@ -17,14 +17,18 @@ use tracing::{debug, instrument}; #[instrument(skip_all)] pub(crate) async fn connect( to: &HostnamePort, + enable_nagle: bool, timeout: Option, ) -> Result<(OwnedReadHalf, OwnedWriteHalf)> { - Ok(create_tcp_stream(to, timeout).await?.into_split()) + Ok(create_tcp_stream(to, enable_nagle, timeout) + .await? + .into_split()) } /// Create a TCP stream to a given socket address pub(crate) async fn create_tcp_stream( to: &HostnamePort, + enable_nagle: bool, timeout: Option, ) -> Result { debug!(addr = %to, "Connecting"); @@ -66,7 +70,10 @@ pub(crate) async fn create_tcp_stream( socket .set_tcp_keepalive(&keepalive) .map_err(TransportError::from)?; - socket.set_nodelay(true).map_err(TransportError::from)?; + + socket + .set_nodelay(!enable_nagle) + .map_err(TransportError::from)?; Ok(connection) } @@ -76,6 +83,7 @@ pub(crate) async fn create_tcp_stream( #[instrument(skip_all)] pub(crate) async fn connect_tls( to: &HostnamePort, + enable_nagle: bool, ) -> Result<( ReadHalf>, WriteHalf>, @@ -83,7 +91,7 @@ pub(crate) async fn connect_tls( debug!(to = %to, "Trying to connect using TLS"); // create a tcp stream - let connection = create_tcp_stream(to, None).await?; + let connection = create_tcp_stream(to, enable_nagle, None).await?; // create a TLS connector let tls_connector = create_tls_connector().await?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/connection.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/connection.rs index 3c2f11bf9e2..b1a2f91f3ec 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/connection.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/connection.rs @@ -111,7 +111,7 @@ impl TcpTransport { let peer = HostnamePort::from_str(&peer.into())?; debug!("Connecting to {}", peer.clone()); - let (read_half, write_half) = connect(&peer, options.timeout).await?; + let (read_half, write_half) = connect(&peer, false, options.timeout).await?; let socket = read_half .peer_addr() .map_err(|e| ockam_core::Error::new(Origin::Transport, Kind::Internal, e))?; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs index 9bcef2aebe1..4ff08d1a38b 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/receiver.rs @@ -99,18 +99,19 @@ impl TcpRecvProcessor { Ok(()) } - async fn notify_sender_stream_dropped(&self, ctx: &Context, msg: impl Display) -> Result<()> { + async fn notify_sender_stream_dropped(&self, ctx: &Context, msg: impl Display) { debug!( "Connection to peer '{}' was closed; dropping stream. {}", self.socket_address, msg ); - ctx.send_from_address( - self.addresses.sender_internal_address().clone(), - TcpSendWorkerMsg::ConnectionClosed, - self.addresses.receiver_internal_address().clone(), - ) - .await + _ = ctx + .send_from_address( + self.addresses.sender_internal_address().clone(), + TcpSendWorkerMsg::ConnectionClosed, + self.addresses.receiver_internal_address().clone(), + ) + .await; } } @@ -132,7 +133,7 @@ impl Processor for TcpRecvProcessor { Ok(p) => p, Err(e) => { trace!("Cannot read the Ockam protocol version: {:?}", &e); - self.notify_sender_stream_dropped(ctx, e).await?; + self.notify_sender_stream_dropped(ctx, e).await; // stop this processor ctx.stop_primary_address()?; return Ok(()); @@ -145,7 +146,7 @@ impl Processor for TcpRecvProcessor { let message = format!("Received protocol message is unsupported: {protocol_version}"); trace!("{}: {:?}", &message, &e); - self.notify_sender_stream_dropped(ctx, message).await?; + self.notify_sender_stream_dropped(ctx, message).await; // stop this processor ctx.stop_primary_address()?; return Ok(()); @@ -180,7 +181,7 @@ impl Processor for TcpRecvProcessor { let len = match self.read_half.read_u32().await { Ok(l) => l, Err(e) => { - self.notify_sender_stream_dropped(ctx, e).await?; + self.notify_sender_stream_dropped(ctx, e).await; return Ok(false); } }; @@ -192,7 +193,7 @@ impl Processor for TcpRecvProcessor { ctx, format!("Received message len doesn't fit usize: {}", len), ) - .await?; + .await; return Ok(false); } }; @@ -205,7 +206,7 @@ impl Processor for TcpRecvProcessor { len_usize, MAX_MESSAGE_SIZE ), ) - .await?; + .await; return Ok(false); } @@ -220,7 +221,7 @@ impl Processor for TcpRecvProcessor { match self.read_half.read_exact(&mut self.incoming_buffer).await { Ok(_) => {} Err(e) => { - self.notify_sender_stream_dropped(ctx, e).await?; + self.notify_sender_stream_dropped(ctx, e).await; return Ok(false); } } @@ -229,7 +230,7 @@ impl Processor for TcpRecvProcessor { let transport_message: TcpTransportMessage = match minicbor::decode(&self.incoming_buffer) { Ok(msg) => msg, Err(e) => { - self.notify_sender_stream_dropped(ctx, e).await?; + self.notify_sender_stream_dropped(ctx, e).await; return Ok(false); } }; diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs index 085481433ba..73b4922f8d6 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/workers/sender.rs @@ -108,7 +108,7 @@ impl TcpSendWorker { WorkerBuilder::new(sender_worker) .with_mailboxes(Mailboxes::new(main_mailbox.clone(), vec![internal_mailbox])) - .with_shutdown_priority(WorkerShutdownPriority::Priority1) + .with_shutdown_priority(WorkerShutdownPriority::Priority2) .start(ctx)?; Ok(()) @@ -116,7 +116,7 @@ impl TcpSendWorker { #[instrument(skip_all, name = "TcpSendWorker::stop")] fn stop(&self, ctx: &Context) -> Result<()> { - ctx.stop_address(self.addresses.sender_address()) + ctx.stop_primary_address() } fn serialize_message(&mut self, local_message: LocalMessage) -> Result<()> { @@ -215,7 +215,7 @@ impl Worker for TcpSendWorker { msg: Routed, ) -> Result<()> { let recipient = msg.msg_addr(); - if &recipient == self.addresses.sender_internal_address() { + if recipient == self.addresses.sender_internal_address() { let msg = TcpSendWorkerMsg::decode(msg.payload())?; match msg { diff --git a/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs b/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs index 778143b5f30..7f78d6e1d37 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/tests/portal.rs @@ -12,7 +12,7 @@ use ockam_transport_tcp::{ const LENGTH: usize = 32; -async fn setup(ctx: &Context) -> Result<(String, TcpListener)> { +async fn setup(ctx: &Context, skip_handshake: bool) -> Result<(String, TcpListener)> { let tcp = TcpTransport::create(ctx)?; let listener = { @@ -21,13 +21,17 @@ async fn setup(ctx: &Context) -> Result<(String, TcpListener)> { tcp.create_outlet( "outlet", bind_address.try_into().unwrap(), - TcpOutletOptions::new(), + TcpOutletOptions::new().set_skip_handshake(skip_handshake), )?; listener }; let inlet = tcp - .create_inlet("127.0.0.1:0", route!["outlet"], TcpInletOptions::new()) + .create_inlet( + "127.0.0.1:0", + route!["outlet"], + TcpInletOptions::new().set_skip_handshake(skip_handshake), + ) .await?; Ok((inlet.socket_address().to_string(), listener)) @@ -60,10 +64,24 @@ async fn read_should_timeout(stream: &mut TcpStream) { #[allow(non_snake_case)] #[ockam_macros::test(timeout = 5000)] async fn portal__standard_flow__should_succeed(ctx: &mut Context) -> Result<()> { + portal__standard_flow__should_succeed__impl(ctx, false).await +} + +#[allow(non_snake_case)] +#[ockam_macros::test(timeout = 5000)] +async fn portal_skip_handshake__standard_flow__should_succeed(ctx: &mut Context) -> Result<()> { + portal__standard_flow__should_succeed__impl(ctx, true).await +} + +#[allow(non_snake_case)] +async fn portal__standard_flow__should_succeed__impl( + ctx: &mut Context, + skip_handshake: bool, +) -> Result<()> { let payload1 = generate_binary(); let payload2 = generate_binary(); - let (inlet_addr, listener) = setup(ctx).await?; + let (inlet_addr, listener) = setup(ctx, skip_handshake).await?; let handle = tokio::spawn(async move { let (mut stream, _) = listener.accept().await.unwrap(); @@ -89,10 +107,24 @@ async fn portal__standard_flow__should_succeed(ctx: &mut Context) -> Result<()> #[allow(non_snake_case)] #[ockam_macros::test(timeout = 5000)] async fn portal__reverse_flow__should_succeed(ctx: &mut Context) -> Result<()> { + portal__reverse_flow__should_succeed__impl(ctx, false).await +} + +#[allow(non_snake_case)] +#[ockam_macros::test(timeout = 5000)] +async fn portal_skip_handshake__reverse_flow__should_succeed(ctx: &mut Context) -> Result<()> { + portal__reverse_flow__should_succeed__impl(ctx, true).await +} + +#[allow(non_snake_case)] +async fn portal__reverse_flow__should_succeed__impl( + ctx: &mut Context, + skip_handshake: bool, +) -> Result<()> { let payload1 = generate_binary(); let payload2 = generate_binary(); - let (inlet_addr, listener) = setup(ctx).await?; + let (inlet_addr, listener) = setup(ctx, skip_handshake).await?; let handle = tokio::spawn(async move { let (mut stream, _) = listener.accept().await.unwrap(); @@ -118,6 +150,20 @@ async fn portal__reverse_flow__should_succeed(ctx: &mut Context) -> Result<()> { #[allow(non_snake_case)] #[ockam_macros::test(timeout = 15000)] async fn portal__tcp_connection__should_succeed(ctx: &mut Context) -> Result<()> { + portal__tcp_connection__should_succeed__impl(ctx, false).await +} + +#[allow(non_snake_case)] +#[ockam_macros::test(timeout = 15000)] +async fn portal_skip_handshake__tcp_connection__should_succeed(ctx: &mut Context) -> Result<()> { + portal__tcp_connection__should_succeed__impl(ctx, true).await +} + +#[allow(non_snake_case)] +async fn portal__tcp_connection__should_succeed__impl( + ctx: &mut Context, + skip_handshake: bool, +) -> Result<()> { let payload1 = generate_binary(); let payload2 = generate_binary(); @@ -140,14 +186,16 @@ async fn portal__tcp_connection__should_succeed(ctx: &mut Context) -> Result<()> tcp.create_outlet( "outlet", bind_address.try_into().unwrap(), - TcpOutletOptions::new().as_consumer(&outlet_flow_control_id), + TcpOutletOptions::new() + .as_consumer(&outlet_flow_control_id) + .set_skip_handshake(skip_handshake), )?; let inlet = tcp .create_inlet( "127.0.0.1:0", route![tcp_connection.clone(), "outlet"], - TcpInletOptions::new(), + TcpInletOptions::new().set_skip_handshake(skip_handshake), ) .await?; @@ -168,6 +216,10 @@ async fn portal__tcp_connection__should_succeed(ctx: &mut Context) -> Result<()> let res = handle.await; assert!(res.is_ok()); + drop(stream); + + tokio::time::sleep(Duration::from_millis(250)).await; + Ok(()) } @@ -175,6 +227,22 @@ async fn portal__tcp_connection__should_succeed(ctx: &mut Context) -> Result<()> #[ockam_macros::test(timeout = 15000)] async fn portal__tcp_connection_with_invalid_message_flow__should_not_succeed( ctx: &mut Context, +) -> Result<()> { + portal__tcp_connection_with_invalid_message_flow__should_not_succeed__impl(ctx, false).await +} + +#[allow(non_snake_case)] +#[ockam_macros::test(timeout = 15000)] +async fn portal_skip_handshake__tcp_connection_with_invalid_message_flow__should_not_succeed( + ctx: &mut Context, +) -> Result<()> { + portal__tcp_connection_with_invalid_message_flow__should_not_succeed__impl(ctx, true).await +} + +#[allow(non_snake_case)] +async fn portal__tcp_connection_with_invalid_message_flow__should_not_succeed__impl( + ctx: &mut Context, + skip_handshake: bool, ) -> Result<()> { let payload = generate_binary(); @@ -194,14 +262,14 @@ async fn portal__tcp_connection_with_invalid_message_flow__should_not_succeed( tcp.create_outlet( "outlet_invalid", bind_address.try_into().unwrap(), - TcpOutletOptions::new(), + TcpOutletOptions::new().set_skip_handshake(skip_handshake), )?; let inlet = tcp .create_inlet( "127.0.0.1:0", route![tcp_connection, "outlet_invalid"], - TcpInletOptions::new(), + TcpInletOptions::new().set_skip_handshake(skip_handshake), ) .await?; @@ -223,9 +291,9 @@ async fn portal__tcp_connection_with_invalid_message_flow__should_not_succeed( handle.abort(); - if let Err(e) = ctx.shutdown_node().await { - println!("Unclean stop: {}", e) - } + drop(stream); + + tokio::time::sleep(Duration::from_millis(250)).await; Ok(()) } @@ -233,6 +301,20 @@ async fn portal__tcp_connection_with_invalid_message_flow__should_not_succeed( #[allow(non_snake_case)] #[ockam_macros::test(timeout = 5000)] async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { + portal__update_route__should_succeed__impl(ctx, false).await +} + +#[allow(non_snake_case)] +#[ockam_macros::test(timeout = 5000)] +async fn portal_skip_handshake__update_route__should_succeed(ctx: &mut Context) -> Result<()> { + portal__update_route__should_succeed__impl(ctx, true).await +} + +#[allow(non_snake_case)] +async fn portal__update_route__should_succeed__impl( + ctx: &mut Context, + skip_handshake: bool, +) -> Result<()> { let payload1 = generate_binary(); let payload2 = generate_binary(); @@ -248,7 +330,9 @@ async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { .unwrap() .to_string() .try_into()?, - TcpOutletOptions::new().as_consumer(listener_node.flow_control_id()), + TcpOutletOptions::new() + .as_consumer(listener_node.flow_control_id()) + .set_skip_handshake(skip_handshake), )?; let node_connection1 = tcp @@ -269,7 +353,7 @@ async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { .create_inlet( "127.0.0.1:0", route![node_connection1.clone(), "outlet"], - TcpInletOptions::new(), + TcpInletOptions::new().set_skip_handshake(skip_handshake), ) .await?; @@ -305,5 +389,9 @@ async fn portal__update_route__should_succeed(ctx: &mut Context) -> Result<()> { let res = handle.await; assert!(res.is_ok()); + drop(stream); + + tokio::time::sleep(Duration::from_millis(250)).await; + Ok(()) } diff --git a/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs b/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs index 8b36c205cc4..cec55965e09 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/puncture/puncture/receiver.rs @@ -283,11 +283,11 @@ impl Worker for UdpPunctureReceiverWorker { msg: Routed, ) -> Result<()> { let addr = msg.msg_addr(); - if &addr == self.addresses.remote_address() { + if addr == self.addresses.remote_address() { let msg = msg.into_local_message(); let return_route = msg.return_route; self.handle_peer(ctx, msg.payload, &return_route).await?; - } else if &addr == self.addresses.heartbeat_address() { + } else if addr == self.addresses.heartbeat_address() { self.handle_heartbeat(ctx).await?; } else { return Err(PunctureError::Internal)?; diff --git a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs index 49bd07dd40f..b1a691aca3e 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs @@ -275,9 +275,9 @@ impl Worker for UdsRouter { let return_route = msg.return_route().clone(); let msg_addr = msg.msg_addr(); - if msg_addr == self.main_addr { + if msg_addr == &self.main_addr { self.handle_route(ctx, msg.into_local_message()).await?; - } else if msg_addr == self.api_addr { + } else if msg_addr == &self.api_addr { let msg = UdsRouterRequest::decode(msg.payload())?; match msg { UdsRouterRequest::Register { accepts, self_addr } => { diff --git a/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs index cf51b6a2a14..eb1b5557dd9 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/workers/sender.rs @@ -253,7 +253,7 @@ impl Worker for UdsSendWorker { }; let recipient = msg.msg_addr(); - if recipient == self.internal_addr { + if recipient == &self.internal_addr { let msg = UdsSendWorkerMsg::decode(msg.payload())?; match msg { diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs index a364dd78e5e..d82319e5998 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs @@ -121,9 +121,9 @@ impl Worker for WebSocketRouter { let return_route = msg.return_route().clone(); let msg_addr = msg.msg_addr(); - if msg_addr == self.main_addr { + if msg_addr == &self.main_addr { self.handle_route(ctx, msg.into_local_message()).await?; - } else if msg_addr == self.api_addr { + } else if msg_addr == &self.api_addr { let msg = WebSocketRouterRequest::decode(msg.payload())?; match msg { WebSocketRouterRequest::Register { accepts, self_addr } => { diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs b/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs index 2a402cabd3b..3db9844141f 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/workers/sender.rs @@ -190,7 +190,7 @@ where }; let recipient = msg.msg_addr(); - if recipient == self.internal_addr { + if recipient == &self.internal_addr { let msg = TransportMessage::latest(route![], route![], vec![]); // Sending empty heartbeat if ws_sink