diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index f99dbe8751..4a305c7899 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -11,13 +11,12 @@ use pgwire::{ error::{ErrorInfo, PgWireError, PgWireResult}, }; use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser}; -use tokio::sync::Mutex; const DIALECT: PostgreSqlDialect = PostgreSqlDialect {}; #[derive(Clone)] pub struct NexusQueryParser { - catalog: Arc>, + catalog: Arc, } #[derive(Debug, Clone)] @@ -93,13 +92,12 @@ pub struct NexusParsedStatement { } impl NexusQueryParser { - pub fn new(catalog: Arc>) -> Self { + pub fn new(catalog: Arc) -> Self { Self { catalog } } pub async fn get_peers_bridge(&self) -> PgWireResult> { - let catalog = self.catalog.lock().await; - let peers = catalog.get_peers().await; + let peers = self.catalog.get_peers().await; peers.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index bb2219512e..55e096cb7f 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -13,6 +13,7 @@ use clap::Parser; use cursor::PeerCursors; use dashmap::{mapref::entry::Entry as DashEntry, DashMap}; use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult}; +use futures::join; use peer_bigquery::BigQueryQueryExecutor; use peer_connections::{PeerConnectionTracker, PeerConnections}; use peer_cursor::{ @@ -40,7 +41,7 @@ use pt::{ }; use rand::Rng; use tokio::signal::unix::{signal, SignalKind}; -use tokio::sync::{Mutex, MutexGuard}; +use tokio::sync::Mutex; use tokio::{io::AsyncWriteExt, net::TcpListener}; use tracing_appender::non_blocking::WorkerGuard; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -78,7 +79,7 @@ impl AuthSource for FixedPasswordAuthSource { } pub struct NexusBackend { - catalog: Arc>, + catalog: Arc, peer_connections: PeerConnectionTracker, query_parser: NexusQueryParser, peer_cursors: Mutex, @@ -89,7 +90,7 @@ pub struct NexusBackend { impl NexusBackend { pub fn new( - catalog: Arc>, + catalog: Arc, peer_connections: PeerConnectionTracker, flow_handler: Option>>, peerdb_fdw_mode: bool, @@ -161,7 +162,7 @@ impl NexusBackend { } async fn check_for_mirror( - catalog: &MutexGuard<'_, Catalog>, + catalog: &Catalog, flow_name: &str, ) -> PgWireResult> { let workflow_details = catalog @@ -175,10 +176,7 @@ impl NexusBackend { Ok(workflow_details) } - async fn get_peer_of_mirror( - catalog: &MutexGuard<'_, Catalog>, - peer_name: &str, - ) -> PgWireResult { + async fn get_peer_of_mirror(catalog: &Catalog, peer_name: &str) -> PgWireResult { let peer = catalog.get_peer(peer_name).await.map_err(|err| { PgWireError::ApiError(format!("unable to get peer {:?}: {:?}", peer_name, err).into()) })?; @@ -251,13 +249,13 @@ impl NexusBackend { )); } - let catalog = self.catalog.lock().await; tracing::info!( "DROP MIRROR: mirror_name: {}, if_exists: {}", flow_job_name, if_exists ); - let workflow_details = catalog + let workflow_details = self + .catalog .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { @@ -284,7 +282,7 @@ impl NexusBackend { format!("unable to shutdown flow job: {:?}", err).into(), ) })?; - catalog + self.catalog .delete_flow_job_entry(flow_job_name) .await .map_err(|err| { @@ -334,14 +332,13 @@ impl NexusBackend { } let mirror_details; { - let catalog = self.catalog.lock().await; mirror_details = - Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?; + Self::check_for_mirror(self.catalog.as_ref(), &qrep_flow_job.name) + .await?; } if mirror_details.is_none() { { - let catalog = self.catalog.lock().await; - catalog + self.catalog .create_qrep_flow_job_entry(qrep_flow_job) .await .map_err(|err| { @@ -399,8 +396,7 @@ impl NexusBackend { })?; } - let catalog = self.catalog.lock().await; - catalog.create_peer(peer.as_ref()).await.map_err(|e| { + self.catalog.create_peer(peer.as_ref()).await.map_err(|e| { PgWireError::UserError(Box::new(ErrorInfo::new( "ERROR".to_owned(), "internal_error".to_owned(), @@ -420,8 +416,8 @@ impl NexusBackend { "flow service is not configured".into(), )); } - let catalog = self.catalog.lock().await; - let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; + let mirror_details = + Self::check_for_mirror(self.catalog.as_ref(), &flow_job.name).await?; if mirror_details.is_none() { // reject duplicate source tables or duplicate target tables let table_mappings_count = flow_job.table_mappings.len(); @@ -450,7 +446,7 @@ impl NexusBackend { } } - catalog + self.catalog .create_cdc_flow_job_entry(flow_job) .await .map_err(|err| { @@ -460,10 +456,12 @@ impl NexusBackend { })?; // get source and destination peers - let src_peer = - Self::get_peer_of_mirror(&catalog, &flow_job.source_peer).await?; - let dst_peer = - Self::get_peer_of_mirror(&catalog, &flow_job.target_peer).await?; + let (src_peer, dst_peer) = join!( + Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.source_peer), + Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.target_peer), + ); + let src_peer = src_peer?; + let dst_peer = dst_peer?; // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; @@ -476,7 +474,7 @@ impl NexusBackend { ) })?; - catalog + self.catalog .update_workflow_id_for_flow_job(&flow_job.name, &workflow_id) .await .map_err(|err| { @@ -505,8 +503,7 @@ impl NexusBackend { } if let Some(job) = { - let catalog = self.catalog.lock().await; - catalog + self.catalog .get_qrep_flow_job_by_name(flow_job_name) .await .map_err(|err| { @@ -540,17 +537,21 @@ impl NexusBackend { )); } - let catalog = self.catalog.lock().await; tracing::info!( "DROP PEER: peer_name: {}, if_exists: {}", peer_name, if_exists ); - let peer_exists = catalog.check_peer_entry(peer_name).await.map_err(|err| { - PgWireError::ApiError( - format!("unable to query catalog for peer metadata: {:?}", err).into(), - ) - })?; + let peer_exists = + self.catalog + .check_peer_entry(peer_name) + .await + .map_err(|err| { + PgWireError::ApiError( + format!("unable to query catalog for peer metadata: {:?}", err) + .into(), + ) + })?; tracing::info!("peer exist count: {}", peer_exists); if peer_exists != 0 { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; @@ -590,8 +591,7 @@ impl NexusBackend { let qrep_config = { // retrieve the mirror job since DROP MIRROR will delete the row later. - let catalog = self.catalog.lock().await; - catalog + self.catalog .get_qrep_config_proto(mirror_name) .await .map_err(|err| { @@ -632,8 +632,7 @@ impl NexusBackend { ) })?; // relock catalog, DROP MIRROR is done with it now - let catalog = self.catalog.lock().await; - catalog + self.catalog .update_workflow_id_for_flow_job( &qrep_config.flow_job_name, &workflow_id, @@ -674,13 +673,13 @@ impl NexusBackend { )); } - let catalog = self.catalog.lock().await; tracing::info!( "[PAUSE MIRROR] mirror_name: {}, if_exists: {}", flow_job_name, if_exists ); - let workflow_details = catalog + let workflow_details = self + .catalog .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { @@ -737,13 +736,13 @@ impl NexusBackend { )); } - let catalog = self.catalog.lock().await; tracing::info!( "[RESUME MIRROR] mirror_name: {}, if_exists: {}", flow_job_name, if_exists ); - let workflow_details = catalog + let workflow_details = self + .catalog .get_workflow_details_for_flow_job(flow_job_name) .await .map_err(|err| { @@ -805,8 +804,7 @@ impl NexusBackend { } QueryAssociation::Catalog => { tracing::info!("handling catalog query: {}", stmt); - let catalog = self.catalog.lock().await; - Arc::clone(catalog.get_executor()) + Arc::clone(self.catalog.get_executor()) } }; @@ -829,10 +827,7 @@ impl NexusBackend { analyzer::CursorEvent::Close(c) => peer_cursors.get_peer(&c), }; match peer { - None => { - let catalog = self.catalog.lock().await; - Arc::clone(catalog.get_executor()) - } + None => Arc::clone(self.catalog.get_executor()), Some(peer) => self.get_peer_executor(peer).await.map_err(|err| { PgWireError::ApiError( format!("unable to get peer executor: {:?}", err).into(), @@ -850,22 +845,18 @@ impl NexusBackend { } async fn run_qrep_mirror(&self, qrep_flow_job: &QRepFlowJob) -> PgWireResult { - let catalog = self.catalog.lock().await; - + let (src_peer, dst_peer) = join!( + self.catalog.get_peer(&qrep_flow_job.source_peer), + self.catalog.get_peer(&qrep_flow_job.target_peer), + ); // get source and destination peers - let src_peer = catalog - .get_peer(&qrep_flow_job.source_peer) - .await - .map_err(|err| { - PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into()) - })?; + let src_peer = src_peer.map_err(|err| { + PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into()) + })?; - let dst_peer = catalog - .get_peer(&qrep_flow_job.target_peer) - .await - .map_err(|err| { - PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into()) - })?; + let dst_peer = dst_peer.map_err(|err| { + PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into()) + })?; // make a request to the flow service to start the job. let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; @@ -876,7 +867,7 @@ impl NexusBackend { PgWireError::ApiError(format!("unable to submit job: {:?}", err).into()) })?; - catalog + self.catalog .update_workflow_id_for_flow_job(&qrep_flow_job.name, &workflow_id) .await .map_err(|err| { @@ -1087,11 +1078,7 @@ impl ExtendedQueryHandler for NexusBackend { } } } - QueryAssociation::Catalog => { - let catalog = self.catalog.lock().await; - let executor = catalog.get_executor(); - executor.describe(stmt).await? - } + QueryAssociation::Catalog => self.catalog.get_executor().describe(stmt).await?, }; if let Some(described_schema) = schema { if self.peerdb_fdw_mode { @@ -1320,7 +1307,7 @@ pub async fn main() -> anyhow::Result<()> { let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns); let processor = Arc::new(NexusBackend::new( - Arc::new(Mutex::new(catalog)), + Arc::new(catalog), tracker, conn_flow_handler, peerdb_fdw_mode,