Skip to content

Commit

Permalink
nexus/server: reject duplicate source tables or duplicate target tables
Browse files Browse the repository at this point in the history
Not sure how to write tests for this crate, but this updates psql interface to reject

CREATE MIRROR m FROM source TO target WITH (s.t1:s.t2, s.t1:s.t3)
& CREATE MIRROR m FROM source TO target WITH (s.t1:s.t3, s.t2:s.t3)

As this kind of mirror doesn't work today & is support is not prioritized

This leaves capability to create these kinds of broken mirrors over grpc open,
that should be alright since this change's goal is to fail fast for users to know what's allowed
  • Loading branch information
serprex committed Nov 9, 2023
1 parent 858bcbc commit 9bc0086
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion nexus/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::{HashMap, HashSet}, sync::Arc, time::Duration};

use analyzer::{PeerDDL, QueryAssocation};
use async_trait::async_trait;
Expand Down Expand Up @@ -439,6 +439,25 @@ impl NexusBackend {
let catalog = self.catalog.lock().await;
let mirror_details = Self::check_for_mirror(&catalog, &flow_job.name).await?;
if mirror_details.is_none() {
// reject duplicate source tables or duplicate target tables
let table_mappings_count = flow_job.table_mappings.len();
if table_mappings_count > 1 {
let mut sources = HashSet::with_capacity(table_mappings_count);
let mut destinations = HashSet::with_capacity(table_mappings_count);
for tm in flow_job.table_mappings.iter() {
if !sources.insert(tm.source_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate source table identifier {}", tm.source_table_identifier),
})))
}
if !destinations.insert(tm.destination_table_identifier.as_str()) {
return Err(PgWireError::ApiError(Box::new(PgError::Internal {
err_msg: format!("Duplicate destination table identifier {}", tm.destination_table_identifier),
})))
}
}
}

catalog
.create_cdc_flow_job_entry(flow_job)
.await
Expand Down

0 comments on commit 9bc0086

Please sign in to comment.