From f72d9a5a9ee76283e291c7f4994f610bfae4ca4a Mon Sep 17 00:00:00 2001 From: Kevin K Biju <52661649+heavycrystal@users.noreply.github.com> Date: Sat, 11 Nov 2023 00:22:19 +0530 Subject: [PATCH 1/2] reworked RESYNC MIRROR to work with UI created mirrors (#636) `RESYNC MIRROR` was internally implemented in the query layer as processing a `DROP MIRROR` statement followed by a `CREATE MIRROR` statement. This involved retrieving a column named `flow_metadata` from the catalog for the flow job, which contained unprocessed options for the QRep flow job. Unfortunately, mirrors created from the PeerDB UI do not set this column properly. We now retrieve the fully configured `QRepConfig` proto from the `config_proto` column instead, modify it and then dispatch it directly without any query layer side processing. --- nexus/.gitignore | 3 + nexus/catalog/src/lib.rs | 48 ++++++++++++---- nexus/flow-rs/src/grpc.rs | 2 +- nexus/server/src/main.rs | 115 +++++++++++++++++++++++++------------- 4 files changed, 117 insertions(+), 51 deletions(-) diff --git a/nexus/.gitignore b/nexus/.gitignore index b9b2a160cc..86adb245ec 100644 --- a/nexus/.gitignore +++ b/nexus/.gitignore @@ -4,5 +4,8 @@ target/ # submodules /sqlparser-rs +# catalog pkg +/catalog/pkg + # misc .env diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 55078366fd..633ad95b36 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -10,6 +10,7 @@ use pt::{ peerdb_peers::PostgresConfig, peerdb_peers::{peer::Config, DbType, Peer}, }; +use serde_json::Value; use tokio_postgres::{types, Client}; mod embedded { @@ -73,7 +74,7 @@ impl CatalogConfig { password: self.password.clone(), database: self.database.clone(), transaction_snapshot: "".to_string(), - metadata_schema: Some("".to_string()) + metadata_schema: Some("".to_string()), } } @@ -308,8 +309,7 @@ impl Catalog { } Some(DbType::Mongo) => { let err = format!("unable to decode {} options for peer {}", "mongo", name); - let mongo_config = - pt::peerdb_peers::MongoConfig::decode(options).context(err)?; + let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?; Ok(Some(Config::MongoConfig(mongo_config))) } Some(DbType::Eventhub) => { @@ -326,8 +326,7 @@ impl Catalog { } Some(DbType::S3) => { let err = format!("unable to decode {} options for peer {}", "s3", name); - let s3_config = - pt::peerdb_peers::S3Config::decode(options).context(err)?; + let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?; Ok(Some(Config::S3Config(s3_config))) } Some(DbType::Sqlserver) => { @@ -342,8 +341,7 @@ impl Catalog { "eventhub_group", name ); let eventhub_group_config = - pt::peerdb_peers::EventHubGroupConfig::decode(options) - .context(err)?; + pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?; Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) } None => Ok(None), @@ -428,15 +426,21 @@ impl Catalog { .await?; let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| { + let flow_opts_opt: Option = row.get("flow_metadata"); + let flow_opts: HashMap = match flow_opts_opt { + Some(flow_opts) => serde_json::from_value(flow_opts) + .context("unable to deserialize flow options") + .unwrap_or_default(), + None => HashMap::new(), + }; + QRepFlowJob { name: row.get("name"), source_peer: row.get("source_peer_name"), target_peer: row.get("destination_peer_name"), description: row.get("description"), query_string: row.get("query_string"), - flow_options: serde_json::from_value(row.get("flow_metadata")) - .context("unable to deserialize flow options") - .unwrap_or_default(), + flow_options: flow_opts, // we set the disabled flag to false by default disabled: false, } @@ -465,6 +469,10 @@ impl Catalog { ) .await?; + if job.flow_options.get("destination_table_name").is_none() { + return Err(anyhow!("destination_table_name not found in flow options")); + } + let _rows = self .pg .execute( @@ -572,4 +580,24 @@ impl Catalog { let peer_count: i64 = peer_check.get(0); Ok(peer_count) } + + pub async fn get_qrep_config_proto( + &self, + flow_job_name: &str, + ) -> anyhow::Result> { + let row = self + .pg + .query_opt( + "SELECT config_proto FROM flows WHERE name=$1 AND query_string IS NOT NULL", + &[&flow_job_name], + ) + .await?; + + Ok(match row { + Some(row) => Some(pt::peerdb_flow::QRepConfig::decode::<&[u8]>( + row.get("config_proto"), + )?), + None => None, + }) + } } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index e149d16eb5..f355ddde07 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -75,7 +75,7 @@ impl FlowGrpcClient { }) } - async fn start_query_replication_flow( + pub async fn start_query_replication_flow( &mut self, qrep_config: &pt::peerdb_flow::QRepConfig, ) -> anyhow::Result { diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index d6a80c52a7..cd8454c977 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,4 +1,8 @@ -use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; @@ -446,14 +450,24 @@ impl NexusBackend { let mut destinations = HashSet::with_capacity(table_mappings_count); for tm in flow_job.table_mappings.iter() { if !sources.insert(tm.source_table_identifier.as_str()) { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier), - }))) + return Err(PgWireError::ApiError(Box::new( + PgError::Internal { + err_msg: format!( + "Duplicate source table identifier {}", + tm.source_table_identifier + ), + }, + ))); } if !destinations.insert(tm.destination_table_identifier.as_str()) { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier), - }))) + return Err(PgWireError::ApiError(Box::new( + PgError::Internal { + err_msg: format!( + "Duplicate destination table identifier {}", + tm.destination_table_identifier + ), + }, + ))); } } } @@ -605,18 +619,23 @@ impl NexusBackend { err_msg: "flow service is not configured".to_owned(), }))); } - // retrieve the mirror job since DROP MIRROR will delete the row later. - let catalog = self.catalog.lock().await; - let qrep_job = catalog - .get_qrep_flow_job_by_name(mirror_name) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error while getting QRep flow job: {:?}", err), - })) - })?; - // unlock the mutex so it can be used by the functions - std::mem::drop(catalog); + + let qrep_config = { + // retrieve the mirror job since DROP MIRROR will delete the row later. + let catalog = self.catalog.lock().await; + catalog + .get_qrep_config_proto(mirror_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "error while getting QRep flow job: {:?}", + err + ), + })) + })? + }; + self.handle_drop_mirror(&NexusStatement::PeerDDL { // not supposed to be used by the function stmt: sqlparser::ast::Statement::ExecuteMirror { @@ -630,26 +649,42 @@ impl NexusBackend { .await?; // if it is none and DROP MIRROR didn't error out, either mirror doesn't exist or it is a CDC mirror. - match qrep_job { - Some(mut qrep_job) => { + match qrep_config { + Some(mut qrep_config) => { if query_string.is_some() { - qrep_job.query_string = query_string.as_ref().unwrap().clone(); + qrep_config.query = query_string.as_ref().unwrap().clone(); } - qrep_job.flow_options.insert( - "dst_table_full_resync".to_string(), - serde_json::value::Value::Bool(true), - ); - self.handle_create_mirror_for_select(&NexusStatement::PeerDDL { - // not supposed to be used by the function - stmt: sqlparser::ast::Statement::ExecuteMirror { - mirror_name: "no".into(), - }, - ddl: Box::new(PeerDDL::CreateMirrorForSelect { - if_not_exists: false, - qrep_flow_job: qrep_job, - }), - }) - .await?; + qrep_config.dst_table_full_resync = true; + + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let workflow_id = flow_handler + .start_query_replication_flow(&qrep_config) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "error while starting new QRep job: {:?}", + err + ), + })) + })?; + // relock catalog, DROP MIRROR is done with it now + let catalog = self.catalog.lock().await; + catalog + .update_workflow_id_for_flow_job( + &qrep_config.flow_job_name, + &workflow_id, + ) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to update workflow for flow job: {:?}", + err + ), + })) + })?; + let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name); Ok(vec![Response::Execution(Tag::new_for_execution( &resync_mirror_success, @@ -674,7 +709,7 @@ impl NexusBackend { err_msg: "flow service is not configured".to_owned(), }))); } - + let catalog = self.catalog.lock().await; tracing::info!( "[PAUSE MIRROR] mirror_name: {}, if_exists: {}", @@ -696,7 +731,7 @@ impl NexusBackend { "[PAUSE MIRROR] got workflow id: {:?}", workflow_details.as_ref().map(|w| &w.workflow_id) ); - + if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler @@ -725,7 +760,7 @@ impl NexusBackend { format!("no such mirror: {:?}", flow_job_name), )))) } - }, + } PeerDDL::ResumeMirror { if_exists, flow_job_name, From 7bb737e659dbf29fe64b857e762485daff63d947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 Nov 2023 19:23:06 +0000 Subject: [PATCH 2/2] ignore ci assets, test results, private directories (#638) --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.gitignore b/.gitignore index 07ce752e93..0b458e3e8c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,7 @@ tmp/ .envrc .idea/ + +private/ +nexus/server/tests/assets/*.json +nexus/server/tests/results/actual/