Skip to content

Commit

Permalink
Fix timeouts firing while tarballs are extracted
Browse files Browse the repository at this point in the history
This commit fixes #6125 by ensuring that while we're extracting tarballs
or doing other synchronous work like grabbing file locks we're not
letting the timeout timers of each HTTP transfer keep ticking. This is
curl's default behavior (which we don't want in this scenario). Instead
the timeout logic is inlined directly and we manually account for the
synchronous work happening not counting towards timeout limits.

Closes #6125
  • Loading branch information
alexcrichton committed Nov 6, 2018
1 parent b8021c7 commit bfeafde
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ crates-io = { path = "src/crates-io", version = "0.20" }
crossbeam-utils = "0.5"
crypto-hash = "0.3.1"
curl = { version = "0.4.17", features = ['http2'] }
curl-sys = "0.4.12"
env_logger = "0.5.11"
failure = "0.1.2"
filetime = "0.2"
Expand Down
154 changes: 140 additions & 14 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf};
use std::time::{Instant, Duration};

use bytesize::ByteSize;
use curl;
use curl_sys;
use curl::easy::{Easy, HttpVersion};
use curl::multi::{Multi, EasyHandle};
use lazycell::LazyCell;
Expand Down Expand Up @@ -257,7 +259,7 @@ pub struct Downloads<'a, 'cfg: 'a> {
set: &'a PackageSet<'cfg>,
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
pending_ids: HashSet<PackageId>,
results: Vec<(usize, CargoResult<()>)>,
results: Vec<(usize, Result<(), curl::Error>)>,
next: usize,
progress: RefCell<Option<Progress<'cfg>>>,
downloads_finished: usize,
Expand All @@ -268,14 +270,49 @@ pub struct Downloads<'a, 'cfg: 'a> {
}

struct Download<'cfg> {
/// Token for this download, used as the key of the `Downloads::pending` map
/// and stored in `EasyHandle` as well.
token: usize,

/// Package that we're downloading
id: PackageId,

/// Actual downloaded data, updated throughout the lifetime of this download
data: RefCell<Vec<u8>>,

/// The URL that we're downloading from, cached here for error messages and
/// reenqueuing.
url: String,

/// A descriptive string to print when we've finished downloading this crate
descriptor: String,

/// Statistics updated from the progress callback in libcurl
total: Cell<u64>,
current: Cell<u64>,

/// The moment we started this transfer at
start: Instant,

/// Last time we noticed that we got some more data from libcurl
updated_at: Cell<Instant>,

/// Timeout management, both of timeout thresholds as well as whether or not
/// our connection has timed out (and accompanying message if it has).
///
/// Note that timeout management is done manually here because we have a
/// `Multi` with a lot of active transfers but between transfers finishing
/// we perform some possibly slow synchronous work (like grabbing file
/// locks, extracting tarballs, etc). The default timers on our `Multi` keep
/// running during this work, but we don't want them to count towards timing
/// everythig out. As a result, we manage this manually and take the time
/// for synchronous work into account manually.
timeout: ops::HttpTimeout,
timed_out: Cell<Option<String>>,
next_speed_check: Cell<Instant>,
next_speed_check_bytes_threshold: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
}

Expand Down Expand Up @@ -409,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
debug!("downloading {} as {}", id, token);
assert!(self.pending_ids.insert(id.clone()));

let mut handle = ops::http_handle(self.set.config)?;
let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?;
handle.get(true)?;
handle.url(&url)?;
handle.follow_location(true)?; // follow redirects
Expand Down Expand Up @@ -447,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
handle.progress(true)?;
handle.progress_function(move |dl_total, dl_cur, _, _| {
tls::with(|downloads| {
let downloads = match downloads {
Some(d) => d,
None => return false,
};
let dl = &downloads.pending[&token].0;
dl.total.set(dl_total as u64);
dl.current.set(dl_cur as u64);
downloads.tick(WhyTick::DownloadUpdate).is_ok()
match downloads {
Some(d) => d.progress(token, dl_total as u64, dl_cur as u64),
None => false,
}
})
})?;

Expand All @@ -468,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
self.set.config.shell().status("Downloading", "crates ...")?;
}

let now = Instant::now();
let dl = Download {
token,
data: RefCell::new(Vec::new()),
Expand All @@ -477,6 +511,11 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
total: Cell::new(0),
current: Cell::new(0),
start: Instant::now(),
updated_at: Cell::new(now),
timeout,
timed_out: Cell::new(None),
next_speed_check: Cell::new(now),
next_speed_check_bytes_threshold: Cell::new(0),
retry: Retry::new(self.set.config)?,
};
self.enqueue(dl, handle)?;
Expand Down Expand Up @@ -514,13 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
// then we want to re-enqueue our request for another attempt and
// then we wait for another request to finish.
let ret = {
let timed_out = &dl.timed_out;
let url = &dl.url;
dl.retry.try(|| {
result?;
if let Err(e) = result {
// If this error is "aborted by callback" then that's
// probably because our progress callback aborted due to
// a timeout. We'll find out by looking at the
// `timed_out` field, looking for a descriptive message.
// If one is found we switch the error code (to ensure
// it's flagged as spurious) and then attach our extra
// information to the error.
if !e.is_aborted_by_callback() {
return Err(e.into())
}

return Err(match timed_out.replace(None) {
Some(msg) => {
let code = curl_sys::CURLE_OPERATION_TIMEDOUT;
let mut err = curl::Error::new(code);
err.set_extra(msg);
err
}
None => e,
}.into())
}

let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(&url);
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNot200 {
code,
url: url.to_string(),
Expand Down Expand Up @@ -569,20 +630,39 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let source = sources
.get_mut(dl.id.source_id())
.ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?;
let start = Instant::now();
let pkg = source.finish_download(&dl.id, data)?;

// Assume that no time has passed while we were calling
// `finish_download`, update all speed checks and timeout limits of all
// active downloads to make sure they don't fire because of a slowly
// extracted tarball.
let finish_dur = start.elapsed();
for (dl, _) in self.pending.values_mut() {
dl.updated_at.set(dl.updated_at.get() + finish_dur);
dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur);
}

let slot = &self.set.packages[&dl.id];
assert!(slot.fill(pkg).is_ok());
Ok(slot.borrow().unwrap())
}

fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> {
let mut handle = self.set.multi.add(handle)?;
let now = Instant::now();
handle.set_token(dl.token)?;
dl.timed_out.set(None);
dl.updated_at.set(now);
dl.current.set(0);
dl.total.set(0);
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64);
self.pending.insert(dl.token, (dl, handle));
Ok(())
}

fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> {
fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> {
// This is the main workhorse loop. We use libcurl's portable `wait`
// method to actually perform blocking. This isn't necessarily too
// efficient in terms of fd management, but we should only be juggling
Expand Down Expand Up @@ -610,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let token = msg.token().expect("failed to read token");
let handle = &pending[&token].1;
if let Some(result) = msg.result_for(&handle) {
results.push((token, result.map_err(|e| e.into())));
results.push((token, result));
} else {
debug!("message without a result (?)");
}
Expand All @@ -627,6 +707,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
}
}

fn progress(&self, token: usize, total: u64, cur: u64) -> bool {
let dl = &self.pending[&token].0;
dl.total.set(total);
let now = Instant::now();
if cur != dl.current.get() {
dl.current.set(cur);
dl.updated_at.set(now);

if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() {
dl.next_speed_check.set(now + dl.timeout.dur);
dl.next_speed_check_bytes_threshold.set(
dl.current.get() + dl.timeout.low_speed_limit as u64,
);
}
}
if !self.tick(WhyTick::DownloadUpdate).is_ok() {
return false
}

// If we've spent too long not actually receiving any data we time out.
if now - dl.updated_at.get() > dl.timeout.dur {
let msg = format!("failed to download any data for `{}` within {}s",
dl.id,
dl.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}

// If we reached the point in time that we need to check our speed
// limit, see if we've transferred enough data during this threshold. If
// it fails this check then we fail because the download is going too
// slowly.
if now >= dl.next_speed_check.get() {
assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get());
let msg = format!("download of `{}` failed to transfer more \
than {} bytes in {}s",
dl.id,
dl.timeout.low_speed_limit,
dl.timeout.dur.as_secs());
dl.timed_out.set(Some(msg));
return false
}

true
}

fn tick(&self, why: WhyTick) -> CargoResult<()> {
let mut progress = self.progress.borrow_mut();
let progress = progress.as_mut().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/cargo/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ extern crate core_foundation;
extern crate crates_io as registry;
extern crate crossbeam_utils;
extern crate curl;
extern crate curl_sys;
#[macro_use]
extern crate failure;
extern crate filetime;
Expand Down
3 changes: 2 additions & 1 deletion src/cargo/ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ pub use self::cargo_package::{package, PackageOpts};
pub use self::registry::{publish, registry_configuration, RegistryConfig};
pub use self::registry::{http_handle, needs_custom_http_transport, registry_login, search};
pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts};
pub use self::registry::configure_http_handle;
pub use self::registry::{configure_http_handle, http_handle_and_timeout};
pub use self::registry::HttpTimeout;
pub use self::cargo_fetch::{fetch, FetchOptions};
pub use self::cargo_pkgid::pkgid;
pub use self::resolve::{add_overrides, get_resolved_packages, resolve_with_previous, resolve_ws,
Expand Down
Loading

0 comments on commit bfeafde

Please sign in to comment.