Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up many usages of futures_util::future::select #874

Merged
merged 7 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions full-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ either = { version = "1.8.1", default-features = false }
event-listener = "2.5.3"
fnv = { version = "1.0.7", default-features = false }
futures-channel = "0.3.27"
futures-lite = { version = "1.13.0", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.27", default-features = false }
hashbrown = { version = "0.14.0", default-features = false }
hex = { version = "0.4.3", default-features = false }
Expand Down
57 changes: 32 additions & 25 deletions full-node/src/network_service/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@

use crate::{LogCallback, LogLevel};
use core::{future::Future, pin};
use futures_lite::future;
use futures_util::{FutureExt as _, StreamExt as _};
use smol::{
channel,
future::{self, FutureExt as _},
future::FutureExt as _,
io::{AsyncRead, AsyncWrite},
};
use smoldot::{
Expand Down Expand Up @@ -232,36 +233,42 @@ pub(super) async fn established_connection_task(
// Starting from here, we block the current task until more processing needs to happen.

// Future ready when the timeout indicated by the connection state machine is reached.
let poll_after = if let Some(wake_up) = wake_up_after {
let poll_after = {
let now = Instant::now();
if wake_up > now {
futures_util::future::Either::Left(smol::Timer::at(wake_up))
} else {
if wake_up_after
.as_ref()
.map_or(false, |wake_up_after| *wake_up_after <= now)
{
// "Wake up" immediately.
continue;
} else {
async {
if let Some(wake_up_after) = wake_up_after {
smol::Timer::at(wake_up_after).await;
} else {
future::pending().await
}
}
}
} else {
futures_util::future::Either::Right(future::pending())
}
.fuse();

};
// Future that is woken up when new data is ready on the socket.
let connection_ready = pin::pin!(if let Some(socket) = socket_container.as_mut() {
futures_util::future::Either::Left(Pin::new(socket).process())
} else {
futures_util::future::Either::Right(future::pending())
});

let connection_ready = async {
if let Some(socket) = socket_container.as_mut() {
Pin::new(socket).process().await;
} else {
future::pending().await
}
};
// Future that is woken up when a new message is coming from the coordinator.
let message_from_coordinator = Pin::new(&mut coordinator_to_connection).peek();

// Wait until either some data is ready on the socket, or the connection state machine
// has requested to be polled again, or a message is coming from the coordinator.
futures_util::future::select(
futures_util::future::select(connection_ready, message_from_coordinator),
poll_after,
)
.await;
let message_from_coordinator = async {
pin::Pin::new(&mut coordinator_to_connection).peek().await;
};

// Combine all futures into one.
poll_after
.or(connection_ready)
.or(message_from_coordinator)
.await;
}
}

Expand Down
39 changes: 20 additions & 19 deletions light-base/src/json_rpc_service/background/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ use alloc::{
use core::{
cmp, iter,
num::{NonZeroU32, NonZeroUsize},
pin,
time::Duration,
};
use futures_channel::mpsc;
use futures_lite::FutureExt as _;
use futures_util::{future, FutureExt as _, StreamExt as _};
use futures_util::{FutureExt as _, StreamExt as _};
use hashbrown::HashMap;
use smoldot::{
chain::fork_tree,
Expand Down Expand Up @@ -527,25 +526,27 @@ impl<TPlat: PlatformRef> ChainHeadFollowTask<TPlat> {
) {
loop {
let outcome = {
let next_block = pin::pin!(match &mut self.subscription {
Subscription::WithRuntime { notifications, .. } => {
future::Either::Left(notifications.next().map(either::Left))
}
Subscription::WithoutRuntime(notifications) => {
future::Either::Right(notifications.next().map(either::Right))
let next_block = async {
match &mut self.subscription {
Subscription::WithRuntime { notifications, .. } => {
Some(either::Left(either::Left(notifications.next().await)))
}
Subscription::WithoutRuntime(notifications) => {
Some(either::Left(either::Right(notifications.next().await)))
}
}
});
let next_message = pin::pin!(messages_rx.next());

match future::select(
future::select(next_block, next_message),
pin::pin!(subscription.wait_until_stale()),
)
.await
};

match next_block
.or(async { Some(either::Right(messages_rx.next().await)) })
.or(async {
subscription.wait_until_stale().await;
None
})
.await
{
future::Either::Left((future::Either::Left((v, _)), _)) => either::Left(v),
future::Either::Left((future::Either::Right((v, _)), _)) => either::Right(v),
future::Either::Right(((), _)) => return,
Some(outcome) => outcome,
None => return,
}
};

Expand Down
27 changes: 10 additions & 17 deletions light-base/src/json_rpc_service/background/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use super::{Background, PlatformRef};
use crate::transactions_service;

use alloc::{borrow::ToOwned as _, format, string::ToString as _, sync::Arc, vec::Vec};
use core::pin;
use futures_util::{future, StreamExt as _};
use futures_lite::future;
use futures_util::StreamExt as _;
use smoldot::json_rpc::{methods, service};

impl<TPlat: PlatformRef> Background<TPlat> {
Expand Down Expand Up @@ -104,22 +104,17 @@ impl<TPlat: PlatformRef> Background<TPlat> {
let mut num_broadcasted_peers = 0;

loop {
let event = {
let unsubscribed = pin::pin!(subscription.wait_until_stale());
match future::select(transaction_updates.next(), unsubscribed).await {
future::Either::Left((v, _)) => either::Left(v),
future::Either::Right((v, _)) => either::Right(v),
}
};

let status_update = match event {
either::Left(Some(status)) => status,
either::Left(None) if !is_legacy => {
let status_update = match future::or(
async { Some(transaction_updates.next().await) },
async { subscription.wait_until_stale().await; None }
).await {
Some(Some(status)) => status,
Some(None) if !is_legacy => {
// Channel from the transactions service has been closed.
// Stop the task.
break;
}
either::Left(None) => {
Some(None) => {
// Channel from the transactions service has been closed.
// Stop the task.
// There is nothing more that can be done except hope that the
Expand All @@ -128,9 +123,7 @@ impl<TPlat: PlatformRef> Background<TPlat> {
subscription.wait_until_stale().await;
break;
}
either::Right(()) => {
break;
}
None => break,
};

match (status_update, is_legacy) {
Expand Down
42 changes: 25 additions & 17 deletions light-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ extern crate alloc;

use alloc::{borrow::ToOwned as _, boxed::Box, format, string::String, sync::Arc, vec, vec::Vec};
use core::{num::NonZeroU32, ops, pin};
use futures_channel::oneshot;
use futures_util::{future, FutureExt as _};
use hashbrown::{hash_map::Entry, HashMap};
use itertools::Itertools as _;
Expand Down Expand Up @@ -226,9 +225,9 @@ struct PublicApiChain<TChain> {
/// [`AddChainConfig::json_rpc`] was [`AddChainConfigJsonRpc::Disabled`] when adding the chain.
json_rpc_frontend: Option<json_rpc_service::Frontend>,

/// Dummy channel. Nothing is ever sent on it, but the receiving side is stored in the
/// [`JsonRpcResponses`] in order to detect when the chain has been removed.
_public_api_chain_destroyed_tx: oneshot::Sender<()>,
/// Notified when the [`PublicApiChain`] is destroyed, in order for the [`JsonRpcResponses`]
/// to detect when the chain has been removed.
public_api_chain_destroyed_event: event_listener::Event,
}

/// Identifies a chain, so that multiple identical chains are de-duplicated.
Expand Down Expand Up @@ -307,25 +306,29 @@ pub struct JsonRpcResponses {
///
/// As long as this object is alive, the JSON-RPC service will continue running. In order
/// to prevent that from happening, we destroy it as soon as the
/// [`JsonRpcResponses::public_api_chain_destroyed_rx`] is notified of the destruction of
/// [`JsonRpcResponses::public_api_chain_destroyed`] is notified of the destruction of
/// the sender.
inner: Option<json_rpc_service::Frontend>,

/// Dummy channel. Nothing is ever sent on it, but the sending side is stored in the
/// [`PublicApiChain`] in order to detect when the chain has been removed.
public_api_chain_destroyed_rx: oneshot::Receiver<()>,
/// Notified when the [`PublicApiChain`] is destroyed.
public_api_chain_destroyed: event_listener::EventListener,
}

impl JsonRpcResponses {
/// Returns the next response or notification, or `None` if the chain has been removed.
pub async fn next(&mut self) -> Option<String> {
if let Some(frontend) = self.inner.as_mut() {
let response_fut = pin::pin!(frontend.next_json_rpc_response());
match future::select(response_fut, &mut self.public_api_chain_destroyed_rx).await {
future::Either::Left((response, _)) => return Some(response),
future::Either::Right((_result, _)) => {
debug_assert!(_result.is_err());
}
match futures_lite::future::or(
async { Some(frontend.next_json_rpc_response().await) },
async {
(&mut self.public_api_chain_destroyed).await;
None
},
)
.await
{
Some(response) => return Some(response),
None => {}
}
}

Expand Down Expand Up @@ -914,19 +917,20 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
};

// Success!
let (public_api_chain_destroyed_tx, public_api_chain_destroyed_rx) = oneshot::channel();
let public_api_chain_destroyed_event = event_listener::Event::new();
let public_api_chain_destroyed = public_api_chain_destroyed_event.listen();
public_api_chains_entry.insert(PublicApiChain {
user_data: config.user_data,
key: new_chain_key,
chain_spec_chain_id,
json_rpc_frontend: json_rpc_frontend.clone(),
_public_api_chain_destroyed_tx: public_api_chain_destroyed_tx,
public_api_chain_destroyed_event,
});
Ok(AddChainSuccess {
chain_id: new_chain_id,
json_rpc_responses: json_rpc_frontend.map(|f| JsonRpcResponses {
inner: Some(f),
public_api_chain_destroyed_rx,
public_api_chain_destroyed,
}),
})
}
Expand All @@ -947,6 +951,10 @@ impl<TPlat: platform::PlatformRef, TChain> Client<TPlat, TChain> {
pub fn remove_chain(&mut self, id: ChainId) -> TChain {
let removed_chain = self.public_api_chains.remove(id.0);

removed_chain
.public_api_chain_destroyed_event
.notify(usize::max_value());

// `chains_by_key` is created lazily when `add_chain` is called.
// Since we're removing a chain that has been added with `add_chain`, it is guaranteed
// that `chains_by_key` is set.
Expand Down
Loading