Skip to content

Commit

Permalink
Merge branch 'main' into unflake-test-pls
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Nov 17, 2023
2 parents aa97878 + 085425a commit 109c155
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 50 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:

- name: run tests
run: |
gotestsum --format testname -- -p 16 ./... -timeout 2400s
gotestsum --format testname -- -p 8 ./... -timeout 2400s
working-directory: ./flow
env:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
Expand Down
95 changes: 53 additions & 42 deletions nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ impl<'a> StatementAnalyzer for PeerExistanceAnalyzer<'a> {
visit_statements(statement, |stmt| {
if let &Statement::Drop { names, .. } = &stmt {
for name in names {
let peer_name = &name.0[0].value.to_lowercase();
if self.peers.contains_key(peer_name) {
peers_touched.insert(peer_name.into());
let peer_name = name.0[0].value.to_lowercase();
if self.peers.contains_key(&peer_name) {
peers_touched.insert(peer_name);
}
}
}
ControlFlow::<()>::Continue(())
});
visit_relations(statement, |relation| {
let peer_name = &relation.0[0].value.to_lowercase();
if self.peers.contains_key(peer_name) {
peers_touched.insert(peer_name.into());
let peer_name = relation.0[0].value.to_lowercase();
if self.peers.contains_key(&peer_name) {
peers_touched.insert(peer_name);
}
ControlFlow::<()>::Continue(())
});
Expand Down Expand Up @@ -150,7 +150,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
with_options,
} => {
let db_type = DbType::from(peer_type.clone());
let config = parse_db_options(self.peers, db_type, with_options.clone())?;
let config = parse_db_options(self.peers, db_type, with_options)?;
let peer = Peer {
name: peer_name.to_string().to_lowercase(),
r#type: db_type as i32,
Expand Down Expand Up @@ -186,7 +186,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
}

// get do_initial_copy from with_options
let mut raw_options = HashMap::new();
let mut raw_options = HashMap::with_capacity(cdc.with_options.len());
for option in &cdc.with_options {
raw_options.insert(&option.name.value as &str, &option.value);
}
Expand Down Expand Up @@ -362,7 +362,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
}))
}
Select(select) => {
let mut raw_options = HashMap::new();
let mut raw_options = HashMap::with_capacity(select.with_options.len());
for option in &select.with_options {
raw_options.insert(&option.name.value as &str, &option.value);
}
Expand Down Expand Up @@ -417,7 +417,7 @@ impl<'a> StatementAnalyzer for PeerDDLAnalyzer<'a> {
mirror_name,
with_options,
} => {
let mut raw_options = HashMap::new();
let mut raw_options = HashMap::with_capacity(with_options.len());
for option in with_options {
raw_options.insert(&option.name.value as &str, &option.value);
}
Expand Down Expand Up @@ -509,61 +509,72 @@ impl StatementAnalyzer for PeerCursorAnalyzer {
fn parse_db_options(
peers: &HashMap<String, Peer>,
db_type: DbType,
with_options: Vec<SqlOption>,
with_options: &[SqlOption],
) -> anyhow::Result<Option<Config>> {
let mut opts: HashMap<String, String> = HashMap::new();
let mut opts: HashMap<&str, &str> = HashMap::with_capacity(with_options.len());
for opt in with_options {
let key = opt.name.value;
let val = match opt.value {
sqlparser::ast::Value::SingleQuotedString(str) => str,
sqlparser::ast::Value::Number(v, _) => v,
sqlparser::ast::Value::Boolean(v) => v.to_string(),
sqlparser::ast::Value::SingleQuotedString(ref str) => str,
sqlparser::ast::Value::Number(ref v, _) => v,
sqlparser::ast::Value::Boolean(v) => if v { "true" } else { "false" },
_ => panic!("invalid option type for peer"),
};
opts.insert(key, val);
opts.insert(&opt.name.value, val);
}

let config = match db_type {
DbType::Bigquery => {
let pem_str = opts
.remove("private_key")
.get("private_key")
.ok_or_else(|| anyhow::anyhow!("missing private_key option for bigquery"))?;
pem::parse(pem_str.as_bytes())
.map_err(|err| anyhow::anyhow!("unable to parse private_key: {:?}", err))?;
let bq_config = BigqueryConfig {
auth_type: opts
.remove("type")
.ok_or_else(|| anyhow::anyhow!("missing type option for bigquery"))?,
.get("type")
.ok_or_else(|| anyhow::anyhow!("missing type option for bigquery"))?
.to_string(),
project_id: opts
.remove("project_id")
.ok_or_else(|| anyhow::anyhow!("missing project_id in peer options"))?,
.get("project_id")
.ok_or_else(|| anyhow::anyhow!("missing project_id in peer options"))?
.to_string(),
private_key_id: opts
.remove("private_key_id")
.ok_or_else(|| anyhow::anyhow!("missing private_key_id option for bigquery"))?,
private_key: pem_str,
.get("private_key_id")
.ok_or_else(|| anyhow::anyhow!("missing private_key_id option for bigquery"))?
.to_string(),
private_key: pem_str.to_string(),
client_email: opts
.remove("client_email")
.ok_or_else(|| anyhow::anyhow!("missing client_email option for bigquery"))?,
.get("client_email")
.ok_or_else(|| anyhow::anyhow!("missing client_email option for bigquery"))?
.to_string(),
client_id: opts
.remove("client_id")
.ok_or_else(|| anyhow::anyhow!("missing client_id option for bigquery"))?,
.get("client_id")
.ok_or_else(|| anyhow::anyhow!("missing client_id option for bigquery"))?
.to_string(),
auth_uri: opts
.remove("auth_uri")
.ok_or_else(|| anyhow::anyhow!("missing auth_uri option for bigquery"))?,
.get("auth_uri")
.ok_or_else(|| anyhow::anyhow!("missing auth_uri option for bigquery"))?
.to_string(),
token_uri: opts
.remove("token_uri")
.ok_or_else(|| anyhow::anyhow!("missing token_uri option for bigquery"))?,
.get("token_uri")
.ok_or_else(|| anyhow::anyhow!("missing token_uri option for bigquery"))?
.to_string(),
auth_provider_x509_cert_url: opts
.remove("auth_provider_x509_cert_url")
.get("auth_provider_x509_cert_url")
.ok_or_else(|| {
anyhow::anyhow!("missing auth_provider_x509_cert_url option for bigquery")
})?,
client_x509_cert_url: opts.remove("client_x509_cert_url").ok_or_else(|| {
anyhow::anyhow!("missing client_x509_cert_url option for bigquery")
})?,
})?
.to_string(),
client_x509_cert_url: opts
.get("client_x509_cert_url")
.ok_or_else(|| {
anyhow::anyhow!("missing client_x509_cert_url option for bigquery")
})?
.to_string(),
dataset_id: opts
.remove("dataset_id")
.ok_or_else(|| anyhow::anyhow!("missing dataset_id in peer options"))?,
.get("dataset_id")
.ok_or_else(|| anyhow::anyhow!("missing dataset_id in peer options"))?
.to_string(),
};
let config = Config::BigqueryConfig(bq_config);
Some(config)
Expand Down Expand Up @@ -781,13 +792,13 @@ fn parse_db_options(

let mut eventhubs: HashMap<String, EventHubConfig> = HashMap::new();
for (key, _) in opts {
if keys_to_ignore.contains(&key) {
if keys_to_ignore.contains(key) {
continue;
}

// check if peers contains key and if it does
// then add it to the eventhubs hashmap, if not error
if let Some(peer) = peers.get(&key) {
if let Some(peer) = peers.get(key) {
let eventhub_config = peer.config.as_ref().unwrap();
if let Config::EventhubConfig(eventhub_config) = eventhub_config {
eventhubs.insert(key.to_string(), eventhub_config.clone());
Expand Down
10 changes: 3 additions & 7 deletions nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,13 +426,9 @@ impl Catalog {
.await?;

let job = self.pg.query_opt(&stmt, &[&job_name]).await?.map(|row| {
let flow_opts_opt: Option<Value> = row.get("flow_metadata");
let flow_opts: HashMap<String, Value> = match flow_opts_opt {
Some(flow_opts) => serde_json::from_value(flow_opts)
.context("unable to deserialize flow options")
.unwrap_or_default(),
None => HashMap::new(),
};
let flow_opts: HashMap<String, Value> = row.get::<&str, Option<Value>>("flow_metadata")
.and_then(|flow_opts| serde_json::from_value(flow_opts).ok())
.unwrap_or_default();

QRepFlowJob {
name: row.get("name"),
Expand Down

0 comments on commit 109c155

Please sign in to comment.