Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#390] event based comm example cpp #524

Merged
2 changes: 1 addition & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ These types are demonstrated in the complex data types example.
| docker | [all](rust/docker) | Communicate between different docker containers and the host. |
| domains | [C](c/domains) [C++](cxx/domains) [Rust](rust/domains) | Establish separate domains that operate independently from one another. |
| event | [C](c/event) [C++](cxx/event) [Rust](rust/event) | Push notifications - send event signals to wakeup processes that are waiting for them. |
| event based communication | [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. |
| event based communication | [C++](rust/event_based_communication) [Rust](rust/event_based_communication) | Define multiple events like publisher/subscriber created or removed, send sample, received sample, deliver history etc. and react on them for a fully event driven communication setup. |
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved
| event multiplexing | [C](c/event_multiplexing) [C++](cxx/event_multiplexing) [Rust](rust/event_multiplexing) | Wait on multiple listeners or sockets with a single call. The WaitSet demultiplexes incoming events and notifies the user. |
| publish subscribe | [C](c/publish_subscribe) [C++](cxx/publish_subscribe) [Rust](rust/publish_subscribe) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern). |
| publish subscribe dynamic data | [C++](cxx/publish_subscribe_dynamic_data) [Rust](rust/publish_subscribe_dynamic_data) | Communication between multiple processes with a [publish subscribe messaging pattern](https://en.wikipedia.org/wiki/Publish–subscribe_pattern) and payload data that has a dynamic size. |
Expand Down
1 change: 1 addition & 0 deletions examples/cxx/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_subdirectory(discovery)
add_subdirectory(domains)
add_subdirectory(event)
add_subdirectory(event_multiplexing)
add_subdirectory(event_based_communication)
add_subdirectory(publish_subscribe)
add_subdirectory(publish_subscribe_dynamic_data)
add_subdirectory(publish_subscribe_with_user_header)
Expand Down
39 changes: 39 additions & 0 deletions examples/cxx/event_based_communication/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library")

cc_binary(
name = "example_cxx_event_based_communication_publisher",
srcs = [
"src/publisher.cpp",
"src/transmission_data.hpp",
"src/pubsub_event.hpp",
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)

cc_binary(
name = "example_cxx_event_based_communication_subscriber",
srcs = [
"src/subscriber.cpp",
"src/transmission_data.hpp",
"src/pubsub_event.hpp",
],
deps = [
"@iceoryx//:iceoryx_hoofs",
"//:iceoryx2-cxx-static",
],
)
22 changes: 22 additions & 0 deletions examples/cxx/event_based_communication/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright (c) 2024 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache Software License 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
# which is available at https://opensource.org/licenses/MIT.
#
# SPDX-License-Identifier: Apache-2.0 OR MIT

cmake_minimum_required(VERSION 3.22)
project(example_cxx_event_based_communication LANGUAGES CXX)

find_package(iceoryx2-cxx 0.4.1 REQUIRED)

add_executable(example_cxx_event_based_communication_publisher src/publisher.cpp)
target_link_libraries(example_cxx_event_based_communication_publisher iceoryx2-cxx::static-lib-cxx)

add_executable(example_cxx_event_based_communication_subscriber src/subscriber.cpp)
target_link_libraries(example_cxx_event_based_communication_subscriber iceoryx2-cxx::static-lib-cxx)
35 changes: 35 additions & 0 deletions examples/cxx/event_based_communication/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Event-Based Communication

## Running The Example

This example demonstrates iceoryx2's event multiplexing mechanism in a more
complex setup. The iceoryx2 `Publisher` and `Subscriber` are integrated into
custom `ExamplePublisher` and `ExampleSubscriber` classes, which also
incorporate an additional iceoryx2 `Notifier` and `Listener`. This setup
enables automatic event emission whenever an `ExamplePublisher` or
`ExampleSubscriber` is created or dropped. Additionally, events are emitted
whenever a new `Sample` is sent or received.

When a `class` inherits from `FileDescriptorBased`, it can be attached to a
`WaitSet`. Both `ExamplePublisher` and `ExampleSubscriber` implement this
interface by forwarding calls to their underlying `Listener`, which already
provides an implementation of `FileDescriptorBased`.

The `WaitSet` notifies the user of the origin of an event notification. The
user can then acquire the `EventId` from the `Listener`. Based on the value of
the `EventId`, the user can identify the specific event that occurred and take
appropriate action.

### Terminal 1

```sh
./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_publisher
```

### Terminal 2

```sh
./target/ffi/build/examples/cxx/event_based_communication/example_cxx_event_based_communication_subscriber
```

Feel free to run multiple publishers or subscribers in parallel.
167 changes: 167 additions & 0 deletions examples/cxx/event_based_communication/src/publisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

#include "iox2/publisher.hpp"
#include "iox/duration.hpp"
#include "iox2/file_descriptor.hpp"
#include "iox2/listener.hpp"
#include "iox2/node.hpp"
#include "iox2/notifier.hpp"
#include "iox2/sample_mut.hpp"
#include "iox2/service_name.hpp"
#include "iox2/service_type.hpp"
#include "iox2/waitset.hpp"
#include "pubsub_event.hpp"
#include "transmission_data.hpp"

#include <iostream>
#include <utility>

using namespace iox2;

constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1);
constexpr uint64_t HISTORY_SIZE = 20;

// High-level publisher class that contains besides a publisher also a notifier and a listener.
// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory`
// and the listener to wait for new subscribers.
class EventBasedPublisher : public FileDescriptorBased {
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved
public:
EventBasedPublisher(const EventBasedPublisher&) = delete;
EventBasedPublisher(EventBasedPublisher&&) = default;
~EventBasedPublisher() override;

auto operator=(const EventBasedPublisher&) -> EventBasedPublisher& = delete;
auto operator=(EventBasedPublisher&&) -> EventBasedPublisher& = default;

static auto create(Node<ServiceType::Ipc>& node, const ServiceName& service_name) -> EventBasedPublisher;
void handle_event();
void send(uint64_t counter);
auto file_descriptor() const -> FileDescriptorView override;

private:
EventBasedPublisher(Publisher<ServiceType::Ipc, TransmissionData, void>&& publisher,
Listener<ServiceType::Ipc>&& listener,
Notifier<ServiceType::Ipc>&& notifier);

Publisher<ServiceType::Ipc, TransmissionData, void> m_publisher;
Listener<ServiceType::Ipc> m_listener;
Notifier<ServiceType::Ipc> m_notifier;
};
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved

auto main() -> int {
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved
auto node = NodeBuilder().create<ServiceType::Ipc>().expect("successful node creation");
auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect(""));

auto waitset = WaitSetBuilder().create<ServiceType::Ipc>().expect("");
// Whenever our publisher receives an event we get notified.
auto publisher_guard = waitset.attach_notification(publisher).expect("");
// Attach an interval so that we wake up and can publish a new message
auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect("");

uint64_t counter = 0;

// Event callback that is called whenever the WaitSet received an event.
auto on_event = [&](WaitSetAttachmentId<ServiceType::Ipc> attachment_id) -> CallbackProgression {
// when the cyclic trigger guard gets notified we send out a new message
if (attachment_id.has_event_from(cyclic_trigger_guard)) {
std::cout << "send message: " << counter << std::endl;
publisher.send(counter);
counter += 1;
// when something else happens on the publisher we handle the events
} else if (attachment_id.has_event_from(publisher_guard)) {
publisher.handle_event();
}
return CallbackProgression::Continue;
};

// Start the event loop. It will run until `CallbackProgression::Stop` is returned by the
// event callback or an interrupt/termination signal was received.
waitset.wait_and_process(on_event).expect("");

std::cout << "exit ..." << std::endl;

return 0;
}

EventBasedPublisher::EventBasedPublisher(Publisher<ServiceType::Ipc, TransmissionData, void>&& publisher,
Listener<ServiceType::Ipc>&& listener,
Notifier<ServiceType::Ipc>&& notifier)
: m_publisher { std::move(publisher) }
, m_listener { std::move(listener) }
, m_notifier { std::move(notifier) } {
}

EventBasedPublisher::~EventBasedPublisher() {
m_notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::PublisherDisconnected)))
.expect("");
}

auto EventBasedPublisher::create(Node<ServiceType::Ipc>& node, const ServiceName& service_name) -> EventBasedPublisher {
auto pubsub_service = node.service_builder(service_name)
.publish_subscribe<TransmissionData>()
.history_size(HISTORY_SIZE)
.subscriber_max_buffer_size(HISTORY_SIZE)
.open_or_create()
.expect("");
auto event_service = node.service_builder(service_name).event().open_or_create().expect("");

auto notifier = event_service.notifier_builder().create().expect("");
auto listener = event_service.listener_builder().create().expect("");
auto publisher = pubsub_service.publisher_builder().create().expect("");

notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::PublisherConnected)))
.expect("");

return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) };
}

auto EventBasedPublisher::file_descriptor() const -> FileDescriptorView {
return m_listener.file_descriptor();
}

void EventBasedPublisher::handle_event() {
for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value();
event = m_listener.try_wait_one()) {
switch (iox::from<size_t, PubSubEvent>(event.value()->as_value())) {
case PubSubEvent::SubscriberConnected: {
std::cout << "new subscriber connected - delivering history" << std::endl;
m_publisher.update_connections().expect("");
m_notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SentHistory)))
.expect("");
break;
}
case PubSubEvent::SubscriberDisconnected: {
std::cout << "subscriber disconnected" << std::endl;
break;
}
case PubSubEvent::ReceivedSample: {
std::cout << "subscriber has consumed sample" << std::endl;
break;
}
default: {
break;
}
}
}
}

void EventBasedPublisher::send(const uint64_t counter) {
constexpr double SOME_NUMBER = 812.12;
auto sample = m_publisher.loan_uninit().expect("");
sample.write_payload(TransmissionData {
static_cast<int32_t>(counter), static_cast<int32_t>(counter), static_cast<double>(counter) * SOME_NUMBER });
auto initialized_sample = assume_init(std::move(sample));
::iox2::send(std::move(initialized_sample)).expect("");

m_notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SentSample))).expect("");
}
63 changes: 63 additions & 0 deletions examples/cxx/event_based_communication/src/pubsub_event.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2024 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
//
// This program and the accompanying materials are made available under the
// terms of the Apache Software License 2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0, or the MIT license
// which is available at https://opensource.org/licenses/MIT.
//
// SPDX-License-Identifier: Apache-2.0 OR MIT

#ifndef IOX2_EXAMPLES_PUBSUB_EVENT_HPP
#define IOX2_EXAMPLES_PUBSUB_EVENT_HPP

#include "iox/into.hpp"
#include "iox2/event_id.hpp"

#include <cstdint>

enum class PubSubEvent : uint8_t {
PublisherConnected = 0,
PublisherDisconnected = 1,
SubscriberConnected = 2,
SubscriberDisconnected = 3,
SentSample = 4,
ReceivedSample = 5,
SentHistory = 6,
Unknown
};

namespace iox {
template <>
constexpr auto from<PubSubEvent, size_t>(const PubSubEvent value) noexcept -> size_t {
return static_cast<uint8_t>(value);
}

template <>
constexpr auto from<size_t, PubSubEvent>(const size_t value) noexcept -> PubSubEvent {
switch (value) {
case from<PubSubEvent, size_t>(PubSubEvent::PublisherConnected):
return PubSubEvent::PublisherConnected;
case from<PubSubEvent, size_t>(PubSubEvent::PublisherDisconnected):
return PubSubEvent::PublisherDisconnected;
case from<PubSubEvent, size_t>(PubSubEvent::SubscriberConnected):
return PubSubEvent::SubscriberConnected;
case from<PubSubEvent, size_t>(PubSubEvent::SubscriberDisconnected):
return PubSubEvent::SubscriberDisconnected;
case from<PubSubEvent, size_t>(PubSubEvent::SentSample):
return PubSubEvent::SentSample;
case from<PubSubEvent, size_t>(PubSubEvent::ReceivedSample):
return PubSubEvent::ReceivedSample;
case from<PubSubEvent, size_t>(PubSubEvent::SentHistory):
return PubSubEvent::SentHistory;
elfenpiff marked this conversation as resolved.
Show resolved Hide resolved
default:
return PubSubEvent::Unknown;
}
IOX_UNREACHABLE();
}
} // namespace iox


#endif
Loading
Loading