Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-pipes-rust] 🔥 Simplify PipesMetadataValue construction #61

Merged
merged 9 commits into from
Dec 19, 2024
1 change: 1 addition & 0 deletions libraries/pipes/implementations/rust/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- (pull/60) Added `AssetCheckSeverity` to the jsonschema definitions
- (pull/59) Moved dagster pipes version into a constant
- (pull/61) Simplify construction of `PipesMetadataValue`
- (pull/57) Simplify user-facing API for hashmap metadata
- (pull/54) Handled errors for `MessageWriter` implementations
- (pull/14) Derived `PartialEq` for all types generated by `quicktype`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
use dagster_pipes_rust::types::{AssetCheckSeverity, PipesMetadataValue, RawValue, Type};
use dagster_pipes_rust::{open_dagster_pipes, DagsterPipesError};
use dagster_pipes_rust::{
open_dagster_pipes, AssetCheckSeverity, DagsterPipesError, PipesMetadataValue,
};

use std::collections::HashMap;

fn main() -> Result<(), DagsterPipesError> {
let mut context = open_dagster_pipes()?;

let asset_metadata = HashMap::from([(
"row_count",
PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
)]);
let asset_metadata = HashMap::from([("row_count", PipesMetadataValue::from(100))]);
context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata)?;

let check_metadata = HashMap::from([(
"quality",
PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
)]);
let check_metadata = HashMap::from([("quality", PipesMetadataValue::from(100))]);
context.report_asset_check(
"example_rust_subprocess_check",
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use thiserror::Error;

use crate::PipesContextData;

/// Load context data injected by the orchestration process.
pub trait LoadContext {
fn load_context(
&self,
Expand Down
169 changes: 50 additions & 119 deletions libraries/pipes/implementations/rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod context_loader;
mod params_loader;
pub mod types;
mod types;
mod types_ext;
mod writer;

Expand All @@ -11,32 +11,22 @@ use serde_json::Map;
use serde_json::Value;
use thiserror::Error;

use crate::context_loader::DefaultLoader as PipesDefaultContextLoader;
use crate::context_loader::{DefaultLoader as PipesDefaultContextLoader, PayloadErrorKind};
use crate::params_loader::{EnvVarLoader as PipesEnvVarParamsLoader, ParamsError};
use crate::types::{Method, PipesContextData, PipesMessage};
use crate::writer::message_writer::{
get_opened_payload, DefaultWriter as PipesDefaultMessageWriter,
};
use crate::writer::message_writer_channel::MessageWriteError;

pub use crate::context_loader::LoadContext;
use crate::context_loader::PayloadErrorKind;
use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
pub use crate::params_loader::LoadParams;
use crate::params_loader::ParamsError;
pub use crate::types::{
AssetCheckSeverity, Method, PipesContextData, PipesMessage, PipesMetadataValue,
};
use crate::writer::message_writer::get_opened_payload;
use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter;
pub use crate::types::{AssetCheckSeverity, PipesMetadataValue};
pub use crate::writer::message_writer::{DefaultWriter, MessageWriter};
pub use crate::writer::message_writer_channel::{DefaultChannel, FileChannel};
use crate::writer::message_writer_channel::{MessageWriteError, MessageWriterChannel};
pub use crate::writer::message_writer_channel::MessageWriterChannel;

const DAGSTER_PIPES_VERSION: &str = "0.1";

impl PipesMetadataValue {
pub fn new(raw_value: types::RawValue, pipes_metadata_value_type: types::Type) -> Self {
Self {
raw_value: Some(raw_value),
pipes_metadata_value_type: Some(pipes_metadata_value_type),
}
}
}

// partial translation of
// https://github.com/dagster-io/dagster/blob/258d9ca0db/python_modules/dagster-pipes/dagster_pipes/__init__.py#L859-L871
#[derive(Debug)]
Expand Down Expand Up @@ -142,111 +132,52 @@ mod tests {
use std::collections::HashMap;
use std::fs;
use tempfile::NamedTempFile;
use writer::message_writer_channel::{DefaultChannel, FileChannel};

use super::*;

#[test]
fn test_write_pipes_metadata() {
let asset_metadata = HashMap::from([
(
"text",
PipesMetadataValue::new(
types::RawValue::String("hello".to_string()),
types::Type::Text,
),
),
("int", PipesMetadataValue::from(100)),
("float", PipesMetadataValue::from(100.0)),
("bool", PipesMetadataValue::from(true)),
("none", PipesMetadataValue::null()),
("timestamp", PipesMetadataValue::from_timestamp(1000.0)),
("text", PipesMetadataValue::from("hello".to_string())),
(
"url",
PipesMetadataValue::new(
types::RawValue::String("http://someurl.com".to_string()),
types::Type::Url,
),
PipesMetadataValue::from_url("http://someurl.com".to_string()),
),
(
"path",
PipesMetadataValue::new(
types::RawValue::String("file://some/path".to_string()),
types::Type::Path,
),
PipesMetadataValue::from_path("file://some/path".to_string()),
),
(
"notebook",
PipesMetadataValue::new(
types::RawValue::String("notebook".to_string()),
types::Type::Notebook,
),
PipesMetadataValue::from_notebook("notebook".to_string()),
),
(
"json_object",
PipesMetadataValue::new(
types::RawValue::AnythingMap(HashMap::from([(
"key".to_string(),
Some(json!("value")),
)])),
types::Type::Json,
),
PipesMetadataValue::from(HashMap::from([(
"key".to_string(),
Some(json!("value")),
)])),
),
(
"json_array",
PipesMetadataValue::new(
types::RawValue::AnythingArray(vec![Some(json!({"key": "value"}))]),
types::Type::Json,
),
),
(
"md",
PipesMetadataValue::new(
types::RawValue::String("## markdown".to_string()),
types::Type::Md,
),
PipesMetadataValue::from(vec![Some(json!({"key": "value"}))]),
),
("md", PipesMetadataValue::from_md("## markdown".to_string())),
(
"dagster_run",
PipesMetadataValue::new(
types::RawValue::String("1234".to_string()),
types::Type::DagsterRun,
),
PipesMetadataValue::from_dagster_run("1234".to_string()),
),
(
"asset",
PipesMetadataValue::new(
types::RawValue::String("some_asset".to_string()),
types::Type::Asset,
),
),
(
"job",
PipesMetadataValue::new(
types::RawValue::String("some_job".to_string()),
types::Type::Job,
),
),
(
"timestamp",
PipesMetadataValue::new(
types::RawValue::String("2012-04-23T18:25:43.511Z".to_string()),
types::Type::Timestamp,
),
),
(
"int",
PipesMetadataValue::new(types::RawValue::Integer(100), types::Type::Int),
),
(
"float",
PipesMetadataValue::new(types::RawValue::Double(100.0), types::Type::Float),
),
(
"bool",
PipesMetadataValue::new(types::RawValue::Bool(true), types::Type::Bool),
),
(
"none",
PipesMetadataValue {
raw_value: None,
pipes_metadata_value_type: None,
},
PipesMetadataValue::from_asset("some_asset".to_string()),
),
("job", PipesMetadataValue::from_job("some_job".to_string())),
]);

let file = NamedTempFile::new().unwrap();
Expand All @@ -272,6 +203,26 @@ mod tests {
(
"metadata",
Some(json!({
"int": {
"raw_value": 100,
"type": "int"
},
"float": {
"raw_value": 100.0,
"type": "float"
},
"bool": {
"raw_value": true,
"type": "bool"
},
"none": {
"raw_value": null,
"type": "null"
},
"timestamp": {
"raw_value": 1000.0,
marijncv marked this conversation as resolved.
Show resolved Hide resolved
"type": "timestamp"
},
"text": {
"raw_value": "hello",
"type": "text"
Expand Down Expand Up @@ -311,26 +262,6 @@ mod tests {
"job": {
"raw_value": "some_job",
"type": "job"
},
"timestamp": {
"raw_value": "2012-04-23T18:25:43.511Z",
"type": "timestamp"
},
"int": {
"raw_value": 100,
"type": "int"
},
"float": {
"raw_value": 100.0,
"type": "float"
},
"bool": {
"raw_value": true,
"type": "bool"
},
"none": {
"raw_value": null,
"type": null
}
}))
),
Expand Down
4 changes: 2 additions & 2 deletions libraries/pipes/implementations/rust/src/params_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const DAGSTER_PIPES_CONTEXT_ENV_VAR: &str = "DAGSTER_PIPES_CONTEXT";
const DAGSTER_PIPES_MESSAGES_ENV_VAR: &str = "DAGSTER_PIPES_MESSAGES";

/// Load params passed from the orchestration process by the context injector and
/// message reader. These params are used to respectively bootstrap
/// [`LoadContext`](crate::LoadContext) and [`PipesMessageWriter`].
/// message reader. These params are used to respectively bootstrap implementations of
/// [`LoadContext`](crate::LoadContext) and [`PipesMessageWriter`](crate::MessageWriter).
pub trait LoadParams {
/// Whether or not this process has been provided with provided with information
/// to create a `PipesContext` or should instead return a mock.
Expand Down
Loading
Loading