From bfeafde0df7d5b5e8ada37550591db9d9ad6c3ff Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 3 Oct 2018 16:16:24 -0700 Subject: [PATCH] Fix timeouts firing while tarballs are extracted This commit fixes #6125 by ensuring that while we're extracting tarballs or doing other synchronous work like grabbing file locks we're not letting the timeout timers of each HTTP transfer keep ticking. This is curl's default behavior (which we don't want in this scenario). Instead the timeout logic is inlined directly and we manually account for the synchronous work happening not counting towards timeout limits. Closes #6125 --- Cargo.toml | 1 + src/cargo/core/package.rs | 154 ++++++++++++++++++++++++++++++++++---- src/cargo/lib.rs | 1 + src/cargo/ops/mod.rs | 3 +- src/cargo/ops/registry.rs | 75 ++++++++++++------- src/cargo/util/config.rs | 3 +- src/cargo/util/network.rs | 6 +- 7 files changed, 196 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d2c62b6fbd..5dbff3eacf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ crates-io = { path = "src/crates-io", version = "0.20" } crossbeam-utils = "0.5" crypto-hash = "0.3.1" curl = { version = "0.4.17", features = ['http2'] } +curl-sys = "0.4.12" env_logger = "0.5.11" failure = "0.1.2" filetime = "0.2" diff --git a/src/cargo/core/package.rs b/src/cargo/core/package.rs index 7b249ba594d..171ac27e33e 100644 --- a/src/cargo/core/package.rs +++ b/src/cargo/core/package.rs @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf}; use std::time::{Instant, Duration}; use bytesize::ByteSize; +use curl; +use curl_sys; use curl::easy::{Easy, HttpVersion}; use curl::multi::{Multi, EasyHandle}; use lazycell::LazyCell; @@ -257,7 +259,7 @@ pub struct Downloads<'a, 'cfg: 'a> { set: &'a PackageSet<'cfg>, pending: HashMap, EasyHandle)>, pending_ids: HashSet, - results: Vec<(usize, CargoResult<()>)>, + results: Vec<(usize, Result<(), curl::Error>)>, next: usize, progress: RefCell>>, downloads_finished: usize, @@ -268,14 
+270,49 @@ pub struct Downloads<'a, 'cfg: 'a> { } struct Download<'cfg> { + /// Token for this download, used as the key of the `Downloads::pending` map + /// and stored in `EasyHandle` as well. token: usize, + + /// Package that we're downloading id: PackageId, + + /// Actual downloaded data, updated throughout the lifetime of this download data: RefCell>, + + /// The URL that we're downloading from, cached here for error messages and + /// reenqueuing. url: String, + + /// A descriptive string to print when we've finished downloading this crate descriptor: String, + + /// Statistics updated from the progress callback in libcurl total: Cell, current: Cell, + + /// The moment we started this transfer at start: Instant, + + /// Last time we noticed that we got some more data from libcurl + updated_at: Cell, + + /// Timeout management, both of timeout thresholds as well as whether or not + /// our connection has timed out (and accompanying message if it has). + /// + /// Note that timeout management is done manually here because we have a + /// `Multi` with a lot of active transfers but between transfers finishing + /// we perform some possibly slow synchronous work (like grabbing file + /// locks, extracting tarballs, etc). The default timers on our `Multi` keep + /// running during this work, but we don't want them to count towards timing + /// everything out. As a result, we manage this manually and take the time + /// for synchronous work into account manually. + timeout: ops::HttpTimeout, + timed_out: Cell>, + next_speed_check: Cell, + next_speed_check_bytes_threshold: Cell, + + /// Logic used to track retrying this download if it's a spurious failure. 
retry: Retry<'cfg>, } @@ -409,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { debug!("downloading {} as {}", id, token); assert!(self.pending_ids.insert(id.clone())); - let mut handle = ops::http_handle(self.set.config)?; + let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?; handle.get(true)?; handle.url(&url)?; handle.follow_location(true)?; // follow redirects @@ -447,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { handle.progress(true)?; handle.progress_function(move |dl_total, dl_cur, _, _| { tls::with(|downloads| { - let downloads = match downloads { - Some(d) => d, - None => return false, - }; - let dl = &downloads.pending[&token].0; - dl.total.set(dl_total as u64); - dl.current.set(dl_cur as u64); - downloads.tick(WhyTick::DownloadUpdate).is_ok() + match downloads { + Some(d) => d.progress(token, dl_total as u64, dl_cur as u64), + None => false, + } }) })?; @@ -468,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { self.set.config.shell().status("Downloading", "crates ...")?; } + let now = Instant::now(); let dl = Download { token, data: RefCell::new(Vec::new()), @@ -477,6 +511,11 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { total: Cell::new(0), current: Cell::new(0), start: Instant::now(), + updated_at: Cell::new(now), + timeout, + timed_out: Cell::new(None), + next_speed_check: Cell::new(now), + next_speed_check_bytes_threshold: Cell::new(0), retry: Retry::new(self.set.config)?, }; self.enqueue(dl, handle)?; @@ -514,13 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { // then we want to re-enqueue our request for another attempt and // then we wait for another request to finish. let ret = { + let timed_out = &dl.timed_out; let url = &dl.url; dl.retry.try(|| { - result?; + if let Err(e) = result { + // If this error is "aborted by callback" then that's + // probably because our progress callback aborted due to + // a timeout. We'll find out by looking at the + // `timed_out` field, looking for a descriptive message. 
+ // If one is found we switch the error code (to ensure + // it's flagged as spurious) and then attach our extra + // information to the error. + if !e.is_aborted_by_callback() { + return Err(e.into()) + } + + return Err(match timed_out.replace(None) { + Some(msg) => { + let code = curl_sys::CURLE_OPERATION_TIMEDOUT; + let mut err = curl::Error::new(code); + err.set_extra(msg); + err + } + None => e, + }.into()) + } let code = handle.response_code()?; if code != 200 && code != 0 { - let url = handle.effective_url()?.unwrap_or(&url); + let url = handle.effective_url()?.unwrap_or(url); return Err(HttpNot200 { code, url: url.to_string(), @@ -569,7 +630,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { let source = sources .get_mut(dl.id.source_id()) .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?; + let start = Instant::now(); let pkg = source.finish_download(&dl.id, data)?; + + // Assume that no time has passed while we were calling + // `finish_download`, update all speed checks and timeout limits of all + // active downloads to make sure they don't fire because of a slowly + // extracted tarball. 
+ let finish_dur = start.elapsed(); + for (dl, _) in self.pending.values_mut() { + dl.updated_at.set(dl.updated_at.get() + finish_dur); + dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur); + } + let slot = &self.set.packages[&dl.id]; assert!(slot.fill(pkg).is_ok()); Ok(slot.borrow().unwrap()) @@ -577,12 +650,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> { let mut handle = self.set.multi.add(handle)?; + let now = Instant::now(); handle.set_token(dl.token)?; + dl.timed_out.set(None); + dl.updated_at.set(now); + dl.current.set(0); + dl.total.set(0); + dl.next_speed_check.set(now + dl.timeout.dur); + dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64); self.pending.insert(dl.token, (dl, handle)); Ok(()) } - fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> { + fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> { // This is the main workhorse loop. We use libcurl's portable `wait` // method to actually perform blocking. 
This isn't necessarily too // efficient in terms of fd management, but we should only be juggling @@ -610,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { let token = msg.token().expect("failed to read token"); let handle = &pending[&token].1; if let Some(result) = msg.result_for(&handle) { - results.push((token, result.map_err(|e| e.into()))); + results.push((token, result)); } else { debug!("message without a result (?)"); } @@ -627,6 +707,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { } } + fn progress(&self, token: usize, total: u64, cur: u64) -> bool { + let dl = &self.pending[&token].0; + dl.total.set(total); + let now = Instant::now(); + if cur != dl.current.get() { + dl.current.set(cur); + dl.updated_at.set(now); + + if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() { + dl.next_speed_check.set(now + dl.timeout.dur); + dl.next_speed_check_bytes_threshold.set( + dl.current.get() + dl.timeout.low_speed_limit as u64, + ); + } + } + if !self.tick(WhyTick::DownloadUpdate).is_ok() { + return false + } + + // If we've spent too long not actually receiving any data we time out. + if now - dl.updated_at.get() > dl.timeout.dur { + let msg = format!("failed to download any data for `{}` within {}s", + dl.id, + dl.timeout.dur.as_secs()); + dl.timed_out.set(Some(msg)); + return false + } + + // If we reached the point in time that we need to check our speed + // limit, see if we've transferred enough data during this threshold. If + // it fails this check then we fail because the download is going too + // slowly. 
+ if now >= dl.next_speed_check.get() { + assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get()); + let msg = format!("download of `{}` failed to transfer more \ + than {} bytes in {}s", + dl.id, + dl.timeout.low_speed_limit, + dl.timeout.dur.as_secs()); + dl.timed_out.set(Some(msg)); + return false + } + + true + } + fn tick(&self, why: WhyTick) -> CargoResult<()> { let mut progress = self.progress.borrow_mut(); let progress = progress.as_mut().unwrap(); diff --git a/src/cargo/lib.rs b/src/cargo/lib.rs index fa54ebbfe9b..e7fcac41d24 100644 --- a/src/cargo/lib.rs +++ b/src/cargo/lib.rs @@ -22,6 +22,7 @@ extern crate core_foundation; extern crate crates_io as registry; extern crate crossbeam_utils; extern crate curl; +extern crate curl_sys; #[macro_use] extern crate failure; extern crate filetime; diff --git a/src/cargo/ops/mod.rs b/src/cargo/ops/mod.rs index 9c09f14f5ed..3b653b00158 100644 --- a/src/cargo/ops/mod.rs +++ b/src/cargo/ops/mod.rs @@ -15,7 +15,8 @@ pub use self::cargo_package::{package, PackageOpts}; pub use self::registry::{publish, registry_configuration, RegistryConfig}; pub use self::registry::{http_handle, needs_custom_http_transport, registry_login, search}; pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts}; -pub use self::registry::configure_http_handle; +pub use self::registry::{configure_http_handle, http_handle_and_timeout}; +pub use self::registry::HttpTimeout; pub use self::cargo_fetch::{fetch, FetchOptions}; pub use self::cargo_pkgid::pkgid; pub use self::resolve::{add_overrides, get_resolved_packages, resolve_with_previous, resolve_ws, diff --git a/src/cargo/ops/registry.rs b/src/cargo/ops/registry.rs index 21d70acb485..3932abdbece 100644 --- a/src/cargo/ops/registry.rs +++ b/src/cargo/ops/registry.rs @@ -330,6 +330,12 @@ pub fn registry( /// Create a new HTTP handle with appropriate global configuration for cargo. 
pub fn http_handle(config: &Config) -> CargoResult { + let (mut handle, timeout) = http_handle_and_timeout(config)?; + timeout.configure(&mut handle)?; + Ok(handle) +} + +pub fn http_handle_and_timeout(config: &Config) -> CargoResult<(Easy, HttpTimeout)> { if config.frozen() { bail!( "attempting to make an HTTP request, but --frozen was \ @@ -345,33 +351,26 @@ pub fn http_handle(config: &Config) -> CargoResult { // connect phase as well as a "low speed" timeout so if we don't receive // many bytes in a large-ish period of time then we time out. let mut handle = Easy::new(); - configure_http_handle(config, &mut handle)?; - Ok(handle) + let timeout = configure_http_handle(config, &mut handle)?; + Ok((handle, timeout)) } pub fn needs_custom_http_transport(config: &Config) -> CargoResult { let proxy_exists = http_proxy_exists(config)?; - let timeout = http_timeout(config)?; + let timeout = HttpTimeout::new(config)?.is_non_default(); let cainfo = config.get_path("http.cainfo")?; let check_revoke = config.get_bool("http.check-revoke")?; let user_agent = config.get_string("http.user-agent")?; Ok(proxy_exists - || timeout.is_some() + || timeout || cainfo.is_some() || check_revoke.is_some() || user_agent.is_some()) } /// Configure a libcurl http handle with the defaults options for Cargo -pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult<()> { - // The timeout option for libcurl by default times out the entire transfer, - // but we probably don't want this. Instead we only set timeouts for the - // connect phase as well as a "low speed" timeout so if we don't receive - // many bytes in a large-ish period of time then we time out. - handle.connect_timeout(Duration::new(30, 0))?; - handle.low_speed_time(Duration::new(30, 0))?; - handle.low_speed_limit(http_low_speed_limit(config)?)?; +pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult { if let Some(proxy) = http_proxy(config)? 
{ handle.proxy(&proxy)?; } @@ -381,10 +380,6 @@ pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult< if let Some(check) = config.get_bool("http.check-revoke")? { handle.ssl_options(SslOpt::new().no_revoke(!check.val))?; } - if let Some(timeout) = http_timeout(config)? { - handle.connect_timeout(Duration::new(timeout as u64, 0))?; - handle.low_speed_time(Duration::new(timeout as u64, 0))?; - } if let Some(user_agent) = config.get_string("http.user-agent")? { handle.useragent(&user_agent.val)?; } else { @@ -416,15 +411,44 @@ pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult< } })?; } - Ok(()) + + HttpTimeout::new(config) } -/// Find an override from config for curl low-speed-limit option, otherwise use default value -fn http_low_speed_limit(config: &Config) -> CargoResult { - if let Some(s) = config.get::>("http.low-speed-limit")? { - return Ok(s); +#[must_use] +pub struct HttpTimeout { + pub dur: Duration, + pub low_speed_limit: u32, +} + +impl HttpTimeout { + pub fn new(config: &Config) -> CargoResult { + let low_speed_limit = config.get::>("http.low-speed-limit")? + .unwrap_or(10); + let seconds = config.get::>("http.timeout")? + .or_else(|| env::var("HTTP_TIMEOUT").ok().and_then(|s| s.parse().ok())) + .unwrap_or(30); + Ok(HttpTimeout { + dur: Duration::new(seconds, 0), + low_speed_limit, + }) + } + + fn is_non_default(&self) -> bool { + self.dur != Duration::new(30, 0) || self.low_speed_limit != 10 + } + + pub fn configure(&self, handle: &mut Easy) -> CargoResult<()> { + // The timeout option for libcurl by default times out the entire + // transfer, but we probably don't want this. Instead we only set + // timeouts for the connect phase as well as a "low speed" timeout so + // if we don't receive many bytes in a large-ish period of time then we + // time out. 
+ handle.connect_timeout(self.dur)?; + handle.low_speed_time(self.dur)?; + handle.low_speed_limit(self.low_speed_limit)?; + Ok(()) } - Ok(10) } /// Find an explicit HTTP proxy if one is available. @@ -463,13 +487,6 @@ fn http_proxy_exists(config: &Config) -> CargoResult { } } -fn http_timeout(config: &Config) -> CargoResult> { - if let Some(s) = config.get_i64("http.timeout")? { - return Ok(Some(s.val)); - } - Ok(env::var("HTTP_TIMEOUT").ok().and_then(|s| s.parse().ok())) -} - pub fn registry_login(config: &Config, token: String, registry: Option) -> CargoResult<()> { let RegistryConfig { token: old_token, .. diff --git a/src/cargo/util/config.rs b/src/cargo/util/config.rs index 99689e40cf4..cf5b22a0dda 100644 --- a/src/cargo/util/config.rs +++ b/src/cargo/util/config.rs @@ -780,7 +780,8 @@ impl Config { { let mut http = http.borrow_mut(); http.reset(); - ops::configure_http_handle(self, &mut http)?; + let timeout = ops::configure_http_handle(self, &mut http)?; + timeout.configure(&mut http)?; } Ok(http) } diff --git a/src/cargo/util/network.rs b/src/cargo/util/network.rs index 60a629ea4dc..4c3fcace3f2 100644 --- a/src/cargo/util/network.rs +++ b/src/cargo/util/network.rs @@ -47,9 +47,11 @@ fn maybe_spurious(err: &Error) -> bool { } } if let Some(curl_err) = e.downcast_ref::() { - if curl_err.is_couldnt_connect() || curl_err.is_couldnt_resolve_proxy() + if curl_err.is_couldnt_connect() + || curl_err.is_couldnt_resolve_proxy() || curl_err.is_couldnt_resolve_host() - || curl_err.is_operation_timedout() || curl_err.is_recv_error() + || curl_err.is_operation_timedout() + || curl_err.is_recv_error() { return true; }