From f3722cf0562f1a5aa40ea8eabf0b949057310a52 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sun, 24 Nov 2024 14:37:49 +0100 Subject: [PATCH 01/10] [#390] Implement event based comm c++ publisher --- examples/README.md | 2 +- examples/cxx/CMakeLists.txt | 1 + .../cxx/event_based_communication/BUILD.bazel | 39 +++++ .../event_based_communication/CMakeLists.txt | 22 +++ .../cxx/event_based_communication/README.md | 1 + .../src/publisher.cpp | 153 ++++++++++++++++++ .../src/pubsub_event.hpp | 63 ++++++++ .../src/subscriber.cpp | 45 ++++++ .../src/transmission_data.hpp | 30 ++++ .../event_based_communication/publisher.rs | 2 + .../cxx/include/iox2/file_descriptor.hpp | 2 + iceoryx2-ffi/cxx/include/iox2/listener.hpp | 4 + iceoryx2-ffi/cxx/src/listener.cpp | 5 + 13 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 examples/cxx/event_based_communication/BUILD.bazel create mode 100644 examples/cxx/event_based_communication/CMakeLists.txt create mode 100644 examples/cxx/event_based_communication/README.md create mode 100644 examples/cxx/event_based_communication/src/publisher.cpp create mode 100644 examples/cxx/event_based_communication/src/pubsub_event.hpp create mode 100644 examples/cxx/event_based_communication/src/subscriber.cpp create mode 100644 examples/cxx/event_based_communication/src/transmission_data.hpp diff --git a/examples/README.md b/examples/README.md index 3d10d42e8..8ca30e5cb 100644 --- a/examples/README.md +++ b/examples/README.md @@ -77,7 +77,7 @@ These types are demonstrated in the complex data types example. | docker | [all](rust/docker) | Communicate between different docker containers and the host. | | domains | [C](c/domains) [C++](cxx/domains) [Rust](rust/domains) | Establish separate domains that operate independently from one another. | | event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. | -| event based communication | [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | +| event based communication | [C++](rust/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | | event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. | | publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). | | publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. | diff --git a/examples/cxx/CMakeLists.txt b/examples/cxx/CMakeLists.txt index 01d57e7ed..40836c68e 100644 --- a/examples/cxx/CMakeLists.txt +++ b/examples/cxx/CMakeLists.txt @@ -21,6 +21,7 @@ add_subdirectory(discovery) add_subdirectory(domains) add_subdirectory(event) add_subdirectory(event_multiplexing) +add_subdirectory(event_based_communication) add_subdirectory(publish_subscribe) add_subdirectory(publish_subscribe_dynamic_data) add_subdirectory(publish_subscribe_with_user_header) diff --git a/examples/cxx/event_based_communication/BUILD.bazel b/examples/cxx/event_based_communication/BUILD.bazel new file mode 100644 index 000000000..5a6758442 --- /dev/null +++ b/examples/cxx/event_based_communication/BUILD.bazel @@ -0,0 +1,39 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. +# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library") + +cc_binary( + name = "example_cxx_event_based_communication_publisher", + srcs = [ + "src/publisher.cpp", + "src/transmission_data.hpp", + "src/pubsub_event.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) + +cc_binary( + name = "example_cxx_event_based_communication_subscriber", + srcs = [ + "src/subscriber.cpp", + "src/transmission_data.hpp", + "src/pubsub_event.hpp", + ], + deps = [ + "@iceoryx//:iceoryx_hoofs", + "//:iceoryx2-cxx-static", + ], +) diff --git a/examples/cxx/event_based_communication/CMakeLists.txt b/examples/cxx/event_based_communication/CMakeLists.txt new file mode 100644 index 000000000..30c2bb19d --- /dev/null +++ b/examples/cxx/event_based_communication/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright (c) 2024 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache Software License 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +# which is available at https://opensource.org/licenses/MIT. +# +# SPDX-License-Identifier: Apache-2.0 OR MIT + +cmake_minimum_required(VERSION 3.22) +project(example_cxx_event_based_communication LANGUAGES CXX) + +find_package(iceoryx2-cxx 0.4.1 REQUIRED) + +add_executable(example_cxx_event_based_communication_publisher src/publisher.cpp) +target_link_libraries(example_cxx_event_based_communication_publisher iceoryx2-cxx::static-lib-cxx) + +add_executable(example_cxx_event_based_communication_subscriber src/subscriber.cpp) +target_link_libraries(example_cxx_event_based_communication_subscriber iceoryx2-cxx::static-lib-cxx) diff --git a/examples/cxx/event_based_communication/README.md b/examples/cxx/event_based_communication/README.md new file mode 100644 index 000000000..9656f4804 --- /dev/null +++ b/examples/cxx/event_based_communication/README.md @@ -0,0 +1 @@ +# asd diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp new file mode 100644 index 000000000..202bad4d1 --- /dev/null +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -0,0 +1,153 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include "iox2/publisher.hpp" +#include "iox/duration.hpp" +#include "iox2/listener.hpp" +#include "iox2/node.hpp" +#include "iox2/notifier.hpp" +#include "iox2/sample_mut.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" +#include "transmission_data.hpp" + +#include +#include + +using namespace iox2; + +constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); +constexpr uint64_t HISTORY_SIZE = 20; + +class EventBasedPublisher { + public: + EventBasedPublisher(const EventBasedPublisher&) = delete; + EventBasedPublisher(EventBasedPublisher&&) = default; + ~EventBasedPublisher(); + + auto operator=(const EventBasedPublisher&) -> EventBasedPublisher& = delete; + auto operator=(EventBasedPublisher&&) -> EventBasedPublisher& = default; + + static auto create(Node& node, const ServiceName& service_name) -> EventBasedPublisher; + void handle_event(); + void send(uint64_t counter); + auto file_descriptor() -> FileDescriptorView; + + private: + EventBasedPublisher(Publisher&& publisher, + Listener&& listener, + Notifier&& notifier); + + Publisher m_publisher; + Listener m_listener; + Notifier m_notifier; +}; + +auto main() -> int { + auto node = NodeBuilder().create().expect("successful node creation"); + auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); + + auto waitset = WaitSetBuilder().create().expect(""); + auto publisher_guard = waitset.attach_notification(publisher.file_descriptor()).expect(""); + auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect(""); + + uint64_t counter = 0; + + auto on_event = [&](WaitSetAttachmentId attachment_id) -> CallbackProgression { + if (attachment_id.has_event_from(cyclic_trigger_guard)) { + std::cout << "send message: " << counter << std::endl; + publisher.send(counter); + counter += 1; + } else if (attachment_id.has_event_from(publisher_guard)) { + publisher.handle_event(); + } + return CallbackProgression::Continue; + }; + + waitset.wait_and_process(on_event).expect(""); + + std::cout << "exit ..." << std::endl; + + return 0; +} + +EventBasedPublisher::EventBasedPublisher(Publisher&& publisher, + Listener&& listener, + Notifier&& notifier) + : m_publisher { std::move(publisher) } + , m_listener { std::move(listener) } + , m_notifier { std::move(notifier) } { +} + +EventBasedPublisher::~EventBasedPublisher() { + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherDisconnected))) + .expect(""); +} + +auto EventBasedPublisher::create(Node& node, const ServiceName& service_name) -> EventBasedPublisher { + auto pubsub_service = node.service_builder(service_name) + .publish_subscribe() + .history_size(HISTORY_SIZE) + .subscriber_max_buffer_size(HISTORY_SIZE) + .open_or_create() + .expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto notifier = event_service.notifier_builder().create().expect(""); + auto listener = event_service.listener_builder().create().expect(""); + auto publisher = pubsub_service.publisher_builder().create().expect(""); + + return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; +} + +auto EventBasedPublisher::file_descriptor() -> FileDescriptorView { + return m_listener.file_descriptor(); +} + +void EventBasedPublisher::handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SubscriberConnected: { + std::cout << "new subscriber connected - delivering history" << std::endl; + m_publisher.update_connections().expect(""); + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SentHistory))) + .expect(""); + break; + } + case PubSubEvent::SubscriberDisconnected: { + std::cout << "subscriber disconnected" << std::endl; + break; + } + case PubSubEvent::ReceivedSample: { + std::cout << "subscriber has consumed sample" << std::endl; + break; + } + default: { + break; + } + } + } +} + +void EventBasedPublisher::send(const uint64_t counter) { + constexpr double SOME_NUMBER = 812.12; + auto sample = m_publisher.loan_uninit().expect(""); + sample.write_payload(TransmissionData { + static_cast(counter), static_cast(counter), static_cast(counter) * SOME_NUMBER }); + auto initialized_sample = assume_init(std::move(sample)); + ::send(std::move(initialized_sample)).expect(""); + + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SentSample))).expect(""); +} diff --git a/examples/cxx/event_based_communication/src/pubsub_event.hpp b/examples/cxx/event_based_communication/src/pubsub_event.hpp new file mode 100644 index 000000000..553f59376 --- /dev/null +++ b/examples/cxx/event_based_communication/src/pubsub_event.hpp @@ -0,0 +1,63 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_PUBSUB_EVENT_HPP +#define IOX2_EXAMPLES_PUBSUB_EVENT_HPP + +#include "iox/into.hpp" +#include "iox2/event_id.hpp" + +#include + +enum class PubSubEvent : uint8_t { + PublisherConnected = 0, + PublisherDisconnected = 1, + SubscriberConnected = 2, + SubscriberDisconnected = 3, + SentSample = 4, + ReceivedSample = 5, + SentHistory = 6, + Unknown +}; + +namespace iox { +template <> +inline constexpr auto from(const PubSubEvent value) noexcept -> size_t { + return static_cast(value); +} + +template <> +inline constexpr auto from(const size_t value) noexcept -> PubSubEvent { + switch (value) { + case from(PubSubEvent::PublisherConnected): + return PubSubEvent::PublisherConnected; + case from(PubSubEvent::PublisherDisconnected): + return PubSubEvent::PublisherDisconnected; + case from(PubSubEvent::SubscriberConnected): + return PubSubEvent::SubscriberConnected; + case from(PubSubEvent::SubscriberDisconnected): + return PubSubEvent::SubscriberDisconnected; + case from(PubSubEvent::SentSample): + return PubSubEvent::SentSample; + case from(PubSubEvent::ReceivedSample): + return PubSubEvent::ReceivedSample; + case from(PubSubEvent::SentHistory): + return PubSubEvent::SentHistory; + default: + return PubSubEvent::Unknown; + } + IOX_UNREACHABLE(); +} +} // namespace iox + + +#endif diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp new file mode 100644 index 000000000..b534b4e3a --- /dev/null +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -0,0 +1,45 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#include + +#include "iox/duration.hpp" +#include "iox2/node.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "transmission_data.hpp" + +constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); + +auto main() -> int { + using namespace iox2; + auto node = NodeBuilder().create().expect("successful node creation"); + + auto service = node.service_builder(ServiceName::create("My/Funk/ServiceName").expect("valid service name")) + .publish_subscribe() + .open_or_create() + .expect("successful service creation/opening"); + + auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation"); + + while (node.wait(CYCLE_TIME).has_value()) { + auto sample = subscriber.receive().expect("receive succeeds"); + while (sample.has_value()) { + std::cout << "received: " << sample->payload() << std::endl; + sample = subscriber.receive().expect("receive succeeds"); + } + } + + std::cout << "exit" << std::endl; + + return 0; +} diff --git a/examples/cxx/event_based_communication/src/transmission_data.hpp b/examples/cxx/event_based_communication/src/transmission_data.hpp new file mode 100644 index 000000000..e5b83272d --- /dev/null +++ b/examples/cxx/event_based_communication/src/transmission_data.hpp @@ -0,0 +1,30 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_TRANSMISSION_DATA_HPP +#define IOX2_EXAMPLES_TRANSMISSION_DATA_HPP + +#include +#include + +struct TransmissionData { + std::int32_t x; + std::int32_t y; + double funky; +}; + +inline auto operator<<(std::ostream& stream, const TransmissionData& value) -> std::ostream& { + stream << "TransmissionData { x: " << value.x << ", y: " << value.y << ", funky: " << value.funky << " }"; + return stream; +} + +#endif diff --git a/examples/rust/event_based_communication/publisher.rs b/examples/rust/event_based_communication/publisher.rs index 0e67e17fe..d079d8673 100644 --- a/examples/rust/event_based_communication/publisher.rs +++ b/examples/rust/event_based_communication/publisher.rs @@ -46,6 +46,8 @@ fn main() -> Result<(), Box> { waitset.wait_and_process(on_event)?; + println!("exit ..."); + Ok(()) } diff --git a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp index 77da30646..b0c3a8283 100644 --- a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp @@ -23,6 +23,8 @@ class FileDescriptorView { template friend class WaitSet; friend class FileDescriptor; + template + friend class Listener; explicit FileDescriptorView(iox2_file_descriptor_ptr handle); diff --git a/iceoryx2-ffi/cxx/include/iox2/listener.hpp b/iceoryx2-ffi/cxx/include/iox2/listener.hpp index 10d5c128c..5eecfecd2 100644 --- a/iceoryx2-ffi/cxx/include/iox2/listener.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/listener.hpp @@ -18,6 +18,7 @@ #include "iox/function.hpp" #include "iox/optional.hpp" #include "iox2/event_id.hpp" +#include "iox2/file_descriptor.hpp" #include "iox2/internal/iceoryx2.hpp" #include "iox2/listener_error.hpp" #include "iox2/service_type.hpp" @@ -35,6 +36,9 @@ class Listener { Listener(const Listener&) = delete; auto operator=(const Listener&) -> Listener& = delete; + /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`] of the [`Listener`]. + auto file_descriptor() const -> FileDescriptorView; + /// Returns the [`UniqueListenerId`] of the [`Listener`] auto id() const -> UniqueListenerId; diff --git a/iceoryx2-ffi/cxx/src/listener.cpp b/iceoryx2-ffi/cxx/src/listener.cpp index c80b8f340..7bc3943d3 100644 --- a/iceoryx2-ffi/cxx/src/listener.cpp +++ b/iceoryx2-ffi/cxx/src/listener.cpp @@ -49,6 +49,11 @@ void Listener::drop() { } } +template +auto Listener::file_descriptor() const -> FileDescriptorView { + return FileDescriptorView(iox2_listener_get_file_descriptor(&m_handle)); +} + template auto Listener::id() const -> UniqueListenerId { iox2_unique_listener_id_h id_handle = nullptr; From 7655e31660b12df3df7a3e65bd540b85a27fe9ca Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sun, 24 Nov 2024 15:29:18 +0100 Subject: [PATCH 02/10] [#390] Introduce FileDescriptorBased in C++ API --- .../src/publisher.cpp | 11 +++++----- .../cxx/include/iox2/file_descriptor.hpp | 21 ++++++++++++++++++- iceoryx2-ffi/cxx/include/iox2/listener.hpp | 6 +++--- iceoryx2-ffi/cxx/include/iox2/waitset.hpp | 14 ++++++------- iceoryx2-ffi/cxx/src/file_descriptor.cpp | 4 ++++ iceoryx2-ffi/cxx/src/waitset.cpp | 9 ++++---- 6 files changed, 45 insertions(+), 20 deletions(-) diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index 202bad4d1..8bcaf6840 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -12,6 +12,7 @@ #include "iox2/publisher.hpp" #include "iox/duration.hpp" +#include "iox2/file_descriptor.hpp" #include "iox2/listener.hpp" #include "iox2/node.hpp" #include "iox2/notifier.hpp" @@ -30,11 +31,11 @@ using namespace iox2; constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); constexpr uint64_t HISTORY_SIZE = 20; -class EventBasedPublisher { +class EventBasedPublisher : public FileDescriptorBased { public: EventBasedPublisher(const EventBasedPublisher&) = delete; EventBasedPublisher(EventBasedPublisher&&) = default; - ~EventBasedPublisher(); + ~EventBasedPublisher() override; auto operator=(const EventBasedPublisher&) -> EventBasedPublisher& = delete; auto operator=(EventBasedPublisher&&) -> EventBasedPublisher& = default; @@ -42,7 +43,7 @@ class EventBasedPublisher { static auto create(Node& node, const ServiceName& service_name) -> EventBasedPublisher; void handle_event(); void send(uint64_t counter); - auto file_descriptor() -> FileDescriptorView; + auto file_descriptor() const -> FileDescriptorView override; private: EventBasedPublisher(Publisher&& publisher, @@ -59,7 +60,7 @@ auto main() -> int { auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); auto waitset = WaitSetBuilder().create().expect(""); - auto publisher_guard = waitset.attach_notification(publisher.file_descriptor()).expect(""); + auto publisher_guard = waitset.attach_notification(publisher).expect(""); auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect(""); uint64_t counter = 0; @@ -111,7 +112,7 @@ auto EventBasedPublisher::create(Node& node, const ServiceName return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; } -auto EventBasedPublisher::file_descriptor() -> FileDescriptorView { +auto EventBasedPublisher::file_descriptor() const -> FileDescriptorView { return m_listener.file_descriptor(); } diff --git a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp index b0c3a8283..73555e3d9 100644 --- a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp @@ -17,8 +17,24 @@ #include "iox2/internal/iceoryx2.hpp" namespace iox2 { +class FileDescriptorView; + +/// Abstract class that can be implemented by a class that is based on a [`FileDescriptor`] +class FileDescriptorBased { + public: + FileDescriptorBased() = default; + FileDescriptorBased(const FileDescriptorBased&) = default; + FileDescriptorBased(FileDescriptorBased&&) = default; + auto operator=(const FileDescriptorBased&) -> FileDescriptorBased& = default; + auto operator=(FileDescriptorBased&&) -> FileDescriptorBased& = default; + virtual ~FileDescriptorBased() = default; + + /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`]. + virtual auto file_descriptor() const -> FileDescriptorView = 0; +}; + /// A view to a [`FileDescriptor`]. -class FileDescriptorView { +class FileDescriptorView : public FileDescriptorBased { private: template friend class WaitSet; @@ -28,6 +44,9 @@ class FileDescriptorView { explicit FileDescriptorView(iox2_file_descriptor_ptr handle); + /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`]. + auto file_descriptor() const -> FileDescriptorView override; + iox2_file_descriptor_ptr m_handle = nullptr; }; diff --git a/iceoryx2-ffi/cxx/include/iox2/listener.hpp b/iceoryx2-ffi/cxx/include/iox2/listener.hpp index 5eecfecd2..2f85f7e0e 100644 --- a/iceoryx2-ffi/cxx/include/iox2/listener.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/listener.hpp @@ -27,17 +27,17 @@ namespace iox2 { /// Represents the receiving endpoint of an event based communication. template -class Listener { +class Listener : public FileDescriptorBased { public: Listener(Listener&&) noexcept; auto operator=(Listener&&) noexcept -> Listener&; - ~Listener(); + ~Listener() override; Listener(const Listener&) = delete; auto operator=(const Listener&) -> Listener& = delete; /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`] of the [`Listener`]. - auto file_descriptor() const -> FileDescriptorView; + auto file_descriptor() const -> FileDescriptorView override; /// Returns the [`UniqueListenerId`] of the [`Listener`] auto id() const -> UniqueListenerId; diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp index 8a118c86e..67ac6348b 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp @@ -174,17 +174,17 @@ class WaitSet { /// * The [`WaitSetGuard`] must life at least as long as the [`WaitsSet`]. auto attach_notification(const Listener& listener) -> iox::expected, WaitSetAttachmentError>; - /// Attaches a [`FileDescriptorView`] as notification to the [`WaitSet`]. Whenever an event is received on the - /// object the [`WaitSet`] informs the user in [`WaitSet::wait_and_process()`] to handle the event. - /// The object cannot be attached twice and the + /// Attaches a [`FileDescriptorBased`] object as notification to the [`WaitSet`]. Whenever an event is received on + /// the object the [`WaitSet`] informs the user in [`WaitSet::wait_and_process()`] to handle the event. The object + /// cannot be attached twice and the /// [`WaitSet::capacity()`] is limited by the underlying implementation. /// /// # Safety /// /// * The corresponding [`FileDescriptor`] must life at least as long as the returned [`WaitSetGuard`]. /// * The [`WaitSetGuard`] must life at least as long as the [`WaitsSet`]. - auto - attach_notification(FileDescriptorView file_descriptor) -> iox::expected, WaitSetAttachmentError>; + auto attach_notification(const FileDescriptorBased& attachment) + -> iox::expected, WaitSetAttachmentError>; /// Attaches a [`Listener`] as deadline to the [`WaitSet`]. Whenever the event is received or the /// deadline is hit, the user is informed in [`WaitSet::wait_and_process()`]. @@ -199,7 +199,7 @@ class WaitSet { auto attach_deadline(const Listener& listener, iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError>; - /// Attaches a [`FileDescriptorView`] as deadline to the [`WaitSet`]. Whenever the event is received or the + /// Attaches a [`FileDescriptorBased`] object as deadline to the [`WaitSet`]. Whenever the event is received or the /// deadline is hit, the user is informed in [`WaitSet::wait_and_process()`]. /// The object cannot be attached twice and the /// [`WaitSet::capacity()`] is limited by the underlying implementation. @@ -209,7 +209,7 @@ class WaitSet { /// /// * The corresponding [`FileDescriptor`] must life at least as long as the returned [`WaitSetGuard`]. /// * The [`WaitSetGuard`] must life at least as long as the [`WaitsSet`]. - auto attach_deadline(FileDescriptorView file_descriptor, + auto attach_deadline(const FileDescriptorBased& attachment, iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError>; /// Attaches a tick event to the [`WaitSet`]. Whenever the timeout is reached the [`WaitSet`] diff --git a/iceoryx2-ffi/cxx/src/file_descriptor.cpp b/iceoryx2-ffi/cxx/src/file_descriptor.cpp index f8abb6203..d5d204063 100644 --- a/iceoryx2-ffi/cxx/src/file_descriptor.cpp +++ b/iceoryx2-ffi/cxx/src/file_descriptor.cpp @@ -17,6 +17,10 @@ FileDescriptorView::FileDescriptorView(iox2_file_descriptor_ptr handle) : m_handle { handle } { } +auto FileDescriptorView::file_descriptor() const -> FileDescriptorView { + return *this; +} + auto FileDescriptor::create_owning(int32_t file_descriptor) -> iox::optional { iox2_file_descriptor_h handle = nullptr; if (iox2_file_descriptor_new(file_descriptor, true, nullptr, &handle)) { diff --git a/iceoryx2-ffi/cxx/src/waitset.cpp b/iceoryx2-ffi/cxx/src/waitset.cpp index c9a654a86..28ed3a57c 100644 --- a/iceoryx2-ffi/cxx/src/waitset.cpp +++ b/iceoryx2-ffi/cxx/src/waitset.cpp @@ -241,11 +241,11 @@ auto WaitSet::attach_interval(const iox::units::Duration deadline) } template -auto WaitSet::attach_deadline(FileDescriptorView file_descriptor, const iox::units::Duration deadline) +auto WaitSet::attach_deadline(const FileDescriptorBased& attachment, const iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError> { iox2_waitset_guard_h guard_handle {}; auto result = iox2_waitset_attach_deadline(&m_handle, - file_descriptor.m_handle, + attachment.file_descriptor().m_handle, deadline.toSeconds(), deadline.toNanoseconds() - deadline.toSeconds() * iox::units::Duration::NANOSECS_PER_SEC, @@ -266,10 +266,11 @@ auto WaitSet::attach_deadline(const Listener& listener, const iox::units:: } template -auto WaitSet::attach_notification(const FileDescriptorView file_descriptor) +auto WaitSet::attach_notification(const FileDescriptorBased& attachment) -> iox::expected, WaitSetAttachmentError> { iox2_waitset_guard_h guard_handle {}; - auto result = iox2_waitset_attach_notification(&m_handle, file_descriptor.m_handle, nullptr, &guard_handle); + auto result = + iox2_waitset_attach_notification(&m_handle, attachment.file_descriptor().m_handle, nullptr, &guard_handle); if (result == IOX2_OK) { return iox::ok(WaitSetGuard(guard_handle)); From 9003d8b945aba42140f031cfafd436db0373c215 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sun, 24 Nov 2024 16:17:51 +0100 Subject: [PATCH 03/10] [#390] Add documentation to example source code --- .../src/publisher.cpp | 13 ++ .../src/subscriber.cpp | 148 ++++++++++++++++-- .../event_based_communication/publisher.rs | 11 ++ .../event_based_communication/subscriber.rs | 12 ++ 4 files changed, 171 insertions(+), 13 deletions(-) diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index 8bcaf6840..5086cbb27 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -31,6 +31,9 @@ using namespace iox2; constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); constexpr uint64_t HISTORY_SIZE = 20; +// High-level publisher class that contains besides a publisher also a notifier and a listener. +// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +// and the listener to wait for new subscribers. class EventBasedPublisher : public FileDescriptorBased { public: EventBasedPublisher(const EventBasedPublisher&) = delete; @@ -60,22 +63,29 @@ auto main() -> int { auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); auto waitset = WaitSetBuilder().create().expect(""); + // Whenever our publisher receives an event we get notified. auto publisher_guard = waitset.attach_notification(publisher).expect(""); + // Attach an interval so that we wake up and can publish a new message auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect(""); uint64_t counter = 0; + // Event callback that is called whenever the WaitSet received an event. auto on_event = [&](WaitSetAttachmentId attachment_id) -> CallbackProgression { + // when the cyclic trigger guard gets notified we send out a new message if (attachment_id.has_event_from(cyclic_trigger_guard)) { std::cout << "send message: " << counter << std::endl; publisher.send(counter); counter += 1; + // when something else happens on the publisher we handle the events } else if (attachment_id.has_event_from(publisher_guard)) { publisher.handle_event(); } return CallbackProgression::Continue; }; + // Start the event loop. It will run until `CallbackProgression::Stop` is returned by the + // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event).expect(""); std::cout << "exit ..." << std::endl; @@ -109,6 +119,9 @@ auto EventBasedPublisher::create(Node& node, const ServiceName auto listener = event_service.listener_builder().create().expect(""); auto publisher = pubsub_service.publisher_builder().create().expect(""); + notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherConnected))) + .expect(""); + return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; } diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp index b534b4e3a..ec2de1203 100644 --- a/examples/cxx/event_based_communication/src/subscriber.cpp +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -13,33 +13,155 @@ #include #include "iox/duration.hpp" +#include "iox/into.hpp" +#include "iox2/file_descriptor.hpp" +#include "iox2/listener.hpp" #include "iox2/node.hpp" +#include "iox2/notifier.hpp" #include "iox2/service_name.hpp" #include "iox2/service_type.hpp" +#include "iox2/subscriber.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" #include "transmission_data.hpp" -constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); +constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2); + +using namespace iox2; + +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. +class EventBasedSubscriber : public FileDescriptorBased { + public: + EventBasedSubscriber(const EventBasedSubscriber&) = delete; + EventBasedSubscriber(EventBasedSubscriber&&) = default; + ~EventBasedSubscriber() override; + auto operator=(const EventBasedSubscriber&) -> EventBasedSubscriber& = delete; + auto operator=(EventBasedSubscriber&&) -> EventBasedSubscriber& = default; + + static auto create(Node& node, const ServiceName& service_name) -> EventBasedSubscriber; + auto file_descriptor() const -> FileDescriptorView override; + void handle_event(); + auto receive() -> iox::optional>; + + private: + EventBasedSubscriber(Subscriber&& subscriber, + Notifier&& notifier, + Listener&& listener); + + Subscriber m_subscriber; + Notifier m_notifier; + Listener m_listener; +}; auto main() -> int { - using namespace iox2; auto node = NodeBuilder().create().expect("successful node creation"); - auto service = node.service_builder(ServiceName::create("My/Funk/ServiceName").expect("valid service name")) - .publish_subscribe() - .open_or_create() - .expect("successful service creation/opening"); + auto subscriber = EventBasedSubscriber::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); - auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation"); + auto waitset = WaitSetBuilder().create().expect(""); - while (node.wait(CYCLE_TIME).has_value()) { - auto sample = subscriber.receive().expect("receive succeeds"); - while (sample.has_value()) { - std::cout << "received: " << sample->payload() << std::endl; - sample = subscriber.receive().expect("receive succeeds"); + // The subscriber is attached as a deadline, meaning that we expect some activity + // latest after the deadline has passed. + auto subscriber_guard = waitset.attach_deadline(subscriber, DEADLINE).expect(""); + + auto on_event = [&](WaitSetAttachmentId attachment_id) { + // If we have received a new event on the subscriber we handle it. + if (attachment_id.has_event_from(subscriber_guard)) { + subscriber.handle_event(); + // If the subscriber did not receive an event until DEADLINE has + // passed, we print out a warning. + } else if (attachment_id.has_missed_deadline(subscriber_guard)) { + std::cout << "Contract violation! The subscriber did not receive a message for " << DEADLINE << std::endl; } - } + + return CallbackProgression::Continue; + }; + + waitset.wait_and_process(on_event).expect(""); std::cout << "exit" << std::endl; return 0; } + +EventBasedSubscriber::EventBasedSubscriber(Subscriber&& subscriber, + Notifier&& notifier, + Listener&& listener) + : m_subscriber { std::move(subscriber) } + , m_notifier { std::move(notifier) } + , m_listener { std::move(listener) } { +} + +EventBasedSubscriber::~EventBasedSubscriber() { + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberDisconnected))) + .expect(""); +} + + +auto EventBasedSubscriber::create(Node& node, + const ServiceName& service_name) -> EventBasedSubscriber { + auto pubsub_service = + node.service_builder(service_name).publish_subscribe().open_or_create().expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto listener = event_service.listener_builder().create().expect(""); + auto notifier = event_service.notifier_builder().create().expect(""); + auto subscriber = pubsub_service.subscriber_builder().create().expect(""); + + notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberConnected))) + .expect(""); + + return EventBasedSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; +} + +auto EventBasedSubscriber::file_descriptor() const -> FileDescriptorView { + return m_listener.file_descriptor(); +} + +void EventBasedSubscriber::handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SentHistory: { + std::cout << "History delivered" << std::endl; + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << " history: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::SentSample: { + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << "received: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::PublisherConnected: { + std::cout << "new publisher connected" << std::endl; + break; + } + case PubSubEvent::PublisherDisconnected: { + std::cout << "publisher disconnected" << std::endl; + break; + } + default: { + break; + } + } + } +} + +auto EventBasedSubscriber::receive() -> iox::optional> { + auto sample = m_subscriber.receive().expect(""); + if (sample.has_value()) { + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::ReceivedSample))) + .expect(""); + } + + return sample; +} + diff --git a/examples/rust/event_based_communication/publisher.rs b/examples/rust/event_based_communication/publisher.rs index d079d8673..69f0388aa 100644 --- a/examples/rust/event_based_communication/publisher.rs +++ b/examples/rust/event_based_communication/publisher.rs @@ -28,22 +28,30 @@ fn main() -> Result<(), Box> { let publisher = EventBasedPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; + + // Whenever our publisher receives an event we get notified. let publisher_guard = waitset.attach_notification(&publisher)?; + // Attach an interval so that we wake up and can publish a new message let cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME)?; let mut counter: u64 = 0; + // Event callback that is called whenever the WaitSet received an event. let on_event = |attachment_id: WaitSetAttachmentId| { + // when the cyclic trigger guard gets notified we send out a new message if attachment_id.has_event_from(&cyclic_trigger_guard) { println!("send message: {}", counter); publisher.send(counter).unwrap(); counter += 1; + // when something else happens on the publisher we handle the events } else if attachment_id.has_event_from(&publisher_guard) { publisher.handle_event().unwrap(); } CallbackProgression::Continue }; + // Start the event loop. It will run until `CallbackProgression::Stop` is returned by the + // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event)?; println!("exit ..."); @@ -51,6 +59,9 @@ fn main() -> Result<(), Box> { Ok(()) } +/// High-level publisher class that contains besides a publisher also a notifier and a listener. +/// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +/// and the listener to wait for new subscribers. #[derive(Debug)] struct EventBasedPublisher { publisher: Publisher, diff --git a/examples/rust/event_based_communication/subscriber.rs b/examples/rust/event_based_communication/subscriber.rs index e144e6bd0..88c074645 100644 --- a/examples/rust/event_based_communication/subscriber.rs +++ b/examples/rust/event_based_communication/subscriber.rs @@ -28,11 +28,17 @@ fn main() -> Result<(), Box> { let subscriber = EventBasedSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; + + // The subscriber is attached as a deadline, meaning that we expect some activity + // latest after the deadline has passed. let subscriber_guard = waitset.attach_deadline(&subscriber, DEADLINE)?; let on_event = |attachment_id: WaitSetAttachmentId| { + // If we have received a new event on the subscriber we handle it. if attachment_id.has_event_from(&subscriber_guard) { subscriber.handle_event().unwrap(); + // If the subscriber did not receive an event until DEADLINE has + // passed, we print out a warning. } else if attachment_id.has_missed_deadline(&subscriber_guard) { println!( "Contract violation! The subscriber did not receive a message for {:?}.", @@ -65,6 +71,12 @@ impl FileDescriptorBased for EventBasedSubscriber { impl SynchronousMultiplexing for EventBasedSubscriber {} +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. impl EventBasedSubscriber { fn new( node: &Node, From c766160c8afec3c589f4fe63f7f7fe0714575298 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sun, 24 Nov 2024 16:19:48 +0100 Subject: [PATCH 04/10] [#390] Add readme --- .../cxx/event_based_communication/README.md | 36 ++++++++++++++++++- .../src/pubsub_event.hpp | 4 +-- .../src/subscriber.cpp | 1 - 3 files changed, 37 insertions(+), 4 deletions(-) diff --git a/examples/cxx/event_based_communication/README.md b/examples/cxx/event_based_communication/README.md index 9656f4804..868c4151e 100644 --- a/examples/cxx/event_based_communication/README.md +++ b/examples/cxx/event_based_communication/README.md @@ -1 +1,35 @@ -# asd +# Event-Based Communication + +## Running The Example + +This example demonstrates iceoryx2's event multiplexing mechanism in a more +complex setup. The iceoryx2 `Publisher` and `Subscriber` are integrated into +custom `ExamplePublisher` and `ExampleSubscriber` classes, which also +incorporate an additional iceoryx2 `Notifier` and `Listener`. This setup +enables automatic event emission whenever an `ExamplePublisher` or +`ExampleSubscriber` is created or dropped. Additionally, events are emitted +whenever a new `Sample` is sent or received. + +When a `class` inherits from `FileDescriptorBased`, it can be attached to a +`WaitSet`. Both `ExamplePublisher` and `ExampleSubscriber` implement this +interface by forwarding calls to their underlying `Listener`, which already +provides an implementation of `FileDescriptorBased`. + +The `WaitSet` notifies the user of the origin of an event notification. The +user can then acquire the `EventId` from the `Listener`. Based on the value of +the `EventId`, the user can identify the specific event that occurred and take +appropriate action. + +### Terminal 1 + +```sh +./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_publisher +``` + +### Terminal 2 + +```sh +./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_subscriber +``` + +Feel free to run multiple publishers or subscribers in parallel. diff --git a/examples/cxx/event_based_communication/src/pubsub_event.hpp b/examples/cxx/event_based_communication/src/pubsub_event.hpp index 553f59376..082171e30 100644 --- a/examples/cxx/event_based_communication/src/pubsub_event.hpp +++ b/examples/cxx/event_based_communication/src/pubsub_event.hpp @@ -31,12 +31,12 @@ enum class PubSubEvent : uint8_t { namespace iox { template <> -inline constexpr auto from(const PubSubEvent value) noexcept -> size_t { +constexpr auto from(const PubSubEvent value) noexcept -> size_t { return static_cast(value); } template <> -inline constexpr auto from(const size_t value) noexcept -> PubSubEvent { +constexpr auto from(const size_t value) noexcept -> PubSubEvent { switch (value) { case from(PubSubEvent::PublisherConnected): return PubSubEvent::PublisherConnected; diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp index ec2de1203..89f2c5adf 100644 --- a/examples/cxx/event_based_communication/src/subscriber.cpp +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -164,4 +164,3 @@ auto EventBasedSubscriber::receive() -> iox::optional Date: Mon, 25 Nov 2024 11:09:17 +0100 Subject: [PATCH 05/10] [#390] Fix windows conflict with their c 'send' method --- examples/cxx/event_based_communication/src/publisher.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index 5086cbb27..dfb42db8a 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -161,7 +161,7 @@ void EventBasedPublisher::send(const uint64_t counter) { sample.write_payload(TransmissionData { static_cast(counter), static_cast(counter), static_cast(counter) * SOME_NUMBER }); auto initialized_sample = assume_init(std::move(sample)); - ::send(std::move(initialized_sample)).expect(""); + ::iox2::send(std::move(initialized_sample)).expect(""); m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SentSample))).expect(""); } From b2db0fdbf8116f75bf277bf1fe74ec18ede156e1 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 25 Nov 2024 23:38:21 +0100 Subject: [PATCH 06/10] [#390] Fix typos; clearer structure for example --- examples/README.md | 2 +- .../event_based_communication/src/publisher.cpp | 14 +++++++++++++- .../event_based_communication/src/pubsub_event.hpp | 14 +++++++------- .../event_based_communication/src/subscriber.cpp | 12 ++++++++++++ .../rust/event_based_communication/publisher.rs | 2 +- examples/rust/event_multiplexing/wait.rs | 2 +- 6 files changed, 35 insertions(+), 11 deletions(-) diff --git a/examples/README.md b/examples/README.md index 8ca30e5cb..dd5e051bc 100644 --- a/examples/README.md +++ b/examples/README.md @@ -77,7 +77,7 @@ These types are demonstrated in the complex data types example. | docker | [all](rust/docker) | Communicate between different docker containers and the host. | | domains | [C](c/domains) [C++](cxx/domains) [Rust](rust/domains) | Establish separate domains that operate independently from one another. | | event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. | -| event based communication | [C++](rust/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | +| event based communication | [C++](cxx/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. | | event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. | | publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). | | publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. | diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index dfb42db8a..e60739bb5 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -31,6 +31,10 @@ using namespace iox2; constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); constexpr uint64_t HISTORY_SIZE = 20; +/////////////////////////////////////////////// +/// START: EventBasedPublisher declaration +/////////////////////////////////////////////// + // High-level publisher class that contains besides a publisher also a notifier and a listener. // The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` // and the listener to wait for new subscribers. @@ -58,6 +62,10 @@ class EventBasedPublisher : public FileDescriptorBased { Notifier m_notifier; }; +/////////////////////////////////////////////// +/// START: main +/////////////////////////////////////////////// + auto main() -> int { auto node = NodeBuilder().create().expect("successful node creation"); auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); @@ -88,11 +96,15 @@ auto main() -> int { // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event).expect(""); - std::cout << "exit ..." << std::endl; + std::cout << "exit" << std::endl; return 0; } +/////////////////////////////////////////////// +/// START: EventBasedPublisher implementation +/////////////////////////////////////////////// + EventBasedPublisher::EventBasedPublisher(Publisher&& publisher, Listener&& listener, Notifier&& notifier) diff --git a/examples/cxx/event_based_communication/src/pubsub_event.hpp b/examples/cxx/event_based_communication/src/pubsub_event.hpp index 082171e30..e180c1f12 100644 --- a/examples/cxx/event_based_communication/src/pubsub_event.hpp +++ b/examples/cxx/event_based_communication/src/pubsub_event.hpp @@ -38,19 +38,19 @@ constexpr auto from(const PubSubEvent value) noexcept -> si template <> constexpr auto from(const size_t value) noexcept -> PubSubEvent { switch (value) { - case from(PubSubEvent::PublisherConnected): + case into(PubSubEvent::PublisherConnected): return PubSubEvent::PublisherConnected; - case from(PubSubEvent::PublisherDisconnected): + case into(PubSubEvent::PublisherDisconnected): return PubSubEvent::PublisherDisconnected; - case from(PubSubEvent::SubscriberConnected): + case into(PubSubEvent::SubscriberConnected): return PubSubEvent::SubscriberConnected; - case from(PubSubEvent::SubscriberDisconnected): + case into(PubSubEvent::SubscriberDisconnected): return PubSubEvent::SubscriberDisconnected; - case from(PubSubEvent::SentSample): + case into(PubSubEvent::SentSample): return PubSubEvent::SentSample; - case from(PubSubEvent::ReceivedSample): + case into(PubSubEvent::ReceivedSample): return PubSubEvent::ReceivedSample; - case from(PubSubEvent::SentHistory): + case into(PubSubEvent::SentHistory): return PubSubEvent::SentHistory; default: return PubSubEvent::Unknown; diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp index 89f2c5adf..43fa0147a 100644 --- a/examples/cxx/event_based_communication/src/subscriber.cpp +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -29,6 +29,10 @@ constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2); using namespace iox2; +/////////////////////////////////////////////// +/// START: EventBasedSubscriber declaration +/////////////////////////////////////////////// + // High-level subscriber class that contains besides a subscriber also a notifier // and a listener. The notifier is used to send events like // `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber @@ -58,6 +62,10 @@ class EventBasedSubscriber : public FileDescriptorBased { Listener m_listener; }; +/////////////////////////////////////////////// +/// START: main +/////////////////////////////////////////////// + auto main() -> int { auto node = NodeBuilder().create().expect("successful node creation"); @@ -89,6 +97,10 @@ auto main() -> int { return 0; } +/////////////////////////////////////////////// +/// START: EventBasedSubscriber implementation +/////////////////////////////////////////////// + EventBasedSubscriber::EventBasedSubscriber(Subscriber&& subscriber, Notifier&& notifier, Listener&& listener) diff --git a/examples/rust/event_based_communication/publisher.rs b/examples/rust/event_based_communication/publisher.rs index 69f0388aa..834ac971c 100644 --- a/examples/rust/event_based_communication/publisher.rs +++ b/examples/rust/event_based_communication/publisher.rs @@ -54,7 +54,7 @@ fn main() -> Result<(), Box> { // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event)?; - println!("exit ..."); + println!("exit"); Ok(()) } diff --git a/examples/rust/event_multiplexing/wait.rs b/examples/rust/event_multiplexing/wait.rs index 2506123bf..69ea220df 100644 --- a/examples/rust/event_multiplexing/wait.rs +++ b/examples/rust/event_multiplexing/wait.rs @@ -78,7 +78,7 @@ fn main() -> Result<(), Box> { // didn't add this to the example so feel free to play around with it. waitset.wait_and_process(on_event)?; - println!("Exit"); + println!("exit"); Ok(()) } From ec8787210586aa1f8ca866efb7e5cf36d1a6aaf6 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 25 Nov 2024 23:54:18 +0100 Subject: [PATCH 07/10] [#390] Add native_handle to FileDescriptorView --- iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp | 3 +++ iceoryx2-ffi/cxx/src/file_descriptor.cpp | 6 +++++- iceoryx2-ffi/ffi/src/api/file_descriptor.rs | 11 +++-------- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp index 73555e3d9..7c71ec10e 100644 --- a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp @@ -47,6 +47,9 @@ class FileDescriptorView : public FileDescriptorBased { /// Returns a [`FileDescriptorView`] to the underlying [`FileDescriptor`]. auto file_descriptor() const -> FileDescriptorView override; + /// Returns the underlying [`FileDescriptor`] value. + auto native_handle() const -> int32_t; + iox2_file_descriptor_ptr m_handle = nullptr; }; diff --git a/iceoryx2-ffi/cxx/src/file_descriptor.cpp b/iceoryx2-ffi/cxx/src/file_descriptor.cpp index d5d204063..1c58e5090 100644 --- a/iceoryx2-ffi/cxx/src/file_descriptor.cpp +++ b/iceoryx2-ffi/cxx/src/file_descriptor.cpp @@ -21,6 +21,10 @@ auto FileDescriptorView::file_descriptor() const -> FileDescriptorView { return *this; } +auto FileDescriptorView::native_handle() const -> int32_t { + return iox2_file_descriptor_native_handle(m_handle); +} + auto FileDescriptor::create_owning(int32_t file_descriptor) -> iox::optional { iox2_file_descriptor_h handle = nullptr; if (iox2_file_descriptor_new(file_descriptor, true, nullptr, &handle)) { @@ -69,7 +73,7 @@ void FileDescriptor::drop() { } auto FileDescriptor::native_handle() const -> int32_t { - return iox2_file_descriptor_native_handle(&m_handle); + return iox2_file_descriptor_native_handle(iox2_cast_file_descriptor_ptr(m_handle)); } auto FileDescriptor::as_view() const -> FileDescriptorView { diff --git a/iceoryx2-ffi/ffi/src/api/file_descriptor.rs b/iceoryx2-ffi/ffi/src/api/file_descriptor.rs index 0023c1396..d5ec18e2c 100644 --- a/iceoryx2-ffi/ffi/src/api/file_descriptor.rs +++ b/iceoryx2-ffi/ffi/src/api/file_descriptor.rs @@ -147,15 +147,10 @@ pub unsafe extern "C" fn iox2_file_descriptor_drop(handle: iox2_file_descriptor_ /// * `handle` must be valid and acquired with [`iox2_file_descriptor_new()`]. #[no_mangle] pub unsafe extern "C" fn iox2_file_descriptor_native_handle( - handle: iox2_file_descriptor_h_ref, + handle: iox2_file_descriptor_ptr, ) -> i32 { - handle.assert_non_null(); - - (*handle.as_type()) - .value - .as_ref() - .file_descriptor() - .native_handle() + debug_assert!(!handle.is_null()); + (*handle).file_descriptor().native_handle() } /// Creates a new [`iox2_file_descriptor_t`]. From 4b68d49b65b864e1266004900e3853e639fbf9e0 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 26 Nov 2024 00:05:10 +0100 Subject: [PATCH 08/10] [#390] Add `unsafe_` prefix to `native_handle` method with safety doc section --- .../cxx/include/iox2/file_descriptor.hpp | 18 ++++++++++++++++-- iceoryx2-ffi/cxx/src/file_descriptor.cpp | 4 ++-- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp index 7c71ec10e..2c0ee692c 100644 --- a/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/file_descriptor.hpp @@ -48,7 +48,14 @@ class FileDescriptorView : public FileDescriptorBased { auto file_descriptor() const -> FileDescriptorView override; /// Returns the underlying [`FileDescriptor`] value. - auto native_handle() const -> int32_t; + /// + /// # Safety + /// + /// * the user shall not store the value in a variable otherwise lifetime issues may be + /// encountered + /// * do not manually close the file descriptor with a sys call + /// + auto unsafe_native_handle() const -> int32_t; iox2_file_descriptor_ptr m_handle = nullptr; }; @@ -73,7 +80,14 @@ class FileDescriptor { ~FileDescriptor(); /// Returns the underlying [`FileDescriptor`] value. - auto native_handle() const -> int32_t; + /// + /// # Safety + /// + /// * the user shall not store the value in a variable otherwise lifetime issues may be + /// encountered + /// * do not manually close the file descriptor with a sys call + /// + auto unsafe_native_handle() const -> int32_t; /// Creates a [`FileDescriptorView`] out of the [`FileDescriptor`]. The view is only valid as /// long as the [`FileDescriptor`] is living - otherwise it will be a dangling view. diff --git a/iceoryx2-ffi/cxx/src/file_descriptor.cpp b/iceoryx2-ffi/cxx/src/file_descriptor.cpp index 1c58e5090..77a8c8e7b 100644 --- a/iceoryx2-ffi/cxx/src/file_descriptor.cpp +++ b/iceoryx2-ffi/cxx/src/file_descriptor.cpp @@ -21,7 +21,7 @@ auto FileDescriptorView::file_descriptor() const -> FileDescriptorView { return *this; } -auto FileDescriptorView::native_handle() const -> int32_t { +auto FileDescriptorView::unsafe_native_handle() const -> int32_t { return iox2_file_descriptor_native_handle(m_handle); } @@ -72,7 +72,7 @@ void FileDescriptor::drop() { } } -auto FileDescriptor::native_handle() const -> int32_t { +auto FileDescriptor::unsafe_native_handle() const -> int32_t { return iox2_file_descriptor_native_handle(iox2_cast_file_descriptor_ptr(m_handle)); } From 74e11ff4cef900a724394095afe971657ca39f81 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 26 Nov 2024 17:19:03 +0100 Subject: [PATCH 09/10] [#390] Rename EventBased{Publisher|Subscriber} into Custom{Publisher|Subscriber} --- .../src/publisher.cpp | 44 +++++++++--------- .../src/subscriber.cpp | 45 +++++++++---------- .../event_based_communication/publisher.rs | 12 ++--- .../event_based_communication/subscriber.rs | 12 ++--- 4 files changed, 56 insertions(+), 57 deletions(-) diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index e60739bb5..26eba81fd 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -32,30 +32,30 @@ constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1) constexpr uint64_t HISTORY_SIZE = 20; /////////////////////////////////////////////// -/// START: EventBasedPublisher declaration +/// START: CustomPublisher declaration /////////////////////////////////////////////// // High-level publisher class that contains besides a publisher also a notifier and a listener. // The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` // and the listener to wait for new subscribers. -class EventBasedPublisher : public FileDescriptorBased { +class CustomPublisher : public FileDescriptorBased { public: - EventBasedPublisher(const EventBasedPublisher&) = delete; - EventBasedPublisher(EventBasedPublisher&&) = default; - ~EventBasedPublisher() override; + CustomPublisher(const CustomPublisher&) = delete; + CustomPublisher(CustomPublisher&&) = default; + ~CustomPublisher() override; - auto operator=(const EventBasedPublisher&) -> EventBasedPublisher& = delete; - auto operator=(EventBasedPublisher&&) -> EventBasedPublisher& = default; + auto operator=(const CustomPublisher&) -> CustomPublisher& = delete; + auto operator=(CustomPublisher&&) -> CustomPublisher& = default; - static auto create(Node& node, const ServiceName& service_name) -> EventBasedPublisher; + static auto create(Node& node, const ServiceName& service_name) -> CustomPublisher; void handle_event(); void send(uint64_t counter); auto file_descriptor() const -> FileDescriptorView override; private: - EventBasedPublisher(Publisher&& publisher, - Listener&& listener, - Notifier&& notifier); + CustomPublisher(Publisher&& publisher, + Listener&& listener, + Notifier&& notifier); Publisher m_publisher; Listener m_listener; @@ -68,7 +68,7 @@ class EventBasedPublisher : public FileDescriptorBased { auto main() -> int { auto node = NodeBuilder().create().expect("successful node creation"); - auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); + auto publisher = CustomPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); auto waitset = WaitSetBuilder().create().expect(""); // Whenever our publisher receives an event we get notified. @@ -102,23 +102,23 @@ auto main() -> int { } /////////////////////////////////////////////// -/// START: EventBasedPublisher implementation +/// START: CustomPublisher implementation /////////////////////////////////////////////// -EventBasedPublisher::EventBasedPublisher(Publisher&& publisher, - Listener&& listener, - Notifier&& notifier) +CustomPublisher::CustomPublisher(Publisher&& publisher, + Listener&& listener, + Notifier&& notifier) : m_publisher { std::move(publisher) } , m_listener { std::move(listener) } , m_notifier { std::move(notifier) } { } -EventBasedPublisher::~EventBasedPublisher() { +CustomPublisher::~CustomPublisher() { m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherDisconnected))) .expect(""); } -auto EventBasedPublisher::create(Node& node, const ServiceName& service_name) -> EventBasedPublisher { +auto CustomPublisher::create(Node& node, const ServiceName& service_name) -> CustomPublisher { auto pubsub_service = node.service_builder(service_name) .publish_subscribe() .history_size(HISTORY_SIZE) @@ -134,14 +134,14 @@ auto EventBasedPublisher::create(Node& node, const ServiceName notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherConnected))) .expect(""); - return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; + return CustomPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; } -auto EventBasedPublisher::file_descriptor() const -> FileDescriptorView { +auto CustomPublisher::file_descriptor() const -> FileDescriptorView { return m_listener.file_descriptor(); } -void EventBasedPublisher::handle_event() { +void CustomPublisher::handle_event() { for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); event = m_listener.try_wait_one()) { switch (iox::from(event.value()->as_value())) { @@ -167,7 +167,7 @@ void EventBasedPublisher::handle_event() { } } -void EventBasedPublisher::send(const uint64_t counter) { +void CustomPublisher::send(const uint64_t counter) { constexpr double SOME_NUMBER = 812.12; auto sample = m_publisher.loan_uninit().expect(""); sample.write_payload(TransmissionData { diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp index 43fa0147a..b9632ba5d 100644 --- a/examples/cxx/event_based_communication/src/subscriber.cpp +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -30,7 +30,7 @@ constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2); using namespace iox2; /////////////////////////////////////////////// -/// START: EventBasedSubscriber declaration +/// START: CustomSubscriber declaration /////////////////////////////////////////////// // High-level subscriber class that contains besides a subscriber also a notifier @@ -39,23 +39,23 @@ using namespace iox2; // connected. // The listener waits for events originating from the publisher like // `PubSubEvent::SentSample`. -class EventBasedSubscriber : public FileDescriptorBased { +class CustomSubscriber : public FileDescriptorBased { public: - EventBasedSubscriber(const EventBasedSubscriber&) = delete; - EventBasedSubscriber(EventBasedSubscriber&&) = default; - ~EventBasedSubscriber() override; - auto operator=(const EventBasedSubscriber&) -> EventBasedSubscriber& = delete; - auto operator=(EventBasedSubscriber&&) -> EventBasedSubscriber& = default; + CustomSubscriber(const CustomSubscriber&) = delete; + CustomSubscriber(CustomSubscriber&&) = default; + ~CustomSubscriber() override; + auto operator=(const CustomSubscriber&) -> CustomSubscriber& = delete; + auto operator=(CustomSubscriber&&) -> CustomSubscriber& = default; - static auto create(Node& node, const ServiceName& service_name) -> EventBasedSubscriber; + static auto create(Node& node, const ServiceName& service_name) -> CustomSubscriber; auto file_descriptor() const -> FileDescriptorView override; void handle_event(); auto receive() -> iox::optional>; private: - EventBasedSubscriber(Subscriber&& subscriber, - Notifier&& notifier, - Listener&& listener); + CustomSubscriber(Subscriber&& subscriber, + Notifier&& notifier, + Listener&& listener); Subscriber m_subscriber; Notifier m_notifier; @@ -69,7 +69,7 @@ class EventBasedSubscriber : public FileDescriptorBased { auto main() -> int { auto node = NodeBuilder().create().expect("successful node creation"); - auto subscriber = EventBasedSubscriber::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); + auto subscriber = CustomSubscriber::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); auto waitset = WaitSetBuilder().create().expect(""); @@ -98,25 +98,24 @@ auto main() -> int { } /////////////////////////////////////////////// -/// START: EventBasedSubscriber implementation +/// START: CustomSubscriber implementation /////////////////////////////////////////////// -EventBasedSubscriber::EventBasedSubscriber(Subscriber&& subscriber, - Notifier&& notifier, - Listener&& listener) +CustomSubscriber::CustomSubscriber(Subscriber&& subscriber, + Notifier&& notifier, + Listener&& listener) : m_subscriber { std::move(subscriber) } , m_notifier { std::move(notifier) } , m_listener { std::move(listener) } { } -EventBasedSubscriber::~EventBasedSubscriber() { +CustomSubscriber::~CustomSubscriber() { m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberDisconnected))) .expect(""); } -auto EventBasedSubscriber::create(Node& node, - const ServiceName& service_name) -> EventBasedSubscriber { +auto CustomSubscriber::create(Node& node, const ServiceName& service_name) -> CustomSubscriber { auto pubsub_service = node.service_builder(service_name).publish_subscribe().open_or_create().expect(""); auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); @@ -128,14 +127,14 @@ auto EventBasedSubscriber::create(Node& node, notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberConnected))) .expect(""); - return EventBasedSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; + return CustomSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; } -auto EventBasedSubscriber::file_descriptor() const -> FileDescriptorView { +auto CustomSubscriber::file_descriptor() const -> FileDescriptorView { return m_listener.file_descriptor(); } -void EventBasedSubscriber::handle_event() { +void CustomSubscriber::handle_event() { for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); event = m_listener.try_wait_one()) { switch (iox::from(event.value()->as_value())) { @@ -167,7 +166,7 @@ void EventBasedSubscriber::handle_event() { } } -auto EventBasedSubscriber::receive() -> iox::optional> { +auto CustomSubscriber::receive() -> iox::optional> { auto sample = m_subscriber.receive().expect(""); if (sample.has_value()) { m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::ReceivedSample))) diff --git a/examples/rust/event_based_communication/publisher.rs b/examples/rust/event_based_communication/publisher.rs index 834ac971c..4e2fd44f1 100644 --- a/examples/rust/event_based_communication/publisher.rs +++ b/examples/rust/event_based_communication/publisher.rs @@ -25,7 +25,7 @@ const HISTORY_SIZE: usize = 20; fn main() -> Result<(), Box> { let node = NodeBuilder::new().create::()?; - let publisher = EventBasedPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?; + let publisher = CustomPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; @@ -63,21 +63,21 @@ fn main() -> Result<(), Box> { /// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` /// and the listener to wait for new subscribers. #[derive(Debug)] -struct EventBasedPublisher { +struct CustomPublisher { publisher: Publisher, listener: Listener, notifier: Notifier, } -impl FileDescriptorBased for EventBasedPublisher { +impl FileDescriptorBased for CustomPublisher { fn file_descriptor(&self) -> &FileDescriptor { self.listener.file_descriptor() } } -impl SynchronousMultiplexing for EventBasedPublisher {} +impl SynchronousMultiplexing for CustomPublisher {} -impl EventBasedPublisher { +impl CustomPublisher { fn new( node: &Node, service_name: &ServiceName, @@ -147,7 +147,7 @@ impl EventBasedPublisher { } } -impl Drop for EventBasedPublisher { +impl Drop for CustomPublisher { fn drop(&mut self) { let _ = self .notifier diff --git a/examples/rust/event_based_communication/subscriber.rs b/examples/rust/event_based_communication/subscriber.rs index 88c074645..693b3b59f 100644 --- a/examples/rust/event_based_communication/subscriber.rs +++ b/examples/rust/event_based_communication/subscriber.rs @@ -25,7 +25,7 @@ const DEADLINE: Duration = Duration::from_secs(2); fn main() -> Result<(), Box> { let node = NodeBuilder::new().create::()?; - let subscriber = EventBasedSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?; + let subscriber = CustomSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; @@ -57,19 +57,19 @@ fn main() -> Result<(), Box> { } #[derive(Debug)] -struct EventBasedSubscriber { +struct CustomSubscriber { subscriber: Subscriber, notifier: Notifier, listener: Listener, } -impl FileDescriptorBased for EventBasedSubscriber { +impl FileDescriptorBased for CustomSubscriber { fn file_descriptor(&self) -> &FileDescriptor { self.listener.file_descriptor() } } -impl SynchronousMultiplexing for EventBasedSubscriber {} +impl SynchronousMultiplexing for CustomSubscriber {} // High-level subscriber class that contains besides a subscriber also a notifier // and a listener. The notifier is used to send events like @@ -77,7 +77,7 @@ impl SynchronousMultiplexing for EventBasedSubscriber {} // connected. // The listener waits for events originating from the publisher like // `PubSubEvent::SentSample`. -impl EventBasedSubscriber { +impl CustomSubscriber { fn new( node: &Node, service_name: &ServiceName, @@ -145,7 +145,7 @@ impl EventBasedSubscriber { } } -impl Drop for EventBasedSubscriber { +impl Drop for CustomSubscriber { fn drop(&mut self) { self.notifier .notify_with_custom_event_id(PubSubEvent::SubscriberDisconnected.into()) From d87a79bacdb5eda60a4c9274c75e7d284d20b77a Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Tue, 26 Nov 2024 17:31:55 +0100 Subject: [PATCH 10/10] [#390] Split up event based comm example into multiple files --- .../cxx/event_based_communication/BUILD.bazel | 2 + .../src/custom_publisher.hpp | 126 ++++++++++++++++++ .../src/custom_subscriber.hpp | 126 ++++++++++++++++++ .../src/publisher.cpp | 123 +---------------- .../src/subscriber.cpp | 125 +---------------- 5 files changed, 256 insertions(+), 246 deletions(-) create mode 100644 examples/cxx/event_based_communication/src/custom_publisher.hpp create mode 100644 examples/cxx/event_based_communication/src/custom_subscriber.hpp diff --git a/examples/cxx/event_based_communication/BUILD.bazel b/examples/cxx/event_based_communication/BUILD.bazel index 5a6758442..ca5637e5b 100644 --- a/examples/cxx/event_based_communication/BUILD.bazel +++ b/examples/cxx/event_based_communication/BUILD.bazel @@ -18,6 +18,7 @@ cc_binary( "src/publisher.cpp", "src/transmission_data.hpp", "src/pubsub_event.hpp", + "src/custom_publisher.hpp", ], deps = [ "@iceoryx//:iceoryx_hoofs", @@ -31,6 +32,7 @@ cc_binary( "src/subscriber.cpp", "src/transmission_data.hpp", "src/pubsub_event.hpp", + "src/custom_subscriber.hpp", ], deps = [ "@iceoryx//:iceoryx_hoofs", diff --git a/examples/cxx/event_based_communication/src/custom_publisher.hpp b/examples/cxx/event_based_communication/src/custom_publisher.hpp new file mode 100644 index 000000000..8b96e1fce --- /dev/null +++ b/examples/cxx/event_based_communication/src/custom_publisher.hpp @@ -0,0 +1,126 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_CUSTOM_PUBLISHER_HPP +#define IOX2_EXAMPLES_CUSTOM_PUBLISHER_HPP + +#include "iox2/file_descriptor.hpp" +#include "iox2/listener.hpp" +#include "iox2/node.hpp" +#include "iox2/notifier.hpp" +#include "iox2/publisher.hpp" +#include "iox2/sample_mut.hpp" +#include "iox2/service_name.hpp" +#include "pubsub_event.hpp" +#include "transmission_data.hpp" + +#include + +constexpr uint64_t HISTORY_SIZE = 20; + +// High-level publisher class that contains besides a publisher also a notifier and a listener. +// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +// and the listener to wait for new subscribers. +class CustomPublisher : public iox2::FileDescriptorBased { + public: + CustomPublisher(const CustomPublisher&) = delete; + CustomPublisher(CustomPublisher&&) = default; + ~CustomPublisher() override { + m_notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::PublisherDisconnected))) + .expect(""); + } + + static auto create(iox2::Node& node, + const iox2::ServiceName& service_name) -> CustomPublisher { + auto pubsub_service = node.service_builder(service_name) + .publish_subscribe() + .history_size(HISTORY_SIZE) + .subscriber_max_buffer_size(HISTORY_SIZE) + .open_or_create() + .expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto notifier = event_service.notifier_builder().create().expect(""); + auto listener = event_service.listener_builder().create().expect(""); + auto publisher = pubsub_service.publisher_builder().create().expect(""); + + notifier + .notify_with_custom_event_id(iox2::EventId(iox::from(PubSubEvent::PublisherConnected))) + .expect(""); + + return CustomPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; + } + + auto file_descriptor() const -> iox2::FileDescriptorView override { + return m_listener.file_descriptor(); + } + + void handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SubscriberConnected: { + std::cout << "new subscriber connected - delivering history" << std::endl; + m_publisher.update_connections().expect(""); + m_notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::SentHistory))) + .expect(""); + break; + } + case PubSubEvent::SubscriberDisconnected: { + std::cout << "subscriber disconnected" << std::endl; + break; + } + case PubSubEvent::ReceivedSample: { + std::cout << "subscriber has consumed sample" << std::endl; + break; + } + default: { + break; + } + } + } + } + + void send(const uint64_t counter) { + constexpr double SOME_NUMBER = 812.12; + auto sample = m_publisher.loan_uninit().expect(""); + sample.write_payload(TransmissionData { + static_cast(counter), static_cast(counter), static_cast(counter) * SOME_NUMBER }); + auto initialized_sample = assume_init(std::move(sample)); + ::iox2::send(std::move(initialized_sample)).expect(""); + + m_notifier.notify_with_custom_event_id(iox2::EventId(iox::from(PubSubEvent::SentSample))) + .expect(""); + } + + auto operator=(const CustomPublisher&) -> CustomPublisher& = delete; + auto operator=(CustomPublisher&&) -> CustomPublisher& = default; + + private: + CustomPublisher(iox2::Publisher&& publisher, + iox2::Listener&& listener, + iox2::Notifier&& notifier) + : m_publisher { std::move(publisher) } + , m_listener { std::move(listener) } + , m_notifier { std::move(notifier) } { + } + + iox2::Publisher m_publisher; + iox2::Listener m_listener; + iox2::Notifier m_notifier; +}; + +#endif diff --git a/examples/cxx/event_based_communication/src/custom_subscriber.hpp b/examples/cxx/event_based_communication/src/custom_subscriber.hpp new file mode 100644 index 000000000..566662290 --- /dev/null +++ b/examples/cxx/event_based_communication/src/custom_subscriber.hpp @@ -0,0 +1,126 @@ +// Copyright (c) 2024 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache Software License 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license +// which is available at https://opensource.org/licenses/MIT. +// +// SPDX-License-Identifier: Apache-2.0 OR MIT + +#ifndef IOX2_EXAMPLES_CUSTOM_SUBSCRIBER_HPP +#define IOX2_EXAMPLES_CUSTOM_SUBSCRIBER_HPP + +#include "iox/into.hpp" +#include "iox2/file_descriptor.hpp" +#include "iox2/listener.hpp" +#include "iox2/node.hpp" +#include "iox2/notifier.hpp" +#include "iox2/service_name.hpp" +#include "iox2/service_type.hpp" +#include "iox2/subscriber.hpp" +#include "pubsub_event.hpp" +#include "transmission_data.hpp" + +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. +class CustomSubscriber : public iox2::FileDescriptorBased { + public: + CustomSubscriber(const CustomSubscriber&) = delete; + CustomSubscriber(CustomSubscriber&&) = default; + ~CustomSubscriber() override { + m_notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::SubscriberDisconnected))) + .expect(""); + } + + auto operator=(const CustomSubscriber&) -> CustomSubscriber& = delete; + auto operator=(CustomSubscriber&&) -> CustomSubscriber& = default; + + static auto create(iox2::Node& node, + const iox2::ServiceName& service_name) -> CustomSubscriber { + auto pubsub_service = + node.service_builder(service_name).publish_subscribe().open_or_create().expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto listener = event_service.listener_builder().create().expect(""); + auto notifier = event_service.notifier_builder().create().expect(""); + auto subscriber = pubsub_service.subscriber_builder().create().expect(""); + + notifier + .notify_with_custom_event_id( + iox2::EventId(iox::from(PubSubEvent::SubscriberConnected))) + .expect(""); + + return CustomSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; + } + + auto file_descriptor() const -> iox2::FileDescriptorView override { + return m_listener.file_descriptor(); + } + + void handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SentHistory: { + std::cout << "History delivered" << std::endl; + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << " history: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::SentSample: { + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << "received: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::PublisherConnected: { + std::cout << "new publisher connected" << std::endl; + break; + } + case PubSubEvent::PublisherDisconnected: { + std::cout << "publisher disconnected" << std::endl; + break; + } + default: { + break; + } + } + } + } + + auto receive() -> iox::optional> { + auto sample = m_subscriber.receive().expect(""); + if (sample.has_value()) { + m_notifier + .notify_with_custom_event_id(iox2::EventId(iox::from(PubSubEvent::ReceivedSample))) + .expect(""); + } + + return sample; + } + + private: + CustomSubscriber(iox2::Subscriber&& subscriber, + iox2::Notifier&& notifier, + iox2::Listener&& listener) + : m_subscriber { std::move(subscriber) } + , m_notifier { std::move(notifier) } + , m_listener { std::move(listener) } { + } + + iox2::Subscriber m_subscriber; + iox2::Notifier m_notifier; + iox2::Listener m_listener; +}; + +#endif diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index 26eba81fd..a23f866c8 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -10,61 +10,17 @@ // // SPDX-License-Identifier: Apache-2.0 OR MIT -#include "iox2/publisher.hpp" +#include "custom_publisher.hpp" #include "iox/duration.hpp" -#include "iox2/file_descriptor.hpp" -#include "iox2/listener.hpp" #include "iox2/node.hpp" -#include "iox2/notifier.hpp" -#include "iox2/sample_mut.hpp" -#include "iox2/service_name.hpp" #include "iox2/service_type.hpp" #include "iox2/waitset.hpp" -#include "pubsub_event.hpp" -#include "transmission_data.hpp" #include -#include using namespace iox2; constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); -constexpr uint64_t HISTORY_SIZE = 20; - -/////////////////////////////////////////////// -/// START: CustomPublisher declaration -/////////////////////////////////////////////// - -// High-level publisher class that contains besides a publisher also a notifier and a listener. -// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` -// and the listener to wait for new subscribers. -class CustomPublisher : public FileDescriptorBased { - public: - CustomPublisher(const CustomPublisher&) = delete; - CustomPublisher(CustomPublisher&&) = default; - ~CustomPublisher() override; - - auto operator=(const CustomPublisher&) -> CustomPublisher& = delete; - auto operator=(CustomPublisher&&) -> CustomPublisher& = default; - - static auto create(Node& node, const ServiceName& service_name) -> CustomPublisher; - void handle_event(); - void send(uint64_t counter); - auto file_descriptor() const -> FileDescriptorView override; - - private: - CustomPublisher(Publisher&& publisher, - Listener&& listener, - Notifier&& notifier); - - Publisher m_publisher; - Listener m_listener; - Notifier m_notifier; -}; - -/////////////////////////////////////////////// -/// START: main -/////////////////////////////////////////////// auto main() -> int { auto node = NodeBuilder().create().expect("successful node creation"); @@ -100,80 +56,3 @@ auto main() -> int { return 0; } - -/////////////////////////////////////////////// -/// START: CustomPublisher implementation -/////////////////////////////////////////////// - -CustomPublisher::CustomPublisher(Publisher&& publisher, - Listener&& listener, - Notifier&& notifier) - : m_publisher { std::move(publisher) } - , m_listener { std::move(listener) } - , m_notifier { std::move(notifier) } { -} - -CustomPublisher::~CustomPublisher() { - m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherDisconnected))) - .expect(""); -} - -auto CustomPublisher::create(Node& node, const ServiceName& service_name) -> CustomPublisher { - auto pubsub_service = node.service_builder(service_name) - .publish_subscribe() - .history_size(HISTORY_SIZE) - .subscriber_max_buffer_size(HISTORY_SIZE) - .open_or_create() - .expect(""); - auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); - - auto notifier = event_service.notifier_builder().create().expect(""); - auto listener = event_service.listener_builder().create().expect(""); - auto publisher = pubsub_service.publisher_builder().create().expect(""); - - notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherConnected))) - .expect(""); - - return CustomPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; -} - -auto CustomPublisher::file_descriptor() const -> FileDescriptorView { - return m_listener.file_descriptor(); -} - -void CustomPublisher::handle_event() { - for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); - event = m_listener.try_wait_one()) { - switch (iox::from(event.value()->as_value())) { - case PubSubEvent::SubscriberConnected: { - std::cout << "new subscriber connected - delivering history" << std::endl; - m_publisher.update_connections().expect(""); - m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SentHistory))) - .expect(""); - break; - } - case PubSubEvent::SubscriberDisconnected: { - std::cout << "subscriber disconnected" << std::endl; - break; - } - case PubSubEvent::ReceivedSample: { - std::cout << "subscriber has consumed sample" << std::endl; - break; - } - default: { - break; - } - } - } -} - -void CustomPublisher::send(const uint64_t counter) { - constexpr double SOME_NUMBER = 812.12; - auto sample = m_publisher.loan_uninit().expect(""); - sample.write_payload(TransmissionData { - static_cast(counter), static_cast(counter), static_cast(counter) * SOME_NUMBER }); - auto initialized_sample = assume_init(std::move(sample)); - ::iox2::send(std::move(initialized_sample)).expect(""); - - m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SentSample))).expect(""); -} diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp index b9632ba5d..c68cdf4a0 100644 --- a/examples/cxx/event_based_communication/src/subscriber.cpp +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -12,60 +12,16 @@ #include +#include "custom_subscriber.hpp" #include "iox/duration.hpp" -#include "iox/into.hpp" -#include "iox2/file_descriptor.hpp" -#include "iox2/listener.hpp" #include "iox2/node.hpp" -#include "iox2/notifier.hpp" -#include "iox2/service_name.hpp" #include "iox2/service_type.hpp" -#include "iox2/subscriber.hpp" #include "iox2/waitset.hpp" -#include "pubsub_event.hpp" -#include "transmission_data.hpp" constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2); using namespace iox2; -/////////////////////////////////////////////// -/// START: CustomSubscriber declaration -/////////////////////////////////////////////// - -// High-level subscriber class that contains besides a subscriber also a notifier -// and a listener. The notifier is used to send events like -// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber -// connected. -// The listener waits for events originating from the publisher like -// `PubSubEvent::SentSample`. -class CustomSubscriber : public FileDescriptorBased { - public: - CustomSubscriber(const CustomSubscriber&) = delete; - CustomSubscriber(CustomSubscriber&&) = default; - ~CustomSubscriber() override; - auto operator=(const CustomSubscriber&) -> CustomSubscriber& = delete; - auto operator=(CustomSubscriber&&) -> CustomSubscriber& = default; - - static auto create(Node& node, const ServiceName& service_name) -> CustomSubscriber; - auto file_descriptor() const -> FileDescriptorView override; - void handle_event(); - auto receive() -> iox::optional>; - - private: - CustomSubscriber(Subscriber&& subscriber, - Notifier&& notifier, - Listener&& listener); - - Subscriber m_subscriber; - Notifier m_notifier; - Listener m_listener; -}; - -/////////////////////////////////////////////// -/// START: main -/////////////////////////////////////////////// - auto main() -> int { auto node = NodeBuilder().create().expect("successful node creation"); @@ -96,82 +52,3 @@ auto main() -> int { return 0; } - -/////////////////////////////////////////////// -/// START: CustomSubscriber implementation -/////////////////////////////////////////////// - -CustomSubscriber::CustomSubscriber(Subscriber&& subscriber, - Notifier&& notifier, - Listener&& listener) - : m_subscriber { std::move(subscriber) } - , m_notifier { std::move(notifier) } - , m_listener { std::move(listener) } { -} - -CustomSubscriber::~CustomSubscriber() { - m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberDisconnected))) - .expect(""); -} - - -auto CustomSubscriber::create(Node& node, const ServiceName& service_name) -> CustomSubscriber { - auto pubsub_service = - node.service_builder(service_name).publish_subscribe().open_or_create().expect(""); - auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); - - auto listener = event_service.listener_builder().create().expect(""); - auto notifier = event_service.notifier_builder().create().expect(""); - auto subscriber = pubsub_service.subscriber_builder().create().expect(""); - - notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberConnected))) - .expect(""); - - return CustomSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; -} - -auto CustomSubscriber::file_descriptor() const -> FileDescriptorView { - return m_listener.file_descriptor(); -} - -void CustomSubscriber::handle_event() { - for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); - event = m_listener.try_wait_one()) { - switch (iox::from(event.value()->as_value())) { - case PubSubEvent::SentHistory: { - std::cout << "History delivered" << std::endl; - for (auto sample = receive(); sample.has_value(); sample = receive()) { - std::cout << " history: " << sample->payload().x << std::endl; - } - break; - } - case PubSubEvent::SentSample: { - for (auto sample = receive(); sample.has_value(); sample = receive()) { - std::cout << "received: " << sample->payload().x << std::endl; - } - break; - } - case PubSubEvent::PublisherConnected: { - std::cout << "new publisher connected" << std::endl; - break; - } - case PubSubEvent::PublisherDisconnected: { - std::cout << "publisher disconnected" << std::endl; - break; - } - default: { - break; - } - } - } -} - -auto CustomSubscriber::receive() -> iox::optional> { - auto sample = m_subscriber.receive().expect(""); - if (sample.has_value()) { - m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::ReceivedSample))) - .expect(""); - } - - return sample; -}