diff --git a/flow/activities/flowable.go b/flow/activities/flowable.go
index 2c4e1cbb7..9cefbecc9 100644
--- a/flow/activities/flowable.go
+++ b/flow/activities/flowable.go
@@ -680,7 +680,7 @@ func (a *FlowableActivity) SendWALHeartbeat(ctx context.Context, config *protos.
 
 func (a *FlowableActivity) QRepWaitUntilNewRows(ctx context.Context,
 	config *protos.QRepConfig, last *protos.QRepPartition) error {
-	if config.SourcePeer.Type != protos.DBType_POSTGRES {
+	if config.SourcePeer.Type != protos.DBType_POSTGRES || last.Range == nil {
 		return nil
 	}
 	waitBetweenBatches := 5 * time.Second
diff --git a/flow/workflows/qrep_flow.go b/flow/workflows/qrep_flow.go
index 2be185666..45f4fdab9 100644
--- a/flow/workflows/qrep_flow.go
+++ b/flow/workflows/qrep_flow.go
@@ -313,7 +313,7 @@ func (q *QRepFlowExecution) handleTableRenameForResync(ctx workflow.Context, sta
 		}
 		q.config.DestinationTableIdentifier = oldTableIdentifier
 	}
-	state.NeedsResync = true
+	state.NeedsResync = false
 	return nil
 }
 
diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs
index 14ff6b285..b97077f57 100644
--- a/nexus/analyzer/src/lib.rs
+++ b/nexus/analyzer/src/lib.rs
@@ -123,6 +123,11 @@ pub enum PeerDDL {
         if_exists: bool,
         flow_job_name: String,
     },
+    ResyncMirror {
+        if_exists: bool,
+        mirror_name: String,
+        query_string: Option<String>,
+    },
 }
 
 impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
@@ -378,6 +383,27 @@
                 if_exists: *if_exists,
                 peer_name: peer_name.to_string().to_lowercase(),
             })),
+            Statement::ResyncMirror {
+                if_exists,
+                mirror_name,
+                with_options,
+            } => {
+                let mut raw_options = HashMap::new();
+                for option in with_options {
+                    raw_options.insert(&option.name.value as &str, &option.value);
+                }
+
+                let query_string = match raw_options.remove("query_string") {
+                    Some(sqlparser::ast::Value::SingleQuotedString(s)) => Some(s.clone()),
+                    _ => None,
+                };
+
+                Ok(Some(PeerDDL::ResyncMirror {
+                    if_exists: *if_exists,
+                    mirror_name: mirror_name.to_string().to_lowercase(),
+                    query_string,
+                }))
+            }
             _ => Ok(None),
         }
     }
diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs
index bf00addf8..55078366f 100644
--- a/nexus/catalog/src/lib.rs
+++ b/nexus/catalog/src/lib.rs
@@ -365,7 +365,7 @@ impl Catalog {
         Ok(table_identifier_parts.join("."))
     }
 
-    pub async fn create_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
+    pub async fn create_cdc_flow_job_entry(&self, job: &FlowJob) -> anyhow::Result<()> {
         let source_peer_id = self
             .get_peer_id_i32(&job.source_peer)
             .await
@@ -424,7 +424,7 @@
             .prepare_typed("SELECT f.*, sp.name as source_peer_name, dp.name as destination_peer_name
             FROM flows as f
             INNER JOIN peers as sp ON f.source_peer = sp.id
             INNER JOIN peers as dp ON f.destination_peer = dp.id
-            WHERE f.name = $1", &[types::Type::TEXT])
+            WHERE f.name = $1 AND f.query_string IS NOT NULL", &[types::Type::TEXT])
             .await?;
         let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs
index d36895268..b01109399 100644
--- a/nexus/server/src/main.rs
+++ b/nexus/server/src/main.rs
@@ -251,13 +251,155 @@ impl NexusBackend {
         }
     }
 
+    async fn handle_drop_mirror<'a>(
+        &self,
+        drop_mirror_stmt: &NexusStatement,
+    ) -> PgWireResult<Vec<Response<'a>>> {
+        match drop_mirror_stmt {
+            NexusStatement::PeerDDL { stmt: _, ddl } => match ddl.as_ref() {
+                PeerDDL::DropMirror {
+                    if_exists,
+                    flow_job_name,
+                } => {
+                    if self.flow_handler.is_none() {
+                        return Err(PgWireError::ApiError(Box::new(PgError::Internal {
+ err_msg: "flow service is not configured".to_owned(), + }))); + } + + let catalog = self.catalog.lock().await; + tracing::info!( + "DROP MIRROR: mirror_name: {}, if_exists: {}", + flow_job_name, + if_exists + ); + let workflow_details = catalog + .get_workflow_details_for_flow_job(flow_job_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to query catalog for job metadata: {:?}", + err + ), + })) + })?; + tracing::info!( + "got workflow id: {:?}", + workflow_details.as_ref().map(|w| &w.workflow_id) + ); + if let Some(workflow_details) = workflow_details { + let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await; + flow_handler + .shutdown_flow_job(flow_job_name, workflow_details) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to shutdown flow job: {:?}", err), + })) + })?; + catalog + .delete_flow_job_entry(flow_job_name) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("unable to delete job metadata: {:?}", err), + })) + })?; + let drop_mirror_success = format!("DROP MIRROR {}", flow_job_name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &drop_mirror_success, + None, + ))]) + } else if *if_exists { + let no_mirror_success = "NO SUCH MIRROR"; + Ok(vec![Response::Execution(Tag::new_for_execution( + no_mirror_success, + None, + ))]) + } else { + Err(PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "error".to_owned(), + format!("no such mirror: {:?}", flow_job_name), + )))) + } + } + _ => unreachable!(), + }, + _ => unreachable!(), + } + } + + async fn handle_create_mirror_for_select<'a>( + &self, + create_mirror_stmt: &NexusStatement, + ) -> PgWireResult>> { + match create_mirror_stmt { + NexusStatement::PeerDDL { stmt: _, ddl } => match ddl.as_ref() { + PeerDDL::CreateMirrorForSelect { + if_not_exists, + qrep_flow_job, + } => { + if self.flow_handler.is_none() { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: "flow service is not configured".to_owned(), + }))); + } + let mirror_details; + { + let catalog = self.catalog.lock().await; + mirror_details = + Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?; + } + if mirror_details.is_none() { + { + let catalog = self.catalog.lock().await; + catalog + .create_qrep_flow_job_entry(qrep_flow_job) + .await + .map_err(|err| { + PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!( + "unable to create mirror job entry: {:?}", + err + ), + })) + })?; + } + + if qrep_flow_job.disabled { + let create_mirror_success = + format!("CREATE MIRROR {}", qrep_flow_job.name); + return Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]); + } + + let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?; + let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name); + Ok(vec![Response::Execution(Tag::new_for_execution( + &create_mirror_success, + None, + ))]) + } else { + Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone()) + } + } + _ => unreachable!(), + }, + _ => unreachable!(), + } + } + async fn handle_query<'a>( &self, nexus_stmt: NexusStatement, ) -> PgWireResult>> { let mut peer_holder: Option> = None; match nexus_stmt { - NexusStatement::PeerDDL { stmt: _, ddl } => match ddl.as_ref() { + NexusStatement::PeerDDL { stmt: _, ref ddl } => match ddl.as_ref() { PeerDDL::CreatePeer { peer, 
                     if_not_exists: _,
@@ -298,7 +440,7 @@
                     let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?;
                     if mirror_details.is_none() {
                         catalog
-                            .create_flow_job_entry(flow_job)
+                            .create_cdc_flow_job_entry(flow_job)
                             .await
                             .map_err(|err| {
                                 PgWireError::ApiError(Box::new(PgError::Internal {
@@ -346,55 +488,8 @@
                         Self::handle_mirror_existence(*if_not_exists, flow_job.name.clone())
                     }
                 }
-                PeerDDL::CreateMirrorForSelect {
-                    if_not_exists,
-                    qrep_flow_job,
-                } => {
-                    if self.flow_handler.is_none() {
-                        return Err(PgWireError::ApiError(Box::new(PgError::Internal {
-                            err_msg: "flow service is not configured".to_owned(),
-                        })));
-                    }
-                    let mirror_details;
-                    {
-                        let catalog = self.catalog.lock().await;
-                        mirror_details =
-                            Self::check_for_mirror(&catalog, &qrep_flow_job.name).await?;
-                    }
-                    if mirror_details.is_none() {
-                        {
-                            let catalog = self.catalog.lock().await;
-                            catalog
-                                .create_qrep_flow_job_entry(qrep_flow_job)
-                                .await
-                                .map_err(|err| {
-                                    PgWireError::ApiError(Box::new(PgError::Internal {
-                                        err_msg: format!(
-                                            "unable to create mirror job entry: {:?}",
-                                            err
-                                        ),
-                                    }))
-                                })?;
-                        }
-
-                        if qrep_flow_job.disabled {
-                            let create_mirror_success =
-                                format!("CREATE MIRROR {}", qrep_flow_job.name);
-                            return Ok(vec![Response::Execution(Tag::new_for_execution(
-                                &create_mirror_success,
-                                None,
-                            ))]);
-                        }
-
-                        let _workflow_id = self.run_qrep_mirror(qrep_flow_job).await?;
-                        let create_mirror_success = format!("CREATE MIRROR {}", qrep_flow_job.name);
-                        Ok(vec![Response::Execution(Tag::new_for_execution(
-                            &create_mirror_success,
-                            None,
-                        ))])
-                    } else {
-                        Self::handle_mirror_existence(*if_not_exists, qrep_flow_job.name.clone())
-                    }
+                PeerDDL::CreateMirrorForSelect { .. } => {
+                    self.handle_create_mirror_for_select(&nexus_stmt).await
                 }
                 PeerDDL::ExecuteMirrorForSelect { flow_job_name } => {
                     if self.flow_handler.is_none() {
@@ -428,70 +523,7 @@
                         ))))
                     }
                 }
-                PeerDDL::DropMirror {
-                    if_exists,
-                    flow_job_name,
-                } => {
-                    if self.flow_handler.is_none() {
-                        return Err(PgWireError::ApiError(Box::new(PgError::Internal {
-                            err_msg: "flow service is not configured".to_owned(),
-                        })));
-                    }
-
-                    let catalog = self.catalog.lock().await;
-                    tracing::info!("mirror_name: {}, if_exists: {}", flow_job_name, if_exists);
-                    let workflow_details = catalog
-                        .get_workflow_details_for_flow_job(flow_job_name)
-                        .await
-                        .map_err(|err| {
-                            PgWireError::ApiError(Box::new(PgError::Internal {
-                                err_msg: format!(
-                                    "unable to query catalog for job metadata: {:?}",
-                                    err
-                                ),
-                            }))
-                        })?;
-                    tracing::info!(
-                        "got workflow id: {:?}",
-                        workflow_details.as_ref().map(|w| &w.workflow_id)
-                    );
-                    if let Some(workflow_details) = workflow_details {
-                        let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
-                        flow_handler
-                            .shutdown_flow_job(flow_job_name, workflow_details)
-                            .await
-                            .map_err(|err| {
-                                PgWireError::ApiError(Box::new(PgError::Internal {
-                                    err_msg: format!("unable to shutdown flow job: {:?}", err),
-                                }))
-                            })?;
-                        catalog
-                            .delete_flow_job_entry(flow_job_name)
-                            .await
-                            .map_err(|err| {
-                                PgWireError::ApiError(Box::new(PgError::Internal {
-                                    err_msg: format!("unable to delete job metadata: {:?}", err),
-                                }))
-                            })?;
-                        let drop_mirror_success = format!("DROP MIRROR {}", flow_job_name);
-                        Ok(vec![Response::Execution(Tag::new_for_execution(
-                            &drop_mirror_success,
-                            None,
-                        ))])
-                    } else if *if_exists {
-                        let no_mirror_success = "NO SUCH MIRROR";
-                        Ok(vec![Response::Execution(Tag::new_for_execution(
-                            no_mirror_success,
-                            None,
-                        ))])
-                    } else {
-                        Err(PgWireError::UserError(Box::new(ErrorInfo::new(
-                            "ERROR".to_owned(),
-                            "error".to_owned(),
-                            format!("no such mirror: {:?}", flow_job_name),
-                        ))))
-                    }
-                }
+                PeerDDL::DropMirror { .. } => self.handle_drop_mirror(&nexus_stmt).await,
                 PeerDDL::DropPeer {
                     if_exists,
                     peer_name,
@@ -503,16 +535,19 @@
                     }
 
                     let catalog = self.catalog.lock().await;
-                    tracing::info!("drop peer_name: {}, if_exists: {}", peer_name, if_exists);
-                    let peer_exists =
-                        catalog.check_peer_entry(peer_name).await.map_err(|err| {
-                            PgWireError::ApiError(Box::new(PgError::Internal {
-                                err_msg: format!(
-                                    "unable to query catalog for peer metadata: {:?}",
-                                    err
-                                ),
-                            }))
-                        })?;
+                    tracing::info!(
+                        "DROP PEER: peer_name: {}, if_exists: {}",
+                        peer_name,
+                        if_exists
+                    );
+                    let peer_exists = catalog.check_peer_entry(peer_name).await.map_err(|err| {
+                        PgWireError::ApiError(Box::new(PgError::Internal {
+                            err_msg: format!(
+                                "unable to query catalog for peer metadata: {:?}",
+                                err
+                            ),
+                        }))
+                    })?;
                     tracing::info!("peer exist count: {}", peer_exists);
                     if peer_exists != 0 {
                         let mut flow_handler = self.flow_handler.as_ref().unwrap().lock().await;
@@ -540,6 +575,76 @@
                         ))))
                     }
                 }
+                PeerDDL::ResyncMirror {
+                    mirror_name,
+                    query_string,
+                    ..
+                } => {
+                    if self.flow_handler.is_none() {
+                        return Err(PgWireError::ApiError(Box::new(PgError::Internal {
+                            err_msg: "flow service is not configured".to_owned(),
+                        })));
+                    }
+                    // retrieve the mirror job since DROP MIRROR will delete the row later.
+                    let catalog = self.catalog.lock().await;
+                    let qrep_job = catalog
+                        .get_qrep_flow_job_by_name(mirror_name)
+                        .await
+                        .map_err(|err| {
+                            PgWireError::ApiError(Box::new(PgError::Internal {
+                                err_msg: format!("error while getting QRep flow job: {:?}", err),
+                            }))
+                        })?;
+                    // unlock the mutex so it can be used by the functions
+                    std::mem::drop(catalog);
+                    self.handle_drop_mirror(&NexusStatement::PeerDDL {
+                        // not supposed to be used by the function
+                        stmt: sqlparser::ast::Statement::ExecuteMirror {
+                            mirror_name: "no".into(),
+                        },
+                        ddl: Box::new(PeerDDL::DropMirror {
+                            if_exists: false,
+                            flow_job_name: mirror_name.to_string(),
+                        }),
+                    })
+                    .await?;
+
+                    // if it is none and DROP MIRROR didn't error out, either mirror doesn't exist or it is a CDC mirror.
+                    match qrep_job {
+                        Some(mut qrep_job) => {
+                            if query_string.is_some() {
+                                qrep_job.query_string = query_string.as_ref().unwrap().clone();
+                            }
+                            qrep_job.flow_options.insert(
+                                "dst_table_full_resync".to_string(),
+                                serde_json::value::Value::Bool(true),
+                            );
+                            self.handle_create_mirror_for_select(&NexusStatement::PeerDDL {
+                                // not supposed to be used by the function
+                                stmt: sqlparser::ast::Statement::ExecuteMirror {
+                                    mirror_name: "no".into(),
+                                },
+                                ddl: Box::new(PeerDDL::CreateMirrorForSelect {
+                                    if_not_exists: false,
+                                    qrep_flow_job: qrep_job,
+                                }),
+                            })
+                            .await?;
+                            let resync_mirror_success = format!("RESYNC MIRROR {}", mirror_name);
+                            Ok(vec![Response::Execution(Tag::new_for_execution(
+                                &resync_mirror_success,
+                                None,
+                            ))])
+                        }
+                        None => {
+                            let no_peer_success = "NO SUCH QREP MIRROR";
+                            Ok(vec![Response::Execution(Tag::new_for_execution(
+                                no_peer_success,
+                                None,
+                            ))])
+                        }
+                    }
+                }
             },
             NexusStatement::PeerQuery { stmt, assoc } => {
                 // get the query executor
@@ -692,7 +797,10 @@ fn parameter_to_string(portal: &Portal<NexusParsedStatement>, idx: usize) -> PgW
     match param_type {
         &Type::VARCHAR | &Type::TEXT => Ok(format!(
             "'{}'",
-            portal.parameter::<String>(idx, param_type)?.as_deref().unwrap_or("")
+            portal
+                .parameter::<String>(idx, param_type)?
+                .as_deref()
+                .unwrap_or("")
         )),
         &Type::BOOL => Ok(portal
             .parameter::<bool>(idx, param_type)?
diff --git a/nexus/sqlparser-rs b/nexus/sqlparser-rs
index 123247b82..19e9fc9cc 160000
--- a/nexus/sqlparser-rs
+++ b/nexus/sqlparser-rs
@@ -1 +1 @@
-Subproject commit 123247b82e0c7868cf4d0aa9bae42c03f1e6a2e0
+Subproject commit 19e9fc9cc28419abe08d89d4db4c50b578e7e387
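
Note (not part of the patch): the analyzer above parses RESYNC MIRROR with an optional IF EXISTS flag and a query_string WITH-option, and the server implements it as DROP MIRROR followed by CREATE MIRROR ... FOR SELECT with the dst_table_full_resync flow option set to true. The exact surface grammar lives in the bumped sqlparser-rs submodule, which is not shown here, so the invocation below is only a sketch inferred from the parsed fields; the mirror name and the query are placeholders.

    -- Reuse the stored query_string for the mirror:
    RESYNC MIRROR IF EXISTS my_qrep_mirror;

    -- Or override the query while resyncing:
    RESYNC MIRROR IF EXISTS my_qrep_mirror
    WITH (query_string = 'SELECT * FROM public.events WHERE id BETWEEN {{.start}} AND {{.end}}');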