Refactor sessions to use socket pool
This commit refactors how we handle upstream connections to the
gameservers. When profiling Quilkin I noticed that a significant amount
of time (~10–15%) was being spent dropping the upstream socket through
its Arc implementation whenever a session was dropped.

While thinking about how to solve this problem I realised there was a
second issue: there is a limit on how many connections Quilkin can hold
at once, roughly ~16,383, because beyond that we're likely to start
encountering port exhaustion from the operating system, since each
session binds a unique socket.

This led me to the solution in this commit: while each connection to a
given gameserver needs a unique port, ports don't need to be unique
across gameservers. So I refactored session creation to use what I've
called a "SessionPool", which pools the sockets for sessions into a map
keyed by their destination.

With this implementation the limit becomes ~16,000 connections per
gameserver, which is far more than any gameserver could reasonably
need.
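
To make the pooling idea concrete, here is a minimal sketch in Rust
(invented names and simplified bookkeeping, not Quilkin's actual
SessionPool API): a local port only needs to be unique among the
sessions that share a destination, so an already-bound socket is reused
for a new gameserver before any new port is requested from the OS.

use std::{
    collections::{HashMap, HashSet},
    net::SocketAddr,
    sync::Arc,
};

use tokio::net::UdpSocket;

/// Sketch: pooled upstream sockets, shared across destinations.
#[derive(Default)]
struct PoolSketch {
    /// Every socket the pool has bound so far, keyed by local port.
    sockets: HashMap<u16, Arc<UdpSocket>>,
    /// Which local ports are already claimed for each destination.
    claimed: HashMap<SocketAddr, HashSet<u16>>,
}

impl PoolSketch {
    /// Returns a socket for a new session to `dest`, preferring a pooled
    /// socket whose port is still unclaimed for that destination.
    async fn socket_for(&mut self, dest: SocketAddr) -> std::io::Result<Arc<UdpSocket>> {
        let claimed = self.claimed.entry(dest).or_default();

        // Reuse any existing socket not yet claimed for this destination;
        // the same port may already serve sessions to other gameservers.
        if let Some((&port, socket)) = self
            .sockets
            .iter()
            .find(|(port, _)| !claimed.contains(*port))
        {
            claimed.insert(port);
            return Ok(socket.clone());
        }

        // Nothing reusable: bind one more socket. Port 0 lets the OS pick
        // a free ephemeral port, so port consumption is now bounded per
        // gameserver rather than across the whole proxy.
        let socket = Arc::new(UdpSocket::bind("0.0.0.0:0").await?);
        let port = socket.local_addr()?.port();
        self.sockets.insert(port, socket.clone());
        claimed.insert(port);
        Ok(socket)
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut pool = PoolSketch::default();
    let gs_a: SocketAddr = "10.1.0.1:7777".parse().unwrap();
    let gs_b: SocketAddr = "10.1.0.2:7777".parse().unwrap();

    // The first session to each gameserver shares one local port...
    let s1 = pool.socket_for(gs_a).await?;
    let s2 = pool.socket_for(gs_b).await?;
    assert_eq!(s1.local_addr()?.port(), s2.local_addr()?.port());

    // ...while a second session to the same gameserver needs a new one.
    let s3 = pool.socket_for(gs_a).await?;
    assert_ne!(s1.local_addr()?.port(), s3.local_addr()?.port());
    Ok(())
}

Under a scheme like this the port budget applies per gameserver, since
a local port is only "taken" with respect to a single destination.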
XAMPPRocky committed Oct 11, 2023
1 parent b7a6c51 commit b8320dc
Showing 13 changed files with 588 additions and 367 deletions.
45 changes: 33 additions & 12 deletions src/cli/proxy.rs
@@ -26,7 +26,7 @@ use std::{
use tonic::transport::Endpoint;

use super::Admin;
use crate::{proxy::SessionMap, xds::ResourceType, Config, Result};
use crate::{proxy::SessionPool, xds::ResourceType, Config, Result};

#[cfg(doc)]
use crate::filters::FilterFactory;
@@ -81,9 +81,6 @@ impl Proxy {
mode: Admin,
mut shutdown_rx: tokio::sync::watch::Receiver<()>,
) -> crate::Result<()> {
const SESSION_TIMEOUT_SECONDS: Duration = Duration::from_secs(60);
const SESSION_EXPIRY_POLL_INTERVAL: Duration = Duration::from_secs(60);

let _mmdb_task = self.mmdb.clone().map(|source| {
tokio::spawn(async move {
use crate::config::BACKOFF_INITIAL_DELAY_MILLISECONDS;
@@ -122,8 +119,12 @@ impl Proxy {
let id = config.id.load();
tracing::info!(port = self.port, proxy_id = &*id, "Starting");

let sessions = SessionMap::new(SESSION_TIMEOUT_SECONDS, SESSION_EXPIRY_POLL_INTERVAL);
let runtime_config = mode.unwrap_proxy();
let sessions = SessionPool::new(
config.clone(),
DualStackLocalSocket::new(self.port)?,
shutdown_rx.clone(),
);

let _xds_stream = if !self.management_server.is_empty() {
{
@@ -161,10 +162,10 @@
.await
.map_err(|error| eyre::eyre!(error))?;

tracing::info!(sessions=%sessions.len(), "waiting for active sessions to expire");
while sessions.is_not_empty() {
tracing::info!(sessions=%sessions.sessions().len(), "waiting for active sessions to expire");
while sessions.sessions().is_not_empty() {
tokio::time::sleep(Duration::from_secs(1)).await;
tracing::debug!(sessions=%sessions.len(), "sessions still active");
tracing::debug!(sessions=%sessions.sessions().len(), "sessions still active");
}
tracing::info!("all sessions expired");

@@ -176,7 +177,7 @@
/// This function also spawns the set of worker tasks responsible for consuming packets
/// off the aforementioned queue and processing them through the filter chain and session
/// pipeline.
fn run_recv_from(&self, config: &Arc<Config>, sessions: SessionMap) -> Result<()> {
fn run_recv_from(&self, config: &Arc<Config>, sessions: Arc<SessionPool>) -> Result<()> {
// The number of worker tasks to spawn. Each task gets a dedicated queue to
// consume packets off.
let num_workers = num_cpus::get();
@@ -366,8 +367,17 @@ mod tests {
crate::proxy::DownstreamReceiveWorkerConfig {
worker_id: 1,
socket: socket.clone(),
config,
sessions: <_>::default(),
config: config.clone(),
sessions: SessionPool::new(
config,
DualStackLocalSocket::new(
crate::test_utils::available_addr(&AddressType::Random)
.await
.port(),
)
.unwrap(),
tokio::sync::watch::channel(()).1,
),
}
.spawn();

@@ -405,7 +415,18 @@
)
});

proxy.run_recv_from(&config, <_>::default()).unwrap();
let sessions = SessionPool::new(
config.clone(),
DualStackLocalSocket::new(
crate::test_utils::available_addr(&AddressType::Random)
.await
.port(),
)
.unwrap(),
tokio::sync::watch::channel(()).1,
);

proxy.run_recv_from(&config, sessions).unwrap();

let socket = create_socket().await;
socket.send_to(msg.as_bytes(), &local_addr).await.unwrap();
2 changes: 0 additions & 2 deletions src/filters/chain.rs
@@ -369,7 +369,6 @@ mod tests {
);

let mut context = WriteContext::new(
endpoints_fixture[0].clone(),
endpoints_fixture[0].address.clone(),
"127.0.0.1:70".parse().unwrap(),
b"hello".to_vec(),
@@ -417,7 +416,6 @@
);

let mut context = WriteContext::new(
endpoints_fixture[0].clone(),
endpoints_fixture[0].address.clone(),
"127.0.0.1:70".parse().unwrap(),
b"hello".to_vec(),
4 changes: 0 additions & 4 deletions src/filters/compress.rs
@@ -196,7 +196,6 @@ mod tests {

// write decompress
let mut write_context = WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
read_context.contents.clone(),
@@ -223,7 +222,6 @@

assert!(compression
.write(&mut WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
b"hello".to_vec(),
@@ -270,7 +268,6 @@
assert_eq!(b"hello", &*read_context.contents);

let mut write_context = WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
b"hello".to_vec(),
@@ -329,7 +326,6 @@
let expected = contents_fixture();
// write compress
let mut write_context = WriteContext::new(
Endpoint::new("127.0.0.1:80".parse().unwrap()),
"127.0.0.1:8080".parse().unwrap(),
"127.0.0.1:8081".parse().unwrap(),
expected.clone(),
8 changes: 6 additions & 2 deletions src/filters/debug.rs
@@ -50,8 +50,12 @@ impl Filter for Debug {

#[cfg_attr(feature = "instrument", tracing::instrument(skip(self, ctx)))]
async fn write(&self, ctx: &mut WriteContext) -> Result<(), FilterError> {
info!(id = ?self.config.id, endpoint = ?ctx.endpoint.address, source = ?&ctx.source,
dest = ?&ctx.dest, contents = ?String::from_utf8_lossy(&ctx.contents), "Write filter event");
info!(
id = ?self.config.id,
source = ?&ctx.source,
dest = ?&ctx.dest,
contents = ?String::from_utf8_lossy(&ctx.contents), "Write filter event"
);
Ok(())
}
}
16 changes: 3 additions & 13 deletions src/filters/firewall.rs
@@ -166,23 +166,13 @@ mod tests {
}],
};

let endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 80).into());
let local_addr: crate::endpoint::EndpointAddress = (Ipv4Addr::LOCALHOST, 8081).into();

let mut ctx = WriteContext::new(
endpoint.clone(),
([192, 168, 75, 20], 80).into(),
local_addr.clone(),
vec![],
);
let mut ctx =
WriteContext::new(([192, 168, 75, 20], 80).into(), local_addr.clone(), vec![]);
assert!(firewall.write(&mut ctx).await.is_ok());

let mut ctx = WriteContext::new(
endpoint,
([192, 168, 77, 20], 80).into(),
local_addr,
vec![],
);
let mut ctx = WriteContext::new(([192, 168, 77, 20], 80).into(), local_addr, vec![]);
assert!(firewall.write(&mut ctx).await.is_err());
}
}
1 change: 0 additions & 1 deletion src/filters/match.rs
@@ -188,7 +188,6 @@ mod tests {
// no config, so should make no change.
filter
.write(&mut WriteContext::new(
endpoint.clone(),
endpoint.address,
"127.0.0.1:70".parse().unwrap(),
contents.clone(),
2 changes: 1 addition & 1 deletion src/filters/registry.rs
@@ -113,7 +113,7 @@ mod tests {
.await
.is_ok());
assert!(filter
.write(&mut WriteContext::new(endpoint, addr.clone(), addr, vec![],))
.write(&mut WriteContext::new(addr.clone(), addr, vec![],))
.await
.is_ok());
}
15 changes: 2 additions & 13 deletions src/filters/write.rs
@@ -16,19 +16,14 @@

use std::collections::HashMap;

use crate::{
endpoint::{Endpoint, EndpointAddress},
metadata::DynamicMetadata,
};
use crate::{endpoint::EndpointAddress, metadata::DynamicMetadata};

#[cfg(doc)]
use crate::filters::Filter;

/// The input arguments to [`Filter::write`].
#[non_exhaustive]
pub struct WriteContext {
/// The upstream endpoint that we're expecting packets from.
pub endpoint: Endpoint,
/// The source of the received packet.
pub source: EndpointAddress,
/// The destination of the received packet.
@@ -41,14 +36,8 @@ pub struct WriteContext {

impl WriteContext {
/// Creates a new [`WriteContext`]
pub fn new(
endpoint: Endpoint,
source: EndpointAddress,
dest: EndpointAddress,
contents: Vec<u8>,
) -> Self {
pub fn new(source: EndpointAddress, dest: EndpointAddress, contents: Vec<u8>) -> Self {
Self {
endpoint,
source,
dest,
contents,
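For reference, the constructor change driving the filter test updates
above, with both signatures as they appear in this diff:

- pub fn new(endpoint: Endpoint, source: EndpointAddress, dest: EndpointAddress, contents: Vec<u8>) -> Self
+ pub fn new(source: EndpointAddress, dest: EndpointAddress, contents: Vec<u8>) -> Self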
72 changes: 14 additions & 58 deletions src/proxy.rs
@@ -16,10 +16,9 @@

use std::{net::SocketAddr, sync::Arc};

pub use sessions::{Session, SessionKey, SessionMap};
pub use sessions::{Session, SessionKey, SessionPool};

use crate::{
endpoint::{Endpoint, EndpointAddress},
filters::{Filter, ReadContext},
utils::net::DualStackLocalSocket,
Config,
@@ -44,7 +43,7 @@ pub(crate) struct DownstreamReceiveWorkerConfig {
/// Socket with reused port from which the worker receives packets.
pub socket: Arc<DualStackLocalSocket>,
pub config: Arc<Config>,
pub sessions: SessionMap,
pub sessions: Arc<SessionPool>,
}

impl DownstreamReceiveWorkerConfig {
Expand Down Expand Up @@ -88,7 +87,7 @@ impl DownstreamReceiveWorkerConfig {
}
last_received_at = Some(packet.received_at);

Self::spawn_process_task(packet, source, worker_id, &socket, &config, &sessions)
Self::spawn_process_task(packet, source, worker_id, &config, &sessions)
}
Err(error) => {
tracing::error!(%error, "error receiving packet");
@@ -106,9 +105,8 @@
packet: DownstreamPacket,
source: std::net::SocketAddr,
worker_id: usize,
socket: &Arc<DualStackLocalSocket>,
config: &Arc<Config>,
sessions: &SessionMap,
sessions: &Arc<SessionPool>,
) {
tracing::trace!(
id = worker_id,
@@ -121,16 +119,13 @@
tokio::spawn({
let config = config.clone();
let sessions = sessions.clone();
let socket = socket.clone();

async move {
let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer();

let asn_info = packet.asn_info.clone();
let asn_info = asn_info.as_ref();
match Self::process_downstream_received_packet(packet, config, socket, sessions)
.await
{
match Self::process_downstream_received_packet(packet, config, sessions).await {
Ok(size) => {
crate::metrics::packets_total(crate::metrics::READ, asn_info).inc();
crate::metrics::bytes_total(crate::metrics::READ, asn_info)
@@ -157,8 +152,7 @@
async fn process_downstream_received_packet(
packet: DownstreamPacket,
config: Arc<Config>,
downstream_socket: Arc<DualStackLocalSocket>,
sessions: SessionMap,
sessions: Arc<SessionPool>,
) -> Result<usize, PipelineError> {
let endpoints: Vec<_> = config.clusters.read().endpoints().collect();
if endpoints.is_empty() {
@@ -171,56 +165,18 @@
let mut bytes_written = 0;

for endpoint in context.endpoints.iter() {
bytes_written += Self::session_send_packet(
&context.contents,
&context.source,
endpoint,
&downstream_socket,
&config,
&sessions,
packet.asn_info.clone(),
)
.await?;
let session_key = SessionKey {
source: packet.source,
dest: endpoint.address.to_socket_addr().await?,
};

bytes_written += sessions
.send(session_key, packet.asn_info.clone(), &context.contents)
.await?;
}

Ok(bytes_written)
}

/// Send a packet received from `recv_addr` to an endpoint.
#[tracing::instrument(level="trace", skip_all, fields(source = %recv_addr, dest = %endpoint.address))]
async fn session_send_packet(
packet: &[u8],
recv_addr: &EndpointAddress,
endpoint: &Endpoint,
downstream_socket: &Arc<DualStackLocalSocket>,
config: &Arc<Config>,
sessions: &SessionMap,
asn_info: Option<crate::maxmind_db::IpNetEntry>,
) -> Result<usize, PipelineError> {
let session_key = SessionKey {
source: recv_addr.clone(),
dest: endpoint.address.clone(),
};

let send_future = match sessions.get(&session_key) {
Some(entry) => entry.send(packet),
None => {
let session = Session::new(
config.clone(),
session_key.source.clone(),
downstream_socket.clone(),
endpoint.clone(),
asn_info,
)?;

let future = session.send(packet);
sessions.insert(session_key, session);
future
}
};

send_future.await
}
}

#[derive(thiserror::Error, Debug)]
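With the pool in place, the per-packet path above reduces to building a
SessionKey and calling sessions.send. A small runnable sketch of the
key's semantics (a hypothetical SessionKeySketch over plain SocketAddrs;
the SessionKey in this diff uses Quilkin's own address types):

use std::net::SocketAddr;

/// Sketch of the session identity used by the pool: the pair of the
/// packet's downstream source and its upstream destination.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct SessionKeySketch {
    source: SocketAddr,
    dest: SocketAddr,
}

fn main() {
    let gameserver: SocketAddr = "10.1.0.1:7777".parse().unwrap();

    let key_a = SessionKeySketch {
        source: "10.0.0.1:7000".parse().unwrap(),
        dest: gameserver,
    };
    let key_b = SessionKeySketch {
        source: "10.0.0.2:7000".parse().unwrap(),
        dest: gameserver,
    };

    // Two downstream clients talking to the same gameserver are two
    // distinct sessions, so each must hold its own local port within
    // that gameserver's slice of the pool.
    assert_ne!(key_a, key_b);
}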
(Remaining changed files in this commit are not shown.)