diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 8cfc5da413..85f3bfd91b 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -38,7 +38,7 @@ async fn run_migrations(client: &mut Client) -> anyhow::Result<()> { Ok(()) } -#[derive(Debug, Clone)] +#[derive(Debug, Copy, Clone)] pub struct CatalogConfig<'a> { pub host: &'a str, pub port: u16, @@ -75,8 +75,7 @@ impl<'a> CatalogConfig<'a> { } impl Catalog { - pub async fn new<'a>(catalog_config: &CatalogConfig<'a>) -> anyhow::Result { - let pt_config = catalog_config.to_postgres_config(); + pub async fn new(pt_config: pt::peerdb_peers::PostgresConfig) -> anyhow::Result { let client = connect_postgres(&pt_config).await?; let executor = PostgresQueryExecutor::new(None, &pt_config).await?; diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 2720ca5c4e..b6b987e5e4 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1279,7 +1279,7 @@ async fn run_migrations<'a>(config: &CatalogConfig<'a>) -> anyhow::Result<()> { // retry connecting to the catalog 3 times with 30 seconds delay // if it fails, return an error for _ in 0..3 { - let catalog = Catalog::new(config).await; + let catalog = Catalog::new(config.to_postgres_config()).await; match catalog { Ok(mut catalog) => { catalog.run_migrations().await?; @@ -1339,50 +1339,54 @@ pub async fn main() -> anyhow::Result<()> { v = listener.accept() => v, } .unwrap(); - let catalog = match Catalog::new(&catalog_config).await { - Ok(c) => c, - Err(e) => { - tracing::error!("Failed to connect to catalog: {}", e); - - let mut buf = BytesMut::with_capacity(1024); - buf.put_u8(b'E'); - buf.put_i32(0); - buf.put(&b"FATAL"[..]); - buf.put_u8(0); - let error_message = format!("Failed to connect to catalog: {}", e); - buf.put(error_message.as_bytes()); - buf.put_u8(0); - buf.put_u8(b'\0'); - - socket.write_all(&buf).await?; - socket.shutdown().await?; - continue; - } - }; - - let conn_uuid = uuid::Uuid::new_v4(); - let tracker = PeerConnectionTracker::new(conn_uuid, peer_conns.clone()); - + let conn_flow_handler = flow_handler.clone(); + let conn_peer_conns = peer_conns.clone(); + let peerdb_fdw_mode = args.peerdb_fwd_mode == "true"; let authenticator_ref = authenticator.make(); + let pg_config = catalog_config.to_postgres_config(); - let peerdb_fdw_mode = args.peerdb_fwd_mode == "true"; - let processor = Arc::new(NexusBackend::new( - Arc::new(Mutex::new(catalog)), - tracker, - flow_handler.clone(), - peerdb_fdw_mode, - )); tokio::task::Builder::new() .name("tcp connection handler") .spawn(async move { - process_socket( - socket, - None, - authenticator_ref, - processor.clone(), - processor, - ) - .await + match Catalog::new(pg_config).await { + Ok(catalog) => { + let conn_uuid = uuid::Uuid::new_v4(); + let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); + + let processor = Arc::new(NexusBackend::new( + Arc::new(Mutex::new(catalog)), + tracker, + conn_flow_handler, + peerdb_fdw_mode, + )); + process_socket( + socket, + None, + authenticator_ref, + processor.clone(), + processor, + ) + .await + } + Err(e) => { + tracing::error!("Failed to connect to catalog: {}", e); + + let mut buf = BytesMut::with_capacity(1024); + buf.put_u8(b'E'); + buf.put_i32(0); + buf.put(&b"FATAL"[..]); + buf.put_u8(0); + let error_message = format!("Failed to connect to catalog: {}", e); + buf.put(error_message.as_bytes()); + buf.put_u8(0); + buf.put_u8(b'\0'); + + socket.write_all(&buf).await?; + socket.shutdown().await?; + + Ok(()) + } + } })?; } }