Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce how much code requires tokio_async to compile #113

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion crates/libs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ pub mod error;
mod iter;
pub mod runtime;
pub mod strings;
#[cfg(feature = "tokio_async")]
pub mod sync;
pub mod types;

Expand Down
6 changes: 0 additions & 6 deletions crates/libs/core/src/sync/bridge.rs

This file was deleted.

166 changes: 165 additions & 1 deletion crates/libs/core/src/sync/bridge_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,168 @@
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------

// TODO: move bridge context in this file.
use std::{cell::Cell, future::Future};

use crate::{
error::FabricErrorCode,
runtime::executor::{Executor, JoinHandle},
};
use mssf_com::FabricCommon::{
IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl,
};
use windows_core::{implement, AsImpl};

use super::CancellationToken;

/// Async operation context for bridging rust code into SF COM api that supports cancellation.
#[implement(IFabricAsyncOperationContext)]
pub struct BridgeContext3<T>
where
T: 'static,
{
/// The task result. Initially it is None.
/// If the task panics, the error is propagated here.
content: Cell<Option<crate::Result<T>>>,
/// Indicates the async operation has completed or not.
/// This is a memory barrier for making the content available
/// from writer thread to the reader thread. It is needed because
/// in SF COM API, the caller can call Begin operation, poll on this
/// status until complete, and End operation without barriers.
is_completed: std::sync::atomic::AtomicBool,
/// mssf never completes async operations synchronously.
/// This is always false.
is_completed_synchronously: bool,
callback: IFabricAsyncOperationCallback,
token: CancellationToken,
}

impl<T> BridgeContext3<T>
where
T: Send,
{
fn new(callback: IFabricAsyncOperationCallback, token: CancellationToken) -> Self {
Self {
content: Cell::new(None),
is_completed: std::sync::atomic::AtomicBool::new(false),
is_completed_synchronously: false,
callback,
token,
}
}

/// Creates the context from callback, and returns a cancellation token that
/// can be used in rust code, and the cancellation token is hooked into self,
/// where Cancel() api cancels the operation.
pub fn make(callback: Option<&IFabricAsyncOperationCallback>) -> (Self, CancellationToken) {
let token = CancellationToken::new();
let ctx = Self::new(callback.unwrap().clone(), token.clone());
(ctx, token)
}

/// Spawns the future on rt.
/// Returns a context that can be returned to SF runtime.
/// This is intended to be used in SF Begin COM api, where
/// rust code is spawned in background and the context is returned
/// to caller.
/// If the future panics, an error is set in the resulting content,
/// caller will still get callback and receive an error in the End api.
/// This api is in some sense unsafe, because the developer needs to ensure
/// the following:
/// * return type of the future needs to match SF COM api end return type.
pub fn spawn<F>(
self,
rt: &impl Executor,
future: F,
) -> crate::Result<IFabricAsyncOperationContext>
where
F: Future<Output = T> + Send + 'static,
{
let self_cp: IFabricAsyncOperationContext = self.into();
let self_cp2 = self_cp.clone();
let rt_cp = rt.clone();
rt.spawn(async move {
// Run user code in a task and wait on its status.
// If user code panics we propagate the error back to SF.
let task_res = rt_cp.spawn(future).join().await;
// TODO: maybe it is good to report health to SF here the same way that sf dotnet app works.

// We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.).
let self_impl: &BridgeContext3<T> = unsafe { self_cp.as_impl() };
self_impl.set_content(task_res);
let cb = unsafe { self_cp.Callback().unwrap() };
unsafe { cb.Invoke(&self_cp) };
});
Ok(self_cp2)
}

/// Get the result from the context from the SF End COM api.
/// This api is in some sense unsafe, because the developer needs to ensure
/// the following:
/// * context impl type is `BridgeContext3`, and the T matches the SF end api
/// return type.
/// Note that if T is of Result<ICOM> type, the current function return type is
/// Result<Result<ICOM>>, so unwrap is needed.
pub fn result(context: Option<&IFabricAsyncOperationContext>) -> crate::Result<T> {
let self_impl: &BridgeContext3<T> = unsafe { context.unwrap().as_impl() };
self_impl.consume_content()
}

/// Set the content for the ctx.
/// Marks the ctx as completed.
fn set_content(&self, content: crate::Result<T>) {
let prev = self.content.replace(Some(content));
assert!(prev.is_none());
self.set_complete();
}

/// Consumes the content set by set_content().
/// can only be called once after set content.
fn consume_content(&self) -> crate::Result<T> {
match self.check_complete() {
true => self.content.take().expect("content is consumed twice."),
false => {
if self.token.is_cancelled() {
Err(FabricErrorCode::E_ABORT.into())
} else {
Err(FabricErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into())
}
}
}
}

/// Set the ctx as completed. Requires the ctx content to be set. Makes
/// the content available for access from other threads using barrier.
fn set_complete(&self) {
self.is_completed
.store(true, std::sync::atomic::Ordering::Release);
}

/// Checks ctx is completed.
/// Makes sure content sets by other threads is visible from this thread.
fn check_complete(&self) -> bool {
self.is_completed.load(std::sync::atomic::Ordering::Acquire)
}
}

impl<T> IFabricAsyncOperationContext_Impl for BridgeContext3_Impl<T> {
fn IsCompleted(&self) -> crate::BOOLEAN {
self.is_completed
.load(std::sync::atomic::Ordering::Relaxed)
.into()
}

// This always returns false because we defer all tasks in the background executuor.
fn CompletedSynchronously(&self) -> crate::BOOLEAN {
self.is_completed_synchronously.into()
}

fn Callback(&self) -> crate::Result<IFabricAsyncOperationCallback> {
let cp = self.callback.clone();
Ok(cp)
}

fn Cancel(&self) -> crate::Result<()> {
self.token.cancel();
Ok(())
}
}
169 changes: 4 additions & 165 deletions crates/libs/core/src/sync/cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,175 +4,15 @@
// ------------------------------------------------------------

use std::{
cell::Cell,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use mssf_com::FabricCommon::{
IFabricAsyncOperationCallback, IFabricAsyncOperationContext, IFabricAsyncOperationContext_Impl,
};
use mssf_com::FabricCommon::{IFabricAsyncOperationCallback, IFabricAsyncOperationContext};
pub use tokio_util::sync::CancellationToken;
use windows_core::{implement, AsImpl};

use crate::{
error::FabricErrorCode,
runtime::executor::{Executor, JoinHandle},
};

/// Async operation context for bridging rust code into SF COM api that supports cancellation.
#[implement(IFabricAsyncOperationContext)]
pub struct BridgeContext3<T>
where
T: 'static,
{
/// The task result. Initially it is None.
/// If the task panics, the error is propagated here.
content: Cell<Option<crate::Result<T>>>,
/// Indicates the async operation has completed or not.
/// This is a memory barrier for making the content available
/// from writer thread to the reader thread. It is needed because
/// in SF COM API, the caller can call Begin operation, poll on this
/// status until complete, and End operation without barriers.
is_completed: std::sync::atomic::AtomicBool,
/// mssf never completes async operations synchronously.
/// This is always false.
is_completed_synchronously: bool,
callback: IFabricAsyncOperationCallback,
token: CancellationToken,
}

impl<T> BridgeContext3<T>
where
T: Send,
{
fn new(callback: IFabricAsyncOperationCallback, token: CancellationToken) -> Self {
Self {
content: Cell::new(None),
is_completed: std::sync::atomic::AtomicBool::new(false),
is_completed_synchronously: false,
callback,
token,
}
}

/// Creates the context from callback, and returns a cancellation token that
/// can be used in rust code, and the cancellation token is hooked into self,
/// where Cancel() api cancels the operation.
pub fn make(callback: Option<&IFabricAsyncOperationCallback>) -> (Self, CancellationToken) {
let token = CancellationToken::new();
let ctx = Self::new(callback.unwrap().clone(), token.clone());
(ctx, token)
}

/// Spawns the future on rt.
/// Returns a context that can be returned to SF runtime.
/// This is intended to be used in SF Begin COM api, where
/// rust code is spawned in background and the context is returned
/// to caller.
/// If the future panics, an error is set in the resulting content,
/// caller will still get callback and receive an error in the End api.
/// This api is in some sense unsafe, because the developer needs to ensure
/// the following:
/// * return type of the future needs to match SF COM api end return type.
pub fn spawn<F>(
self,
rt: &impl Executor,
future: F,
) -> crate::Result<IFabricAsyncOperationContext>
where
F: Future<Output = T> + Send + 'static,
{
let self_cp: IFabricAsyncOperationContext = self.into();
let self_cp2 = self_cp.clone();
let rt_cp = rt.clone();
rt.spawn(async move {
// Run user code in a task and wait on its status.
// If user code panics we propagate the error back to SF.
let task_res = rt_cp.spawn(future).join().await;
// TODO: maybe it is good to report health to SF here the same way that sf dotnet app works.

// We trust the code in mssf here to not panic, or we have bigger problem (memory corruption etc.).
let self_impl: &BridgeContext3<T> = unsafe { self_cp.as_impl() };
self_impl.set_content(task_res);
let cb = unsafe { self_cp.Callback().unwrap() };
unsafe { cb.Invoke(&self_cp) };
});
Ok(self_cp2)
}

/// Get the result from the context from the SF End COM api.
/// This api is in some sense unsafe, because the developer needs to ensure
/// the following:
/// * context impl type is `BridgeContext3`, and the T matches the SF end api
/// return type.
/// Note that if T is of Result<ICOM> type, the current function return type is
/// Result<Result<ICOM>>, so unwrap is needed.
pub fn result(context: Option<&IFabricAsyncOperationContext>) -> crate::Result<T> {
let self_impl: &BridgeContext3<T> = unsafe { context.unwrap().as_impl() };
self_impl.consume_content()
}

/// Set the content for the ctx.
/// Marks the ctx as completed.
fn set_content(&self, content: crate::Result<T>) {
let prev = self.content.replace(Some(content));
assert!(prev.is_none());
self.set_complete();
}

/// Consumes the content set by set_content().
/// can only be called once after set content.
fn consume_content(&self) -> crate::Result<T> {
match self.check_complete() {
true => self.content.take().expect("content is consumed twice."),
false => {
if self.token.is_cancelled() {
Err(FabricErrorCode::E_ABORT.into())
} else {
Err(FabricErrorCode::FABRIC_E_OPERATION_NOT_COMPLETE.into())
}
}
}
}

/// Set the ctx as completed. Requires the ctx content to be set. Makes
/// the content available for access from other threads using barrier.
fn set_complete(&self) {
self.is_completed
.store(true, std::sync::atomic::Ordering::Release);
}

/// Checks ctx is completed.
/// Makes sure content sets by other threads is visible from this thread.
fn check_complete(&self) -> bool {
self.is_completed.load(std::sync::atomic::Ordering::Acquire)
}
}

impl<T> IFabricAsyncOperationContext_Impl for BridgeContext3_Impl<T> {
fn IsCompleted(&self) -> crate::BOOLEAN {
self.is_completed
.load(std::sync::atomic::Ordering::Relaxed)
.into()
}

// This always returns false because we defer all tasks in the background executuor.
fn CompletedSynchronously(&self) -> crate::BOOLEAN {
self.is_completed_synchronously.into()
}

fn Callback(&self) -> crate::Result<IFabricAsyncOperationCallback> {
let cp = self.callback.clone();
Ok(cp)
}

fn Cancel(&self) -> crate::Result<()> {
self.token.cancel();
Ok(())
}
}
use crate::error::FabricErrorCode;

// proxy impl

Expand Down Expand Up @@ -417,9 +257,8 @@ mod test {
use tokio_util::sync::CancellationToken;

use crate::{
error::FabricErrorCode,
runtime::executor::DefaultExecutor,
sync::cancel::{oneshot_channel, BridgeContext3},
error::FabricErrorCode, runtime::executor::DefaultExecutor,
sync::bridge_context::BridgeContext3, sync::cancel::oneshot_channel,
};

use super::fabric_begin_end_proxy2;
Expand Down
Loading
Loading