[dagster-pipes-rust] 🔥 Simplify PipesMetadataValue construction #61

Merged
merged 9 commits into from
Dec 19, 2024
1 change: 1 addition & 0 deletions libraries/pipes/implementations/rust/CHANGELOG.md
@@ -4,6 +4,7 @@

 ### Added

+- (pull/61) Simplify construction of `PipesMetadataValue`
 - (pull/57) Simplify user-facing API for hashmap metadata
 - (pull/54) Handled errors for `MessageWriter` implementations
 - (pull/14) Derived `PartialEq` for all types generated by `quicktype`

@@ -1,21 +1,16 @@
-use dagster_pipes_rust::types::{PipesMetadataValue, RawValue, Type};
-use dagster_pipes_rust::{open_dagster_pipes, AssetCheckSeverity, DagsterPipesError};
+use dagster_pipes_rust::{
+    open_dagster_pipes, AssetCheckSeverity, DagsterPipesError, PipesMetadataValue,
+};
 
 use std::collections::HashMap;
 
 fn main() -> Result<(), DagsterPipesError> {
     let mut context = open_dagster_pipes()?;
 
-    let asset_metadata = HashMap::from([(
-        "row_count",
-        PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
-    )]);
+    let asset_metadata = HashMap::from([("row_count", PipesMetadataValue::from(100))]);
     context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata)?;
 
-    let check_metadata = HashMap::from([(
-        "quality",
-        PipesMetadataValue::new(RawValue::Integer(100), Type::Int),
-    )]);
+    let check_metadata = HashMap::from([("quality", PipesMetadataValue::from(100))]);
     context.report_asset_check(
         "example_rust_subprocess_check",
         true,
152 changes: 42 additions & 110 deletions libraries/pipes/implementations/rust/src/lib.rs
@@ -1,6 +1,6 @@
 mod context_loader;
 mod params_loader;
-pub mod types;
+mod types;
 mod types_ext;
 mod writer;
 
@@ -18,11 +18,11 @@ use crate::context_loader::PayloadErrorKind;
 use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
 pub use crate::params_loader::LoadParams;
 use crate::params_loader::ParamsError;
-pub use crate::types::{Method, PipesContextData, PipesMessage, PipesMetadataValue};
+pub use crate::types::PipesMetadataValue;
+use crate::types::{Method, PipesContextData, PipesMessage};
 use crate::writer::message_writer::get_opened_payload;
 use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter;
 pub use crate::writer::message_writer::{DefaultWriter, MessageWriter};
-pub use crate::writer::message_writer_channel::{DefaultChannel, FileChannel};
 use crate::writer::message_writer_channel::{MessageWriteError, MessageWriterChannel};
 
 #[derive(Serialize)]
@@ -32,15 +32,6 @@ pub enum AssetCheckSeverity {
     Error,
 }
 
-impl PipesMetadataValue {
-    pub fn new(raw_value: types::RawValue, pipes_metadata_value_type: types::Type) -> Self {
-        Self {
-            raw_value: Some(raw_value),
-            pipes_metadata_value_type: Some(pipes_metadata_value_type),
-        }
-    }
-}
-
 // partial translation of
 // https://github.com/dagster-io/dagster/blob/258d9ca0db/python_modules/dagster-pipes/dagster_pipes/__init__.py#L859-L871
 #[derive(Debug)]
@@ -150,111 +141,52 @@ mod tests {
     use std::collections::HashMap;
     use std::fs;
     use tempfile::NamedTempFile;
+    use writer::message_writer_channel::{DefaultChannel, FileChannel};
 
     use super::*;
 
     #[test]
     fn test_write_pipes_metadata() {
         let asset_metadata = HashMap::from([
-            (
-                "text",
-                PipesMetadataValue::new(
-                    types::RawValue::String("hello".to_string()),
-                    types::Type::Text,
-                ),
-            ),
+            ("int", PipesMetadataValue::from(100)),
+            ("float", PipesMetadataValue::from(100.0)),
+            ("bool", PipesMetadataValue::from(true)),
+            ("none", PipesMetadataValue::none()),
+            ("timestamp", PipesMetadataValue::from_timestamp(1000.0)),
+            ("text", PipesMetadataValue::from("hello".to_string())),
             (
                 "url",
-                PipesMetadataValue::new(
-                    types::RawValue::String("http://someurl.com".to_string()),
-                    types::Type::Url,
-                ),
+                PipesMetadataValue::from_url("http://someurl.com".to_string()),
             ),
             (
                 "path",
-                PipesMetadataValue::new(
-                    types::RawValue::String("file://some/path".to_string()),
-                    types::Type::Path,
-                ),
+                PipesMetadataValue::from_path("file://some/path".to_string()),
             ),
             (
                 "notebook",
-                PipesMetadataValue::new(
-                    types::RawValue::String("notebook".to_string()),
-                    types::Type::Notebook,
-                ),
+                PipesMetadataValue::from_notebook("notebook".to_string()),
             ),
             (
                 "json_object",
-                PipesMetadataValue::new(
-                    types::RawValue::AnythingMap(HashMap::from([(
-                        "key".to_string(),
-                        Some(json!("value")),
-                    )])),
-                    types::Type::Json,
-                ),
+                PipesMetadataValue::from(HashMap::from([(
+                    "key".to_string(),
+                    Some(json!("value")),
+                )])),
             ),
             (
                 "json_array",
-                PipesMetadataValue::new(
-                    types::RawValue::AnythingArray(vec![Some(json!({"key": "value"}))]),
-                    types::Type::Json,
-                ),
-            ),
-            (
-                "md",
-                PipesMetadataValue::new(
-                    types::RawValue::String("## markdown".to_string()),
-                    types::Type::Md,
-                ),
+                PipesMetadataValue::from(vec![Some(json!({"key": "value"}))]),
             ),
+            ("md", PipesMetadataValue::from_md("## markdown".to_string())),
             (
                 "dagster_run",
-                PipesMetadataValue::new(
-                    types::RawValue::String("1234".to_string()),
-                    types::Type::DagsterRun,
-                ),
+                PipesMetadataValue::from_dagster_run("1234".to_string()),
             ),
             (
                 "asset",
-                PipesMetadataValue::new(
-                    types::RawValue::String("some_asset".to_string()),
-                    types::Type::Asset,
-                ),
-            ),
-            (
-                "job",
-                PipesMetadataValue::new(
-                    types::RawValue::String("some_job".to_string()),
-                    types::Type::Job,
-                ),
-            ),
-            (
-                "timestamp",
-                PipesMetadataValue::new(
-                    types::RawValue::String("2012-04-23T18:25:43.511Z".to_string()),
-                    types::Type::Timestamp,
-                ),
-            ),
-            (
-                "int",
-                PipesMetadataValue::new(types::RawValue::Integer(100), types::Type::Int),
-            ),
-            (
-                "float",
-                PipesMetadataValue::new(types::RawValue::Double(100.0), types::Type::Float),
-            ),
-            (
-                "bool",
-                PipesMetadataValue::new(types::RawValue::Bool(true), types::Type::Bool),
-            ),
-            (
-                "none",
-                PipesMetadataValue {
-                    raw_value: None,
-                    pipes_metadata_value_type: None,
-                },
+                PipesMetadataValue::from_asset("some_asset".to_string()),
             ),
+            ("job", PipesMetadataValue::from_job("some_job".to_string())),
         ]);
 
         let file = NamedTempFile::new().unwrap();
@@ -281,6 +213,26 @@ mod tests {
                     (
                         "metadata".to_string(),
                         Some(json!({
+                            "int": {
+                                "raw_value": 100,
+                                "type": "int"
+                            },
+                            "float": {
+                                "raw_value": 100.0,
+                                "type": "float"
+                            },
+                            "bool": {
+                                "raw_value": true,
+                                "type": "bool"
+                            },
+                            "none": {
+                                "raw_value": null,
+                                "type": null
+                            },
+                            "timestamp": {
+                                "raw_value": 1000.0,
+                                "type": "timestamp"
+                            },
                             "text": {
                                 "raw_value": "hello",
                                 "type": "text"
@@ -320,26 +272,6 @@ mod tests {
                             "job": {
                                 "raw_value": "some_job",
                                 "type": "job"
-                            },
-                            "timestamp": {
-                                "raw_value": "2012-04-23T18:25:43.511Z",
-                                "type": "timestamp"
-                            },
-                            "int": {
-                                "raw_value": 100,
-                                "type": "int"
-                            },
-                            "float": {
-                                "raw_value": 100.0,
-                                "type": "float"
-                            },
-                            "bool": {
-                                "raw_value": true,
-                                "type": "bool"
-                            },
-                            "none": {
-                                "raw_value": null,
-                                "type": null
                             }
                         }))
                     ),
130 changes: 130 additions & 0 deletions libraries/pipes/implementations/rust/src/types_ext.rs
@@ -41,3 +41,133 @@ impl PipesMessage {
         }
     }
 }
+
+/////////////////////////////////////////////////
+///// From<T> traits for PipesMetadataValue /////
+/////////////////////////////////////////////////
+
+impl From<i64> for PipesMetadataValue {
+    fn from(value: i64) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::Integer(value)),
+            pipes_metadata_value_type: Some(Type::Int),
+        }
+    }
+}
+
+impl From<f64> for PipesMetadataValue {
+    fn from(value: f64) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::Double(value)),
+            pipes_metadata_value_type: Some(Type::Float),
+        }
+    }
+}
+
+impl From<bool> for PipesMetadataValue {
+    fn from(value: bool) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::Bool(value)),
+            pipes_metadata_value_type: Some(Type::Bool),
+        }
+    }
+}
+
+impl From<String> for PipesMetadataValue {
+    fn from(value: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(value)),
+            pipes_metadata_value_type: Some(Type::Text),
+        }
+    }
+}
+
+impl From<HashMap<String, Option<serde_json::Value>>> for PipesMetadataValue {
+    fn from(value: HashMap<String, Option<serde_json::Value>>) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::AnythingMap(value)),
+            pipes_metadata_value_type: Some(Type::Json),
+        }
+    }
+}
+
+impl From<Vec<Option<serde_json::Value>>> for PipesMetadataValue {
+    fn from(value: Vec<Option<serde_json::Value>>) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::AnythingArray(value)),
+            pipes_metadata_value_type: Some(Type::Json),
+        }
+    }
+}
+
+impl PipesMetadataValue {
+    pub fn new(raw_value: RawValue, pipes_metadata_value_type: Type) -> Self {
+        Self {
+            raw_value: Some(raw_value),
+            pipes_metadata_value_type: Some(pipes_metadata_value_type),
+        }
+    }
+
+    pub fn from_url(url: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(url)),
+            pipes_metadata_value_type: Some(Type::Url),
+        }
+    }
+
+    pub fn from_path(path: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(path)),
+            pipes_metadata_value_type: Some(Type::Path),
+        }
+    }
+
+    pub fn from_notebook(notebook: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(notebook)),
+            pipes_metadata_value_type: Some(Type::Notebook),
+        }
+    }
+
+    pub fn from_md(md: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(md)),
+            pipes_metadata_value_type: Some(Type::Md),
+        }
+    }
+
+    pub fn from_timestamp(timestamp: f64) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::Double(timestamp)),
+            pipes_metadata_value_type: Some(Type::Timestamp),
+        }
+    }
+
+    pub fn from_asset(asset: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(asset)),
+            pipes_metadata_value_type: Some(Type::Asset),
+        }
+    }
+
+    pub fn from_job(job: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(job)),
+            pipes_metadata_value_type: Some(Type::Job),
+        }
+    }
+
+    pub fn from_dagster_run(dagster_run: String) -> Self {
+        PipesMetadataValue {
+            raw_value: Some(RawValue::String(dagster_run)),
+            pipes_metadata_value_type: Some(Type::DagsterRun),
+        }
+    }
+
+    pub fn none() -> Self {
Collaborator Author

What do you think about this method name? Should it be none, from_none, or null?

+        PipesMetadataValue {
+            raw_value: None,
+            pipes_metadata_value_type: None, // TODO: Should this be `Some(Type::Null)`?

Collaborator Author

And should this be None, or Some(Type::Null)? I'm not sure what the difference between the two options is, though.

Collaborator @marijncv, Dec 19, 2024

I would go with:

    pub fn null() -> Self {
        PipesMetadataValue {
            raw_value: None,
            pipes_metadata_value_type: Some(Type::Null),

Function name: null is similar to the Python implementation. Some(Type::Null) follows the convention used by the other types in the Rust implementation.

Collaborator Author

Done! Will merge in an hour if everything still looks good.

Collaborator

LGTM!

+        }
+    }
+}
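
For readers who want to try the pattern outside the crate, the following is a minimal, self-contained sketch of the construction style this diff introduces: `From<T>` impls for primitives whose type tag is unambiguous, and named constructors where the raw value alone does not determine the tag. The `RawValue`, `Type`, and `PipesMetadataValue` definitions here are simplified local stand-ins, not the `quicktype`-generated types from the crate, and `example_metadata` is a hypothetical helper. The `null()` constructor follows the shape suggested in the review thread (assuming a `Type::Null` variant), not the `none()` version shown in the diff above.

```rust
use std::collections::HashMap;

// Stand-in types for illustration only; the real crate generates these
// (with more variants) in its `types` module.
#[derive(Debug, Clone, PartialEq)]
enum RawValue {
    Integer(i64),
    Double(f64),
    Bool(bool),
    String(String),
}

#[derive(Debug, Clone, PartialEq)]
enum Type {
    Int,
    Float,
    Bool,
    Text,
    Url,
    Null,
}

#[derive(Debug, Clone, PartialEq)]
struct PipesMetadataValue {
    raw_value: Option<RawValue>,
    pipes_metadata_value_type: Option<Type>,
}

// Small private helper (an assumption of this sketch) to avoid repeating
// the struct literal in every impl.
fn tagged(raw: RawValue, ty: Type) -> PipesMetadataValue {
    PipesMetadataValue {
        raw_value: Some(raw),
        pipes_metadata_value_type: Some(ty),
    }
}

// Primitives map to exactly one tag, so `From` is a natural fit.
impl From<i64> for PipesMetadataValue {
    fn from(value: i64) -> Self {
        tagged(RawValue::Integer(value), Type::Int)
    }
}

impl From<f64> for PipesMetadataValue {
    fn from(value: f64) -> Self {
        tagged(RawValue::Double(value), Type::Float)
    }
}

impl From<bool> for PipesMetadataValue {
    fn from(value: bool) -> Self {
        tagged(RawValue::Bool(value), Type::Bool)
    }
}

impl From<String> for PipesMetadataValue {
    fn from(value: String) -> Self {
        tagged(RawValue::String(value), Type::Text)
    }
}

impl PipesMetadataValue {
    // A URL is also a string, so it needs a named constructor to pick the tag.
    fn from_url(url: String) -> Self {
        tagged(RawValue::String(url), Type::Url)
    }

    // Shape proposed in the review: no raw value, explicit `Null` tag.
    fn null() -> Self {
        PipesMetadataValue {
            raw_value: None,
            pipes_metadata_value_type: Some(Type::Null),
        }
    }
}

// Hypothetical helper that builds a metadata map the way the example does.
fn example_metadata() -> HashMap<&'static str, PipesMetadataValue> {
    // `.into()` works wherever the target type is known from context.
    let ratio: PipesMetadataValue = 0.25_f64.into();
    HashMap::from([
        ("row_count", PipesMetadataValue::from(100_i64)),
        ("ratio", ratio),
        ("ok", PipesMetadataValue::from(true)),
        ("note", PipesMetadataValue::from("hello".to_string())),
        (
            "docs",
            PipesMetadataValue::from_url("http://someurl.com".to_string()),
        ),
        ("missing", PipesMetadataValue::null()),
    ])
}

fn main() {
    let metadata = example_metadata();
    println!("{:?}", metadata["row_count"]);
    println!("{:?}", metadata["missing"]);
}
```

The split between `From` impls and named constructors keeps call sites short without making the tag ambiguous: a bare `String` always becomes `Text`, and anything else string-backed has to say what it is.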