From 50da9db7544c63fc36d18cefe5b34d1fcb0e17d0 Mon Sep 17 00:00:00 2001 From: Colton Padden Date: Wed, 18 Dec 2024 10:47:12 -0500 Subject: [PATCH 1/3] [dagster-pipes-rust] rm dot prefix for paths filter --- .github/workflows/libraries-pipes-rust.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/libraries-pipes-rust.yml b/.github/workflows/libraries-pipes-rust.yml index 4202fab..e17a185 100644 --- a/.github/workflows/libraries-pipes-rust.yml +++ b/.github/workflows/libraries-pipes-rust.yml @@ -4,7 +4,7 @@ on: pull_request: types: [opened, synchronize, reopened, closed] paths: - - "./libraries/pipes/implementations/rust/**" + - "libraries/pipes/implementations/rust/**" - ".github/workflows/libraries-pipes-rust.yml" defaults: @@ -45,7 +45,7 @@ jobs: - name: "Doctests" run: cargo test --doc - + integration: name: "integration" runs-on: ubuntu-latest From 27e7e6d55fec2271f24f6c5c90d8ca5a87278f7a Mon Sep 17 00:00:00 2001 From: colton Date: Wed, 18 Dec 2024 10:49:30 -0500 Subject: [PATCH 2/3] [dagster-pipes-rust] update readme on how to generate quicktypes (#62) --- libraries/pipes/implementations/rust/README.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/libraries/pipes/implementations/rust/README.md b/libraries/pipes/implementations/rust/README.md index cc56a16..7dae77c 100644 --- a/libraries/pipes/implementations/rust/README.md +++ b/libraries/pipes/implementations/rust/README.md @@ -1,5 +1,7 @@ # dagster-pipes-rust +A pipes implementation for the [Rust](https://www.rust-lang.org/) programming language. + Get full observability into your Rust workloads when orchestrating through Dagster. With this light weight interface, you can retrieve data directly from the Dagster context, report asset materializations, report asset checks, provide structured logging, end more. [![Crates.io](https://img.shields.io/crates/v/dagster_pipes_rust.svg)](https://crates.io/crates/dagster_pipes_rust) @@ -76,5 +78,6 @@ We use [jsonschema](https://json-schema.org/) to define the pipes protocol and [ To generate the Rust structs, make sure to install quicktype with `npm install -g quicktype`. Then run: ```bash -./quicktype.sh +cd community-integrations/pipes +make jsonschema_rust ``` From c586318dd5b364910905d04f984cffe7a6541583 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Thu, 19 Dec 2024 10:54:35 +0100 Subject: [PATCH 3/3] [dagster-pipes-rust]: make pipes version a constant (#59) * make pipes version a constant * add changelog * use PipesMessage::new in opening the context * use PipesMessage::new in the tests --- .../pipes/implementations/rust/CHANGELOG.md | 1 + .../pipes/implementations/rust/src/lib.rs | 23 ++++++++----------- .../implementations/rust/src/types_ext.rs | 9 +++----- .../rust/src/writer/message_writer.rs | 9 +++----- 4 files changed, 17 insertions(+), 25 deletions(-) diff --git a/libraries/pipes/implementations/rust/CHANGELOG.md b/libraries/pipes/implementations/rust/CHANGELOG.md index 5a640a1..6780237 100644 --- a/libraries/pipes/implementations/rust/CHANGELOG.md +++ b/libraries/pipes/implementations/rust/CHANGELOG.md @@ -4,6 +4,7 @@ ### Added +- (pull/59) Moved dagster pipes version into a constant - (pull/57) Simplify user-facing API for hashmap metadata - (pull/54) Handled errors for `MessageWriter` implementations - (pull/14) Derived `PartialEq` for all types generated by `quicktype` diff --git a/libraries/pipes/implementations/rust/src/lib.rs b/libraries/pipes/implementations/rust/src/lib.rs index eb0eda8..47746c0 100644 --- a/libraries/pipes/implementations/rust/src/lib.rs +++ b/libraries/pipes/implementations/rust/src/lib.rs @@ -25,6 +25,8 @@ pub use crate::writer::message_writer::{DefaultWriter, MessageWriter}; pub use crate::writer::message_writer_channel::{DefaultChannel, FileChannel}; use crate::writer::message_writer_channel::{MessageWriteError, MessageWriterChannel}; +const DAGSTER_PIPES_VERSION: &str = "0.1"; + #[derive(Serialize)] #[serde(rename_all = "UPPERCASE")] pub enum AssetCheckSeverity { @@ -63,11 +65,7 @@ where ) -> Result { let mut message_channel = message_writer.open(message_params); let opened_payload = get_opened_payload(message_writer); - let opened_message = PipesMessage { - dagster_pipes_version: "0.1".to_string(), // TODO: Convert to `const` - method: Method::Opened, - params: Some(opened_payload), - }; + let opened_message = PipesMessage::new(Method::Opened, Some(opened_payload)); message_channel.write_message(opened_message)?; Ok(Self { @@ -273,13 +271,12 @@ mod tests { assert_eq!( serde_json::from_str::(&fs::read_to_string(file.path()).unwrap()) .unwrap(), - PipesMessage { - dagster_pipes_version: "0.1".to_string(), - method: Method::ReportAssetMaterialization, - params: Some(HashMap::from([ - ("asset_key".to_string(), Some(json!("asset1"))), + PipesMessage::new( + Method::ReportAssetMaterialization, + Some(HashMap::from([ + ("asset_key", Some(json!("asset1"))), ( - "metadata".to_string(), + "metadata", Some(json!({ "text": { "raw_value": "hello", @@ -343,9 +340,9 @@ mod tests { } })) ), - ("data_version".to_string(), None), + ("data_version", None), ])), - } + ) ); } } diff --git a/libraries/pipes/implementations/rust/src/types_ext.rs b/libraries/pipes/implementations/rust/src/types_ext.rs index 368a826..a1d31ae 100644 --- a/libraries/pipes/implementations/rust/src/types_ext.rs +++ b/libraries/pipes/implementations/rust/src/types_ext.rs @@ -1,14 +1,11 @@ //! Module containing extension implementations for the types auto-generated by `quicktype` in `types.rs`. //! The implementations are included here because: //! 1. `quicktype` does not support these additional traits -//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `quicktype.sh`. +//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `make jsonschema`. #![allow(clippy::derivable_impls)] use std::collections::HashMap; -use crate::{ - types::{RawValue, Type}, - Method, PipesContextData, PipesMessage, PipesMetadataValue, -}; +use crate::{Method, PipesContextData, PipesMessage, DAGSTER_PIPES_VERSION}; impl Default for PipesContextData { fn default() -> Self { @@ -30,7 +27,7 @@ impl Default for PipesContextData { impl PipesMessage { pub fn new(method: Method, params: Option>>) -> Self { Self { - dagster_pipes_version: "0.1".to_string(), // TODO: Make `const` + dagster_pipes_version: DAGSTER_PIPES_VERSION.to_string(), method, params: params.map(|hashmap| { hashmap diff --git a/libraries/pipes/implementations/rust/src/writer/message_writer.rs b/libraries/pipes/implementations/rust/src/writer/message_writer.rs index d1094e8..2f2cf72 100644 --- a/libraries/pipes/implementations/rust/src/writer/message_writer.rs +++ b/libraries/pipes/implementations/rust/src/writer/message_writer.rs @@ -54,12 +54,9 @@ pub trait MessageWriter { /// /// MyMessageWriter(42).get_opened_payload(private::Token); // use of undeclared crate or module `private` /// ``` - fn get_opened_payload(&self, _: private::Token) -> HashMap> { + fn get_opened_payload(&self, _: private::Token) -> HashMap<&str, Option> { let mut extras = HashMap::new(); - extras.insert( - "extras".to_string(), - Some(Value::Object(self.get_opened_extras())), - ); + extras.insert("extras", Some(Value::Object(self.get_opened_extras()))); extras } @@ -71,7 +68,7 @@ pub trait MessageWriter { } /// Public accessor to the sealed method -pub fn get_opened_payload(writer: &impl MessageWriter) -> HashMap> { +pub fn get_opened_payload(writer: &impl MessageWriter) -> HashMap<&str, Option> { writer.get_opened_payload(private::Token) }