diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 8cfc5da413..85f3bfd91b 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -38,7 +38,7 @@ async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { Ok(()) } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct CatalogConfig<'a> { pub host: &'a str, pub port: u16, @@ -75,8 +75,7 @@ impl<'a> CatalogConfig<'a> { } impl Catalog { - pub async fn new<'a>(catalog_config: &CatalogConfig<'a>) -> anyhow::Result { - let pt_config = catalog_config.to_postgres_config(); + pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result { let client = connect_postgres(&pt_config).await?; let executor = PostgresQueryExecutor::new(None, &pt_config).await?; diff --git a/nexus/postgres-connection/src/lib.rs b/nexus/postgres-connection/src/lib.rs index 58e9ecc793..3eb6558a2d 100644 --- a/nexus/postgres-connection/src/lib.rs +++ b/nexus/postgres-connection/src/lib.rs @@ -1,3 +1,4 @@ +use std::fmt::Write; use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode}; use postgres_openssl::MakeTlsConnector; use pt::peerdb_peers::PostgresConfig; @@ -10,15 +11,9 @@ pub fn get_pg_connection_string(config: &PostgresConfig) -> String { connection_string.push(':'); connection_string.push_str(&urlencoding::encode(&config.password)); } - connection_string.push('@'); - connection_string.push_str(&config.host); - connection_string.push(':'); - connection_string.push_str(&config.port.to_string()); - connection_string.push('/'); - connection_string.push_str(&config.database); // Add the timeout as a query parameter, sslmode changes here appear to be useless - connection_string.push_str("?connect_timeout=15"); + write!(connection_string, "@{}:{}/{}?connect_timeout=15", config.host, config.port, config.database).ok(); connection_string } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 2720ca5c4e..c5011bd4f3 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashMap, HashSet}, + fmt::Write, sync::Arc, time::Duration, }; @@ -1279,7 +1280,7 @@ async fn run_migrations<'a>(config: &CatalogConfig<'a>) -> anyhow::Result<()> { // retry connecting to the catalog 3 times with 30 seconds delay // if it fails, return an error for _ in 0..3 { - let catalog = Catalog::new(config).await; + let catalog = Catalog::new(config.to_postgres_config()).await; match catalog { Ok(mut catalog) => { catalog.run_migrations().await?; @@ -1339,50 +1340,53 @@ pub async fn main() -> anyhow::Result<()> { v = listener.accept() => v, } .unwrap(); - let catalog = match Catalog::new(&catalog_config).await { - Ok(c) => c, - Err(e) => { - tracing::error!("Failed to connect to catalog: {}", e); - - let mut buf = BytesMut::with_capacity(1024); - buf.put_u8(b'E'); - buf.put_i32(0); - buf.put(&b"FATAL"[..]); - buf.put_u8(0); - let error_message = format!("Failed to connect to catalog: {}", e); - buf.put(error_message.as_bytes()); - buf.put_u8(0); - buf.put_u8(b'\0'); - - socket.write_all(&buf).await?; - socket.shutdown().await?; - continue; - } - }; - - let conn_uuid = uuid::Uuid::new_v4(); - let tracker = PeerConnectionTracker::new(conn_uuid, peer_conns.clone()); - + let conn_flow_handler = flow_handler.clone(); + let conn_peer_conns = peer_conns.clone(); + let peerdb_fdw_mode = args.peerdb_fwd_mode == "true"; let authenticator_ref = authenticator.make(); + let pg_config = catalog_config.to_postgres_config(); - let peerdb_fdw_mode = args.peerdb_fwd_mode == "true"; - let processor = Arc::new(NexusBackend::new( - Arc::new(Mutex::new(catalog)), - tracker, - flow_handler.clone(), - peerdb_fdw_mode, - )); tokio::task::Builder::new() .name("tcp connection handler") .spawn(async move { - process_socket( - socket, - None, - authenticator_ref, - processor.clone(), - processor, - ) - .await + match Catalog::new(pg_config).await { + Ok(catalog) => { + let conn_uuid = uuid::Uuid::new_v4(); + let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); + + let processor = Arc::new(NexusBackend::new( + Arc::new(Mutex::new(catalog)), + tracker, + conn_flow_handler, + peerdb_fdw_mode, + )); + process_socket( + socket, + None, + authenticator_ref, + processor.clone(), + processor, + ) + .await + } + Err(e) => { + tracing::error!("Failed to connect to catalog: {}", e); + + let mut buf = BytesMut::with_capacity(1024); + buf.put_u8(b'E'); + buf.put_i32(0); + buf.put(&b"FATAL"[..]); + buf.put_u8(0); + write!(buf, "Failed to connect to catalog: {e}").ok(); + buf.put_u8(0); + buf.put_u8(b'\0'); + + socket.write_all(&buf).await?; + socket.shutdown().await?; + + Ok(()) + } + } })?; } }