Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
marijncv committed Dec 19, 2024
2 parents 6380580 + c586318 commit 7856f47
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 29 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/libraries-pipes-rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
pull_request:
types: [opened, synchronize, reopened, closed]
paths:
- "./libraries/pipes/implementations/rust/**"
- "libraries/pipes/implementations/rust/**"
- ".github/workflows/libraries-pipes-rust.yml"

defaults:
Expand Down Expand Up @@ -45,7 +45,7 @@ jobs:

- name: "Doctests"
run: cargo test --doc

integration:
name: "integration"
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions libraries/pipes/implementations/rust/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Added

- (pull/60) Added `AssetCheckSeverity` to the jsonschema definitions
- (pull/59) Moved dagster pipes version into a constant
- (pull/57) Simplify user-facing API for hashmap metadata
- (pull/54) Handled errors for `MessageWriter` implementations
- (pull/14) Derived `PartialEq` for all types generated by `quicktype`
Expand Down
5 changes: 4 additions & 1 deletion libraries/pipes/implementations/rust/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# dagster-pipes-rust

A pipes implementation for the [Rust](https://www.rust-lang.org/) programming language.

Get full observability into your Rust workloads when orchestrating through Dagster. With this light weight interface, you can retrieve data directly from the Dagster context, report asset materializations, report asset checks, provide structured logging, end more.

[![Crates.io](https://img.shields.io/crates/v/dagster_pipes_rust.svg)](https://crates.io/crates/dagster_pipes_rust)
Expand Down Expand Up @@ -76,5 +78,6 @@ We use [jsonschema](https://json-schema.org/) to define the pipes protocol and [
To generate the Rust structs, make sure to install quicktype with `npm install -g quicktype`. Then run:

```bash
./quicktype.sh
cd community-integrations/pipes
make jsonschema_rust
```
27 changes: 13 additions & 14 deletions libraries/pipes/implementations/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ use crate::context_loader::PayloadErrorKind;
use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
pub use crate::params_loader::LoadParams;
use crate::params_loader::ParamsError;
pub use crate::types::{Method, PipesContextData, PipesMessage, PipesMetadataValue, AssetCheckSeverity};
pub use crate::types::{
AssetCheckSeverity, Method, PipesContextData, PipesMessage, PipesMetadataValue,
};
use crate::writer::message_writer::get_opened_payload;
use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter;
pub use crate::writer::message_writer::{DefaultWriter, MessageWriter};
pub use crate::writer::message_writer_channel::{DefaultChannel, FileChannel};
use crate::writer::message_writer_channel::{MessageWriteError, MessageWriterChannel};

const DAGSTER_PIPES_VERSION: &str = "0.1";

impl PipesMetadataValue {
pub fn new(raw_value: types::RawValue, pipes_metadata_value_type: types::Type) -> Self {
Self {
Expand Down Expand Up @@ -55,11 +59,7 @@ where
) -> Result<Self, MessageWriteError> {
let mut message_channel = message_writer.open(message_params);
let opened_payload = get_opened_payload(message_writer);
let opened_message = PipesMessage {
dagster_pipes_version: "0.1".to_string(), // TODO: Convert to `const`
method: Method::Opened,
params: Some(opened_payload),
};
let opened_message = PipesMessage::new(Method::Opened, Some(opened_payload));
message_channel.write_message(opened_message)?;

Ok(Self {
Expand Down Expand Up @@ -265,13 +265,12 @@ mod tests {
assert_eq!(
serde_json::from_str::<PipesMessage>(&fs::read_to_string(file.path()).unwrap())
.unwrap(),
PipesMessage {
dagster_pipes_version: "0.1".to_string(),
method: Method::ReportAssetMaterialization,
params: Some(HashMap::from([
("asset_key".to_string(), Some(json!("asset1"))),
PipesMessage::new(
Method::ReportAssetMaterialization,
Some(HashMap::from([
("asset_key", Some(json!("asset1"))),
(
"metadata".to_string(),
"metadata",
Some(json!({
"text": {
"raw_value": "hello",
Expand Down Expand Up @@ -335,9 +334,9 @@ mod tests {
}
}))
),
("data_version".to_string(), None),
("data_version", None),
])),
}
)
);
}
}
9 changes: 3 additions & 6 deletions libraries/pipes/implementations/rust/src/types_ext.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
//! Module containing extension implementations for the types auto-generated by `quicktype` in `types.rs`.
//! The implementations are included here because:
//! 1. `quicktype` does not support these additional traits
//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `quicktype.sh`.
//! 2. Manual changes made in `types.rs` will be overwritten by future calls to `make jsonschema`.
#![allow(clippy::derivable_impls)]
use std::collections::HashMap;

use crate::{
types::{RawValue, Type},
Method, PipesContextData, PipesMessage, PipesMetadataValue,
};
use crate::{Method, PipesContextData, PipesMessage, DAGSTER_PIPES_VERSION};

impl Default for PipesContextData {
fn default() -> Self {
Expand All @@ -30,7 +27,7 @@ impl Default for PipesContextData {
impl PipesMessage {
pub fn new(method: Method, params: Option<HashMap<&str, Option<serde_json::Value>>>) -> Self {
Self {
dagster_pipes_version: "0.1".to_string(), // TODO: Make `const`
dagster_pipes_version: DAGSTER_PIPES_VERSION.to_string(),
method,
params: params.map(|hashmap| {
hashmap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,9 @@ pub trait MessageWriter {
///
/// MyMessageWriter(42).get_opened_payload(private::Token); // use of undeclared crate or module `private`
/// ```
fn get_opened_payload(&self, _: private::Token) -> HashMap<String, Option<Value>> {
fn get_opened_payload(&self, _: private::Token) -> HashMap<&str, Option<Value>> {
let mut extras = HashMap::new();
extras.insert(
"extras".to_string(),
Some(Value::Object(self.get_opened_extras())),
);
extras.insert("extras", Some(Value::Object(self.get_opened_extras())));
extras
}

Expand All @@ -71,7 +68,7 @@ pub trait MessageWriter {
}

/// Public accessor to the sealed method
pub fn get_opened_payload(writer: &impl MessageWriter) -> HashMap<String, Option<Value>> {
pub fn get_opened_payload(writer: &impl MessageWriter) -> HashMap<&str, Option<Value>> {
writer.get_opened_payload(private::Token)
}

Expand Down

0 comments on commit 7856f47

Please sign in to comment.