Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nexus: don't wrap Catalog in Mutex #1040

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions nexus/parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ use pgwire::{
error::{ErrorInfo, PgWireError, PgWireResult},
};
use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser};
use tokio::sync::Mutex;

const DIALECT: PostgreSqlDialect = PostgreSqlDialect {};

#[derive(Clone)]
pub struct NexusQueryParser {
catalog: Arc<Mutex<Catalog>>,
catalog: Arc<Catalog>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -93,13 +92,12 @@ pub struct NexusParsedStatement {
}

impl NexusQueryParser {
pub fn new(catalog: Arc<Mutex<Catalog>>) -> Self {
pub fn new(catalog: Arc<Catalog>) -> Self {
Self { catalog }
}

pub async fn get_peers_bridge(&self) -> PgWireResult<HashMap<String, pt::peerdb_peers::Peer>> {
let catalog = self.catalog.lock().await;
let peers = catalog.get_peers().await;
let peers = self.catalog.get_peers().await;

peers.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
Expand Down
123 changes: 55 additions & 68 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use clap::Parser;
use cursor::PeerCursors;
use dashmap::{mapref::entry::Entry as DashEntry, DashMap};
use flow_rs::grpc::{FlowGrpcClient, PeerValidationResult};
use futures::join;
use peer_bigquery::BigQueryQueryExecutor;
use peer_connections::{PeerConnectionTracker, PeerConnections};
use peer_cursor::{
Expand Down Expand Up @@ -40,7 +41,7 @@ use pt::{
};
use rand::Rng;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{Mutex, MutexGuard};
use tokio::sync::Mutex;
use tokio::{io::AsyncWriteExt, net::TcpListener};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
Expand Down Expand Up @@ -78,7 +79,7 @@ impl AuthSource for FixedPasswordAuthSource {
}

pub struct NexusBackend {
catalog: Arc<Mutex<Catalog>>,
catalog: Arc<Catalog>,
peer_connections: PeerConnectionTracker,
query_parser: NexusQueryParser,
peer_cursors: Mutex<PeerCursors>,
Expand All @@ -89,7 +90,7 @@ pub struct NexusBackend {

impl NexusBackend {
pub fn new(
catalog: Arc<Mutex<Catalog>>,
catalog: Arc<Catalog>,
peer_connections: PeerConnectionTracker,
flow_handler: Option<Arc<Mutex<FlowGrpcClient>>>,
peerdb_fdw_mode: bool,
Expand Down Expand Up @@ -161,7 +162,7 @@ impl NexusBackend {
}

async fn check_for_mirror(
catalog: &MutexGuard<'_, Catalog>,
catalog: &Catalog,
flow_name: &str,
) -> PgWireResult<Option<WorkflowDetails>> {
let workflow_details = catalog
Expand All @@ -175,10 +176,7 @@ impl NexusBackend {
Ok(workflow_details)
}

async fn get_peer_of_mirror(
catalog: &MutexGuard<'_, Catalog>,
peer_name: &str,
) -> PgWireResult<Peer> {
async fn get_peer_of_mirror(catalog: &Catalog, peer_name: &str) -> PgWireResult<Peer> {
let peer = catalog.get_peer(peer_name).await.map_err(|err| {
PgWireError::ApiError(format!("unable to get peer {:?}: {:?}", peer_name, err).into())
})?;
Expand Down Expand Up @@ -251,13 +249,13 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"DROP MIRROR: mirror_name: {}, if_exists: {}",
flow_job_name,
if_exists
);
let workflow_details = catalog
let workflow_details = self
.catalog
.get_workflow_details_for_flow_job(flow_job_name)
.await
.map_err(|err| {
Expand All @@ -284,7 +282,7 @@ impl NexusBackend {
format!("unable to shutdown flow job: {:?}", err).into(),
)
})?;
catalog
self.catalog
.delete_flow_job_entry(flow_job_name)
.await
.map_err(|err| {
Expand Down Expand Up @@ -334,14 +332,13 @@ impl NexusBackend {
}
let mirror_details;
{
let catalog = self.catalog.lock().await;
mirror_details =
Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?;
Self::check_for_mirror(self.catalog.as_ref(), &qrep_flow_job.name)
.await?;
}
if mirror_details.is_none() {
{
let catalog = self.catalog.lock().await;
catalog
self.catalog
.create_qrep_flow_job_entry(qrep_flow_job)
.await
.map_err(|err| {
Expand Down Expand Up @@ -399,8 +396,7 @@ impl NexusBackend {
})?;
}

let catalog = self.catalog.lock().await;
catalog.create_peer(peer.as_ref()).await.map_err(|e| {
self.catalog.create_peer(peer.as_ref()).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
Expand All @@ -420,8 +416,8 @@ impl NexusBackend {
"flow service is not configured".into(),
));
}
let catalog = self.catalog.lock().await;
let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?;
let mirror_details =
Self::check_for_mirror(self.catalog.as_ref(), &flow_job.name).await?;
if mirror_details.is_none() {
// reject duplicate source tables or duplicate target tables
let table_mappings_count = flow_job.table_mappings.len();
Expand Down Expand Up @@ -450,7 +446,7 @@ impl NexusBackend {
}
}

catalog
self.catalog
.create_cdc_flow_job_entry(flow_job)
.await
.map_err(|err| {
Expand All @@ -460,10 +456,12 @@ impl NexusBackend {
})?;

// get source and destination peers
let src_peer =
Self::get_peer_of_mirror(&catalog, &flow_job.source_peer).await?;
let dst_peer =
Self::get_peer_of_mirror(&catalog, &flow_job.target_peer).await?;
let (src_peer, dst_peer) = join!(
Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.source_peer),
Self::get_peer_of_mirror(self.catalog.as_ref(), &flow_job.target_peer),
);
let src_peer = src_peer?;
let dst_peer = dst_peer?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
Expand All @@ -476,7 +474,7 @@ impl NexusBackend {
)
})?;

catalog
self.catalog
.update_workflow_id_for_flow_job(&flow_job.name, &workflow_id)
.await
.map_err(|err| {
Expand Down Expand Up @@ -505,8 +503,7 @@ impl NexusBackend {
}

if let Some(job) = {
let catalog = self.catalog.lock().await;
catalog
self.catalog
.get_qrep_flow_job_by_name(flow_job_name)
.await
.map_err(|err| {
Expand Down Expand Up @@ -540,17 +537,21 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"DROP PEER: peer_name: {}, if_exists: {}",
peer_name,
if_exists
);
let peer_exists = catalog.check_peer_entry(peer_name).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to query catalog for peer metadata: {:?}", err).into(),
)
})?;
let peer_exists =
self.catalog
.check_peer_entry(peer_name)
.await
.map_err(|err| {
PgWireError::ApiError(
format!("unable to query catalog for peer metadata: {:?}", err)
.into(),
)
})?;
tracing::info!("peer exist count: {}", peer_exists);
if peer_exists != 0 {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
Expand Down Expand Up @@ -590,8 +591,7 @@ impl NexusBackend {

let qrep_config = {
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
catalog
self.catalog
.get_qrep_config_proto(mirror_name)
.await
.map_err(|err| {
Expand Down Expand Up @@ -632,8 +632,7 @@ impl NexusBackend {
)
})?;
// DROP MIRROR is done with the catalog now; record the new workflow id for the recreated job
let catalog = self.catalog.lock().await;
catalog
self.catalog
.update_workflow_id_for_flow_job(
&qrep_config.flow_job_name,
&workflow_id,
Expand Down Expand Up @@ -674,13 +673,13 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[PAUSE MIRROR] mirror_name: {}, if_exists: {}",
flow_job_name,
if_exists
);
let workflow_details = catalog
let workflow_details = self
.catalog
.get_workflow_details_for_flow_job(flow_job_name)
.await
.map_err(|err| {
Expand Down Expand Up @@ -737,13 +736,13 @@ impl NexusBackend {
));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[RESUME MIRROR] mirror_name: {}, if_exists: {}",
flow_job_name,
if_exists
);
let workflow_details = catalog
let workflow_details = self
.catalog
.get_workflow_details_for_flow_job(flow_job_name)
.await
.map_err(|err| {
Expand Down Expand Up @@ -805,8 +804,7 @@ impl NexusBackend {
}
QueryAssociation::Catalog => {
tracing::info!("handling catalog query: {}", stmt);
let catalog = self.catalog.lock().await;
Arc::clone(catalog.get_executor())
Arc::clone(self.catalog.get_executor())
}
};

Expand All @@ -829,10 +827,7 @@ impl NexusBackend {
analyzer::CursorEvent::Close(c) => peer_cursors.get_peer(&c),
};
match peer {
None => {
let catalog = self.catalog.lock().await;
Arc::clone(catalog.get_executor())
}
None => Arc::clone(self.catalog.get_executor()),
Some(peer) => self.get_peer_executor(peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
Expand All @@ -850,22 +845,18 @@ impl NexusBackend {
}

async fn run_qrep_mirror(&self, qrep_flow_job: &QRepFlowJob) -> PgWireResult<String> {
let catalog = self.catalog.lock().await;

let (src_peer, dst_peer) = join!(
self.catalog.get_peer(&qrep_flow_job.source_peer),
self.catalog.get_peer(&qrep_flow_job.target_peer),
);
// get source and destination peers, reporting a lookup failure for either one as an API error
let src_peer = catalog
.get_peer(&qrep_flow_job.source_peer)
.await
.map_err(|err| {
PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into())
})?;
let src_peer = src_peer.map_err(|err| {
PgWireError::ApiError(format!("unable to get source peer: {:?}", err).into())
})?;

let dst_peer = catalog
.get_peer(&qrep_flow_job.target_peer)
.await
.map_err(|err| {
PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into())
})?;
let dst_peer = dst_peer.map_err(|err| {
PgWireError::ApiError(format!("unable to get destination peer: {:?}", err).into())
})?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
Expand All @@ -876,7 +867,7 @@ impl NexusBackend {
PgWireError::ApiError(format!("unable to submit job: {:?}", err).into())
})?;

catalog
self.catalog
.update_workflow_id_for_flow_job(&qrep_flow_job.name, &workflow_id)
.await
.map_err(|err| {
Expand Down Expand Up @@ -1087,11 +1078,7 @@ impl ExtendedQueryHandler for NexusBackend {
}
}
}
QueryAssociation::Catalog => {
let catalog = self.catalog.lock().await;
let executor = catalog.get_executor();
executor.describe(stmt).await?
}
QueryAssociation::Catalog => self.catalog.get_executor().describe(stmt).await?,
};
if let Some(described_schema) = schema {
if self.peerdb_fdw_mode {
Expand Down Expand Up @@ -1320,7 +1307,7 @@ pub async fn main() -> anyhow::Result<()> {
let tracker = PeerConnectionTracker::new(conn_uuid, conn_peer_conns);

let processor = Arc::new(NexusBackend::new(
Arc::new(Mutex::new(catalog)),
Arc::new(catalog),
tracker,
conn_flow_handler,
peerdb_fdw_mode,
Expand Down
Loading