diff --git a/examples/cxx/event_based_communication/src/publisher.cpp b/examples/cxx/event_based_communication/src/publisher.cpp index 8bcaf6840..5086cbb27 100644 --- a/examples/cxx/event_based_communication/src/publisher.cpp +++ b/examples/cxx/event_based_communication/src/publisher.cpp @@ -31,6 +31,9 @@ using namespace iox2; constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); constexpr uint64_t HISTORY_SIZE = 20; +// High-level publisher class that contains besides a publisher also a notifier and a listener. +// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +// and the listener to wait for new subscribers. class EventBasedPublisher : public FileDescriptorBased { public: EventBasedPublisher(const EventBasedPublisher&) = delete; @@ -60,22 +63,29 @@ auto main() -> int { auto publisher = EventBasedPublisher::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); auto waitset = WaitSetBuilder().create().expect(""); + // Whenever our publisher receives an event we get notified. auto publisher_guard = waitset.attach_notification(publisher).expect(""); + // Attach an interval so that we wake up and can publish a new message auto cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME).expect(""); uint64_t counter = 0; + // Event callback that is called whenever the WaitSet received an event. auto on_event = [&](WaitSetAttachmentId attachment_id) -> CallbackProgression { + // when the cyclic trigger guard gets notified we send out a new message if (attachment_id.has_event_from(cyclic_trigger_guard)) { std::cout << "send message: " << counter << std::endl; publisher.send(counter); counter += 1; + // when something else happens on the publisher we handle the events } else if (attachment_id.has_event_from(publisher_guard)) { publisher.handle_event(); } return CallbackProgression::Continue; }; + // Start the event loop. It will run until `CallbackProgression::Stop` is returned by the + // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event).expect(""); std::cout << "exit ..." << std::endl; @@ -109,6 +119,9 @@ auto EventBasedPublisher::create(Node& node, const ServiceName auto listener = event_service.listener_builder().create().expect(""); auto publisher = pubsub_service.publisher_builder().create().expect(""); + notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::PublisherConnected))) + .expect(""); + return EventBasedPublisher { std::move(publisher), std::move(listener), std::move(notifier) }; } diff --git a/examples/cxx/event_based_communication/src/subscriber.cpp b/examples/cxx/event_based_communication/src/subscriber.cpp index b534b4e3a..ec2de1203 100644 --- a/examples/cxx/event_based_communication/src/subscriber.cpp +++ b/examples/cxx/event_based_communication/src/subscriber.cpp @@ -13,33 +13,155 @@ #include #include "iox/duration.hpp" +#include "iox/into.hpp" +#include "iox2/file_descriptor.hpp" +#include "iox2/listener.hpp" #include "iox2/node.hpp" +#include "iox2/notifier.hpp" #include "iox2/service_name.hpp" #include "iox2/service_type.hpp" +#include "iox2/subscriber.hpp" +#include "iox2/waitset.hpp" +#include "pubsub_event.hpp" #include "transmission_data.hpp" -constexpr iox::units::Duration CYCLE_TIME = iox::units::Duration::fromSeconds(1); +constexpr iox::units::Duration DEADLINE = iox::units::Duration::fromSeconds(2); + +using namespace iox2; + +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. +class EventBasedSubscriber : public FileDescriptorBased { + public: + EventBasedSubscriber(const EventBasedSubscriber&) = delete; + EventBasedSubscriber(EventBasedSubscriber&&) = default; + ~EventBasedSubscriber() override; + auto operator=(const EventBasedSubscriber&) -> EventBasedSubscriber& = delete; + auto operator=(EventBasedSubscriber&&) -> EventBasedSubscriber& = default; + + static auto create(Node& node, const ServiceName& service_name) -> EventBasedSubscriber; + auto file_descriptor() const -> FileDescriptorView override; + void handle_event(); + auto receive() -> iox::optional>; + + private: + EventBasedSubscriber(Subscriber&& subscriber, + Notifier&& notifier, + Listener&& listener); + + Subscriber m_subscriber; + Notifier m_notifier; + Listener m_listener; +}; auto main() -> int { - using namespace iox2; auto node = NodeBuilder().create().expect("successful node creation"); - auto service = node.service_builder(ServiceName::create("My/Funk/ServiceName").expect("valid service name")) - .publish_subscribe() - .open_or_create() - .expect("successful service creation/opening"); + auto subscriber = EventBasedSubscriber::create(node, ServiceName::create("My/Funk/ServiceName").expect("")); - auto subscriber = service.subscriber_builder().create().expect("successful subscriber creation"); + auto waitset = WaitSetBuilder().create().expect(""); - while (node.wait(CYCLE_TIME).has_value()) { - auto sample = subscriber.receive().expect("receive succeeds"); - while (sample.has_value()) { - std::cout << "received: " << sample->payload() << std::endl; - sample = subscriber.receive().expect("receive succeeds"); + // The subscriber is attached as a deadline, meaning that we expect some activity + // latest after the deadline has passed. + auto subscriber_guard = waitset.attach_deadline(subscriber, DEADLINE).expect(""); + + auto on_event = [&](WaitSetAttachmentId attachment_id) { + // If we have received a new event on the subscriber we handle it. + if (attachment_id.has_event_from(subscriber_guard)) { + subscriber.handle_event(); + // If the subscriber did not receive an event until DEADLINE has + // passed, we print out a warning. + } else if (attachment_id.has_missed_deadline(subscriber_guard)) { + std::cout << "Contract violation! The subscriber did not receive a message for " << DEADLINE << std::endl; } - } + + return CallbackProgression::Continue; + }; + + waitset.wait_and_process(on_event).expect(""); std::cout << "exit" << std::endl; return 0; } + +EventBasedSubscriber::EventBasedSubscriber(Subscriber&& subscriber, + Notifier&& notifier, + Listener&& listener) + : m_subscriber { std::move(subscriber) } + , m_notifier { std::move(notifier) } + , m_listener { std::move(listener) } { +} + +EventBasedSubscriber::~EventBasedSubscriber() { + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberDisconnected))) + .expect(""); +} + + +auto EventBasedSubscriber::create(Node& node, + const ServiceName& service_name) -> EventBasedSubscriber { + auto pubsub_service = + node.service_builder(service_name).publish_subscribe().open_or_create().expect(""); + auto event_service = node.service_builder(service_name).event().open_or_create().expect(""); + + auto listener = event_service.listener_builder().create().expect(""); + auto notifier = event_service.notifier_builder().create().expect(""); + auto subscriber = pubsub_service.subscriber_builder().create().expect(""); + + notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::SubscriberConnected))) + .expect(""); + + return EventBasedSubscriber { std::move(subscriber), std::move(notifier), std::move(listener) }; +} + +auto EventBasedSubscriber::file_descriptor() const -> FileDescriptorView { + return m_listener.file_descriptor(); +} + +void EventBasedSubscriber::handle_event() { + for (auto event = m_listener.try_wait_one(); event.has_value() && event->has_value(); + event = m_listener.try_wait_one()) { + switch (iox::from(event.value()->as_value())) { + case PubSubEvent::SentHistory: { + std::cout << "History delivered" << std::endl; + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << " history: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::SentSample: { + for (auto sample = receive(); sample.has_value(); sample = receive()) { + std::cout << "received: " << sample->payload().x << std::endl; + } + break; + } + case PubSubEvent::PublisherConnected: { + std::cout << "new publisher connected" << std::endl; + break; + } + case PubSubEvent::PublisherDisconnected: { + std::cout << "publisher disconnected" << std::endl; + break; + } + default: { + break; + } + } + } +} + +auto EventBasedSubscriber::receive() -> iox::optional> { + auto sample = m_subscriber.receive().expect(""); + if (sample.has_value()) { + m_notifier.notify_with_custom_event_id(EventId(iox::from(PubSubEvent::ReceivedSample))) + .expect(""); + } + + return sample; +} + diff --git a/examples/rust/event_based_communication/publisher.rs b/examples/rust/event_based_communication/publisher.rs index d079d8673..69f0388aa 100644 --- a/examples/rust/event_based_communication/publisher.rs +++ b/examples/rust/event_based_communication/publisher.rs @@ -28,22 +28,30 @@ fn main() -> Result<(), Box> { let publisher = EventBasedPublisher::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; + + // Whenever our publisher receives an event we get notified. let publisher_guard = waitset.attach_notification(&publisher)?; + // Attach an interval so that we wake up and can publish a new message let cyclic_trigger_guard = waitset.attach_interval(CYCLE_TIME)?; let mut counter: u64 = 0; + // Event callback that is called whenever the WaitSet received an event. let on_event = |attachment_id: WaitSetAttachmentId| { + // when the cyclic trigger guard gets notified we send out a new message if attachment_id.has_event_from(&cyclic_trigger_guard) { println!("send message: {}", counter); publisher.send(counter).unwrap(); counter += 1; + // when something else happens on the publisher we handle the events } else if attachment_id.has_event_from(&publisher_guard) { publisher.handle_event().unwrap(); } CallbackProgression::Continue }; + // Start the event loop. It will run until `CallbackProgression::Stop` is returned by the + // event callback or an interrupt/termination signal was received. waitset.wait_and_process(on_event)?; println!("exit ..."); @@ -51,6 +59,9 @@ fn main() -> Result<(), Box> { Ok(()) } +/// High-level publisher class that contains besides a publisher also a notifier and a listener. +/// The notifier is used to send events like `PubSubEvent::SentSample` or `PubSubEvent::SentHistory` +/// and the listener to wait for new subscribers. #[derive(Debug)] struct EventBasedPublisher { publisher: Publisher, diff --git a/examples/rust/event_based_communication/subscriber.rs b/examples/rust/event_based_communication/subscriber.rs index e144e6bd0..88c074645 100644 --- a/examples/rust/event_based_communication/subscriber.rs +++ b/examples/rust/event_based_communication/subscriber.rs @@ -28,11 +28,17 @@ fn main() -> Result<(), Box> { let subscriber = EventBasedSubscriber::new(&node, &"My/Funk/ServiceName".try_into()?)?; let waitset = WaitSetBuilder::new().create::()?; + + // The subscriber is attached as a deadline, meaning that we expect some activity + // latest after the deadline has passed. let subscriber_guard = waitset.attach_deadline(&subscriber, DEADLINE)?; let on_event = |attachment_id: WaitSetAttachmentId| { + // If we have received a new event on the subscriber we handle it. if attachment_id.has_event_from(&subscriber_guard) { subscriber.handle_event().unwrap(); + // If the subscriber did not receive an event until DEADLINE has + // passed, we print out a warning. } else if attachment_id.has_missed_deadline(&subscriber_guard) { println!( "Contract violation! The subscriber did not receive a message for {:?}.", @@ -65,6 +71,12 @@ impl FileDescriptorBased for EventBasedSubscriber { impl SynchronousMultiplexing for EventBasedSubscriber {} +// High-level subscriber class that contains besides a subscriber also a notifier +// and a listener. The notifier is used to send events like +// `PubSubEvent::ReceivedSample` or to notify the publisher that a new subscriber +// connected. +// The listener waits for events originating from the publisher like +// `PubSubEvent::SentSample`. impl EventBasedSubscriber { fn new( node: &Node,