Source
In Zenoh-Flow, the Source node is designed to represent interactions with the outside world: for instance, a sensor that sends the temperature and humidity it measures.
For Zenoh-Flow to be able to load our Source, it must be accompanied by a descriptor.
The content of the descriptor is as follows:
- (optional) a description,
- (optional) some configuration,
- (optional) some vars,
- its outputs, i.e. the data it will produce,
- a library, i.e. where to find its actual implementation.
Below is a valid descriptor that matches the code we are going to write next:
description: my implementation of a source
# This configuration is not used and serves as an example.
configuration:
  value: not-used
# This vars section is not used and serves as an example.
vars:
  FOO: not-used
outputs:
  - output
# Linux:
library: file:///absolute/path/to/the/implementation/libmy_source.so
# macOS:
# library: file:///absolute/path/to/the/implementation/libmy_source.dylib
Assuming you want to create a Source called my-source, enter the following in a terminal:
cargo new --lib my-source
Modify the Cargo.toml to add these dependencies and tell rustc that you want a library that can be dynamically loaded:
[dependencies]
async-trait = "0.1.50" # Zenoh-Flow's node traits are asynchronous
zenoh-flow-nodes = { git = "https://github.com/eclipse-zenoh-flow/zenoh-flow.git" }
[lib]
crate-type=["cdylib"]
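The `library` field of the descriptor has to point at the file that a `cdylib` build of this crate produces. As a rough sketch of how that URI relates to the crate name, the helper below builds it for the current platform. `library_uri` is a hypothetical helper written for illustration, not part of Zenoh-Flow. It assumes the library sits directly in the given directory and relies on Cargo replacing hyphens with underscores in the library file name, as the `libmy_source.so` example above shows.

```rust
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX};

/// Hypothetical helper (not part of Zenoh-Flow): builds the `file://` URI
/// expected in the `library` field of the descriptor.
///
/// `directory` is assumed to be an absolute path, e.g. the Cargo output
/// directory that contains the compiled library.
fn library_uri(directory: &str, crate_name: &str) -> String {
    // Cargo names the library after the crate, with hyphens replaced by
    // underscores: `my-source` becomes `my_source`.
    let stem = crate_name.replace('-', "_");
    // `DLL_PREFIX` / `DLL_SUFFIX` are "lib" / ".so" on Linux and
    // "lib" / ".dylib" on macOS.
    let file_name = format!("{}{}{}", DLL_PREFIX, stem, DLL_SUFFIX);
    format!("file://{}/{}", directory.trim_end_matches('/'), file_name)
}
```

On Linux, calling `library_uri("/absolute/path/to/the/implementation", "my-source")` yields the same value as the `library` line of the descriptor above.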
Now modify lib.rs to (i) implement the Zenoh-Flow traits and (ii) include your logic. Below you can find commented boilerplate code to do (i).
use async_trait::async_trait;
use zenoh_flow_nodes::prelude::*;
// `MySource` is where you implement your business logic. `Output` is a structure provided by
// Zenoh-Flow through which you send `Data` to the next node in your data flow.
//
// The `Output` is handed to `MySource` through its constructor, `Source::new` (see below).
//
// `MySource` is also the place where state can be saved. For concurrency reasons, the state must
// implement `Send` and `Sync` (`Arc` and `Mutex` structures can be helpful, in particular their
// `async_std` variant).
//
// The `export_source` macro is required to expose, to Zenoh-Flow, the symbol of the node together
// with the versions of the Rust compiler and of Zenoh-Flow it was built with.
//
// It allows Zenoh-Flow to detect, at runtime, a version mismatch between the Zenoh-Flow daemon and
// the shared library (be it on the version of the Rust compiler or of Zenoh-Flow itself).
#[export_source]
struct MySource {
    output: Output<String>,
}
#[async_trait]
impl Node for MySource {
    async fn iteration(&self) -> Result<()> {
        // Add your business logic here.
        let data = "Hello, World!".to_string();
        self.output.send(data, None).await
    }
}
#[async_trait]
impl Source for MySource {
    async fn new(
        // The `context` provides information about the Zenoh-Flow daemon on which the generated
        // node MySource will be executed.
        context: Context,
        // The `configuration`(1) is a re-export of `serde_json::Value`(2). It behaves as a
        // dictionary and allows accessing configuration variables defined in the descriptor or in
        // its parents.
        configuration: Configuration,
        // The `Outputs` structure is created by Zenoh-Flow. It is a HashMap whose keys match what
        // was defined in the descriptor file.
        mut outputs: Outputs,
    ) -> Result<Self> {
        let output = outputs
            .take("output")
            .expect("No output named 'output' found")
            // The method `typed` allows automatically serialising data if it goes to a
            // node located on another process. It will call the provided closure.
            .typed(|buffer, data: &String| todo!());
        Ok(MySource { output })
    }
}
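The closure given to `typed` is left as `todo!()` in the boilerplate. As a minimal sketch of what it could do for a `String`, the function below appends the UTF-8 bytes of the data to a byte buffer. Two things here are assumptions, not confirmed by this page: that `buffer` is a `&mut Vec<u8>`, and the return type (`std::io::Result<()>` is used only to keep the example dependency-free; check the `typed` signature in the version of `zenoh-flow-nodes` you depend on). `serialize_string` is a hypothetical name.

```rust
use std::io::Write;

/// Hypothetical serialiser for `String` payloads.
///
/// Assumption: the closure passed to `typed` receives a mutable byte buffer
/// and a reference to the data, and writes the serialised form of the data
/// into the buffer.
fn serialize_string(buffer: &mut Vec<u8>, data: &String) -> std::io::Result<()> {
    // `Vec<u8>` implements `std::io::Write`; `write_all` appends the bytes.
    buffer.write_all(data.as_bytes())
}
```

If the assumed signature matches, the body of this function is what would replace `todo!()` in the closure. A matching deserialiser on the receiving node would rebuild the `String` with `String::from_utf8`.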
(1): Configuration (2): serde_json::Value
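Because `iteration` takes `&self`, any state kept in the node has to be updated through interior mutability. The sketch below illustrates the idea mentioned in the comments of the boilerplate with a message counter. It is deliberately independent of Zenoh-Flow: `Counter` and `next_message` are hypothetical names, and it uses the standard library `Mutex` rather than the `async_std` variant so that it needs no dependency.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical state holder, standing in for a field of `MySource`.
/// `Arc<Mutex<u64>>` is `Send` and `Sync`, as the node's state must be.
struct Counter {
    sent: Arc<Mutex<u64>>,
}

impl Counter {
    fn new() -> Self {
        Counter {
            sent: Arc::new(Mutex::new(0)),
        }
    }

    /// Takes `&self`, like `Node::iteration`, yet still updates the counter.
    fn next_message(&self) -> String {
        let mut sent = self.sent.lock().expect("counter mutex poisoned");
        *sent += 1;
        format!("Hello, World! (message {})", *sent)
        // The lock guard is dropped here, when the function returns.
    }
}
```

In a real Source, the string returned by `next_message` could be what `iteration` passes to `self.output.send`. Since the guard is released before the function returns, it is never held across an `.await`.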