Skip to content

Commit

Permalink
Add and_then and transpose methods
Browse files Browse the repository at this point in the history
The two transpose methods allow transposing Options and Results
without going to the worker thread.
`and_then` with the re-added BaggageHandler type allows users to
implement similar functionality, albeit with a round trip to the
worker thread.
  • Loading branch information
Skepfyr committed Feb 13, 2021
1 parent 2bd418d commit 831ae29
Showing 1 changed file with 160 additions and 1 deletion.
161 changes: 160 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crossbeam_channel::{bounded, unbounded, Sender};
use once_cell::sync::Lazy;
use std::{
fmt,
marker::PhantomData,
mem::{self, ManuallyDrop},
ptr,
sync::Mutex,
Expand Down Expand Up @@ -171,6 +172,48 @@ impl<T> DiplomaticBag<T> {
})
}

/// Maps a `DiplomaticBag<T>` to a `U`, by running a closure on the wrapped
/// value.
///
/// This closure must be [`Send`] as it will run on a worker thread. It
/// should also not panic, if it does all other active bags will leak, see
/// the type-level docs for more information.
///
/// This function is especially useful for mapping over wrapper types like
/// `Vec`, `Result`, `Option` (although see [`transpose()`][Self::transpose()]
/// for those last two). It allows this by giving a [`BaggageHandler`] to
/// the closure, which provides methods to wrap and unwrap `DiplomaticBag`s.
///
/// # Panics
/// This function will panic if there is an issue with the underlying worker
/// thread, which is usually caused by this or another bag panicking.
///
/// # Examples
/// ```
/// # use diplomatic_bag::DiplomaticBag;
/// let foo = DiplomaticBag::new(|| vec!["ho"; 3]);
/// let foo: Vec<_> = foo.and_then(|handler, foo| {
/// foo.map(|item| handler.wrap(item)).collect()
/// });
/// # assert_eq!("ho", foo[0].into_inner());
/// ```
pub fn and_then<U, F>(self, f: F) -> U
where
U: Send,
F: for<'a> FnOnce(BaggageHandler<'a>, T) -> U,
F: Send,
{
run(move || {
// Safety:
// `into_inner` can only be called on the worker thread, as it gives
// access to a value of type `T` and `T` isn't necessarily `Send`,
// however that's where this will run.
let value = unsafe { self.into_inner_unchecked() };
let handler = unsafe { BaggageHandler::new() };
f(handler, value)
})
}

/// Combine a `DiplomaticBag<T>` and a `DiplomaticBag<U>` into a
/// `DiplomaticBag<(T, U)>`.
///
Expand Down Expand Up @@ -283,6 +326,60 @@ impl<T: Send> DiplomaticBag<T> {
}
}

impl<T, E> DiplomaticBag<Result<T, E>> {
/// Convert a `DiplomaticBag<Result<T, E>>` into a
/// `Result<DiplomaticBag<T>, DiplomaticBag<E>>`.
///
/// # Examples
/// ```
/// fn foo() -> Result<DiplomaticBag<()>, String> {
/// let bag = DiplomaticBag::new(|| {
/// Ok(())
/// });
/// bag.transpose().map_err(|err| err.into_inner())
/// }
/// ```
pub fn transpose(self) -> Result<DiplomaticBag<T>, DiplomaticBag<E>> {
// Safety:
// Unclear, this isn't safe by the letter of the law as we read the
// bytes of inner to determine if it's `Ok` or `Err`. However, this feels
// safe because we never actually run any code that could depend on `!Send`
// bits of `T` or `E`.
let inner = unsafe { self.into_inner_unchecked() };
match inner {
Ok(val) => Ok(DiplomaticBag {
value: Untouchable::new(val),
}),
Err(err) => Err(DiplomaticBag {
value: Untouchable::new(err),
}),
}
}
}

impl<T> DiplomaticBag<Option<T>> {
/// Convert a `DiplomaticBag<Option<T>>` into a
/// `Option<DiplomaticBag<T>>`.
///
/// # Examples
/// ```
/// DiplomaticBag::new(|| Some(())).transpose().unwrap_or_default()
/// ```
pub fn transpose(self) -> Option<DiplomaticBag<T>> {
// Safety:
// Unclear, this isn't safe by the letter of the law as we read the
// bytes of inner to determine if it's `Some` or `None`. However, this
// feels safe for the same reason as above.
let inner = unsafe { self.into_inner_unchecked() };
match inner {
Some(val) => Some(DiplomaticBag {
value: Untouchable::new(val),
}),
None => None,
}
}
}

/// `Drop` the inner type when the `DiplomaticBag` is dropped.
///
/// Ideally, this would only be implemented when `T` is `Drop` but `Drop` must
Expand Down Expand Up @@ -353,6 +450,67 @@ impl<T: Ord> Ord for DiplomaticBag<T> {
}
}

/// A type that allows wrapping and unwrapping [`DiplomaticBag`]s inside the
/// execution context of another bag.
///
/// This allows computations on the wrapped values of multiple bags, and
/// provides a mechanism for returning `!Send` and `!Sync` types from
/// computations done on values inside bags.
///
/// # Examples
/// ```
/// # use diplomatic_bag::DiplomaticBag;
/// let one = DiplomaticBag::new(|_| 1);
/// let two = DiplomaticBag::new(|_| 2);
/// let three = one.and_then(|handler, one| {
/// let three = one + handler.unwrap(two.as_ref());
/// handler.wrap(three)
/// });
/// # assert_eq!(3, three.execute(|_, three| three));
#[derive(Debug, Clone, Copy)]
pub struct BaggageHandler<'a>(PhantomData<(&'a (), *mut ())>);

impl BaggageHandler<'_> {
/// Create a new `BaggageHandler`.
///
/// # Safety
/// This must only be called from the worker thread, as it allows safe
/// wrapping and unwrapping of `DiplomaticBag`s.
unsafe fn new() -> Self {
Self(PhantomData)
}

/// Wrap a value in a [`DiplomaticBag`], allowing it to be sent to other
/// threads even if it is not `Send`.
///
/// # Examples
/// ```
/// # use diplomatic_bag::DiplomaticBag;
/// let foo: DiplomaticBag<u8> = DiplomaticBag::new(|_| 2);
/// let bar: DiplomaticBag<u8> =
/// foo.execute_ref(|handler, value| handler.wrap(value.clone()));
/// ```
pub fn wrap<T>(&self, value: T) -> DiplomaticBag<T> {
DiplomaticBag {
value: Untouchable::new(value),
}
}
/// Unwrap a value in a [`DiplomaticBag`], allowing it to be used inside
/// the execution context of another bag.
///
/// # Examples
/// ```
/// # use diplomatic_bag::DiplomaticBag;
/// let one = DiplomaticBag::new(|_| 1);
/// let two = DiplomaticBag::new(|_| 2);
/// let three = one.execute(|handler, one| one + handler.unwrap(two));
/// # assert_eq!(3, three);
/// ```
pub fn unwrap<T>(&self, proxy: DiplomaticBag<T>) -> T {
unsafe { proxy.into_inner_unchecked() }
}
}

/// Run an arbitrary closure on the shared worker thread.
///
/// # Panics
Expand Down Expand Up @@ -530,7 +688,7 @@ unsafe impl<T> Sync for Untouchable<T> {}
#[cfg(test)]
mod tests {
use slotmap::{DefaultKey, SlotMap};
use static_assertions::assert_impl_all;
use static_assertions::{assert_impl_all, assert_not_impl_any};
use std::{
cell::{Cell, RefCell},
marker::PhantomData,
Expand Down Expand Up @@ -581,6 +739,7 @@ mod tests {
}

assert_impl_all!(DiplomaticBag<*mut ()>: Send, Sync);
assert_not_impl_any!(BaggageHandler<'_>: Send, Sync);
assert_impl_all!(Error: std::error::Error, Send, Sync);

#[test]
Expand Down

0 comments on commit 831ae29

Please sign in to comment.