From bacd922ca56f02edb0bd508b597b67df4b629588 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 14:00:19 -0600 Subject: [PATCH 01/26] zephyr: sys::sync: move semaphores into their own module In preparation for adding more synchronization primitives, move the Semaphore implementation into its own module. There is enough stuff associated with each primitive that it can be confusing if they are not in their own modules. Signed-off-by: David Brown --- zephyr/src/sys/sync.rs | 125 ++------------------- zephyr/src/sys/sync/mutex.rs | 185 +++++++++++++++++++++++++++++++ zephyr/src/sys/sync/semaphore.rs | 127 +++++++++++++++++++++ 3 files changed, 324 insertions(+), 113 deletions(-) create mode 100644 zephyr/src/sys/sync/mutex.rs create mode 100644 zephyr/src/sys/sync/semaphore.rs diff --git a/zephyr/src/sys/sync.rs b/zephyr/src/sys/sync.rs index b19b593f..19d459e6 100644 --- a/zephyr/src/sys/sync.rs +++ b/zephyr/src/sys/sync.rs @@ -30,117 +30,16 @@ //! Later, there will be a pool mechanism to allow these kernel objects to be allocated and freed //! from a pool, although the objects will still be statically allocated. -use core::ffi::c_uint; -use core::fmt; - -use crate::{ - error::{Result, to_result_void}, - object::{StaticKernelObject, Wrapped}, - raw::{ - k_sem, - k_sem_init, - k_sem_take, - k_sem_give, - k_sem_reset, - k_sem_count_get, - }, - time::Timeout, +pub mod mutex; +pub mod semaphore; + +pub use mutex::{ + Condvar, + StaticCondvar, + Mutex, + StaticMutex, +}; +pub use semaphore::{ + Semaphore, + StaticSemaphore, }; - -pub use crate::raw::K_SEM_MAX_LIMIT; - -/// A zephyr `k_sem` usable from safe Rust code. -#[derive(Clone)] -pub struct Semaphore { - /// The raw Zephyr `k_sem`. - item: *mut k_sem, -} - -/// By nature, Semaphores are both Sync and Send. Safety is handled by the underlying Zephyr -/// implementation (which is why Clone is also implemented). -unsafe impl Sync for Semaphore {} -unsafe impl Send for Semaphore {} - -impl Semaphore { - /// Take a semaphore. - /// - /// Can be called from ISR if called with [`NoWait`]. - /// - /// [`NoWait`]: crate::time::NoWait - pub fn take(&self, timeout: T) -> Result<()> - where T: Into, - { - let timeout: Timeout = timeout.into(); - let ret = unsafe { - k_sem_take(self.item, timeout.0) - }; - to_result_void(ret) - } - - /// Give a semaphore. - /// - /// This routine gives to the semaphore, unless the semaphore is already at its maximum - /// permitted count. - pub fn give(&self) { - unsafe { - k_sem_give(self.item) - } - } - - /// Resets a semaphor's count to zero. - /// - /// This resets the count to zero. Any outstanding [`take`] calls will be aborted with - /// `Error(EAGAIN)`. - /// - /// [`take`]: Self::take - pub fn reset(&mut self) { - unsafe { - k_sem_reset(self.item) - } - } - - /// Get a semaphore's count. - /// - /// Returns the current count. - pub fn count_get(&mut self) -> usize { - unsafe { - k_sem_count_get(self.item) as usize - } - } -} - -/// A static Zephyr `k_sem`. -/// -/// This is intended to be used from within the `kobj_define!` macro. It declares a static ksem -/// that will be properly registered with the Zephyr kernel object system. Call [`init_once`] to -/// get the [`Semaphore`] that is represents. -/// -/// [`init_once`]: StaticKernelObject::init_once -pub type StaticSemaphore = StaticKernelObject; - -unsafe impl Sync for StaticSemaphore {} - -impl Wrapped for StaticKernelObject { - type T = Semaphore; - - /// The initializer for Semaphores is the initial count, and the count limit (which can be - /// K_SEM_MAX_LIMIT, re-exported here. - type I = (c_uint, c_uint); - - // TODO: Thoughts about how to give parameters to the initialzation. - fn get_wrapped(&self, arg: Self::I) -> Semaphore { - let ptr = self.value.get(); - unsafe { - k_sem_init(ptr, arg.0, arg.1); - } - Semaphore { - item: ptr, - } - } -} - -impl fmt::Debug for Semaphore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "sys::Semaphore") - } -} diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs new file mode 100644 index 00000000..457f9941 --- /dev/null +++ b/zephyr/src/sys/sync/mutex.rs @@ -0,0 +1,185 @@ +// Copyright (c) 2024 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! Zephyr `k_mutex` wrapper. +//! +//! This module implements a thing wrapper around the `k_mutex` type in Zephyr. It works with the +//! kernel [`object`] system, to allow the mutexes to be defined statically. +//! +//! [`object`]: crate::object + +use core::fmt; +use crate::{ + error::{Result, to_result_void}, + raw::{ + k_condvar, + k_condvar_init, + k_condvar_broadcast, + k_condvar_signal, + k_condvar_wait, + k_mutex, + k_mutex_init, + k_mutex_lock, + k_mutex_unlock, + }, + time::Timeout, +}; +use crate::object::{ + StaticKernelObject, + Wrapped, +}; +use crate::sys::K_FOREVER; + +/// A Zephyr `k_mutux` usable from safe Rust code. +/// +/// This merely wraps a pointer to the kernel object. It implements clone, send and sync as it is +/// safe to have multiple instances of these, as well as use them across multiple threads. +/// +/// Note that these are Safe in the sense that memory safety is guaranteed. Attempts to +/// recursively lock, or incorrect nesting can easily result in deadlock. +/// +/// Safety: Typically, the Mutex type in Rust does not implement Clone, and must be shared between +/// threads using Arc. However, these sys Mutexes are wrappers around static kernel objects, and +/// Drop doesn't make sense for them. In addition, Arc requires alloc, and one possible place to +/// make use of the sys Mutex is to be able to do so in an environment without alloc. +/// +/// This mutex type of only of limited use to application programs. It can be used as a simple +/// binary semaphore, although it has strict semantics, requiring the release to be called by the +/// same thread that called lock. It can be used to protect data that Rust itself is either not +/// managing, or is managing in an unsafe way. +/// +/// For a Mutex type that is useful in a Rust type of manner, please see the regular [`sync::Mutex`] +/// type. +/// +/// [`sync::Mutex`]: http://example.com/TODO +#[derive(Clone)] +pub struct Mutex { + /// The raw Zephyr mutex. + item: *mut k_mutex, +} + +impl Mutex { + /// Lock a Zephyr Mutex. + /// + /// Will wait for the lock, returning status, with `Ok(())` indicating the lock has been + /// acquired, and an error indicating a timeout (Zephyr returns different errors depending on + /// the reason). + pub fn lock(&self, timeout: T) -> Result<()> + where T: Into, + { + let timeout: Timeout = timeout.into(); + to_result_void(unsafe { k_mutex_lock(self.item, timeout.0) }) + } + + /// Unlock a Zephyr Mutex. + /// + /// The mutex must already be locked by the calling thread. Mutexes may not be unlocked in + /// ISRs. + pub unsafe fn unlock(&self) -> Result<()> { + to_result_void(unsafe { k_mutex_unlock(self.item) }) + } +} + + +/// A static Zephyr `k_mutex` +/// +/// This is intended to be used from within the `kobj_define!` macro. It declares a static +/// `k_mutex` that will be properly registered with the Zephyr object system. Call [`init_once`] to +/// get the [`Mutex`] that it represents. +/// +/// [`init_once`]: StaticMutex::init_once +pub type StaticMutex = StaticKernelObject; + +// Sync and Send are meaningful, as the underlying Zephyr API can use these values from any thread. +// Care must be taken to use these in a safe manner. +unsafe impl Sync for StaticMutex {} +unsafe impl Send for StaticMutex {} + +impl Wrapped for StaticKernelObject { + type T = Mutex; + + /// Mutex initializers take no argument. + type I = (); + + fn get_wrapped(&self, _arg: Self::I) -> Mutex { + let ptr = self.value.get(); + unsafe { + k_mutex_init(ptr); + } + Mutex { + item: ptr, + } + } +} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "sys::Mutex {:?}", self.item) + } +} + +/// A Condition Variable +/// +/// Lightweight wrappers for Zephyr's `k_condvar`. +#[derive(Clone)] +pub struct Condvar { + /// The underlying `k_condvar`. + item: *mut k_condvar, +} + +#[doc(hidden)] +pub type StaticCondvar = StaticKernelObject; + +unsafe impl Sync for StaticKernelObject { } + +unsafe impl Sync for Condvar {} +unsafe impl Send for Condvar {} + +impl Condvar { + /// Wait for someone else using this mutex/condvar pair to notify. + /// + /// Note that this requires the lock to be held by use, but as this is a low-level binding to + /// Zephyr's interfaces, this is not enforced. See [`sync::Condvar`] for a safer and easier to + /// use interface. + /// + /// [`sync::Condvar`]: http://www.example.com/TODO + // /// [`sync::Condvar`]: crate::sync::Condvar + pub fn wait(&self, lock: &Mutex) { + unsafe { k_condvar_wait(self.item, lock.item, K_FOREVER); } + } + + // TODO: timeout. + + /// Wake a single thread waiting on this condition variable. + pub fn notify_one(&self) { + unsafe { k_condvar_signal(self.item); } + } + + /// Wake all threads waiting on this condition variable. + pub fn notify_all(&self) { + unsafe { k_condvar_broadcast(self.item); } + } +} + +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "sys::Condvar {:?}", self.item) + } +} + +impl Wrapped for StaticCondvar { + type T = Condvar; + + /// Condvar initializers take no argument. + type I = (); + + fn get_wrapped(&self, _arg: Self::I) -> Condvar { + let ptr = self.value.get(); + unsafe { + k_condvar_init(ptr); + } + Condvar { + item: ptr, + } + } +} diff --git a/zephyr/src/sys/sync/semaphore.rs b/zephyr/src/sys/sync/semaphore.rs new file mode 100644 index 00000000..3c690092 --- /dev/null +++ b/zephyr/src/sys/sync/semaphore.rs @@ -0,0 +1,127 @@ +// Copyright (c) 2024 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! Zephyr Semaphore support +//! +//! This is a thin wrapper around Zephyr's `k_sem`. This is one of the few of the `sys` primitives +//! in Zephyr that is actually perfectly usable on its own, without needing additional wrappers. +//! +//! Zephyr implements counting semaphores, with both an upper and lower bound on the count. Note +//! that calling 'give' on a semaphore that is at the maximum count will discard the 'give' +//! operation, which in situation where counting is actually desired, will result in the count being +//! incorrect. + +use core::ffi::c_uint; +use core::fmt; + +use crate::{ + error::{Result, to_result_void}, + object::{StaticKernelObject, Wrapped}, + raw::{ + k_sem, + k_sem_init, + k_sem_take, + k_sem_give, + k_sem_reset, + k_sem_count_get, + }, + time::Timeout, +}; + +pub use crate::raw::K_SEM_MAX_LIMIT; + +/// A zephyr `k_sem` usable from safe Rust code. +#[derive(Clone)] +pub struct Semaphore { + /// The raw Zephyr `k_sem`. + item: *mut k_sem, +} + +/// By nature, Semaphores are both Sync and Send. Safety is handled by the underlying Zephyr +/// implementation (which is why Clone is also implemented). +unsafe impl Sync for Semaphore {} +unsafe impl Send for Semaphore {} + +impl Semaphore { + /// Take a semaphore. + /// + /// Can be called from ISR if called with [`NoWait`]. + /// + /// [`NoWait`]: crate::time::NoWait + pub fn take(&self, timeout: T) -> Result<()> + where T: Into, + { + let timeout: Timeout = timeout.into(); + let ret = unsafe { + k_sem_take(self.item, timeout.0) + }; + to_result_void(ret) + } + + /// Give a semaphore. + /// + /// This routine gives to the semaphore, unless the semaphore is already at its maximum + /// permitted count. + pub fn give(&self) { + unsafe { + k_sem_give(self.item) + } + } + + /// Resets a semaphor's count to zero. + /// + /// This resets the count to zero. Any outstanding [`take`] calls will be aborted with + /// `Error(EAGAIN)`. + /// + /// [`take`]: Self::take + pub fn reset(&mut self) { + unsafe { + k_sem_reset(self.item) + } + } + + /// Get a semaphore's count. + /// + /// Returns the current count. + pub fn count_get(&mut self) -> usize { + unsafe { + k_sem_count_get(self.item) as usize + } + } +} + +/// A static Zephyr `k_sem`. +/// +/// This is intended to be used from within the `kobj_define!` macro. It declares a static ksem +/// that will be properly registered with the Zephyr kernel object system. Call [`init_once`] to +/// get the [`Semaphore`] that is represents. +/// +/// [`init_once`]: StaticKernelObject::init_once +pub type StaticSemaphore = StaticKernelObject; + +unsafe impl Sync for StaticSemaphore {} + +impl Wrapped for StaticKernelObject { + type T = Semaphore; + + /// The initializer for Semaphores is the initial count, and the count limit (which can be + /// K_SEM_MAX_LIMIT, re-exported here. + type I = (c_uint, c_uint); + + // TODO: Thoughts about how to give parameters to the initialzation. + fn get_wrapped(&self, arg: Self::I) -> Semaphore { + let ptr = self.value.get(); + unsafe { + k_sem_init(ptr, arg.0, arg.1); + } + Semaphore { + item: ptr, + } + } +} + +impl fmt::Debug for Semaphore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "sys::Semaphore") + } +} From c0434a2cb16f564835bb48e10cb3311dc259a466 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 16:27:17 -0600 Subject: [PATCH 02/26] zephyr: kobj_define: Add sys Mutex and sys Condvar Add support for declaring static instances and arrays of the sys Mutex and sys Condvar. Signed-off-by: David Brown --- zephyr/src/object.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs index 417c1bd3..d9d1c36b 100644 --- a/zephyr/src/object.rs +++ b/zephyr/src/object.rs @@ -236,6 +236,40 @@ macro_rules! _kobj_rule { unsafe { ::core::mem::zeroed() }; }; + // static NAME: StaticMutex + ($v:vis, $name:ident, StaticMutex) => { + #[link_section = concat!("._k_mutex.static.", stringify!($name), ".", file!(), line!())] + $v static $name: $crate::sys::sync::StaticMutex = + $crate::sys::sync::StaticMutex::new(); + }; + + // static NAMES: [StaticMutex; COUNT]; + ($v:vis, $name:ident, [StaticMutex; $size:expr]) => { + #[link_section = concat!("._k_mutex.static.", stringify!($name), ".", file!(), line!())] + $v static $name: [$crate::sys::sync::StaticMutex; $size] = + // This isn't Copy, intentionally, so initialize the whole thing with zerored memory. + // Relying on the atomic to be 0 for the uninitialized state. + // [$crate::sys::sync::StaticMutex::new(); $size]; + unsafe { ::core::mem::zeroed() }; + }; + + // static NAME: StaticCondvar; + ($v:vis, $name:ident, StaticCondvar) => { + #[link_section = concat!("._k_condvar.static.", stringify!($name), ".", file!(), line!())] + $v static $name: $crate::sys::sync::StaticCondvar = + $crate::sys::sync::StaticCondvar::new(); + }; + + // static NAMES: [StaticCondvar; COUNT]; + ($v:vis, $name:ident, [StaticCondvar; $size:expr]) => { + #[link_section = concat!("._k_condvar.static.", stringify!($name), ".", file!(), line!())] + $v static $name: [$crate::sys::sync::StaticCondvar; $size] = + // This isn't Copy, intentionally, so initialize the whole thing with zerored memory. + // Relying on the atomic to be 0 for the uninitialized state. + // [$crate::sys::sync::StaticMutex::new(); $size]; + unsafe { ::core::mem::zeroed() }; + }; + // static THREAD: staticThread; ($v:vis, $name:ident, StaticThread) => { // Since the static object has an atomic that we assume is initialized, we cannot use the From 4d4ee9bd1416581ffb8dcaa8c2c62351dbebd980 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 16:28:09 -0600 Subject: [PATCH 03/26] zephyr: sys: sync: Mutex: Implement Send/Sync Although it is tradition in Rust to have types such as Semaphores and Mutexes not implement Sync, these primitives, built as thing wrappers become fairly useless. Sharing them would require `Arc`, which requires alloc. Presumably someone wanting to use lower level primitives would also not likely be wanting to use allocation. For the most part, these primtives have as their real purpose to be used in the implementation of the higher level synchronization primtives, such as sync::Mutex. unlock is unsafe because it is required to only call unlock on the same thread that locked. Signed-off-by: David Brown --- zephyr/src/sys/sync/mutex.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs index 457f9941..1760781c 100644 --- a/zephyr/src/sys/sync/mutex.rs +++ b/zephyr/src/sys/sync/mutex.rs @@ -90,6 +90,9 @@ impl Mutex { /// [`init_once`]: StaticMutex::init_once pub type StaticMutex = StaticKernelObject; +unsafe impl Sync for Mutex {} +unsafe impl Send for Mutex {} + // Sync and Send are meaningful, as the underlying Zephyr API can use these values from any thread. // Care must be taken to use these in a safe manner. unsafe impl Sync for StaticMutex {} From 2731deb311d2a2467f3d912a51cd16820eb3102f Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 16:31:21 -0600 Subject: [PATCH 04/26] samples: philosophers: Add sysmutex version Add a version of the philosopher's demo that is built around sys::Mutex, in it's simplest use case. Signed-off-by: David Brown --- samples/philosophers/Kconfig | 12 +++++-- samples/philosophers/sample.yaml | 5 +++ samples/philosophers/src/lib.rs | 15 ++++++++ samples/philosophers/src/sysmutex.rs | 51 ++++++++++++++++++++++++++++ 4 files changed, 80 insertions(+), 3 deletions(-) create mode 100644 samples/philosophers/src/sysmutex.rs diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index 2e481703..3739df1a 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -7,12 +7,18 @@ source "Kconfig.zephyr" choice prompt "Select Synchronization implementation" - default SYNC_SYS_SEMAPHORE + default SYNC_SYS_MUTEX config SYNC_SYS_SEMAPHORE bool "Use sys::Semaphore to synchronize forks" help - Use to have the dining philosophers sample use sys::Semaphore, with one per form, - to synchronize. + Use to have the dining philosophers sample use sys::Mutex, with one per fork, to + synchronize. + + config SYNC_SYS_MUTEX + bool "Use sys::Semaphore to synchronize forks" + help + Use to have the dining philosophers sample use sys::Mutex, with one per fork, to + synchronize. endchoice diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index cfd1eb47..604abc1a 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -20,3 +20,8 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_SYS_SEMAPHORE=y + sample.rust.philosopher.sysmutex: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_SYS_MUTEX=y diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs index 7e514305..a87a3f54 100644 --- a/samples/philosophers/src/lib.rs +++ b/samples/philosophers/src/lib.rs @@ -23,8 +23,11 @@ use zephyr::{ // These are optional, based on Kconfig, so allow them to be unused. #[allow(unused_imports)] +use crate::sysmutex::SysMutexSync; +#[allow(unused_imports)] use crate::semsync::semaphore_sync; +mod sysmutex; mod semsync; /// How many philosophers. There will be the same number of forks. @@ -82,6 +85,18 @@ fn get_syncer() -> Vec> { semaphore_sync() } +#[cfg(CONFIG_SYNC_SYS_MUTEX)] +fn get_syncer() -> Vec> { + let syncer = Box::new(SysMutexSync::new()) + as Box; + let syncer: Arc = Arc::from(syncer); + let mut result = Vec::new(); + for _ in 0..NUM_PHIL { + result.push(syncer.clone()); + } + result +} + fn phil_thread(n: usize, syncer: Arc) { printkln!("Child {} started: {:?}", n, syncer); diff --git a/samples/philosophers/src/sysmutex.rs b/samples/philosophers/src/sysmutex.rs new file mode 100644 index 00000000..9203c29f --- /dev/null +++ b/samples/philosophers/src/sysmutex.rs @@ -0,0 +1,51 @@ +// Copyright (c) 2024 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! # sys::Mutex implementation of ForkSync +//! +//! This is a simple implementation of the Fork synchronizer that uses underlying Zephyr `k_mutex` +//! wrapped in `sys::Mutex`. The ForkSync semantics map simply to these. + +use crate::{ + ForkSync, + NUM_PHIL, +}; +use zephyr::kobj_define; +use zephyr::sys::sync::Mutex; +use zephyr::time::Forever; + +type SysMutexes = [Mutex; NUM_PHIL]; + +/// A simple implementation of ForkSync based on underlying Zephyr sys::Mutex, which uses explicit +/// lock and release semantics. + +#[derive(Debug)] +pub struct SysMutexSync { + locks: SysMutexes, +} + +impl SysMutexSync { + #[allow(dead_code)] + pub fn new() -> SysMutexSync { + let locks = MUTEXES.each_ref().map(|m| { + m.init_once(()).unwrap() + }); + SysMutexSync { locks } + } +} + +impl ForkSync for SysMutexSync { + fn take(&self, index: usize) { + self.locks[index].lock(Forever).unwrap(); + } + + fn release(&self, index: usize) { + unsafe { + self.locks[index].unlock().unwrap(); + } + } +} + +kobj_define! { + static MUTEXES: [StaticMutex; NUM_PHIL]; +} From 102478e85d83dad6cc208fc9d6792410c01d04de Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 16:52:34 -0600 Subject: [PATCH 05/26] zephyr: sync: Create Mutex/Condvar Create higher-level Mutex and Condvar types that are similar to std::sync::Mutex and std::sync::Condvar. The main difference is that the only current constructor for this requires the sys Mutex and sys Condvar from the sys versions that are statically allocated. Signed-off-by: David Brown --- zephyr/src/sync.rs | 237 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 237 insertions(+) diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index bdce4e20..996c0ff4 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -5,6 +5,16 @@ //! [`crossbeam-channel`](https://docs.rs/crossbeam-channel/latest/crossbeam_channel/), in as much //! as it makes sense. +use core::{ + cell::UnsafeCell, + fmt, + marker::PhantomData, + ops::{Deref, DerefMut}, +}; + +use crate::time::{Forever, NoWait}; +use crate::sys::sync as sys; + pub mod atomic { //! Re-export portable atomic. //! @@ -22,3 +32,230 @@ pub mod atomic { #[cfg(CONFIG_RUST_ALLOC)] pub use portable_atomic_util::Arc; + +// Channels are currently only available with allocation. Bounded channels later might be +// available. + +/// Until poisoning is implemented, mutexes never return an error, and we just get back the guard. +pub type LockResult = Result; + +/// The return type from [`Mutex::try_lock`]. +/// +/// The error indicates the reason for the failure. Until poisoning is +/// implemented, there is only a single type of failure. +pub type TryLockResult = Result; + +/// An enumeration of possible errors associated with a [`TryLockResult`]. +/// +/// Note that until Poisoning is implemented, there is only one value of this. +pub enum TryLockError { + /// The lock could not be acquired at this time because the operation would otherwise block. + WouldBlock, +} + +/// A mutual exclusion primitive useful for protecting shared data. +/// +/// This mutex will block threads waiting for the lock to become available. This is modeled after +/// [`std::sync::Mutex`](https://doc.rust-lang.org/stable/std/sync/struct.Mutex.html), and attempts +/// to implement that API as closely as makes sense on Zephyr. Currently, it has the following +/// differences: +/// - Poisoning: This does not yet implement poisoning, as there is no way to recover from panic at +/// this time on Zephyr. +/// - Allocation: `new` is not yet provided, and will be provided once kernel object pools are +/// implemented. Please use `new_from` which takes a reference to a statically allocated +/// `sys::Mutex`. +pub struct Mutex { + inner: sys::Mutex, + // poison: ... + data: UnsafeCell, +} + +// At least if correctly done, the Mutex provides for Send and Sync as long as the inner data +// supports Send. +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} + +impl fmt::Debug for Mutex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Mutex {:?}", self.inner) + } +} + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is dropped (faslls +/// out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its [`Deref`] and +/// [`DerefMut`] implementations. +/// +/// This structure is created by the [`lock`] and [`try_lock`] methods on [`Mutex`]. +/// +/// [`lock`]: Mutex::lock +/// [`try_lock`]: Mutex::try_lock +/// +/// Taken directly from +/// [`std::sync::MutexGuard`](https://doc.rust-lang.org/stable/std/sync/struct.MutexGuard.html). +pub struct MutexGuard<'a, T: ?Sized + 'a> { + lock: &'a Mutex, + // until is implemented, we have to mark unsend + // explicitly. This can be done by holding Phantom data with an unsafe cell in it. + _nosend: PhantomData>, +} + +// Make sure the guard doesn't get sent. +// Negative trait bounds are unstable, see marker above. +// impl !Send for MutexGuard<'_, T> {} +unsafe impl Sync for MutexGuard<'_, T> {} + +impl Mutex { + /// Construct a new wrapped Mutex, using the given underlying sys mutex. This is different that + /// `std::sync::Mutex` in that in Zephyr, objects are frequently allocated statically, and the + /// sys Mutex will be taken by this structure. It is safe to share the underlying Mutex between + /// different items, but without careful use, it is easy to deadlock, so it is not recommended. + pub const fn new_from(t: T, raw_mutex: sys::Mutex) -> Mutex { + Mutex { inner: raw_mutex, data: UnsafeCell::new(t) } + } +} + +impl Mutex { + /// Acquires a mutex, blocking the current thread until it is able to do so. + /// + /// This function will block the local thread until it is available to acquire the mutex. Upon + /// returning, the thread is the only thread with the lock held. An RAII guard is returned to + /// allow scoped unlock of the lock. When the guard goes out of scope, the mutex will be + /// unlocked. + /// + /// In `std`, an attempt to lock a mutex by a thread that already holds the mutex is + /// unspecified. Zephyr explicitly supports this behavior, by simply incrementing a lock + /// count. + pub fn lock(&self) -> LockResult> { + // With `Forever`, should never return an error. + self.inner.lock(Forever).unwrap(); + unsafe { + Ok(MutexGuard::new(self)) + } + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then [`Err`] is returned. Otherwise, an RAII + /// guard is returned. The lock will be unlocked when the guard is dropped. + /// + /// This function does not block. + pub fn try_lock(&self) -> TryLockResult> { + match self.inner.lock(NoWait) { + Ok(()) => { + unsafe { + Ok(MutexGuard::new(self)) + } + } + // TODO: It might be better to distinguish these errors, and only return the WouldBlock + // if that is the corresponding error. But, the lock shouldn't fail in Zephyr. + Err(_) => { + Err(TryLockError::WouldBlock) + } + } + } +} + +impl<'mutex, T: ?Sized> MutexGuard<'mutex, T> { + unsafe fn new(lock: &'mutex Mutex) -> MutexGuard<'mutex, T> { + // poison todo + MutexGuard { lock, _nosend: PhantomData } + } +} + +impl Deref for MutexGuard<'_, T> { + type Target = T; + + fn deref(&self) -> &T { + unsafe { + &*self.lock.data.get() + } + } +} + +impl DerefMut for MutexGuard<'_, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.lock.data.get() } + } +} + +impl Drop for MutexGuard<'_, T> { + #[inline] + fn drop(&mut self) { + unsafe { + self.lock.inner.unlock().unwrap(); + } + } +} + +/// Inspired by +/// [`std::sync::Condvar`](https://doc.rust-lang.org/stable/std/sync/struct.Condvar.html), +/// implemented directly using `z_condvar` in Zephyr. +/// +/// Condition variables represent the ability to block a thread such that it consumes no CPU time +/// while waiting for an even to occur. Condition variables are typically associated with a +/// boolean predicate (a condition) and a mutex. The predicate is always verified inside of the +/// mutex before determining that a thread must block. +/// +/// Functions in this module will block the current **thread** of execution. Note that any attempt +/// to use multiple mutexces on the same condition variable may result in a runtime panic. +pub struct Condvar { + inner: sys::Condvar, +} + +impl Condvar { + /// Construct a new wrapped Condvar, using the given underlying `k_condvar`. + /// + /// This is different from `std::sync::Condvar` in that in Zephyr, objects are frequently + /// allocated statically, and the sys Condvar will be taken by this structure. + pub const fn new_from(raw_condvar: sys::Condvar) -> Condvar { + Condvar { inner: raw_condvar } + } + + /// Blocks the current thread until this conditional variable receives a notification. + /// + /// This function will automatically unlock the mutex specified (represented by `guard`) and + /// block the current thread. This means that any calls to `notify_one` or `notify_all` which + /// happen logically after the mutex is unlocked are candidates to wake this thread up. When + /// this function call returns, the lock specified will have been re-equired. + /// + /// Note that this function is susceptable to spurious wakeups. Condition variables normally + /// have a boolean predicate associated with them, and the predicate must always be checked + /// each time this function returns to protect against spurious wakeups. + pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult> { + self.inner.wait(&guard.lock.inner); + Ok(guard) + } + + // TODO: wait_while + // TODO: wait_timeout_ms + // TODO: wait_timeout + // TODO: wait_timeout_while + + /// Wakes up one blocked thread on this condvar. + /// + /// If there is a blocked thread on this condition variable, then it will be woken up from its + /// call to `wait` or `wait_timeout`. Calls to `notify_one` are not buffered in any way. + /// + /// To wakeup all threads, see `notify_all`. + pub fn notify_one(&self) { + self.inner.notify_one(); + } + + /// Wakes up all blocked threads on this condvar. + /// + /// This methods will ensure that any current waiters on the condition variable are awoken. + /// Calls to `notify_all()` are not buffered in any way. + /// + /// To wake up only one thread, see `notify_one`. + pub fn notify_all(&self) { + self.inner.notify_all(); + } +} + +impl fmt::Debug for Condvar { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Condvar {:?}", self.inner) + } +} From 2a8a82db024bd40e40287ef637c0d71abf00e743 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 16:59:08 -0600 Subject: [PATCH 06/26] samples: philosophers: Add sync using Mutex/Condvar Build a syncer that coordinates the forks using a single Mutex/Condvar pair, where the Mutex protects the data, and Condvar is used to coordinate. This is a common paradigm for shared synchronization. Signed-off-by: David Brown --- samples/philosophers/Kconfig | 8 +++- samples/philosophers/sample.yaml | 5 +++ samples/philosophers/src/condsync.rs | 58 ++++++++++++++++++++++++++++ samples/philosophers/src/lib.rs | 16 ++++++++ 4 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 samples/philosophers/src/condsync.rs diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index 3739df1a..a78142c0 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -7,7 +7,7 @@ source "Kconfig.zephyr" choice prompt "Select Synchronization implementation" - default SYNC_SYS_MUTEX + default SYNC_CONDVAR config SYNC_SYS_SEMAPHORE bool "Use sys::Semaphore to synchronize forks" @@ -21,4 +21,10 @@ choice Use to have the dining philosophers sample use sys::Mutex, with one per fork, to synchronize. + config SYNC_CONDVAR + bool "Use sync::Condvar and sync::Mutex to synchronize forks" + help + Use to have the dining philosophers sample use a single data structure, protected + by a sync::Mutex and coordinated with a sync::Condvar, to synchronize. + endchoice diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index 604abc1a..94a92847 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -25,3 +25,8 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_SYS_MUTEX=y + sample.rust.philosopher.condvar: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_CONDVAR=y diff --git a/samples/philosophers/src/condsync.rs b/samples/philosophers/src/condsync.rs new file mode 100644 index 00000000..58cc45f3 --- /dev/null +++ b/samples/philosophers/src/condsync.rs @@ -0,0 +1,58 @@ +// Copyright (c) 2024 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! # sync::Mutex/sync::Condvar implementation of ForkSync +//! +//! This implementation of the Fork synchronizer uses a single data object, protected by a +//! `sync::Mutex`, and coordinated by a `sync::Condvar`. + +use crate::{ + ForkSync, + NUM_PHIL, +}; +use zephyr::kobj_define; +use zephyr::sync::Mutex; +use zephyr::sync::Condvar; +// use zephyr::time::Forever; + +#[derive(Debug)] +pub struct CondSync { + /// The lock that holds the flag for each philosopher. + lock: Mutex<[bool; NUM_PHIL]>, + /// Condition variable to wake other threads. + cond: Condvar, +} + +impl CondSync { + #[allow(dead_code)] + pub fn new() -> CondSync { + let sys_mutex = MUTEX.init_once(()).unwrap(); + let sys_condvar = CONDVAR.init_once(()).unwrap(); + + let lock = Mutex::new_from([false; NUM_PHIL], sys_mutex); + let cond = Condvar::new_from(sys_condvar); + CondSync { lock, cond } + } +} + +impl ForkSync for CondSync { + fn take(&self, index: usize) { + let mut lock = self.lock.lock().unwrap(); + while lock[index] { + lock = self.cond.wait(lock).unwrap(); + } + lock[index] = true; + } + + fn release(&self, index: usize) { + let mut lock = self.lock.lock().unwrap(); + lock[index] = false; + // No predictible waiter, so must wake everyone. + self.cond.notify_all(); + } +} + +kobj_define! { + static MUTEX: StaticMutex; + static CONDVAR: StaticCondvar; +} diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs index a87a3f54..73915328 100644 --- a/samples/philosophers/src/lib.rs +++ b/samples/philosophers/src/lib.rs @@ -23,10 +23,13 @@ use zephyr::{ // These are optional, based on Kconfig, so allow them to be unused. #[allow(unused_imports)] +use crate::condsync::CondSync; +#[allow(unused_imports)] use crate::sysmutex::SysMutexSync; #[allow(unused_imports)] use crate::semsync::semaphore_sync; +mod condsync; mod sysmutex; mod semsync; @@ -97,6 +100,19 @@ fn get_syncer() -> Vec> { result } +#[cfg(CONFIG_SYNC_CONDVAR)] +fn get_syncer() -> Vec> { + // Condvar version + let syncer = Box::new(CondSync::new()) + as Box; + let syncer: Arc = Arc::from(syncer); + let mut result = Vec::new(); + for _ in 0..NUM_PHIL { + result.push(syncer.clone()); + } + result +} + fn phil_thread(n: usize, syncer: Arc) { printkln!("Child {} started: {:?}", n, syncer); From 1c8d82aa23aa8164fa4614f8be5ce4af07e6b901 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 17:24:29 -0600 Subject: [PATCH 07/26] zephyr: sys: Create queue type based on k_queue A simple wrapper around Zephyr's k_queue. Signed-off-by: David Brown --- zephyr/src/sys.rs | 1 + zephyr/src/sys/queue.rs | 79 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 zephyr/src/sys/queue.rs diff --git a/zephyr/src/sys.rs b/zephyr/src/sys.rs index dfb16e4d..0783d3af 100644 --- a/zephyr/src/sys.rs +++ b/zephyr/src/sys.rs @@ -11,6 +11,7 @@ use zephyr_sys::k_timeout_t; +pub mod queue; pub mod sync; pub mod thread; diff --git a/zephyr/src/sys/queue.rs b/zephyr/src/sys/queue.rs new file mode 100644 index 00000000..cc634c84 --- /dev/null +++ b/zephyr/src/sys/queue.rs @@ -0,0 +1,79 @@ +//! Lightweight wrapper around Zephyr's `k_queue`. +//! +//! The underlying operations on the `k_queue` are all unsafe, as the model does not match the +//! borrowing model that Rust expects. This module is mainly intended to be used by the +//! implementation of `zephyr::sys::channel`, which can be used without needing unsafe. + +use core::ffi::c_void; + +use zephyr_sys::{ + k_queue, + k_queue_init, + k_queue_append, + k_queue_get, +}; + +use crate::sys::K_FOREVER; +use crate::object::{StaticKernelObject, Wrapped}; + +/// A wrapper around a Zephyr `k_queue` object. +#[derive(Clone, Debug)] +pub struct Queue { + item: *mut k_queue, +} + +unsafe impl Sync for StaticKernelObject { } + +unsafe impl Sync for Queue { } +unsafe impl Send for Queue { } + +impl Queue { + /// Append an element to the end of a queue. + /// + /// This adds an element to the given [`Queue`]. Zephyr requires the + /// first word of this message to be available for the OS to enqueue + /// the message. See [`Message`] for details on how this can be used + /// safely. + /// + /// [`Message`]: crate::sync::channel::Message + pub unsafe fn send(&self, data: *mut c_void) { + k_queue_append(self.item, data) + } + + /// Get an element from a queue. + /// + /// This routine removes the first data item from the [`Queue`]. + pub unsafe fn recv(&self) -> *mut c_void { + k_queue_get(self.item, K_FOREVER) + } +} + +impl Wrapped for StaticKernelObject { + type T = Queue; + + type I = (); + + fn get_wrapped(&self, _arg: Self::I) -> Queue { + let ptr = self.value.get(); + unsafe { + k_queue_init(ptr); + } + Queue { + item: ptr, + } + } +} + +/// A statically defined Zephyr `k_queue`. +/// +/// This should be declared as follows: +/// ``` +/// kobj_define! { +/// static MY_QUEUE: StaticQueue; +/// } +/// +/// let my_queue = MY_QUEUE.init_once(()); +/// +/// my_queue.send(...); +/// ``` +pub type StaticQueue = StaticKernelObject; From 9187024a6e9cb8c91142de692fe9eb8f76aa0182 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 17:08:46 -0600 Subject: [PATCH 08/26] zephyr: sync: channel: Create unbounded channels Create an implementation of bounded channels, in the spirit of crossbeam-channel. Currently, only the bounded channels are supported, an as we don't yet support recovery from panic, ther is no poisoning. As the underlying Zephyr queues don't support deallocation, drop is also a no-op. Signed-off-by: David Brown --- samples/philosophers/src/channel.rs | 176 +++++++++++++++++++++++++ zephyr/src/sync.rs | 5 + zephyr/src/sync/channel.rs | 191 ++++++++++++++++++++++++++++ zephyr/src/sync/channel/counter.rs | 151 ++++++++++++++++++++++ 4 files changed, 523 insertions(+) create mode 100644 samples/philosophers/src/channel.rs create mode 100644 zephyr/src/sync/channel.rs create mode 100644 zephyr/src/sync/channel/counter.rs diff --git a/samples/philosophers/src/channel.rs b/samples/philosophers/src/channel.rs new file mode 100644 index 00000000..baa7762b --- /dev/null +++ b/samples/philosophers/src/channel.rs @@ -0,0 +1,176 @@ +// Copyright (c) 2023 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! Synchronizer using channels +//! +//! Synchronize between the philosophers using channels to communicate with a thread that handles +//! the messages. + +extern crate alloc; + +use alloc::vec::Vec; +use alloc::boxed::Box; + +use zephyr::sync::channel::{self, Receiver, Sender}; +use zephyr::{ + kobj_define, + sync::Arc, +}; +use zephyr::object::KobjInit; + +use crate::{NUM_PHIL, ForkSync}; + +/// An implementation of ForkSync that uses a server commnicated with channels to perform the +/// synchronization. +#[derive(Debug)] +struct ChannelSync { + command: Sender, + reply_send: Sender<()>, + reply_recv: Receiver<()>, +} + +#[derive(Debug)] +enum Command { + Acquire(usize, Sender<()>), + Release(usize), +} + +/// This implements a single Fork on the server side for the ChannelSync. +enum ChannelFork { + /// The fork is free, + Free, + /// The work is in use, nobody is waiting. + InUse, + /// The fork is in use, and someone is waiting on it. + InUseWait(Sender<()>), +} + +impl Default for ChannelFork { + fn default() -> Self { + ChannelFork::Free + } +} + +impl ChannelFork { + /// Attempt to aquire the work. If it is free, reply to the sender, otherwise, track them to + /// reply to them when the fork is freed up. + fn acquire(&mut self, reply: Sender<()>) { + // For debugging, just stop here, and wait for a stack report. + let next = match *self { + ChannelFork::Free => { + // Reply immediately that this fork is free. + reply.send(()).unwrap(); + ChannelFork::InUse + } + ChannelFork::InUse => { + // The fork is being used, become the waiter. + ChannelFork::InUseWait(reply) + } + ChannelFork::InUseWait(_) => { + // There is already a wait. Something has gone wrong as this should never happen. + panic!("Mutliple waiters on fork"); + } + }; + *self = next; + } + + /// Release the fork. This is presumably sent from the same sender that requested it, although + /// this is not checked. + fn release(&mut self) { + let next = match self { + ChannelFork::Free => { + // An error case, the fork is not in use, it shouldn't be freed. + panic!("Release of fork that is not in use"); + } + ChannelFork::InUse => { + // The fork is in use, and nobody else is waiting. + ChannelFork::Free + } + ChannelFork::InUseWait(waiter) => { + // The fork is in use by us, and someone else is waiting. Tell the other waiter + // they now have the work. + waiter.send(()).unwrap(); + ChannelFork::InUse + } + }; + *self = next; + } +} + +impl ChannelSync { + pub fn new( + command: Sender, + reply: (Sender<()>, Receiver<()>)) -> ChannelSync + { + ChannelSync { + command, + reply_send: reply.0, + reply_recv: reply.1, + } + } +} + +/// Generate a syncer out of a ChannelSync. +#[allow(dead_code)] +pub fn get_channel_syncer() -> Vec> { + COMMAND_QUEUE.init(); + let command_queue = COMMAND_QUEUE.get(); + let (cq_send, cq_recv) = channel::unbounded_from(command_queue); + let reply_queues = REPLY_QUEUES.each_ref().map(|m| { + m.init(); + channel::unbounded_from(m.get()) + }); + let syncer = reply_queues.into_iter().map(|rqueue| { + let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue)) + as Box; + Arc::from(item) + }); + + let channel_thread = CHANNEL_THREAD.spawn(CHANNEL_STACK.token(), move || { + channel_thread(cq_recv); + }); + channel_thread.start(); + + syncer.collect() +} + +/// The thread that handles channel requests. +/// +/// Spawned when we are using the channel syncer. +fn channel_thread(cq_recv: Receiver) { + let mut forks = [(); NUM_PHIL].each_ref().map(|_| ChannelFork::default()); + + loop { + match cq_recv.recv().unwrap() { + Command::Acquire(fork, reply) => { + forks[fork].acquire(reply); + } + Command::Release(fork) => { + forks[fork].release(); + } + } + } +} + +impl ForkSync for ChannelSync { + fn take(&self, index: usize) { + self.command.send(Command::Acquire(index, self.reply_send.clone())).unwrap(); + // When the reply comes, we know we have the resource. + self.reply_recv.recv().unwrap(); + } + + fn release(&self, index: usize) { + self.command.send(Command::Release(index)).unwrap(); + // Release does not have a reply. + } +} + +kobj_define! { + static CHANNEL_STACK: ThreadStack<2054>; + static CHANNEL_THREAD: StaticThread; + + // For communicating using Queue, there is one to the main thread (the manager), and one back + // to each philosopher. + static COMMAND_QUEUE: StaticQueue; + static REPLY_QUEUES: [StaticQueue; NUM_PHIL]; +} diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index 996c0ff4..54b8f762 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -15,6 +15,11 @@ use core::{ use crate::time::{Forever, NoWait}; use crate::sys::sync as sys; +// Channels are currently only available with allocation. Bounded channels later might be +// available. +#[cfg(CONFIG_RUST_ALLOC)] +pub mod channel; + pub mod atomic { //! Re-export portable atomic. //! diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs new file mode 100644 index 00000000..c2fc5afe --- /dev/null +++ b/zephyr/src/sync/channel.rs @@ -0,0 +1,191 @@ +//! Close-to-Zephyr channels +//! +//! This module attempts to provide a mechanism as close as possible to `crossbeam-channel` as we +//! can get, directly using Zephyr primitives. +//! +//! The channels are built around `k_queue` in Zephyr. As is the case with most Zephyr types, +//! these are typically statically allocated. Similar to the other close-to-zephyr primitives, +//! this means that there is a constructor that can directly take one of these primitives. +//! +//! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr. +//! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an +//! internal `Arc` inside them, but without the overhead of an actual Arc. + +extern crate alloc; + +use alloc::boxed::Box; + +use core::ffi::c_void; +use core::fmt; +use core::marker::PhantomData; + +use crate::sys::queue::Queue; + +mod counter; + +// The zephyr queue does not allocate or manage the data of the messages, so we need to handle +// allocation as such as well. However, we don't need to manage anything, so it is sufficient to +// simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap +// it back into a Box, and give it to the recipient. + +/// Create a multi-producer multi-consumer channel of unbounded capacity. The messages are +/// allocated individually as "Box", and the queue is managed by the underlying Zephyr queue. +pub fn unbounded_from(queue: Queue) -> (Sender, Receiver) { + let (s, r) = counter::new(queue); + let s = Sender { + queue: s, + _phantom: PhantomData, + }; + let r = Receiver { + queue: r, + _phantom: PhantomData, + }; + (s, r) +} + +/// The underlying type for Messages through Zephyr's [`Queue`]. +/// +/// This wrapper is used internally to wrap user messages through the queue. It is not useful in +/// safe code, but may be useful for implementing other types of message queues. +#[repr(C)] +pub struct Message { + /// The private data used by the kernel to enqueue messages and such. + _private: usize, + /// The actual data being transported. + data: T, +} + +impl Message { + fn new(data: T) -> Message { + Message { + _private: 0, + data, + } + } +} + +/// The sending side of a channel. +pub struct Sender { + queue: counter::Sender, + _phantom: PhantomData, +} + +unsafe impl Send for Sender {} +unsafe impl Sync for Sender {} + +impl Sender { + /// Sends a message over the given channel. This will perform an alloc of the message, which + /// will have an accompanied free on the recipient side. + pub fn send(&self, msg: T) -> Result<(), SendError> { + let msg = Box::new(Message::new(msg)); + let msg = Box::into_raw(msg); + unsafe { + self.queue.send(msg as *mut c_void); + } + Ok(()) + } +} + +impl Drop for Sender { + fn drop(&mut self) { + unsafe { + self.queue.release(|_| { + crate::printkln!("Release"); + true + }) + } + } +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Sender { + queue: self.queue.acquire(), + _phantom: PhantomData, + } + } +} + +impl fmt::Debug for Sender { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Sender {:?}", *self.queue) + } +} + +/// The receiving side of a channel. +pub struct Receiver { + queue: counter::Receiver, + _phantom: PhantomData, +} + +unsafe impl Send for Receiver {} +unsafe impl Sync for Receiver {} + +impl Receiver { + /// Blocks the current thread until a message is received or the channel is empty and + /// disconnected. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed. If the channel is empty and becomes disconnected, this call will + /// wake up and return an error. + pub fn recv(&self) -> Result { + let msg = unsafe { + self.queue.recv() + }; + let msg = msg as *mut Message; + let msg = unsafe { Box::from_raw(msg) }; + Ok(msg.data) + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + unsafe { + self.queue.release(|_| { + crate::printkln!("Release"); + true + }) + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + Receiver { + queue: self.queue.acquire(), + _phantom: PhantomData, + } + } +} + +impl fmt::Debug for Receiver { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Sender {:?}", *self.queue) + } +} + +// TODO: Move to err + +/// An error returned from the [`send`] method. +/// +/// The message could not be sent because the channel is disconnected. +/// +/// The error contains the message so it can be recovered. +/// +/// [`send`]: Sender::send +#[derive(PartialEq, Eq, Clone, Copy)] +pub struct SendError(pub T); + +impl fmt::Debug for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "SendError(..)".fmt(f) + } +} + +/// An error returned from the [`recv`] method. +/// +/// A message could not be received because the channel is empty and disconnected. +/// +/// [`recv`]: Receiver::recv +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +pub struct RecvError; diff --git a/zephyr/src/sync/channel/counter.rs b/zephyr/src/sync/channel/counter.rs new file mode 100644 index 00000000..4b28ae84 --- /dev/null +++ b/zephyr/src/sync/channel/counter.rs @@ -0,0 +1,151 @@ +// SPDX-License-Identifier: Apache-2.0 +//! Reference counter for channels. + +// This file is taken from crossbeam-channels, with modifications to be nostd. + + +extern crate alloc; + +use alloc::boxed::Box; +use core::ops; +use core::ptr::NonNull; +use crate::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +/// Reference counter internals. +struct Counter { + /// The number of senders associated with the channel. + senders: AtomicUsize, + + /// The number of receivers associated with the channel. + receivers: AtomicUsize, + + /// Set to `true` if the last sender or the last receiver reference deallocates the channel. + destroy: AtomicBool, + + /// The internal channel. + chan: C, +} + +/// Wraps a channel into the reference counter. +pub(crate) fn new(chan: C) -> (Sender, Receiver) { + let counter = NonNull::from(Box::leak(Box::new(Counter { + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + destroy: AtomicBool::new(false), + chan, + }))); + let s = Sender { counter }; + let r = Receiver { counter }; + (s, r) +} + +/// The sending side. +pub(crate) struct Sender { + counter: NonNull>, +} + +impl Sender { + /// Returns the internal `Counter`. + fn counter(&self) -> &Counter { + unsafe { self.counter.as_ref() } + } + + /// Acquires another sender reference. + pub(crate) fn acquire(&self) -> Self { + let count = self.counter().senders.fetch_add(1, Ordering::Relaxed); + + // Cloning senders and calling `mem::forget` on the clones could potentially overflow the + // counter. It's very difficult to recover sensibly from such degenerate scenarios so we + // just abort when the count becomes very large. + if count > isize::MAX as usize { + // TODO: We need some kind of equivalent here. + unimplemented!(); + } + + Self { + counter: self.counter, + } + } + + /// Releases the sender reference. + /// + /// Function `disconnect` will be called if this is the last sender reference. + pub(crate) unsafe fn release bool>(&self, disconnect: F) { + if self.counter().senders.fetch_sub(1, Ordering::AcqRel) == 1 { + disconnect(&self.counter().chan); + + if self.counter().destroy.swap(true, Ordering::AcqRel) { + drop(unsafe { Box::from_raw(self.counter.as_ptr()) }); + } + } + } +} + +impl ops::Deref for Sender { + type Target = C; + + fn deref(&self) -> &C { + &self.counter().chan + } +} + +impl PartialEq for Sender { + fn eq(&self, other: &Self) -> bool { + self.counter == other.counter + } +} + +/// The receiving side. +pub(crate) struct Receiver { + counter: NonNull>, +} + +impl Receiver { + /// Returns the internal `Counter`. + fn counter(&self) -> &Counter { + unsafe { self.counter.as_ref() } + } + + /// Acquires another receiver reference. + pub(crate) fn acquire(&self) -> Self { + let count = self.counter().receivers.fetch_add(1, Ordering::Relaxed); + + // Cloning receivers and calling `mem::forget` on the clones could potentially overflow the + // counter. It's very difficult to recover sensibly from such degenerate scenarios so we + // just abort when the count becomes very large. + if count > isize::MAX as usize { + unimplemented!(); + } + + Self { + counter: self.counter, + } + } + + /// Releases the receiver reference. + /// + /// Function `disconnect` will be called if this is the last receiver reference. + pub(crate) unsafe fn release bool>(&self, disconnect: F) { + if self.counter().receivers.fetch_sub(1, Ordering::AcqRel) == 1 { + disconnect(&self.counter().chan); + + if self.counter().destroy.swap(true, Ordering::AcqRel) { + drop(unsafe { Box::from_raw(self.counter.as_ptr()) }); + } + } + } +} + +impl ops::Deref for Receiver { + type Target = C; + + fn deref(&self) -> &C { + &self.counter().chan + } +} + +impl PartialEq for Receiver { + fn eq(&self, other: &Self) -> bool { + self.counter == other.counter + } +} From 1e62073ce7e1a0625d59bfff8ad43a9beb66e21b Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 17:26:01 -0600 Subject: [PATCH 09/26] samples: philosophers: Add implementation for channels Add a synchronizer for forks based on sending messages over channels to a worker thread. Signed-off-by: David Brown --- samples/philosophers/Kconfig | 8 +++++++- samples/philosophers/sample.yaml | 5 +++++ samples/philosophers/src/channel.rs | 11 ++++------- samples/philosophers/src/lib.rs | 8 ++++++++ zephyr/src/object.rs | 13 +++++++++++++ 5 files changed, 37 insertions(+), 8 deletions(-) diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index a78142c0..97dfe318 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -7,7 +7,7 @@ source "Kconfig.zephyr" choice prompt "Select Synchronization implementation" - default SYNC_CONDVAR + default SYNC_CHANNEL config SYNC_SYS_SEMAPHORE bool "Use sys::Semaphore to synchronize forks" @@ -27,4 +27,10 @@ choice Use to have the dining philosophers sample use a single data structure, protected by a sync::Mutex and coordinated with a sync::Condvar, to synchronize. + config SYNC_CHANNEL + bool "Use sync::channel to synchronize forks" + help + Use to have the dining philosophers sample use a worker thread, communicating via + channels to synchronize. + endchoice diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index 94a92847..4094502b 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -30,3 +30,8 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_CONDVAR=y + sample.rust.philosopher.channel: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_CHANNEL=y diff --git a/samples/philosophers/src/channel.rs b/samples/philosophers/src/channel.rs index baa7762b..5688c349 100644 --- a/samples/philosophers/src/channel.rs +++ b/samples/philosophers/src/channel.rs @@ -16,7 +16,6 @@ use zephyr::{ kobj_define, sync::Arc, }; -use zephyr::object::KobjInit; use crate::{NUM_PHIL, ForkSync}; @@ -113,12 +112,10 @@ impl ChannelSync { /// Generate a syncer out of a ChannelSync. #[allow(dead_code)] pub fn get_channel_syncer() -> Vec> { - COMMAND_QUEUE.init(); - let command_queue = COMMAND_QUEUE.get(); + let command_queue = COMMAND_QUEUE.init_once(()).unwrap(); let (cq_send, cq_recv) = channel::unbounded_from(command_queue); let reply_queues = REPLY_QUEUES.each_ref().map(|m| { - m.init(); - channel::unbounded_from(m.get()) + channel::unbounded_from(m.init_once(()).unwrap()) }); let syncer = reply_queues.into_iter().map(|rqueue| { let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue)) @@ -126,10 +123,10 @@ pub fn get_channel_syncer() -> Vec> { Arc::from(item) }); - let channel_thread = CHANNEL_THREAD.spawn(CHANNEL_STACK.token(), move || { + let channel_child = CHANNEL_THREAD.init_once(CHANNEL_STACK.init_once(()).unwrap()).unwrap(); + channel_child.spawn(move || { channel_thread(cq_recv); }); - channel_thread.start(); syncer.collect() } diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs index 73915328..3f9f00a4 100644 --- a/samples/philosophers/src/lib.rs +++ b/samples/philosophers/src/lib.rs @@ -27,8 +27,11 @@ use crate::condsync::CondSync; #[allow(unused_imports)] use crate::sysmutex::SysMutexSync; #[allow(unused_imports)] +use crate::channel::get_channel_syncer; +#[allow(unused_imports)] use crate::semsync::semaphore_sync; +mod channel; mod condsync; mod sysmutex; mod semsync; @@ -113,6 +116,11 @@ fn get_syncer() -> Vec> { result } +#[cfg(CONFIG_SYNC_CHANNEL)] +fn get_syncer() -> Vec> { + get_channel_syncer() +} + fn phil_thread(n: usize, syncer: Arc) { printkln!("Child {} started: {:?}", n, syncer); diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs index d9d1c36b..1d325f2a 100644 --- a/zephyr/src/object.rs +++ b/zephyr/src/object.rs @@ -309,6 +309,19 @@ macro_rules! _kobj_rule { ($v:vis, $name:ident, [ThreadStack<{$size:expr}>; $asize:expr]) => { $crate::_kobj_stack!($v, $name, $size, $asize); }; + + // Queues. + ($v:vis, $name: ident, StaticQueue) => { + #[link_section = concat!("._k_queue.static.", stringify!($name), ".", file!(), line!())] + $v static $name: $crate::sys::queue::StaticQueue = + unsafe { ::core::mem::zeroed() }; + }; + + ($v:vis, $name: ident, [StaticQueue; $size:expr]) => { + #[link_section = concat!("._k_queue.static.", stringify!($name), ".", file!(), line!())] + $v static $name: [$crate::sys::queue::StaticQueue; $size] = + unsafe { ::core::mem::zeroed() }; + }; } #[doc(hidden)] From 099aaa7b879de34a7220d1537bc10c5819db61f7 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 15 Oct 2024 21:54:21 -0600 Subject: [PATCH 10/26] samples: philosophers: Add statistics Rather than just printing a bunch of information out as the various philosopher threads dine, use some data protected within a Mutex to collect statistics, and print those out periodically. Signed-off-by: David Brown --- samples/philosophers/sample.yaml | 5 +-- samples/philosophers/src/lib.rs | 55 +++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 15 deletions(-) diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index 4094502b..b59af7b1 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -8,10 +8,7 @@ common: regex: # Match the statistics, and make sure that each philosopher has at least 10 (two digits) # meals. - # - "^\\[\\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}\\]" - # - # Until the stastics have been implemented, just match on one of the children thinking - - "^Child 5 thinking \\(\\d+ ticks.*" + - "^\\[\\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}, \\d{2,}\\]" tags: rust filter: CONFIG_RUST_SUPPORTED tests: diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs index 3f9f00a4..ed6a093e 100644 --- a/samples/philosophers/src/lib.rs +++ b/samples/philosophers/src/lib.rs @@ -18,7 +18,7 @@ use zephyr::{ printkln, kobj_define, sys::uptime_get, - sync::Arc, + sync::{Arc, Mutex}, }; // These are optional, based on Kconfig, so allow them to be unused. @@ -68,14 +68,17 @@ extern "C" fn rust_main() { zephyr::kconfig::CONFIG_BOARD); printkln!("Time tick: {}", zephyr::time::SYS_FREQUENCY); + let stats = Arc::new(Mutex::new_from(Stats::default(), STAT_MUTEX.init_once(()).unwrap())); + let syncers = get_syncer(); printkln!("Pre fork"); for (i, syncer) in (0..NUM_PHIL).zip(syncers.into_iter()) { + let child_stat = stats.clone(); let thread = PHIL_THREADS[i].init_once(PHIL_STACKS[i].init_once(()).unwrap()).unwrap(); thread.spawn(move || { - phil_thread(i, syncer); + phil_thread(i, syncer, child_stat); }); } @@ -83,6 +86,7 @@ extern "C" fn rust_main() { loop { // Periodically, printout the stats. zephyr::time::sleep(delay); + stats.lock().unwrap().show(); } } @@ -121,7 +125,7 @@ fn get_syncer() -> Vec> { get_channel_syncer() } -fn phil_thread(n: usize, syncer: Arc) { +fn phil_thread(n: usize, syncer: Arc, stats: Arc>) { printkln!("Child {} started: {:?}", n, syncer); // Determine our two forks. @@ -134,26 +138,26 @@ fn phil_thread(n: usize, syncer: Arc) { loop { { - printkln!("Child {} hungry", n); - printkln!("Child {} take left fork", n); + // printkln!("Child {} hungry", n); + // printkln!("Child {} take left fork", n); syncer.take(forks.0); - printkln!("Child {} take right fork", n); + // printkln!("Child {} take right fork", n); syncer.take(forks.1); let delay = get_random_delay(n, 25); - printkln!("Child {} eating ({} ms)", n, delay); + // printkln!("Child {} eating ({} ms)", n, delay); sleep(delay); - // stats.lock().unwrap().record_eat(n, delay); + stats.lock().unwrap().record_eat(n, delay); // Release the forks. - printkln!("Child {} giving up forks", n); + // printkln!("Child {} giving up forks", n); syncer.release(forks.1); syncer.release(forks.0); let delay = get_random_delay(n, 25); - printkln!("Child {} thinking ({} ms)", n, delay); + // printkln!("Child {} thinking ({} ms)", n, delay); sleep(delay); - // stats.lock().unwrap().record_think(n, delay); + stats.lock().unwrap().record_think(n, delay); } } } @@ -167,7 +171,36 @@ fn get_random_delay(id: usize, period: usize) -> Duration { Duration::millis_at_least(((delay + 1) * period) as Tick) } +/// Instead of just printint out so much information that the data just scolls by, gather +/// statistics. +#[derive(Default)] +struct Stats { + /// How many times each philosopher has gone through the loop. + count: [u64; NUM_PHIL], + /// How much time each philosopher has spent eating. + eating: [u64; NUM_PHIL], + /// How much time each philosopher has spent thinking. + thinking: [u64; NUM_PHIL], +} + +impl Stats { + fn record_eat(&mut self, index: usize, time: Duration) { + self.eating[index] += time.to_millis(); + } + + fn record_think(&mut self, index: usize, time: Duration) { + self.thinking[index] += time.to_millis(); + self.count[index] += 1; + } + + fn show(&self) { + printkln!("{:?}, e:{:?}, t:{:?}", self.count, self.eating, self.thinking); + } +} + kobj_define! { static PHIL_THREADS: [StaticThread; NUM_PHIL]; static PHIL_STACKS: [ThreadStack; NUM_PHIL]; + + static STAT_MUTEX: StaticMutex; } From 2413a53d38c37d9bbe70b7f9cc1b8629c62af6b5 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 17 Oct 2024 07:49:21 -0600 Subject: [PATCH 11/26] zephyr: Remove 'unsafe' from sys::Mutex::unlock Although this function has constraints on how it can be used (the thread that calls unlock must also have called lock). However, according to the documentation, it detects this, and returns an error. As such, the wrapper in Rust does not need to be `unsafe` but can merely reflect that error code in the `Result` that it returns. Signed-off-by: David Brown --- samples/philosophers/src/sysmutex.rs | 4 +--- zephyr/src/sys/sync/mutex.rs | 2 +- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/samples/philosophers/src/sysmutex.rs b/samples/philosophers/src/sysmutex.rs index 9203c29f..15a7e829 100644 --- a/samples/philosophers/src/sysmutex.rs +++ b/samples/philosophers/src/sysmutex.rs @@ -40,9 +40,7 @@ impl ForkSync for SysMutexSync { } fn release(&self, index: usize) { - unsafe { - self.locks[index].unlock().unwrap(); - } + self.locks[index].unlock().unwrap(); } } diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs index 1760781c..9b63e588 100644 --- a/zephyr/src/sys/sync/mutex.rs +++ b/zephyr/src/sys/sync/mutex.rs @@ -75,7 +75,7 @@ impl Mutex { /// /// The mutex must already be locked by the calling thread. Mutexes may not be unlocked in /// ISRs. - pub unsafe fn unlock(&self) -> Result<()> { + pub fn unlock(&self) -> Result<()> { to_result_void(unsafe { k_mutex_unlock(self.item) }) } } From 2717d8bc277c0dc2954d86e536991e954da89ad7 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 17 Oct 2024 08:34:42 -0600 Subject: [PATCH 12/26] zephyr: sync: Remove unsafe block from unlock Now that `sys::Mutex::unlock` has lost its `unsafe`, we don't need an unsafe block for it. Signed-off-by: David Brown --- zephyr/src/sync.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index 54b8f762..cd205ae1 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -188,9 +188,7 @@ impl DerefMut for MutexGuard<'_, T> { impl Drop for MutexGuard<'_, T> { #[inline] fn drop(&mut self) { - unsafe { - self.lock.inner.unlock().unwrap(); - } + self.lock.inner.unlock().unwrap(); } } From 93448bfbb8e44b8425c6cfb7e3668cad466429c5 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 17 Oct 2024 08:36:19 -0600 Subject: [PATCH 13/26] zephyr: object: Make StaticKernelObject::new unsafe This function returns initialized memory, and is therefore inherently unsafe. Added some commentary about how it is used safely. Signed-off-by: David Brown --- zephyr/src/object.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs index 1d325f2a..eed9951c 100644 --- a/zephyr/src/object.rs +++ b/zephyr/src/object.rs @@ -161,7 +161,7 @@ where /// Construct an empty of these objects, with the zephyr data zero-filled. This is safe in the /// sense that Zephyr we track the initialization, they start in the uninitialized state, and /// the zero value of the initialize atomic indicates that it is uninitialized. - pub const fn new() -> StaticKernelObject { + pub const unsafe fn new() -> StaticKernelObject { StaticKernelObject { value: unsafe { mem::zeroed() }, init: AtomicUsize::new(KOBJ_UNINITIALIZED), @@ -240,7 +240,7 @@ macro_rules! _kobj_rule { ($v:vis, $name:ident, StaticMutex) => { #[link_section = concat!("._k_mutex.static.", stringify!($name), ".", file!(), line!())] $v static $name: $crate::sys::sync::StaticMutex = - $crate::sys::sync::StaticMutex::new(); + unsafe { $crate::sys::sync::StaticMutex::new() }; }; // static NAMES: [StaticMutex; COUNT]; @@ -257,7 +257,7 @@ macro_rules! _kobj_rule { ($v:vis, $name:ident, StaticCondvar) => { #[link_section = concat!("._k_condvar.static.", stringify!($name), ".", file!(), line!())] $v static $name: $crate::sys::sync::StaticCondvar = - $crate::sys::sync::StaticCondvar::new(); + unsafe { $crate::sys::sync::StaticCondvar::new() }; }; // static NAMES: [StaticCondvar; COUNT]; From 058740f16276999dc8471947028bc5b4923c801d Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 17 Oct 2024 08:46:12 -0600 Subject: [PATCH 14/26] zephyr: object: Reorder declarations Move the Wrapped trait above the StaticKernelObject so that the traits are immediately declared after the type the apply to. Signed-off-by: David Brown --- zephyr/src/object.rs | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs index eed9951c..c142895d 100644 --- a/zephyr/src/object.rs +++ b/zephyr/src/object.rs @@ -91,28 +91,6 @@ use crate::sync::atomic::{AtomicUsize, Ordering}; // the mutations happen from C code, so this is less important than the data being placed in the // proper section. Many will have the link section overridden by the `kobj_define` macro. -/// A kernel object represented statically in Rust code. -/// -/// These should not be declared directly by the user, as they generally need linker decorations to -/// be properly registered in Zephyr as kernel objects. The object has the underlying Zephyr type -/// T, and the wrapper type W. -/// -/// Kernel objects will have their `StaticThing` implemented as `StaticKernelObject` where -/// `kobj` is the type of the underlying Zephyr object. `Thing` will usually be a struct with a -/// single field, which is a `*mut kobj`. -/// -/// TODO: Can we avoid the public fields with a const new method? -/// -/// TODO: Handling const-defined alignment for these. -pub struct StaticKernelObject { - #[allow(dead_code)] - /// The underlying zephyr kernel object. - pub value: UnsafeCell, - /// Initialization status of this object. Most objects will start uninitialized and be - /// initialized manually. - pub init: AtomicUsize, -} - /// Define the Wrapping of a kernel object. /// /// This trait defines the association between a static kernel object and the two associated Rust @@ -154,6 +132,28 @@ pub const KOBJ_INITING: usize = 1; /// take has been called. And shouldn't be allowed additional times. pub const KOBJ_INITIALIZED: usize = 2; +/// A kernel object represented statically in Rust code. +/// +/// These should not be declared directly by the user, as they generally need linker decorations to +/// be properly registered in Zephyr as kernel objects. The object has the underlying Zephyr type +/// T, and the wrapper type W. +/// +/// Kernel objects will have their `StaticThing` implemented as `StaticKernelObject` where +/// `kobj` is the type of the underlying Zephyr object. `Thing` will usually be a struct with a +/// single field, which is a `*mut kobj`. +/// +/// TODO: Can we avoid the public fields with a const new method? +/// +/// TODO: Handling const-defined alignment for these. +pub struct StaticKernelObject { + #[allow(dead_code)] + /// The underlying zephyr kernel object. + pub value: UnsafeCell, + /// Initialization status of this object. Most objects will start uninitialized and be + /// initialized manually. + pub init: AtomicUsize, +} + impl StaticKernelObject where StaticKernelObject: Wrapped, From 43c0c8addefc32a55abea7e9df921f9008a2a15e Mon Sep 17 00:00:00 2001 From: David Brown Date: Mon, 21 Oct 2024 08:31:12 -0600 Subject: [PATCH 15/26] zephyr: sys: sync: Remove Clone from Mutex and Semaphore Although these, in their current state, are safe to Clone, having these semantics will make it difficult for us to later add these types that are allocated from a pool. Uses that currently expect to clone can generally wrap these in an Arc, to allow for the sharing. Signed-off-by: David Brown --- samples/philosophers/src/semsync.rs | 8 +++----- zephyr/src/sys/sync/mutex.rs | 1 - zephyr/src/sys/sync/semaphore.rs | 1 - 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/samples/philosophers/src/semsync.rs b/samples/philosophers/src/semsync.rs index 03bf8ce9..889ab412 100644 --- a/samples/philosophers/src/semsync.rs +++ b/samples/philosophers/src/semsync.rs @@ -11,9 +11,7 @@ use alloc::vec::Vec; use alloc::boxed::Box; use zephyr::{ - kobj_define, - sync::Arc, - time::Forever, + kobj_define, sync::Arc, sys::sync::Semaphore, time::Forever }; use crate::{ForkSync, NUM_PHIL}; @@ -22,7 +20,7 @@ use crate::{ForkSync, NUM_PHIL}; pub struct SemSync { /// The forks for this philosopher. This is a big excessive, as we really don't need all of /// them, but the ForSync code uses the index here. - forks: [zephyr::sys::sync::Semaphore; NUM_PHIL], + forks: [Arc; NUM_PHIL], } impl ForkSync for SemSync { @@ -39,7 +37,7 @@ impl ForkSync for SemSync { pub fn semaphore_sync() -> Vec> { let forks = SEMS.each_ref().map(|m| { // Each fork starts as taken. - m.init_once((1, 1)).unwrap() + Arc::new(m.init_once((1, 1)).unwrap()) }); let syncers = (0..NUM_PHIL).map(|_| { diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs index 9b63e588..41bfe461 100644 --- a/zephyr/src/sys/sync/mutex.rs +++ b/zephyr/src/sys/sync/mutex.rs @@ -52,7 +52,6 @@ use crate::sys::K_FOREVER; /// type. /// /// [`sync::Mutex`]: http://example.com/TODO -#[derive(Clone)] pub struct Mutex { /// The raw Zephyr mutex. item: *mut k_mutex, diff --git a/zephyr/src/sys/sync/semaphore.rs b/zephyr/src/sys/sync/semaphore.rs index 3c690092..edfafae4 100644 --- a/zephyr/src/sys/sync/semaphore.rs +++ b/zephyr/src/sys/sync/semaphore.rs @@ -31,7 +31,6 @@ use crate::{ pub use crate::raw::K_SEM_MAX_LIMIT; /// A zephyr `k_sem` usable from safe Rust code. -#[derive(Clone)] pub struct Semaphore { /// The raw Zephyr `k_sem`. item: *mut k_sem, From aaad7eef7864fbd1e5ca91b8933b7c259ff14ebb Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 09:14:40 -0600 Subject: [PATCH 16/26] zephry: object: Create Fixed concept The `Fixed` type encapsulates something that can either be a statically allocated object or a dynamically allocated one. It is conditional on `CONFIG_RUST_ALLOC`, and if that is not defined, will just end up represented as the underlying static pointer. Signed-off-by: David Brown --- zephyr/src/object.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/zephyr/src/object.rs b/zephyr/src/object.rs index c142895d..a6d136ff 100644 --- a/zephyr/src/object.rs +++ b/zephyr/src/object.rs @@ -79,8 +79,17 @@ //! [`kobj_define!`]: crate::kobj_define //! [`init_once`]: StaticKernelObject::init_once +#[cfg(CONFIG_RUST_ALLOC)] +extern crate alloc; + use core::{cell::UnsafeCell, mem}; +#[cfg(CONFIG_RUST_ALLOC)] +use core::pin::Pin; + +#[cfg(CONFIG_RUST_ALLOC)] +use alloc::boxed::Box; + use crate::sync::atomic::{AtomicUsize, Ordering}; // The kernel object itself must be wrapped in `UnsafeCell` in Rust. This does several thing, but @@ -189,6 +198,41 @@ where } } +/// Objects that can be fixed or allocated. +/// +/// When using Rust threads from userspace, the `kobj_define` declarations and the complexity behind +/// it is required. If all Rust use of kernel objects is from system threads, and dynamic memory is +/// available, kernel objects can be freeallocated, as long as the allocations themselves are +/// pinned. This `Fixed` encapsulates both of these. +pub enum Fixed { + /// Objects that have been statically declared and just pointed to. + Static(*mut T), + /// Objects that are owned by the wrapper, and contained here. + #[cfg(CONFIG_RUST_ALLOC)] + Owned(Pin>>), +} + +impl Fixed { + /// Get the raw pointer out of the fixed object. + /// + /// Returns the `*mut T` pointer held by this object. It is either just the static pointer, or + /// the pointer outside of the unsafe cell holding the dynamic kernel object. + pub fn get(&self) -> *mut T { + match self { + Fixed::Static(ptr) => *ptr, + #[cfg(CONFIG_RUST_ALLOC)] + Fixed::Owned(item) => item.get(), + } + } + + /// Construct a new fixed from an allocation. Note that the object will not be fixed in memory, + /// until _after_ this returns, and it should not be initialized until then. + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new(item: T) -> Fixed { + Fixed::Owned(Box::pin(UnsafeCell::new(item))) + } +} + /// Declare a static kernel object. This helps declaring static values of Zephyr objects. /// /// This can typically be used as: From fba339327b94c229ed78bf6497cd8cbe18d02c95 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 09:18:22 -0600 Subject: [PATCH 17/26] zephyr: sys: sync: sempahore: Add dynamic support Add support for dynamically allocated Semaphores. These can be freely allocated, but are not usable from userspace. Signed-off-by: David Brown --- zephyr/src/sys/sync/semaphore.rs | 38 ++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/zephyr/src/sys/sync/semaphore.rs b/zephyr/src/sys/sync/semaphore.rs index edfafae4..ddb54d04 100644 --- a/zephyr/src/sys/sync/semaphore.rs +++ b/zephyr/src/sys/sync/semaphore.rs @@ -13,17 +13,13 @@ use core::ffi::c_uint; use core::fmt; +use core::mem; use crate::{ - error::{Result, to_result_void}, - object::{StaticKernelObject, Wrapped}, + error::{to_result_void, Result}, + object::{Fixed, StaticKernelObject, Wrapped}, raw::{ - k_sem, - k_sem_init, - k_sem_take, - k_sem_give, - k_sem_reset, - k_sem_count_get, + k_sem, k_sem_count_get, k_sem_give, k_sem_init, k_sem_reset, k_sem_take }, time::Timeout, }; @@ -33,7 +29,7 @@ pub use crate::raw::K_SEM_MAX_LIMIT; /// A zephyr `k_sem` usable from safe Rust code. pub struct Semaphore { /// The raw Zephyr `k_sem`. - item: *mut k_sem, + item: Fixed, } /// By nature, Semaphores are both Sync and Send. Safety is handled by the underlying Zephyr @@ -42,6 +38,20 @@ unsafe impl Sync for Semaphore {} unsafe impl Send for Semaphore {} impl Semaphore { + /// Create a new semaphore. + /// + /// Create a new dynamically allocated Semaphore. This semaphore can only be used from system + /// threads. The arguments are as described in [the + /// docs](https://docs.zephyrproject.org/latest/kernel/services/synchronization/semaphores.html). + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new(initial_count: c_uint, limit: c_uint) -> Result { + let item: Fixed = Fixed::new(unsafe { mem::zeroed() }); + unsafe { + to_result_void(k_sem_init(item.get(), initial_count, limit))?; + } + Ok(Semaphore { item }) + } + /// Take a semaphore. /// /// Can be called from ISR if called with [`NoWait`]. @@ -52,7 +62,7 @@ impl Semaphore { { let timeout: Timeout = timeout.into(); let ret = unsafe { - k_sem_take(self.item, timeout.0) + k_sem_take(self.item.get(), timeout.0) }; to_result_void(ret) } @@ -63,7 +73,7 @@ impl Semaphore { /// permitted count. pub fn give(&self) { unsafe { - k_sem_give(self.item) + k_sem_give(self.item.get()) } } @@ -75,7 +85,7 @@ impl Semaphore { /// [`take`]: Self::take pub fn reset(&mut self) { unsafe { - k_sem_reset(self.item) + k_sem_reset(self.item.get()) } } @@ -84,7 +94,7 @@ impl Semaphore { /// Returns the current count. pub fn count_get(&mut self) -> usize { unsafe { - k_sem_count_get(self.item) as usize + k_sem_count_get(self.item.get()) as usize } } } @@ -114,7 +124,7 @@ impl Wrapped for StaticKernelObject { k_sem_init(ptr, arg.0, arg.1); } Semaphore { - item: ptr, + item: Fixed::Static(ptr), } } } From ad0924dd050cdcd2c2d24718e7ce70065e6bbb15 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 09:22:08 -0600 Subject: [PATCH 18/26] samples: philosophers: Add dynamic semaphore example Add a sample that implements the forks using dynamically allocate semaphores. Signed-off-by: David Brown --- samples/philosophers/Kconfig | 8 +++- samples/philosophers/sample.yaml | 5 +++ samples/philosophers/src/dynsemsync.rs | 51 ++++++++++++++++++++++++++ samples/philosophers/src/lib.rs | 8 ++++ 4 files changed, 71 insertions(+), 1 deletion(-) create mode 100644 samples/philosophers/src/dynsemsync.rs diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index 97dfe318..3b0ca539 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -12,9 +12,15 @@ choice config SYNC_SYS_SEMAPHORE bool "Use sys::Semaphore to synchronize forks" help - Use to have the dining philosophers sample use sys::Mutex, with one per fork, to + Use to have the dining philosophers sample use sys::Semaphore, with one per fork, to synchronize. + config SYNC_SYS_DYNAMIC_SEMAPHORE + bool "Use a dynamic sys::Semaphore to synchronize forks" + help + Use to have the dining philosophers sample use sys::Semaphore, with one per fork, to + synchronize. The Semaphores will be dynamically allocated. + config SYNC_SYS_MUTEX bool "Use sys::Semaphore to synchronize forks" help diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index b59af7b1..8e938bd2 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -17,6 +17,11 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_SYS_SEMAPHORE=y + sample.rust.philosopher.dynsemaphore: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_SYS_DYNAMIC_SEMAPHORE=y sample.rust.philosopher.sysmutex: tags: introduction min_ram: 32 diff --git a/samples/philosophers/src/dynsemsync.rs b/samples/philosophers/src/dynsemsync.rs new file mode 100644 index 00000000..8015ffe9 --- /dev/null +++ b/samples/philosophers/src/dynsemsync.rs @@ -0,0 +1,51 @@ +// Copyright (c) 2023 Linaro LTD +// SPDX-License-Identifier: Apache-2.0 + +//! Semaphore based sync. +//! +//! This is the simplest type of sync, which uses a single semaphore per fork. + +extern crate alloc; + +use alloc::vec::Vec; +use alloc::boxed::Box; + +use zephyr::{ + sync::Arc, sys::sync::Semaphore, time::Forever +}; + +use crate::{ForkSync, NUM_PHIL}; + +#[derive(Debug)] +pub struct SemSync { + /// The forks for this philosopher. This is a big excessive, as we really don't need all of + /// them, but the ForSync code uses the index here. + forks: [Arc; NUM_PHIL], +} + +impl ForkSync for SemSync { + fn take(&self, index: usize) { + self.forks[index].take(Forever).unwrap(); + } + + fn release(&self, index: usize) { + self.forks[index].give(); + } +} + +#[allow(dead_code)] +pub fn dyn_semaphore_sync() -> Vec> { + let forks = [(); NUM_PHIL].each_ref().map(|()| { + Arc::new(Semaphore::new(1, 1).unwrap()) + }); + + let syncers = (0..NUM_PHIL).map(|_| { + let syncer = SemSync { + forks: forks.clone(), + }; + let item = Box::new(syncer) as Box; + Arc::from(item) + }).collect(); + + syncers +} diff --git a/samples/philosophers/src/lib.rs b/samples/philosophers/src/lib.rs index ed6a093e..35be8f25 100644 --- a/samples/philosophers/src/lib.rs +++ b/samples/philosophers/src/lib.rs @@ -30,9 +30,12 @@ use crate::sysmutex::SysMutexSync; use crate::channel::get_channel_syncer; #[allow(unused_imports)] use crate::semsync::semaphore_sync; +#[allow(unused_imports)] +use crate::dynsemsync::dyn_semaphore_sync; mod channel; mod condsync; +mod dynsemsync; mod sysmutex; mod semsync; @@ -95,6 +98,11 @@ fn get_syncer() -> Vec> { semaphore_sync() } +#[cfg(CONFIG_SYNC_SYS_DYNAMIC_SEMAPHORE)] +fn get_syncer() -> Vec> { + dyn_semaphore_sync() +} + #[cfg(CONFIG_SYNC_SYS_MUTEX)] fn get_syncer() -> Vec> { let syncer = Box::new(SysMutexSync::new()) From 8cdf4a6937b51c52cd22b24a3c31b67832dee55f Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:18:37 -0600 Subject: [PATCH 19/26] zephyr: sys: sync: Add Mutex::new() Add support for dynamically allocated sys::Mutexes Signed-off-by: David Brown --- zephyr/src/sys/sync/mutex.rs | 49 +++++++++++++++++++++++++++--------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/zephyr/src/sys/sync/mutex.rs b/zephyr/src/sys/sync/mutex.rs index 41bfe461..66bf2a86 100644 --- a/zephyr/src/sys/sync/mutex.rs +++ b/zephyr/src/sys/sync/mutex.rs @@ -9,6 +9,7 @@ //! [`object`]: crate::object use core::fmt; +use core::mem; use crate::{ error::{Result, to_result_void}, raw::{ @@ -25,6 +26,7 @@ use crate::{ time::Timeout, }; use crate::object::{ + Fixed, StaticKernelObject, Wrapped, }; @@ -54,10 +56,22 @@ use crate::sys::K_FOREVER; /// [`sync::Mutex`]: http://example.com/TODO pub struct Mutex { /// The raw Zephyr mutex. - item: *mut k_mutex, + item: Fixed, } impl Mutex { + /// Create a new Mutex in an unlocked state. + /// + /// Create a new dynamically allocated Mutex. The Mutex can only be used from system threads. + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new() -> Result { + let item: Fixed = Fixed::new(unsafe { mem::zeroed() }); + unsafe { + to_result_void(k_mutex_init(item.get()))?; + } + Ok(Mutex { item }) + } + /// Lock a Zephyr Mutex. /// /// Will wait for the lock, returning status, with `Ok(())` indicating the lock has been @@ -67,7 +81,7 @@ impl Mutex { where T: Into, { let timeout: Timeout = timeout.into(); - to_result_void(unsafe { k_mutex_lock(self.item, timeout.0) }) + to_result_void(unsafe { k_mutex_lock(self.item.get(), timeout.0) }) } /// Unlock a Zephyr Mutex. @@ -75,7 +89,7 @@ impl Mutex { /// The mutex must already be locked by the calling thread. Mutexes may not be unlocked in /// ISRs. pub fn unlock(&self) -> Result<()> { - to_result_void(unsafe { k_mutex_unlock(self.item) }) + to_result_void(unsafe { k_mutex_unlock(self.item.get()) }) } } @@ -109,24 +123,23 @@ impl Wrapped for StaticKernelObject { k_mutex_init(ptr); } Mutex { - item: ptr, + item: Fixed::Static(ptr), } } } impl fmt::Debug for Mutex { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "sys::Mutex {:?}", self.item) + write!(f, "sys::Mutex {:?}", self.item.get()) } } /// A Condition Variable /// /// Lightweight wrappers for Zephyr's `k_condvar`. -#[derive(Clone)] pub struct Condvar { /// The underlying `k_condvar`. - item: *mut k_condvar, + item: Fixed, } #[doc(hidden)] @@ -138,6 +151,18 @@ unsafe impl Sync for Condvar {} unsafe impl Send for Condvar {} impl Condvar { + /// Create a new Condvar. + /// + /// Create a new dynamically allocated Condvar. The Condvar can only be used from system threads. + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new() -> Result { + let item: Fixed = Fixed::new(unsafe { mem::zeroed() }); + unsafe { + to_result_void(k_condvar_init(item.get()))?; + } + Ok(Condvar { item }) + } + /// Wait for someone else using this mutex/condvar pair to notify. /// /// Note that this requires the lock to be held by use, but as this is a low-level binding to @@ -147,25 +172,25 @@ impl Condvar { /// [`sync::Condvar`]: http://www.example.com/TODO // /// [`sync::Condvar`]: crate::sync::Condvar pub fn wait(&self, lock: &Mutex) { - unsafe { k_condvar_wait(self.item, lock.item, K_FOREVER); } + unsafe { k_condvar_wait(self.item.get(), lock.item.get(), K_FOREVER); } } // TODO: timeout. /// Wake a single thread waiting on this condition variable. pub fn notify_one(&self) { - unsafe { k_condvar_signal(self.item); } + unsafe { k_condvar_signal(self.item.get()); } } /// Wake all threads waiting on this condition variable. pub fn notify_all(&self) { - unsafe { k_condvar_broadcast(self.item); } + unsafe { k_condvar_broadcast(self.item.get()); } } } impl fmt::Debug for Condvar { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "sys::Condvar {:?}", self.item) + write!(f, "sys::Condvar {:?}", self.item.get()) } } @@ -181,7 +206,7 @@ impl Wrapped for StaticCondvar { k_condvar_init(ptr); } Condvar { - item: ptr, + item: Fixed::Static(ptr), } } } From 56f1d318698169672e3d72585c8d2550e5db0821 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:19:15 -0600 Subject: [PATCH 20/26] samples: philosophers: Change sys::Mutex to dynamic Switch to using dynamic Mutex types for this demo. Signed-off-by: David Brown --- samples/philosophers/src/sysmutex.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/samples/philosophers/src/sysmutex.rs b/samples/philosophers/src/sysmutex.rs index 15a7e829..969efb5f 100644 --- a/samples/philosophers/src/sysmutex.rs +++ b/samples/philosophers/src/sysmutex.rs @@ -10,7 +10,6 @@ use crate::{ ForkSync, NUM_PHIL, }; -use zephyr::kobj_define; use zephyr::sys::sync::Mutex; use zephyr::time::Forever; @@ -27,8 +26,8 @@ pub struct SysMutexSync { impl SysMutexSync { #[allow(dead_code)] pub fn new() -> SysMutexSync { - let locks = MUTEXES.each_ref().map(|m| { - m.init_once(()).unwrap() + let locks = [(); NUM_PHIL].each_ref().map(|()| { + Mutex::new().unwrap() }); SysMutexSync { locks } } @@ -43,7 +42,3 @@ impl ForkSync for SysMutexSync { self.locks[index].unlock().unwrap(); } } - -kobj_define! { - static MUTEXES: [StaticMutex; NUM_PHIL]; -} From 4850c4dd387365444b9dcb6d08a5b134a5778358 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:37:06 -0600 Subject: [PATCH 21/26] zephyr: sys: queue: Add Queue::new() Add support for dynamically allocate sys::Queues. Signed-off-by: David Brown --- zephyr/src/sys/queue.rs | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/zephyr/src/sys/queue.rs b/zephyr/src/sys/queue.rs index cc634c84..f111c613 100644 --- a/zephyr/src/sys/queue.rs +++ b/zephyr/src/sys/queue.rs @@ -5,6 +5,8 @@ //! implementation of `zephyr::sys::channel`, which can be used without needing unsafe. use core::ffi::c_void; +use core::fmt; +use core::mem; use zephyr_sys::{ k_queue, @@ -13,13 +15,13 @@ use zephyr_sys::{ k_queue_get, }; +use crate::error::Result; use crate::sys::K_FOREVER; -use crate::object::{StaticKernelObject, Wrapped}; +use crate::object::{Fixed, StaticKernelObject, Wrapped}; /// A wrapper around a Zephyr `k_queue` object. -#[derive(Clone, Debug)] pub struct Queue { - item: *mut k_queue, + item: Fixed, } unsafe impl Sync for StaticKernelObject { } @@ -28,6 +30,21 @@ unsafe impl Sync for Queue { } unsafe impl Send for Queue { } impl Queue { + /// Create a new Queue, dynamically allocated. + /// + /// This Queue can only be used from system threads. + /// + /// **Note**: When a Queue is dropped, any messages that have been added to the queue will be + /// leaked. + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new() -> Result { + let item: Fixed = Fixed::new(unsafe { mem::zeroed() }); + unsafe { + k_queue_init(item.get()); + } + Ok(Queue { item }) + } + /// Append an element to the end of a queue. /// /// This adds an element to the given [`Queue`]. Zephyr requires the @@ -37,14 +54,14 @@ impl Queue { /// /// [`Message`]: crate::sync::channel::Message pub unsafe fn send(&self, data: *mut c_void) { - k_queue_append(self.item, data) + k_queue_append(self.item.get(), data) } /// Get an element from a queue. /// /// This routine removes the first data item from the [`Queue`]. pub unsafe fn recv(&self) -> *mut c_void { - k_queue_get(self.item, K_FOREVER) + k_queue_get(self.item.get(), K_FOREVER) } } @@ -59,7 +76,7 @@ impl Wrapped for StaticKernelObject { k_queue_init(ptr); } Queue { - item: ptr, + item: Fixed::Static(ptr), } } } @@ -77,3 +94,9 @@ impl Wrapped for StaticKernelObject { /// my_queue.send(...); /// ``` pub type StaticQueue = StaticKernelObject; + +impl fmt::Debug for Queue { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "sys::Queue {:?}", self.item.get()) + } +} From f12e56831bdc9307e9bcaa8b3da84338b2d5ceef Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:37:49 -0600 Subject: [PATCH 22/26] zephyr: sync: channel: Fully dynamically allocated Implement `unbounded` that is able to dynamically allocate the underlying Zephyr queue. This is useful in cases where none of the queue access happens from userspace. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index c2fc5afe..c541b75d 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -28,8 +28,11 @@ mod counter; // simply Box the message, leak it out of the box, and give it to Zephyr, and then on receipt, wrap // it back into a Box, and give it to the recipient. -/// Create a multi-producer multi-consumer channel of unbounded capacity. The messages are -/// allocated individually as "Box", and the queue is managed by the underlying Zephyr queue. +/// Create a multi-producer multi-consumer channel of unbounded capacity, using an existing Queue +/// object. +/// +/// The messages are allocated individually as "Box", and the queue is managed by the underlying +/// Zephyr queue. pub fn unbounded_from(queue: Queue) -> (Sender, Receiver) { let (s, r) = counter::new(queue); let s = Sender { @@ -43,6 +46,18 @@ pub fn unbounded_from(queue: Queue) -> (Sender, Receiver) { (s, r) } +/// Create a multi-producer multi-consumer channel of unbounded capacity. +/// +/// The messages are allocated individually as "Box". The underlying Zephyr queue will be +/// dynamically allocated. +/// +/// **Note**: Currently Drop is not propertly supported on Zephyr. If all senders are dropped, any +/// receivers will likely be blocked forever. Any data that has been queued and not received will +/// be leaked when all receivers have been droped. +pub fn unbounded() -> (Sender, Receiver) { + unbounded_from(Queue::new().unwrap()) +} + /// The underlying type for Messages through Zephyr's [`Queue`]. /// /// This wrapper is used internally to wrap user messages through the queue. It is not useful in From 150ec35784bd121ba1bff75b4c5b2eadebd92737 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:39:34 -0600 Subject: [PATCH 23/26] samples: philosophers: Convert channel sync to dynamic Use the new dynamically allocated Channels to simplify the code. Signed-off-by: David Brown --- samples/philosophers/src/channel.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/samples/philosophers/src/channel.rs b/samples/philosophers/src/channel.rs index 5688c349..399bb2b3 100644 --- a/samples/philosophers/src/channel.rs +++ b/samples/philosophers/src/channel.rs @@ -112,10 +112,9 @@ impl ChannelSync { /// Generate a syncer out of a ChannelSync. #[allow(dead_code)] pub fn get_channel_syncer() -> Vec> { - let command_queue = COMMAND_QUEUE.init_once(()).unwrap(); - let (cq_send, cq_recv) = channel::unbounded_from(command_queue); - let reply_queues = REPLY_QUEUES.each_ref().map(|m| { - channel::unbounded_from(m.init_once(()).unwrap()) + let (cq_send, cq_recv) = channel::unbounded(); + let reply_queues = [(); NUM_PHIL].each_ref().map(|()| { + channel::unbounded() }); let syncer = reply_queues.into_iter().map(|rqueue| { let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue)) @@ -165,9 +164,4 @@ impl ForkSync for ChannelSync { kobj_define! { static CHANNEL_STACK: ThreadStack<2054>; static CHANNEL_THREAD: StaticThread; - - // For communicating using Queue, there is one to the main thread (the manager), and one back - // to each philosopher. - static COMMAND_QUEUE: StaticQueue; - static REPLY_QUEUES: [StaticQueue; NUM_PHIL]; } From 70e2710c56a6d98df0bd17c02535926da119777f Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:48:25 -0600 Subject: [PATCH 24/26] zephyr: sync: Add Mutex::new and Condvar::new Add dynamic allocation support to the sync Mutex and Condvar. When allocation is available, and userspace is not used, these can be used pretty much the same as those from std. Signed-off-by: David Brown --- zephyr/src/sync.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/zephyr/src/sync.rs b/zephyr/src/sync.rs index cd205ae1..8003d9e2 100644 --- a/zephyr/src/sync.rs +++ b/zephyr/src/sync.rs @@ -119,6 +119,12 @@ impl Mutex { pub const fn new_from(t: T, raw_mutex: sys::Mutex) -> Mutex { Mutex { inner: raw_mutex, data: UnsafeCell::new(t) } } + + /// Construct a new Mutex, dynamically allocating the underlying sys Mutex. + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new(t: T) -> Mutex { + Mutex::new_from(t, sys::Mutex::new().unwrap()) + } } impl Mutex { @@ -216,6 +222,12 @@ impl Condvar { Condvar { inner: raw_condvar } } + /// Construct a new Condvar, dynamically allocating the underlying Zephyr `k_condvar`. + #[cfg(CONFIG_RUST_ALLOC)] + pub fn new() -> Condvar { + Condvar::new_from(sys::Condvar::new().unwrap()) + } + /// Blocks the current thread until this conditional variable receives a notification. /// /// This function will automatically unlock the mutex specified (represented by `guard`) and From 7758fe555af140e95decce7273245e0c87c43fab Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 24 Oct 2024 10:49:16 -0600 Subject: [PATCH 25/26] samples: philosophers: Switch condsync to dynamic Use the simpler `new` methods instead of static allocaiton. Signed-off-by: David Brown --- samples/philosophers/src/condsync.rs | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/samples/philosophers/src/condsync.rs b/samples/philosophers/src/condsync.rs index 58cc45f3..f8266e90 100644 --- a/samples/philosophers/src/condsync.rs +++ b/samples/philosophers/src/condsync.rs @@ -10,7 +10,6 @@ use crate::{ ForkSync, NUM_PHIL, }; -use zephyr::kobj_define; use zephyr::sync::Mutex; use zephyr::sync::Condvar; // use zephyr::time::Forever; @@ -26,12 +25,10 @@ pub struct CondSync { impl CondSync { #[allow(dead_code)] pub fn new() -> CondSync { - let sys_mutex = MUTEX.init_once(()).unwrap(); - let sys_condvar = CONDVAR.init_once(()).unwrap(); - - let lock = Mutex::new_from([false; NUM_PHIL], sys_mutex); - let cond = Condvar::new_from(sys_condvar); - CondSync { lock, cond } + CondSync { + lock: Mutex::new([false; NUM_PHIL]), + cond: Condvar::new(), + } } } @@ -51,8 +48,3 @@ impl ForkSync for CondSync { self.cond.notify_all(); } } - -kobj_define! { - static MUTEX: StaticMutex; - static CONDVAR: StaticCondvar; -} From d78650fdbd7b336a009bb47a988e0dfeb2cc42de Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 25 Oct 2024 09:47:55 -0600 Subject: [PATCH 26/26] CI: Clean up the work the CI build does Add -M full to the twister build so that it cleans up space as it goes. This should allow the build to run to completion. Add -j4 to the build. Although the CI images currently have 4 cpus, this will help protect us against future ones that have more. Give twister an explicit list of platforms. This fixes two issues. One is that the output is getting flooded with messages about skipped tests. The other is this allows us to add platforms that qemu seems to be ignoring entirely. Signed-off-by: David Brown --- .github/workflows/build.yml | 12 ++++++++---- etc/platforms.txt | 8 ++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 etc/platforms.txt diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b7056f84..2efb4841 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -26,7 +26,7 @@ jobs: - name: Checkout uses: actions/checkout@v4 with: - path: zephyr-rust-lang + path: apptest - name: Set up Python uses: actions/setup-python@v5 @@ -36,7 +36,7 @@ jobs: - name: Setup Zephyr project uses: zephyrproject-rtos/action-zephyr-setup@v1 with: - app-path: zephyr-rust-lang + app-path: apptest manifest-file-name: ci-manifest.yml toolchains: arm-zephyr-eabi:riscv64-zephyr-elf @@ -51,9 +51,13 @@ jobs: rustup target add thumbv8m.main-none-eabi - name: Build firmware - working-directory: zephyr-rust-lang + working-directory: apptest shell: bash run: | cargo --version - west twister -T samples -T tests -v --inline-logs --integration + lscpu + df -h + + west twister -M all -T samples -T tests -v --inline-logs --integration -j 4 \ + $(cat etc/platforms.txt) diff --git a/etc/platforms.txt b/etc/platforms.txt new file mode 100644 index 00000000..32579f8e --- /dev/null +++ b/etc/platforms.txt @@ -0,0 +1,8 @@ +-p mps2/an385 +-p mps2/an521/cpu0 +-p qemu_cortex_m0 +-p qemu_cortex_m3 +-p qemu_riscv32 +-p qemu_riscv32/qemu_virt_riscv32/smp +-p qemu_riscv64 +-p qemu_riscv64/qemu_virt_riscv64/smp