diff --git a/examples/README.md b/examples/README.md index 3d10d42e8..dd5e051bc 100644 --- a/examples/README.md +++ b/examples/README.md @@ -77,7 +77,7 @@ These types are demonstrated in the complex data types example. | docker | [all](rust/docker) | Communicate between different docker containers and the host. | | domains | [C](c/domains) [C++](cxx/domains) [Rust](rust/domains) | Establish separate domains that operate independently from one another. | | event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. | -| event based communication | [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | +| event based communication | [C++](cxx/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | | event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. | | publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). | | publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. | diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 01d57e7ed..40836c68e 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(discovery) add_subdirectory(domains) add_subdirectory(event) add_subdirectory(event_multiplexing) +add_subdirectory(event_based_communication) add_subdirectory(publish_subscribe) add_subdirectory(publish_subscribe_dynamic_data) add_subdirectory(publish_subscribe_with_user_header) diff --git a/examples/cxx/event_based_communication/BUILD.bazel b/examples/cxx/event_based_communication/BUILD.bazel new file mode 100644 index 000000000..ca5637e5b --- /dev/null +++ b/examples/cxx/event_based_communication/BUILD.bazel @@ -0,0 +1,41 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. +# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library") + +cc_binary( + name = "example_cxx_event_based_communication_publisher", + srcs = [ + "src/publisher.cpp", + "src/transmission_data.hpp", + "src/pubsub_event.hpp", + "src/custom_publisher.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) + +cc_binary( + name = "example_cxx_event_based_communication_subscriber", + srcs = [ + "src/subscriber.cpp", + "src/transmission_data.hpp", + "src/pubsub_event.hpp", + "src/custom_subscriber.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) diff --git a/examples/cxx/event_based_communication/CMakeLists.txt b/examples/cxx/event_based_communication/CMakeLists.txt new file mode 100644 index 000000000..30c2bb19d --- /dev/null +++ b/examples/cxx/event_based_communication/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. +# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +cmake_minimum_required(VERSION 3.22) +project(example_cxx_event_based_communication LANGUAGES CXX) + +find_package(iceoryx2-cxx 0.4.1 REQUIRED) + +add_executable(example_cxx_event_based_communication_publisher src/publisher.cpp) +target_link_libraries(example_cxx_event_based_communication_publisher iceoryx2-cxx::static-lib-cxx) + +add_executable(example_cxx_event_based_communication_subscriber src/subscriber.cpp) +target_link_libraries(example_cxx_event_based_communication_subscriber iceoryx2-cxx::static-lib-cxx) diff --git a/examples/cxx/event_based_communication/README.md b/examples/cxx/event_based_communication/README.md new file mode 100644 index 000000000..868c4151e --- /dev/null +++ b/examples/cxx/event_based_communication/README.md @@ -0,0 +1,35 @@ +# Event-Based Communication + +## Running The Example + +This example demonstrates iceoryx2's event multiplexing mechanism in a more +complex setup. The iceoryx2 `Publisher` and `Subscriber` are integrated into +custom `ExamplePublisher` and `ExampleSubscriber` classes, which also +incorporate an additional iceoryx2 `Notifier` and `Listener`. This setup +enables automatic event emission whenever an `ExamplePublisher` or +`ExampleSubscriber` is created or dropped. Additionally, events are emitted +whenever a new `Sample` is sent or received. + +When a `class` inherits from `FileDescriptorBased`, it can be attached to a +`WaitSet`. Both `ExamplePublisher` and `ExampleSubscriber` implement this +interface by forwarding calls to their underlying `Listener`, which already +provides an implementation of `FileDescriptorBased`. + +The `WaitSet` notifies the user of the origin of an event notification. The +user can then acquire the `EventId` from the `Listener`. Based on the value of +the `EventId`, the user can identify the specific event that occurred and take +appropriate action. + +### Terminal 1 + +```sh +./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_publisher +``` + +### Terminal 2 + +```sh +./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_subscriber +``` + +Feel free to run multiple publishers or subscribers in parallel. diff --git a/examples/cxx/event_based_communication/src/custom_publisher.hpp b/examples/cxx/event_based_communication/src/custom_publisher.hpp new file mode 100644 index 000000000..8b96e1fce --- /dev/null +++ b/examples/cxx/event_based_communication/src/custom_publisher.hpp @@ -0,0 +1,126 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_CUSTOM_PUBLISHER_HPP +#define IOX2_EXAMPLES_CUSTOM_PUBLISHER_HPP + +#include "iox2/file_descriptor.hpp" +#include "iox2/listener.hpp" +#include "iox2/node.hpp" +#include "iox2/notifier.hpp" +#include "iox2/publisher.hpp" +#include "iox2/sample_mut.hpp" +#include "iox2/service_name.hpp" +#include "pubsub_event.hpp" +#include "transmission_data.hpp" + +#include + +constexpr uint64_t HISTORY_SIZE = 20; + +// High-level publisher class that contains besides a publisher also a notifier and a listener. +// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +// and the listener to wait for new subscribers. +class CustomPublisher : public iox2::FileDescriptorBased { + public: + CustomPublisher(const CustomPublisher&) = delete; + CustomPublisher(CustomPublisher&&) = default; + ~CustomPublisher() override { + m_notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::PublisherDisconnected))) + .expect(""); + } + + static auto create(iox2::Node& node, + const iox2::ServiceName& service_name) -> CustomPublisher { + auto pubsub_service = node.service_builder(service_name) + .publish_subscribe() + .history_size(HISTORY_SIZE) + .subscriber_max_buffer_size(HISTORY_SIZE) + .open_or_create() + .expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto notifier = event_service.notifier_builder().create().expect(""); + auto listener = event_service.listener_builder().create().expect(""); + auto publisher = pubsub_service.publisher_builder().create().expect(""); + + notifier + .notify_with_custom_event_id(iox2::EventId(iox::from(PubSubEvent::PublisherConnected))) + .expect(""); + + return CustomPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; + } + + auto file_descriptor() const -> iox2::FileDescriptorView override { + return m_listener.file_descriptor(); + } + + void handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SubscriberConnected: { + std::cout << "new subscriber connected - delivering history" << std::endl; + m_publisher.update_connections().expect(""); + m_notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::SentHistory))) + .expect(""); + break; + } + case PubSubEvent::SubscriberDisconnected: { + std::cout << "subscriber disconnected" << std::endl; + break; + } + case PubSubEvent::ReceivedSample: { + std::cout << "subscriber has consumed sample" << std::endl; + break; + } + default: { + break; + } + } + } + } + + void send(const uint64_t counter) { + constexpr double SOME_NUMBER = 812.12; + auto sample = m_publisher.loan_uninit().expect(""); + sample.write_payload(TransmissionData { + static_cast(counter), static_cast(counter), static_cast(counter) * SOME_NUMBER }); + auto initialized_sample = assume_init(std::move(sample)); + ::iox2::send(std::move(initialized_sample)).expect(""); + + m_notifier.notify_with_custom_event_id(iox2::EventId(iox::from(PubSubEvent::SentSample))) + .expect(""); + } + + auto operator=(const CustomPublisher&) -> CustomPublisher& = delete; + auto operator=(CustomPublisher&&) -> CustomPublisher& = default; + + private: + CustomPublisher(iox2::Publisher&& publisher, + iox2::Listener&& listener, + iox2::Notifier&& notifier) + : m_publisher { std::move(publisher) } + , m_listener { std::move(listener) } + , m_notifier { std::move(notifier) } { + } + + iox2::Publisher m_publisher; + iox2::Listener m_listener; + iox2::Notifier m_notifier; +}; + +#endif diff --git a/examples/cxx/event_based_communication/src/custom_subscriber.hpp b/examples/cxx/event_based_communication/src/custom_subscriber.hpp new file mode 100644 index 000000000..566662290 --- /dev/null +++ b/examples/cxx/event_based_communication/src/custom_subscriber.hpp @@ -0,0 +1,126 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_CUSTOM_SUBSCRIBER_HPP +#define IOX2_EXAMPLES_CUSTOM_SUBSCRIBER_HPP + +#include "iox/into.hpp" +#include "iox2/file_descriptor.hpp" +#include "iox2/listener.hpp" +#include "iox2/node.hpp" +#include "iox2/notifier.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/subscriber.hpp" +#include "pubsub_event.hpp" +#include "transmission_data.hpp" + +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. +class CustomSubscriber : public iox2::FileDescriptorBased { + public: + CustomSubscriber(const CustomSubscriber&) = delete; + CustomSubscriber(CustomSubscriber&&) = default; + ~CustomSubscriber() override { + m_notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::SubscriberDisconnected))) + .expect(""); + } + + auto operator=(const CustomSubscriber&) -> CustomSubscriber& = delete; + auto operator=(CustomSubscriber&&) -> CustomSubscriber& = default; + + static auto create(iox2::Node& node, + const iox2::ServiceName& service_name) -> CustomSubscriber { + auto pubsub_service = + node.service_builder(service_name).publish_subscribe().open_or_create().expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto listener = event_service.listener_builder().create().expect(""); + auto notifier = event_service.notifier_builder().create().expect(""); + auto subscriber = pubsub_service.subscriber_builder().create().expect(""); + + notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::SubscriberConnected))) + .expect(""); + + return CustomSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; + } + + auto file_descriptor() const -> iox2::FileDescriptorView override { + return m_listener.file_descriptor(); + } + + void handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SentHistory: { + std::cout << "History delivered" << std::endl; + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << " history: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::SentSample: { + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << "received: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::PublisherConnected: { + std::cout << "new publisher connected" << std::endl; + break; + } + case PubSubEvent::PublisherDisconnected: { + std::cout << "publisher disconnected" << std::endl; + break; + } + default: { + break; + } + } + } + } + + auto receive() -> iox::optional> { + auto sample = m_subscriber.receive().expect(""); + if (sample.has_value()) { + m_notifier + .notify_with_custom_event_id(iox2::EventId(iox::from(PubSubEvent::ReceivedSample))) + .expect(""); + } + + return sample; + } + + private: + CustomSubscriber(iox2::Subscriber&& subscriber, + iox2::Notifier&& notifier, + iox2::Listener&& listener) + : m_subscriber { std::move(subscriber) } + , m_notifier { std::move(notifier) } + , m_listener { std::move(listener) } { + } + + iox2::Subscriber m_subscriber; + iox2::Notifier m_notifier; + iox2::Listener m_listener; +}; + +#endif diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp new file mode 100644 index 000000000..a23f866c8 --- /dev/null +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -0,0 +1,58 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include "custom_publisher.hpp" +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" + +#include + +using namespace iox2; + +constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); + +auto main() -> int { + auto node = NodeBuilder().create().expect("successful node creation"); + auto publisher = CustomPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); + + auto waitset = WaitSetBuilder().create().expect(""); + // Whenever our publisher receives an event we get notified. + auto publisher_guard = waitset.attach_notification(publisher).expect(""); + // Attach an interval so that we wake up and can publish a new message + auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect(""); + + uint64_t counter = 0; + + // Event callback that is called whenever the WaitSet received an event. + auto on_event = [&](WaitSetAttachmentId attachment_id) -> CallbackProgression { + // when the cyclic trigger guard gets notified we send out a new message + if (attachment_id.has_event_from(cyclic_trigger_guard)) { + std::cout << "send message: " << counter << std::endl; + publisher.send(counter); + counter += 1; + // when something else happens on the publisher we handle the events + } else if (attachment_id.has_event_from(publisher_guard)) { + publisher.handle_event(); + } + return CallbackProgression::Continue; + }; + + // Start the event loop. It will run until `CallbackProgression::Stop` is returned by the + // event callback or an interrupt/termination signal was received. + waitset.wait_and_process(on_event).expect(""); + + std::cout << "exit" << std::endl; + + return 0; +} diff --git a/examples/cxx/event_based_communication/src/pubsub_event.hpp b/examples/cxx/event_based_communication/src/pubsub_event.hpp new file mode 100644 index 000000000..e180c1f12 --- /dev/null +++ b/examples/cxx/event_based_communication/src/pubsub_event.hpp @@ -0,0 +1,63 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_PUBSUB_EVENT_HPP +#define IOX2_EXAMPLES_PUBSUB_EVENT_HPP + +#include "iox/into.hpp" +#include "iox2/event_id.hpp" + +#include + +enum class PubSubEvent : uint8_t { + PublisherConnected = 0, + PublisherDisconnected = 1, + SubscriberConnected = 2, + SubscriberDisconnected = 3, + SentSample = 4, + ReceivedSample = 5, + SentHistory = 6, + Unknown +}; + +namespace iox { +template <> +constexpr auto from(const PubSubEvent value) noexcept -> size_t { + return static_cast(value); +} + +template <> +constexpr auto from(const size_t value) noexcept -> PubSubEvent { + switch (value) { + case into(PubSubEvent::PublisherConnected): + return PubSubEvent::PublisherConnected; + case into(PubSubEvent::PublisherDisconnected): + return PubSubEvent::PublisherDisconnected; + case into(PubSubEvent::SubscriberConnected): + return PubSubEvent::SubscriberConnected; + case into(PubSubEvent::SubscriberDisconnected): + return PubSubEvent::SubscriberDisconnected; + case into(PubSubEvent::SentSample): + return PubSubEvent::SentSample; + case into(PubSubEvent::ReceivedSample): + return PubSubEvent::ReceivedSample; + case into(PubSubEvent::SentHistory): + return PubSubEvent::SentHistory; + default: + return PubSubEvent::Unknown; + } + IOX_UNREACHABLE(); +} +} // namespace iox + + +#endif diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp new file mode 100644 index 000000000..c68cdf4a0 --- /dev/null +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -0,0 +1,54 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include + +#include "custom_subscriber.hpp" +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" + +constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2); + +using namespace iox2; + +auto main() -> int { + auto node = NodeBuilder().create().expect("successful node creation"); + + auto subscriber = CustomSubscriber::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); + + auto waitset = WaitSetBuilder().create().expect(""); + + // The subscriber is attached as a deadline, meaning that we expect some activity + // latest after the deadline has passed. + auto subscriber_guard = waitset.attach_deadline(subscriber, DEADLINE).expect(""); + + auto on_event = [&](WaitSetAttachmentId attachment_id) { + // If we have received a new event on the subscriber we handle it. + if (attachment_id.has_event_from(subscriber_guard)) { + subscriber.handle_event(); + // If the subscriber did not receive an event until DEADLINE has + // passed, we print out a warning. + } else if (attachment_id.has_missed_deadline(subscriber_guard)) { + std::cout << "Contract violation! The subscriber did not receive a message for " << DEADLINE << std::endl; + } + + return CallbackProgression::Continue; + }; + + waitset.wait_and_process(on_event).expect(""); + + std::cout << "exit" << std::endl; + + return 0; +} diff --git a/examples/cxx/event_based_communication/src/transmission_data.hpp b/examples/cxx/event_based_communication/src/transmission_data.hpp new file mode 100644 index 000000000..e5b83272d --- /dev/null +++ b/examples/cxx/event_based_communication/src/transmission_data.hpp @@ -0,0 +1,30 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_TRANSMISSION_DATA_HPP +#define IOX2_EXAMPLES_TRANSMISSION_DATA_HPP + +#include +#include + +struct TransmissionData { + std::int32_t x; + std::int32_t y; + double funky; +}; + +inline auto operator<<(std::ostream& stream, const TransmissionData& value) -> std::ostream& { + stream << "TransmissionData { x: " << value.x << ", y: " << value.y << ", funky: " << value.funky << " }"; + return stream; +} + +#endif diff --git a/examples/rust/event_based_communication/publisher.rs b/examples/rust/event_based_communication/publisher.rs index 0e67e17fe..4e2fd44f1 100644 --- a/examples/rust/event_based_communication/publisher.rs +++ b/examples/rust/event_based_communication/publisher.rs @@ -25,46 +25,59 @@ const HISTORY_SIZE: usize = 20; fn main() -> Result<(), Box> { let node = NodeBuilder::new().create::()?; - let publisher = EventBasedPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?; + let publisher = CustomPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; + + // Whenever our publisher receives an event we get notified. let publisher_guard = waitset.attach_notification(&publisher)?; + // Attach an interval so that we wake up and can publish a new message let cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME)?; let mut counter: u64 = 0; + // Event callback that is called whenever the WaitSet received an event. let on_event = |attachment_id: WaitSetAttachmentId| { + // when the cyclic trigger guard gets notified we send out a new message if attachment_id.has_event_from(&cyclic_trigger_guard) { println!("send message: {}", counter); publisher.send(counter).unwrap(); counter += 1; + // when something else happens on the publisher we handle the events } else if attachment_id.has_event_from(&publisher_guard) { publisher.handle_event().unwrap(); } CallbackProgression::Continue }; + // Start the event loop. It will run until `CallbackProgression::Stop` is returned by the + // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event)?; + println!("exit"); + Ok(()) } +/// High-level publisher class that contains besides a publisher also a notifier and a listener. +/// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +/// and the listener to wait for new subscribers. #[derive(Debug)] -struct EventBasedPublisher { +struct CustomPublisher { publisher: Publisher, listener: Listener, notifier: Notifier, } -impl FileDescriptorBased for EventBasedPublisher { +impl FileDescriptorBased for CustomPublisher { fn file_descriptor(&self) -> &FileDescriptor { self.listener.file_descriptor() } } -impl SynchronousMultiplexing for EventBasedPublisher {} +impl SynchronousMultiplexing for CustomPublisher {} -impl EventBasedPublisher { +impl CustomPublisher { fn new( node: &Node, service_name: &ServiceName, @@ -134,7 +147,7 @@ impl EventBasedPublisher { } } -impl Drop for EventBasedPublisher { +impl Drop for CustomPublisher { fn drop(&mut self) { let _ = self .notifier diff --git a/examples/rust/event_based_communication/subscriber.rs b/examples/rust/event_based_communication/subscriber.rs index e144e6bd0..693b3b59f 100644 --- a/examples/rust/event_based_communication/subscriber.rs +++ b/examples/rust/event_based_communication/subscriber.rs @@ -25,14 +25,20 @@ const DEADLINE: Duration = Duration::from_secs(2); fn main() -> Result<(), Box> { let node = NodeBuilder::new().create::()?; - let subscriber = EventBasedSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?; + let subscriber = CustomSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; + + // The subscriber is attached as a deadline, meaning that we expect some activity + // latest after the deadline has passed. let subscriber_guard = waitset.attach_deadline(&subscriber, DEADLINE)?; let on_event = |attachment_id: WaitSetAttachmentId| { + // If we have received a new event on the subscriber we handle it. if attachment_id.has_event_from(&subscriber_guard) { subscriber.handle_event().unwrap(); + // If the subscriber did not receive an event until DEADLINE has + // passed, we print out a warning. } else if attachment_id.has_missed_deadline(&subscriber_guard) { println!( "Contract violation! The subscriber did not receive a message for {:?}.", @@ -51,21 +57,27 @@ fn main() -> Result<(), Box> { } #[derive(Debug)] -struct EventBasedSubscriber { +struct CustomSubscriber { subscriber: Subscriber, notifier: Notifier, listener: Listener, } -impl FileDescriptorBased for EventBasedSubscriber { +impl FileDescriptorBased for CustomSubscriber { fn file_descriptor(&self) -> &FileDescriptor { self.listener.file_descriptor() } } -impl SynchronousMultiplexing for EventBasedSubscriber {} +impl SynchronousMultiplexing for CustomSubscriber {} -impl EventBasedSubscriber { +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. +impl CustomSubscriber { fn new( node: &Node, service_name: &ServiceName, @@ -133,7 +145,7 @@ impl EventBasedSubscriber { } } -impl Drop for EventBasedSubscriber { +impl Drop for CustomSubscriber { fn drop(&mut self) { self.notifier .notify_with_custom_event_id(PubSubEvent::SubscriberDisconnected.into()) diff --git a/examples/rust/event_multiplexing/wait.rs b/examples/rust/event_multiplexing/wait.rs index 2506123bf..69ea220df 100644 --- a/examples/rust/event_multiplexing/wait.rs +++ b/examples/rust/event_multiplexing/wait.rs @@ -78,7 +78,7 @@ fn main() -> Result<(), Box> { // didn't add this to the example so feel free to play around with it. waitset.wait_and_process(on_event)?; - println!("Exit"); + println!("exit"); Ok(()) } diff --git a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp index 77da30646..2c0ee692c 100644 --- a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp @@ -17,15 +17,46 @@ #include "iox2/internal/iceoryx2.hpp" namespace iox2 { +class FileDescriptorView; + +/// Abstract class that can be implemented by a class that is based on a [`FileDescriptor`] +class FileDescriptorBased { + public: + FileDescriptorBased() = default; + FileDescriptorBased(const FileDescriptorBased&) = default; + FileDescriptorBased(FileDescriptorBased&&) = default; + auto operator=(const FileDescriptorBased&) -> FileDescriptorBased& = default; + auto operator=(FileDescriptorBased&&) -> FileDescriptorBased& = default; + virtual ~FileDescriptorBased() = default; + + /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`]. + virtual auto file_descriptor() const -> FileDescriptorView = 0; +}; + /// A view to a [`FileDescriptor`]. -class FileDescriptorView { +class FileDescriptorView : public FileDescriptorBased { private: template friend class WaitSet; friend class FileDescriptor; + template + friend class Listener; explicit FileDescriptorView(iox2_file_descriptor_ptr handle); + /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`]. + auto file_descriptor() const -> FileDescriptorView override; + + /// Returns the underlying [`FileDescriptor`] value. + /// + /// # Safety + /// + /// * the user shall not store the value in a variable otherwise lifetime issues may be + /// encountered + /// * do not manually close the file descriptor with a sys call + /// + auto unsafe_native_handle() const -> int32_t; + iox2_file_descriptor_ptr m_handle = nullptr; }; @@ -49,7 +80,14 @@ class FileDescriptor { ~FileDescriptor(); /// Returns the underlying [`FileDescriptor`] value. - auto native_handle() const -> int32_t; + /// + /// # Safety + /// + /// * the user shall not store the value in a variable otherwise lifetime issues may be + /// encountered + /// * do not manually close the file descriptor with a sys call + /// + auto unsafe_native_handle() const -> int32_t; /// Creates a [`FileDescriptorView`] out of the [`FileDescriptor`]. The view is only valid as /// long as the [`FileDescriptor`] is living - otherwise it will be a dangling view. diff --git a/iceoryx2-ffi/cxx/include/iox2/listener.hpp b/iceoryx2-ffi/cxx/include/iox2/listener.hpp index 10d5c128c..2f85f7e0e 100644 --- a/iceoryx2-ffi/cxx/include/iox2/listener.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/listener.hpp @@ -18,6 +18,7 @@ #include "iox/function.hpp" #include "iox/optional.hpp" #include "iox2/event_id.hpp" +#include "iox2/file_descriptor.hpp" #include "iox2/internal/iceoryx2.hpp" #include "iox2/listener_error.hpp" #include "iox2/service_type.hpp" @@ -26,15 +27,18 @@ namespace iox2 { /// Represents the receiving endpoint of an event based communication. template -class Listener { +class Listener : public FileDescriptorBased { public: Listener(Listener&&) noexcept; auto operator=(Listener&&) noexcept -> Listener&; - ~Listener(); + ~Listener() override; Listener(const Listener&) = delete; auto operator=(const Listener&) -> Listener& = delete; + /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`] of the [`Listener`]. + auto file_descriptor() const -> FileDescriptorView override; + /// Returns the [`UniqueListenerId`] of the [`Listener`] auto id() const -> UniqueListenerId; diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp index 8a118c86e..67ac6348b 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp @@ -174,17 +174,17 @@ class WaitSet { /// * The [`WaitSetGuard`] must life at least as long as the [`WaitsSet`]. auto attach_notification(const Listener& listener) -> iox::expected, WaitSetAttachmentError>; - /// Attaches a [`FileDescriptorView`] as notification to the [`WaitSet`]. Whenever an event is received on the - /// object the [`WaitSet`] informs the user in [`WaitSet::wait_and_process()`] to handle the event. - /// The object cannot be attached twice and the + /// Attaches a [`FileDescriptorBased`] object as notification to the [`WaitSet`]. Whenever an event is received on + /// the object the [`WaitSet`] informs the user in [`WaitSet::wait_and_process()`] to handle the event. The object + /// cannot be attached twice and the /// [`WaitSet::capacity()`] is limited by the underlying implementation. /// /// # Safety /// /// * The corresponding [`FileDescriptor`] must life at least as long as the returned [`WaitSetGuard`]. /// * The [`WaitSetGuard`] must life at least as long as the [`WaitsSet`]. - auto - attach_notification(FileDescriptorView file_descriptor) -> iox::expected, WaitSetAttachmentError>; + auto attach_notification(const FileDescriptorBased& attachment) + -> iox::expected, WaitSetAttachmentError>; /// Attaches a [`Listener`] as deadline to the [`WaitSet`]. Whenever the event is received or the /// deadline is hit, the user is informed in [`WaitSet::wait_and_process()`]. @@ -199,7 +199,7 @@ class WaitSet { auto attach_deadline(const Listener& listener, iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError>; - /// Attaches a [`FileDescriptorView`] as deadline to the [`WaitSet`]. Whenever the event is received or the + /// Attaches a [`FileDescriptorBased`] object as deadline to the [`WaitSet`]. Whenever the event is received or the /// deadline is hit, the user is informed in [`WaitSet::wait_and_process()`]. /// The object cannot be attached twice and the /// [`WaitSet::capacity()`] is limited by the underlying implementation. @@ -209,7 +209,7 @@ class WaitSet { /// /// * The corresponding [`FileDescriptor`] must life at least as long as the returned [`WaitSetGuard`]. /// * The [`WaitSetGuard`] must life at least as long as the [`WaitsSet`]. - auto attach_deadline(FileDescriptorView file_descriptor, + auto attach_deadline(const FileDescriptorBased& attachment, iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError>; /// Attaches a tick event to the [`WaitSet`]. Whenever the timeout is reached the [`WaitSet`] diff --git a/iceoryx2-ffi/cxx/src/file_descriptor.cpp b/iceoryx2-ffi/cxx/src/file_descriptor.cpp index f8abb6203..77a8c8e7b 100644 --- a/iceoryx2-ffi/cxx/src/file_descriptor.cpp +++ b/iceoryx2-ffi/cxx/src/file_descriptor.cpp @@ -17,6 +17,14 @@ FileDescriptorView::FileDescriptorView(iox2_file_descriptor_ptr handle) : m_handle { handle } { } +auto FileDescriptorView::file_descriptor() const -> FileDescriptorView { + return *this; +} + +auto FileDescriptorView::unsafe_native_handle() const -> int32_t { + return iox2_file_descriptor_native_handle(m_handle); +} + auto FileDescriptor::create_owning(int32_t file_descriptor) -> iox::optional { iox2_file_descriptor_h handle = nullptr; if (iox2_file_descriptor_new(file_descriptor, true, nullptr, &handle)) { @@ -64,8 +72,8 @@ void FileDescriptor::drop() { } } -auto FileDescriptor::native_handle() const -> int32_t { - return iox2_file_descriptor_native_handle(&m_handle); +auto FileDescriptor::unsafe_native_handle() const -> int32_t { + return iox2_file_descriptor_native_handle(iox2_cast_file_descriptor_ptr(m_handle)); } auto FileDescriptor::as_view() const -> FileDescriptorView { diff --git a/iceoryx2-ffi/cxx/src/listener.cpp b/iceoryx2-ffi/cxx/src/listener.cpp index c80b8f340..7bc3943d3 100644 --- a/iceoryx2-ffi/cxx/src/listener.cpp +++ b/iceoryx2-ffi/cxx/src/listener.cpp @@ -49,6 +49,11 @@ void Listener::drop() { } } +template +auto Listener::file_descriptor() const -> FileDescriptorView { + return FileDescriptorView(iox2_listener_get_file_descriptor(&m_handle)); +} + template auto Listener::id() const -> UniqueListenerId { iox2_unique_listener_id_h id_handle = nullptr; diff --git a/iceoryx2-ffi/cxx/src/waitset.cpp b/iceoryx2-ffi/cxx/src/waitset.cpp index c9a654a86..28ed3a57c 100644 --- a/iceoryx2-ffi/cxx/src/waitset.cpp +++ b/iceoryx2-ffi/cxx/src/waitset.cpp @@ -241,11 +241,11 @@ auto WaitSet::attach_interval(const iox::units::Duration deadline) } template -auto WaitSet::attach_deadline(FileDescriptorView file_descriptor, const iox::units::Duration deadline) +auto WaitSet::attach_deadline(const FileDescriptorBased& attachment, const iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError> { iox2_waitset_guard_h guard_handle {}; auto result = iox2_waitset_attach_deadline(&m_handle, - file_descriptor.m_handle, + attachment.file_descriptor().m_handle, deadline.toSeconds(), deadline.toNanoseconds() - deadline.toSeconds() * iox::units::Duration::NANOSECS_PER_SEC, @@ -266,10 +266,11 @@ auto WaitSet::attach_deadline(const Listener& listener, const iox::units:: } template -auto WaitSet::attach_notification(const FileDescriptorView file_descriptor) +auto WaitSet::attach_notification(const FileDescriptorBased& attachment) -> iox::expected, WaitSetAttachmentError> { iox2_waitset_guard_h guard_handle {}; - auto result = iox2_waitset_attach_notification(&m_handle, file_descriptor.m_handle, nullptr, &guard_handle); + auto result = + iox2_waitset_attach_notification(&m_handle, attachment.file_descriptor().m_handle, nullptr, &guard_handle); if (result == IOX2_OK) { return iox::ok(WaitSetGuard(guard_handle)); diff --git a/iceoryx2-ffi/ffi/src/api/file_descriptor.rs b/iceoryx2-ffi/ffi/src/api/file_descriptor.rs index 0023c1396..d5ec18e2c 100644 --- a/iceoryx2-ffi/ffi/src/api/file_descriptor.rs +++ b/iceoryx2-ffi/ffi/src/api/file_descriptor.rs @@ -147,15 +147,10 @@ pub unsafe extern "C" fn iox2_file_descriptor_drop(handle: iox2_file_descriptor_ /// * `handle` must be valid and acquired with [`iox2_file_descriptor_new()`]. #[no_mangle] pub unsafe extern "C" fn iox2_file_descriptor_native_handle( - handle: iox2_file_descriptor_h_ref, + handle: iox2_file_descriptor_ptr, ) -> i32 { - handle.assert_non_null(); - - (*handle.as_type()) - .value - .as_ref() - .file_descriptor() - .native_handle() + debug_assert!(!handle.is_null()); + (*handle).file_descriptor().native_handle() } /// Creates a new [`iox2_file_descriptor_t`].