diff --git a/.github/workflows/libraries-pipes-rust.yml b/.github/workflows/libraries-pipes-rust.yml index 4202fab..e17a185 100644 --- a/.github/workflows/libraries-pipes-rust.yml +++ b/.github/workflows/libraries-pipes-rust.yml @@ -4,7 +4,7 @@ on: pull_request: types: [opened, synchronize, reopened, closed] paths: - - "./libraries/pipes/implementations/rust/**" + - "libraries/pipes/implementations/rust/**" - ".github/workflows/libraries-pipes-rust.yml" defaults: @@ -45,7 +45,7 @@ jobs: - name: "Doctests" run: cargo test --doc - + integration: name: "integration" runs-on: ubuntu-latest diff --git a/libraries/pipes/implementations/rust/CHANGELOG.md b/libraries/pipes/implementations/rust/CHANGELOG.md index db745ee..134f076 100644 --- a/libraries/pipes/implementations/rust/CHANGELOG.md +++ b/libraries/pipes/implementations/rust/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - (pull/60) Added `AssetCheckSeverity` to the jsonschema definitions +- (pull/59) Moved dagster pipes version into a constant - (pull/57) Simplify user-facing API for hashmap metadata - (pull/54) Handled errors for `MessageWriter` implementations - (pull/14) Derived `PartialEq` for all types generated by `quicktype` diff --git a/libraries/pipes/implementations/rust/README.md b/libraries/pipes/implementations/rust/README.md index cc56a16..7dae77c 100644 --- a/libraries/pipes/implementations/rust/README.md +++ b/libraries/pipes/implementations/rust/README.md @@ -1,5 +1,7 @@ # dagster-pipes-rust +A pipes implementation for the [Rust](https://www.rust-lang.org/) programming language. + Get full observability into your Rust workloads when orchestrating through Dagster. With this light weight interface, you can retrieve data directly from the Dagster context, report asset materializations, report asset checks, provide structured logging, end more. [![Crates.io](https://img.shields.io/crates/v/dagster_pipes_rust.svg)](https://crates.io/crates/dagster_pipes_rust) @@ -76,5 +78,6 @@ We use [jsonschema](https://json-schema.org/) to define the pipes protocol and [ To generate the Rust structs, make sure to install quicktype with `npm install -g quicktype`. Then run: ```bash -./quicktype.sh +cd community-integrations/pipes +make jsonschema_rust ``` diff --git a/libraries/pipes/implementations/rust/src/lib.rs b/libraries/pipes/implementations/rust/src/lib.rs index 53b061b..3969d17 100644 --- a/libraries/pipes/implementations/rust/src/lib.rs +++ b/libraries/pipes/implementations/rust/src/lib.rs @@ -17,13 +17,17 @@ use crate::context_loader::PayloadErrorKind; use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader; pub use crate::params_loader::LoadParams; use crate::params_loader::ParamsError; -pub use crate::types::{Method, PipesContextData, PipesMessage, PipesMetadataValue, AssetCheckSeverity}; +pub use crate::types::{ + AssetCheckSeverity, Method, PipesContextData, PipesMessage, PipesMetadataValue, +}; use crate::writer::message_writer::get_opened_payload; use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter; pub use crate::writer::message_writer::{DefaultWriter, MessageWriter}; pub use crate::writer::message_writer_channel::{DefaultChannel, FileChannel}; use crate::writer::message_writer_channel::{MessageWriteError, MessageWriterChannel}; +const DAGSTER_PIPES_VERSION: &str = "0.1"; + impl PipesMetadataValue { pub fn new(raw_value: types::RawValue, pipes_metadata_value_type: types::Type) -> Self { Self { @@ -55,11 +59,7 @@ where ) -> Result { let mut message_channel = message_writer.open(message_params); let opened_payload = get_opened_payload(message_writer); - let opened_message = PipesMessage { - dagster_pipes_version: "0.1".to_string(), // TODO: Convert to `const` - method: Method::Opened, - params: Some(opened_payload), - }; + let opened_message = PipesMessage::new(Method::Opened, Some(opened_payload)); message_channel.write_message(opened_message)?; Ok(Self { @@ -265,13 +265,12 @@ mod tests { assert_eq!( serde_json::from_str::(&fs::read_to_string(file.path()).unwrap()) .unwrap(), - PipesMessage { - dagster_pipes_version: "0.1".to_string(), - method: Method::ReportAssetMaterialization, - params: Some(HashMap::from([ - ("asset_key".to_string(), Some(json!("asset1"))), + PipesMessage::new( + Method::ReportAssetMaterialization, + Some(HashMap::from([ + ("asset_key", Some(json!("asset1"))), ( - "metadata".to_string(), + "metadata", Some(json!({ "text": { "raw_value": "hello", @@ -335,9 +334,9 @@ mod tests { } })) ), - ("data_version".to_string(), None), + ("data_version", None), ])), - } + ) ); } } diff --git a/libraries/pipes/implementations/rust/src/types_ext.rs b/libraries/pipes/implementations/rust/src/types_ext.rs index 368a826..a1d31ae 100644 --- a/libraries/pipes/implementations/rust/src/types_ext.rs +++ b/libraries/pipes/implementations/rust/src/types_ext.rs @@ -1,14 +1,11 @@ //! Module containing extension implementations for the types auto-generated by `quicktype` in `types.rs`. //! The implementations are included here because: //! 1. `quicktype` does not support these additional traits -//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `quicktype.sh`. +//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `make jsonschema`. #![allow(clippy::derivable_impls)] use std::collections::HashMap; -use crate::{ - types::{RawValue, Type}, - Method, PipesContextData, PipesMessage, PipesMetadataValue, -}; +use crate::{Method, PipesContextData, PipesMessage, DAGSTER_PIPES_VERSION}; impl Default for PipesContextData { fn default() -> Self { @@ -30,7 +27,7 @@ impl Default for PipesContextData { impl PipesMessage { pub fn new(method: Method, params: Option>>) -> Self { Self { - dagster_pipes_version: "0.1".to_string(), // TODO: Make `const` + dagster_pipes_version: DAGSTER_PIPES_VERSION.to_string(), method, params: params.map(|hashmap| { hashmap diff --git a/libraries/pipes/implementations/rust/src/writer/message_writer.rs b/libraries/pipes/implementations/rust/src/writer/message_writer.rs index d1094e8..2f2cf72 100644 --- a/libraries/pipes/implementations/rust/src/writer/message_writer.rs +++ b/libraries/pipes/implementations/rust/src/writer/message_writer.rs @@ -54,12 +54,9 @@ pub trait MessageWriter { /// /// MyMessageWriter(42).get_opened_payload(private::Token); // use of undeclared crate or module `private` /// ``` - fn get_opened_payload(&self, _: private::Token) -> HashMap> { + fn get_opened_payload(&self, _: private::Token) -> HashMap<&str, Option> { let mut extras = HashMap::new(); - extras.insert( - "extras".to_string(), - Some(Value::Object(self.get_opened_extras())), - ); + extras.insert("extras", Some(Value::Object(self.get_opened_extras()))); extras } @@ -71,7 +68,7 @@ pub trait MessageWriter { } /// Public accessor to the sealed method -pub fn get_opened_payload(writer: &impl MessageWriter) -> HashMap> { +pub fn get_opened_payload(writer: &impl MessageWriter) -> HashMap<&str, Option> { writer.get_opened_payload(private::Token) }