Skip to content

Commit

Permalink
Merge branch 'main' into full-resync-bq
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Nov 10, 2023
2 parents 818ef6e + 7bb737e commit 3d4ce6b
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 51 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@
tmp/
.envrc
.idea/

private/
nexus/server/tests/assets/*.json
nexus/server/tests/results/actual/
3 changes: 3 additions & 0 deletions nexus/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ target/
# submodules
/sqlparser-rs

# catalog pkg
/catalog/pkg

# misc
.env
48 changes: 38 additions & 10 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pt::{
peerdb_peers::PostgresConfig,
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use tokio_postgres::{types, Client};

mod embedded {
Expand Down Expand Up @@ -73,7 +74,7 @@ impl CatalogConfig {
password: self.password.clone(),
database: self.database.clone(),
transaction_snapshot: "".to_string(),
metadata_schema: Some("".to_string())
metadata_schema: Some("".to_string()),
}
}

Expand Down Expand Up @@ -308,8 +309,7 @@ impl Catalog {
}
Some(DbType::Mongo) => {
let err = format!("unable to decode {} options for peer {}", "mongo", name);
let mongo_config =
pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
Ok(Some(Config::MongoConfig(mongo_config)))
}
Some(DbType::Eventhub) => {
Expand All @@ -326,8 +326,7 @@ impl Catalog {
}
Some(DbType::S3) => {
let err = format!("unable to decode {} options for peer {}", "s3", name);
let s3_config =
pt::peerdb_peers::S3Config::decode(options).context(err)?;
let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?;
Ok(Some(Config::S3Config(s3_config)))
}
Some(DbType::Sqlserver) => {
Expand All @@ -342,8 +341,7 @@ impl Catalog {
"eventhub_group", name
);
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options)
.context(err)?;
pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
None => Ok(None),
Expand Down Expand Up @@ -428,15 +426,21 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts_opt: Option<Value> = row.get("flow_metadata");
let flow_opts: HashMap<String, Value> = match flow_opts_opt {
Some(flow_opts) => serde_json::from_value(flow_opts)
.context("unable to deserialize flow options")
.unwrap_or_default(),
None => HashMap::new(),
};

QRepFlowJob {
name: row.get("name"),
source_peer: row.get("source_peer_name"),
target_peer: row.get("destination_peer_name"),
description: row.get("description"),
query_string: row.get("query_string"),
flow_options: serde_json::from_value(row.get("flow_metadata"))
.context("unable to deserialize flow options")
.unwrap_or_default(),
flow_options: flow_opts,
// we set the disabled flag to false by default
disabled: false,
}
Expand Down Expand Up @@ -465,6 +469,10 @@ impl Catalog {
)
.await?;

if job.flow_options.get("destination_table_name").is_none() {
return Err(anyhow!("destination_table_name not found in flow options"));
}

let _rows = self
.pg
.execute(
Expand Down Expand Up @@ -572,4 +580,24 @@ impl Catalog {
let peer_count: i64 = peer_check.get(0);
Ok(peer_count)
}

pub async fn get_qrep_config_proto(
&self,
flow_job_name: &str,
) -> anyhow::Result<Option<pt::peerdb_flow::QRepConfig>> {
let row = self
.pg
.query_opt(
"SELECT config_proto FROM flows WHERE name=$1 AND query_string IS NOT NULL",
&[&flow_job_name],
)
.await?;

Ok(match row {
Some(row) => Some(pt::peerdb_flow::QRepConfig::decode::<&[u8]>(
row.get("config_proto"),
)?),
None => None,
})
}
}
2 changes: 1 addition & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FlowGrpcClient {
})
}

async fn start_query_replication_flow(
pub async fn start_query_replication_flow(
&mut self,
qrep_config: &pt::peerdb_flow::QRepConfig,
) -> anyhow::Result<String> {
Expand Down
115 changes: 75 additions & 40 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
Expand Down Expand Up @@ -446,14 +450,24 @@ impl NexusBackend {
let mut destinations = HashSet::with_capacity(table_mappings_count);
for tm in flow_job.table_mappings.iter() {
if !sources.insert(tm.source_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier),
})))
return Err(PgWireError::ApiError(Box::new(
PgError::Internal {
err_msg: format!(
"Duplicate source table identifier {}",
tm.source_table_identifier
),
},
)));
}
if !destinations.insert(tm.destination_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier),
})))
return Err(PgWireError::ApiError(Box::new(
PgError::Internal {
err_msg: format!(
"Duplicate destination table identifier {}",
tm.destination_table_identifier
),
},
)));
}
}
}
Expand Down Expand Up @@ -605,18 +619,23 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
let qrep_job = catalog
.get_qrep_flow_job_by_name(mirror_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error while getting QRep flow job: {:?}", err),
}))
})?;
// unlock the mutex so it can be used by the functions
std::mem::drop(catalog);

let qrep_config = {
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
catalog
.get_qrep_config_proto(mirror_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"error while getting QRep flow job: {:?}",
err
),
}))
})?
};

self.handle_drop_mirror(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
Expand All @@ -630,26 +649,42 @@ impl NexusBackend {
.await?;

// if it is none and DROP MIRROR didn't error out, either mirror doesn't exist or it is a CDC mirror.
match qrep_job {
Some(mut qrep_job) => {
match qrep_config {
Some(mut qrep_config) => {
if query_string.is_some() {
qrep_job.query_string = query_string.as_ref().unwrap().clone();
qrep_config.query = query_string.as_ref().unwrap().clone();
}
qrep_job.flow_options.insert(
"dst_table_full_resync".to_string(),
serde_json::value::Value::Bool(true),
);
self.handle_create_mirror_for_select(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
mirror_name: "no".into(),
},
ddl: Box::new(PeerDDL::CreateMirrorForSelect {
if_not_exists: false,
qrep_flow_job: qrep_job,
}),
})
.await?;
qrep_config.dst_table_full_resync = true;

let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_query_replication_flow(&qrep_config)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"error while starting new QRep job: {:?}",
err
),
}))
})?;
// relock catalog, DROP MIRROR is done with it now
let catalog = self.catalog.lock().await;
catalog
.update_workflow_id_for_flow_job(
&qrep_config.flow_job_name,
&workflow_id,
)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to update workflow for flow job: {:?}",
err
),
}))
})?;

let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&resync_mirror_success,
Expand All @@ -674,7 +709,7 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[PAUSE MIRROR] mirror_name: {}, if_exists: {}",
Expand All @@ -696,7 +731,7 @@ impl NexusBackend {
"[PAUSE MIRROR] got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);

if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
Expand Down Expand Up @@ -725,7 +760,7 @@ impl NexusBackend {
format!("no such mirror: {:?}", flow_job_name),
))))
}
},
}
PeerDDL::ResumeMirror {
if_exists,
flow_job_name,
Expand Down

0 comments on commit 3d4ce6b

Please sign in to comment.