diff --git a/crates/libs/core/src/lib.rs b/crates/libs/core/src/lib.rs index 9a5a2647..a7669a8c 100644 --- a/crates/libs/core/src/lib.rs +++ b/crates/libs/core/src/lib.rs @@ -31,7 +31,6 @@ pub mod error; mod iter; pub mod runtime; pub mod strings; -#[cfg(feature = "tokio_async")] pub mod sync; pub mod types; diff --git a/crates/libs/core/src/sync/bridge.rs b/crates/libs/core/src/sync/bridge.rs deleted file mode 100644 index 0223554c..00000000 --- a/crates/libs/core/src/sync/bridge.rs +++ /dev/null @@ -1,6 +0,0 @@ -// ------------------------------------------------------------ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License (MIT). See License.txt in the repo root for license information. -// ------------------------------------------------------------ - -// TODO: move BridgeContext3 here in this file. diff --git a/crates/libs/core/src/sync/bridge_context.rs b/crates/libs/core/src/sync/bridge_context.rs index 2ae011f0..8e597b3e 100644 --- a/crates/libs/core/src/sync/bridge_context.rs +++ b/crates/libs/core/src/sync/bridge_context.rs @@ -3,4 +3,168 @@ // Licensed under the MIT License (MIT). See License.txt in the repo root for license information. // ------------------------------------------------------------ -// TODO: move bridge context in this file. +use std::{cell::Cell, future::Future}; + +use crate::{ + error::FabricErrorCode, + runtime::executor::{Executor, JoinHandle}, +}; +use mssf_com::FabricCommon::{ + IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl, +}; +use windows_core::{implement, AsImpl}; + +use super::CancellationToken; + +/// Async operation context for bridging rust code into SF COM api that supports cancellation. +#[implement(IFabricAsyncOperationContext)] +pub struct BridgeContext3 +where + T: 'static, +{ + /// The task result. Initially it is None. + /// If the task panics, the error is propagated here. + content: Cell>>, + /// Indicates the async operation has completed or not. + /// This is a memory barrier for making the content available + /// from writer thread to the reader thread. It is needed because + /// in SF COM API, the caller can call Begin operation, poll on this + /// status until complete, and End operation without barriers. + is_completed: std::sync::atomic::AtomicBool, + /// mssf never completes async operations synchronously. + /// This is always false. + is_completed_synchronously: bool, + callback: IFabricAsyncOperationCallback, + token: CancellationToken, +} + +impl BridgeContext3 +where + T: Send, +{ + fn new(callback: IFabricAsyncOperationCallback, token: CancellationToken) -> Self { + Self { + content: Cell::new(None), + is_completed: std::sync::atomic::AtomicBool::new(false), + is_completed_synchronously: false, + callback, + token, + } + } + + /// Creates the context from callback, and returns a cancellation token that + /// can be used in rust code, and the cancellation token is hooked into self, + /// where Cancel() api cancels the operation. + pub fn make(callback: Option<&IFabricAsyncOperationCallback>) -> (Self, CancellationToken) { + let token = CancellationToken::new(); + let ctx = Self::new(callback.unwrap().clone(), token.clone()); + (ctx, token) + } + + /// Spawns the future on rt. + /// Returns a context that can be returned to SF runtime. + /// This is intended to be used in SF Begin COM api, where + /// rust code is spawned in background and the context is returned + /// to caller. + /// If the future panics, an error is set in the resulting content, + /// caller will still get callback and receive an error in the End api. + /// This api is in some sense unsafe, because the developer needs to ensure + /// the following: + /// * return type of the future needs to match SF COM api end return type. + pub fn spawn( + self, + rt: &impl Executor, + future: F, + ) -> crate::Result + where + F: Future + Send + 'static, + { + let self_cp: IFabricAsyncOperationContext = self.into(); + let self_cp2 = self_cp.clone(); + let rt_cp = rt.clone(); + rt.spawn(async move { + // Run user code in a task and wait on its status. + // If user code panics we propagate the error back to SF. + let task_res = rt_cp.spawn(future).join().await; + // TODO: maybe it is good to report health to SF here the same way that sf dotnet app works. + + // We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.). + let self_impl: &BridgeContext3 = unsafe { self_cp.as_impl() }; + self_impl.set_content(task_res); + let cb = unsafe { self_cp.Callback().unwrap() }; + unsafe { cb.Invoke(&self_cp) }; + }); + Ok(self_cp2) + } + + /// Get the result from the context from the SF End COM api. + /// This api is in some sense unsafe, because the developer needs to ensure + /// the following: + /// * context impl type is `BridgeContext3`, and the T matches the SF end api + /// return type. + /// Note that if T is of Result type, the current function return type is + /// Result>, so unwrap is needed. + pub fn result(context: Option<&IFabricAsyncOperationContext>) -> crate::Result { + let self_impl: &BridgeContext3 = unsafe { context.unwrap().as_impl() }; + self_impl.consume_content() + } + + /// Set the content for the ctx. + /// Marks the ctx as completed. + fn set_content(&self, content: crate::Result) { + let prev = self.content.replace(Some(content)); + assert!(prev.is_none()); + self.set_complete(); + } + + /// Consumes the content set by set_content(). + /// can only be called once after set content. + fn consume_content(&self) -> crate::Result { + match self.check_complete() { + true => self.content.take().expect("content is consumed twice."), + false => { + if self.token.is_cancelled() { + Err(FabricErrorCode::E_ABORT.into()) + } else { + Err(FabricErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into()) + } + } + } + } + + /// Set the ctx as completed. Requires the ctx content to be set. Makes + /// the content available for access from other threads using barrier. + fn set_complete(&self) { + self.is_completed + .store(true, std::sync::atomic::Ordering::Release); + } + + /// Checks ctx is completed. + /// Makes sure content sets by other threads is visible from this thread. + fn check_complete(&self) -> bool { + self.is_completed.load(std::sync::atomic::Ordering::Acquire) + } +} + +impl IFabricAsyncOperationContext_Impl for BridgeContext3_Impl { + fn IsCompleted(&self) -> crate::BOOLEAN { + self.is_completed + .load(std::sync::atomic::Ordering::Relaxed) + .into() + } + + // This always returns false because we defer all tasks in the background executuor. + fn CompletedSynchronously(&self) -> crate::BOOLEAN { + self.is_completed_synchronously.into() + } + + fn Callback(&self) -> crate::Result { + let cp = self.callback.clone(); + Ok(cp) + } + + fn Cancel(&self) -> crate::Result<()> { + self.token.cancel(); + Ok(()) + } +} diff --git a/crates/libs/core/src/sync/cancel.rs b/crates/libs/core/src/sync/cancel.rs index 098a7263..0685da43 100644 --- a/crates/libs/core/src/sync/cancel.rs +++ b/crates/libs/core/src/sync/cancel.rs @@ -4,175 +4,15 @@ // ------------------------------------------------------------ use std::{ - cell::Cell, future::Future, pin::Pin, task::{Context, Poll}, }; -use mssf_com::FabricCommon::{ - IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl, -}; +use mssf_com::FabricCommon::{IFabricAsyncOperationCallback, IFabricAsyncOperationContext}; pub use tokio_util::sync::CancellationToken; -use windows_core::{implement, AsImpl}; - -use crate::{ - error::FabricErrorCode, - runtime::executor::{Executor, JoinHandle}, -}; - -/// Async operation context for bridging rust code into SF COM api that supports cancellation. -#[implement(IFabricAsyncOperationContext)] -pub struct BridgeContext3 -where - T: 'static, -{ - /// The task result. Initially it is None. - /// If the task panics, the error is propagated here. - content: Cell>>, - /// Indicates the async operation has completed or not. - /// This is a memory barrier for making the content available - /// from writer thread to the reader thread. It is needed because - /// in SF COM API, the caller can call Begin operation, poll on this - /// status until complete, and End operation without barriers. - is_completed: std::sync::atomic::AtomicBool, - /// mssf never completes async operations synchronously. - /// This is always false. - is_completed_synchronously: bool, - callback: IFabricAsyncOperationCallback, - token: CancellationToken, -} - -impl BridgeContext3 -where - T: Send, -{ - fn new(callback: IFabricAsyncOperationCallback, token: CancellationToken) -> Self { - Self { - content: Cell::new(None), - is_completed: std::sync::atomic::AtomicBool::new(false), - is_completed_synchronously: false, - callback, - token, - } - } - - /// Creates the context from callback, and returns a cancellation token that - /// can be used in rust code, and the cancellation token is hooked into self, - /// where Cancel() api cancels the operation. - pub fn make(callback: Option<&IFabricAsyncOperationCallback>) -> (Self, CancellationToken) { - let token = CancellationToken::new(); - let ctx = Self::new(callback.unwrap().clone(), token.clone()); - (ctx, token) - } - - /// Spawns the future on rt. - /// Returns a context that can be returned to SF runtime. - /// This is intended to be used in SF Begin COM api, where - /// rust code is spawned in background and the context is returned - /// to caller. - /// If the future panics, an error is set in the resulting content, - /// caller will still get callback and receive an error in the End api. - /// This api is in some sense unsafe, because the developer needs to ensure - /// the following: - /// * return type of the future needs to match SF COM api end return type. - pub fn spawn( - self, - rt: &impl Executor, - future: F, - ) -> crate::Result - where - F: Future + Send + 'static, - { - let self_cp: IFabricAsyncOperationContext = self.into(); - let self_cp2 = self_cp.clone(); - let rt_cp = rt.clone(); - rt.spawn(async move { - // Run user code in a task and wait on its status. - // If user code panics we propagate the error back to SF. - let task_res = rt_cp.spawn(future).join().await; - // TODO: maybe it is good to report health to SF here the same way that sf dotnet app works. - - // We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.). - let self_impl: &BridgeContext3 = unsafe { self_cp.as_impl() }; - self_impl.set_content(task_res); - let cb = unsafe { self_cp.Callback().unwrap() }; - unsafe { cb.Invoke(&self_cp) }; - }); - Ok(self_cp2) - } - - /// Get the result from the context from the SF End COM api. - /// This api is in some sense unsafe, because the developer needs to ensure - /// the following: - /// * context impl type is `BridgeContext3`, and the T matches the SF end api - /// return type. - /// Note that if T is of Result type, the current function return type is - /// Result>, so unwrap is needed. - pub fn result(context: Option<&IFabricAsyncOperationContext>) -> crate::Result { - let self_impl: &BridgeContext3 = unsafe { context.unwrap().as_impl() }; - self_impl.consume_content() - } - - /// Set the content for the ctx. - /// Marks the ctx as completed. - fn set_content(&self, content: crate::Result) { - let prev = self.content.replace(Some(content)); - assert!(prev.is_none()); - self.set_complete(); - } - - /// Consumes the content set by set_content(). - /// can only be called once after set content. - fn consume_content(&self) -> crate::Result { - match self.check_complete() { - true => self.content.take().expect("content is consumed twice."), - false => { - if self.token.is_cancelled() { - Err(FabricErrorCode::E_ABORT.into()) - } else { - Err(FabricErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into()) - } - } - } - } - - /// Set the ctx as completed. Requires the ctx content to be set. Makes - /// the content available for access from other threads using barrier. - fn set_complete(&self) { - self.is_completed - .store(true, std::sync::atomic::Ordering::Release); - } - - /// Checks ctx is completed. - /// Makes sure content sets by other threads is visible from this thread. - fn check_complete(&self) -> bool { - self.is_completed.load(std::sync::atomic::Ordering::Acquire) - } -} - -impl IFabricAsyncOperationContext_Impl for BridgeContext3_Impl { - fn IsCompleted(&self) -> crate::BOOLEAN { - self.is_completed - .load(std::sync::atomic::Ordering::Relaxed) - .into() - } - - // This always returns false because we defer all tasks in the background executuor. - fn CompletedSynchronously(&self) -> crate::BOOLEAN { - self.is_completed_synchronously.into() - } - fn Callback(&self) -> crate::Result { - let cp = self.callback.clone(); - Ok(cp) - } - - fn Cancel(&self) -> crate::Result<()> { - self.token.cancel(); - Ok(()) - } -} +use crate::error::FabricErrorCode; // proxy impl @@ -417,9 +257,8 @@ mod test { use tokio_util::sync::CancellationToken; use crate::{ - error::FabricErrorCode, - runtime::executor::DefaultExecutor, - sync::cancel::{oneshot_channel, BridgeContext3}, + error::FabricErrorCode, runtime::executor::DefaultExecutor, + sync::bridge_context::BridgeContext3, sync::cancel::oneshot_channel, }; use super::fabric_begin_end_proxy2; diff --git a/crates/libs/core/src/sync/channel.rs b/crates/libs/core/src/sync/channel.rs new file mode 100644 index 00000000..bf55ac7a --- /dev/null +++ b/crates/libs/core/src/sync/channel.rs @@ -0,0 +1,107 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License (MIT). See License.txt in the repo root for license information. +// ------------------------------------------------------------ + +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::sync::oneshot::Receiver; + +// Token that wraps oneshot receiver. +// The future recieve does not have error. This is designed for the use +// case where SF guarantees that sender will be called. +pub struct FabricReceiver { + rx: tokio::sync::oneshot::Receiver, +} + +impl FabricReceiver { + pub(super) fn new(rx: tokio::sync::oneshot::Receiver) -> FabricReceiver { + FabricReceiver { rx } + } + + pub fn blocking_recv(self) -> T { + // sender must send stuff so that there is not error. + self.rx.blocking_recv().unwrap() + } +} + +// The future differs from tokio oneshot that it will not error when awaited. +impl Future for FabricReceiver { + type Output = T; + fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + // Try to receive the value from the sender + let innner = as Future>::poll(Pin::new(&mut self.rx), _cx); + match innner { + Poll::Ready(x) => { + // error only happens when sender is dropped without sending. + // we ignore this error since in sf-rs use this will never happen. + Poll::Ready(x.expect("sf sender closed without sending a value.")) + } + Poll::Pending => Poll::Pending, + } + } +} + +pub struct FabricSender { + tx: tokio::sync::oneshot::Sender, +} + +impl FabricSender { + fn new(tx: tokio::sync::oneshot::Sender) -> FabricSender { + FabricSender { tx } + } + + pub fn send(self, data: T) { + let e = self.tx.send(data); + if e.is_err() { + // In SF use case receiver should not be dropped by user. + // If it acctually dropped by user, it is ok to ignore because user + // does not want to want the value any more. But too bad SF has done + // the work to get the value. + debug_assert!(false, "receiver dropped."); + } + } +} + +// Creates a fabric oneshot channel. +pub fn oneshot_channel() -> (FabricSender, FabricReceiver) { + let (tx, rx) = tokio::sync::oneshot::channel::(); + (FabricSender::new(tx), FabricReceiver::new(rx)) +} + +// Send Box. Wrap a type and implement send. +// c pointers are not send in rust, so this forces it. +#[derive(Debug)] +pub struct SBox { + pub b: Box, +} + +// We know that T is send. This requires programmer's check of the internals. +unsafe impl Send for SBox {} + +impl SBox { + pub fn new(x: T) -> SBox { + SBox { b: Box::new(x) } + } + + pub fn into_inner(self) -> T { + *self.b + } +} + +#[cfg(test)] +mod tests { + #[tokio::test] + async fn test_oneshot() { + let (tx, rx) = super::oneshot_channel::(); + tokio::spawn(async move { + tx.send("hello".to_string()); + }); + let val = rx.await; + assert_eq!("hello", val); + } +} diff --git a/crates/libs/core/src/sync/mod.rs b/crates/libs/core/src/sync/mod.rs index 2de79ab6..e6481691 100644 --- a/crates/libs/core/src/sync/mod.rs +++ b/crates/libs/core/src/sync/mod.rs @@ -6,27 +6,29 @@ // this contains some experiments for async #![allow(non_snake_case)] -use std::{ - cell::Cell, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; +use std::cell::Cell; use mssf_com::FabricCommon::{ IFabricAsyncOperationCallback, IFabricAsyncOperationCallback_Impl, IFabricAsyncOperationContext, }; -use tokio::sync::oneshot::Receiver; use windows_core::implement; mod proxy; pub mod wait; -mod bridge; // This is intentional private. User should directly use bridge mod. +#[cfg(feature = "tokio_async")] mod bridge_context; +#[cfg(feature = "tokio_async")] +// TODO: make private? +pub use bridge_context::BridgeContext3; + +#[cfg(feature = "tokio_async")] +pub mod channel; +#[cfg(feature = "tokio_async")] pub mod cancel; +#[cfg(feature = "tokio_async")] pub use cancel::*; // fabric code begins here @@ -72,90 +74,8 @@ impl AwaitableCallback2 { } } -// Token that wraps oneshot receiver. -// The future recieve does not have error. This is designed for the use -// case where SF guarantees that sender will be called. -pub struct FabricReceiver { - rx: tokio::sync::oneshot::Receiver, -} - -impl FabricReceiver { - fn new(rx: tokio::sync::oneshot::Receiver) -> FabricReceiver { - FabricReceiver { rx } - } - - pub fn blocking_recv(self) -> T { - // sender must send stuff so that there is not error. - self.rx.blocking_recv().unwrap() - } -} - -// The future differs from tokio oneshot that it will not error when awaited. -impl Future for FabricReceiver { - type Output = T; - fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { - // Try to receive the value from the sender - let innner = as Future>::poll(Pin::new(&mut self.rx), _cx); - match innner { - Poll::Ready(x) => { - // error only happens when sender is dropped without sending. - // we ignore this error since in sf-rs use this will never happen. - Poll::Ready(x.expect("sf sender closed without sending a value.")) - } - Poll::Pending => Poll::Pending, - } - } -} - -pub struct FabricSender { - tx: tokio::sync::oneshot::Sender, -} - -impl FabricSender { - fn new(tx: tokio::sync::oneshot::Sender) -> FabricSender { - FabricSender { tx } - } - - pub fn send(self, data: T) { - let e = self.tx.send(data); - if e.is_err() { - // In SF use case receiver should not be dropped by user. - // If it acctually dropped by user, it is ok to ignore because user - // does not want to want the value any more. But too bad SF has done - // the work to get the value. - debug_assert!(false, "receiver dropped."); - } - } -} - -// Creates a fabric oneshot channel. -pub fn oneshot_channel() -> (FabricSender, FabricReceiver) { - let (tx, rx) = tokio::sync::oneshot::channel::(); - (FabricSender::new(tx), FabricReceiver::new(rx)) -} - -// Send Box. Wrap a type and implement send. -// c pointers are not send in rust, so this forces it. -#[derive(Debug)] -pub struct SBox { - pub b: Box, -} - -// We know that T is send. This requires programmer's check of the internals. -unsafe impl Send for SBox {} - -impl SBox { - pub fn new(x: T) -> SBox { - SBox { b: Box::new(x) } - } - - pub fn into_inner(self) -> T { - *self.b - } -} - -#[cfg(test)] -mod tests { +#[cfg(all(test, feature = "tokio_async"))] +mod async_tests { use std::cell::Cell; @@ -177,7 +97,7 @@ mod tests { use tokio::sync::oneshot::Sender; use windows_core::{implement, PCWSTR}; - use super::{oneshot_channel, FabricReceiver, SBox}; + use super::channel::{oneshot_channel, FabricReceiver, SBox}; use super::AwaitableCallback2; @@ -212,7 +132,7 @@ mod tests { } } - type AwaitableToken = super::FabricReceiver<()>; + type AwaitableToken = super::channel::FabricReceiver<()>; macro_rules! beginmyclient { ($name: ident) => { @@ -513,14 +433,4 @@ mod tests { let _mgmt = crate::client::FabricClient::builder() .build_interface::(); } - - #[tokio::test] - async fn test_oneshot() { - let (tx, rx) = super::oneshot_channel::(); - tokio::spawn(async move { - tx.send("hello".to_string()); - }); - let val = rx.await; - assert_eq!("hello", val); - } }