Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#507] waitset adjustments #509

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/c/event_multiplexing/src/wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct CallbackContext {


// the function that is called when a listener has received an event
void on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
iox2_callback_progression_e on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
struct CallbackContext* ctx = (struct CallbackContext*) context;

iox2_event_id_t event_id;
Expand Down Expand Up @@ -65,6 +65,7 @@ void on_event(iox2_waitset_attachment_id_h attachment_id, void* context) {
}

iox2_waitset_attachment_id_drop(attachment_id);
return iox2_callback_progression_e_CONTINUE;
}

//NOLINTBEGIN(readability-function-size)
Expand Down
2 changes: 2 additions & 0 deletions examples/cxx/event_multiplexing/src/wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ auto main(int argc, char** argv) -> int {
listener.try_wait_all([](auto event_id) { std::cout << " " << event_id; }).expect("");
std::cout << std::endl;
}

return iox2::CallbackProgression::Continue;
};

std::cout << "Waiting on the following services: " << service_name_1.to_string().c_str() << ", "
Expand Down
2 changes: 2 additions & 0 deletions examples/rust/event_multiplexing/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

println!("");
}

CallbackProgression::Continue
};

// loops until the user has pressed CTRL+c, the application has received a SIGTERM or SIGINT
Expand Down
16 changes: 8 additions & 8 deletions iceoryx2-bb/posix/src/deadline_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,17 @@
//! // contains all the deadlines where the deadline was hit
//! let mut missed_deadlines = vec![];
//! deadline_queue
//! .missed_deadlines(|deadline_queue_index| missed_deadlines.push(deadline_queue_index));
//! .missed_deadlines(|deadline_queue_index| {
//! missed_deadlines.push(deadline_queue_index);
//! CallbackProgression::Continue
//! });
//! ```

use std::{cell::RefCell, fmt::Debug, sync::atomic::Ordering, time::Duration};
pub use iceoryx2_bb_elementary::CallbackProgression;

use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_log::fail;
use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicU64;
use std::{cell::RefCell, fmt::Debug, sync::atomic::Ordering, time::Duration};

use crate::{
clock::ClockType,
Expand Down Expand Up @@ -268,18 +271,15 @@ impl DeadlineQueue {

/// Iterates over all missed deadlines and calls the provided callback for each of them
/// and provide the [`DeadlineQueueIndex`] to identify them.
pub fn missed_deadlines<F: FnMut(DeadlineQueueIndex)>(
pub fn missed_deadlines<F: FnMut(DeadlineQueueIndex) -> CallbackProgression>(
&self,
mut call: F,
) -> Result<(), TimeError> {
let now = fail!(from self, when Time::now_with_clock(self.clock_type),
"Unable to return next duration since the current time could not be acquired.");

let now = now.as_duration().as_nanos();
self.handle_missed_deadlines(now, |idx| {
call(idx);
CallbackProgression::Continue
});
self.handle_missed_deadlines(now, |idx| -> CallbackProgression { call(idx) });

Ok(())
}
Expand Down
45 changes: 38 additions & 7 deletions iceoryx2-bb/posix/tests/deadline_queue_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// SPDX-License-Identifier: Apache-2.0 OR MIT

mod deadline_queue {
use iceoryx2_bb_elementary::CallbackProgression;
use iceoryx2_bb_posix::deadline_queue::*;
use iceoryx2_bb_testing::assert_that;
use std::time::Duration;
Expand Down Expand Up @@ -85,8 +86,11 @@ mod deadline_queue {
.unwrap();

let mut missed_deadline_queues = vec![];
sut.missed_deadlines(|idx| missed_deadline_queues.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadline_queues.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadline_queues, len 0);
}
Expand All @@ -104,8 +108,11 @@ mod deadline_queue {
std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| missed_deadlines.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadlines, len 1);
assert_that!(missed_deadlines, contains _guard_1.index());
Expand All @@ -122,15 +129,38 @@ mod deadline_queue {
std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| missed_deadlines.push(idx))
.unwrap();
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Continue
})
.unwrap();

assert_that!(missed_deadlines, len 3);
assert_that!(missed_deadlines, contains guard_1.index());
assert_that!(missed_deadlines, contains guard_2.index());
assert_that!(missed_deadlines, contains guard_3.index());
}

#[test]
fn missed_deadline_iteration_stops_when_requested() {
let sut = DeadlineQueueBuilder::new().create().unwrap();

let _guard_1 = sut.add_deadline_interval(Duration::from_nanos(1)).unwrap();
let _guard_2 = sut.add_deadline_interval(Duration::from_nanos(10)).unwrap();
let _guard_3 = sut.add_deadline_interval(Duration::from_nanos(20)).unwrap();

std::thread::sleep(Duration::from_millis(10));

let mut missed_deadlines = vec![];
sut.missed_deadlines(|idx| {
missed_deadlines.push(idx);
CallbackProgression::Stop
})
.unwrap();

assert_that!(missed_deadlines, len 1);
}

#[test]
fn duration_until_next_deadline_is_zero_if_deadline_is_already_missed() {
let sut = DeadlineQueueBuilder::new().create().unwrap();
Expand All @@ -149,7 +179,8 @@ mod deadline_queue {
let mut deadline_idx = None;
sut.missed_deadlines(|idx| {
missed_deadline_counter += 1;
deadline_idx = Some(idx)
deadline_idx = Some(idx);
CallbackProgression::Continue
})
.unwrap();

Expand Down
2 changes: 2 additions & 0 deletions iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,8 @@ constexpr auto from<int, iox2::WaitSetRunResult>(const int value) noexcept -> io
return iox2::WaitSetRunResult::TerminationRequest;
case iox2_waitset_run_result_e_STOP_REQUEST:
return iox2::WaitSetRunResult::StopRequest;
case iox2_waitset_run_result_e_ALL_EVENTS_HANDLED:
return iox2::WaitSetRunResult::AllEventsHandled;
}

IOX_UNREACHABLE();
Expand Down
51 changes: 34 additions & 17 deletions iceoryx2-ffi/cxx/include/iox2/waitset.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "iox/duration.hpp"
#include "iox/expected.hpp"
#include "iox2/callback_progression.hpp"
#include "iox2/file_descriptor.hpp"
#include "iox2/internal/iceoryx2.hpp"
#include "iox2/listener.hpp"
Expand Down Expand Up @@ -75,7 +76,7 @@ class WaitSetAttachmentId {
private:
explicit WaitSetAttachmentId(iox2_waitset_attachment_id_h handle);
template <ServiceType>
friend auto run_callback(iox2_waitset_attachment_id_h, void*);
friend auto run_callback(iox2_waitset_attachment_id_h, void*) -> iox2_callback_progression_e;
template <ServiceType ST>
friend auto operator==(const WaitSetAttachmentId<ST>&, const WaitSetAttachmentId<ST>&) -> bool;
template <ServiceType ST>
Expand All @@ -95,7 +96,8 @@ auto operator<(const WaitSetAttachmentId<S>& lhs, const WaitSetAttachmentId<S>&
/// The [`WaitSet`] implements a reactor pattern and allows to wait on multiple events in one
/// single call [`WaitSet::try_wait_and_process()`] until it wakes up or to run repeatedly with
/// [`WaitSet::wait_and_process()`] until the a interrupt or termination signal was received or the user
/// has explicitly requested to stop with [`WaitSet::stop()`].
/// has explicitly requested to stop by returning [`CallbackProgression::Stop`] in the provided
/// callback.
///
/// The [`Listener`] can be attached as well as sockets or anything else that
/// can be packed into a [`FileDescriptorView`].
Expand All @@ -110,24 +112,39 @@ class WaitSet {
auto operator=(WaitSet&&) noexcept -> WaitSet&;
~WaitSet();

/// Can be called from within a callback during [`WaitSet::wait_and_process()`] to signal the [`WaitSet`]
/// to stop running after this iteration.
void stop();

/// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every
/// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to
/// acquire the source.
/// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the
/// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`]. In contrast
/// to [`WaitSet::wait_and_process_once()`] it will never return until the user explicitly
/// requests it by returning [`CallbackProgression::Stop`] or by receiving a signal.
///
/// The provided callback must return [`CallbackProgression::Continue`] to continue the event
/// processing and handle the next event or [`CallbackProgression::Stop`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`WaitSetRunResult::StopRequest`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user via [`WaitSetRunResult`].
auto wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
/// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or
/// [`WaitSetRunResult::TerminationRequest`].
auto wait_and_process(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that
/// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the
/// source.
/// If nothing was triggered the [`WaitSet`] returns immediately.
auto try_wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<void, WaitSetRunError>;
/// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the
/// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`] and then
/// returns. This makes it ideal to be called in some kind of event-loop.
///
/// The provided callback must return [`CallbackProgression::Continue`] to continue the event
/// processing and handle the next event or [`CallbackProgression::Stop`] to return from this
/// call immediately. All unhandled events will be lost forever and the call will return
/// [`WaitSetRunResult::StopRequest`].
///
/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit
/// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or
/// [`WaitSetRunResult::TerminationRequest`].
///
/// When no signal was received and all events were handled, it will return
/// [`WaitSetRunResult::AllEventsHandled`].
auto wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError>;

/// Returns the capacity of the [`WaitSet`]
auto capacity() const -> uint64_t;
Expand Down
6 changes: 4 additions & 2 deletions iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ enum class WaitSetRunResult : uint8_t {
TerminationRequest,
/// An interrupt signal `SIGINT` was received.
Interrupt,
/// The user explicitly called [`WaitSet::stop()`].
StopRequest
/// The users callback returned [`CallbackProgression::Stop`].
StopRequest,
/// All events were handled.
AllEventsHandled
};

/// Defines the failures that can occur when attaching something with
Expand Down
22 changes: 9 additions & 13 deletions iceoryx2-ffi/cxx/src/waitset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ auto WaitSet<S>::is_empty() const -> bool {
return iox2_waitset_is_empty(&m_handle);
}

template <ServiceType S>
void WaitSet<S>::stop() {
iox2_waitset_stop(&m_handle);
}

template <ServiceType S>
auto WaitSet<S>::attach_interval(const iox::units::Duration deadline)
-> iox::expected<WaitSetGuard<S>, WaitSetAttachmentError> {
Expand Down Expand Up @@ -274,13 +269,13 @@ auto WaitSet<S>::attach_notification(const Listener<S>& listener)
}

template <ServiceType S>
auto run_callback(iox2_waitset_attachment_id_h attachment_id, void* context) {
auto* fn_call = internal::ctx_cast<iox::function<void(WaitSetAttachmentId<S>)>>(context);
fn_call->value()(WaitSetAttachmentId<S>(attachment_id));
auto run_callback(iox2_waitset_attachment_id_h attachment_id, void* context) -> iox2_callback_progression_e {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious - why is the C enum used? I guess it's because this is called directly by the C API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@orecham this is just the internal implementation. I need a C style callback in the C++ API that has the C++ iox::function stored in context and can be used as a callback in the C API.

auto* fn_call = internal::ctx_cast<iox::function<CallbackProgression(WaitSetAttachmentId<S>)>>(context);
return iox::into<iox2_callback_progression_e>(fn_call->value()(WaitSetAttachmentId<S>(attachment_id)));
}

template <ServiceType S>
auto WaitSet<S>::wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
auto WaitSet<S>::wait_and_process(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError> {
iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST;
auto ctx = internal::ctx(fn_call);
Expand All @@ -294,13 +289,14 @@ auto WaitSet<S>::wait_and_process(const iox::function<void(WaitSetAttachmentId<S
}

template <ServiceType S>
auto WaitSet<S>::try_wait_and_process(const iox::function<void(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<void, WaitSetRunError> {
auto WaitSet<S>::wait_and_process_once(const iox::function<CallbackProgression(WaitSetAttachmentId<S>)>& fn_call)
-> iox::expected<WaitSetRunResult, WaitSetRunError> {
iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST;
auto ctx = internal::ctx(fn_call);
auto result = iox2_waitset_try_wait_and_process(&m_handle, run_callback<S>, static_cast<void*>(&ctx));
auto result = iox2_waitset_wait_and_process_once(&m_handle, run_callback<S>, static_cast<void*>(&ctx), &run_result);

if (result == IOX2_OK) {
return iox::ok();
return iox::ok(iox::into<WaitSetRunResult>(static_cast<int>(run_result)));
}

return iox::err(iox::into<WaitSetRunError>(result));
Expand Down
Loading