Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reworked RESYNC MIRROR to work with UI created mirrors #636

Merged
merged 6 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions nexus/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ target/
# submodules
/sqlparser-rs

# catalog pkg
/catalog/pkg

# misc
.env
48 changes: 38 additions & 10 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pt::{
peerdb_peers::PostgresConfig,
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use tokio_postgres::{types, Client};

mod embedded {
Expand Down Expand Up @@ -73,7 +74,7 @@ impl CatalogConfig {
password: self.password.clone(),
database: self.database.clone(),
transaction_snapshot: "".to_string(),
metadata_schema: Some("".to_string())
metadata_schema: Some("".to_string()),
}
}

Expand Down Expand Up @@ -308,8 +309,7 @@ impl Catalog {
}
Some(DbType::Mongo) => {
let err = format!("unable to decode {} options for peer {}", "mongo", name);
let mongo_config =
pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
Ok(Some(Config::MongoConfig(mongo_config)))
}
Some(DbType::Eventhub) => {
Expand All @@ -326,8 +326,7 @@ impl Catalog {
}
Some(DbType::S3) => {
let err = format!("unable to decode {} options for peer {}", "s3", name);
let s3_config =
pt::peerdb_peers::S3Config::decode(options).context(err)?;
let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?;
Ok(Some(Config::S3Config(s3_config)))
}
Some(DbType::Sqlserver) => {
Expand All @@ -342,8 +341,7 @@ impl Catalog {
"eventhub_group", name
);
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options)
.context(err)?;
pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
None => Ok(None),
Expand Down Expand Up @@ -428,15 +426,21 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts_opt: Option<Value> = row.get("flow_metadata");
let flow_opts: HashMap<String, Value> = match flow_opts_opt {
Some(flow_opts) => serde_json::from_value(flow_opts)
.context("unable to deserialize flow options")
.unwrap_or_default(),
None => HashMap::new(),
};

QRepFlowJob {
name: row.get("name"),
source_peer: row.get("source_peer_name"),
target_peer: row.get("destination_peer_name"),
description: row.get("description"),
query_string: row.get("query_string"),
flow_options: serde_json::from_value(row.get("flow_metadata"))
.context("unable to deserialize flow options")
.unwrap_or_default(),
flow_options: flow_opts,
// we set the disabled flag to false by default
disabled: false,
}
Expand Down Expand Up @@ -465,6 +469,10 @@ impl Catalog {
)
.await?;

if job.flow_options.get("destination_table_name").is_none() {
return Err(anyhow!("destination_table_name not found in flow options"));
}

let _rows = self
.pg
.execute(
Expand Down Expand Up @@ -572,4 +580,24 @@ impl Catalog {
let peer_count: i64 = peer_check.get(0);
Ok(peer_count)
}

/// Loads the stored `QRepConfig` protobuf for the flow job named `flow_job_name`.
///
/// Only rows of `flows` whose `query_string` is not NULL are considered.
/// Returns `Ok(None)` when no such row exists.
///
/// # Errors
///
/// Returns an error if the catalog query fails, if the `config_proto` column
/// cannot be read as bytes (for example because it is NULL), or if those bytes
/// do not decode as a `QRepConfig`.
pub async fn get_qrep_config_proto(
    &self,
    flow_job_name: &str,
) -> anyhow::Result<Option<pt::peerdb_flow::QRepConfig>> {
    let row = self
        .pg
        .query_opt(
            "SELECT config_proto FROM flows WHERE name=$1 AND query_string IS NOT NULL",
            &[&flow_job_name],
        )
        .await?;

    match row {
        Some(row) => {
            // `try_get` surfaces a NULL or mistyped column as an error,
            // whereas `get` would panic.
            let config_bytes: &[u8] = row
                .try_get("config_proto")
                .context("unable to read config_proto for flow job")?;
            let qrep_config = pt::peerdb_flow::QRepConfig::decode(config_bytes)
                .context("unable to decode config_proto as QRepConfig")?;
            Ok(Some(qrep_config))
        }
        None => Ok(None),
    }
}
}
2 changes: 1 addition & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FlowGrpcClient {
})
}

async fn start_query_replication_flow(
pub async fn start_query_replication_flow(
&mut self,
qrep_config: &pt::peerdb_flow::QRepConfig,
) -> anyhow::Result<String> {
Expand Down
115 changes: 75 additions & 40 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
Expand Down Expand Up @@ -446,14 +450,24 @@ impl NexusBackend {
let mut destinations = HashSet::with_capacity(table_mappings_count);
for tm in flow_job.table_mappings.iter() {
if !sources.insert(tm.source_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier),
})))
return Err(PgWireError::ApiError(Box::new(
PgError::Internal {
err_msg: format!(
"Duplicate source table identifier {}",
tm.source_table_identifier
),
},
)));
}
if !destinations.insert(tm.destination_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier),
})))
return Err(PgWireError::ApiError(Box::new(
PgError::Internal {
err_msg: format!(
"Duplicate destination table identifier {}",
tm.destination_table_identifier
),
},
)));
}
}
}
Expand Down Expand Up @@ -605,18 +619,23 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
let qrep_job = catalog
.get_qrep_flow_job_by_name(mirror_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error while getting QRep flow job: {:?}", err),
}))
})?;
// unlock the mutex so it can be used by the functions
std::mem::drop(catalog);

let qrep_config = {
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
catalog
.get_qrep_config_proto(mirror_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"error while getting QRep flow job: {:?}",
err
),
}))
})?
};

self.handle_drop_mirror(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
Expand All @@ -630,26 +649,42 @@ impl NexusBackend {
.await?;

// if it is none and DROP MIRROR didn't error out, either mirror doesn't exist or it is a CDC mirror.
match qrep_job {
Some(mut qrep_job) => {
match qrep_config {
Some(mut qrep_config) => {
if query_string.is_some() {
qrep_job.query_string = query_string.as_ref().unwrap().clone();
qrep_config.query = query_string.as_ref().unwrap().clone();
}
qrep_job.flow_options.insert(
"dst_table_full_resync".to_string(),
serde_json::value::Value::Bool(true),
);
self.handle_create_mirror_for_select(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
mirror_name: "no".into(),
},
ddl: Box::new(PeerDDL::CreateMirrorForSelect {
if_not_exists: false,
qrep_flow_job: qrep_job,
}),
})
.await?;
qrep_config.dst_table_full_resync = true;

let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_query_replication_flow(&qrep_config)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"error while starting new QRep job: {:?}",
err
),
}))
})?;
// relock catalog, DROP MIRROR is done with it now
let catalog = self.catalog.lock().await;
catalog
.update_workflow_id_for_flow_job(
&qrep_config.flow_job_name,
&workflow_id,
)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to update workflow for flow job: {:?}",
err
),
}))
})?;

let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&resync_mirror_success,
Expand All @@ -674,7 +709,7 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[PAUSE MIRROR] mirror_name: {}, if_exists: {}",
Expand All @@ -696,7 +731,7 @@ impl NexusBackend {
"[PAUSE MIRROR] got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);

if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
Expand Down Expand Up @@ -725,7 +760,7 @@ impl NexusBackend {
format!("no such mirror: {:?}", flow_job_name),
))))
}
},
}
PeerDDL::ResumeMirror {
if_exists,
flow_job_name,
Expand Down