diff --git a/doc/release-notes/iceoryx2-unreleased.md b/doc/release-notes/iceoryx2-unreleased.md index f88531fc8..fa5080caa 100644 --- a/doc/release-notes/iceoryx2-unreleased.md +++ b/doc/release-notes/iceoryx2-unreleased.md @@ -24,6 +24,7 @@ * Support dynamic data with reallocation for publish-subscribe communication [#532](https://github.com/eclipse-iceoryx/iceoryx2/issues/532) * Add benchmark for iceoryx2 queues [#535](https://github.com/eclipse-iceoryx/iceoryx2/issues/535) * Add auto event mission for create, drop and dead notifiers [#550](https://github.com/eclipse-iceoryx/iceoryx2/issues/550) +* Introduce health monitoring example [#555](https://github.com/eclipse-iceoryx/iceoryx2/issues/555) ### Bugfixes diff --git a/examples/Cargo.toml b/examples/Cargo.toml index d53276c54..d7dd3ef87 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -63,6 +63,24 @@ path = "rust/event_based_communication/publisher.rs" name = "event_based_comm_subscriber" path = "rust/event_based_communication/subscriber.rs" +# health monitoring + +[[example]] +name = "health_monitoring_publisher_1" +path = "rust/health_monitoring/publisher_1.rs" + +[[example]] +name = "health_monitoring_publisher_2" +path = "rust/health_monitoring/publisher_2.rs" + +[[example]] +name = "health_monitoring_subscriber" +path = "rust/health_monitoring/subscriber.rs" + +[[example]] +name = "health_monitoring_central_daemon" +path = "rust/health_monitoring/central_daemon.rs" + # publish_subscribe [[example]] diff --git a/examples/README.md b/examples/README.md index dd5e051bc..26291f249 100644 --- a/examples/README.md +++ b/examples/README.md @@ -79,7 +79,8 @@ These types are demonstrated in the complex data types example. | event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. 
| | event based communication | [C++](cxx/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | | event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. | +| health monitoring | [Rust](rust/health_monitoring) | A central daemon creates the communication resources and monitors all nodes. When the central daemon crashes other nodes can take over and use the decentral API to monitor the nodes. | | publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). | | publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. | | publish subscribe with user header | [C](c/publish_subscribe_with_user_header) [C++](cxx/publish_subscribe_with_user_header) [Rust](rust/publish_subscribe_with_user_header) | Add a user header to the payload (samples) to transfer additional information. | -| service attributes | [Rust](rust/service_attributes) | Creates a service with custom attributes that are available to every endpoint. If the attributes are not compatible the service will not open. | +| service attributes | [C++](cxx/service_attributes) [Rust](rust/service_attributes) | Creates a service with custom attributes that are available to every endpoint. 
If the attributes are not compatible the service will not open. | diff --git a/examples/rust/_examples_common/lib.rs b/examples/rust/_examples_common/lib.rs index b673ec350..fa558f181 100644 --- a/examples/rust/_examples_common/lib.rs +++ b/examples/rust/_examples_common/lib.rs @@ -17,3 +17,27 @@ mod transmission_data; pub use custom_header::CustomHeader; pub use pubsub_event::PubSubEvent; pub use transmission_data::TransmissionData; + +use iceoryx2::{ + prelude::*, + service::port_factory::{event, publish_subscribe}, +}; + +pub type ServiceTuple = ( + event::PortFactory, + publish_subscribe::PortFactory, +); + +pub fn open_service( + node: &Node, + service_name: &ServiceName, +) -> Result> { + let service_pubsub = node + .service_builder(service_name) + .publish_subscribe::() + .open()?; + + let service_event = node.service_builder(service_name).event().open()?; + + Ok((service_event, service_pubsub)) +} diff --git a/examples/rust/_examples_common/pubsub_event.rs b/examples/rust/_examples_common/pubsub_event.rs index bb6c64210..4352ae917 100644 --- a/examples/rust/_examples_common/pubsub_event.rs +++ b/examples/rust/_examples_common/pubsub_event.rs @@ -20,6 +20,7 @@ pub enum PubSubEvent { SentSample = 4, ReceivedSample = 5, SentHistory = 6, + ProcessDied = 7, Unknown, } @@ -39,6 +40,7 @@ impl From for PubSubEvent { 4 => PubSubEvent::SentSample, 5 => PubSubEvent::ReceivedSample, 6 => PubSubEvent::SentHistory, + 7 => PubSubEvent::ProcessDied, _ => PubSubEvent::Unknown, } } diff --git a/examples/rust/health_monitoring/BUILD.bazel b/examples/rust/health_monitoring/BUILD.bazel new file mode 100644 index 000000000..a0ed206db --- /dev/null +++ b/examples/rust/health_monitoring/BUILD.bazel @@ -0,0 +1,57 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. 
+# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. +# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +load("@rules_rust//rust:defs.bzl", "rust_binary") + +rust_binary( + name = "publisher_1", + srcs = [ + "publisher_1.rs", + ], + deps = [ + "//iceoryx2:iceoryx2", + "//examples/rust:examples-common", + ], +) + +rust_binary( + name = "publisher_2", + srcs = [ + "publisher_2.rs", + ], + deps = [ + "//iceoryx2:iceoryx2", + "//examples/rust:examples-common", + ], +) + +rust_binary( + name = "subscriber", + srcs = [ + "subscriber.rs", + ], + deps = [ + "//iceoryx2:iceoryx2", + "//examples/rust:examples-common", + ], +) + +rust_binary( + name = "central_daemon", + srcs = [ + "central_daemon.rs", + ], + deps = [ + "//iceoryx2:iceoryx2", + "//examples/rust:examples-common", + ], +) diff --git a/examples/rust/health_monitoring/README.md b/examples/rust/health_monitoring/README.md new file mode 100644 index 000000000..776958610 --- /dev/null +++ b/examples/rust/health_monitoring/README.md @@ -0,0 +1,120 @@ +# Health Monitoring + +This example demonstrates how to create a robust system using iceoryx2. +A central daemon pre-creates all communication resources to ensure that every +required resource, such as memory, is available as soon as the application +starts. +Additionally, the subscriber is immediately informed if one of the processes +it depends on has crashed. Even if the central daemon itself crashes, +communication can continue without any restrictions. Thanks to the +decentralized API of iceoryx2, the subscriber can take over the role of the +central daemon and continue monitoring all processes. + +The communication must also be reliable, and we expect publishers to provide +updates at regular intervals. 
If a publisher misses a deadline, we want to be +informed immediately. This situation can occur if the system is under heavy +load or if a process has crashed. + +This example is more advanced and consists of four components: + +* `central_daemon` - Must run first. It creates all communication resources and + monitors all nodes/processes. +* `publisher_1` - Sends data at a specific frequency on `service_1`. +* `publisher_2` - Sends data at a specific frequency on `service_2`. +* `subscriber` - Connects to `service_1` and `service_2` and expects new samples + within a specific time. If no sample arrives, it proactively checks for dead + nodes. + +```ascii ++----------------+ creates ........................... +| central_daemon | ----------> : communication resources : ++----------------+ ........................... + | ^ + | opens | + | +-----------------+--------------+ + | | | | + | +-------------+ +-------------+ +------------+ + | | publisher_1 | | publisher_2 | | subscriber | + | +-------------+ +-------------+ +------------+ + | ^ ^ ^ + | monitors | | | + +-------------+-------------------+-----------------+ +``` + +## Running The Example + +> [!CAUTION] +> Every payload you transmit with iceoryx2 must be compatible with shared +> memory. Specifically, it must: +> +> * be self contained, no heap, no pointers to external sources +> * have a uniform memory representation -> `#[repr(C)]` +> * not use pointers to manage their internal structure +> +> Data types like `String` or `Vec` will cause undefined behavior and may +> result in segmentation faults. We provide alternative data types that are +> compatible with shared memory. See the +> [complex data type example](../complex_data_types) for guidance on how to +> use them. + +For this example, you need to open five separate terminals. + +## Terminal 1: Central Daemon - Create All Communication Resources + +Run the central daemon, which sets up all communication resources and monitors +processes. 
+```sh +cargo run --example health_monitoring_central_daemon +``` + +## Terminal 2: Publisher 1 + +Run the first publisher, which sends data on `service_1`. + +```sh +cargo run --example health_monitoring_publisher_1 +``` + +## Terminal 3: Publisher 2 + +Run the second publisher, which sends data on `service_2`. + +```sh +cargo run --example health_monitoring_publisher_2 +``` + +## Terminal 4: Subscriber + +Run the subscriber, which listens to both `service_1` and `service_2`. + +```sh +cargo run --example health_monitoring_subscriber +``` + +## Terminal 5: Simulate Process Crashes + +Send a `SIGKILL` signal to `publisher_1` to simulate a fatal crash. This +ensures that the process is unable to clean up any resources. + +```sh +killall -9 health_monitoring_publisher_1 +``` + +After running this command: + +1. The `central_daemon` will detect that the process has crashed and print: + ```ascii + detected dead node: Some(NodeName { value: "publisher 1" }) + ``` + The event service is configured to emit a `PubSubEvent::ProcessDied` event when a + process is identified as dead. + +2. On the `subscriber` side, you will see the message: + ```ascii + ServiceName { value: "service_1" }: process died! + ``` + +3. Since `publisher_1` is no longer sending messages, the subscriber will also + regularly print another message indicating that `service_1` has violated + the contract because no new samples are being received. diff --git a/examples/rust/health_monitoring/central_daemon.rs b/examples/rust/health_monitoring/central_daemon.rs new file mode 100644 index 000000000..912559ae7 --- /dev/null +++ b/examples/rust/health_monitoring/central_daemon.rs @@ -0,0 +1,95 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. 
+// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use core::time::Duration; +use examples_common::PubSubEvent; +use iceoryx2::{node::NodeView, prelude::*}; + +const CYCLE_TIME: Duration = Duration::from_millis(100); + +fn main() -> Result<(), Box> { + let service_name_1 = ServiceName::new("service_1")?; + let service_name_2 = ServiceName::new("service_2")?; + + let node = NodeBuilder::new() + .name(&"watchdog and resource creator".try_into()?) + .create::()?; + + // The central daemon is responsible to create all services beforehand and the other processes + // just open the communication resources and start communicating. + let _service_pubsub_1 = node + .service_builder(&service_name_1) + .publish_subscribe::() + // We use here open_or_create so that, in case of a crash of the central daemon, it can + // be restarted. + .open_or_create()?; + + let _service_event_1 = node + .service_builder(&service_name_1) + .event() + // Whenever a new notifier is created the PublisherConnected event is emitted. This makes + // sense since in this example a notifier is always created after a new publisher was + // created. + // The task of the notifier/event is to inform and wake up other processes when certain + // system events have happened. + .notifier_created_event(PubSubEvent::PublisherConnected.into()) + .notifier_dropped_event(PubSubEvent::PublisherDisconnected.into()) + // This event is emitted when either the central daemon or a decentralized process detects + // a dead node and cleaned up all of its stale resources successfully. 
+ .notifier_dead_event(PubSubEvent::ProcessDied.into()) + .open_or_create()?; + + let _service_pubsub_2 = node + .service_builder(&service_name_2) + .publish_subscribe::() + .open_or_create()?; + + let _service_event_2 = node + .service_builder(&service_name_2) + .event() + .notifier_created_event(PubSubEvent::PublisherConnected.into()) + .notifier_dropped_event(PubSubEvent::PublisherDisconnected.into()) + .notifier_dead_event(PubSubEvent::ProcessDied.into()) + .open_or_create()?; + + let waitset = WaitSetBuilder::new().create::()?; + let _cycle_guard = waitset.attach_interval(CYCLE_TIME); + + println!("Central daemon up and running."); + waitset.wait_and_process(|_| { + // The only task of our central daemon is to monitor all running nodes and clean up their + // resources if a process has died. + // + // Since we added the notifier_dead_event to the service, all listeners, that are waiting + // on a service where one participant has died, will be woken up and they receive + // the PubSubEvent::ProcessDied + find_and_cleanup_dead_nodes(); + CallbackProgression::Continue + })?; + + Ok(()) +} + +fn find_and_cleanup_dead_nodes() { + Node::::list(Config::global_config(), |node_state| { + if let NodeState::Dead(state) = node_state { + println!( + "detected dead node: {:?}", + state.details().as_ref().map(|v| v.name()) + ); + state.remove_stale_resources().expect(""); + } + + CallbackProgression::Continue + }) + .expect(""); +} diff --git a/examples/rust/health_monitoring/publisher_1.rs b/examples/rust/health_monitoring/publisher_1.rs new file mode 100644 index 000000000..7477e6de3 --- /dev/null +++ b/examples/rust/health_monitoring/publisher_1.rs @@ -0,0 +1,55 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. 
+// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use core::time::Duration; +use examples_common::{open_service, PubSubEvent}; +use iceoryx2::prelude::*; + +const CYCLE_TIME: Duration = Duration::from_millis(1000); + +fn main() -> Result<(), Box> { + let service_name = ServiceName::new("service_1")?; + let node = NodeBuilder::new() + .name(&"publisher 1".try_into()?) + .create::()?; + + let (service_event, service_pubsub) = open_service(&node, &service_name)?; + + let publisher = service_pubsub.publisher_builder().create()?; + let notifier = service_event + .notifier_builder() + // we only want to notify the other side explicitly when we have sent a sample + // so we can define it as default event id + .default_event_id(PubSubEvent::SentSample.into()) + .create()?; + let mut counter: u64 = 0; + + let waitset = WaitSetBuilder::new().create::()?; + + // we need to send out a sample with an interval of CYCLE_TIME, therefore we attach an + // interval that wakes us up regularly to send out the next sample + let _cycle_guard = waitset.attach_interval(CYCLE_TIME); + + waitset.wait_and_process(|_| { + println!("{:?}: Send sample {} ...", service_name, counter); + publisher + .send_copy(counter) + .expect("sample delivery successful."); + notifier.notify().expect("notification successful."); + counter += 1; + CallbackProgression::Continue + })?; + + println!("exit"); + + Ok(()) +} diff --git a/examples/rust/health_monitoring/publisher_2.rs b/examples/rust/health_monitoring/publisher_2.rs new file mode 100644 index 000000000..a48a5ecc9 --- /dev/null +++ b/examples/rust/health_monitoring/publisher_2.rs @@ -0,0 +1,55 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the 
NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use core::time::Duration; +use examples_common::{open_service, PubSubEvent}; +use iceoryx2::prelude::*; + +const CYCLE_TIME: Duration = Duration::from_millis(1500); + +fn main() -> Result<(), Box> { + let service_name = ServiceName::new("service_2")?; + let node = NodeBuilder::new() + .name(&"publisher 2".try_into()?) + .create::()?; + + let (service_event, service_pubsub) = open_service(&node, &service_name)?; + + let publisher = service_pubsub.publisher_builder().create()?; + let notifier = service_event + .notifier_builder() + // we only want to notify the other side explicitly when we have sent a sample + // so we can define it as default event id + .default_event_id(PubSubEvent::SentSample.into()) + .create()?; + let mut counter: u64 = 1000000; + + let waitset = WaitSetBuilder::new().create::()?; + + // we need to send out a sample with an interval of CYCLE_TIME, therefore we attach an + // interval that wakes us up regularly to send out the next sample + let _cycle_guard = waitset.attach_interval(CYCLE_TIME); + + waitset.wait_and_process(|_| { + println!("{:?}: Send sample {} ...", service_name, counter); + publisher + .send_copy(counter) + .expect("sample delivery successful."); + notifier.notify().expect("notification successful."); + counter += 1; + CallbackProgression::Continue + })?; + + println!("exit"); + + Ok(()) +} diff --git a/examples/rust/health_monitoring/subscriber.rs b/examples/rust/health_monitoring/subscriber.rs new file mode 100644 index 000000000..1b54fcdf4 --- /dev/null +++ b/examples/rust/health_monitoring/subscriber.rs @@ -0,0 +1,125 @@ +// 
Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +use core::time::Duration; +use examples_common::{open_service, PubSubEvent}; +use iceoryx2::{ + node::NodeView, + port::{listener::Listener, subscriber::Subscriber}, + prelude::*, +}; + +const REACTION_BUFFER_MS: u64 = 100; +const CYCLE_TIME_1: Duration = Duration::from_millis(1000 + REACTION_BUFFER_MS); +const CYCLE_TIME_2: Duration = Duration::from_millis(1500 + REACTION_BUFFER_MS); + +fn main() -> Result<(), Box> { + let service_name_1 = ServiceName::new("service_1")?; + let service_name_2 = ServiceName::new("service_2")?; + + let node = NodeBuilder::new() + .name(&"subscriber".try_into()?) 
+ .create::()?; + + // open a pubsub and an event service with the same name + let (service_event_1, service_pubsub_1) = open_service(&node, &service_name_1)?; + let (service_event_2, service_pubsub_2) = open_service(&node, &service_name_2)?; + + let subscriber_1 = service_pubsub_1.subscriber_builder().create()?; + let subscriber_2 = service_pubsub_2.subscriber_builder().create()?; + let listener_1 = service_event_1.listener_builder().create()?; + let listener_2 = service_event_2.listener_builder().create()?; + + let waitset = WaitSetBuilder::new().create::()?; + + // we expect that the listener receives a message sent event after at most CYCLE_TIME_X + // so we add it as a deadline + let listener_1_guard = waitset.attach_deadline(&listener_1, CYCLE_TIME_1)?; + let listener_2_guard = waitset.attach_deadline(&listener_2, CYCLE_TIME_2)?; + + let missed_deadline = |service_name, cycle_time| { + println!( + "{:?}: violated contract and did not send a message after {:?}.", + service_name, cycle_time + ); + }; + + let on_event = |attachment_id: WaitSetAttachmentId| { + if attachment_id.has_missed_deadline(&listener_1_guard) { + missed_deadline(&service_name_1, CYCLE_TIME_1); + // one cause of a missed deadline can be a dead node. 
usually our "central_daemon" would + // take care of monitoring but when the node and the central daemon crashed we take + // over here and check for dead nodes + find_and_cleanup_dead_nodes(); + } + + if attachment_id.has_missed_deadline(&listener_2_guard) { + missed_deadline(&service_name_2, CYCLE_TIME_2); + find_and_cleanup_dead_nodes(); + } + + if attachment_id.has_event_from(&listener_1_guard) { + // in this function we either print out the received sample or the event that has + // occurred like, publisher connected/disconnected or a process was identified as dead + handle_incoming_event(&listener_1, &subscriber_1, &service_name_1); + } + + if attachment_id.has_event_from(&listener_2_guard) { + handle_incoming_event(&listener_2, &subscriber_2, &service_name_2); + } + + CallbackProgression::Continue + }; + + waitset.wait_and_process(on_event)?; + + println!("exit"); + + Ok(()) +} + +fn find_and_cleanup_dead_nodes() { + Node::::list(Config::global_config(), |node_state| { + if let NodeState::Dead(state) = node_state { + println!( + "detected dead node: {:?}", + state.details().as_ref().map(|v| v.name()) + ); + state.remove_stale_resources().expect(""); + } + + CallbackProgression::Continue + }) + .expect(""); +} + +fn handle_incoming_event( + listener: &Listener, + subscriber: &Subscriber, + service_name: &ServiceName, +) { + listener + .try_wait_all(|event_id| { + if event_id == PubSubEvent::ProcessDied.into() { + println!("{:?}: process died!", service_name); + } else if event_id == PubSubEvent::PublisherConnected.into() { + println!("{:?}: publisher connected!", service_name); + } else if event_id == PubSubEvent::PublisherDisconnected.into() { + println!("{:?}: publisher disconnected!", service_name); + } else if event_id == PubSubEvent::SentSample.into() { + subscriber.receive().expect("").map(|sample| { + println!("{:?}: Received sample {} ...", service_name, *sample); + }); + } + }) + .expect(""); +} diff --git 
a/examples/rust/publish_subscribe/publisher.rs b/examples/rust/publish_subscribe/publisher.rs index 3b1c3e2b8..d8132e900 100644 --- a/examples/rust/publish_subscribe/publisher.rs +++ b/examples/rust/publish_subscribe/publisher.rs @@ -17,7 +17,6 @@ use iceoryx2::prelude::*; const CYCLE_TIME: Duration = Duration::from_secs(1); fn main() -> Result<(), Box> { - set_log_level(LogLevel::Trace); let node = NodeBuilder::new().create::()?; let service = node