diff --git a/src/cli.rs b/src/cli.rs index 2ad0a7ccd2..081e612677 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -202,58 +202,21 @@ impl Cli { shutdown_tx.send(()).ok(); }); - let fut = tryhard::retry_fn({ - let shutdown_rx = shutdown_rx.clone(); - let mode = mode.clone(); - move || match self.command.clone() { - Commands::Agent(agent) => { - let config = config.clone(); - let shutdown_rx = shutdown_rx.clone(); - let mode = mode.clone(); - tokio::spawn(async move { - agent.run(config.clone(), mode, shutdown_rx.clone()).await - }) - } - Commands::Proxy(runner) => { - let config = config.clone(); - let shutdown_rx = shutdown_rx.clone(); - let mode = mode.clone(); - tokio::spawn(async move { - runner - .run(config.clone(), mode.clone(), shutdown_rx.clone()) - .await - }) - } - Commands::Manage(manager) => { - let config = config.clone(); - let shutdown_rx = shutdown_rx.clone(); - let mode = mode.clone(); - tokio::spawn(async move { - manager - .manage(config.clone(), mode, shutdown_rx.clone()) - .await - }) - } - Commands::Relay(relay) => { - let config = config.clone(); - let shutdown_rx = shutdown_rx.clone(); - let mode = mode.clone(); - tokio::spawn( - async move { relay.relay(config, mode, shutdown_rx.clone()).await }, - ) - } - Commands::GenerateConfigSchema(_) | Commands::Qcmp(_) => unreachable!(), + match self.command { + Commands::Agent(agent) => agent.run(config.clone(), mode, shutdown_rx.clone()).await, + Commands::Proxy(runner) => { + runner + .run(config.clone(), mode.clone(), shutdown_rx.clone()) + .await } - }) - .retries(3) - .on_retry(|_, _, error| { - let error = error.to_string(); - async move { - tracing::warn!(%error, "error would have caused fatal crash"); + Commands::Manage(manager) => { + manager + .manage(config.clone(), mode, shutdown_rx.clone()) + .await } - }); - - fut.await? + Commands::Relay(relay) => relay.relay(config, mode, shutdown_rx.clone()).await, + Commands::GenerateConfigSchema(_) | Commands::Qcmp(_) => unreachable!(), + } } /// Searches for the configuration file, and panics if not found. @@ -403,9 +366,9 @@ mod tests { tokio::spawn(relay.drive()); tokio::spawn(control_plane.drive()); - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(50)).await; tokio::spawn(proxy.drive()); - tokio::time::sleep(Duration::from_millis(500)).await; + tokio::time::sleep(Duration::from_millis(50)).await; let socket = create_socket().await; let config = Config::default(); let proxy_address: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, 7777).into(); @@ -436,7 +399,7 @@ mod tests { assert_eq!( "hello", - timeout(Duration::from_millis(500), rx.recv()) + timeout(Duration::from_millis(100), rx.recv()) .await .expect("should have received a packet") .unwrap() @@ -449,7 +412,7 @@ mod tests { let msg = b"hello\xFF\xFF\xFF"; socket.send_to(msg, &proxy_address).await.unwrap(); - let result = timeout(Duration::from_millis(500), rx.recv()).await; + let result = timeout(Duration::from_millis(50), rx.recv()).await; assert!(result.is_err(), "should not have received a packet"); tracing::info!(?token, "didn't receive bad packet"); } diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 7b3f621943..49790e7aca 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -470,18 +470,17 @@ mod tests { let endpoint = t.open_socket_and_recv_single_packet().await; let mut local_addr = available_addr(&AddressType::Ipv6).await; crate::test::map_addr_to_localhost(&mut local_addr); + let mut dest = endpoint.socket.local_ipv6_addr().unwrap(); + crate::test::map_addr_to_localhost(&mut dest); + let proxy = crate::cli::Proxy { port: local_addr.port(), ..<_>::default() }; + let config = Arc::new(Config::default()); config.clusters.modify(|clusters| { - clusters.insert_default( - [Endpoint::new( - endpoint.socket.local_ipv6_addr().unwrap().into(), - )] - .into(), - ); + clusters.insert_default([Endpoint::new(dest.into())].into()); }); t.run_server(config, proxy, None); tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -509,6 +508,8 @@ mod tests { load_test_filters(); let endpoint = t.open_socket_and_recv_single_packet().await; let local_addr = available_addr(&AddressType::Random).await; + let mut dest = endpoint.socket.local_ipv4_addr().unwrap(); + crate::test::map_addr_to_localhost(&mut dest); let config = Arc::new(Config::default()); config.filters.store( crate::filters::FilterChain::try_from(vec![config::Filter { @@ -520,12 +521,7 @@ mod tests { .unwrap(), ); config.clusters.modify(|clusters| { - clusters.insert_default( - [Endpoint::new( - endpoint.socket.local_ipv4_addr().unwrap().into(), - )] - .into(), - ); + clusters.insert_default([Endpoint::new(dest.into())].into()); }); t.run_server( config, diff --git a/src/cli/proxy/sessions.rs b/src/cli/proxy/sessions.rs index dddadf8c23..ef6b984f13 100644 --- a/src/cli/proxy/sessions.rs +++ b/src/cli/proxy/sessions.rs @@ -353,32 +353,39 @@ impl SessionPool { ) { tracing::trace!("releasing socket"); let mut storage = self.storage.write().await; - let socket_set = storage.destination_to_sockets.get_mut(dest).unwrap(); + let Some(socket_set) = storage.destination_to_sockets.get_mut(dest) else { + return; + }; - assert!(socket_set.remove(&port)); + socket_set.remove(&port); if socket_set.is_empty() { - storage.destination_to_sockets.remove(dest).unwrap(); + storage.destination_to_sockets.remove(dest); } - let dest_set = storage.sockets_to_destination.get_mut(&port).unwrap(); + let Some(dest_set) = storage.sockets_to_destination.get_mut(&port) else { + return; + }; - assert!(dest_set.remove(dest)); + dest_set.remove(dest); if dest_set.is_empty() { - storage.sockets_to_destination.remove(&port).unwrap(); + storage.sockets_to_destination.remove(&port); } // Not asserted because the source might not have GeoIP info. storage.sources_to_asn_info.remove(source); - assert!(storage - .destination_to_sources - .remove(&(*dest, port)) - .is_some()); + storage.destination_to_sources.remove(&(*dest, port)); tracing::trace!("socket released"); } } +impl Drop for SessionPool { + fn drop(&mut self) { + drop(std::mem::take(&mut self.session_map)); + } +} + /// Session encapsulates a UDP stream session #[derive(Debug)] pub struct Session { diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs index 3eb7cc5573..1e42b51c87 100644 --- a/src/codec/qcmp.rs +++ b/src/codec/qcmp.rs @@ -44,7 +44,7 @@ pub async fn spawn(port: u16) -> crate::Result<()> { let mut output_buf = Vec::new(); loop { - tracing::info!(%v4_addr, %v6_addr, "awaiting qcmp packets"); + tracing::debug!("awaiting qcmp packets"); match socket.recv_from(&mut buf).await { Ok((size, source)) => { @@ -83,7 +83,7 @@ pub async fn spawn(port: u16) -> crate::Result<()> { } } } - .instrument(tracing::info_span!("qcmp_task", %v4_addr, %v6_addr)), + .instrument(tracing::debug_span!("qcmp_task", %v4_addr, %v6_addr)), ); Ok(()) } diff --git a/src/net/xds.rs b/src/net/xds.rs index 4070795ed4..cf7bf34ca6 100644 --- a/src/net/xds.rs +++ b/src/net/xds.rs @@ -190,7 +190,6 @@ mod tests { // Test that the client can handle the manager dropping out. let handle = tokio::spawn(server::spawn(xds_port, xds_config.clone())); - tokio::time::sleep(std::time::Duration::from_millis(50)).await; let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(()); tokio::spawn(server::spawn(xds_port, xds_config.clone())); let client_proxy = crate::cli::Proxy { @@ -207,7 +206,6 @@ mod tests { }); tokio::time::sleep(std::time::Duration::from_millis(50)).await; - tokio::time::sleep(std::time::Duration::from_millis(50)).await; handle.abort(); tokio::time::sleep(std::time::Duration::from_millis(50)).await; tokio::spawn(server::spawn(xds_port, xds_config.clone())); @@ -279,10 +277,11 @@ mod tests { .send_to(&packet, (std::net::Ipv6Addr::LOCALHOST, client_addr.port())) .await .unwrap(); - let response = tokio::time::timeout(std::time::Duration::from_secs(1), client.packet_rx) - .await - .unwrap() - .unwrap(); + let response = + tokio::time::timeout(std::time::Duration::from_millis(100), client.packet_rx) + .await + .unwrap() + .unwrap(); assert_eq!(format!("{}{}", fixture, token), response); } @@ -308,7 +307,7 @@ mod tests { config.clone(), crate::cli::admin::IDLE_REQUEST_INTERVAL_SECS, ); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; // Each time, we create a new upstream endpoint and send a cluster update for it. let concat_bytes = vec![("b", "c,"), ("d", "e")]; @@ -322,7 +321,6 @@ mod tests { cluster.clear(); cluster.insert(Endpoint::new(local_addr.clone())); }); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; let filters = crate::filters::FilterChain::try_from(vec![ Concatenate::as_filter_config(concatenate::Config { @@ -346,7 +344,7 @@ mod tests { .discovery_request(ResourceType::Cluster, &[]) .await .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; assert_eq!( local_addr, config @@ -364,7 +362,7 @@ mod tests { .discovery_request(ResourceType::Listener, &[]) .await .unwrap(); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(50)).await; let changed_filters = config.filters.load(); assert_eq!(changed_filters.len(), 2);