From 65cacf80a4652ceb11d26e946ff0b9fa74eb0a26 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Mon, 23 Dec 2024 11:02:12 +0100 Subject: [PATCH 1/5] adds support for reporting custom messages --- .../pipes/implementations/rust/pipes.toml | 2 +- .../rust/src/bin/pipes_tests.rs | 16 ++++++++- .../pipes/implementations/rust/src/lib.rs | 36 +++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/libraries/pipes/implementations/rust/pipes.toml b/libraries/pipes/implementations/rust/pipes.toml index dbb5cdf..02c8134 100644 --- a/libraries/pipes/implementations/rust/pipes.toml +++ b/libraries/pipes/implementations/rust/pipes.toml @@ -3,7 +3,7 @@ error_reporting = false [messages] log = false -report_custom_message = false +report_custom_message = true report_asset_materialization = false report_asset_check = false log_external_stream = false diff --git a/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs b/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs index 9010e55..276d365 100644 --- a/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs +++ b/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs @@ -72,7 +72,7 @@ pub fn main() -> Result<(), DagsterPipesError> { std::env::set_var(DAGSTER_PIPES_MESSAGES_ENV_VAR, &messages); } - let context = open_dagster_pipes()?; + let mut context = open_dagster_pipes()?; if let Some(job_name) = args.job_name { assert_eq!(context.data.job_name, Some(job_name)); @@ -84,5 +84,19 @@ pub fn main() -> Result<(), DagsterPipesError> { serde_json::from_reader(file).expect("extras could not be parsed"); assert_eq!(context.data.extras, Some(json)); } + + if let Some(custom_payload_path) = args.custom_payload_path { + let file = + File::open(custom_payload_path).expect("custom_payload_path could not be opened"); + let payload: serde_json::Value = serde_json::from_reader(file) + .expect("custom_payload_path could not be parsed") + .as_object() + .expect("custom payload must be an object") + .get("payload") + .expect("custom payload must have a 'payload' key") + .clone(); + context.report_custom_message(Some(payload))? + } + Ok(()) } diff --git a/libraries/pipes/implementations/rust/src/lib.rs b/libraries/pipes/implementations/rust/src/lib.rs index a90f1ad..49adb86 100644 --- a/libraries/pipes/implementations/rust/src/lib.rs +++ b/libraries/pipes/implementations/rust/src/lib.rs @@ -94,6 +94,17 @@ where let msg = PipesMessage::new(Method::ReportAssetCheck, Some(params)); self.message_channel.write_message(msg) } + + pub fn report_custom_message( + &mut self, + payload: Option, + ) -> Result<(), MessageWriteError> { + let params: HashMap<&str, Option> = + HashMap::from([("payload", payload)]); + + let msg = PipesMessage::new(Method::ReportCustomMessage, Some(params)); + self.message_channel.write_message(msg) + } } #[derive(Debug, Error)] @@ -272,4 +283,29 @@ mod tests { ) ); } + + #[test] + fn test_report_custom_message() { + let file = NamedTempFile::new().unwrap(); + let mut context: PipesContext = PipesContext { + message_channel: DefaultChannel::File(FileChannel::new(file.path().into())), + data: PipesContextData { + asset_keys: Some(vec!["asset1".to_string()]), + run_id: "012345".to_string(), + ..Default::default() + }, + }; + context + .report_custom_message(Some(json!({"key": "value"}))) + .expect("Failed to report custom message"); + + assert_eq!( + serde_json::from_str::(&fs::read_to_string(file.path()).unwrap()) + .unwrap(), + PipesMessage::new( + Method::ReportCustomMessage, + Some(HashMap::from([("payload", Some(json!({"key": "value"})))])), + ) + ); + } } From dcab0965c5ab79d32a8c84c82728f359ef19f41b Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Mon, 23 Dec 2024 11:07:08 +0100 Subject: [PATCH 2/5] add changelog entry --- libraries/pipes/implementations/rust/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/libraries/pipes/implementations/rust/CHANGELOG.md b/libraries/pipes/implementations/rust/CHANGELOG.md index ce25195..e6670c4 100644 --- a/libraries/pipes/implementations/rust/CHANGELOG.md +++ b/libraries/pipes/implementations/rust/CHANGELOG.md @@ -4,6 +4,7 @@ ### Added +- (pull/70) Added `report_custom_message` method to the `PipesContext` - (pull/60) Added `AssetCheckSeverity` to the jsonschema definitions - (pull/59) Moved dagster pipes version into a constant - (pull/61) Simplify construction of `PipesMetadataValue` From c9d29cb5a4c1b4a95d17351ca214a1bf6ee80d48 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Mon, 23 Dec 2024 11:10:45 +0100 Subject: [PATCH 3/5] fix type in file reader --- libraries/pipes/implementations/rust/src/bin/pipes_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs b/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs index 276d365..a987e87 100644 --- a/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs +++ b/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs @@ -88,7 +88,7 @@ pub fn main() -> Result<(), DagsterPipesError> { if let Some(custom_payload_path) = args.custom_payload_path { let file = File::open(custom_payload_path).expect("custom_payload_path could not be opened"); - let payload: serde_json::Value = serde_json::from_reader(file) + let payload = serde_json::from_reader::(file) .expect("custom_payload_path could not be parsed") .as_object() .expect("custom payload must be an object") From 9d777ff3920b557eeb8fae6f8973a32183dc01f7 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Sun, 29 Dec 2024 14:01:57 +0100 Subject: [PATCH 4/5] non optional Value --- .../pipes/implementations/rust/src/lib.rs | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/libraries/pipes/implementations/rust/src/lib.rs b/libraries/pipes/implementations/rust/src/lib.rs index 1d16d73..2f8a94f 100644 --- a/libraries/pipes/implementations/rust/src/lib.rs +++ b/libraries/pipes/implementations/rust/src/lib.rs @@ -113,10 +113,10 @@ where pub fn report_custom_message( &mut self, - payload: Option, + payload: serde_json::Value, ) -> Result<(), MessageWriteError> { let params: HashMap<&str, Option> = - HashMap::from([("payload", payload)]); + HashMap::from([("payload", Some(payload))]); let msg = PipesMessage::new(Method::ReportCustomMessage, Some(params)); self.message_channel.write_message(msg) @@ -396,16 +396,19 @@ mod tests { }, }; context - .report_custom_message(Some(json!({"key": "value"}))) + .report_custom_message(json!({"key": "value"})) .expect("Failed to report custom message"); assert_eq!( - serde_json::from_str::(&fs::read_to_string(file.path()).unwrap()) + serde_json::from_str::(&fs::read_to_string(file.path()).unwrap()) .unwrap(), - PipesMessage::new( - Method::ReportCustomMessage, - Some(HashMap::from([("payload", Some(json!({"key": "value"})))])), - ) + json!({ + "__dagster_pipes_version": "0.1", + "method": "report_custom_message", + "params": { + "payload": {"key": "value"} + }, + }) ); } } From d96d8e677f07ae617ef69d1643d6b363f87bc31f Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Sun, 29 Dec 2024 14:27:53 +0100 Subject: [PATCH 5/5] code reuse in tests --- .../rust/src/bin/pipes_tests.rs | 2 +- .../pipes/implementations/rust/src/lib.rs | 56 ++++++++----------- 2 files changed, 25 insertions(+), 33 deletions(-) diff --git a/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs b/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs index 43deac7..6b912aa 100644 --- a/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs +++ b/libraries/pipes/implementations/rust/src/bin/pipes_tests.rs @@ -95,7 +95,7 @@ pub fn main() -> Result<(), DagsterPipesError> { .get("payload") .expect("custom payload must have a 'payload' key") .clone(); - context.report_custom_message(Some(payload))? + context.report_custom_message(payload)? } Ok(()) diff --git a/libraries/pipes/implementations/rust/src/lib.rs b/libraries/pipes/implementations/rust/src/lib.rs index 2f8a94f..be1c943 100644 --- a/libraries/pipes/implementations/rust/src/lib.rs +++ b/libraries/pipes/implementations/rust/src/lib.rs @@ -167,7 +167,7 @@ pub fn open_dagster_pipes() -> Result, D #[cfg(test)] mod tests { - use rstest::rstest; + use rstest::{fixture, rstest}; use std::collections::HashMap; use std::fs; use tempfile::NamedTempFile; @@ -175,8 +175,24 @@ mod tests { use super::*; - #[test] - fn test_write_pipes_metadata() { + #[fixture] + fn file_and_context() -> (NamedTempFile, PipesContext) { + let file = NamedTempFile::new().unwrap(); + let context: PipesContext = PipesContext { + message_channel: DefaultChannel::File(FileChannel::new(file.path().into())), + data: PipesContextData { + asset_keys: Some(vec!["asset1".to_string()]), + run_id: "012345".to_string(), + ..Default::default() + }, + }; + (file, context) + } + + #[rstest] + fn test_write_pipes_metadata( + #[from(file_and_context)] (file, mut context): (NamedTempFile, PipesContext), + ) { let asset_metadata = HashMap::from([ ("int", PipesMetadataValue::from(100)), ("float", PipesMetadataValue::from(100.0)), @@ -219,15 +235,6 @@ mod tests { ("job", PipesMetadataValue::from_job("some_job".to_string())), ]); - let file = NamedTempFile::new().unwrap(); - let mut context: PipesContext = PipesContext { - message_channel: DefaultChannel::File(FileChannel::new(file.path().into())), - data: PipesContextData { - asset_keys: Some(vec!["asset1".to_string()]), - run_id: "012345".to_string(), - ..Default::default() - }, - }; context .report_asset_materialization("asset1", asset_metadata, Some("v1")) .expect("Failed to report asset materialization"); @@ -340,18 +347,10 @@ mod tests { }) )] fn test_close_pipes_context( + #[from(file_and_context)] (file, mut context): (NamedTempFile, PipesContext), #[case] exc: Option, #[case] expected_message: serde_json::Value, ) { - let file = NamedTempFile::new().unwrap(); - let mut context: PipesContext = PipesContext { - message_channel: DefaultChannel::File(FileChannel::new(file.path().into())), - data: PipesContextData { - asset_keys: Some(vec!["asset1".to_string()]), - run_id: "012345".to_string(), - ..Default::default() - }, - }; context.close(exc).expect("Failed to close context"); assert_eq!( serde_json::from_str::(&fs::read_to_string(file.path()).unwrap()) @@ -384,17 +383,10 @@ mod tests { ); } - #[test] - fn test_report_custom_message() { - let file = NamedTempFile::new().unwrap(); - let mut context: PipesContext = PipesContext { - message_channel: DefaultChannel::File(FileChannel::new(file.path().into())), - data: PipesContextData { - asset_keys: Some(vec!["asset1".to_string()]), - run_id: "012345".to_string(), - ..Default::default() - }, - }; + #[rstest] + fn test_report_custom_message( + #[from(file_and_context)] (file, mut context): (NamedTempFile, PipesContext), + ) { context .report_custom_message(json!({"key": "value"})) .expect("Failed to report custom message");