Skip to content

Commit

Permalink
refactor: create mirror
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Sep 18, 2023
1 parent ee31878 commit 71c7ce5
Showing 1 changed file with 61 additions and 77 deletions.
138 changes: 61 additions & 77 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use catalog::{Catalog, CatalogConfig};
use catalog::{Catalog, CatalogConfig, WorkflowDetails};
use clap::Parser;
use cursor::PeerCursors;
use dashmap::DashMap;
Expand Down Expand Up @@ -32,11 +32,11 @@ use pgwire::{
tokio::process_socket,
};
use pt::{
flow_model::QRepFlowJob,
flow_model::{FlowJob, QRepFlowJob},
peerdb_peers::{peer::Config, Peer},
};
use rand::Rng;
use tokio::sync::Mutex;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{io::AsyncWriteExt, net::TcpListener};
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
Expand Down Expand Up @@ -158,6 +158,52 @@ impl NexusBackend {
!unsupported_peer_types.contains(&peer_type)
}

async fn check_for_mirror(
catalog: &MutexGuard<'_, Catalog>,
flow_name: String,
) -> PgWireResult<Option<WorkflowDetails>> {
let workflow_details = catalog
.get_workflow_details_for_flow_job(&flow_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to query catalog for job metadata: {:?}", err),
}))
})?;
Ok(workflow_details)
}

async fn get_peer_of_mirror(
catalog: &MutexGuard<'_, Catalog>,
peer_name: String,
) -> PgWireResult<Peer> {
let peer = catalog.get_peer(&peer_name).await.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get peer {:?}: {:?}", peer_name, err),
}))
})?;
Ok(peer)
}

fn handle_mirror_existence(
if_not_exists: bool,
flow_name: String,
) -> PgWireResult<Vec<Response<'static>>> {
if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!("mirror already exists: {:?}", flow_name),
))))
}
}

async fn handle_query<'a>(
&self,
nexus_stmt: NexusStatement,
Expand Down Expand Up @@ -209,22 +255,9 @@ impl NexusBackend {
})));
}
let catalog = self.catalog.lock().await;
let workflow_details = catalog
.get_workflow_details_for_flow_job(&flow_job.name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to query catalog for job metadata: {:?}",
err
),
}))
})?;
tracing::info!(
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_none() {
let mirror_details =
Self::check_for_mirror(&catalog, flow_job.name.clone()).await?;
if mirror_details.is_none() {
catalog
.create_flow_job_entry(&flow_job)
.await
Expand All @@ -239,27 +272,11 @@ impl NexusBackend {

// get source and destination peers
let src_peer =
catalog
.get_peer(&flow_job.source_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("unable to get source peer: {:?}", err),
}))
})?;

Self::get_peer_of_mirror(&catalog, flow_job.source_peer.clone())
.await?;
let dst_peer =
catalog
.get_peer(&flow_job.target_peer)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to get destination peer: {:?}",
err
),
}))
})?;
Self::get_peer_of_mirror(&catalog, flow_job.target_peer.clone())
.await?;

// make a request to the flow service to start the job.
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
Expand All @@ -286,18 +303,8 @@ impl NexusBackend {
&create_mirror_success,
None,
))])
} else if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!("mirror already exists: {:?}", flow_job.name),
))))
Self::handle_mirror_existence(if_not_exists, flow_job.name)
}
}
PeerDDL::CreateMirrorForSelect {
Expand All @@ -311,22 +318,9 @@ impl NexusBackend {
}

let catalog = self.catalog.lock().await;
let workflow_details = catalog
.get_workflow_details_for_flow_job(&qrep_flow_job.name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to query catalog for job metadata: {:?}",
err
),
}))
})?;
tracing::info!(
"got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);
if workflow_details.is_none() {
let mirror_details =
Self::check_for_mirror(&catalog, qrep_flow_job.name.clone()).await?;
if mirror_details.is_none() {
catalog
.create_qrep_flow_job_entry(&qrep_flow_job)
.await
Expand Down Expand Up @@ -354,18 +348,8 @@ impl NexusBackend {
&create_mirror_success,
None,
))])
} else if if_not_exists {
let existing_mirror_success = "MIRROR ALREADY EXISTS";
Ok(vec![Response::Execution(Tag::new_for_execution(
existing_mirror_success,
None,
))])
} else {
Err(PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"error".to_owned(),
format!("mirror already exists: {:?}", qrep_flow_job.name),
))))
Self::handle_mirror_existence(if_not_exists, qrep_flow_job.name)
}
}
PeerDDL::ExecuteMirrorForSelect { flow_job_name } => {
Expand Down

0 comments on commit 71c7ce5

Please sign in to comment.