Skip to content

Commit

Permalink
[#390] Add documentation to example source code
Browse files Browse the repository at this point in the history
  • Loading branch information
elfenpiff committed Nov 24, 2024
1 parent 7655e31 commit 9003d8b
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 13 deletions.
13 changes: 13 additions & 0 deletions examples/cxx/event_based_communication/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ using namespace iox2;
constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1);
constexpr uint64_t HISTORY_SIZE = 20;

// High-level publisher class that contains besides a publisher also a notifier and a listener.
// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory`
// and the listener to wait for new subscribers.
class EventBasedPublisher : public FileDescriptorBased {
public:
EventBasedPublisher(const EventBasedPublisher&) = delete;
Expand Down Expand Up @@ -60,22 +63,29 @@ auto main() -> int {
auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect(""));

auto waitset = WaitSetBuilder().create<ServiceType::Ipc>().expect("");
// Whenever our publisher receives an event we get notified.
auto publisher_guard = waitset.attach_notification(publisher).expect("");
// Attach an interval so that we wake up and can publish a new message
auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect("");

uint64_t counter = 0;

// Event callback that is called whenever the WaitSet received an event.
auto on_event = [&](WaitSetAttachmentId<ServiceType::Ipc> attachment_id) -> CallbackProgression {
// when the cyclic trigger guard gets notified we send out a new message
if (attachment_id.has_event_from(cyclic_trigger_guard)) {
std::cout << "send message: " << counter << std::endl;
publisher.send(counter);
counter += 1;
// when something else happens on the publisher we handle the events
} else if (attachment_id.has_event_from(publisher_guard)) {
publisher.handle_event();
}
return CallbackProgression::Continue;
};

// Start the event loop. It will run until `CallbackProgression::Stop` is returned by the
// event callback or an interrupt/termination signal was received.
waitset.wait_and_process(on_event).expect("");

std::cout << "exit ..." << std::endl;
Expand Down Expand Up @@ -109,6 +119,9 @@ auto EventBasedPublisher::create(Node<ServiceType::Ipc>& node, const ServiceName
auto listener = event_service.listener_builder().create().expect("");
auto publisher = pubsub_service.publisher_builder().create().expect("");

notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::PublisherConnected)))
.expect("");

return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) };
}

Expand Down
148 changes: 135 additions & 13 deletions examples/cxx/event_based_communication/src/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,155 @@
#include <iostream>

#include "iox/duration.hpp"
#include "iox/into.hpp"
#include "iox2/file_descriptor.hpp"
#include "iox2/listener.hpp"
#include "iox2/node.hpp"
#include "iox2/notifier.hpp"
#include "iox2/service_name.hpp"
#include "iox2/service_type.hpp"
#include "iox2/subscriber.hpp"
#include "iox2/waitset.hpp"
#include "pubsub_event.hpp"
#include "transmission_data.hpp"

constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1);
constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2);

using namespace iox2;

// High-level subscriber class that contains besides a subscriber also a notifier
// and a listener. The notifier is used to send events like
// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber
// connected.
// The listener waits for events originating from the publisher like
// `PubSubEvent::SentSample`.
class EventBasedSubscriber : public FileDescriptorBased {
public:
EventBasedSubscriber(const EventBasedSubscriber&) = delete;
EventBasedSubscriber(EventBasedSubscriber&&) = default;
~EventBasedSubscriber() override;
auto operator=(const EventBasedSubscriber&) -> EventBasedSubscriber& = delete;
auto operator=(EventBasedSubscriber&&) -> EventBasedSubscriber& = default;

static auto create(Node<ServiceType::Ipc>& node, const ServiceName& service_name) -> EventBasedSubscriber;
auto file_descriptor() const -> FileDescriptorView override;
void handle_event();
auto receive() -> iox::optional<Sample<ServiceType::Ipc, TransmissionData, void>>;

private:
EventBasedSubscriber(Subscriber<ServiceType::Ipc, TransmissionData, void>&& subscriber,
Notifier<ServiceType::Ipc>&& notifier,
Listener<ServiceType::Ipc>&& listener);

Subscriber<ServiceType::Ipc, TransmissionData, void> m_subscriber;
Notifier<ServiceType::Ipc> m_notifier;
Listener<ServiceType::Ipc> m_listener;
};

auto main() -> int {
using namespace iox2;
auto node = NodeBuilder().create<ServiceType::Ipc>().expect("successful node creation");

auto service = node.service_builder(ServiceName::create("My/Funk/ServiceName").expect("valid service name"))
.publish_subscribe<TransmissionData>()
.open_or_create()
.expect("successful service creation/opening");
auto subscriber = EventBasedSubscriber::create(node, ServiceName::create("My/Funk/ServiceName").expect(""));

auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation");
auto waitset = WaitSetBuilder().create<ServiceType::Ipc>().expect("");

while (node.wait(CYCLE_TIME).has_value()) {
auto sample = subscriber.receive().expect("receive succeeds");
while (sample.has_value()) {
std::cout << "received: " << sample->payload() << std::endl;
sample = subscriber.receive().expect("receive succeeds");
// The subscriber is attached as a deadline, meaning that we expect some activity
// latest after the deadline has passed.
auto subscriber_guard = waitset.attach_deadline(subscriber, DEADLINE).expect("");

auto on_event = [&](WaitSetAttachmentId<ServiceType::Ipc> attachment_id) {
// If we have received a new event on the subscriber we handle it.
if (attachment_id.has_event_from(subscriber_guard)) {
subscriber.handle_event();
// If the subscriber did not receive an event until DEADLINE has
// passed, we print out a warning.
} else if (attachment_id.has_missed_deadline(subscriber_guard)) {
std::cout << "Contract violation! The subscriber did not receive a message for " << DEADLINE << std::endl;
}
}

return CallbackProgression::Continue;
};

waitset.wait_and_process(on_event).expect("");

std::cout << "exit" << std::endl;

return 0;
}

EventBasedSubscriber::EventBasedSubscriber(Subscriber<ServiceType::Ipc, TransmissionData, void>&& subscriber,
Notifier<ServiceType::Ipc>&& notifier,
Listener<ServiceType::Ipc>&& listener)
: m_subscriber { std::move(subscriber) }
, m_notifier { std::move(notifier) }
, m_listener { std::move(listener) } {
}

EventBasedSubscriber::~EventBasedSubscriber() {
m_notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SubscriberDisconnected)))
.expect("");
}


auto EventBasedSubscriber::create(Node<ServiceType::Ipc>& node,
const ServiceName& service_name) -> EventBasedSubscriber {
auto pubsub_service =
node.service_builder(service_name).publish_subscribe<TransmissionData>().open_or_create().expect("");
auto event_service = node.service_builder(service_name).event().open_or_create().expect("");

auto listener = event_service.listener_builder().create().expect("");
auto notifier = event_service.notifier_builder().create().expect("");
auto subscriber = pubsub_service.subscriber_builder().create().expect("");

notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::SubscriberConnected)))
.expect("");

return EventBasedSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) };
}

auto EventBasedSubscriber::file_descriptor() const -> FileDescriptorView {
return m_listener.file_descriptor();
}

void EventBasedSubscriber::handle_event() {
for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value();
event = m_listener.try_wait_one()) {
switch (iox::from<size_t, PubSubEvent>(event.value()->as_value())) {
case PubSubEvent::SentHistory: {
std::cout << "History delivered" << std::endl;
for (auto sample = receive(); sample.has_value(); sample = receive()) {
std::cout << " history: " << sample->payload().x << std::endl;
}
break;
}
case PubSubEvent::SentSample: {
for (auto sample = receive(); sample.has_value(); sample = receive()) {
std::cout << "received: " << sample->payload().x << std::endl;
}
break;
}
case PubSubEvent::PublisherConnected: {
std::cout << "new publisher connected" << std::endl;
break;
}
case PubSubEvent::PublisherDisconnected: {
std::cout << "publisher disconnected" << std::endl;
break;
}
default: {
break;
}
}
}
}

auto EventBasedSubscriber::receive() -> iox::optional<Sample<ServiceType::Ipc, TransmissionData, void>> {
auto sample = m_subscriber.receive().expect("");
if (sample.has_value()) {
m_notifier.notify_with_custom_event_id(EventId(iox::from<PubSubEvent, size_t>(PubSubEvent::ReceivedSample)))
.expect("");
}

return sample;
}

11 changes: 11 additions & 0 deletions examples/rust/event_based_communication/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,40 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let publisher = EventBasedPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;

// Whenever our publisher receives an event we get notified.
let publisher_guard = waitset.attach_notification(&publisher)?;
// Attach an interval so that we wake up and can publish a new message
let cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME)?;

let mut counter: u64 = 0;

// Event callback that is called whenever the WaitSet received an event.
let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
// when the cyclic trigger guard gets notified we send out a new message
if attachment_id.has_event_from(&cyclic_trigger_guard) {
println!("send message: {}", counter);
publisher.send(counter).unwrap();
counter += 1;
// when something else happens on the publisher we handle the events
} else if attachment_id.has_event_from(&publisher_guard) {
publisher.handle_event().unwrap();
}
CallbackProgression::Continue
};

// Start the event loop. It will run until `CallbackProgression::Stop` is returned by the
// event callback or an interrupt/termination signal was received.
waitset.wait_and_process(on_event)?;

println!("exit ...");

Ok(())
}

/// High-level publisher class that contains besides a publisher also a notifier and a listener.
/// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory`
/// and the listener to wait for new subscribers.
#[derive(Debug)]
struct EventBasedPublisher {
publisher: Publisher<ipc::Service, TransmissionData, ()>,
Expand Down
12 changes: 12 additions & 0 deletions examples/rust/event_based_communication/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let subscriber = EventBasedSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?;

let waitset = WaitSetBuilder::new().create::<ipc::Service>()?;

// The subscriber is attached as a deadline, meaning that we expect some activity
// latest after the deadline has passed.
let subscriber_guard = waitset.attach_deadline(&subscriber, DEADLINE)?;

let on_event = |attachment_id: WaitSetAttachmentId<ipc::Service>| {
// If we have received a new event on the subscriber we handle it.
if attachment_id.has_event_from(&subscriber_guard) {
subscriber.handle_event().unwrap();
// If the subscriber did not receive an event until DEADLINE has
// passed, we print out a warning.
} else if attachment_id.has_missed_deadline(&subscriber_guard) {
println!(
"Contract violation! The subscriber did not receive a message for {:?}.",
Expand Down Expand Up @@ -65,6 +71,12 @@ impl FileDescriptorBased for EventBasedSubscriber {

impl SynchronousMultiplexing for EventBasedSubscriber {}

// High-level subscriber class that contains besides a subscriber also a notifier
// and a listener. The notifier is used to send events like
// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber
// connected.
// The listener waits for events originating from the publisher like
// `PubSubEvent::SentSample`.
impl EventBasedSubscriber {
fn new(
node: &Node<ipc::Service>,
Expand Down

0 comments on commit 9003d8b

Please sign in to comment.