Skip to content

Commit

Permalink
[dagster-pipes-rust] 🔥 Simplify PipesMetadataValue construction (#61)
Browse files Browse the repository at this point in the history
* 🔥 Use `From` trait where possible
* 📝 Only re-export `PipesMetadataValue` for public use
* 📝 Sort exports and update docs for public-facing traits (#64)
  • Loading branch information
christeefy authored Dec 19, 2024
1 parent 62926d5 commit d5feab5
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 133 deletions.
1 change: 1 addition & 0 deletions libraries/pipes/implementations/rust/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- (pull/60) Added `AssetCheckSeverity` to the jsonschema definitions
- (pull/59) Moved dagster pipes version into a constant
- (pull/61) Simplify construction of `PipesMetadataValue`
- (pull/57) Simplify user-facing API for hashmap metadata
- (pull/54) Handled errors for `MessageWriter` implementations
- (pull/14) Derived `PartialEq` for all types generated by `quicktype`
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
use dagster_pipes_rust::types::{AssetCheckSeverity, PipesMetadataValue, RawValue, Type};
use dagster_pipes_rust::{open_dagster_pipes, DagsterPipesError};
use dagster_pipes_rust::{
open_dagster_pipes, AssetCheckSeverity, DagsterPipesError, PipesMetadataValue,
};

use std::collections::HashMap;

fn main() -> Result<(), DagsterPipesError> {
let mut context = open_dagster_pipes()?;

let asset_metadata = HashMap::from([(
"row_count",
PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
)]);
let asset_metadata = HashMap::from([("row_count", PipesMetadataValue::from(100))]);
context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata)?;

let check_metadata = HashMap::from([(
"quality",
PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
)]);
let check_metadata = HashMap::from([("quality", PipesMetadataValue::from(100))]);
context.report_asset_check(
"example_rust_subprocess_check",
true,
Expand Down
1 change: 1 addition & 0 deletions libraries/pipes/implementations/rust/src/context_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use thiserror::Error;

use crate::PipesContextData;

/// Load context data injected by the orchestration process.
pub trait LoadContext {
fn load_context(
&self,
Expand Down
169 changes: 50 additions & 119 deletions libraries/pipes/implementations/rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod context_loader;
mod params_loader;
pub mod types;
mod types;
mod types_ext;
mod writer;

Expand All @@ -11,32 +11,22 @@ use serde_json::Map;
use serde_json::Value;
use thiserror::Error;

use crate::context_loader::DefaultLoader as PipesDefaultContextLoader;
use crate::context_loader::{DefaultLoader as PipesDefaultContextLoader, PayloadErrorKind};
use crate::params_loader::{EnvVarLoader as PipesEnvVarParamsLoader, ParamsError};
use crate::types::{Method, PipesContextData, PipesMessage};
use crate::writer::message_writer::{
get_opened_payload, DefaultWriter as PipesDefaultMessageWriter,
};
use crate::writer::message_writer_channel::MessageWriteError;

pub use crate::context_loader::LoadContext;
use crate::context_loader::PayloadErrorKind;
use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
pub use crate::params_loader::LoadParams;
use crate::params_loader::ParamsError;
pub use crate::types::{
AssetCheckSeverity, Method, PipesContextData, PipesMessage, PipesMetadataValue,
};
use crate::writer::message_writer::get_opened_payload;
use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter;
pub use crate::types::{AssetCheckSeverity, PipesMetadataValue};
pub use crate::writer::message_writer::{DefaultWriter, MessageWriter};
pub use crate::writer::message_writer_channel::{DefaultChannel, FileChannel};
use crate::writer::message_writer_channel::{MessageWriteError, MessageWriterChannel};
pub use crate::writer::message_writer_channel::MessageWriterChannel;

const DAGSTER_PIPES_VERSION: &str = "0.1";

impl PipesMetadataValue {
pub fn new(raw_value: types::RawValue, pipes_metadata_value_type: types::Type) -> Self {
Self {
raw_value: Some(raw_value),
pipes_metadata_value_type: Some(pipes_metadata_value_type),
}
}
}

// partial translation of
// https://github.com/dagster-io/dagster/blob/258d9ca0db/python_modules/dagster-pipes/dagster_pipes/__init__.py#L859-L871
#[derive(Debug)]
Expand Down Expand Up @@ -142,111 +132,52 @@ mod tests {
use std::collections::HashMap;
use std::fs;
use tempfile::NamedTempFile;
use writer::message_writer_channel::{DefaultChannel, FileChannel};

use super::*;

#[test]
fn test_write_pipes_metadata() {
let asset_metadata = HashMap::from([
(
"text",
PipesMetadataValue::new(
types::RawValue::String("hello".to_string()),
types::Type::Text,
),
),
("int", PipesMetadataValue::from(100)),
("float", PipesMetadataValue::from(100.0)),
("bool", PipesMetadataValue::from(true)),
("none", PipesMetadataValue::null()),
("timestamp", PipesMetadataValue::from_timestamp(1000.0)),
("text", PipesMetadataValue::from("hello".to_string())),
(
"url",
PipesMetadataValue::new(
types::RawValue::String("http://someurl.com".to_string()),
types::Type::Url,
),
PipesMetadataValue::from_url("http://someurl.com".to_string()),
),
(
"path",
PipesMetadataValue::new(
types::RawValue::String("file://some/path".to_string()),
types::Type::Path,
),
PipesMetadataValue::from_path("file://some/path".to_string()),
),
(
"notebook",
PipesMetadataValue::new(
types::RawValue::String("notebook".to_string()),
types::Type::Notebook,
),
PipesMetadataValue::from_notebook("notebook".to_string()),
),
(
"json_object",
PipesMetadataValue::new(
types::RawValue::AnythingMap(HashMap::from([(
"key".to_string(),
Some(json!("value")),
)])),
types::Type::Json,
),
PipesMetadataValue::from(HashMap::from([(
"key".to_string(),
Some(json!("value")),
)])),
),
(
"json_array",
PipesMetadataValue::new(
types::RawValue::AnythingArray(vec![Some(json!({"key": "value"}))]),
types::Type::Json,
),
),
(
"md",
PipesMetadataValue::new(
types::RawValue::String("## markdown".to_string()),
types::Type::Md,
),
PipesMetadataValue::from(vec![Some(json!({"key": "value"}))]),
),
("md", PipesMetadataValue::from_md("## markdown".to_string())),
(
"dagster_run",
PipesMetadataValue::new(
types::RawValue::String("1234".to_string()),
types::Type::DagsterRun,
),
PipesMetadataValue::from_dagster_run("1234".to_string()),
),
(
"asset",
PipesMetadataValue::new(
types::RawValue::String("some_asset".to_string()),
types::Type::Asset,
),
),
(
"job",
PipesMetadataValue::new(
types::RawValue::String("some_job".to_string()),
types::Type::Job,
),
),
(
"timestamp",
PipesMetadataValue::new(
types::RawValue::String("2012-04-23T18:25:43.511Z".to_string()),
types::Type::Timestamp,
),
),
(
"int",
PipesMetadataValue::new(types::RawValue::Integer(100), types::Type::Int),
),
(
"float",
PipesMetadataValue::new(types::RawValue::Double(100.0), types::Type::Float),
),
(
"bool",
PipesMetadataValue::new(types::RawValue::Bool(true), types::Type::Bool),
),
(
"none",
PipesMetadataValue {
raw_value: None,
pipes_metadata_value_type: None,
},
PipesMetadataValue::from_asset("some_asset".to_string()),
),
("job", PipesMetadataValue::from_job("some_job".to_string())),
]);

let file = NamedTempFile::new().unwrap();
Expand All @@ -272,6 +203,26 @@ mod tests {
(
"metadata",
Some(json!({
"int": {
"raw_value": 100,
"type": "int"
},
"float": {
"raw_value": 100.0,
"type": "float"
},
"bool": {
"raw_value": true,
"type": "bool"
},
"none": {
"raw_value": null,
"type": "null"
},
"timestamp": {
"raw_value": 1000.0,
"type": "timestamp"
},
"text": {
"raw_value": "hello",
"type": "text"
Expand Down Expand Up @@ -311,26 +262,6 @@ mod tests {
"job": {
"raw_value": "some_job",
"type": "job"
},
"timestamp": {
"raw_value": "2012-04-23T18:25:43.511Z",
"type": "timestamp"
},
"int": {
"raw_value": 100,
"type": "int"
},
"float": {
"raw_value": 100.0,
"type": "float"
},
"bool": {
"raw_value": true,
"type": "bool"
},
"none": {
"raw_value": null,
"type": null
}
}))
),
Expand Down
4 changes: 2 additions & 2 deletions libraries/pipes/implementations/rust/src/params_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const DAGSTER_PIPES_CONTEXT_ENV_VAR: &str = "DAGSTER_PIPES_CONTEXT";
const DAGSTER_PIPES_MESSAGES_ENV_VAR: &str = "DAGSTER_PIPES_MESSAGES";

/// Load params passed from the orchestration process by the context injector and
/// message reader. These params are used to respectively bootstrap
/// [`LoadContext`](crate::LoadContext) and [`PipesMessageWriter`].
/// message reader. These params are used to respectively bootstrap implementations of
/// [`LoadContext`](crate::LoadContext) and [`PipesMessageWriter`](crate::MessageWriter).
pub trait LoadParams {
/// Whether or not this process has been provided with provided with information
/// to create a `PipesContext` or should instead return a mock.
Expand Down
Loading

0 comments on commit d5feab5

Please sign in to comment.