Skip to content

Commit

Permalink
Merge pull request #509 from elfenpiff/iox2-507-waitset-adjustments
Browse files Browse the repository at this point in the history
[#507] waitset adjustments
  • Loading branch information
elfenpiff authored Nov 12, 2024
2 parents 810fb87 + 71c4db4 commit 5b8e6ee
Show file tree
Hide file tree
Showing 14 changed files with 353 additions and 191 deletions.
3 changes: 2 additions & 1 deletion examples/c/event_multiplexing/src/wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct CallbackContext {


// the function that is called when a listener has received an event
void on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
iox2_callback_progression_e on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
struct CallbackContext* ctx = (struct CallbackContext*) context;

iox2_event_id_t event_id;
Expand Down Expand Up @@ -65,6 +65,7 @@ void on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
}

iox2_waitset_attachment_id_drop(attachment_id);
return iox2_callback_progression_e_CONTINUE;
}

//NOLINTBEGIN(readability-function-size)
Expand Down
2 changes: 2 additions & 0 deletions examples/cxx/event_multiplexing/src/wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ auto main(int argc, char** argv) -> int {
listener.try_wait_all([](auto event_id) { std::cout << " " << event_id; }).expect("");
std::cout << std::endl;
}

return iox2::CallbackProgression::Continue;
};

std::cout << "Waiting on the following services: " << service_name_1.to_string().c_str() << ", "
Expand Down
2 changes: 2 additions & 0 deletions examples/rust/event_multiplexing/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("");
}

CallbackProgression::Continue
};

// loops until the user has pressed CTRL+c, the application has received a SIGTERM or SIGINT
Expand Down
16 changes: 8 additions & 8 deletions iceoryx2-bb/posix/src/deadline_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
//! // contains all the deadlines where the deadline was hit
//! let mut missed_deadlines = vec![];
//! deadline_queue
//! .missed_deadlines(|deadline_queue_index| missed_deadlines.push(deadline_queue_index));
//! .missed_deadlines(|deadline_queue_index| {
//! missed_deadlines.push(deadline_queue_index);
//! CallbackProgression::Continue
//! });
//! ```
use std::{cell::RefCell, fmt::Debug, sync::atomic::Ordering, time::Duration};
pub use iceoryx2_bb_elementary::CallbackProgression;

use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_log::fail;
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicU64;
use std::{cell::RefCell, fmt::Debug, sync::atomic::Ordering, time::Duration};

use crate::{
clock::ClockType,
Expand Down Expand Up @@ -268,18 +271,15 @@ impl DeadlineQueue {

/// Iterates over all missed deadlines and calls the provided callback for each of them
/// and provide the [`DeadlineQueueIndex`] to identify them.
pub fn missed_deadlines<F: FnMut(DeadlineQueueIndex)>(
pub fn missed_deadlines<F: FnMut(DeadlineQueueIndex) -> CallbackProgression>(
&self,
mut call: F,
) -> Result<(), TimeError> {
let now = fail!(from self, when Time::now_with_clock(self.clock_type),
"Unable to return next duration since the current time could not be acquired.");

let now = now.as_duration().as_nanos();
self.handle_missed_deadlines(now, |idx| {
call(idx);
CallbackProgression::Continue
});
self.handle_missed_deadlines(now, |idx| -> CallbackProgression { call(idx) });

Ok(())
}
Expand Down
45 changes: 38 additions & 7 deletions iceoryx2-bb/posix/tests/deadline_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// SPDX-License-Identifier: Apache-2.0 OR MIT

mod deadline_queue {
use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_posix::deadline_queue::*;
use iceoryx2_bb_testing::assert_that;
use std::time::Duration;
Expand Down Expand Up @@ -85,8 +86,11 @@ mod deadline_queue {
.unwrap();

let mut missed_deadline_queues = vec![];
sut.missed_deadlines(|idx| missed_deadline_queues.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadline_queues.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadline_queues, len 0);
}
Expand All @@ -104,8 +108,11 @@ mod deadline_queue {
std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| missed_deadlines.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadlines, len 1);
assert_that!(missed_deadlines, contains _guard_1.index());
Expand All @@ -122,15 +129,38 @@ mod deadline_queue {
std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| missed_deadlines.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadlines, len 3);
assert_that!(missed_deadlines, contains guard_1.index());
assert_that!(missed_deadlines, contains guard_2.index());
assert_that!(missed_deadlines, contains guard_3.index());
}

#[test]
fn missed_deadline_iteration_stops_when_requested() {
let sut = DeadlineQueueBuilder::new().create().unwrap();

let _guard_1 = sut.add_deadline_interval(Duration::from_nanos(1)).unwrap();
let _guard_2 = sut.add_deadline_interval(Duration::from_nanos(10)).unwrap();
let _guard_3 = sut.add_deadline_interval(Duration::from_nanos(20)).unwrap();

std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Stop
})
.unwrap();

assert_that!(missed_deadlines, len 1);
}

#[test]
fn duration_until_next_deadline_is_zero_if_deadline_is_already_missed() {
let sut = DeadlineQueueBuilder::new().create().unwrap();
Expand All @@ -149,7 +179,8 @@ mod deadline_queue {
let mut deadline_idx = None;
sut.missed_deadlines(|idx| {
missed_deadline_counter += 1;
deadline_idx = Some(idx)
deadline_idx = Some(idx);
CallbackProgression::Continue
})
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,8 @@ constexpr auto from<int, iox2::WaitSetRunResult>(const int value) noexcept -> io
return iox2::WaitSetRunResult::TerminationRequest;
case iox2_waitset_run_result_e_STOP_REQUEST:
return iox2::WaitSetRunResult::StopRequest;
case iox2_waitset_run_result_e_ALL_EVENTS_HANDLED:
return iox2::WaitSetRunResult::AllEventsHandled;
}

IOX_UNREACHABLE();
Expand Down
51 changes: 34 additions & 17 deletions iceoryx2-ffi/cxx/include/iox2/waitset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "iox/duration.hpp"
#include "iox/expected.hpp"
#include "iox2/callback_progression.hpp"
#include "iox2/file_descriptor.hpp"
#include "iox2/internal/iceoryx2.hpp"
#include "iox2/listener.hpp"
Expand Down Expand Up @@ -78,7 +79,7 @@ class WaitSetAttachmentId {
private:
explicit WaitSetAttachmentId(iox2_waitset_attachment_id_h handle);
template <ServiceType>
friend auto run_callback(iox2_waitset_attachment_id_h, void*);
friend auto run_callback(iox2_waitset_attachment_id_h, void*) -> iox2_callback_progression_e;
template <ServiceType ST>
friend auto operator==(const WaitSetAttachmentId<ST>&, const WaitSetAttachmentId<ST>&) -> bool;
template <ServiceType ST>
Expand All @@ -103,7 +104,8 @@ auto operator<<(std::ostream& stream, const WaitSetAttachmentId<S>& self) -> std
/// The [`WaitSet`] implements a reactor pattern and allows to wait on multiple events in one
/// single call [`WaitSet::try_wait_and_process()`] until it wakes up or to run repeatedly with
/// [`WaitSet::wait_and_process()`] until the a interrupt or termination signal was received or the user
/// has explicitly requested to stop with [`WaitSet::stop()`].
/// has explicitly requested to stop by returning [`CallbackProgression::Stop`] in the provided
/// callback.
///
/// The [`Listener`] can be attached as well as sockets or anything else that
/// can be packed into a [`FileDescriptorView`].
Expand All @@ -118,24 +120,39 @@ class WaitSet {
auto operator=(WaitSet&&) noexcept -> WaitSet&;
~WaitSet();

/// Can be called from within a callback during [`WaitSet::wait_and_process()`] to signal the [`WaitSet`]
/// to stop running after this iteration.
void stop();

/// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every
/// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to
/// acquire the source.
/// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the
/// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`]. In contrast
/// to [`WaitSet::wait_and_process_once()`] it will never return until the user explicitly
/// requests it by returning [`CallbackProgression::Stop`] or by receiving a signal.
///
/// The provided callback must return [`CallbackProgression::Continue`] to continue the event
/// processing and handle the next event or [`CallbackProgression::Stop`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`WaitSetRunResult::StopRequest`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user via [`WaitSetRunResult`].
auto wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
/// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or
/// [`WaitSetRunResult::TerminationRequest`].
auto wait_and_process(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that
/// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the
/// source.
/// If nothing was triggered the [`WaitSet`] returns immediately.
auto try_wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<void, WaitSetRunError>;
/// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the
/// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`] and then
/// returns. This makes it ideal to be called in some kind of event-loop.
///
/// The provided callback must return [`CallbackProgression::Continue`] to continue the event
/// processing and handle the next event or [`CallbackProgression::Stop`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`WaitSetRunResult::StopRequest`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or
/// [`WaitSetRunResult::TerminationRequest`].
///
/// When no signal was received and all events were handled, it will return
/// [`WaitSetRunResult::AllEventsHandled`].
auto wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Returns the capacity of the [`WaitSet`]
auto capacity() const -> uint64_t;
Expand Down
6 changes: 4 additions & 2 deletions iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ enum class WaitSetRunResult : uint8_t {
TerminationRequest,
/// An interrupt signal `SIGINT` was received.
Interrupt,
/// The user explicitly called [`WaitSet::stop()`].
StopRequest
/// The users callback returned [`CallbackProgression::Stop`].
StopRequest,
/// All events were handled.
AllEventsHandled
};

/// Defines the failures that can occur when attaching something with
Expand Down
22 changes: 9 additions & 13 deletions iceoryx2-ffi/cxx/src/waitset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,6 @@ auto WaitSet<S>::is_empty() const -> bool {
return iox2_waitset_is_empty(&m_handle);
}

template <ServiceType S>
void WaitSet<S>::stop() {
iox2_waitset_stop(&m_handle);
}

template <ServiceType S>
auto WaitSet<S>::attach_interval(const iox::units::Duration deadline)
-> iox::expected<WaitSetGuard<S>, WaitSetAttachmentError> {
Expand Down Expand Up @@ -290,13 +285,13 @@ auto WaitSet<S>::attach_notification(const Listener<S>& listener)
}

template <ServiceType S>
auto run_callback(iox2_waitset_attachment_id_h attachment_id, void* context) {
auto* fn_call = internal::ctx_cast<iox::function<void(WaitSetAttachmentId<S>)>>(context);
fn_call->value()(WaitSetAttachmentId<S>(attachment_id));
auto run_callback(iox2_waitset_attachment_id_h attachment_id, void* context) -> iox2_callback_progression_e {
auto* fn_call = internal::ctx_cast<iox::function<CallbackProgression(WaitSetAttachmentId<S>)>>(context);
return iox::into<iox2_callback_progression_e>(fn_call->value()(WaitSetAttachmentId<S>(attachment_id)));
}

template <ServiceType S>
auto WaitSet<S>::wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
auto WaitSet<S>::wait_and_process(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError> {
iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST;
auto ctx = internal::ctx(fn_call);
Expand All @@ -310,13 +305,14 @@ auto WaitSet<S>::wait_and_process(const iox::function<void(WaitSetAttachmentId<S
}

template <ServiceType S>
auto WaitSet<S>::try_wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<void, WaitSetRunError> {
auto WaitSet<S>::wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError> {
iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST;
auto ctx = internal::ctx(fn_call);
auto result = iox2_waitset_try_wait_and_process(&m_handle, run_callback<S>, static_cast<void*>(&ctx));
auto result = iox2_waitset_wait_and_process_once(&m_handle, run_callback<S>, static_cast<void*>(&ctx), &run_result);

if (result == IOX2_OK) {
return iox::ok();
return iox::ok(iox::into<WaitSetRunResult>(static_cast<int>(run_result)));
}

return iox::err(iox::into<WaitSetRunError>(result));
Expand Down
Loading

0 comments on commit 5b8e6ee

Please sign in to comment.