From 88af47740b747bba7064174d06328afab5feb291 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sat, 9 Nov 2024 13:38:41 +0100 Subject: [PATCH 1/5] [#507] Introduce CallbackProgression in all wait_and_process calls; rename try_wait_and_process into wait_and_process_once --- iceoryx2-bb/posix/src/deadline_queue.rs | 7 +- .../posix/tests/deadline_queue_tests.rs | 45 +++++++-- iceoryx2/src/port/waitset.rs | 92 +++++++++---------- iceoryx2/tests/waitset_tests.rs | 53 +++++++++-- 4 files changed, 126 insertions(+), 71 deletions(-) diff --git a/iceoryx2-bb/posix/src/deadline_queue.rs b/iceoryx2-bb/posix/src/deadline_queue.rs index 75c947b7a..c7e1ea5bb 100644 --- a/iceoryx2-bb/posix/src/deadline_queue.rs +++ b/iceoryx2-bb/posix/src/deadline_queue.rs @@ -268,7 +268,7 @@ impl DeadlineQueue { /// Iterates over all missed deadlines and calls the provided callback for each of them /// and provide the [`DeadlineQueueIndex`] to identify them. - pub fn missed_deadlines( + pub fn missed_deadlines CallbackProgression>( &self, mut call: F, ) -> Result<(), TimeError> { @@ -276,10 +276,7 @@ impl DeadlineQueue { "Unable to return next duration since the current time could not be acquired."); let now = now.as_duration().as_nanos(); - self.handle_missed_deadlines(now, |idx| { - call(idx); - CallbackProgression::Continue - }); + self.handle_missed_deadlines(now, |idx| -> CallbackProgression { call(idx) }); Ok(()) } diff --git a/iceoryx2-bb/posix/tests/deadline_queue_tests.rs b/iceoryx2-bb/posix/tests/deadline_queue_tests.rs index d0b0bb636..529f23198 100644 --- a/iceoryx2-bb/posix/tests/deadline_queue_tests.rs +++ b/iceoryx2-bb/posix/tests/deadline_queue_tests.rs @@ -11,6 +11,7 @@ // SPDX-License-Identifier: Apache-2.0 OR MIT mod deadline_queue { + use iceoryx2_bb_elementary::CallbackProgression; use iceoryx2_bb_posix::deadline_queue::*; use iceoryx2_bb_testing::assert_that; use std::time::Duration; @@ -85,8 +86,11 @@ mod deadline_queue { .unwrap(); let mut missed_deadline_queues = vec![]; - sut.missed_deadlines(|idx| missed_deadline_queues.push(idx)) - .unwrap(); + sut.missed_deadlines(|idx| { + missed_deadline_queues.push(idx); + CallbackProgression::Continue + }) + .unwrap(); assert_that!(missed_deadline_queues, len 0); } @@ -104,8 +108,11 @@ mod deadline_queue { std::thread::sleep(Duration::from_millis(10)); let mut missed_deadlines = vec![]; - sut.missed_deadlines(|idx| missed_deadlines.push(idx)) - .unwrap(); + sut.missed_deadlines(|idx| { + missed_deadlines.push(idx); + CallbackProgression::Continue + }) + .unwrap(); assert_that!(missed_deadlines, len 1); assert_that!(missed_deadlines, contains _guard_1.index()); @@ -122,8 +129,11 @@ mod deadline_queue { std::thread::sleep(Duration::from_millis(10)); let mut missed_deadlines = vec![]; - sut.missed_deadlines(|idx| missed_deadlines.push(idx)) - .unwrap(); + sut.missed_deadlines(|idx| { + missed_deadlines.push(idx); + CallbackProgression::Continue + }) + .unwrap(); assert_that!(missed_deadlines, len 3); assert_that!(missed_deadlines, contains guard_1.index()); @@ -131,6 +141,26 @@ mod deadline_queue { assert_that!(missed_deadlines, contains guard_3.index()); } + #[test] + fn missed_deadline_iteration_stops_when_requested() { + let sut = DeadlineQueueBuilder::new().create().unwrap(); + + let _guard_1 = sut.add_deadline_interval(Duration::from_nanos(1)).unwrap(); + let _guard_2 = sut.add_deadline_interval(Duration::from_nanos(10)).unwrap(); + let _guard_3 = sut.add_deadline_interval(Duration::from_nanos(20)).unwrap(); + + std::thread::sleep(Duration::from_millis(10)); + + let mut missed_deadlines = vec![]; + sut.missed_deadlines(|idx| { + missed_deadlines.push(idx); + CallbackProgression::Stop + }) + .unwrap(); + + assert_that!(missed_deadlines, len 1); + } + #[test] fn duration_until_next_deadline_is_zero_if_deadline_is_already_missed() { let sut = DeadlineQueueBuilder::new().create().unwrap(); @@ -149,7 +179,8 @@ mod deadline_queue { let mut deadline_idx = None; sut.missed_deadlines(|idx| { missed_deadline_counter += 1; - deadline_idx = Some(idx) + deadline_idx = Some(idx); + CallbackProgression::Continue }) .unwrap(); diff --git a/iceoryx2/src/port/waitset.rs b/iceoryx2/src/port/waitset.rs index 74f0a1e97..679675b34 100644 --- a/iceoryx2/src/port/waitset.rs +++ b/iceoryx2/src/port/waitset.rs @@ -187,6 +187,7 @@ use std::{ sync::atomic::Ordering, time::Duration, }; +use iceoryx2_bb_elementary::CallbackProgression; use iceoryx2_bb_log::fail; use iceoryx2_bb_posix::{ deadline_queue::{DeadlineQueue, DeadlineQueueBuilder, DeadlineQueueGuard, DeadlineQueueIndex}, @@ -195,7 +196,7 @@ use iceoryx2_bb_posix::{ signal::SignalHandler, }; use iceoryx2_cal::reactor::*; -use iceoryx2_pal_concurrency_sync::iox_atomic::{IoxAtomicBool, IoxAtomicUsize}; +use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicUsize; /// States why the [`WaitSet::wait_and_process()`] method returned. #[derive(Debug, PartialEq, Eq, Copy, Clone)] @@ -206,6 +207,8 @@ pub enum WaitSetRunResult { Interrupt, /// The user explicitly called [`WaitSet::stop()`]. StopRequest, + /// All events were handled. + AllEventsHandled, } /// Defines the failures that can occur when attaching something with @@ -237,10 +240,6 @@ pub enum WaitSetRunError { InternalError, /// Waiting on an empty [`WaitSet`] would lead to a deadlock therefore it causes an error. NoAttachments, - /// A termination signal `SIGTERM` was received. - TerminationRequest, - /// An interrupt signal `SIGINT` was received. - Interrupt, } impl std::fmt::Display for WaitSetRunError { @@ -456,7 +455,6 @@ impl WaitSetBuilder { attachment_to_deadline: RefCell::new(HashMap::new()), deadline_to_attachment: RefCell::new(HashMap::new()), attachment_counter: IoxAtomicUsize::new(0), - keep_running: IoxAtomicBool::new(true), }), Err(ReactorCreateError::UnknownError(e)) => { fail!(from self, with WaitSetCreateError::InternalError, @@ -483,7 +481,6 @@ pub struct WaitSet { attachment_to_deadline: RefCell>, deadline_to_attachment: RefCell>, attachment_counter: IoxAtomicUsize, - keep_running: IoxAtomicBool, } impl WaitSet { @@ -526,18 +523,25 @@ impl WaitSet { } } - fn handle_deadlines)>( + fn handle_deadlines) -> CallbackProgression>( &self, fn_call: &mut F, error_msg: &str, - ) -> Result<(), WaitSetRunError> { + ) -> Result { let deadline_to_attachment = self.deadline_to_attachment.borrow(); - let call = |idx: DeadlineQueueIndex| { - if let Some(reactor_idx) = deadline_to_attachment.get(&idx) { - fn_call(WaitSetAttachmentId::deadline(self, *reactor_idx, idx)); + let mut result = WaitSetRunResult::AllEventsHandled; + let call = |idx: DeadlineQueueIndex| -> CallbackProgression { + let progression = if let Some(reactor_idx) = deadline_to_attachment.get(&idx) { + fn_call(WaitSetAttachmentId::deadline(self, *reactor_idx, idx)) } else { - fn_call(WaitSetAttachmentId::tick(self, idx)); + fn_call(WaitSetAttachmentId::tick(self, idx)) + }; + + if let CallbackProgression::Stop = progression { + result = WaitSetRunResult::StopRequest; } + + progression }; fail!(from self, @@ -545,15 +549,15 @@ impl WaitSet { with WaitSetRunError::InternalError, "{error_msg} since the missed deadlines could not be acquired."); - Ok(()) + Ok(result) } - fn handle_all_attachments)>( + fn handle_all_attachments) -> CallbackProgression>( &self, triggered_file_descriptors: &Vec, fn_call: &mut F, error_msg: &str, - ) -> Result<(), WaitSetRunError> { + ) -> Result { // we need to reset the deadlines first, otherwise a long fn_call may extend the // deadline unintentionally let mut fd_and_deadline_queue_idx = Vec::with_capacity(triggered_file_descriptors.len()); @@ -564,13 +568,20 @@ impl WaitSet { // must be called after the deadlines have been reset, in the case that the // event has been received shortly before the deadline ended. - self.handle_deadlines(fn_call, error_msg)?; + + match self.handle_deadlines(fn_call, error_msg)? { + WaitSetRunResult::AllEventsHandled => (), + v => return Ok(v), + }; for fd in triggered_file_descriptors { - fn_call(WaitSetAttachmentId::notification(self, *fd)); + if let CallbackProgression::Stop = fn_call(WaitSetAttachmentId::notification(self, *fd)) + { + return Ok(WaitSetRunResult::StopRequest); + } } - Ok(()) + Ok(WaitSetRunResult::AllEventsHandled) } /// Attaches an object as notification to the [`WaitSet`]. Whenever an event is received on the @@ -635,51 +646,39 @@ impl WaitSet { }) } - /// Can be called from within a callback during [`WaitSet::wait_and_process()`] to signal the [`WaitSet`] - /// to stop running after this iteration. - pub fn stop(&self) { - self.keep_running.store(false, Ordering::Relaxed); - } - /// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every /// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to /// acquire the source. /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit /// the loop and inform the user via [`WaitSetRunResult`]. - pub fn wait_and_process)>( + pub fn wait_and_process) -> CallbackProgression>( &self, mut fn_call: F, ) -> Result { - while self.keep_running.load(Ordering::Relaxed) { - match self.try_wait_and_process(&mut fn_call) { - Ok(()) => (), - Err(WaitSetRunError::TerminationRequest) => { - return Ok(WaitSetRunResult::TerminationRequest) - } - Err(WaitSetRunError::Interrupt) => return Ok(WaitSetRunResult::Interrupt), + loop { + match self.wait_and_process_once(&mut fn_call) { + Ok(WaitSetRunResult::AllEventsHandled) => (), + Ok(v) => return Ok(v), Err(e) => { fail!(from self, with e, "Unable to run in WaitSet::wait_and_process() loop since ({:?}) has occurred.", e); } } } - - Ok(WaitSetRunResult::StopRequest) } /// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that /// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the /// source. /// If nothing was triggered the [`WaitSet`] returns immediately. - pub fn try_wait_and_process)>( + pub fn wait_and_process_once) -> CallbackProgression>( &self, mut fn_call: F, - ) -> Result<(), WaitSetRunError> { + ) -> Result { let msg = "Unable to call WaitSet::try_wait_and_process()"; if SignalHandler::termination_requested() { - fail!(from self, with WaitSetRunError::TerminationRequest, - "{msg} since a termination request was received."); + return Ok(WaitSetRunResult::TerminationRequest); } if self.is_empty() { @@ -708,18 +707,9 @@ impl WaitSet { }; match reactor_wait_result { - Ok(0) => { - self.handle_deadlines(&mut fn_call, msg)?; - Ok(()) - } - Ok(_) => { - self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg)?; - Ok(()) - } - Err(ReactorWaitError::Interrupt) => { - fail!(from self, with WaitSetRunError::Interrupt, - "{msg} since an interrupt signal was received."); - } + Ok(0) => self.handle_deadlines(&mut fn_call, msg), + Ok(_) => self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg), + Err(ReactorWaitError::Interrupt) => return Ok(WaitSetRunResult::Interrupt), Err(ReactorWaitError::InsufficientPermissions) => { fail!(from self, with WaitSetRunError::InsufficientPermissions, "{msg} due to insufficient permissions."); diff --git a/iceoryx2/tests/waitset_tests.rs b/iceoryx2/tests/waitset_tests.rs index 38f5f9193..4253863f5 100644 --- a/iceoryx2/tests/waitset_tests.rs +++ b/iceoryx2/tests/waitset_tests.rs @@ -86,7 +86,7 @@ mod waitset { #[test] fn calling_run_on_empty_waitset_fails() { let sut = WaitSetBuilder::new().create::().unwrap(); - let result = sut.try_wait_and_process(|_| {}); + let result = sut.wait_and_process_once(|_| CallbackProgression::Continue); assert_that!(result.err(), eq Some(WaitSetRunError::NoAttachments)); } @@ -202,7 +202,7 @@ mod waitset { let mut receiver_1_triggered = false; let mut receiver_2_triggered = false; - sut.try_wait_and_process(|attachment_id| { + sut.wait_and_process_once(|attachment_id| { if attachment_id.has_event_from(&listener_1_guard) { listener_1_triggered = true; } else if attachment_id.has_event_from(&listener_2_guard) { @@ -214,6 +214,8 @@ mod waitset { } else { test_fail!("only attachments shall trigger"); } + + CallbackProgression::Continue }) .unwrap(); @@ -237,10 +239,11 @@ mod waitset { let mut callback_called = false; let start = Instant::now(); - sut.try_wait_and_process(|id| { + sut.wait_and_process_once(|id| { callback_called = true; assert_that!(id.has_event_from(&tick_guard), eq true); assert_that!(id.has_missed_deadline(&tick_guard), eq false); + CallbackProgression::Continue }) .unwrap(); @@ -262,8 +265,9 @@ mod waitset { let guard = sut.attach_deadline(&listener, TIMEOUT).unwrap(); let start = Instant::now(); - sut.try_wait_and_process(|id| { + sut.wait_and_process_once(|id| { assert_that!(id.has_missed_deadline(&guard), eq true); + CallbackProgression::Continue }) .unwrap(); @@ -303,7 +307,7 @@ mod waitset { let mut receiver_1_triggered = false; let mut receiver_2_triggered = false; - sut.try_wait_and_process(|attachment_id| { + sut.wait_and_process_once(|attachment_id| { if attachment_id.has_event_from(&listener_1_guard) { listener_1_triggered = true; } else if attachment_id.has_missed_deadline(&listener_2_guard) { @@ -315,6 +319,8 @@ mod waitset { } else { test_fail!("only attachments shall trigger"); } + + CallbackProgression::Continue }) .unwrap(); @@ -343,7 +349,7 @@ mod waitset { let mut tick_3_triggered = false; let mut tick_4_triggered = false; - sut.try_wait_and_process(|attachment_id| { + sut.wait_and_process_once(|attachment_id| { if attachment_id.has_event_from(&tick_1_guard) { tick_1_triggered = true; } else if attachment_id.has_event_from(&tick_2_guard) { @@ -355,6 +361,8 @@ mod waitset { } else { test_fail!("only attachments shall trigger"); } + + CallbackProgression::Continue }) .unwrap(); @@ -364,6 +372,31 @@ mod waitset { assert_that!(tick_4_triggered, eq false); } + #[test] + fn wait_and_process_stops_when_requested() + where + ::Listener: SynchronousMultiplexing, + { + let sut = WaitSetBuilder::new().create::().unwrap(); + + let _tick_1_guard = sut.attach_interval(Duration::from_nanos(1)).unwrap(); + let _tick_2_guard = sut.attach_interval(Duration::from_nanos(1)).unwrap(); + let _tick_3_guard = sut.attach_interval(TIMEOUT * 1000).unwrap(); + let _tick_4_guard = sut.attach_interval(TIMEOUT * 1000).unwrap(); + + std::thread::sleep(TIMEOUT); + + let mut counter = 0; + + sut.wait_and_process(|_| { + counter += 1; + CallbackProgression::Stop + }) + .unwrap(); + + assert_that!(counter, eq 1); + } + #[test] fn run_lists_mixed() where @@ -401,7 +434,7 @@ mod waitset { let mut deadline_1_missed = false; let mut deadline_2_missed = false; - sut.try_wait_and_process(|attachment_id| { + sut.wait_and_process_once(|attachment_id| { if attachment_id.has_event_from(&tick_1_guard) { tick_1_triggered = true; } else if attachment_id.has_event_from(&tick_2_guard) { @@ -421,6 +454,8 @@ mod waitset { } else { test_fail!("only attachments shall trigger"); } + + CallbackProgression::Continue }) .unwrap(); @@ -454,7 +489,7 @@ mod waitset { let mut missed_deadline = false; let mut received_event = false; - sut.try_wait_and_process(|attachment_id| { + sut.wait_and_process_once(|attachment_id| { if attachment_id.has_event_from(&deadline_1_guard) { received_event = true; } else if attachment_id.has_missed_deadline(&deadline_1_guard) { @@ -462,6 +497,8 @@ mod waitset { } else { test_fail!("only attachments shall trigger"); } + + CallbackProgression::Continue }) .unwrap(); From 528b2d42203a43e8b77ce2606de1b7bedd5ff17b Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sat, 9 Nov 2024 13:54:51 +0100 Subject: [PATCH 2/5] [#507] Adjust FFI binding --- examples/rust/event_multiplexing/wait.rs | 2 + iceoryx2-ffi/ffi/src/api/waitset.rs | 52 ++++++++---------------- 2 files changed, 19 insertions(+), 35 deletions(-) diff --git a/examples/rust/event_multiplexing/wait.rs b/examples/rust/event_multiplexing/wait.rs index 65276aaa0..2506123bf 100644 --- a/examples/rust/event_multiplexing/wait.rs +++ b/examples/rust/event_multiplexing/wait.rs @@ -69,6 +69,8 @@ fn main() -> Result<(), Box> { println!(""); } + + CallbackProgression::Continue }; // loops until the user has pressed CTRL+c, the application has received a SIGTERM or SIGINT diff --git a/iceoryx2-ffi/ffi/src/api/waitset.rs b/iceoryx2-ffi/ffi/src/api/waitset.rs index eb97309b6..c2ec94edf 100644 --- a/iceoryx2-ffi/ffi/src/api/waitset.rs +++ b/iceoryx2-ffi/ffi/src/api/waitset.rs @@ -15,9 +15,9 @@ use std::{ffi::c_int, mem::ManuallyDrop, time::Duration}; use crate::{ - c_size_t, iox2_callback_context, iox2_file_descriptor_ptr, iox2_service_type_e, - iox2_waitset_attachment_id_h, iox2_waitset_attachment_id_t, iox2_waitset_guard_h, - iox2_waitset_guard_t, AttachmentIdUnion, GuardUnion, IOX2_OK, + c_size_t, iox2_callback_context, iox2_callback_progression_e, iox2_file_descriptor_ptr, + iox2_service_type_e, iox2_waitset_attachment_id_h, iox2_waitset_attachment_id_t, + iox2_waitset_guard_h, iox2_waitset_guard_t, AttachmentIdUnion, GuardUnion, IOX2_OK, }; use super::{AssertNonNullHandle, HandleToType, IntoCInt}; @@ -50,8 +50,6 @@ impl IntoCInt for WaitSetRunError { } WaitSetRunError::InternalError => iox2_waitset_run_error_e::INTERNAL_ERROR, WaitSetRunError::NoAttachments => iox2_waitset_run_error_e::NO_ATTACHMENTS, - WaitSetRunError::TerminationRequest => iox2_waitset_run_error_e::TERMINATION_REQUEST, - WaitSetRunError::Interrupt => iox2_waitset_run_error_e::INTERRUPT, }) as c_int } } @@ -62,6 +60,7 @@ pub enum iox2_waitset_run_result_e { TERMINATION_REQUEST = IOX2_OK as isize + 1, INTERRUPT, STOP_REQUEST, + ALL_EVENTS_HANDLED, } impl IntoCInt for WaitSetRunResult { @@ -76,6 +75,7 @@ impl From for iox2_waitset_run_result_e { WaitSetRunResult::TerminationRequest => iox2_waitset_run_result_e::TERMINATION_REQUEST, WaitSetRunResult::Interrupt => iox2_waitset_run_result_e::INTERRUPT, WaitSetRunResult::StopRequest => iox2_waitset_run_result_e::STOP_REQUEST, + WaitSetRunResult::AllEventsHandled => iox2_waitset_run_result_e::ALL_EVENTS_HANDLED, } } } @@ -201,8 +201,10 @@ impl HandleToType for iox2_waitset_h_ref { } } -pub type iox2_waitset_run_callback = - extern "C" fn(iox2_waitset_attachment_id_h, iox2_callback_context); +pub type iox2_waitset_run_callback = extern "C" fn( + iox2_waitset_attachment_id_h, + iox2_callback_context, +) -> iox2_callback_progression_e; // END type definition // BEGIN C API @@ -283,26 +285,6 @@ pub unsafe extern "C" fn iox2_waitset_capacity(handle: iox2_waitset_h_ref) -> c_ } } -/// Stops the current [`iox2_waitset_wait_and_process()`] operation. Any [`iox2_waitset_wait_and_process()`] -/// call after this call is not affected and the user needs to call -/// [`iox2_waitset_stop()`] again. -/// -/// # Safety -/// -/// * `handle` must be valid and acquired with -/// [`iox2_waitset_builder_create()`](crate::iox2_waitset_builder_create()) -#[no_mangle] -pub unsafe extern "C" fn iox2_waitset_stop(handle: iox2_waitset_h_ref) { - handle.assert_non_null(); - - let waitset = &mut *handle.as_type(); - - match waitset.service_type { - iox2_service_type_e::IPC => waitset.value.as_ref().ipc.stop(), - iox2_service_type_e::LOCAL => waitset.value.as_ref().local.stop(), - } -} - /// Attaches a provided [`iox2_file_descriptor_ptr`] as notification to the /// [`iox2_waitset_h`]. As soon as the attachment receives data, the WaitSet /// wakes up in [`iox2_waitset_wait_and_process()`] and informs the user. @@ -568,7 +550,7 @@ pub unsafe extern "C" fn iox2_waitset_attach_interval( /// * the provided [`iox2_waitset_attachment_id_h`] in the callback must be released via /// [`iox2_waitset_attachment_id_drop()`](crate::iox2_waitset_attachment_id_drop()) #[no_mangle] -pub unsafe extern "C" fn iox2_waitset_try_wait_and_process( +pub unsafe extern "C" fn iox2_waitset_wait_and_process_once( handle: iox2_waitset_h_ref, callback: iox2_waitset_run_callback, callback_ctx: iox2_callback_context, @@ -583,7 +565,7 @@ pub unsafe extern "C" fn iox2_waitset_try_wait_and_process( .value .as_ref() .ipc - .try_wait_and_process(|attachment_id| { + .wait_and_process_once(|attachment_id| { let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc(); (*attachment_id_ptr).init( waitset.service_type, @@ -591,7 +573,7 @@ pub unsafe extern "C" fn iox2_waitset_try_wait_and_process( iox2_waitset_attachment_id_t::dealloc, ); let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle(); - callback(attachment_id_handle_ptr, callback_ctx); + callback(attachment_id_handle_ptr, callback_ctx).into() }) } iox2_service_type_e::LOCAL => { @@ -599,7 +581,7 @@ pub unsafe extern "C" fn iox2_waitset_try_wait_and_process( .value .as_ref() .local - .try_wait_and_process(|attachment_id| { + .wait_and_process_once(|attachment_id| { let attachment_id_ptr = iox2_waitset_attachment_id_t::alloc(); (*attachment_id_ptr).init( waitset.service_type, @@ -607,13 +589,13 @@ pub unsafe extern "C" fn iox2_waitset_try_wait_and_process( iox2_waitset_attachment_id_t::dealloc, ); let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle(); - callback(attachment_id_handle_ptr, callback_ctx); + callback(attachment_id_handle_ptr, callback_ctx).into() }) } }; match run_once_result { - Ok(()) => IOX2_OK, + Ok(v) => v.into_c_int(), Err(e) => e.into_c_int(), } } @@ -667,7 +649,7 @@ pub unsafe extern "C" fn iox2_waitset_wait_and_process( iox2_waitset_attachment_id_t::dealloc, ); let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle(); - callback(attachment_id_handle_ptr, callback_ctx); + callback(attachment_id_handle_ptr, callback_ctx).into() }), iox2_service_type_e::LOCAL => { waitset @@ -682,7 +664,7 @@ pub unsafe extern "C" fn iox2_waitset_wait_and_process( iox2_waitset_attachment_id_t::dealloc, ); let attachment_id_handle_ptr = (*attachment_id_ptr).as_handle(); - callback(attachment_id_handle_ptr, callback_ctx); + callback(attachment_id_handle_ptr, callback_ctx).into() }) } }; From e1893c66563530266ee2a5c47c116be731519d85 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sat, 9 Nov 2024 16:15:32 +0100 Subject: [PATCH 3/5] [#507] Adjust C++ API to new Rust API --- examples/c/event_multiplexing/src/wait.c | 3 +- examples/cxx/event_multiplexing/src/wait.cpp | 2 + .../cxx/include/iox2/enum_translation.hpp | 2 + iceoryx2-ffi/cxx/include/iox2/waitset.hpp | 13 ++--- .../cxx/include/iox2/waitset_enums.hpp | 4 +- iceoryx2-ffi/cxx/src/waitset.cpp | 22 +++---- iceoryx2-ffi/cxx/tests/src/waitset_tests.cpp | 57 ++++++++++--------- iceoryx2-ffi/ffi/src/api/waitset.rs | 7 ++- 8 files changed, 59 insertions(+), 51 deletions(-) diff --git a/examples/c/event_multiplexing/src/wait.c b/examples/c/event_multiplexing/src/wait.c index 492824209..4d4b5000a 100644 --- a/examples/c/event_multiplexing/src/wait.c +++ b/examples/c/event_multiplexing/src/wait.c @@ -27,7 +27,7 @@ struct CallbackContext { // the function that is called when a listener has received an event -void on_event(iox2_waitset_attachment_id_h attachment_id, void* context) { +iox2_callback_progression_e on_event(iox2_waitset_attachment_id_h attachment_id, void* context) { struct CallbackContext* ctx = (struct CallbackContext*) context; iox2_event_id_t event_id; @@ -65,6 +65,7 @@ void on_event(iox2_waitset_attachment_id_h attachment_id, void* context) { } iox2_waitset_attachment_id_drop(attachment_id); + return iox2_callback_progression_e_CONTINUE; } //NOLINTBEGIN(readability-function-size) diff --git a/examples/cxx/event_multiplexing/src/wait.cpp b/examples/cxx/event_multiplexing/src/wait.cpp index d8bd7eec8..00b661a2e 100644 --- a/examples/cxx/event_multiplexing/src/wait.cpp +++ b/examples/cxx/event_multiplexing/src/wait.cpp @@ -86,6 +86,8 @@ auto main(int argc, char** argv) -> int { listener.try_wait_all([](auto event_id) { std::cout << " " << event_id; }).expect(""); std::cout << std::endl; } + + return iox2::CallbackProgression::Continue; }; std::cout << "Waiting on the following services: " << service_name_1.to_string().c_str() << ", " diff --git a/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp b/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp index 0d2ea2d3b..22b3c6397 100644 --- a/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/enum_translation.hpp @@ -704,6 +704,8 @@ constexpr auto from(const int value) noexcept -> io return iox2::WaitSetRunResult::TerminationRequest; case iox2_waitset_run_result_e_STOP_REQUEST: return iox2::WaitSetRunResult::StopRequest; + case iox2_waitset_run_result_e_ALL_EVENTS_HANDLED: + return iox2::WaitSetRunResult::AllEventsHandled; } IOX_UNREACHABLE(); diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp index 2addf55b1..e75e2ce81 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp @@ -15,6 +15,7 @@ #include "iox/duration.hpp" #include "iox/expected.hpp" +#include "iox2/callback_progression.hpp" #include "iox2/file_descriptor.hpp" #include "iox2/internal/iceoryx2.hpp" #include "iox2/listener.hpp" @@ -75,7 +76,7 @@ class WaitSetAttachmentId { private: explicit WaitSetAttachmentId(iox2_waitset_attachment_id_h handle); template - friend auto run_callback(iox2_waitset_attachment_id_h, void*); + friend auto run_callback(iox2_waitset_attachment_id_h, void*) -> iox2_callback_progression_e; template friend auto operator==(const WaitSetAttachmentId&, const WaitSetAttachmentId&) -> bool; template @@ -110,24 +111,20 @@ class WaitSet { auto operator=(WaitSet&&) noexcept -> WaitSet&; ~WaitSet(); - /// Can be called from within a callback during [`WaitSet::wait_and_process()`] to signal the [`WaitSet`] - /// to stop running after this iteration. - void stop(); - /// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every /// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to /// acquire the source. /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit /// the loop and inform the user via [`WaitSetRunResult`]. - auto wait_and_process(const iox::function)>& fn_call) + auto wait_and_process(const iox::function)>& fn_call) -> iox::expected; /// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that /// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the /// source. /// If nothing was triggered the [`WaitSet`] returns immediately. - auto try_wait_and_process(const iox::function)>& fn_call) - -> iox::expected; + auto wait_and_process_once(const iox::function)>& fn_call) + -> iox::expected; /// Returns the capacity of the [`WaitSet`] auto capacity() const -> uint64_t; diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp index b85bc01ea..1fc8c30ea 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp @@ -29,7 +29,9 @@ enum class WaitSetRunResult : uint8_t { /// An interrupt signal `SIGINT` was received. Interrupt, /// The user explicitly called [`WaitSet::stop()`]. - StopRequest + StopRequest, + /// All events were handled. + AllEventsHandled }; /// Defines the failures that can occur when attaching something with diff --git a/iceoryx2-ffi/cxx/src/waitset.cpp b/iceoryx2-ffi/cxx/src/waitset.cpp index 6460cfb56..d92ba2105 100644 --- a/iceoryx2-ffi/cxx/src/waitset.cpp +++ b/iceoryx2-ffi/cxx/src/waitset.cpp @@ -206,11 +206,6 @@ auto WaitSet::is_empty() const -> bool { return iox2_waitset_is_empty(&m_handle); } -template -void WaitSet::stop() { - iox2_waitset_stop(&m_handle); -} - template auto WaitSet::attach_interval(const iox::units::Duration deadline) -> iox::expected, WaitSetAttachmentError> { @@ -274,13 +269,13 @@ auto WaitSet::attach_notification(const Listener& listener) } template -auto run_callback(iox2_waitset_attachment_id_h attachment_id, void* context) { - auto* fn_call = internal::ctx_cast)>>(context); - fn_call->value()(WaitSetAttachmentId(attachment_id)); +auto run_callback(iox2_waitset_attachment_id_h attachment_id, void* context) -> iox2_callback_progression_e { + auto* fn_call = internal::ctx_cast)>>(context); + return iox::into(fn_call->value()(WaitSetAttachmentId(attachment_id))); } template -auto WaitSet::wait_and_process(const iox::function)>& fn_call) +auto WaitSet::wait_and_process(const iox::function)>& fn_call) -> iox::expected { iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST; auto ctx = internal::ctx(fn_call); @@ -294,13 +289,14 @@ auto WaitSet::wait_and_process(const iox::function -auto WaitSet::try_wait_and_process(const iox::function)>& fn_call) - -> iox::expected { +auto WaitSet::wait_and_process_once(const iox::function)>& fn_call) + -> iox::expected { + iox2_waitset_run_result_e run_result = iox2_waitset_run_result_e_STOP_REQUEST; auto ctx = internal::ctx(fn_call); - auto result = iox2_waitset_try_wait_and_process(&m_handle, run_callback, static_cast(&ctx)); + auto result = iox2_waitset_wait_and_process_once(&m_handle, run_callback, static_cast(&ctx), &run_result); if (result == IOX2_OK) { - return iox::ok(); + return iox::ok(iox::into(static_cast(run_result))); } return iox::err(iox::into(result)); diff --git a/iceoryx2-ffi/cxx/tests/src/waitset_tests.cpp b/iceoryx2-ffi/cxx/tests/src/waitset_tests.cpp index faf90c173..0340e0efc 100644 --- a/iceoryx2-ffi/cxx/tests/src/waitset_tests.cpp +++ b/iceoryx2-ffi/cxx/tests/src/waitset_tests.cpp @@ -132,7 +132,7 @@ TYPED_TEST(WaitSetTest, attaching_same_notification_twice_fails) { TYPED_TEST(WaitSetTest, empty_waitset_returns_error_on_run) { auto sut = this->create_sut(); - auto result = sut.wait_and_process([](auto) {}); + auto result = sut.wait_and_process([](auto) { return CallbackProgression::Continue; }); ASSERT_THAT(result.has_error(), Eq(true)); ASSERT_THAT(result.get_error(), Eq(WaitSetRunError::NoAttachments)); @@ -140,7 +140,7 @@ TYPED_TEST(WaitSetTest, empty_waitset_returns_error_on_run) { TYPED_TEST(WaitSetTest, empty_waitset_returns_error_on_run_once) { auto sut = this->create_sut(); - auto result = sut.try_wait_and_process([](auto) {}); + auto result = sut.wait_and_process_once([](auto) { return CallbackProgression::Continue; }); ASSERT_THAT(result.has_error(), Eq(true)); ASSERT_THAT(result.get_error(), Eq(WaitSetRunError::NoAttachments)); @@ -153,11 +153,11 @@ TYPED_TEST(WaitSetTest, interval_attachment_blocks_for_at_least_timeout) { auto guard = sut.attach_interval(TIMEOUT).expect(""); auto callback_called = false; - auto result = sut.wait_and_process([&](auto attachment_id) { + auto result = sut.wait_and_process([&](auto attachment_id) -> CallbackProgression { callback_called = true; - sut.stop(); - ASSERT_THAT(attachment_id.has_event_from(guard), Eq(true)); - ASSERT_THAT(attachment_id.has_missed_deadline(guard), Eq(false)); + EXPECT_THAT(attachment_id.has_event_from(guard), Eq(true)); + EXPECT_THAT(attachment_id.has_missed_deadline(guard), Eq(false)); + return CallbackProgression::Stop; }); auto end = std::chrono::steady_clock::now(); @@ -175,11 +175,11 @@ TYPED_TEST(WaitSetTest, deadline_attachment_blocks_for_at_least_timeout) { auto guard = sut.attach_deadline(listener, TIMEOUT).expect(""); auto callback_called = false; - auto result = sut.wait_and_process([&](auto attachment_id) { + auto result = sut.wait_and_process([&](auto attachment_id) -> CallbackProgression { callback_called = true; - sut.stop(); - ASSERT_THAT(attachment_id.has_event_from(guard), Eq(false)); - ASSERT_THAT(attachment_id.has_missed_deadline(guard), Eq(true)); + EXPECT_THAT(attachment_id.has_event_from(guard), Eq(false)); + EXPECT_THAT(attachment_id.has_missed_deadline(guard), Eq(true)); + return CallbackProgression::Stop; }); auto end = std::chrono::steady_clock::now(); @@ -201,11 +201,11 @@ TYPED_TEST(WaitSetTest, deadline_attachment_wakes_up_when_notified) { auto notifier = this->create_notifier(); notifier.notify().expect(""); }); - auto result = sut.wait_and_process([&](auto attachment_id) { + auto result = sut.wait_and_process([&](auto attachment_id) -> CallbackProgression { callback_called = true; - sut.stop(); - ASSERT_THAT(attachment_id.has_event_from(guard), Eq(true)); - ASSERT_THAT(attachment_id.has_missed_deadline(guard), Eq(false)); + EXPECT_THAT(attachment_id.has_event_from(guard), Eq(true)); + EXPECT_THAT(attachment_id.has_missed_deadline(guard), Eq(false)); + return CallbackProgression::Stop; }); notifier_thread.join(); @@ -224,11 +224,11 @@ TYPED_TEST(WaitSetTest, notification_attachment_wakes_up_when_notified) { auto notifier = this->create_notifier(); notifier.notify().expect(""); }); - auto result = sut.wait_and_process([&](auto attachment_id) { + auto result = sut.wait_and_process([&](auto attachment_id) -> CallbackProgression { callback_called = true; - sut.stop(); - ASSERT_THAT(attachment_id.has_event_from(guard), Eq(true)); - ASSERT_THAT(attachment_id.has_missed_deadline(guard), Eq(false)); + EXPECT_THAT(attachment_id.has_event_from(guard), Eq(true)); + EXPECT_THAT(attachment_id.has_missed_deadline(guard), Eq(false)); + return CallbackProgression::Stop; }); notifier_thread.join(); @@ -266,15 +266,18 @@ TYPED_TEST(WaitSetTest, triggering_everything_works) { std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT.toMilliseconds())); std::vector was_triggered(guards.size(), false); - sut.try_wait_and_process([&](auto attachment_id) { - for (uint64_t idx = 0; idx < guards.size(); ++idx) { - if (attachment_id.has_event_from(guards[idx])) { - was_triggered[idx] = true; - break; - } - } - }) - .expect(""); + auto result = sut.wait_and_process_once([&](auto attachment_id) -> CallbackProgression { + for (uint64_t idx = 0; idx < guards.size(); ++idx) { + if (attachment_id.has_event_from(guards[idx])) { + was_triggered[idx] = true; + break; + } + } + + return CallbackProgression::Continue; + }); + + ASSERT_THAT(result.has_error(), Eq(false)); for (auto triggered : was_triggered) { ASSERT_THAT(triggered, Eq(true)); diff --git a/iceoryx2-ffi/ffi/src/api/waitset.rs b/iceoryx2-ffi/ffi/src/api/waitset.rs index c2ec94edf..60d4f6035 100644 --- a/iceoryx2-ffi/ffi/src/api/waitset.rs +++ b/iceoryx2-ffi/ffi/src/api/waitset.rs @@ -554,8 +554,10 @@ pub unsafe extern "C" fn iox2_waitset_wait_and_process_once( handle: iox2_waitset_h_ref, callback: iox2_waitset_run_callback, callback_ctx: iox2_callback_context, + result: *mut iox2_waitset_run_result_e, ) -> c_int { handle.assert_non_null(); + debug_assert!(!result.is_null()); let waitset = &mut *handle.as_type(); @@ -595,7 +597,10 @@ pub unsafe extern "C" fn iox2_waitset_wait_and_process_once( }; match run_once_result { - Ok(v) => v.into_c_int(), + Ok(v) => { + *result = v.into(); + IOX2_OK + } Err(e) => e.into_c_int(), } } From a6537c600ab2dd67fd2121daf1f4633017291aae Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sat, 9 Nov 2024 23:10:15 +0100 Subject: [PATCH 4/5] [#507] Add documentation --- iceoryx2-ffi/cxx/include/iox2/waitset.hpp | 35 +++++++++++++++++------ iceoryx2-ffi/ffi/src/api/waitset.rs | 22 ++++++++------ iceoryx2/src/port/waitset.rs | 35 +++++++++++++++++------ 3 files changed, 67 insertions(+), 25 deletions(-) diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp index e75e2ce81..91e29c167 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp @@ -111,18 +111,37 @@ class WaitSet { auto operator=(WaitSet&&) noexcept -> WaitSet&; ~WaitSet(); - /// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every - /// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to - /// acquire the source. + /// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the + /// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`]. In contrast + /// to [`WaitSet::wait_and_process_once()`] it will never return until the user explicitly + /// requests it by returning [`CallbackProgression::Stop`] or by receiving a signal. + /// + /// The provided callback must return [`CallbackProgression::Continue`] to continue the event + /// processing and handle the next event or [`CallbackProgression::Stop`] to return from this + /// call immediately. All unhandled events will be lost forever and the call will return + /// [`WaitSetRunResult::StopRequest`]. + /// /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit - /// the loop and inform the user via [`WaitSetRunResult`]. + /// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or + /// [`WaitSetRunResult::TerminationRequest`]. auto wait_and_process(const iox::function)>& fn_call) -> iox::expected; - /// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that - /// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the - /// source. - /// If nothing was triggered the [`WaitSet`] returns immediately. + /// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the + /// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`] and then + /// returns. This makes it ideal to be called in some kind of event-loop. + /// + /// The provided callback must return [`CallbackProgression::Continue`] to continue the event + /// processing and handle the next event or [`CallbackProgression::Stop`] to return from this + /// call immediately. All unhandled events will be lost forever and the call will return + /// [`WaitSetRunResult::StopRequest`]. + /// + /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit + /// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or + /// [`WaitSetRunResult::TerminationRequest`]. + /// + /// When no signal was received and all events were handled, it will return + /// [`WaitSetRunResult::AllEventsHandled`]. auto wait_and_process_once(const iox::function)>& fn_call) -> iox::expected; diff --git a/iceoryx2-ffi/ffi/src/api/waitset.rs b/iceoryx2-ffi/ffi/src/api/waitset.rs index 60d4f6035..4b57a9cfa 100644 --- a/iceoryx2-ffi/ffi/src/api/waitset.rs +++ b/iceoryx2-ffi/ffi/src/api/waitset.rs @@ -528,17 +528,21 @@ pub unsafe extern "C" fn iox2_waitset_attach_interval( IOX2_OK } -/// Checks the [`iox2_waitset_h`] for new events once. The provided `callback` is called -/// for every events that was received and the corresponding owning [`iox2_waitset_attachment_id_h`] -/// is provided as input argument, as well as the `callback_ctx`. +/// Waits until an event arrives on the [`iox2_waitset_h`], then collects all events by calling the +/// provided `fn_call` callback with the corresponding [`iox2_waitset_attachment_id_h`] and then +/// returns. This makes it ideal to be called in some kind of event-loop. /// -/// With [`iox2_waitset_attachment_id_has_event_from()`](crate::iox2_waitset_attachment_id_has_event_from()) -/// the origin of the event can be determined from its corresponding -/// [`iox2_waitset_guard_h`]. -/// If the deadline was hit the function -/// [`iox2_waitset_attachment_id_has_missed_deadline()`](crate::iox2_waitset_attachment_id_has_missed_deadline()) -/// can be used to identify it. +/// The provided callback must return [`iox2_callback_progression_e::CONTINUE`] to continue the event +/// processing and handle the next event or [`iox2_callback_progression_e::STOP`] to return from this +/// call immediately. All unhandled events will be lost forever and the call will return +/// [`iox2_waitset_run_result_e::STOP_REQUEST`]. +/// +/// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit +/// the loop and inform the user with [`iox2_waitset_run_result_e::INTERRUPT`] or +/// [`iox2_waitset_run_result_e::TERMINATION_REQUEST`]. /// +/// When no signal was received and all events were handled, it will return +/// [`iox2_waitset_run_result_e::ALL_EVENTS_HANDLED`]. /// # Return /// /// `IOX2_OK` on success, otherwise [`iox2_waitset_run_error_e`]. diff --git a/iceoryx2/src/port/waitset.rs b/iceoryx2/src/port/waitset.rs index 679675b34..7737e519d 100644 --- a/iceoryx2/src/port/waitset.rs +++ b/iceoryx2/src/port/waitset.rs @@ -646,11 +646,19 @@ impl WaitSet { }) } - /// Waits in an infinite loop on the [`WaitSet`]. The provided callback is called for every - /// attachment that was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to - /// acquire the source. + /// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the + /// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`]. In contrast + /// to [`WaitSet::wait_and_process_once()`] it will never return until the user explicitly + /// requests it by returning [`CallbackProgression::Stop`] or by receiving a signal. + /// + /// The provided callback must return [`CallbackProgression::Continue`] to continue the event + /// processing and handle the next event or [`CallbackProgression::Stop`] to return from this + /// call immediately. All unhandled events will be lost forever and the call will return + /// [`WaitSetRunResult::StopRequest`]. + /// /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit - /// the loop and inform the user via [`WaitSetRunResult`]. + /// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or + /// [`WaitSetRunResult::TerminationRequest`]. pub fn wait_and_process) -> CallbackProgression>( &self, mut fn_call: F, @@ -667,10 +675,21 @@ impl WaitSet { } } - /// Tries to wait on the [`WaitSet`]. The provided callback is called for every attachment that - /// was triggered and the [`WaitSetAttachmentId`] is provided as an input argument to acquire the - /// source. - /// If nothing was triggered the [`WaitSet`] returns immediately. + /// Waits until an event arrives on the [`WaitSet`], then collects all events by calling the + /// provided `fn_call` callback with the corresponding [`WaitSetAttachmentId`] and then + /// returns. This makes it ideal to be called in some kind of event-loop. + /// + /// The provided callback must return [`CallbackProgression::Continue`] to continue the event + /// processing and handle the next event or [`CallbackProgression::Stop`] to return from this + /// call immediately. All unhandled events will be lost forever and the call will return + /// [`WaitSetRunResult::StopRequest`]. + /// + /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit + /// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or + /// [`WaitSetRunResult::TerminationRequest`]. + /// + /// When no signal was received and all events were handled, it will return + /// [`WaitSetRunResult::AllEventsHandled`]. pub fn wait_and_process_once) -> CallbackProgression>( &self, mut fn_call: F, From 71c4db407330b1c56cf9b5fc93f487c5631b5580 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Sat, 9 Nov 2024 23:24:10 +0100 Subject: [PATCH 5/5] [#507] Clippy warnings and doc tests --- iceoryx2-bb/posix/src/deadline_queue.rs | 9 ++- iceoryx2-ffi/cxx/include/iox2/waitset.hpp | 3 +- .../cxx/include/iox2/waitset_enums.hpp | 2 +- iceoryx2-ffi/ffi/src/api/waitset.rs | 2 +- .../ffi/src/api/waitset_attachment_id.rs | 2 +- iceoryx2/src/port/waitset.rs | 77 ++++++++++++++++++- 6 files changed, 84 insertions(+), 11 deletions(-) diff --git a/iceoryx2-bb/posix/src/deadline_queue.rs b/iceoryx2-bb/posix/src/deadline_queue.rs index c7e1ea5bb..773b332ce 100644 --- a/iceoryx2-bb/posix/src/deadline_queue.rs +++ b/iceoryx2-bb/posix/src/deadline_queue.rs @@ -30,14 +30,17 @@ //! // contains all the deadlines where the deadline was hit //! let mut missed_deadlines = vec![]; //! deadline_queue -//! .missed_deadlines(|deadline_queue_index| missed_deadlines.push(deadline_queue_index)); +//! .missed_deadlines(|deadline_queue_index| { +//! missed_deadlines.push(deadline_queue_index); +//! CallbackProgression::Continue +//! }); //! ``` -use std::{cell::RefCell, fmt::Debug, sync::atomic::Ordering, time::Duration}; +pub use iceoryx2_bb_elementary::CallbackProgression; -use iceoryx2_bb_elementary::CallbackProgression; use iceoryx2_bb_log::fail; use iceoryx2_pal_concurrency_sync::iox_atomic::IoxAtomicU64; +use std::{cell::RefCell, fmt::Debug, sync::atomic::Ordering, time::Duration}; use crate::{ clock::ClockType, diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp index 91e29c167..7ad944814 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset.hpp @@ -96,7 +96,8 @@ auto operator<(const WaitSetAttachmentId& lhs, const WaitSetAttachmentId& /// The [`WaitSet`] implements a reactor pattern and allows to wait on multiple events in one /// single call [`WaitSet::try_wait_and_process()`] until it wakes up or to run repeatedly with /// [`WaitSet::wait_and_process()`] until the a interrupt or termination signal was received or the user -/// has explicitly requested to stop with [`WaitSet::stop()`]. +/// has explicitly requested to stop by returning [`CallbackProgression::Stop`] in the provided +/// callback. /// /// The [`Listener`] can be attached as well as sockets or anything else that /// can be packed into a [`FileDescriptorView`]. diff --git a/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp b/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp index 1fc8c30ea..8bf03b444 100644 --- a/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp +++ b/iceoryx2-ffi/cxx/include/iox2/waitset_enums.hpp @@ -28,7 +28,7 @@ enum class WaitSetRunResult : uint8_t { TerminationRequest, /// An interrupt signal `SIGINT` was received. Interrupt, - /// The user explicitly called [`WaitSet::stop()`]. + /// The users callback returned [`CallbackProgression::Stop`]. StopRequest, /// All events were handled. AllEventsHandled diff --git a/iceoryx2-ffi/ffi/src/api/waitset.rs b/iceoryx2-ffi/ffi/src/api/waitset.rs index 4b57a9cfa..2a41bb975 100644 --- a/iceoryx2-ffi/ffi/src/api/waitset.rs +++ b/iceoryx2-ffi/ffi/src/api/waitset.rs @@ -614,7 +614,7 @@ pub unsafe extern "C" fn iox2_waitset_wait_and_process_once( /// owning [`iox2_waitset_attachment_id_h`] is provided as input argument, as well as the /// `callback_ctx`. /// The infinite loop is interrupted either by a `SIGINT` or `SIGTERM` signal or -/// when the user has called [`iox2_waitset_stop()`]. +/// when the user callback returned [`iox2_callback_progression_e::STOP`]. /// /// With [`iox2_waitset_attachment_id_has_event_from()`](crate::iox2_waitset_attachment_id_has_event_from()) /// the origin of the event can be determined from its corresponding diff --git a/iceoryx2-ffi/ffi/src/api/waitset_attachment_id.rs b/iceoryx2-ffi/ffi/src/api/waitset_attachment_id.rs index 8f3147b29..642d0c15e 100644 --- a/iceoryx2-ffi/ffi/src/api/waitset_attachment_id.rs +++ b/iceoryx2-ffi/ffi/src/api/waitset_attachment_id.rs @@ -113,7 +113,7 @@ impl HandleToType for iox2_waitset_attachment_id_h_ref { // BEGIN C API /// Release an [`iox2_waitset_attachment_id_h`] that was acquired by calling either /// * [`iox2_waitset_wait_and_process()`](crate::iox2_waitset_wait_and_process()) -/// * [`iox2_waitset_try_wait_and_process()`](crate::iox2_waitset_try_wait_and_process()) +/// * [`iox2_waitset_wait_and_process_once()`](crate::iox2_waitset_wait_and_process_once()) /// /// # Safety /// * `handle` must be valid and provided by the previously mentioned functions. diff --git a/iceoryx2/src/port/waitset.rs b/iceoryx2/src/port/waitset.rs index 7737e519d..69645ac22 100644 --- a/iceoryx2/src/port/waitset.rs +++ b/iceoryx2/src/port/waitset.rs @@ -63,6 +63,7 @@ //! println!("received notification {:?}", event_id); //! } //! } +//! CallbackProgression::Continue //! }; //! //! waitset.wait_and_process(on_event)?; @@ -96,6 +97,7 @@ //! } else if attachment_id.has_missed_deadline(&guard) { //! println!("Oh no, we hit the deadline without receiving any kind of event"); //! } +//! CallbackProgression::Continue //! }; //! //! waitset.wait_and_process(on_event)?; @@ -131,6 +133,7 @@ //! } else if attachment_id.has_event_from(&guard_2) { //! publisher_2.send_copy(456); //! } +//! CallbackProgression::Continue //! }; //! //! waitset.wait_and_process(on_event)?; @@ -173,6 +176,7 @@ //! println!("received notification {:?}", event_id); //! } //! } +//! CallbackProgression::Continue //! }; //! //! waitset.wait_and_process(on_event)?; @@ -205,7 +209,7 @@ pub enum WaitSetRunResult { TerminationRequest, /// An interrupt signal `SIGINT` was received. Interrupt, - /// The user explicitly called [`WaitSet::stop()`]. + /// The users callback returned [`CallbackProgression::Stop`]. StopRequest, /// All events were handled. AllEventsHandled, @@ -465,9 +469,10 @@ impl WaitSetBuilder { } /// The [`WaitSet`] implements a reactor pattern and allows to wait on multiple events in one -/// single call [`WaitSet::try_wait_and_process()`] until it wakes up or to run repeatedly with +/// single call [`WaitSet::wait_and_process_once()`] until it wakes up or to run repeatedly with /// [`WaitSet::wait_and_process()`] until the a interrupt or termination signal was received or the user -/// has explicitly requested to stop with [`WaitSet::stop()`]. +/// has explicitly requested to stop by returning [`CallbackProgression::Stop`] in the provided +/// callback. /// /// An struct must implement [`SynchronousMultiplexing`] to be attachable. The /// [`Listener`](crate::port::listener::Listener) can be attached as well as sockets or anything else that @@ -659,6 +664,39 @@ impl WaitSet { /// If an interrupt- (`SIGINT`) or a termination-signal (`SIGTERM`) was received, it will exit /// the loop and inform the user with [`WaitSetRunResult::Interrupt`] or /// [`WaitSetRunResult::TerminationRequest`]. + /// + /// # Example + /// + /// ```no_run + /// use iceoryx2::prelude::*; + /// # use core::time::Duration; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # let event = node.service_builder(&"MyEventName_1".try_into()?) + /// # .event() + /// # .open_or_create()?; + /// + /// # let mut listener = event.listener_builder().create()?; + /// + /// let waitset = WaitSetBuilder::new().create::()?; + /// # let guard = waitset.attach_notification(&listener)?; + /// + /// let on_event = |attachment_id: WaitSetAttachmentId| { + /// if attachment_id.has_event_from(&guard) { + /// // when a certain event arrives we stop the event processing + /// // to terminate the process + /// CallbackProgression::Stop + /// } else { + /// CallbackProgression::Continue + /// } + /// }; + /// + /// waitset.wait_and_process(on_event)?; + /// println!("goodbye"); + /// + /// # Ok(()) + /// # } + /// ``` pub fn wait_and_process) -> CallbackProgression>( &self, mut fn_call: F, @@ -690,6 +728,37 @@ impl WaitSet { /// /// When no signal was received and all events were handled, it will return /// [`WaitSetRunResult::AllEventsHandled`]. + /// + /// # Example + /// + /// ```no_run + /// use iceoryx2::prelude::*; + /// # use core::time::Duration; + /// # fn main() -> Result<(), Box> { + /// # let node = NodeBuilder::new().create::()?; + /// # let event = node.service_builder(&"MyEventName_1".try_into()?) + /// # .event() + /// # .open_or_create()?; + /// + /// let waitset = WaitSetBuilder::new().create::()?; + /// + /// let on_event = |attachment_id: WaitSetAttachmentId| { + /// // do some event processing + /// CallbackProgression::Continue + /// }; + /// + /// // main event loop + /// loop { + /// // blocks until an event arrives, handles all arrived events and then + /// // returns. + /// waitset.wait_and_process_once(on_event)?; + /// // do some event post processing + /// println!("handled events"); + /// } + /// + /// # Ok(()) + /// # } + /// ``` pub fn wait_and_process_once) -> CallbackProgression>( &self, mut fn_call: F, @@ -728,7 +797,7 @@ impl WaitSet { match reactor_wait_result { Ok(0) => self.handle_deadlines(&mut fn_call, msg), Ok(_) => self.handle_all_attachments(&triggered_file_descriptors, &mut fn_call, msg), - Err(ReactorWaitError::Interrupt) => return Ok(WaitSetRunResult::Interrupt), + Err(ReactorWaitError::Interrupt) => Ok(WaitSetRunResult::Interrupt), Err(ReactorWaitError::InsufficientPermissions) => { fail!(from self, with WaitSetRunError::InsufficientPermissions, "{msg} due to insufficient permissions.");