From 1dcfb097d2c497e609ad03b9e53afcc66eb86996 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 10 Nov 2023 00:00:37 +0000 Subject: [PATCH] nexus/server: reject duplicate source tables or duplicate target tables (#632) Not sure how to write tests for this crate, but this updates psql interface to reject ```sql CREATE MIRROR m FROM source TO target WITH (s.t1:s.t2, s.t1:s.t3) CREATE MIRROR m FROM source TO target WITH (s.t1:s.t3, s.t2:s.t3) ``` As this kind of mirror doesn't work today & is support is not prioritized This leaves capability to create these kinds of broken mirrors over grpc open, that should be alright since this change's goal is to fail fast for users to know what's allowed --- nexus/server/src/main.rs | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 578c0f5f4f..d6a80c52a7 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration}; use analyzer::{PeerDDL, QueryAssocation}; use async_trait::async_trait; @@ -439,6 +439,25 @@ impl NexusBackend { let catalog = self.catalog.lock().await; let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?; if mirror_details.is_none() { + // reject duplicate source tables or duplicate target tables + let table_mappings_count = flow_job.table_mappings.len(); + if table_mappings_count > 1 { + let mut sources = HashSet::with_capacity(table_mappings_count); + let mut destinations = HashSet::with_capacity(table_mappings_count); + for tm in flow_job.table_mappings.iter() { + if !sources.insert(tm.source_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier), + }))) + } + if !destinations.insert(tm.destination_table_identifier.as_str()) { + return Err(PgWireError::ApiError(Box::new(PgError::Internal { + err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier), + }))) + } + } + } + catalog .create_cdc_flow_job_entry(flow_job) .await