Skip to content

Commit

Permalink
Merge pull request #1456 from microsoft/enhancement-collections-set-d…
Browse files Browse the repository at this point in the history
…efaults

[runtime] Enhancement: Add some reasonable defaults
  • Loading branch information
iyzhang authored Nov 14, 2024
2 parents a7f0053 + 94f5af9 commit 9639ebb
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 31 deletions.
9 changes: 6 additions & 3 deletions src/rust/collections/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use ::std::{
//======================================================================================================================

// The following value was chosen arbitrarily.
const TIMEOUT_SECONDS: Duration = Duration::from_secs(1000);
const DEFAULT_QUEUE_SIZE: usize = 1024;

//======================================================================================================================
// Structures
Expand Down Expand Up @@ -72,7 +72,10 @@ impl<T> AsyncQueue<T> {
}
}
};
conditional_yield_with_timeout(wait_condition, timeout.unwrap_or(TIMEOUT_SECONDS)).await
match timeout {
Some(timeout) => conditional_yield_with_timeout(wait_condition, timeout).await,
None => Ok(wait_condition.await),
}
}

/// Try to get the head of the queue.
Expand Down Expand Up @@ -129,7 +132,7 @@ impl<T> SharedAsyncQueue<T> {
impl<T> Default for AsyncQueue<T> {
fn default() -> Self {
Self {
queue: VecDeque::<T>::default(),
queue: VecDeque::<T>::with_capacity(DEFAULT_QUEUE_SIZE),
cond_var: SharedConditionVariable::default(),
}
}
Expand Down
13 changes: 5 additions & 8 deletions src/rust/collections/async_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ use ::std::{
time::{Duration, Instant},
};

//======================================================================================================================
// Constants
//======================================================================================================================

/// Default timeout for an AynscQueue This was chosen arbitrarily.
const TIMEOUT_SECONDS: Duration = Duration::from_secs(1000);

//======================================================================================================================
// Structures
//======================================================================================================================
Expand Down Expand Up @@ -69,7 +62,11 @@ impl<T: Clone> AsyncValue<T> {
}

pub async fn wait_for_change(&mut self, timeout: Option<Duration>) -> Result<T, Fail> {
conditional_yield_with_timeout(self.cond_var.wait(), timeout.unwrap_or(TIMEOUT_SECONDS)).await?;
match timeout {
Some(timeout) => conditional_yield_with_timeout(self.cond_var.wait(), timeout).await?,
None => self.cond_var.wait().await,
};

Ok(self.value.clone())
}

Expand Down
14 changes: 10 additions & 4 deletions src/rust/runtime/condition_variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ use ::std::{
task::{Context, Poll, Waker},
};

//======================================================================================================================
// Constant
//======================================================================================================================

const DEFAULT_WAITER_QUEUE_SIZE: usize = 64;

//======================================================================================================================
// Structures
//======================================================================================================================
Expand Down Expand Up @@ -55,18 +61,18 @@ struct YieldPoint {

impl SharedConditionVariable {
/// Wake the next waiting coroutine.
#[inline]
pub fn signal(&mut self) {
if let Some((_, waiter)) = self.waiters.pop_front() {
self.num_ready += 1;
waiter.wake_by_ref();
}
}

#[allow(unused)]
/// Wake all waiting coroutines.
pub fn broadcast(&mut self) {
while let Some((task_id, waiter)) = self.waiters.pop_front() {
self.num_ready += 1;
self.num_ready = self.num_ready + self.waiters.len();
for (_, waiter) in self.waiters.drain(..) {
waiter.wake_by_ref();
}
}
Expand Down Expand Up @@ -104,7 +110,7 @@ impl SharedConditionVariable {
impl Default for SharedConditionVariable {
fn default() -> Self {
Self(SharedObject::new(ConditionVariable {
waiters: VecDeque::default(),
waiters: VecDeque::with_capacity(DEFAULT_WAITER_QUEUE_SIZE),
num_ready: 0,
last_id: 0,
}))
Expand Down
24 changes: 8 additions & 16 deletions src/rust/runtime/network/socket/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@ use crate::{
runtime::{fail::Fail, network::socket::operation::SocketOp},
};
use ::socket2::Type;
use ::std::time::Duration;

//======================================================================================================================
// Structures
//======================================================================================================================

/// Set the timeout to be large enough that we effectively never time out.
const TIMEOUT: Duration = Duration::from_secs(1000);

//======================================================================================================================
// Structures
Expand Down Expand Up @@ -77,8 +69,8 @@ impl SocketStateMachine {
loop {
match self.may_accept() {
Ok(()) => {
// If either a time out or the current state changed, check it again.
_ = self.current.clone().wait_for_change(Some(TIMEOUT)).await;
// Check state again if it changes.
_ = self.current.clone().wait_for_change(None).await;
continue;
},
Err(e) => return e,
Expand All @@ -96,8 +88,8 @@ impl SocketStateMachine {
loop {
match self.may_connect() {
Ok(()) => {
// If either a time out or the current state changed, check it again.
_ = self.current.clone().wait_for_change(Some(TIMEOUT)).await;
// Check state again if it changes.
_ = self.current.clone().wait_for_change(None).await;
continue;
},
Err(e) => return e,
Expand All @@ -122,8 +114,8 @@ impl SocketStateMachine {
loop {
match self.may_push() {
Ok(()) => {
// If either a time out or the current state changed, check it again.
_ = self.current.clone().wait_for_change(Some(TIMEOUT)).await;
// Check state again if it changes.
_ = self.current.clone().wait_for_change(None).await;
continue;
},
Err(e) => return e,
Expand All @@ -150,8 +142,8 @@ impl SocketStateMachine {
loop {
match self.may_pop() {
Ok(()) => {
// If either a time out or the current state changed, check it again.
_ = self.current.clone().wait_for_change(Some(TIMEOUT)).await;
// Check state again if it changes.
_ = self.current.clone().wait_for_change(None).await;
continue;
},
Err(e) => return e,
Expand Down

0 comments on commit 9639ebb

Please sign in to comment.