Skip to content

Commit

Permalink
nexus: move most connection setup into connection handling task (#831)
Browse files Browse the repository at this point in the history
Previously this was blocking accepting next connection

Benchmark:
```
pids=()
for i in {1..100}
do psql "host=localhost port=9900 password=asdf" 2> /dev/null&
pids+=($!)
done
for pid in ${pids[@]}
do wait $pid
```

Before:
```
real	0m0.739s
user	0m0.325s
sys	0m0.268s
```

After:
```
real	0m0.205s
user	0m0.284s
sys	0m0.249s
```
  • Loading branch information
serprex authored Dec 15, 2023
1 parent e810be4 commit 4d0d18a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 50 deletions.
5 changes: 2 additions & 3 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async fn run_migrations(client: &mut Client) -> anyhow::Result<()> {
Ok(())
}

#[derive(Debug, Clone)]
#[derive(Debug, Copy, Clone)]
pub struct CatalogConfig<'a> {
pub host: &'a str,
pub port: u16,
Expand Down Expand Up @@ -75,8 +75,7 @@ impl<'a> CatalogConfig<'a> {
}

impl Catalog {
pub async fn new<'a>(catalog_config: &CatalogConfig<'a>) -> anyhow::Result<Self> {
let pt_config = catalog_config.to_postgres_config();
pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result<Self> {
let client = connect_postgres(&pt_config).await?;
let executor = PostgresQueryExecutor::new(None, &pt_config).await?;

Expand Down
9 changes: 2 additions & 7 deletions nexus/postgres-connection/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt::Write;
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres_openssl::MakeTlsConnector;
use pt::peerdb_peers::PostgresConfig;
Expand All @@ -10,15 +11,9 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String {
connection_string.push(':');
connection_string.push_str(&urlencoding::encode(&config.password));
}
connection_string.push('@');
connection_string.push_str(&config.host);
connection_string.push(':');
connection_string.push_str(&config.port.to_string());
connection_string.push('/');
connection_string.push_str(&config.database);

// Add the timeout as a query parameter, sslmode changes here appear to be useless
connection_string.push_str("?connect_timeout=15");
write!(connection_string, "@{}:{}/{}?connect_timeout=15", config.host, config.port, config.database).ok();

connection_string
}
Expand Down
84 changes: 44 additions & 40 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::{HashMap, HashSet},
fmt::Write,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -1279,7 +1280,7 @@ async fn run_migrations<'a>(config: &CatalogConfig<'a>) -> anyhow::Result<()> {
// retry connecting to the catalog 3 times with 30 seconds delay
// if it fails, return an error
for _ in 0..3 {
let catalog = Catalog::new(config).await;
let catalog = Catalog::new(config.to_postgres_config()).await;
match catalog {
Ok(mut catalog) => {
catalog.run_migrations().await?;
Expand Down Expand Up @@ -1339,50 +1340,53 @@ pub async fn main() -> anyhow::Result<()> {
v = listener.accept() => v,
}
.unwrap();
let catalog = match Catalog::new(&catalog_config).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Failed to connect to catalog: {}", e);

let mut buf = BytesMut::with_capacity(1024);
buf.put_u8(b'E');
buf.put_i32(0);
buf.put(&b"FATAL"[..]);
buf.put_u8(0);
let error_message = format!("Failed to connect to catalog: {}", e);
buf.put(error_message.as_bytes());
buf.put_u8(0);
buf.put_u8(b'\0');

socket.write_all(&buf).await?;
socket.shutdown().await?;
continue;
}
};

let conn_uuid = uuid::Uuid::new_v4();
let tracker = PeerConnectionTracker::new(conn_uuid, peer_conns.clone());

let conn_flow_handler = flow_handler.clone();
let conn_peer_conns = peer_conns.clone();
let peerdb_fdw_mode = args.peerdb_fwd_mode == "true";
let authenticator_ref = authenticator.make();
let pg_config = catalog_config.to_postgres_config();

let peerdb_fdw_mode = args.peerdb_fwd_mode == "true";
let processor = Arc::new(NexusBackend::new(
Arc::new(Mutex::new(catalog)),
tracker,
flow_handler.clone(),
peerdb_fdw_mode,
));
tokio::task::Builder::new()
.name("tcp connection handler")
.spawn(async move {
process_socket(
socket,
None,
authenticator_ref,
processor.clone(),
processor,
)
.await
match Catalog::new(pg_config).await {
Ok(catalog) => {
let conn_uuid = uuid::Uuid::new_v4();
let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns);

let processor = Arc::new(NexusBackend::new(
Arc::new(Mutex::new(catalog)),
tracker,
conn_flow_handler,
peerdb_fdw_mode,
));
process_socket(
socket,
None,
authenticator_ref,
processor.clone(),
processor,
)
.await
}
Err(e) => {
tracing::error!("Failed to connect to catalog: {}", e);

let mut buf = BytesMut::with_capacity(1024);
buf.put_u8(b'E');
buf.put_i32(0);
buf.put(&b"FATAL"[..]);
buf.put_u8(0);
write!(buf, "Failed to connect to catalog: {e}").ok();
buf.put_u8(0);
buf.put_u8(b'\0');

socket.write_all(&buf).await?;
socket.shutdown().await?;

Ok(())
}
}
})?;
}
}

0 comments on commit 4d0d18a

Please sign in to comment.