Skip to content

Commit

Permalink
fix create peer if not exists
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed May 20, 2024
1 parent d991ded commit 783d50f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 11 deletions.
Empty file modified images/banner.jpg
100755 → 100644
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
14 changes: 11 additions & 3 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Catalog {
run_migrations(&mut self.pg).await
}

pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result<i64> {
pub async fn create_peer(&self, peer: &Peer, if_not_exists: bool) -> anyhow::Result<i64> {
let config_blob = {
let config = peer.config.as_ref().context("invalid peer config")?;
match config {
Expand Down Expand Up @@ -116,9 +116,17 @@ impl Catalog {
)
.await?;

self.pg
if let Err(err) = self
.pg
.execute(&stmt, &[&peer.name, &peer.r#type, &config_blob])
.await?;
.await
{
if !if_not_exists
|| err.code() != Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION)
{
return Err(err.into());
}
}

self.get_peer_id(&peer.name).await
}
Expand Down
19 changes: 11 additions & 8 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ impl NexusBackend {
NexusStatement::PeerDDL { stmt: _, ref ddl } => match ddl.as_ref() {
PeerDDL::CreatePeer {
peer,
if_not_exists: _,
if_not_exists,
} => {
self.validate_peer(peer).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
Expand All @@ -365,13 +365,16 @@ impl NexusBackend {
)))
})?;

self.catalog.create_peer(peer.as_ref()).await.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
e.to_string(),
)))
})?;
self.catalog
.create_peer(peer.as_ref(), *if_not_exists)
.await
.map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
e.to_string(),
)))
})?;
Ok(vec![Response::Execution(Tag::new("OK"))])
}
PeerDDL::CreateMirrorForCDC {
Expand Down

0 comments on commit 783d50f

Please sign in to comment.