Skip to content

Commit

Permalink
added minor fixes from #634
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Nov 10, 2023
1 parent f141d81 commit 3eb56ed
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
3 changes: 3 additions & 0 deletions nexus/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ target/
# submodules
/sqlparser-rs

# catalog pkg
/catalog/pkg

# misc
.env
17 changes: 14 additions & 3 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pt::{
peerdb_peers::PostgresConfig,
peerdb_peers::{peer::Config, DbType, Peer},
};
use serde_json::Value;
use tokio_postgres::{types, Client};

mod embedded {
Expand Down Expand Up @@ -425,15 +426,21 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts_opt: Option<Value> = row.get("flow_metadata");
let flow_opts: HashMap<String, Value> = match flow_opts_opt {
Some(flow_opts) => serde_json::from_value(flow_opts)
.context("unable to deserialize flow options")
.unwrap_or_default(),
None => HashMap::new(),
};

QRepFlowJob {
name: row.get("name"),
source_peer: row.get("source_peer_name"),
target_peer: row.get("destination_peer_name"),
description: row.get("description"),
query_string: row.get("query_string"),
flow_options: serde_json::from_value(row.get("flow_metadata"))
.context("unable to deserialize flow options")
.unwrap_or_default(),
flow_options: flow_opts,
// we set the disabled flag to false by default
disabled: false,
}
Expand Down Expand Up @@ -462,6 +469,10 @@ impl Catalog {
)
.await?;

if job.flow_options.get("destination_table_name").is_none() {
return Err(anyhow!("destination_table_name not found in flow options"));
}

let _rows = self
.pg
.execute(
Expand Down
13 changes: 7 additions & 6 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,10 @@ impl NexusBackend {
err_msg: "flow service is not configured".to_owned(),
})));
}
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
let qrep_config =

let qrep_config = {
// retrieve the mirror job since DROP MIRROR will delete the row later.
let catalog = self.catalog.lock().await;
catalog
.get_qrep_config_proto(mirror_name)
.await
Expand All @@ -632,9 +633,9 @@ impl NexusBackend {
err
),
}))
})?;
// unlock the mutex so it can be used by the functions
std::mem::drop(catalog);
})?
};

self.handle_drop_mirror(&NexusStatement::PeerDDL {
// not supposed to be used by the function
stmt: sqlparser::ast::Statement::ExecuteMirror {
Expand Down

0 comments on commit 3eb56ed

Please sign in to comment.