Skip to content

Commit

Permalink
Specifically route ROLLBACK to catalog
Browse files Browse the repository at this point in the history
When error occurred in transaction catalog could not query peers,
causing it to error when ROLLBACK submitted. Skip querying peers on ROLLBACK
  • Loading branch information
serprex committed Apr 26, 2024
1 parent dddf6e8 commit 59191c2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 17 deletions.
2 changes: 1 addition & 1 deletion nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Catalog {

pub async fn create_peer(&self, peer: &Peer) -> anyhow::Result<i64> {
let config_blob = {
let config = peer.config.clone().context("invalid peer config")?;
let config = peer.config.as_ref().context("invalid peer config")?;
match config {
Config::SnowflakeConfig(snowflake_config) => snowflake_config.encode_to_vec(),
Config::BigqueryConfig(bigquery_config) => bigquery_config.encode_to_vec(),
Expand Down
39 changes: 23 additions & 16 deletions nexus/parser/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum NexusStatement {
stmt: Statement,
cursor: CursorEvent,
},
Rollback {
stmt: Statement,
},
Empty,
}

Expand All @@ -41,16 +44,13 @@ impl NexusStatement {
peers: HashMap<String, pt::peerdb_peers::Peer>,
stmt: &Statement,
) -> PgWireResult<Self> {
let ddl = {
let pdl: PeerDDLAnalyzer = PeerDDLAnalyzer;
pdl.analyze(stmt).map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
e.to_string(),
)))
})
}?;
let ddl = PeerDDLAnalyzer.analyze(stmt).map_err(|e| {
PgWireError::UserError(Box::new(ErrorInfo::new(
"ERROR".to_owned(),
"internal_error".to_owned(),
e.to_string(),
)))
})?;

if let Some(ddl) = ddl {
return Ok(NexusStatement::PeerDDL {
Expand Down Expand Up @@ -126,12 +126,19 @@ impl NexusQueryParser {
})
} else {
let stmt = stmts.remove(0);
let peers = self.get_peers_bridge().await?;
let nexus_stmt = NexusStatement::new(peers, &stmt)?;
Ok(NexusParsedStatement {
statement: nexus_stmt,
query: sql.to_owned(),
})
if matches!(stmt, Statement::Rollback { .. }) {
Ok(NexusParsedStatement {
statement: NexusStatement::Rollback { stmt },
query: sql.to_owned(),
})
} else {
let peers = self.get_peers_bridge().await?;
let nexus_stmt = NexusStatement::new(peers, &stmt)?;
Ok(NexusParsedStatement {
statement: nexus_stmt,
query: sql.to_owned(),
})
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,10 @@ impl NexusBackend {
self.execute_statement(executor.as_ref(), &stmt, None).await
}

NexusStatement::Rollback { stmt } => {
self.execute_statement(self.catalog.as_ref(), &stmt, None).await
}

NexusStatement::Empty => Ok(vec![Response::EmptyQuery]),
}
}
Expand Down Expand Up @@ -985,6 +989,7 @@ impl ExtendedQueryHandler for NexusBackend {
NexusStatement::PeerDDL { .. } => Ok(DescribeResponse::no_data()),
NexusStatement::PeerCursor { .. } => Ok(DescribeResponse::no_data()),
NexusStatement::Empty => Ok(DescribeResponse::no_data()),
NexusStatement::Rollback { .. } => Ok(DescribeResponse::no_data()),
NexusStatement::PeerQuery { stmt, assoc } => {
let schema: Option<Schema> = match assoc {
QueryAssociation::Peer(peer) => {
Expand Down

0 comments on commit 59191c2

Please sign in to comment.