Skip to content

Commit

Permalink
nexus/catalog: fully qualify tables in queries
Browse files Browse the repository at this point in the history
Recently combined catalog connections for nexus connections,
now `SET search_path` queries change search_path for our backend connection, breaking things
  • Loading branch information
serprex committed Feb 2, 2024
1 parent cb8fc65 commit 21945c3
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Catalog {
pub async fn get_peer_id_i32(&self, peer_name: &str) -> anyhow::Result<i32> {
let stmt = self
.pg
.prepare_typed("SELECT id FROM peers WHERE name = $1", &[types::Type::TEXT])
.prepare_typed("SELECT id FROM public.peers WHERE name = $1", &[types::Type::TEXT])
.await?;

self.pg
Expand All @@ -179,7 +179,7 @@ impl Catalog {
pub async fn get_peer_type_for_id(&self, peer_id: i32) -> anyhow::Result<DbType> {
let stmt = self
.pg
.prepare_typed("SELECT type FROM peers WHERE id = $1", &[types::Type::INT4])
.prepare_typed("SELECT type FROM public.peers WHERE id = $1", &[types::Type::INT4])
.await?;

self.pg
Expand All @@ -193,7 +193,7 @@ impl Catalog {
pub async fn get_peers(&self) -> anyhow::Result<HashMap<String, Peer>> {
let stmt = self
.pg
.prepare_typed("SELECT id, name, type, options FROM peers", &[])
.prepare_typed("SELECT id, name, type, options FROM public.peers", &[])
.await?;

let rows = self.pg.query(&stmt, &[]).await?;
Expand Down Expand Up @@ -222,7 +222,7 @@ impl Catalog {
let stmt = self
.pg
.prepare_typed(
"SELECT id, name, type, options FROM peers WHERE name = $1",
"SELECT id, name, type, options FROM public.peers WHERE name = $1",
&[],
)
.await?;
Expand Down Expand Up @@ -251,7 +251,7 @@ impl Catalog {
pub async fn get_peer_by_id(&self, peer_id: i32) -> anyhow::Result<Peer> {
let stmt = self
.pg
.prepare_typed("SELECT name, type, options FROM peers WHERE id = $1", &[])
.prepare_typed("SELECT name, type, options FROM public.peers WHERE id = $1", &[])
.await?;

let rows = self.pg.query(&stmt, &[&peer_id]).await?;
Expand Down Expand Up @@ -415,9 +415,9 @@ impl Catalog {
) -> anyhow::Result<Option<QRepFlowJob>> {
let stmt = self
.pg
.prepare_typed("SELECT f.*, sp.name as source_peer_name, dp.name as destination_peer_name FROM flows as f
INNER JOIN peers as sp ON f.source_peer = sp.id
INNER JOIN peers as dp ON f.destination_peer = dp.id
.prepare_typed("SELECT f.*, sp.name as source_peer_name, dp.name as destination_peer_name FROM public.flows as f
INNER JOIN public.peers as sp ON f.source_peer = sp.id
INNER JOIN public.peers as dp ON f.destination_peer = dp.id
WHERE f.name = $1 AND f.query_string IS NOT NULL", &[types::Type::TEXT])
.await?;

Expand Down Expand Up @@ -515,7 +515,7 @@ impl Catalog {
let rows = self
.pg
.query(
"SELECT workflow_id, source_peer, destination_peer FROM flows WHERE NAME = $1",
"SELECT workflow_id, source_peer, destination_peer FROM public.flows WHERE NAME = $1",
&[&flow_job_name],
)
.await?;
Expand Down Expand Up @@ -557,7 +557,7 @@ impl Catalog {
pub async fn delete_flow_job_entry(&self, flow_job_name: &str) -> anyhow::Result<()> {
let rows = self
.pg
.execute("DELETE FROM FLOWS WHERE NAME = $1", &[&flow_job_name])
.execute("DELETE FROM public.flows WHERE name = $1", &[&flow_job_name])
.await?;
if rows == 0 {
return Err(anyhow!("unable to delete flow job metadata"));
Expand All @@ -568,7 +568,7 @@ impl Catalog {
pub async fn check_peer_entry(&self, peer_name: &str) -> anyhow::Result<i64> {
let peer_check = self
.pg
.query_one("SELECT COUNT(*) FROM PEERS WHERE NAME = $1", &[&peer_name])
.query_one("SELECT COUNT(*) FROM public.peers WHERE name = $1", &[&peer_name])
.await?;
let peer_count: i64 = peer_check.get(0);
Ok(peer_count)
Expand All @@ -581,7 +581,7 @@ impl Catalog {
let row = self
.pg
.query_opt(
"SELECT config_proto FROM flows WHERE name=$1 AND query_string IS NOT NULL",
"SELECT config_proto FROM public.flows WHERE name = $1 AND query_string IS NOT NULL",
&[&flow_job_name],
)
.await?;
Expand Down

0 comments on commit 21945c3

Please sign in to comment.