nexus: don't wrap Catalog in Mutex
Catalog's methods already handle synchronization, except when running migrations, which already use exclusive connections.

Also query the source and destination peers in parallel.
serprex committed Jan 9, 2024
1 parent c0061a2 commit bfbc045
Showing 2 changed files with 58 additions and 73 deletions.
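The heart of the change is dropping the tokio::sync::Mutex around Catalog: since Catalog's own async methods synchronize internally, callers can share it as a plain Arc<Catalog> and stop serializing unrelated catalog queries behind a single lock. A minimal before/after sketch of that pattern, where the Catalog stub and its method body are illustrative stand-ins rather than the real PeerDB types:

use std::sync::Arc;
use tokio::sync::Mutex;

// Illustrative stand-in for the real Catalog: its async methods do their own
// internal synchronization, so callers do not need an outer lock.
struct Catalog;

impl Catalog {
    async fn get_peers(&self) -> Vec<String> {
        Vec::new() // placeholder body
    }
}

// Before: every caller locked the Mutex, serializing unrelated catalog queries.
async fn get_peers_locked(catalog: &Arc<Mutex<Catalog>>) -> Vec<String> {
    let guard = catalog.lock().await;
    guard.get_peers().await
}

// After: the catalog is shared directly and concurrent callers are not serialized.
async fn get_peers_shared(catalog: &Arc<Catalog>) -> Vec<String> {
    catalog.get_peers().await
}

With the wrapper gone, helpers such as check_for_mirror and get_peer_of_mirror can take &Catalog instead of &MutexGuard<'_, Catalog>, as the diff below shows.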
8 changes: 3 additions & 5 deletions nexus/parser/src/lib.rs
@@ -11,13 +11,12 @@ use pgwire::{
error::{ErrorInfo, PgWireError, PgWireResult},
};
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};
use tokio::sync::Mutex;

const DIALECT: PostgreSqlDialect = PostgreSqlDialect {};

#[derive(Clone)]
pub struct NexusQueryParser {
catalog: Arc<Mutex<Catalog>>,
catalog: Arc<Catalog>,
}

#[derive(Debug, Clone)]
@@ -93,13 +92,12 @@ pub struct NexusParsedStatement {
}

impl NexusQueryParser {
pub fn new(catalog: Arc<Mutex<Catalog>>) -> Self {
pub fn new(catalog: Arc<Catalog>) -> Self {
Self { catalog }
}

pub async fn get_peers_bridge(&self) -> PgWireResult<HashMap<String, pt::peerdb_peers::Peer>> {
let catalog = self.catalog.lock().await;
let peers = catalog.get_peers().await;
let peers = self.catalog.get_peers().await;

peers.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
123 changes: 55 additions & 68 deletions nexus/server/src/main.rs
@@ -13,6 +13,7 @@ use clap::Parser;
use cursor::PeerCursors;
use dashmap::{mapref::entry::Entry as DashEntry, DashMap};
use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult};
use futures::join;
use peer_bigquery::BigQueryQueryExecutor;
use peer_connections::{PeerConnectionTracker, PeerConnections};
use peer_cursor::{
@@ -40,7 +41,7 @@ use pt::{
};
use rand::Rng;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::Mutex;
use tokio::{io::AsyncWriteExt, net::TcpListener};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -78,7 +79,7 @@ impl AuthSource for FixedPasswordAuthSource {
}

pub struct NexusBackend {
catalog: Arc<Mutex<Catalog>>,
catalog: Arc<Catalog>,
peer_connections: PeerConnectionTracker,
query_parser: NexusQueryParser,
peer_cursors: Mutex<PeerCursors>,
@@ -89,7 +90,7 @@ pub struct NexusBackend {

impl NexusBackend {
pub fn new(
catalog: Arc<Mutex<Catalog>>,
catalog: Arc<Catalog>,
peer_connections: PeerConnectionTracker,
flow_handler: Option<Arc<Mutex<FlowGrpcClient>>>,
peerdb_fdw_mode: bool,
@@ -161,7 +162,7 @@ impl NexusBackend {
}

async fn check_for_mirror(
catalog: &MutexGuard<'_, Catalog>,
catalog: &Catalog,
flow_name: &str,
) -> PgWireResult<Option<WorkflowDetails>> {
let workflow_details = catalog
@@ -175,10 +176,7 @@ impl NexusBackend {
Ok(workflow_details)
}

async fn get_peer_of_mirror(
catalog: &MutexGuard<'_, Catalog>,
peer_name: &str,
) -> PgWireResult<Peer> {
async fn get_peer_of_mirror(catalog: &Catalog, peer_name: &str) -> PgWireResult<Peer> {
let peer = catalog.get_peer(peer_name).await.map_err(|err| {
PgWireError::ApiError(format!("unable to get peer {:?}: {:?}", peer_name, err).into())
})?;
@@ -251,13 +249,13 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"DROP MIRROR: mirror_name: {}, if_exists: {}",
flow_job_name,
if_exists
);
let workflow_details = catalog
let workflow_details = self
.catalog
.get_workflow_details_for_flow_job(flow_job_name)
.await
.map_err(|err| {
@@ -284,7 +282,7 @@ impl NexusBackend {
format!("unable to shutdown flow job: {:?}", err).into(),
)
})?;
catalog
self.catalog
.delete_flow_job_entry(flow_job_name)
.await
.map_err(|err| {
@@ -334,14 +332,13 @@ impl NexusBackend {
}
let mirror_details;
{
let catalog = self.catalog.lock().await;
mirror_details =
Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?;
Self::check_for_mirror(self.catalog.as_ref(), &qrep_flow_job.name)
.await?;
}
if mirror_details.is_none() {
{
let catalog = self.catalog.lock().await;
catalog
self.catalog
.create_qrep_flow_job_entry(qrep_flow_job)
.await
.map_err(|err| {
@@ -399,8 +396,7 @@ impl NexusBackend {
})?;
}

let catalog = self.catalog.lock().await;
catalog.create_peer(peer.as_ref()).await.map_err(|e| {
self.catalog.create_peer(peer.as_ref()).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
@@ -420,8 +416,8 @@ impl NexusBackend {
"flow service is not configured".into(),
));
}
let catalog = self.catalog.lock().await;
let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?;
let mirror_details =
Self::check_for_mirror(&self.catalog.as_ref(), &flow_job.name).await?;
if mirror_details.is_none() {
// reject duplicate source tables or duplicate target tables
let table_mappings_count = flow_job.table_mappings.len();
@@ -450,7 +446,7 @@ impl NexusBackend {
}
}

catalog
self.catalog
.create_cdc_flow_job_entry(flow_job)
.await
.map_err(|err| {
@@ -460,10 +456,12 @@ impl NexusBackend {
})?;

// get source and destination peers
let src_peer =
Self::get_peer_of_mirror(&catalog, &flow_job.source_peer).await?;
let dst_peer =
Self::get_peer_of_mirror(&catalog, &flow_job.target_peer).await?;
let (src_peer, dst_peer) = join!(
Self::get_peer_of_mirror(&self.catalog.as_ref(), &flow_job.source_peer),
Self::get_peer_of_mirror(&self.catalog.as_ref(), &flow_job.target_peer),
);
let src_peer = src_peer?;
let dst_peer = dst_peer?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
@@ -476,7 +474,7 @@ impl NexusBackend {
)
})?;

catalog
self.catalog
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
.await
.map_err(|err| {
@@ -505,8 +503,7 @@ impl NexusBackend {
}

if let Some(job) = {
let catalog = self.catalog.lock().await;
catalog
self.catalog
.get_qrep_flow_job_by_name(flow_job_name)
.await
.map_err(|err| {
@@ -540,17 +537,21 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"DROP PEER: peer_name: {}, if_exists: {}",
peer_name,
if_exists
);
let peer_exists = catalog.check_peer_entry(peer_name).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to query catalog for peer metadata: {:?}", err).into(),
)
})?;
let peer_exists =
self.catalog
.check_peer_entry(peer_name)
.await
.map_err(|err| {
PgWireError::ApiError(
format!("unable to query catalog for peer metadata: {:?}", err)
.into(),
)
})?;
tracing::info!("peer exist count: {}", peer_exists);
if peer_exists != 0 {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
@@ -590,8 +591,7 @@ impl NexusBackend {

let qrep_config = {
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
catalog
self.catalog
.get_qrep_config_proto(mirror_name)
.await
.map_err(|err| {
@@ -632,8 +632,7 @@ impl NexusBackend {
)
})?;
// relock catalog, DROP MIRROR is done with it now
let catalog = self.catalog.lock().await;
catalog
self.catalog
.update_workflow_id_for_flow_job(
&qrep_config.flow_job_name,
&workflow_id,
@@ -674,13 +673,13 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[PAUSE MIRROR] mirror_name: {}, if_exists: {}",
flow_job_name,
if_exists
);
let workflow_details = catalog
let workflow_details = self
.catalog
.get_workflow_details_for_flow_job(flow_job_name)
.await
.map_err(|err| {
@@ -737,13 +736,13 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[RESUME MIRROR] mirror_name: {}, if_exists: {}",
flow_job_name,
if_exists
);
let workflow_details = catalog
let workflow_details = self
.catalog
.get_workflow_details_for_flow_job(flow_job_name)
.await
.map_err(|err| {
@@ -805,8 +804,7 @@ impl NexusBackend {
}
QueryAssociation::Catalog => {
tracing::info!("handling catalog query: {}", stmt);
let catalog = self.catalog.lock().await;
Arc::clone(catalog.get_executor())
Arc::clone(self.catalog.get_executor())
}
};

@@ -829,10 +827,7 @@ impl NexusBackend {
analyzer::CursorEvent::Close(c) => peer_cursors.get_peer(&c),
};
match peer {
None => {
let catalog = self.catalog.lock().await;
Arc::clone(catalog.get_executor())
}
None => Arc::clone(self.catalog.get_executor()),
Some(peer) => self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
@@ -850,22 +845,18 @@ impl NexusBackend {
}

async fn run_qrep_mirror(&self, qrep_flow_job: &QRepFlowJob) -> PgWireResult<String> {
let catalog = self.catalog.lock().await;

let (src_peer, dst_peer) = join!(
self.catalog.get_peer(&qrep_flow_job.source_peer),
self.catalog.get_peer(&qrep_flow_job.target_peer),
);
// get source and destination peers
let src_peer = catalog
.get_peer(&qrep_flow_job.source_peer)
.await
.map_err(|err| {
PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into())
})?;
let src_peer = src_peer.map_err(|err| {
PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into())
})?;

let dst_peer = catalog
.get_peer(&qrep_flow_job.target_peer)
.await
.map_err(|err| {
PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into())
})?;
let dst_peer = dst_peer.map_err(|err| {
PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into())
})?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
@@ -876,7 +867,7 @@ impl NexusBackend {
PgWireError::ApiError(format!("unable to submit job: {:?}", err).into())
})?;

catalog
self.catalog
.update_workflow_id_for_flow_job(&qrep_flow_job.name, &workflow_id)
.await
.map_err(|err| {
@@ -1087,11 +1078,7 @@ impl ExtendedQueryHandler for NexusBackend {
}
}
}
QueryAssociation::Catalog => {
let catalog = self.catalog.lock().await;
let executor = catalog.get_executor();
executor.describe(stmt).await?
}
QueryAssociation::Catalog => self.catalog.get_executor().describe(stmt).await?,
};
if let Some(described_schema) = schema {
if self.peerdb_fdw_mode {
@@ -1320,7 +1307,7 @@ pub async fn main() -> anyhow::Result<()> {
let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns);

let processor = Arc::new(NexusBackend::new(
Arc::new(Mutex::new(catalog)),
Arc::new(catalog),
tracker,
conn_flow_handler,
peerdb_fdw_mode,
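The parallel peer lookup, used above both in the CDC mirror path and in run_qrep_mirror, follows the pattern below: both catalog queries are started together with futures::join! and their Results are unwrapped afterwards. The get_peer function here is a hypothetical stand-in for the real catalog call:

use futures::join;

#[derive(Debug)]
struct Peer; // stand-in for pt::peerdb_peers::Peer

// Hypothetical lookup standing in for Catalog::get_peer.
async fn get_peer(_name: &str) -> Result<Peer, String> {
    Ok(Peer)
}

async fn fetch_source_and_destination(src: &str, dst: &str) -> Result<(Peer, Peer), String> {
    // Both futures are polled concurrently on the same task; join! resolves
    // once both have completed, instead of awaiting them one after another.
    let (src_peer, dst_peer) = join!(get_peer(src), get_peer(dst));
    Ok((src_peer?, dst_peer?))
}

Because join! drives both futures on a single task, the gain is overlapping the two catalog round trips rather than spawning extra threads.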
