Skip to content

Commit

Permalink
Refactor: Rename ActiveJobTokenServer::try_acquire => acquire
Browse files Browse the repository at this point in the history
Change it to an async function that returns `Result<JobToken, Error>`

Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Mar 2, 2024
1 parent ddf2c04 commit 511aab4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 35 deletions.
8 changes: 1 addition & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1508,13 +1508,7 @@ impl Build {
let spawn_future = async {
for obj in objs {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = loop {
if let Some(token) = tokens.try_acquire()? {
break token;
} else {
YieldOnce::default().await
}
};
let token = tokens.acquire().await?;
let mut child = spawn(&mut cmd, &program, &self.cargo_output)?;
let mut stderr_forwarder = StderrForwarder::new(&mut child);
stderr_forwarder.set_non_blocking()?;
Expand Down
68 changes: 40 additions & 28 deletions src/parallel/job_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,18 @@ impl ActiveJobTokenServer {
}
}

pub(crate) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
match &self {
Self::Inherited(jobserver) => jobserver.try_acquire(),
Self::InProcess(jobserver) => Ok(jobserver.try_acquire()),
Self::Inherited(jobserver) => jobserver.acquire().await,
Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
}
}
}

mod inherited_jobserver {
use super::JobToken;

use crate::{Error, ErrorKind};
use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};

use std::{
io,
Expand Down Expand Up @@ -145,26 +145,30 @@ mod inherited_jobserver {
})
}

pub(super) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
if self.jobserver.global_implicit_token.swap(false, AcqRel) {
// fast path
return Ok(Some(JobToken()));
}

// Cold path, no global implicit token, obtain one
match self.rx.try_recv() {
Ok(res) => {
let acquired = res?;
acquired.drop_without_releasing();
Ok(Some(JobToken()))
pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
loop {
if self.jobserver.global_implicit_token.swap(false, AcqRel) {
// fast path
break Ok(JobToken());
}
Err(mpsc::TryRecvError::Disconnected) => Err(Error::new(
ErrorKind::JobserverHelpThreadError,
"jobserver help thread has returned before ActiveJobServer is dropped",
)),
Err(mpsc::TryRecvError::Empty) => {
self.helper_thread.request_token();
Ok(None)

// Cold path, no global implicit token, obtain one
match self.rx.try_recv() {
Ok(res) => {
let acquired = res?;
acquired.drop_without_releasing();
break Ok(JobToken());
}
Err(mpsc::TryRecvError::Disconnected) => {
break Err(Error::new(
ErrorKind::JobserverHelpThreadError,
"jobserver help thread has returned before ActiveJobServer is dropped",
))
}
Err(mpsc::TryRecvError::Empty) => {
self.helper_thread.request_token();
YieldOnce::default().await
}
}
}
}
Expand All @@ -174,6 +178,8 @@ mod inherited_jobserver {
mod inprocess_jobserver {
use super::JobToken;

use crate::parallel::async_executor::YieldOnce;

use std::{
env::var,
sync::atomic::{
Expand Down Expand Up @@ -204,12 +210,18 @@ mod inprocess_jobserver {
Self(AtomicU32::new(parallelism))
}

pub(super) fn try_acquire(&self) -> Option<JobToken> {
let res = self
.0
.fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));
pub(super) async fn acquire(&self) -> JobToken {
loop {
let res = self
.0
.fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));

res.ok().map(|_| JobToken())
if res.is_ok() {
break JobToken();
}

YieldOnce::default().await
}
}

pub(super) fn release_token_raw(&self) {
Expand Down

0 comments on commit 511aab4

Please sign in to comment.