Skip to content

Commit

Permalink
Refurbished extendr#635 (extendr#658)
Browse files Browse the repository at this point in the history
  • Loading branch information
CGMossa authored Nov 2, 2023
1 parent 244c044 commit 2cf85e1
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 61 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/msrv.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,17 @@ permissions:

jobs:
test-min-rust-version:
# PowerShell core is available on all platforms and can be used to unify scripts
defaults:
run:
shell: pwsh
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: baptiste0928/cargo-install@v2
with:
crate: cargo-msrv
- name: Verify minimum rust version
run: cargo-msrv --path extendr-api/ verify
run: |
. ./ci-cargo.ps1
ci-cargo msrv --path extendr-api/ verify -ActionName "Verify Rust MSRV"
3 changes: 1 addition & 2 deletions extendr-api/src/lang_macros.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Argument parsing and checking.
//!
use libR_sys::*;
//use crate::robj::*;
use crate::robj::GetSexp;
use crate::robj::Robj;
use crate::single_threaded;
use libR_sys::*;

/// Convert a list of tokens to an array of tuples.
#[doc(hidden)]
Expand Down
4 changes: 1 addition & 3 deletions extendr-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,9 +387,7 @@ pub use lang_macros::*;
pub use na::*;
pub use rmacros::*;
pub use robj::*;
pub use thread_safety::{
catch_r_error, handle_panic, single_threaded, this_thread_id, throw_r_error,
};
pub use thread_safety::{catch_r_error, handle_panic, single_threaded, throw_r_error};
pub use wrapper::*;

pub use extendr_macros::*;
Expand Down
6 changes: 3 additions & 3 deletions extendr-api/src/ownership.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
//! Maintain ownership of R objects.
//!
//! This provides the functions protect() and unprotect()
//! This provides the functions [protect] and [unprotect].
//! A single preserved vector holds ownership of all protected objects.
//!
//! Objects are reference counted, so multiple calls are possible,
//! unlike R_PreserveObject.
//! unlike `R_PreserveObject`.
//!
//! This module exports two functions, protect(sexp) and unprotect(sexp).
//! This module exports two functions, `protect(sexp)` and `unprotect(sexp)`.
use once_cell::sync::Lazy;
use std::collections::hash_map::{Entry, HashMap};
Expand Down
4 changes: 1 addition & 3 deletions extendr-api/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ pub use super::robj::{
RobjItertools, Slices, Types,
};

pub use super::thread_safety::{
catch_r_error, handle_panic, single_threaded, this_thread_id, throw_r_error,
};
pub use super::thread_safety::{catch_r_error, handle_panic, single_threaded, throw_r_error};

pub use super::wrapper::{
Complexes, Dataframe, Doubles, EnvIter, Environment, Expressions, ExternalPtr, FromList,
Expand Down
76 changes: 27 additions & 49 deletions extendr-api/src/thread_safety.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,47 @@
//! Provide limited protection for multithreaded access to the R API.
use crate::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::cell::Cell;
use std::sync::Mutex;

static OWNER_THREAD: AtomicU32 = AtomicU32::new(0);
static NEXT_THREAD_ID: AtomicU32 = AtomicU32::new(1);
/// A global lock, that should represent the global lock on the R-API.
/// It is not tied to an actual instance of R.
static R_API_LOCK: Mutex<()> = Mutex::new(());

thread_local! {
static THREAD_ID: u32 = NEXT_THREAD_ID.fetch_add(1, Ordering::SeqCst);
static THREAD_HAS_LOCK: Cell<bool> = Cell::new(false);
}

// Get an integer 1.. for each thread that calls this.
pub fn this_thread_id() -> u32 {
THREAD_ID.with(|&v| v)
}

/// Run a function single threaded.
/// Note: This will fail badly if the called function panics or calls RF_error.
/// Run `f` while ensuring that `f` runs in a single-threaded manner.
///
/// ```
/// use extendr_api::prelude::*;
/// use std::sync::atomic::{AtomicU32, Ordering};
/// static GLOBAL_THREAD_COUNT: AtomicU32 = AtomicU32::new(0);
/// This is intended for single-threaded access of the R's C-API.
/// It is possible to have nested calls of `single_threaded` without deadlocking.
///
/// let threads : Vec<_> = (0..100).map(|_| {
/// std::thread::spawn(move|| {
/// single_threaded(|| {
/// // check that we are single threaded.
/// let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::AcqRel);
/// assert_eq!(old_thread_count, 0);
/// std::thread::sleep(std::time::Duration::from_millis(1));
/// GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::AcqRel);
/// // recursive calls are ok.
/// assert_eq!(single_threaded(|| {
/// 1
/// }), 1);
/// })
/// })
/// }).collect();
/// ```
/// Note: This will fail badly if the called function `f` panics or calls `Rf_error`.
pub fn single_threaded<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
let id = this_thread_id();
let old_id = OWNER_THREAD.load(Ordering::Acquire);

if old_id != id {
// wait for OWNER_THREAD to become 0 and put us as the owner.
while OWNER_THREAD
.compare_exchange(0, id, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
let has_lock = THREAD_HAS_LOCK.with(|x| x.get());

// acquire R-API lock
let _guard = if !has_lock {
Some(R_API_LOCK.lock().unwrap())
} else {
None
};

// this thread now has the lock
THREAD_HAS_LOCK.with(|x| x.set(true));

let res = f();
let result = f();

if old_id != id {
// release the lock and signal waiting threads.
OWNER_THREAD.store(0, Ordering::Release);
// release the R-API lock
if _guard.is_some() {
THREAD_HAS_LOCK.with(|x| x.set(false));
}

res
result
}

/// This function is used by the wrapper logic to catch
Expand Down Expand Up @@ -94,7 +72,7 @@ pub fn throw_r_error<S: AsRef<str>>(s: S) -> ! {
};
}

/// Wrap an R function such as Rf_findFunction and convert errors and panics into results.
/// Wrap an R function such as `Rf_findFunction` and convert errors and panics into results.
/// ```ignore
/// use extendr_api::prelude::*;
/// test! {
Expand Down

0 comments on commit 2cf85e1

Please sign in to comment.