Skip to content

Commit

Permalink
RUST-1873 Transaction API fluency (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
abr-egn authored Apr 2, 2024
1 parent 7275afe commit f47d16e
Show file tree
Hide file tree
Showing 48 changed files with 1,676 additions and 1,649 deletions.
97 changes: 48 additions & 49 deletions action_macro/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
extern crate proc_macro;

use proc_macro2::Span;
use quote::{quote, ToTokens};
use syn::{
braced,
Expand All @@ -9,6 +10,7 @@ use syn::{
parse_quote,
parse_quote_spanned,
spanned::Spanned,
visit_mut::VisitMut,
Block,
Error,
Expr,
Expand All @@ -18,6 +20,7 @@ use syn::{
Lifetime,
Lit,
Meta,
PathArguments,
Token,
Type,
};
Expand All @@ -27,8 +30,12 @@ use syn::{
/// * an opaque wrapper type for the future in case we want to do something more fancy than
/// BoxFuture.
/// * a `run` method for sync execution, optionally with a wrapper function
#[proc_macro]
pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
#[proc_macro_attribute]
pub fn action_impl(
attrs: proc_macro::TokenStream,
input: proc_macro::TokenStream,
) -> proc_macro::TokenStream {
let ActionImplAttrs { sync_type } = parse_macro_input!(attrs as ActionImplAttrs);
let ActionImpl {
generics,
lifetime,
Expand All @@ -37,7 +44,6 @@ pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
exec_self_mut,
exec_output,
exec_body,
sync_wrap,
} = parse_macro_input!(input as ActionImpl);

let mut unbounded_generics = generics.clone();
Expand All @@ -48,14 +54,34 @@ pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
ty.bounds.clear();
}

let SyncWrap {
sync_arg_mut,
sync_arg,
sync_output,
sync_body,
} = sync_wrap.unwrap_or_else(|| {
parse_quote! { fn sync_wrap(out) -> #exec_output { out } }
});
let sync_run = if let Some(sync_type) = sync_type {
// In expression position, the type needs to be of the form Foo::<Bar>, not Foo<Bar>
let mut formal = sync_type.clone();
struct Visitor;
impl VisitMut for Visitor {
fn visit_path_segment_mut(&mut self, segment: &mut syn::PathSegment) {
if let PathArguments::AngleBracketed(args) = &mut segment.arguments {
if args.colon2_token.is_none() {
args.colon2_token = Some(Token![::](Span::call_site()));
}
}
}
}
syn::visit_mut::visit_type_mut(&mut Visitor, &mut formal);
quote! {
/// Synchronously execute this action.
pub fn run(self) -> Result<#sync_type> {
crate::sync::TOKIO_RUNTIME.block_on(std::future::IntoFuture::into_future(self)).map(#formal::new)
}
}
} else {
quote! {
/// Synchronously execute this action.
pub fn run(self) -> #exec_output {
crate::sync::TOKIO_RUNTIME.block_on(std::future::IntoFuture::into_future(self))
}
}
};

quote! {
impl #generics crate::action::private::Sealed for #action { }
Expand Down Expand Up @@ -85,11 +111,7 @@ pub fn action_impl(input: proc_macro::TokenStream) -> proc_macro::TokenStream {

#[cfg(feature = "sync")]
impl #generics #action {
/// Synchronously execute this action.
pub fn run(self) -> #sync_output {
let #sync_arg_mut #sync_arg = crate::sync::TOKIO_RUNTIME.block_on(std::future::IntoFuture::into_future(self));
#sync_body
}
#sync_run
}
}.into()
}
Expand All @@ -107,7 +129,6 @@ struct ActionImpl {
exec_self_mut: Option<Token![mut]>,
exec_output: Type,
exec_body: Block,
sync_wrap: Option<SyncWrap>,
}

impl Parse for ActionImpl {
Expand Down Expand Up @@ -155,13 +176,6 @@ impl Parse for ActionImpl {
let exec_output = impl_body.parse()?;
let exec_body = impl_body.parse()?;

// Optional SyncWrap.
let sync_wrap = if impl_body.peek(Token![fn]) {
Some(impl_body.parse()?)
} else {
None
};

if !impl_body.is_empty() {
return Err(exec_args.error("unexpected token"));
}
Expand All @@ -174,40 +188,25 @@ impl Parse for ActionImpl {
exec_self_mut,
exec_output,
exec_body,
sync_wrap,
})
}
}

// fn sync_wrap([mut] out) -> OutType { <out body> }
struct SyncWrap {
sync_arg_mut: Option<Token![mut]>,
sync_arg: Ident,
sync_output: Type,
sync_body: Block,
struct ActionImplAttrs {
sync_type: Option<Type>,
}

impl Parse for SyncWrap {
impl Parse for ActionImplAttrs {
fn parse(input: ParseStream) -> syn::Result<Self> {
input.parse::<Token![fn]>()?;
parse_name(input, "sync_wrap")?;
let args_input;
parenthesized!(args_input in input);
let sync_arg_mut = args_input.parse()?;
let sync_arg = args_input.parse()?;
if !args_input.is_empty() {
return Err(args_input.error("unexpected token"));
let mut out = Self { sync_type: None };
if input.is_empty() {
return Ok(out);
}
input.parse::<Token![->]>()?;
let sync_output = input.parse()?;
let sync_body = input.parse()?;

Ok(SyncWrap {
sync_arg_mut,
sync_arg,
sync_output,
sync_body,
})
parse_name(input, "sync")?;
input.parse::<Token![=]>()?;
out.sync_type = Some(input.parse()?);
Ok(out)
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod run_command;
mod search_index;
mod session;
mod shutdown;
pub(crate) mod transaction;
mod update;
mod watch;

Expand Down Expand Up @@ -51,6 +52,7 @@ pub use run_command::{RunCommand, RunCursorCommand};
pub use search_index::{CreateSearchIndex, DropSearchIndex, ListSearchIndexes, UpdateSearchIndex};
pub use session::StartSession;
pub use shutdown::Shutdown;
pub use transaction::{AbortTransaction, CommitTransaction, StartTransaction};
pub use update::Update;
pub use watch::Watch;

Expand Down
76 changes: 40 additions & 36 deletions src/action/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,45 +134,49 @@ impl<'a> Aggregate<'a, ImplicitSession> {
}
}

action_impl! {
impl<'a> Action for Aggregate<'a, ImplicitSession> {
type Future = AggregateFuture;

async fn execute(mut self) -> Result<Cursor<Document>> {
resolve_options!(
self.target,
self.options,
[read_concern, write_concern, selection_criteria]
);

let aggregate = crate::operation::aggregate::Aggregate::new(self.target.target(), self.pipeline, self.options);
let client = self.target.client();
client.execute_cursor_operation(aggregate).await
}

fn sync_wrap(out) -> Result<crate::sync::Cursor<Document>> {
out.map(crate::sync::Cursor::new)
}
#[action_impl(sync = crate::sync::Cursor<Document>)]
impl<'a> Action for Aggregate<'a, ImplicitSession> {
type Future = AggregateFuture;

async fn execute(mut self) -> Result<Cursor<Document>> {
resolve_options!(
self.target,
self.options,
[read_concern, write_concern, selection_criteria]
);

let aggregate = crate::operation::aggregate::Aggregate::new(
self.target.target(),
self.pipeline,
self.options,
);
let client = self.target.client();
client.execute_cursor_operation(aggregate).await
}
}

action_impl! {
impl<'a> Action for Aggregate<'a, ExplicitSession<'a>> {
type Future = AggregateSessionFuture;

async fn execute(mut self) -> Result<SessionCursor<Document>> {
resolve_read_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
resolve_write_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
resolve_selection_criteria_with_session!(self.target, self.options, Some(&mut *self.session.0))?;

let aggregate = crate::operation::aggregate::Aggregate::new(self.target.target(), self.pipeline, self.options);
let client = self.target.client();
client.execute_session_cursor_operation(aggregate, self.session.0).await
}

fn sync_wrap(out) -> Result<crate::sync::SessionCursor<Document>> {
out.map(crate::sync::SessionCursor::new)
}
#[action_impl(sync = crate::sync::SessionCursor<Document>)]
impl<'a> Action for Aggregate<'a, ExplicitSession<'a>> {
type Future = AggregateSessionFuture;

async fn execute(mut self) -> Result<SessionCursor<Document>> {
resolve_read_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
resolve_write_concern_with_session!(self.target, self.options, Some(&mut *self.session.0))?;
resolve_selection_criteria_with_session!(
self.target,
self.options,
Some(&mut *self.session.0)
)?;

let aggregate = crate::operation::aggregate::Aggregate::new(
self.target.target(),
self.pipeline,
self.options,
);
let client = self.target.client();
client
.execute_session_cursor_operation(aggregate, self.session.0)
.await
}
}

Expand Down
38 changes: 20 additions & 18 deletions src/action/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,14 @@ impl<'a> EstimatedDocumentCount<'a> {
);
}

action_impl! {
impl<'a> Action for EstimatedDocumentCount<'a> {
type Future = EstimatedDocumentCountFuture;

async fn execute(mut self) -> Result<u64> {
resolve_options!(self.cr, self.options, [read_concern, selection_criteria]);
let op = crate::operation::count::Count::new(self.cr.namespace(), self.options);
self.cr.client().execute_operation(op, None).await
}
#[action_impl]
impl<'a> Action for EstimatedDocumentCount<'a> {
type Future = EstimatedDocumentCountFuture;

async fn execute(mut self) -> Result<u64> {
resolve_options!(self.cr, self.options, [read_concern, selection_criteria]);
let op = crate::operation::count::Count::new(self.cr.namespace(), self.options);
self.cr.client().execute_operation(op, None).await
}
}

Expand Down Expand Up @@ -140,16 +139,19 @@ impl<'a> CountDocuments<'a> {
}
}

action_impl! {
impl<'a> Action for CountDocuments<'a> {
type Future = CountDocumentsFuture;
#[action_impl]
impl<'a> Action for CountDocuments<'a> {
type Future = CountDocumentsFuture;

async fn execute(mut self) -> Result<u64> {
resolve_read_concern_with_session!(self.cr, self.options, self.session.as_ref())?;
resolve_selection_criteria_with_session!(self.cr, self.options, self.session.as_ref())?;
async fn execute(mut self) -> Result<u64> {
resolve_read_concern_with_session!(self.cr, self.options, self.session.as_ref())?;
resolve_selection_criteria_with_session!(self.cr, self.options, self.session.as_ref())?;

let op = crate::operation::count_documents::CountDocuments::new(self.cr.namespace(), self.filter, self.options)?;
self.cr.client().execute_operation(op, self.session).await
}
let op = crate::operation::count_documents::CountDocuments::new(
self.cr.namespace(),
self.filter,
self.options,
)?;
self.cr.client().execute_operation(op, self.session).await
}
}
42 changes: 20 additions & 22 deletions src/action/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,33 +102,31 @@ impl<'a, M> CreateIndex<'a, M> {
}
}

action_impl! {
impl<'a> Action for CreateIndex<'a, Single> {
type Future = CreateIndexFuture;
#[action_impl]
impl<'a> Action for CreateIndex<'a, Single> {
type Future = CreateIndexFuture;

async fn execute(self) -> Result<CreateIndexResult> {
let inner: CreateIndex<'a, Multiple> = CreateIndex {
coll: self.coll,
indexes: self.indexes,
options: self.options,
session: self.session,
_mode: PhantomData,
};
let response = inner.await?;
Ok(response.into_create_index_result())
}
async fn execute(self) -> Result<CreateIndexResult> {
let inner: CreateIndex<'a, Multiple> = CreateIndex {
coll: self.coll,
indexes: self.indexes,
options: self.options,
session: self.session,
_mode: PhantomData,
};
let response = inner.await?;
Ok(response.into_create_index_result())
}
}

action_impl! {
impl<'a> Action for CreateIndex<'a, Multiple> {
type Future = CreateIndexesFuture;
#[action_impl]
impl<'a> Action for CreateIndex<'a, Multiple> {
type Future = CreateIndexesFuture;

async fn execute(mut self) -> Result<CreateIndexesResult> {
resolve_write_concern_with_session!(self.coll, self.options, self.session.as_ref())?;
async fn execute(mut self) -> Result<CreateIndexesResult> {
resolve_write_concern_with_session!(self.coll, self.options, self.session.as_ref())?;

let op = Op::new(self.coll.namespace(), self.indexes, self.options);
self.coll.client().execute_operation(op, self.session).await
}
let op = Op::new(self.coll.namespace(), self.indexes, self.options);
self.coll.client().execute_operation(op, self.session).await
}
}
Loading

0 comments on commit f47d16e

Please sign in to comment.