diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 578c0f5f4f..d6a80c52a7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; @@ -439,6 +439,25 @@ impl NexusBackend { let catalog = self.catalog.lock().await; let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; if mirror_details.is_none() { + // reject duplicate source tables or duplicate target tables + let table_mappings_count = flow_job.table_mappings.len(); + if table_mappings_count > 1 { + let mut sources = HashSet::with_capacity(table_mappings_count); + let mut destinations = HashSet::with_capacity(table_mappings_count); + for tm in flow_job.table_mappings.iter() { + if !sources.insert(tm.source_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier), + }))) + } + if !destinations.insert(tm.destination_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier), + }))) + } + } + } + catalog .create_cdc_flow_job_entry(flow_job) .await