Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#390] event based comm example cpp #524

Merged
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ These types are demonstrated in the complex data types example.
| docker | [all](rust/docker) | Communicate between different docker containers and the host. |
| domains | [C](c/domains) [C++](cxx/domains) [Rust](rust/domains) | Establish separate domains that operate independently from one another. |
| event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. |
| event based communication | [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. |
| event based communication | [C++](cxx/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. |
| event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. |
| publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). |
| publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. |
Expand Down
1 change: 1 addition & 0 deletions examples/cxx/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_subdirectory(discovery)
add_subdirectory(domains)
add_subdirectory(event)
add_subdirectory(event_multiplexing)
add_subdirectory(event_based_communication)
add_subdirectory(publish_subscribe)
add_subdirectory(publish_subscribe_dynamic_data)
add_subdirectory(publish_subscribe_with_user_header)
Expand Down
41 changes: 41 additions & 0 deletions examples/cxx/event_based_communication/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library")

cc_binary(
name = "example_cxx_event_based_communication_publisher",
srcs = [
"src/publisher.cpp",
"src/transmission_data.hpp",
"src/pubsub_event.hpp",
"src/custom_publisher.hpp",
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)

cc_binary(
name = "example_cxx_event_based_communication_subscriber",
srcs = [
"src/subscriber.cpp",
"src/transmission_data.hpp",
"src/pubsub_event.hpp",
"src/custom_subscriber.hpp",
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)
22 changes: 22 additions & 0 deletions examples/cxx/event_based_communication/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

cmake_minimum_required(VERSION 3.22)
project(example_cxx_event_based_communication LANGUAGES CXX)

find_package(iceoryx2-cxx 0.4.1 REQUIRED)

add_executable(example_cxx_event_based_communication_publisher src/publisher.cpp)
target_link_libraries(example_cxx_event_based_communication_publisher iceoryx2-cxx::static-lib-cxx)

add_executable(example_cxx_event_based_communication_subscriber src/subscriber.cpp)
target_link_libraries(example_cxx_event_based_communication_subscriber iceoryx2-cxx::static-lib-cxx)
35 changes: 35 additions & 0 deletions examples/cxx/event_based_communication/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Event-Based Communication

## Running The Example

This example demonstrates iceoryx2's event multiplexing mechanism in a more
complex setup. The iceoryx2 `Publisher` and `Subscriber` are integrated into
custom `ExamplePublisher` and `ExampleSubscriber` classes, which also
incorporate an additional iceoryx2 `Notifier` and `Listener`. This setup
enables automatic event emission whenever an `ExamplePublisher` or
`ExampleSubscriber` is created or dropped. Additionally, events are emitted
whenever a new `Sample` is sent or received.

When a `class` inherits from `FileDescriptorBased`, it can be attached to a
`WaitSet`. Both `ExamplePublisher` and `ExampleSubscriber` implement this
interface by forwarding calls to their underlying `Listener`, which already
provides an implementation of `FileDescriptorBased`.

The `WaitSet` notifies the user of the origin of an event notification. The
user can then acquire the `EventId` from the `Listener`. Based on the value of
the `EventId`, the user can identify the specific event that occurred and take
appropriate action.

### Terminal 1

```sh
./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_publisher
```

### Terminal 2

```sh
./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_subscriber
```

Feel free to run multiple publishers or subscribers in parallel.
126 changes: 126 additions & 0 deletions examples/cxx/event_based_communication/src/custom_publisher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

#ifndef IOX2_EXAMPLES_CUSTOM_PUBLISHER_HPP
#define IOX2_EXAMPLES_CUSTOM_PUBLISHER_HPP

#include "iox2/file_descriptor.hpp"
#include "iox2/listener.hpp"
#include "iox2/node.hpp"
#include "iox2/notifier.hpp"
#include "iox2/publisher.hpp"
#include "iox2/sample_mut.hpp"
#include "iox2/service_name.hpp"
#include "pubsub_event.hpp"
#include "transmission_data.hpp"

#include <utility>

constexpr uint64_t HISTORY_SIZE = 20;

// High-level publisher class that contains besides a publisher also a notifier and a listener.
// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory`
// and the listener to wait for new subscribers.
class CustomPublisher : public iox2::FileDescriptorBased {
public:
CustomPublisher(const CustomPublisher&) = delete;
CustomPublisher(CustomPublisher&&) = default;
~CustomPublisher() override {
m_notifier
.notify_with_custom_event_id(
iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::PublisherDisconnected)))
.expect("");
}

static auto create(iox2::Node<iox2::ServiceType::Ipc>& node,
const iox2::ServiceName& service_name) -> CustomPublisher {
auto pubsub_service = node.service_builder(service_name)
.publish_subscribe<TransmissionData>()
.history_size(HISTORY_SIZE)
.subscriber_max_buffer_size(HISTORY_SIZE)
.open_or_create()
.expect("");
auto event_service = node.service_builder(service_name).event().open_or_create().expect("");

auto notifier = event_service.notifier_builder().create().expect("");
auto listener = event_service.listener_builder().create().expect("");
auto publisher = pubsub_service.publisher_builder().create().expect("");

notifier
.notify_with_custom_event_id(iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::PublisherConnected)))
.expect("");

return CustomPublisher { std::move(publisher), std::move(listener), std::move(notifier) };
}

auto file_descriptor() const -> iox2::FileDescriptorView override {
return m_listener.file_descriptor();
}

void handle_event() {
for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value();
event = m_listener.try_wait_one()) {
switch (iox::from<size_t, PubSubEvent>(event.value()->as_value())) {
case PubSubEvent::SubscriberConnected: {
std::cout << "new subscriber connected - delivering history" << std::endl;
m_publisher.update_connections().expect("");
m_notifier
.notify_with_custom_event_id(
iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SentHistory)))
.expect("");
break;
}
case PubSubEvent::SubscriberDisconnected: {
std::cout << "subscriber disconnected" << std::endl;
break;
}
case PubSubEvent::ReceivedSample: {
std::cout << "subscriber has consumed sample" << std::endl;
break;
}
default: {
break;
}
}
}
}

void send(const uint64_t counter) {
constexpr double SOME_NUMBER = 812.12;
auto sample = m_publisher.loan_uninit().expect("");
sample.write_payload(TransmissionData {
static_cast<int32_t>(counter), static_cast<int32_t>(counter), static_cast<double>(counter) * SOME_NUMBER });
auto initialized_sample = assume_init(std::move(sample));
::iox2::send(std::move(initialized_sample)).expect("");

m_notifier.notify_with_custom_event_id(iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SentSample)))
.expect("");
}

auto operator=(const CustomPublisher&) -> CustomPublisher& = delete;
auto operator=(CustomPublisher&&) -> CustomPublisher& = default;

private:
CustomPublisher(iox2::Publisher<iox2::ServiceType::Ipc, TransmissionData, void>&& publisher,
iox2::Listener<iox2::ServiceType::Ipc>&& listener,
iox2::Notifier<iox2::ServiceType::Ipc>&& notifier)
: m_publisher { std::move(publisher) }
, m_listener { std::move(listener) }
, m_notifier { std::move(notifier) } {
}

iox2::Publisher<iox2::ServiceType::Ipc, TransmissionData, void> m_publisher;
iox2::Listener<iox2::ServiceType::Ipc> m_listener;
iox2::Notifier<iox2::ServiceType::Ipc> m_notifier;
};

#endif
126 changes: 126 additions & 0 deletions examples/cxx/event_based_communication/src/custom_subscriber.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

#ifndef IOX2_EXAMPLES_CUSTOM_SUBSCRIBER_HPP
#define IOX2_EXAMPLES_CUSTOM_SUBSCRIBER_HPP

#include "iox/into.hpp"
#include "iox2/file_descriptor.hpp"
#include "iox2/listener.hpp"
#include "iox2/node.hpp"
#include "iox2/notifier.hpp"
#include "iox2/service_name.hpp"
#include "iox2/service_type.hpp"
#include "iox2/subscriber.hpp"
#include "pubsub_event.hpp"
#include "transmission_data.hpp"

// High-level subscriber class that contains besides a subscriber also a notifier
// and a listener. The notifier is used to send events like
// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber
// connected.
// The listener waits for events originating from the publisher like
// `PubSubEvent::SentSample`.
class CustomSubscriber : public iox2::FileDescriptorBased {
public:
CustomSubscriber(const CustomSubscriber&) = delete;
CustomSubscriber(CustomSubscriber&&) = default;
~CustomSubscriber() override {
m_notifier
.notify_with_custom_event_id(
iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SubscriberDisconnected)))
.expect("");
}

auto operator=(const CustomSubscriber&) -> CustomSubscriber& = delete;
auto operator=(CustomSubscriber&&) -> CustomSubscriber& = default;

static auto create(iox2::Node<iox2::ServiceType::Ipc>& node,
const iox2::ServiceName& service_name) -> CustomSubscriber {
auto pubsub_service =
node.service_builder(service_name).publish_subscribe<TransmissionData>().open_or_create().expect("");
auto event_service = node.service_builder(service_name).event().open_or_create().expect("");

auto listener = event_service.listener_builder().create().expect("");
auto notifier = event_service.notifier_builder().create().expect("");
auto subscriber = pubsub_service.subscriber_builder().create().expect("");

notifier
.notify_with_custom_event_id(
iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SubscriberConnected)))
.expect("");

return CustomSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) };
}

auto file_descriptor() const -> iox2::FileDescriptorView override {
return m_listener.file_descriptor();
}

void handle_event() {
for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value();
event = m_listener.try_wait_one()) {
switch (iox::from<size_t, PubSubEvent>(event.value()->as_value())) {
case PubSubEvent::SentHistory: {
std::cout << "History delivered" << std::endl;
for (auto sample = receive(); sample.has_value(); sample = receive()) {
std::cout << " history: " << sample->payload().x << std::endl;
}
break;
}
case PubSubEvent::SentSample: {
for (auto sample = receive(); sample.has_value(); sample = receive()) {
std::cout << "received: " << sample->payload().x << std::endl;
}
break;
}
case PubSubEvent::PublisherConnected: {
std::cout << "new publisher connected" << std::endl;
break;
}
case PubSubEvent::PublisherDisconnected: {
std::cout << "publisher disconnected" << std::endl;
break;
}
default: {
break;
}
}
}
}

auto receive() -> iox::optional<iox2::Sample<iox2::ServiceType::Ipc, TransmissionData, void>> {
auto sample = m_subscriber.receive().expect("");
if (sample.has_value()) {
m_notifier
.notify_with_custom_event_id(iox2::EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::ReceivedSample)))
.expect("");
}

return sample;
}

private:
CustomSubscriber(iox2::Subscriber<iox2::ServiceType::Ipc, TransmissionData, void>&& subscriber,
iox2::Notifier<iox2::ServiceType::Ipc>&& notifier,
iox2::Listener<iox2::ServiceType::Ipc>&& listener)
: m_subscriber { std::move(subscriber) }
, m_notifier { std::move(notifier) }
, m_listener { std::move(listener) } {
}

iox2::Subscriber<iox2::ServiceType::Ipc, TransmissionData, void> m_subscriber;
iox2::Notifier<iox2::ServiceType::Ipc> m_notifier;
iox2::Listener<iox2::ServiceType::Ipc> m_listener;
};

#endif
Loading