From dc0285000b60722c29b05b13113ccd5190e7e9c3 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 4 Jan 2024 08:48:36 -0500 Subject: [PATCH 1/4] Remove the need for metadata db in eventhub group peer We already use catalog incases where this isn't passed, but validation failed for eventhub group peer. --- nexus/analyzer/src/lib.rs | 27 ++++++++++----------------- nexus/flow-rs/src/grpc.rs | 2 +- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 8a2a088f1a..7e791860a4 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -663,11 +663,8 @@ fn parse_db_options( Some(config) } DbType::Eventhub => { - let conn_str: String = opts - .get("metadata_db") - .map(|s| s.to_string()) - .unwrap_or_default(); - let metadata_db = parse_metadata_db_info(&conn_str)?; + let conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let metadata_db = parse_metadata_db_info(conn_str)?; let subscription_id = opts .get("subscription_id") .map(|s| s.to_string()) @@ -711,11 +708,8 @@ fn parse_db_options( Some(config) } DbType::S3 => { - let s3_conn_str: String = opts - .get("metadata_db") - .map(|s| s.to_string()) - .unwrap_or_default(); - let metadata_db = parse_metadata_db_info(&s3_conn_str)?; + let s3_conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let metadata_db = parse_metadata_db_info(s3_conn_str)?; let s3_config = S3Config { url: opts .get("url") @@ -754,9 +748,7 @@ fn parse_db_options( Some(config) } DbType::EventhubGroup => { - let conn_str = opts - .get("metadata_db") - .context("no metadata db specified")?; + let conn_str = opts.get("metadata_db").map(|s| s.to_string()); let metadata_db = parse_metadata_db_info(conn_str)?; // metadata_db is required for eventhub group @@ -808,10 +800,11 @@ fn parse_db_options( Ok(config) } -fn parse_metadata_db_info(conn_str: &str) -> anyhow::Result> { - if conn_str.is_empty() { - return Ok(None); - } +fn parse_metadata_db_info(conn_str: Option) -> anyhow::Result> { + let conn_str = match conn_str { + Some(conn_str) => conn_str, + None => return Ok(None), + }; let mut metadata_db = PostgresConfig::default(); let param_pairs: Vec<&str> = conn_str.split_whitespace().collect(); diff --git a/nexus/flow-rs/src/grpc.rs b/nexus/flow-rs/src/grpc.rs index 0462300df4..b8adf9ea84 100644 --- a/nexus/flow-rs/src/grpc.rs +++ b/nexus/flow-rs/src/grpc.rs @@ -113,7 +113,7 @@ impl FlowGrpcClient { requested_flow_state: state.into(), source_peer: Some(workflow_details.source_peer), destination_peer: Some(workflow_details.destination_peer), - flow_state_update: None + flow_state_update: None, }; let response = self.client.flow_state_change(state_change_req).await?; let state_change_response = response.into_inner(); From 188fc76f7f745b0c54c852645aab1f07f35709b1 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 4 Jan 2024 09:28:01 -0500 Subject: [PATCH 2/4] take ref --- nexus/analyzer/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 7e791860a4..0372e87fda 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -663,7 +663,7 @@ fn parse_db_options( Some(config) } DbType::Eventhub => { - let conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let conn_str = opts.get("metadata_db"); let metadata_db = parse_metadata_db_info(conn_str)?; let subscription_id = opts .get("subscription_id") @@ -708,7 +708,7 @@ fn parse_db_options( Some(config) } DbType::S3 => { - let s3_conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let s3_conn_str = opts.get("metadata_db"); let metadata_db = parse_metadata_db_info(s3_conn_str)?; let s3_config = S3Config { url: opts @@ -748,7 +748,7 @@ fn parse_db_options( Some(config) } DbType::EventhubGroup => { - let conn_str = opts.get("metadata_db").map(|s| s.to_string()); + let conn_str = opts.get("metadata_db"); let metadata_db = parse_metadata_db_info(conn_str)?; // metadata_db is required for eventhub group @@ -800,7 +800,7 @@ fn parse_db_options( Ok(config) } -fn parse_metadata_db_info(conn_str: Option) -> anyhow::Result> { +fn parse_metadata_db_info(conn_str: Option<&str>) -> anyhow::Result> { let conn_str = match conn_str { Some(conn_str) => conn_str, None => return Ok(None), From 4a42d897e87dcfeb124c81e063e0164abe3dcec5 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 4 Jan 2024 09:33:43 -0500 Subject: [PATCH 3/4] fix things --- nexus/analyzer/src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 0372e87fda..7d1de7107c 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -800,12 +800,16 @@ fn parse_db_options( Ok(config) } -fn parse_metadata_db_info(conn_str: Option<&str>) -> anyhow::Result> { +fn parse_metadata_db_info(conn_str: Option<&&str>) -> anyhow::Result> { let conn_str = match conn_str { Some(conn_str) => conn_str, None => return Ok(None), }; + if conn_str.is_empty() { + return Ok(None); + } + let mut metadata_db = PostgresConfig::default(); let param_pairs: Vec<&str> = conn_str.split_whitespace().collect(); match param_pairs.len() { From f42ad7c8c469147a483378e9b5074ea38732286e Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Thu, 4 Jan 2024 09:37:51 -0500 Subject: [PATCH 4/4] use copied --- nexus/analyzer/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nexus/analyzer/src/lib.rs b/nexus/analyzer/src/lib.rs index 7d1de7107c..98936a81fa 100644 --- a/nexus/analyzer/src/lib.rs +++ b/nexus/analyzer/src/lib.rs @@ -664,7 +664,7 @@ fn parse_db_options( } DbType::Eventhub => { let conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(conn_str)?; + let metadata_db = parse_metadata_db_info(conn_str.copied())?; let subscription_id = opts .get("subscription_id") .map(|s| s.to_string()) @@ -709,7 +709,7 @@ fn parse_db_options( } DbType::S3 => { let s3_conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(s3_conn_str)?; + let metadata_db = parse_metadata_db_info(s3_conn_str.copied())?; let s3_config = S3Config { url: opts .get("url") @@ -749,7 +749,7 @@ fn parse_db_options( } DbType::EventhubGroup => { let conn_str = opts.get("metadata_db"); - let metadata_db = parse_metadata_db_info(conn_str)?; + let metadata_db = parse_metadata_db_info(conn_str.copied())?; // metadata_db is required for eventhub group if metadata_db.is_none() { @@ -800,7 +800,7 @@ fn parse_db_options( Ok(config) } -fn parse_metadata_db_info(conn_str: Option<&&str>) -> anyhow::Result> { +fn parse_metadata_db_info(conn_str: Option<&str>) -> anyhow::Result> { let conn_str = match conn_str { Some(conn_str) => conn_str, None => return Ok(None),