diff --git a/.gitignore b/.gitignore index 07ce752e93..0b458e3e8c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,7 @@ tmp/ .envrc .idea/ + +private/ +nexus/server/tests/assets/*.json +nexus/server/tests/results/actual/ diff --git a/nexus/.gitignore b/nexus/.gitignore index b9b2a160cc..86adb245ec 100644 --- a/nexus/.gitignore +++ b/nexus/.gitignore @@ -4,5 +4,8 @@ target/ # submodules /sqlparser-rs +# catalog pkg +/catalog/pkg + # misc .env diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 55078366fd..633ad95b36 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -10,6 +10,7 @@ use pt::{ peerdb_peers::PostgresConfig, peerdb_peers::{peer::Config, DbType, Peer}, }; +use serde_json::Value; use tokio_postgres::{types, Client}; mod embedded { @@ -73,7 +74,7 @@ impl CatalogConfig { password: self.password.clone(), database: self.database.clone(), transaction_snapshot: "".to_string(), - metadata_schema: Some("".to_string()) + metadata_schema: Some("".to_string()), } } @@ -308,8 +309,7 @@ impl Catalog { } Some(DbType::Mongo) => { let err = format!("unable to decode {} options for peer {}", "mongo", name); - let mongo_config = - pt::peerdb_peers::MongoConfig::decode(options).context(err)?; + let mongo_config = pt::peerdb_peers::MongoConfig::decode(options).context(err)?; Ok(Some(Config::MongoConfig(mongo_config))) } Some(DbType::Eventhub) => { @@ -326,8 +326,7 @@ impl Catalog { } Some(DbType::S3) => { let err = format!("unable to decode {} options for peer {}", "s3", name); - let s3_config = - pt::peerdb_peers::S3Config::decode(options).context(err)?; + let s3_config = pt::peerdb_peers::S3Config::decode(options).context(err)?; Ok(Some(Config::S3Config(s3_config))) } Some(DbType::Sqlserver) => { @@ -342,8 +341,7 @@ impl Catalog { "eventhub_group", name ); let eventhub_group_config = - pt::peerdb_peers::EventHubGroupConfig::decode(options) - .context(err)?; + pt::peerdb_peers::EventHubGroupConfig::decode(options).context(err)?; Ok(Some(Config::EventhubGroupConfig(eventhub_group_config))) } None => Ok(None), @@ -428,15 +426,21 @@ impl Catalog { .await?; let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| { + let flow_opts_opt: Option = row.get("flow_metadata"); + let flow_opts: HashMap = match flow_opts_opt { + Some(flow_opts) => serde_json::from_value(flow_opts) + .context("unable to deserialize flow options") + .unwrap_or_default(), + None => HashMap::new(), + }; + QRepFlowJob { name: row.get("name"), source_peer: row.get("source_peer_name"), target_peer: row.get("destination_peer_name"), description: row.get("description"), query_string: row.get("query_string"), - flow_options: serde_json::from_value(row.get("flow_metadata")) - .context("unable to deserialize flow options") - .unwrap_or_default(), + flow_options: flow_opts, // we set the disabled flag to false by default disabled: false, } @@ -465,6 +469,10 @@ impl Catalog { ) .await?; + if job.flow_options.get("destination_table_name").is_none() { + return Err(anyhow!("destination_table_name not found in flow options")); + } + let _rows = self .pg .execute( @@ -572,4 +580,24 @@ impl Catalog { let peer_count: i64 = peer_check.get(0); Ok(peer_count) } + + pub async fn get_qrep_config_proto( + &self, + flow_job_name: &str, + ) -> anyhow::Result> { + let row = self + .pg + .query_opt( + "SELECT config_proto FROM flows WHERE name=$1 AND query_string IS NOT NULL", + &[&flow_job_name], + ) + .await?; + + Ok(match row { + Some(row) => Some(pt::peerdb_flow::QRepConfig::decode::<&[u8]>( + row.get("config_proto"), + )?), + None => None, + }) + } } diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index e149d16eb5..f355ddde07 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -75,7 +75,7 @@ impl FlowGrpcClient { }) } - async fn start_query_replication_flow( + pub async fn start_query_replication_flow( &mut self, qrep_config: &pt::peerdb_flow::QRepConfig, ) -> anyhow::Result { diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index d6a80c52a7..cd8454c977 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,4 +1,8 @@ -use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; @@ -446,14 +450,24 @@ impl NexusBackend { let mut destinations = HashSet::with_capacity(table_mappings_count); for tm in flow_job.table_mappings.iter() { if !sources.insert(tm.source_table_identifier.as_str()) { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier), - }))) + return Err(PgWireError::ApiError(Box::new( + PgError::Internal { + err_msg: format!( + "Duplicate source table identifier {}", + tm.source_table_identifier + ), + }, + ))); } if !destinations.insert(tm.destination_table_identifier.as_str()) { - return Err(PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier), - }))) + return Err(PgWireError::ApiError(Box::new( + PgError::Internal { + err_msg: format!( + "Duplicate destination table identifier {}", + tm.destination_table_identifier + ), + }, + ))); } } } @@ -605,18 +619,23 @@ impl NexusBackend { err_msg: "flow service is not configured".to_owned(), }))); } - // retrieve the mirror job since DROP MIRROR will delete the row later. - let catalog = self.catalog.lock().await; - let qrep_job = catalog - .get_qrep_flow_job_by_name(mirror_name) - .await - .map_err(|err| { - PgWireError::ApiError(Box::new(PgError::Internal { - err_msg: format!("error while getting QRep flow job: {:?}", err), - })) - })?; - // unlock the mutex so it can be used by the functions - std::mem::drop(catalog); + + let qrep_config = { + // retrieve the mirror job since DROP MIRROR will delete the row later. + let catalog = self.catalog.lock().await; + catalog + .get_qrep_config_proto(mirror_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "error while getting QRep flow job: {:?}", + err + ), + })) + })? + }; + self.handle_drop_mirror(&NexusStatement::PeerDDL { // not supposed to be used by the function stmt: sqlparser::ast::Statement::ExecuteMirror { @@ -630,26 +649,42 @@ impl NexusBackend { .await?; // if it is none and DROP MIRROR didn't error out, either mirror doesn't exist or it is a CDC mirror. - match qrep_job { - Some(mut qrep_job) => { + match qrep_config { + Some(mut qrep_config) => { if query_string.is_some() { - qrep_job.query_string = query_string.as_ref().unwrap().clone(); + qrep_config.query = query_string.as_ref().unwrap().clone(); } - qrep_job.flow_options.insert( - "dst_table_full_resync".to_string(), - serde_json::value::Value::Bool(true), - ); - self.handle_create_mirror_for_select(&NexusStatement::PeerDDL { - // not supposed to be used by the function - stmt: sqlparser::ast::Statement::ExecuteMirror { - mirror_name: "no".into(), - }, - ddl: Box::new(PeerDDL::CreateMirrorForSelect { - if_not_exists: false, - qrep_flow_job: qrep_job, - }), - }) - .await?; + qrep_config.dst_table_full_resync = true; + + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + let workflow_id = flow_handler + .start_query_replication_flow(&qrep_config) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "error while starting new QRep job: {:?}", + err + ), + })) + })?; + // relock catalog, DROP MIRROR is done with it now + let catalog = self.catalog.lock().await; + catalog + .update_workflow_id_for_flow_job( + &qrep_config.flow_job_name, + &workflow_id, + ) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to update workflow for flow job: {:?}", + err + ), + })) + })?; + let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name); Ok(vec![Response::Execution(Tag::new_for_execution( &resync_mirror_success, @@ -674,7 +709,7 @@ impl NexusBackend { err_msg: "flow service is not configured".to_owned(), }))); } - + let catalog = self.catalog.lock().await; tracing::info!( "[PAUSE MIRROR] mirror_name: {}, if_exists: {}", @@ -696,7 +731,7 @@ impl NexusBackend { "[PAUSE MIRROR] got workflow id: {:?}", workflow_details.as_ref().map(|w| &w.workflow_id) ); - + if let Some(workflow_details) = workflow_details { let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; flow_handler @@ -725,7 +760,7 @@ impl NexusBackend { format!("no such mirror: {:?}", flow_job_name), )))) } - }, + } PeerDDL::ResumeMirror { if_exists, flow_job_name,