From 59191c20af65d7e7d90a39a8be938596ae376354 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Fri, 26 Apr 2024 17:01:17 +0000 Subject: [PATCH] Specifically route ROLLBACK to catalog When error occurred in transaction catalog could not query peers, causing it to error when ROLLBACK submitted. Skip querying peers on ROLLBACK --- nexus/catalog/src/lib.rs | 2 +- nexus/parser/src/lib.rs | 39 +++++++++++++++++++++++---------------- nexus/server/src/main.rs | 5 +++++ 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/nexus/catalog/src/lib.rs b/nexus/catalog/src/lib.rs index 0f67d550d5..0c3b65d2b8 100644 --- a/nexus/catalog/src/lib.rs +++ b/nexus/catalog/src/lib.rs @@ -87,7 +87,7 @@ impl Catalog { pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result { let config_blob = { - let config = peer.config.clone().context("invalid peer config")?; + let config = peer.config.as_ref().context("invalid peer config")?; match config { Config::SnowflakeConfig(snowflake_config) => snowflake_config.encode_to_vec(), Config::BigqueryConfig(bigquery_config) => bigquery_config.encode_to_vec(), diff --git a/nexus/parser/src/lib.rs b/nexus/parser/src/lib.rs index deb16af505..491693a4c5 100644 --- a/nexus/parser/src/lib.rs +++ b/nexus/parser/src/lib.rs @@ -33,6 +33,9 @@ pub enum NexusStatement { stmt: Statement, cursor: CursorEvent, }, + Rollback { + stmt: Statement, + }, Empty, } @@ -41,16 +44,13 @@ impl NexusStatement { peers: HashMap, stmt: &Statement, ) -> PgWireResult { - let ddl = { - let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer; - pdl.analyze(stmt).map_err(|e| { - PgWireError::UserError(Box::new(ErrorInfo::new( - "ERROR".to_owned(), - "internal_error".to_owned(), - e.to_string(), - ))) - }) - }?; + let ddl = PeerDDLAnalyzer.analyze(stmt).map_err(|e| { + PgWireError::UserError(Box::new(ErrorInfo::new( + "ERROR".to_owned(), + "internal_error".to_owned(), + e.to_string(), + ))) + })?; if let Some(ddl) = ddl { return Ok(NexusStatement::PeerDDL { @@ -126,12 +126,19 @@ impl NexusQueryParser { }) } else { let stmt = stmts.remove(0); - let peers = self.get_peers_bridge().await?; - let nexus_stmt = NexusStatement::new(peers, &stmt)?; - Ok(NexusParsedStatement { - statement: nexus_stmt, - query: sql.to_owned(), - }) + if matches!(stmt, Statement::Rollback { .. }) { + Ok(NexusParsedStatement { + statement: NexusStatement::Rollback { stmt }, + query: sql.to_owned(), + }) + } else { + let peers = self.get_peers_bridge().await?; + let nexus_stmt = NexusStatement::new(peers, &stmt)?; + Ok(NexusParsedStatement { + statement: nexus_stmt, + query: sql.to_owned(), + }) + } } } } diff --git a/nexus/server/src/main.rs b/nexus/server/src/main.rs index 872bd07fc9..941cc881a4 100644 --- a/nexus/server/src/main.rs +++ b/nexus/server/src/main.rs @@ -786,6 +786,10 @@ impl NexusBackend { self.execute_statement(executor.as_ref(), &stmt, None).await } + NexusStatement::Rollback { stmt } => { + self.execute_statement(self.catalog.as_ref(), &stmt, None).await + } + NexusStatement::Empty => Ok(vec![Response::EmptyQuery]), } } @@ -985,6 +989,7 @@ impl ExtendedQueryHandler for NexusBackend { NexusStatement::PeerDDL { .. } => Ok(DescribeResponse::no_data()), NexusStatement::PeerCursor { .. } => Ok(DescribeResponse::no_data()), NexusStatement::Empty => Ok(DescribeResponse::no_data()), + NexusStatement::Rollback { .. } => Ok(DescribeResponse::no_data()), NexusStatement::PeerQuery { stmt, assoc } => { let schema: Option = match assoc { QueryAssociation::Peer(peer) => {