Skip to content

Commit

Permalink
Loadtesting fixes (#827)
Browse files Browse the repository at this point in the history
* Loadtesting fixes

This commit makes three changes

* Moves QCMP logs to debug
* Removed the retry function on the command level
    This is because if it starts failing we'd prefer if k8s restarts it
    than it restart itself continously.
* Removed the asserts and unwraps in `release_socket`.
    This was never a problem while running, but could cause issues
    during teardown when can't rely on how things are being freed.

* Fix some other destinations and timeouts
  • Loading branch information
XAMPPRocky authored Oct 18, 2023
1 parent 6e0adc9 commit 56c54f3
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 88 deletions.
71 changes: 17 additions & 54 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,58 +202,21 @@ impl Cli {
shutdown_tx.send(()).ok();
});

let fut = tryhard::retry_fn({
let shutdown_rx = shutdown_rx.clone();
let mode = mode.clone();
move || match self.command.clone() {
Commands::Agent(agent) => {
let config = config.clone();
let shutdown_rx = shutdown_rx.clone();
let mode = mode.clone();
tokio::spawn(async move {
agent.run(config.clone(), mode, shutdown_rx.clone()).await
})
}
Commands::Proxy(runner) => {
let config = config.clone();
let shutdown_rx = shutdown_rx.clone();
let mode = mode.clone();
tokio::spawn(async move {
runner
.run(config.clone(), mode.clone(), shutdown_rx.clone())
.await
})
}
Commands::Manage(manager) => {
let config = config.clone();
let shutdown_rx = shutdown_rx.clone();
let mode = mode.clone();
tokio::spawn(async move {
manager
.manage(config.clone(), mode, shutdown_rx.clone())
.await
})
}
Commands::Relay(relay) => {
let config = config.clone();
let shutdown_rx = shutdown_rx.clone();
let mode = mode.clone();
tokio::spawn(
async move { relay.relay(config, mode, shutdown_rx.clone()).await },
)
}
Commands::GenerateConfigSchema(_) | Commands::Qcmp(_) => unreachable!(),
match self.command {
Commands::Agent(agent) => agent.run(config.clone(), mode, shutdown_rx.clone()).await,
Commands::Proxy(runner) => {
runner
.run(config.clone(), mode.clone(), shutdown_rx.clone())
.await
}
})
.retries(3)
.on_retry(|_, _, error| {
let error = error.to_string();
async move {
tracing::warn!(%error, "error would have caused fatal crash");
Commands::Manage(manager) => {
manager
.manage(config.clone(), mode, shutdown_rx.clone())
.await
}
});

fut.await?
Commands::Relay(relay) => relay.relay(config, mode, shutdown_rx.clone()).await,
Commands::GenerateConfigSchema(_) | Commands::Qcmp(_) => unreachable!(),
}
}

/// Searches for the configuration file, and panics if not found.
Expand Down Expand Up @@ -403,9 +366,9 @@ mod tests {

tokio::spawn(relay.drive());
tokio::spawn(control_plane.drive());
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::spawn(proxy.drive());
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(50)).await;
let socket = create_socket().await;
let config = Config::default();
let proxy_address: SocketAddr = (std::net::Ipv4Addr::LOCALHOST, 7777).into();
Expand Down Expand Up @@ -436,7 +399,7 @@ mod tests {

assert_eq!(
"hello",
timeout(Duration::from_millis(500), rx.recv())
timeout(Duration::from_millis(100), rx.recv())
.await
.expect("should have received a packet")
.unwrap()
Expand All @@ -449,7 +412,7 @@ mod tests {
let msg = b"hello\xFF\xFF\xFF";
socket.send_to(msg, &proxy_address).await.unwrap();

let result = timeout(Duration::from_millis(500), rx.recv()).await;
let result = timeout(Duration::from_millis(50), rx.recv()).await;
assert!(result.is_err(), "should not have received a packet");
tracing::info!(?token, "didn't receive bad packet");
}
Expand Down
20 changes: 8 additions & 12 deletions src/cli/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,18 +470,17 @@ mod tests {
let endpoint = t.open_socket_and_recv_single_packet().await;
let mut local_addr = available_addr(&AddressType::Ipv6).await;
crate::test::map_addr_to_localhost(&mut local_addr);
let mut dest = endpoint.socket.local_ipv6_addr().unwrap();
crate::test::map_addr_to_localhost(&mut dest);

let proxy = crate::cli::Proxy {
port: local_addr.port(),
..<_>::default()
};

let config = Arc::new(Config::default());
config.clusters.modify(|clusters| {
clusters.insert_default(
[Endpoint::new(
endpoint.socket.local_ipv6_addr().unwrap().into(),
)]
.into(),
);
clusters.insert_default([Endpoint::new(dest.into())].into());
});
t.run_server(config, proxy, None);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
Expand Down Expand Up @@ -509,6 +508,8 @@ mod tests {
load_test_filters();
let endpoint = t.open_socket_and_recv_single_packet().await;
let local_addr = available_addr(&AddressType::Random).await;
let mut dest = endpoint.socket.local_ipv4_addr().unwrap();
crate::test::map_addr_to_localhost(&mut dest);
let config = Arc::new(Config::default());
config.filters.store(
crate::filters::FilterChain::try_from(vec![config::Filter {
Expand All @@ -520,12 +521,7 @@ mod tests {
.unwrap(),
);
config.clusters.modify(|clusters| {
clusters.insert_default(
[Endpoint::new(
endpoint.socket.local_ipv4_addr().unwrap().into(),
)]
.into(),
);
clusters.insert_default([Endpoint::new(dest.into())].into());
});
t.run_server(
config,
Expand Down
27 changes: 17 additions & 10 deletions src/cli/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,32 +353,39 @@ impl SessionPool {
) {
tracing::trace!("releasing socket");
let mut storage = self.storage.write().await;
let socket_set = storage.destination_to_sockets.get_mut(dest).unwrap();
let Some(socket_set) = storage.destination_to_sockets.get_mut(dest) else {
return;
};

assert!(socket_set.remove(&port));
socket_set.remove(&port);

if socket_set.is_empty() {
storage.destination_to_sockets.remove(dest).unwrap();
storage.destination_to_sockets.remove(dest);
}

let dest_set = storage.sockets_to_destination.get_mut(&port).unwrap();
let Some(dest_set) = storage.sockets_to_destination.get_mut(&port) else {
return;
};

assert!(dest_set.remove(dest));
dest_set.remove(dest);

if dest_set.is_empty() {
storage.sockets_to_destination.remove(&port).unwrap();
storage.sockets_to_destination.remove(&port);
}

// Not asserted because the source might not have GeoIP info.
storage.sources_to_asn_info.remove(source);
assert!(storage
.destination_to_sources
.remove(&(*dest, port))
.is_some());
storage.destination_to_sources.remove(&(*dest, port));
tracing::trace!("socket released");
}
}

impl Drop for SessionPool {
fn drop(&mut self) {
drop(std::mem::take(&mut self.session_map));
}
}

/// Session encapsulates a UDP stream session
#[derive(Debug)]
pub struct Session {
Expand Down
4 changes: 2 additions & 2 deletions src/codec/qcmp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub async fn spawn(port: u16) -> crate::Result<()> {
let mut output_buf = Vec::new();

loop {
tracing::info!(%v4_addr, %v6_addr, "awaiting qcmp packets");
tracing::debug!("awaiting qcmp packets");

match socket.recv_from(&mut buf).await {
Ok((size, source)) => {
Expand Down Expand Up @@ -83,7 +83,7 @@ pub async fn spawn(port: u16) -> crate::Result<()> {
}
}
}
.instrument(tracing::info_span!("qcmp_task", %v4_addr, %v6_addr)),
.instrument(tracing::debug_span!("qcmp_task", %v4_addr, %v6_addr)),
);
Ok(())
}
Expand Down
18 changes: 8 additions & 10 deletions src/net/xds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ mod tests {
// Test that the client can handle the manager dropping out.
let handle = tokio::spawn(server::spawn(xds_port, xds_config.clone()));

tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(());
tokio::spawn(server::spawn(xds_port, xds_config.clone()));
let client_proxy = crate::cli::Proxy {
Expand All @@ -207,7 +206,6 @@ mod tests {
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

tokio::time::sleep(std::time::Duration::from_millis(50)).await;
handle.abort();
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
tokio::spawn(server::spawn(xds_port, xds_config.clone()));
Expand Down Expand Up @@ -279,10 +277,11 @@ mod tests {
.send_to(&packet, (std::net::Ipv6Addr::LOCALHOST, client_addr.port()))
.await
.unwrap();
let response = tokio::time::timeout(std::time::Duration::from_secs(1), client.packet_rx)
.await
.unwrap()
.unwrap();
let response =
tokio::time::timeout(std::time::Duration::from_millis(100), client.packet_rx)
.await
.unwrap()
.unwrap();

assert_eq!(format!("{}{}", fixture, token), response);
}
Expand All @@ -308,7 +307,7 @@ mod tests {
config.clone(),
crate::cli::admin::IDLE_REQUEST_INTERVAL_SECS,
);
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;

// Each time, we create a new upstream endpoint and send a cluster update for it.
let concat_bytes = vec![("b", "c,"), ("d", "e")];
Expand All @@ -322,7 +321,6 @@ mod tests {
cluster.clear();
cluster.insert(Endpoint::new(local_addr.clone()));
});
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

let filters = crate::filters::FilterChain::try_from(vec![
Concatenate::as_filter_config(concatenate::Config {
Expand All @@ -346,7 +344,7 @@ mod tests {
.discovery_request(ResourceType::Cluster, &[])
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(
local_addr,
config
Expand All @@ -364,7 +362,7 @@ mod tests {
.discovery_request(ResourceType::Listener, &[])
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let changed_filters = config.filters.load();

assert_eq!(changed_filters.len(), 2);
Expand Down

0 comments on commit 56c54f3

Please sign in to comment.