Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timeout batch downloads, not each download #6285

Merged
merged 1 commit into from
Nov 9, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 40 additions & 41 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,17 @@ pub struct Downloads<'a, 'cfg: 'a> {
largest: (u64, String),
start: Instant,
success: bool,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here instead of in libcurl
/// because we want to apply timeouts to an entire batch of operations, not
/// any one particular single operatino
timeout: ops::HttpTimeout, // timeout configuration
updated_at: Cell<Instant>, // last time we received bytes
next_speed_check: Cell<Instant>, // if threshold isn't 0 by this time, error
next_speed_check_bytes_threshold: Cell<u64>, // decremented when we receive bytes
}

struct Download<'cfg> {
Expand All @@ -293,24 +304,7 @@ struct Download<'cfg> {

/// The moment we started this transfer at
start: Instant,

/// Last time we noticed that we got some more data from libcurl
updated_at: Cell<Instant>,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here because we have a
/// `Multi` with a lot of active transfers but between transfers finishing
/// we perform some possibly slow synchronous work (like grabbing file
/// locks, extracting tarballs, etc). The default timers on our `Multi` keep
/// running during this work, but we don't want them to count towards timing
/// everythig out. As a result, we manage this manually and take the time
/// for synchronous work into account manually.
timeout: ops::HttpTimeout,
timed_out: Cell<Option<String>>,
next_speed_check: Cell<Instant>,
next_speed_check_bytes_threshold: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
Expand Down Expand Up @@ -359,6 +353,7 @@ impl<'cfg> PackageSet<'cfg> {

pub fn enable_download<'a>(&'a self) -> CargoResult<Downloads<'a, 'cfg>> {
assert!(!self.downloading.replace(true));
let timeout = ops::HttpTimeout::new(self.config)?;
Ok(Downloads {
start: Instant::now(),
set: self,
Expand All @@ -375,6 +370,10 @@ impl<'cfg> PackageSet<'cfg> {
downloaded_bytes: 0,
largest: (0, String::new()),
success: false,
updated_at: Cell::new(Instant::now()),
timeout,
next_speed_check: Cell::new(Instant::now()),
next_speed_check_bytes_threshold: Cell::new(0),
})
}

Expand Down Expand Up @@ -446,7 +445,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
debug!("downloading {} as {}", id, token);
assert!(self.pending_ids.insert(id.clone()));

let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
let (mut handle, _timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
Expand Down Expand Up @@ -501,7 +500,6 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.set.config.shell().status("Downloading", "crates ...")?;
}

let now = Instant::now();
let dl = Download {
token,
data: RefCell::new(Vec::new()),
Expand All @@ -511,11 +509,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
updated_at: Cell::new(now),
timeout,
timed_out: Cell::new(None),
next_speed_check: Cell::new(now),
next_speed_check_bytes_threshold: Cell::new(0),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
Expand Down Expand Up @@ -638,10 +632,8 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// active downloads to make sure they don't fire because of a slowly
// extracted tarball.
let finish_dur = start.elapsed();
for (dl, _) in self.pending.values_mut() {
dl.updated_at.set(dl.updated_at.get() + finish_dur);
dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
}
self.updated_at.set(self.updated_at.get() + finish_dur);
self.next_speed_check.set(self.next_speed_check.get() + finish_dur);

let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Expand All @@ -652,12 +644,12 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let mut handle = self.set.multi.add(handle)?;
let now = Instant::now();
handle.set_token(dl.token)?;
self.updated_at.set(now);
self.next_speed_check.set(now + self.timeout.dur);
self.next_speed_check_bytes_threshold.set(self.timeout.low_speed_limit as u64);
dl.timed_out.set(None);
dl.updated_at.set(now);
dl.current.set(0);
dl.total.set(0);
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
self.pending.insert(dl.token, (dl, handle));
Ok(())
}
Expand Down Expand Up @@ -712,25 +704,31 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
dl.total.set(total);
let now = Instant::now();
if cur != dl.current.get() {
let delta = cur - dl.current.get();
let threshold = self.next_speed_check_bytes_threshold.get();

dl.current.set(cur);
dl.updated_at.set(now);
self.updated_at.set(now);

if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(
dl.current.get() + dl.timeout.low_speed_limit as u64,
if delta >= threshold {
self.next_speed_check.set(now + self.timeout.dur);
self.next_speed_check_bytes_threshold.set(
self.timeout.low_speed_limit as u64,
);
} else {
self.next_speed_check_bytes_threshold.set(threshold - delta);
}
}
if !self.tick(WhyTick::DownloadUpdate).is_ok() {
return false
}

// If we've spent too long not actually receiving any data we time out.
if now - dl.updated_at.get() > dl.timeout.dur {
if now - self.updated_at.get() > self.timeout.dur {
alexcrichton marked this conversation as resolved.
Show resolved Hide resolved
self.updated_at.set(now);
let msg = format!("failed to download any data for `{}` within {}s",
dl.id,
dl.timeout.dur.as_secs());
self.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}
Expand All @@ -739,13 +737,14 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// limit, see if we've transferred enough data during this threshold. If
// it fails this check then we fail because the download is going too
// slowly.
if now >= dl.next_speed_check.get() {
assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
if now >= self.next_speed_check.get() {
self.next_speed_check.set(now + self.timeout.dur);
assert!(self.next_speed_check_bytes_threshold.get() > 0);
let msg = format!("download of `{}` failed to transfer more \
than {} bytes in {}s",
dl.id,
dl.timeout.low_speed_limit,
dl.timeout.dur.as_secs());
self.timeout.low_speed_limit,
self.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}
Expand Down