diff --git a/samplecode/localattestation/attestation/src/func.rs b/samplecode/localattestation/attestation/src/func.rs index 23dae4809..589d2f3ce 100644 --- a/samplecode/localattestation/attestation/src/func.rs +++ b/samplecode/localattestation/attestation/src/func.rs @@ -45,18 +45,23 @@ extern { dest_enclave_id:sgx_enclave_id_t) -> sgx_status_t; } +// For AtomicPtr, the methods load and store read and write the address. +// To load the contents is pretty much another operation, but it is not +// atomic and it is very unsafe. If there is access to the content +// that the pointer points to, it is best to use Acquire/Release to +// prevent reading uninitialized memory. static CALLBACK_FN: AtomicPtr<()> = AtomicPtr::new(0 as * mut ()); pub fn init(cb: Callback) { - let ptr = CALLBACK_FN.load(Ordering::Relaxed); + let ptr = CALLBACK_FN.load(Ordering::Acquire); if ptr.is_null() { let ptr: * mut Callback = Box::into_raw(Box::new(cb)); - CALLBACK_FN.store(ptr as * mut (), Ordering::Relaxed); + CALLBACK_FN.store(ptr as * mut (), Ordering::Release); } } fn get_callback() -> Option<&'static Callback>{ - let ptr = CALLBACK_FN.load(Ordering::Relaxed) as *mut Callback; + let ptr = CALLBACK_FN.load(Ordering::Acquire) as *mut Callback; if ptr.is_null() { return None; } diff --git a/sgx_no_tstd/src/lib.rs b/sgx_no_tstd/src/lib.rs index 8df6fa6c7..74749776e 100644 --- a/sgx_no_tstd/src/lib.rs +++ b/sgx_no_tstd/src/lib.rs @@ -45,6 +45,11 @@ fn begin_panic_handler(_info: &PanicInfo<'_>) -> ! { #[no_mangle] unsafe extern "C" fn rust_eh_personality() {} +/// Note about memory ordering: +/// +/// HOOK here is used as an allocation error handler, which needs to make sure the +/// memory contents are initialized before storing the memory address to the HOOK. +/// So using Release and Acquire is enough. static HOOK: AtomicPtr<()> = AtomicPtr::new(ptr::null_mut()); /// Registers a custom allocation error hook, replacing any that was previously registered.
@@ -59,7 +64,7 @@ static HOOK: AtomicPtr<()> = AtomicPtr::new(ptr::null_mut()); /// /// The allocation error hook is a global resource. pub fn set_alloc_error_hook(hook: fn(Layout)) { - HOOK.store(hook as *mut (), Ordering::SeqCst); + HOOK.store(hook as *mut (), Ordering::Release); } /// Unregisters the current allocation error hook, returning it. @@ -68,7 +73,7 @@ pub fn set_alloc_error_hook(hook: fn(Layout)) { /// /// If no custom hook is registered, the default hook will be returned. pub fn take_alloc_error_hook() -> fn(Layout) { - let hook = HOOK.swap(ptr::null_mut(), Ordering::SeqCst); + let hook = HOOK.swap(ptr::null_mut(), Ordering::AcqRel); if hook.is_null() { default_alloc_error_hook } else { @@ -80,7 +85,7 @@ fn default_alloc_error_hook(_layout: Layout) {} #[alloc_error_handler] pub fn rust_oom(layout: Layout) -> ! { - let hook = HOOK.load(Ordering::SeqCst); + let hook = HOOK.load(Ordering::Acquire); let hook: fn(Layout) = if hook.is_null() { default_alloc_error_hook } else { diff --git a/sgx_tstd/hashbrown/benches/bench.rs b/sgx_tstd/hashbrown/benches/bench.rs index afa48b59a..8260a4e6b 100644 --- a/sgx_tstd/hashbrown/benches/bench.rs +++ b/sgx_tstd/hashbrown/benches/bench.rs @@ -45,6 +45,10 @@ impl Iterator for RandomKeys { // Just an arbitrary side effect to make the maps not shortcircuit to the non-dropping path // when dropping maps/entries (most real world usages likely have drop in the key or value) +// +// Note about memory ordering: +// Here SIDE_EFFECT doesn't synchronize with other variables; it is just a counting +// variable, so using `Relaxed` is enough to ensure the correctness of the program. lazy_static::lazy_static!
{ static ref SIDE_EFFECT: AtomicUsize = AtomicUsize::new(0); } diff --git a/sgx_tstd/src/alloc.rs b/sgx_tstd/src/alloc.rs index 240b2baba..f5c7c01d3 100644 --- a/sgx_tstd/src/alloc.rs +++ b/sgx_tstd/src/alloc.rs @@ -78,6 +78,11 @@ pub use alloc_crate::alloc::*; pub use sgx_alloc::System; +/// Note about memory ordering: +/// +/// HOOK here is used as an allocation error handler, which needs to make sure the +/// memory contents are initialized before storing the memory address to the HOOK. +/// So using Release/Acquire is enough. static HOOK: AtomicPtr<()> = AtomicPtr::new(ptr::null_mut()); /// Registers a custom allocation error hook, replacing any that was previously registered. @@ -106,7 +111,7 @@ static HOOK: AtomicPtr<()> = AtomicPtr::new(ptr::null_mut()); /// set_alloc_error_hook(custom_alloc_error_hook); /// ``` pub fn set_alloc_error_hook(hook: fn(Layout)) { - HOOK.store(hook as *mut (), Ordering::SeqCst); + HOOK.store(hook as *mut (), Ordering::Release); } /// Unregisters the current allocation error hook, returning it. @@ -115,7 +120,7 @@ pub fn set_alloc_error_hook(hook: fn(Layout)) { /// /// If no custom hook is registered, the default hook will be returned. pub fn take_alloc_error_hook() -> fn(Layout) { - let hook = HOOK.swap(ptr::null_mut(), Ordering::SeqCst); + let hook = HOOK.swap(ptr::null_mut(), Ordering::AcqRel); if hook.is_null() { default_alloc_error_hook } else { unsafe { mem::transmute(hook) } } } @@ -137,7 +142,7 @@ fn default_alloc_error_hook(layout: Layout) { #[doc(hidden)] #[alloc_error_handler] pub fn rust_oom(layout: Layout) -> ! 
{ - let hook = HOOK.load(Ordering::SeqCst); + let hook = HOOK.load(Ordering::Acquire); let hook: fn(Layout) = if hook.is_null() { default_alloc_error_hook } else { unsafe { mem::transmute(hook) } }; hook(layout); diff --git a/sgx_tstd/src/panic.rs b/sgx_tstd/src/panic.rs index cca723ae0..3b561bcb4 100644 --- a/sgx_tstd/src/panic.rs +++ b/sgx_tstd/src/panic.rs @@ -248,6 +248,9 @@ impl BacktraceStyle { // that backtrace. // // Internally stores equivalent of an Option. +// +// Here SHOULD_CAPTURE is just used as a shared variable in a multithreaded environment, +// and doesn't synchronize with other variables. Using Relaxed here is enough. #[cfg(feature = "backtrace")] static SHOULD_CAPTURE: AtomicUsize = AtomicUsize::new(0); @@ -258,7 +261,7 @@ static SHOULD_CAPTURE: AtomicUsize = AtomicUsize::new(0); /// environment variable; see the details in [`get_backtrace_style`]. #[cfg(feature = "backtrace")] pub fn set_backtrace_style(style: BacktraceStyle) { - SHOULD_CAPTURE.store(style.as_usize(), Ordering::Release); + SHOULD_CAPTURE.store(style.as_usize(), Ordering::Relaxed); } /// Checks whether the standard library's panic hook will capture and print a @@ -284,5 +287,5 @@ pub fn set_backtrace_style(style: BacktraceStyle) { /// Returns `None` if backtraces aren't currently supported. #[cfg(feature = "backtrace")] pub fn get_backtrace_style() -> Option { - BacktraceStyle::from_usize(SHOULD_CAPTURE.load(Ordering::Acquire)) + BacktraceStyle::from_usize(SHOULD_CAPTURE.load(Ordering::Relaxed)) } diff --git a/sgx_tstd/src/panicking.rs b/sgx_tstd/src/panicking.rs index b6f55d89d..4290e3245 100644 --- a/sgx_tstd/src/panicking.rs +++ b/sgx_tstd/src/panicking.rs @@ -275,6 +275,10 @@ fn default_hook(info: &PanicInfo<'_>) { #[cfg(feature = "backtrace")] { + // Note about memory ordering: + // Here FIRST_PANIC is only used in this function.
The only read/write + // operation on it is the swap below, and it doesn't synchronize with other + // variables, so using Relaxed is enough to ensure safety here. static FIRST_PANIC: AtomicBool = AtomicBool::new(true); match backtrace { @@ -285,7 +289,7 @@ fn default_hook(info: &PanicInfo<'_>) { drop(backtrace::print(err, crate::sys::backtrace::PrintFmt::Full)) } Some(BacktraceStyle::Off) => { - if FIRST_PANIC.swap(false, Ordering::SeqCst) { + if FIRST_PANIC.swap(false, Ordering::Relaxed) { let _ = writeln!( err, "note: call backtrace::enable_backtrace with 'PrintFormat::Short/Full' for a backtrace." diff --git a/sgx_tstd/src/sync/mpsc/blocking.rs b/sgx_tstd/src/sync/mpsc/blocking.rs index 4ea795f97..6baa97064 100644 --- a/sgx_tstd/src/sync/mpsc/blocking.rs +++ b/sgx_tstd/src/sync/mpsc/blocking.rs @@ -24,6 +24,12 @@ use crate::time::Instant; #[cfg(not(feature = "untrusted_time"))] use crate::untrusted::time::InstantEx; +/// Note about memory ordering: +/// +/// Here woken needs to synchronize with thread, so using Acquire and +/// Release is enough. On CAS success it is safer to use AcqRel; a failed CAS +/// does not synchronize with other variables, so using Relaxed there still ensures the +/// correctness of the program. struct Inner { thread: Thread, woken: AtomicBool, @@ -57,7 +63,7 @@ impl SignalToken { let wake = self .inner .woken - .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed) .is_ok(); if wake { self.inner.thread.unpark(); @@ -82,14 +88,14 @@ impl SignalToken { impl WaitToken { pub fn wait(self) { - while !self.inner.woken.load(Ordering::SeqCst) { + while !self.inner.woken.load(Ordering::Acquire) { thread::park() } } /// Returns `true` if we wake up normally.
pub fn wait_max_until(self, end: Instant) -> bool { - while !self.inner.woken.load(Ordering::SeqCst) { + while !self.inner.woken.load(Ordering::Acquire) { let now = Instant::now(); if now >= end { return false; diff --git a/sgx_tstd/src/sync/mpsc/oneshot.rs b/sgx_tstd/src/sync/mpsc/oneshot.rs index 9050ddf36..91f207ccf 100644 --- a/sgx_tstd/src/sync/mpsc/oneshot.rs +++ b/sgx_tstd/src/sync/mpsc/oneshot.rs @@ -107,15 +107,18 @@ impl Packet { assert!((*self.data.get()).is_none()); ptr::write(self.data.get(), Some(t)); ptr::write(self.upgrade.get(), SendUsed); - - match self.state.swap(DATA, Ordering::SeqCst) { + // Here state needs to synchronize with "data" and "upgrade", + // using release is enough to ensure the safety. + match self.state.swap(DATA, Ordering::Release) { // Sent the data, no one was waiting EMPTY => Ok(()), // Couldn't send the data, the port hung up first. Return the data // back up the stack. DISCONNECTED => { - self.state.swap(DISCONNECTED, Ordering::SeqCst); + // Here state needs to synchronize with upgrade, + // using Acquire is enough to ensure the safety. + self.state.swap(DISCONNECTED, Ordering::Acquire); ptr::write(self.upgrade.get(), NothingSent); Err((&mut *self.data.get()).take().unwrap()) } @@ -142,12 +145,12 @@ impl Packet { pub fn recv(&self, deadline: Option) -> Result> { // Attempt to not block the thread (it's a little expensive). If it looks // like we're not empty, then immediately go through to `try_recv`. 
- if self.state.load(Ordering::SeqCst) == EMPTY { + if self.state.load(Ordering::Acquire) == EMPTY { let (wait_token, signal_token) = blocking::tokens(); let ptr = unsafe { signal_token.to_raw() }; // race with senders to enter the blocking state - if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() { + if self.state.compare_exchange(EMPTY, ptr, Ordering::Release, Ordering::Relaxed).is_ok() { if let Some(deadline) = deadline { let timed_out = !wait_token.wait_max_until(deadline); // Try to reset the state @@ -169,7 +172,7 @@ impl Packet { pub fn try_recv(&self) -> Result> { unsafe { - match self.state.load(Ordering::SeqCst) { + match self.state.load(Ordering::Acquire) { EMPTY => Err(Empty), // We saw some data on the channel, but the channel can be used @@ -179,11 +182,13 @@ impl Packet { // the state changes under our feet we'd rather just see that state // change. DATA => { + // Here state needs to synchronize with "data", + // using release is enough to ensure the safety. let _ = self.state.compare_exchange( DATA, EMPTY, - Ordering::SeqCst, - Ordering::SeqCst, + Ordering::Acquire, + Ordering::Relaxed, ); match (&mut *self.data.get()).take() { Some(data) => Ok(data), @@ -222,7 +227,9 @@ impl Packet { }; ptr::write(self.upgrade.get(), GoUp(up)); - match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + // Here state needs to synchronize upgrade before and after it + // using AcqRel is enough to ensure the safety. + match self.state.swap(DISCONNECTED, Ordering::AcqRel) { // If the channel is empty or has data on it, then we're good to go. // Senders will check the data before the upgrade (in case we // plastered over the DATA state). 
@@ -242,7 +249,7 @@ impl Packet { } pub fn drop_chan(&self) { - match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + match self.state.swap(DISCONNECTED, Ordering::AcqRel) { DATA | DISCONNECTED | EMPTY => {} // If someone's waiting, we gotta wake them up @@ -253,7 +260,7 @@ impl Packet { } pub fn drop_port(&self) { - match self.state.swap(DISCONNECTED, Ordering::SeqCst) { + match self.state.swap(DISCONNECTED, Ordering::AcqRel) { // An empty channel has nothing to do, and a remotely disconnected // channel also has nothing to do b/c we're about to run the drop // glue @@ -280,7 +287,7 @@ impl Packet { // // The return value indicates whether there's data on this port. pub fn abort_selection(&self) -> Result> { - let state = match self.state.load(Ordering::SeqCst) { + let state = match self.state.load(Ordering::Acquire) { // Each of these states means that no further activity will happen // with regard to abortion selection s @ (EMPTY | DATA | DISCONNECTED) => s, @@ -289,7 +296,7 @@ impl Packet { // of it (may fail) ptr => self .state - .compare_exchange(ptr, EMPTY, Ordering::SeqCst, Ordering::SeqCst) + .compare_exchange(ptr, EMPTY, Ordering::AcqRel, Ordering::Acquire) .unwrap_or_else(|x| x), }; diff --git a/sgx_tstd/src/sync/mpsc/shared.rs b/sgx_tstd/src/sync/mpsc/shared.rs index b346faabe..592c2be00 100644 --- a/sgx_tstd/src/sync/mpsc/shared.rs +++ b/sgx_tstd/src/sync/mpsc/shared.rs @@ -48,16 +48,31 @@ const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked re pub struct Packet { queue: mpsc::Queue, + cnt: AtomicIsize, // How many items are on this channel steals: UnsafeCell, // How many times has a port received without blocking? + + // For AtomicPtr, the methods load and store read and write the address. + // To load the contents is pretty much another operation, but it is not + // atomic and and it is very unsafe. If there is access to the content + // where the pointer is pointed, it is best to use Acquire/Release. 
to_wake: AtomicPtr, // SignalToken for wake up // The number of channels which are currently using this packet. + // + // Here channels field is used for counts across multiple threads, and doesn't + // synchronized with other variety. Using Relaxed is enough. channels: AtomicUsize, // See the discussion in Port::drop and the channel send methods for what // these are used for + // + // Here port_dropped is just a signal variety to see if this packet has been dropped + // and it doesn't synchronized with any other variety. Using Relaxed is enough. port_dropped: AtomicBool, + + // Here sender_drain field is used for counts across multiple threads, and also + // doesn't synchronized with other variety. Using Relaxed is enough. sender_drain: AtomicIsize, // this lock protects various portions of this implementation during @@ -110,8 +125,11 @@ impl Packet { if let Some(token) = token { assert_eq!(self.cnt.load(Ordering::SeqCst), 0); assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); - self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst); - self.cnt.store(-1, Ordering::SeqCst); + self.to_wake.store(unsafe { token.to_raw() }, Ordering::Release); + // Here cnt is synchronized with to_wake, using Release to + // ensure that when cnt changes to -1, operation about to_wake + // has been completed. + self.cnt.store(-1, Ordering::Release); // This store is a little sketchy. What's happening here is that // we're transferring a blocker from a oneshot or stream channel to @@ -146,7 +164,7 @@ impl Packet { pub fn send(&self, t: T) -> Result<(), T> { // See Port::drop for what's going on - if self.port_dropped.load(Ordering::SeqCst) { + if self.port_dropped.load(Ordering::Relaxed) { return Err(t); } @@ -175,12 +193,16 @@ impl Packet { // preflight check serves as the definitive "this will never be // received". 
Once we get beyond this check, we have permanently // entered the realm of "this may be received" - if self.cnt.load(Ordering::SeqCst) < DISCONNECTED + FUDGE { + // + // Relaxed can be used here, because there no variety to synchronize. + if self.cnt.load(Ordering::Relaxed) < DISCONNECTED + FUDGE { return Err(t); } self.queue.push(t); - match self.cnt.fetch_add(1, Ordering::SeqCst) { + // Here cnt needs to synchronize with queue before cnt and to_wake in the + // take_to_wake operation after cnt. Using AcqRel is enough. + match self.cnt.fetch_add(1, Ordering::AcqRel) { -1 => { self.take_to_wake().signal(); } @@ -197,9 +219,10 @@ impl Packet { n if n < DISCONNECTED + FUDGE => { // see the comment in 'try' for a shared channel for why this // window of "not disconnected" is ok. - self.cnt.store(DISCONNECTED, Ordering::SeqCst); - - if self.sender_drain.fetch_add(1, Ordering::SeqCst) == 0 { + // + // there is nothing to synchronized with. Using Relaxed is enough. + self.cnt.store(DISCONNECTED, Ordering::Relaxed); + if self.sender_drain.fetch_add(1, Ordering::Relaxed) == 0 { loop { // drain the queue, for info on the thread yield see the // discussion in try_recv @@ -212,7 +235,7 @@ impl Packet { } // maybe we're done, if we're not the last ones // here, then we need to go try again. - if self.sender_drain.fetch_sub(1, Ordering::SeqCst) == 1 { + if self.sender_drain.fetch_sub(1, Ordering::Relaxed) == 1 { break; } } @@ -270,13 +293,15 @@ impl Packet { "This is a known bug in the Rust standard library. 
See https://github.com/rust-lang/rust/issues/39364" ); let ptr = token.to_raw(); - self.to_wake.store(ptr, Ordering::SeqCst); + self.to_wake.store(ptr, Ordering::Release); let steals = ptr::replace(self.steals.get(), 0); - match self.cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + match self.cnt.fetch_sub(1 + steals, Ordering::Release) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + // Here cnt doesn't synchronize with other shared variables; using + // Relaxed is enough. + self.cnt.store(DISCONNECTED, Ordering::Relaxed); } // If we factor in our steals and notice that the channel has no // data, we successfully sleep @@ -288,7 +313,7 @@ impl Packet { } } - self.to_wake.store(EMPTY, Ordering::SeqCst); + self.to_wake.store(EMPTY, Ordering::Release); drop(SignalToken::from_raw(ptr)); Abort } @@ -333,9 +358,12 @@ impl Packet { // might decrement steals. Some(data) => unsafe { if *self.steals.get() > MAX_STEALS { - match self.cnt.swap(0, Ordering::SeqCst) { + // Here cnt needs to synchronize with steals; using Acquire is enough. + match self.cnt.swap(0, Ordering::Acquire) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + // Here cnt doesn't synchronize with other shared variables. NOTE(review): + // this store uses Release, not Relaxed - confirm which ordering is intended. + self.cnt.store(DISCONNECTED, Ordering::Release); } n => { let m = cmp::min(n, *self.steals.get()); @@ -352,7 +380,8 @@ impl Packet { // See the discussion in the stream implementation for why we try // again. None => { - match self.cnt.load(Ordering::SeqCst) { + // Here cnt needs to synchronize with queue; using Acquire is enough. + match self.cnt.load(Ordering::Acquire) { n if n != DISCONNECTED => Err(Empty), _ => { match self.queue.pop() { @@ -370,7 +399,8 @@ impl Packet { // Prepares this shared packet for a channel clone, essentially just bumping // a refcount. pub fn clone_chan(&self) { - let old_count = self.channels.fetch_add(1, Ordering::SeqCst); + // There is also no other variable to synchronize with here.
+ let old_count = self.channels.fetch_add(1, Ordering::Relaxed); // See comments on Arc::clone() on why we do this (for `mem::forget`). if old_count > MAX_REFCOUNT { @@ -382,13 +412,14 @@ impl Packet { // Chan is dropped and may end up waking up a receiver. It's the receiver's // responsibility on the other end to figure out that we've disconnected. pub fn drop_chan(&self) { - match self.channels.fetch_sub(1, Ordering::SeqCst) { + match self.channels.fetch_sub(1, Ordering::Relaxed) { 1 => {} n if n > 1 => return, n => panic!("bad number of channels left {n}"), } - match self.cnt.swap(DISCONNECTED, Ordering::SeqCst) { + // Here cnt needs to synchronize with to_wake. + match self.cnt.swap(DISCONNECTED, Ordering::Acquire) { -1 => { self.take_to_wake().signal(); } @@ -403,13 +434,16 @@ impl Packet { // and why it is done in this fashion. #[allow(clippy::while_let_loop)] pub fn drop_port(&self) { - self.port_dropped.store(true, Ordering::SeqCst); + self.port_dropped.store(true, Ordering::Relaxed); let mut steals = unsafe { *self.steals.get() }; + // Here cnt needs to sychronize with queue when CAS fails, Using Acquire is enough. + // In general, ordering success in CAS is at least the same or more strict than + // fail in CAS. So Using Acquire in success is OK. while match self.cnt.compare_exchange( steals, DISCONNECTED, - Ordering::SeqCst, - Ordering::SeqCst, + Ordering::Acquire, + Ordering::Acquire, ) { Ok(_) => false, Err(old) => old != DISCONNECTED, @@ -429,8 +463,8 @@ impl Packet { // Consumes ownership of the 'to_wake' field. 
fn take_to_wake(&self) -> SignalToken { - let ptr = self.to_wake.load(Ordering::SeqCst); - self.to_wake.store(EMPTY, Ordering::SeqCst); + let ptr = self.to_wake.load(Ordering::Acquire); + self.to_wake.store(EMPTY, Ordering::Release); assert!(ptr != EMPTY); unsafe { SignalToken::from_raw(ptr) } } @@ -441,9 +475,11 @@ impl Packet { // increment the count on the channel (used for selection) fn bump(&self, amt: isize) -> isize { - match self.cnt.fetch_add(amt, Ordering::SeqCst) { + // There doesn't exist operation needed to be synchronized, + // Using Relaxed is safe. + match self.cnt.fetch_add(amt, Ordering::Relaxed) { DISCONNECTED => { - self.cnt.store(DISCONNECTED, Ordering::SeqCst); + self.cnt.store(DISCONNECTED, Ordering::Relaxed); DISCONNECTED } n => n, @@ -471,7 +507,9 @@ impl Packet { // the channel count and figure out what we should do to make it // positive. let steals = { - let cnt = self.cnt.load(Ordering::SeqCst); + // Here doesn't exist variety or operation need to be + // synchronized, no need to use other ordering to restrict + let cnt = self.cnt.load(Ordering::Relaxed); if cnt < 0 && cnt != DISCONNECTED { -cnt } else { 0 } }; let prev = self.bump(steals + 1); diff --git a/sgx_tstd/src/sync/mpsc/spsc_queue.rs b/sgx_tstd/src/sync/mpsc/spsc_queue.rs index 2766c51ae..d925b0e8f 100644 --- a/sgx_tstd/src/sync/mpsc/spsc_queue.rs +++ b/sgx_tstd/src/sync/mpsc/spsc_queue.rs @@ -31,7 +31,12 @@ use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use super::cache_aligned::CacheAligned; -// Node within the linked list queue of messages to send +/// Node within the linked list queue of messages to send +/// +/// For AtomicPtr, the methods load and store read and write the address. +/// To load the contents is pretty much another operation, but it is not +/// atomic and and it is very unsafe. If there is access to the content +/// where the pointer is pointed, it is best to use Acquire/Release. 
struct Node { // FIXME: this could be an uninitialized T if we're careful enough, and // that would reduce memory usage (and be a bit faster). @@ -53,6 +58,13 @@ pub struct Queue { producer: CacheAligned>, } +/// Note about memory ordering: +/// +/// Here cached_nodes is just a counter between threads and doesn't synchronize +/// with other variables. Therefore, `Relaxed` can be used in both single-threaded +/// and multi-threaded environments. +/// +/// About AtomicPtr, see `struct Node`. struct Consumer { tail: UnsafeCell<*mut Node>, // where to pop from tail_prev: AtomicPtr>, // where to pop from @@ -143,6 +155,8 @@ impl Queue Queue Queue Queue Drop for Queue> = Box::from_raw(cur); cur = next; } diff --git a/sgx_tstd/src/sync/mpsc/stream.rs b/sgx_tstd/src/sync/mpsc/stream.rs index 1f5051e7a..e6854ec24 100644 --- a/sgx_tstd/src/sync/mpsc/stream.rs +++ b/sgx_tstd/src/sync/mpsc/stream.rs @@ -50,8 +50,15 @@ pub struct Packet { struct ProducerAddition { cnt: AtomicIsize, // How many items are on this channel + + // For AtomicPtr, the methods load and store read and write the address.To load the + // contents is pretty much another operation, but it is not atomic and and it is very + // unsafe. If there is access to the content where the pointer is pointed, it is best + // to use Acquire/Release. to_wake: AtomicPtr, // SignalToken for the blocked thread to wake up + // Here port_dropped is just a signal variety to see if this packet has been dropped + // and it doesn't synchronized with any other variety. Using Relaxed is enough. port_dropped: AtomicBool, // flag if the channel has been destroyed. } @@ -101,7 +108,7 @@ impl Packet { // If the other port has deterministically gone away, then definitely // must return the data back up the stack. Otherwise, the data is // considered as being sent. 
- if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + if self.queue.producer_addition().port_dropped.load(Ordering::Relaxed) { return Err(t); } @@ -117,7 +124,7 @@ impl Packet { pub fn upgrade(&self, up: Receiver) -> UpgradeResult { // If the port has gone away, then there's no need to proceed any // further. - if self.queue.producer_addition().port_dropped.load(Ordering::SeqCst) { + if self.queue.producer_addition().port_dropped.load(Ordering::Relaxed) { return UpDisconnected; } @@ -126,7 +133,9 @@ impl Packet { fn do_send(&self, t: Message) -> UpgradeResult { self.queue.push(t); - match self.queue.producer_addition().cnt.fetch_add(1, Ordering::SeqCst) { + // Here cnt needs to synchronize with queue before cnt and to_wake in the + // take_to_wake operation after cnt. Using AcqRel is enough. + match self.queue.producer_addition().cnt.fetch_add(1, Ordering::AcqRel) { // As described in the mod's doc comment, -1 == wakeup -1 => UpWoke(self.take_to_wake()), // As as described before, SPSC queues must be >= -2 @@ -140,7 +149,8 @@ impl Packet { // will never remove this data. We can only have at most one item to // drain (the port drains the rest). DISCONNECTED => { - self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + // There is no other varieties to synchronize. So use Relaxed here. + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::Relaxed); let first = self.queue.pop(); let second = self.queue.pop(); assert!(second.is_none()); @@ -162,8 +172,8 @@ impl Packet { // Consumes ownership of the 'to_wake' field. 
fn take_to_wake(&self) -> SignalToken { - let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst); - self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst); + let ptr = self.queue.producer_addition().to_wake.load(Ordering::Acquire); + self.queue.producer_addition().to_wake.store(EMPTY, Ordering::Release); assert!(ptr != EMPTY); unsafe { SignalToken::from_raw(ptr) } } @@ -174,13 +184,16 @@ impl Packet { fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); let ptr = unsafe { token.to_raw() }; - self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(ptr, Ordering::Release); let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }; - match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::SeqCst) { + // Here cnt is synchronized with to_wake and steals, using Release to + // the correctness of the code. + match self.queue.producer_addition().cnt.fetch_sub(1 + steals, Ordering::Release) { DISCONNECTED => { - self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + // There is no other varieties to synchronize. So use Relaxed here. + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::Relaxed); } // If we factor in our steals and notice that the channel has no // data, we successfully sleep @@ -192,7 +205,7 @@ impl Packet { } } - self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst); + self.queue.producer_addition().to_wake.store(EMPTY, Ordering::Release); Err(unsafe { SignalToken::from_raw(ptr) }) } @@ -245,12 +258,15 @@ impl Packet { // adding back in whatever we couldn't factor into steals. Some(data) => unsafe { if *self.queue.consumer_addition().steals.get() > MAX_STEALS { - match self.queue.producer_addition().cnt.swap(0, Ordering::SeqCst) { + // Here cnt needs to synchronize with steals. 
Using Acquire is enough. + match self.queue.producer_addition().cnt.swap(0, Ordering::Acquire) { DISCONNECTED => { + // Here cnt doesn't synchronize with to other shared variables, Using + // Relaxed is enough. self.queue .producer_addition() .cnt - .store(DISCONNECTED, Ordering::SeqCst); + .store(DISCONNECTED, Ordering::Relaxed); } n => { let m = cmp::min(n, *self.queue.consumer_addition().steals.get()); @@ -268,7 +284,8 @@ impl Packet { }, None => { - match self.queue.producer_addition().cnt.load(Ordering::SeqCst) { + // Here cnt needs to synchronize with queue, Using Acquire is enough. + match self.queue.producer_addition().cnt.load(Ordering::Acquire) { n if n != DISCONNECTED => Err(Empty), // This is a little bit of a tricky case. We failed to pop @@ -295,7 +312,10 @@ impl Packet { pub fn drop_chan(&self) { // Dropping a channel is pretty simple, we just flag it as disconnected // and then wakeup a blocker if there is one. - match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::SeqCst) { + // + // Here cnt needs to synchronize with to_wake in the take_to_wake operation + // after cnt. Using Acquire is enough. + match self.queue.producer_addition().cnt.swap(DISCONNECTED, Ordering::Acquire) { -1 => { self.take_to_wake().signal(); } @@ -337,11 +357,14 @@ impl Packet { // data, but eventually we're guaranteed to break out of this loop // (because there is a bounded number of senders). let mut steals = unsafe { *self.queue.consumer_addition().steals.get() }; + // Here cnt needs to sychronize with queue when CAS fails, Using Acquire is enough. + // In general, ordering success in CAS is at least the same or more strict than + // fail in CAS. So Using Acquire in success is OK. 
while match self.queue.producer_addition().cnt.compare_exchange( steals, DISCONNECTED, - Ordering::SeqCst, - Ordering::SeqCst, + Ordering::Acquire, + Ordering::Acquire, ) { Ok(_) => false, Err(old) => old != DISCONNECTED, @@ -364,9 +387,11 @@ impl Packet { // increment the count on the channel (used for selection) fn bump(&self, amt: isize) -> isize { - match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::SeqCst) { + // There doesn't exist operation needed to be synchronized, + // Using Relaxed is safe. + match self.queue.producer_addition().cnt.fetch_add(amt, Ordering::Relaxed) { DISCONNECTED => { - self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::SeqCst); + self.queue.producer_addition().cnt.store(DISCONNECTED, Ordering::Relaxed); DISCONNECTED } n => n, @@ -426,7 +451,7 @@ impl Packet { if prev < 0 { drop(self.take_to_wake()); } else { - while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY { + while self.queue.producer_addition().to_wake.load(Ordering::Acquire) != EMPTY { thread::yield_now(); } } diff --git a/sgx_tstd/src/sync/mpsc/sync.rs b/sgx_tstd/src/sync/mpsc/sync.rs index 8bf273fd0..664ab0c70 100644 --- a/sgx_tstd/src/sync/mpsc/sync.rs +++ b/sgx_tstd/src/sync/mpsc/sync.rs @@ -54,6 +54,12 @@ use sgx_trts::trts; const MAX_REFCOUNT: usize = (isize::MAX) as usize; +/// Note about memory ordering: +/// +/// Here channels is just a shared variables between threads and doesn't synchronize +/// with other variables. Fetch_add/fetch_sub guarantees that the operation is on the +/// "latest" value. Therefore, `Relaxed` can be used in both single-threaded and +/// multi-threaded environments. pub struct Packet { /// Only field outside of the mutex. Just done for kicks, but mainly because /// the other shared channel already had the code implemented @@ -373,7 +379,7 @@ impl Packet { // Prepares this shared packet for a channel clone, essentially just bumping // a refcount. 
pub fn clone_chan(&self) { - let old_count = self.channels.fetch_add(1, Ordering::SeqCst); + let old_count = self.channels.fetch_add(1, Ordering::Relaxed); // See comments on Arc::clone() on why we do this (for `mem::forget`). if old_count > MAX_REFCOUNT { @@ -383,7 +389,7 @@ impl Packet { pub fn drop_chan(&self) { // Only flag the channel as disconnected if we're the last channel - match self.channels.fetch_sub(1, Ordering::SeqCst) { + match self.channels.fetch_sub(1, Ordering::Relaxed) { 1 => {} _ => return, } diff --git a/sgx_tstd/src/thread/scoped.rs b/sgx_tstd/src/thread/scoped.rs index 9dd17754c..6dbb0d79c 100644 --- a/sgx_tstd/src/thread/scoped.rs +++ b/sgx_tstd/src/thread/scoped.rs @@ -50,6 +50,11 @@ pub struct Scope<'scope, 'env: 'scope> { /// See [`Scope::spawn`] for details. pub struct ScopedJoinHandle<'scope, T>(JoinInner<'scope, T>); +/// Note about memory ordering: +/// +/// Here num_running_threads is a count variety used to see how many thread is +/// running and a_thread_panicked is just a signal. They don't synchronize with +/// any data at all, so Relaxed is technically sufficient. pub(super) struct ScopeData { num_running_threads: AtomicUsize, a_thread_panicked: AtomicBool, @@ -70,7 +75,7 @@ impl ScopeData { if panic { self.a_thread_panicked.store(true, Ordering::Relaxed); } - if self.num_running_threads.fetch_sub(1, Ordering::Release) == 1 { + if self.num_running_threads.fetch_sub(1, Ordering::Relaxed) == 1 { self.main_thread.unpark(); } } @@ -160,7 +165,7 @@ where let result = catch_unwind(AssertUnwindSafe(|| f(&scope))); // Wait until all the threads are finished. - while scope.data.num_running_threads.load(Ordering::Acquire) != 0 { + while scope.data.num_running_threads.load(Ordering::Relaxed) != 0 { park(); }