Skip to content

Commit

Permalink
reworked RESYNC MIRROR to work with UI created mirrors (#636)
Browse files Browse the repository at this point in the history
`RESYNC MIRROR` was internally implemented in the query layer as
processing a `DROP MIRROR` statement followed by a `CREATE MIRROR`
statement. This involved retrieving a column named `flow_metadata` from
the catalog for the flow job, which contained unprocessed options for
the QRep flow job. Unfortunately, mirrors created from the PeerDB UI do
not set this column properly.

We now retrieve the fully configured `QRepConfig` proto from the
`config_proto` column instead, modify it and then dispatch it directly
without any query layer side processing.
  • Loading branch information
heavycrystal authored Nov 10, 2023
1 parent a3976c3 commit f72d9a5
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 51 deletions.
3 changes: 3 additions & 0 deletions nexus/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ target/
# submodules
/sqlparser-rs

# catalog pkg
/catalog/pkg

# misc
.env
48 changes: 38 additions & 10 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pt::{
peerdb_peers::PostgresConfig,
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use tokio_postgres::{types, Client};

mod embedded {
Expand Down Expand Up @@ -73,7 +74,7 @@ impl CatalogConfig {
password: self.password.clone(),
database: self.database.clone(),
transaction_snapshot: "".to_string(),
metadata_schema: Some("".to_string())
metadata_schema: Some("".to_string()),
}
}

Expand Down Expand Up @@ -308,8 +309,7 @@ impl Catalog {
}
Some(DbType::Mongo) => {
let err = format!("unable to decode {} options for peer {}", "mongo", name);
let mongo_config =
pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?;
Ok(Some(Config::MongoConfig(mongo_config)))
}
Some(DbType::Eventhub) => {
Expand All @@ -326,8 +326,7 @@ impl Catalog {
}
Some(DbType::S3) => {
let err = format!("unable to decode {} options for peer {}", "s3", name);
let s3_config =
pt::peerdb_peers::S3Config::decode(options).context(err)?;
let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?;
Ok(Some(Config::S3Config(s3_config)))
}
Some(DbType::Sqlserver) => {
Expand All @@ -342,8 +341,7 @@ impl Catalog {
"eventhub_group", name
);
let eventhub_group_config =
pt::peerdb_peers::EventHubGroupConfig::decode(options)
.context(err)?;
pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?;
Ok(Some(Config::EventhubGroupConfig(eventhub_group_config)))
}
None => Ok(None),
Expand Down Expand Up @@ -428,15 +426,21 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts_opt: Option<Value> = row.get("flow_metadata");
let flow_opts: HashMap<String, Value> = match flow_opts_opt {
Some(flow_opts) => serde_json::from_value(flow_opts)
.context("unable to deserialize flow options")
.unwrap_or_default(),
None => HashMap::new(),
};

QRepFlowJob {
name: row.get("name"),
source_peer: row.get("source_peer_name"),
target_peer: row.get("destination_peer_name"),
description: row.get("description"),
query_string: row.get("query_string"),
flow_options: serde_json::from_value(row.get("flow_metadata"))
.context("unable to deserialize flow options")
.unwrap_or_default(),
flow_options: flow_opts,
// we set the disabled flag to false by default
disabled: false,
}
Expand Down Expand Up @@ -465,6 +469,10 @@ impl Catalog {
)
.await?;

if job.flow_options.get("destination_table_name").is_none() {
return Err(anyhow!("destination_table_name not found in flow options"));
}

let _rows = self
.pg
.execute(
Expand Down Expand Up @@ -572,4 +580,24 @@ impl Catalog {
let peer_count: i64 = peer_check.get(0);
Ok(peer_count)
}

pub async fn get_qrep_config_proto(
&self,
flow_job_name: &str,
) -> anyhow::Result<Option<pt::peerdb_flow::QRepConfig>> {
let row = self
.pg
.query_opt(
"SELECT config_proto FROM flows WHERE name=$1 AND query_string IS NOT NULL",
&[&flow_job_name],
)
.await?;

Ok(match row {
Some(row) => Some(pt::peerdb_flow::QRepConfig::decode::<&[u8]>(
row.get("config_proto"),
)?),
None => None,
})
}
}
2 changes: 1 addition & 1 deletion nexus/flow-rs/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl FlowGrpcClient {
})
}

async fn start_query_replication_flow(
pub async fn start_query_replication_flow(
&mut self,
qrep_config: &pt::peerdb_flow::QRepConfig,
) -> anyhow::Result<String> {
Expand Down
115 changes: 75 additions & 40 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
Expand Down Expand Up @@ -446,14 +450,24 @@ impl NexusBackend {
let mut destinations = HashSet::with_capacity(table_mappings_count);
for tm in flow_job.table_mappings.iter() {
if !sources.insert(tm.source_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier),
})))
return Err(PgWireError::ApiError(Box::new(
PgError::Internal {
err_msg: format!(
"Duplicate source table identifier {}",
tm.source_table_identifier
),
},
)));
}
if !destinations.insert(tm.destination_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier),
})))
return Err(PgWireError::ApiError(Box::new(
PgError::Internal {
err_msg: format!(
"Duplicate destination table identifier {}",
tm.destination_table_identifier
),
},
)));
}
}
}
Expand Down Expand Up @@ -605,18 +619,23 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
let qrep_job = catalog
.get_qrep_flow_job_by_name(mirror_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("error while getting QRep flow job: {:?}", err),
}))
})?;
// unlock the mutex so it can be used by the functions
std::mem::drop(catalog);

let qrep_config = {
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
catalog
.get_qrep_config_proto(mirror_name)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"error while getting QRep flow job: {:?}",
err
),
}))
})?
};

self.handle_drop_mirror(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
Expand All @@ -630,26 +649,42 @@ impl NexusBackend {
.await?;

// if it is none and DROP MIRROR didn't error out, either mirror doesn't exist or it is a CDC mirror.
match qrep_job {
Some(mut qrep_job) => {
match qrep_config {
Some(mut qrep_config) => {
if query_string.is_some() {
qrep_job.query_string = query_string.as_ref().unwrap().clone();
qrep_config.query = query_string.as_ref().unwrap().clone();
}
qrep_job.flow_options.insert(
"dst_table_full_resync".to_string(),
serde_json::value::Value::Bool(true),
);
self.handle_create_mirror_for_select(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
mirror_name: "no".into(),
},
ddl: Box::new(PeerDDL::CreateMirrorForSelect {
if_not_exists: false,
qrep_flow_job: qrep_job,
}),
})
.await?;
qrep_config.dst_table_full_resync = true;

let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
let workflow_id = flow_handler
.start_query_replication_flow(&qrep_config)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"error while starting new QRep job: {:?}",
err
),
}))
})?;
// relock catalog, DROP MIRROR is done with it now
let catalog = self.catalog.lock().await;
catalog
.update_workflow_id_for_flow_job(
&qrep_config.flow_job_name,
&workflow_id,
)
.await
.map_err(|err| {
PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!(
"unable to update workflow for flow job: {:?}",
err
),
}))
})?;

let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name);
Ok(vec![Response::Execution(Tag::new_for_execution(
&resync_mirror_success,
Expand All @@ -674,7 +709,7 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}

let catalog = self.catalog.lock().await;
tracing::info!(
"[PAUSE MIRROR] mirror_name: {}, if_exists: {}",
Expand All @@ -696,7 +731,7 @@ impl NexusBackend {
"[PAUSE MIRROR] got workflow id: {:?}",
workflow_details.as_ref().map(|w| &w.workflow_id)
);

if let Some(workflow_details) = workflow_details {
let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
flow_handler
Expand Down Expand Up @@ -725,7 +760,7 @@ impl NexusBackend {
format!("no such mirror: {:?}", flow_job_name),
))))
}
},
}
PeerDDL::ResumeMirror {
if_exists,
flow_job_name,
Expand Down

0 comments on commit f72d9a5

Please sign in to comment.