diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 40836c68e..fa4490ff8 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -22,6 +22,7 @@ add_subdirectory(domains) add_subdirectory(event) add_subdirectory(event_multiplexing) add_subdirectory(event_based_communication) +add_subdirectory(health_monitoring) add_subdirectory(publish_subscribe) add_subdirectory(publish_subscribe_dynamic_data) add_subdirectory(publish_subscribe_with_user_header) diff --git a/examples/cxx/health_monitoring/BUILD.bazel b/examples/cxx/health_monitoring/BUILD.bazel new file mode 100644 index 000000000..b54cbadbc --- /dev/null +++ b/examples/cxx/health_monitoring/BUILD.bazel @@ -0,0 +1,61 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. 
+# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +load("@rules_cc//cc:defs.bzl", "cc_binary") + +cc_binary( + name = "example_cxx_health_monitoring_central_daemon", + srcs = [ + "src/central_daemon.cpp", + "src/pubsub_event.hpp", # header shared by all four example binaries + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) + +cc_binary( + name = "example_cxx_health_monitoring_publisher_1", + srcs = [ + "src/publisher_1.cpp", + "src/pubsub_event.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) + +cc_binary( + name = "example_cxx_health_monitoring_publisher_2", + srcs = [ + "src/publisher_2.cpp", + "src/pubsub_event.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) + +cc_binary( + name = "example_cxx_health_monitoring_subscriber", + srcs = [ + "src/subscriber.cpp", + "src/pubsub_event.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) diff --git a/examples/cxx/health_monitoring/CMakeLists.txt b/examples/cxx/health_monitoring/CMakeLists.txt new file mode 100644 index 000000000..22f3cc417 --- /dev/null +++ b/examples/cxx/health_monitoring/CMakeLists.txt @@ -0,0 +1,28 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. 
+# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +cmake_minimum_required(VERSION 3.22) +project(example_cxx_health_monitoring LANGUAGES CXX) + +find_package(iceoryx2-cxx 0.4.1 REQUIRED) + +add_executable(example_cxx_health_monitoring_central_daemon src/central_daemon.cpp) # src/pubsub_event.hpp is a header included by the sources, so it is not listed here +target_link_libraries(example_cxx_health_monitoring_central_daemon iceoryx2-cxx::static-lib-cxx) + +add_executable(example_cxx_health_monitoring_publisher_1 src/publisher_1.cpp) +target_link_libraries(example_cxx_health_monitoring_publisher_1 iceoryx2-cxx::static-lib-cxx) + +add_executable(example_cxx_health_monitoring_publisher_2 src/publisher_2.cpp) +target_link_libraries(example_cxx_health_monitoring_publisher_2 iceoryx2-cxx::static-lib-cxx) + +add_executable(example_cxx_health_monitoring_subscriber src/subscriber.cpp) +target_link_libraries(example_cxx_health_monitoring_subscriber iceoryx2-cxx::static-lib-cxx) diff --git a/examples/cxx/health_monitoring/README.md b/examples/cxx/health_monitoring/README.md new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/examples/cxx/health_monitoring/README.md @@ -0,0 +1 @@ + diff --git a/examples/cxx/health_monitoring/src/central_daemon.cpp b/examples/cxx/health_monitoring/src/central_daemon.cpp new file mode 100644 index 000000000..468904068 --- /dev/null +++ b/examples/cxx/health_monitoring/src/central_daemon.cpp @@ -0,0 +1,106 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. 
+// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include + +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" + +using namespace iox2; + +constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromMilliseconds(100); + +void find_and_cleanup_dead_nodes(); + +auto main() -> int { + auto service_name_1 = ServiceName::create("service_1").expect(""); + auto service_name_2 = ServiceName::create("service_2").expect(""); + + auto node = NodeBuilder() + .name(NodeName::create("central daemon").expect("")) + .create() + .expect("successful node creation"); + + // The central daemon is responsible for creating all services beforehand; the other processes + // just open the communication resources and start communicating. + auto service_pubsub_1 = node.service_builder(service_name_1) + .publish_subscribe() + // We use open_or_create here so that, in case of a crash of the central daemon, it can + // be restarted. + .open_or_create() + .expect("successful service creation/opening"); + + auto service_event_1 = node.service_builder(service_name_1) + .event() + // Whenever a new notifier is created the PublisherConnected event is emitted. This makes + // sense since in this example a notifier is always created after a new publisher was + // created. + // The task of the notifier/event is to inform and wake up other processes when + // certain system events have happened. + .notifier_created_event(iox::into(PubSubEvent::PublisherConnected)) + .notifier_dropped_event(iox::into(PubSubEvent::PublisherDisconnected)) + // This event is emitted when either the central daemon or a decentralized process + // detects a dead node and has cleaned up all of its stale resources successfully. 
+ .notifier_dead_event(iox::into(PubSubEvent::ProcessDied)) + .open_or_create() + .expect("successful service creation/opening"); + + auto service_pubsub_2 = node.service_builder(service_name_2) + .publish_subscribe() + .open_or_create() + .expect("successful service creation/opening"); + + auto service_event_2 = node.service_builder(service_name_2) + .event() + .notifier_created_event(iox::into(PubSubEvent::PublisherConnected)) + .notifier_dropped_event(iox::into(PubSubEvent::PublisherDisconnected)) + .notifier_dead_event(iox::into(PubSubEvent::ProcessDied)) + .open_or_create() + .expect("successful service creation/opening"); + + auto waitset = WaitSetBuilder().create().expect(""); + auto cycle_guard = waitset.attach_interval(CYCLE_TIME); + + std::cout << "Central daemon up and running." << std::endl; + waitset + // The only task of our central daemon is to monitor all running nodes and clean up their + // resources if a process has died. + // + // Since we added the notifier_dead_event to the service, all listeners that are waiting + // on a service where one participant has died will be woken up and they receive + // the PubSubEvent::ProcessDied + .wait_and_process([](auto) { + find_and_cleanup_dead_nodes(); + return CallbackProgression::Continue; + }) + .expect(""); + + std::cout << "exit" << std::endl; + + return 0; +} + +void find_and_cleanup_dead_nodes() { + Node::list(Config::global_config(), [](auto node_state) { + node_state.dead([](auto view) { + std::cout << "detected dead node: "; + view.details().and_then([](auto details) { std::cout << details.name().to_string().c_str(); }); + std::cout << std::endl; + view.remove_stale_resources().expect(""); + }); + return CallbackProgression::Continue; + }).expect(""); +} diff --git a/examples/cxx/health_monitoring/src/publisher_1.cpp b/examples/cxx/health_monitoring/src/publisher_1.cpp new file mode 100644 index 000000000..cb2c46aa3 --- /dev/null +++ b/examples/cxx/health_monitoring/src/publisher_1.cpp @@ 
-0,0 +1,63 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" + +#include + +constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromMilliseconds(1000); + +auto main() -> int { + using namespace iox2; + auto service_name = ServiceName::create("service_1").expect(""); + auto node = NodeBuilder() + .name(NodeName::create("publisher 1").expect("")) + .create() + .expect("successful node creation"); + + auto service = open_service(node, service_name); + + auto publisher = service.pubsub.publisher_builder().create().expect(""); + auto notifier = service.event + .notifier_builder() + // we only want to notify the other side explicitly when we have sent a sample + // so we can define it as default event id + .default_event_id(iox::into(PubSubEvent::SentSample)) + .create() + .expect(""); + auto counter = 0; + + auto waitset = WaitSetBuilder().create().expect(""); + + // attach CYCLE_TIME as an interval to the waitset; the callback below sends one sample + // and one notification each time it is invoked + auto cycle_guard = waitset.attach_interval(CYCLE_TIME); + + waitset + .wait_and_process([&](auto) { + std::cout << service_name.to_string().c_str() << ": Send sample " << counter << " ..." 
<< std::endl; + publisher.send_copy(counter).expect(""); + notifier.notify().expect(""); + counter += 1; + return CallbackProgression::Continue; + }) + .expect(""); + + std::cout << "exit" << std::endl; + + return 0; +} diff --git a/examples/cxx/health_monitoring/src/publisher_2.cpp b/examples/cxx/health_monitoring/src/publisher_2.cpp new file mode 100644 index 000000000..bbee48cb0 --- /dev/null +++ b/examples/cxx/health_monitoring/src/publisher_2.cpp @@ -0,0 +1,63 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" + +#include + +constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromMilliseconds(1500); + +auto main() -> int { + using namespace iox2; + auto service_name = ServiceName::create("service_2").expect(""); + auto node = NodeBuilder() + .name(NodeName::create("publisher 2").expect("")) + .create() + .expect("successful node creation"); + + auto service = open_service(node, service_name); + + auto publisher = service.pubsub.publisher_builder().create().expect(""); + auto notifier = service.event + .notifier_builder() + // we only want to notify the other side explicitly when we have sent a sample + // so we can define it as default event id + .default_event_id(iox::into(PubSubEvent::SentSample)) + .create() + .expect(""); + auto counter = 1000000; // NOLINT, magic number is fine in an example + + auto 
waitset = WaitSetBuilder().create().expect(""); + + // attach CYCLE_TIME as an interval to the waitset; the callback below sends one sample + // and one notification each time it is invoked + auto cycle_guard = waitset.attach_interval(CYCLE_TIME); + + waitset + .wait_and_process([&](auto) { + std::cout << service_name.to_string().c_str() << ": Send sample " << counter << " ..." << std::endl; + publisher.send_copy(counter).expect(""); + notifier.notify().expect(""); + counter += 1; + return CallbackProgression::Continue; + }) + .expect(""); + + std::cout << "exit" << std::endl; + + return 0; +} diff --git a/examples/cxx/health_monitoring/src/pubsub_event.hpp b/examples/cxx/health_monitoring/src/pubsub_event.hpp new file mode 100644 index 000000000..a4d989b84 --- /dev/null +++ b/examples/cxx/health_monitoring/src/pubsub_event.hpp @@ -0,0 +1,56 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. 
+// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_HEALTH_MONITORING_PUBSUB_EVENT_HPP +#define IOX2_EXAMPLES_HEALTH_MONITORING_PUBSUB_EVENT_HPP + +#include "iox/into.hpp" +#include "iox2/event_id.hpp" +#include "iox2/node.hpp" +#include "iox2/service_name.hpp" + +#include + +enum class PubSubEvent : uint8_t { + PublisherConnected = 0, + PublisherDisconnected = 1, + SubscriberConnected = 2, + SubscriberDisconnected = 3, + SentSample = 4, + ReceivedSample = 5, + SentHistory = 6, + ProcessDied = 7, + Unknown, +}; + +struct ServiceTuple { + iox2::PortFactoryEvent event; + iox2::PortFactoryPublishSubscribe pubsub; +}; + +inline auto open_service(const iox2::Node& node, + const iox2::ServiceName& service_name) -> ServiceTuple { + auto service_pubsub = node.service_builder(service_name).publish_subscribe().open().expect(""); + auto service_event = node.service_builder(service_name).event().open().expect(""); + + return { std::move(service_event), std::move(service_pubsub) }; +} + +namespace iox { +template <> +inline auto from(const PubSubEvent value) noexcept -> iox2::EventId { + return iox2::EventId(static_cast(value)); +} +} // namespace iox + + +#endif // IOX2_EXAMPLES_HEALTH_MONITORING_PUBSUB_EVENT_HPP diff --git a/examples/cxx/health_monitoring/src/subscriber.cpp b/examples/cxx/health_monitoring/src/subscriber.cpp new file mode 100644 index 000000000..58943d321 --- /dev/null +++ b/examples/cxx/health_monitoring/src/subscriber.cpp @@ -0,0 +1,128 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. 
+// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include + +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" + +using namespace iox2; + +constexpr iox::units::Duration REACTION_BUFFER = iox::units::Duration::fromMilliseconds(100); +constexpr iox::units::Duration CYCLE_TIME_1 = iox::units::Duration::fromMilliseconds(1000) + REACTION_BUFFER; +constexpr iox::units::Duration CYCLE_TIME_2 = iox::units::Duration::fromMilliseconds(1500) + REACTION_BUFFER; + +void find_and_cleanup_dead_nodes(); +void handle_incoming_events(Listener& listener, + const Subscriber& subscriber, + const ServiceName& service_name); + +auto main() -> int { + auto service_name_1 = ServiceName::create("service_1").expect(""); + auto service_name_2 = ServiceName::create("service_2").expect(""); + + auto node = NodeBuilder() + .name(NodeName::create("subscriber").expect("")) + .create() + .expect("successful node creation"); + + // open a pubsub and an event service with the same name + auto service_1 = open_service(node, service_name_1); + auto service_2 = open_service(node, service_name_2); + + auto subscriber_1 = service_1.pubsub.subscriber_builder().create().expect(""); + auto subscriber_2 = service_2.pubsub.subscriber_builder().create().expect(""); + auto listener_1 = service_1.event.listener_builder().create().expect(""); + auto listener_2 = service_2.event.listener_builder().create().expect(""); + + auto waitset = WaitSetBuilder().create().expect(""); + + // we expect that each listener receives a sample-sent event after at most CYCLE_TIME_X + // so we add it as a deadline + auto listener_1_guard = waitset.attach_deadline(listener_1, CYCLE_TIME_1).expect(""); + auto listener_2_guard = waitset.attach_deadline(listener_2, CYCLE_TIME_2).expect(""); + + auto missed_deadline = [](const ServiceName& service_name, const iox::units::Duration& cycle_time) { + std::cout 
<< service_name.to_string().c_str() << ": violated contract and did not send a message after " + << cycle_time << std::endl; + }; + + auto on_event = [&](const WaitSetAttachmentId& attachment_id) { + if (attachment_id.has_missed_deadline(listener_1_guard)) { + missed_deadline(service_name_1, CYCLE_TIME_1); + // one cause of a missed deadline can be a dead node. usually our "central_daemon" would + // take care of monitoring but when the node and the central daemon crashed we take + // over here and check for dead nodes + find_and_cleanup_dead_nodes(); + } + + if (attachment_id.has_missed_deadline(listener_2_guard)) { + missed_deadline(service_name_2, CYCLE_TIME_2); + find_and_cleanup_dead_nodes(); + } + + if (attachment_id.has_event_from(listener_1_guard)) { + // in this function we either print out the received sample or the event that has + // occurred like, publisher connected/disconnected or a process was identified as dead + handle_incoming_events(listener_1, subscriber_1, service_name_1); + } + + if (attachment_id.has_event_from(listener_2_guard)) { + handle_incoming_events(listener_2, subscriber_2, service_name_2); + } + + return CallbackProgression::Continue; + }; + + waitset.wait_and_process(on_event).expect(""); + + std::cout << "exit" << std::endl; + + return 0; +} + +void handle_incoming_events(Listener& listener, + const Subscriber& subscriber, + const ServiceName& service_name) { + listener + .try_wait_all([&](auto event_id) { + if (event_id == iox::into(PubSubEvent::ProcessDied)) { + std::cout << service_name.to_string().c_str() << ": process died!" << std::endl; + } else if (event_id == iox::into(PubSubEvent::PublisherConnected)) { + std::cout << service_name.to_string().c_str() << ": publisher connected!" << std::endl; + } else if (event_id == iox::into(PubSubEvent::PublisherDisconnected)) { + std::cout << service_name.to_string().c_str() << ": publisher disconnected!" 
<< std::endl; + } else if (event_id == iox::into(PubSubEvent::SentSample)) { + subscriber.receive().expect("").and_then([&](auto& sample) { + std::cout << service_name.to_string().c_str() << ": Received sample " << *sample << " ..." + << std::endl; + }); + } + }) + .expect(""); +} + +void find_and_cleanup_dead_nodes() { + Node::list(Config::global_config(), [](auto node_state) { + node_state.dead([](auto view) { + std::cout << "detected dead node: "; + view.details().and_then([](auto details) { std::cout << details.name().to_string().c_str(); }); + std::cout << std::endl; + view.remove_stale_resources().expect(""); + }); + return CallbackProgression::Continue; + }).expect(""); +} diff --git a/examples/rust/health_monitoring/central_daemon.rs b/examples/rust/health_monitoring/central_daemon.rs index 912559ae7..14886365d 100644 --- a/examples/rust/health_monitoring/central_daemon.rs +++ b/examples/rust/health_monitoring/central_daemon.rs @@ -21,7 +21,7 @@ fn main() -> Result<(), Box> { let service_name_2 = ServiceName::new("service_2")?; let node = NodeBuilder::new() - .name(&"watchdog and resource creator".try_into()?) + .name(&"central daemon".try_into()?) .create::()?; // The central daemon is responsible to create all services before hand and the other processes