diff --git a/Cargo.toml b/Cargo.toml index 8d2c62b6fbd..5dbff3eacf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ crates-io = { path = "src/crates-io", version = "0.20" } crossbeam-utils = "0.5" crypto-hash = "0.3.1" curl = { version = "0.4.17", features = ['http2'] } +curl-sys = "0.4.12" env_logger = "0.5.11" failure = "0.1.2" filetime = "0.2" diff --git a/src/cargo/core/package.rs b/src/cargo/core/package.rs index 8824ee75b1a..5770311e350 100644 --- a/src/cargo/core/package.rs +++ b/src/cargo/core/package.rs @@ -7,6 +7,8 @@ use std::path::{Path, PathBuf}; use std::time::{Instant, Duration}; use bytesize::ByteSize; +use curl; +use curl_sys; use curl::easy::{Easy, HttpVersion}; use curl::multi::{Multi, EasyHandle}; use lazycell::LazyCell; @@ -257,7 +259,7 @@ pub struct Downloads<'a, 'cfg: 'a> { set: &'a PackageSet<'cfg>, pending: HashMap, EasyHandle)>, pending_ids: HashSet, - results: Vec<(usize, CargoResult<()>)>, + results: Vec<(usize, Result<(), curl::Error>)>, next: usize, progress: RefCell>>, downloads_finished: usize, @@ -268,14 +270,49 @@ pub struct Downloads<'a, 'cfg: 'a> { } struct Download<'cfg> { + /// Token for this download, used as the key of the `Downloads::pending` map + /// and stored in `EasyHandle` as well. token: usize, + + /// Package that we're downloading id: PackageId, + + /// Actual downloaded data, updated throughout the lifetime of this download data: RefCell>, + + /// The URL that we're downloading from, cached here for error messages and + /// reenqueuing. url: String, + + /// A descriptive string to print when we've finished downloading this crate descriptor: String, + + /// Statistics updated from the progress callback in libcurl total: Cell, current: Cell, + + /// The moment we started this transfer at start: Instant, + + /// Last time we noticed that we got some more data from libcurl + updated_at: Cell, + + /// Timeout management, both of timeout thresholds as well as whether or not + /// our connection has timed out (and accompanying message if it has). + /// + /// Note that timeout management is done manually here because we have a + /// `Multi` with a lot of active transfers but between transfers finishing + /// we perform some possibly slow synchronous work (like grabbing file + /// locks, extracting tarballs, etc). The default timers on our `Multi` keep + /// running during this work, but we don't want them to count towards timing + /// everythig out. As a result, we manage this manually and take the time + /// for synchronous work into account manually. + timeout: ops::HttpTimeout, + timed_out: Cell>, + next_speed_check: Cell, + next_speed_check_bytes_threshold: Cell, + + /// Logic used to track retrying this download if it's a spurious failure. retry: Retry<'cfg>, } @@ -409,7 +446,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { debug!("downloading {} as {}", id, token); assert!(self.pending_ids.insert(id.clone())); - let mut handle = ops::http_handle(self.set.config)?; + let (mut handle, timeout) = ops::http_handle_and_timeout(self.set.config)?; handle.get(true)?; handle.url(&url)?; handle.follow_location(true)?; // follow redirects @@ -447,14 +484,10 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { handle.progress(true)?; handle.progress_function(move |dl_total, dl_cur, _, _| { tls::with(|downloads| { - let downloads = match downloads { - Some(d) => d, - None => return false, - }; - let dl = &downloads.pending[&token].0; - dl.total.set(dl_total as u64); - dl.current.set(dl_cur as u64); - downloads.tick(WhyTick::DownloadUpdate).is_ok() + match downloads { + Some(d) => d.progress(token, dl_total as u64, dl_cur as u64), + None => false, + } }) })?; @@ -468,6 +501,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { self.set.config.shell().status("Downloading", "crates ...")?; } + let now = Instant::now(); let dl = Download { token, data: RefCell::new(Vec::new()), @@ -477,6 +511,11 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { total: Cell::new(0), current: Cell::new(0), start: Instant::now(), + updated_at: Cell::new(now), + timeout, + timed_out: Cell::new(None), + next_speed_check: Cell::new(now), + next_speed_check_bytes_threshold: Cell::new(0), retry: Retry::new(self.set.config)?, }; self.enqueue(dl, handle)?; @@ -514,13 +553,35 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { // then we want to re-enqueue our request for another attempt and // then we wait for another request to finish. let ret = { + let timed_out = &dl.timed_out; let url = &dl.url; dl.retry.try(|| { - result?; + if let Err(e) = result { + // If this error is "aborted by callback" then that's + // probably because our progress callback aborted due to + // a timeout. We'll find out by looking at the + // `timed_out` field, looking for a descriptive message. + // If one is found we switch the error code (to ensure + // it's flagged as spurious) and then attach our extra + // information to the error. + if !e.is_aborted_by_callback() { + return Err(e.into()) + } + + return Err(match timed_out.replace(None) { + Some(msg) => { + let code = curl_sys::CURLE_OPERATION_TIMEDOUT; + let mut err = curl::Error::new(code); + err.set_extra(msg); + err + } + None => e, + }.into()) + } let code = handle.response_code()?; if code != 200 && code != 0 { - let url = handle.effective_url()?.unwrap_or(&url); + let url = handle.effective_url()?.unwrap_or(url); return Err(HttpNot200 { code, url: url.to_string(), @@ -569,7 +630,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { let source = sources .get_mut(dl.id.source_id()) .ok_or_else(|| internal(format!("couldn't find source for `{}`", dl.id)))?; + let start = Instant::now(); let pkg = source.finish_download(&dl.id, data)?; + + // Assume that no time has passed while we were calling + // `finish_download`, update all speed checks and timeout limits of all + // active downloads to make sure they don't fire because of a slowly + // extracted tarball. + let finish_dur = start.elapsed(); + for (dl, _) in self.pending.values_mut() { + dl.updated_at.set(dl.updated_at.get() + finish_dur); + dl.next_speed_check.set(dl.next_speed_check.get() + finish_dur); + } + let slot = &self.set.packages[&dl.id]; assert!(slot.fill(pkg).is_ok()); Ok(slot.borrow().unwrap()) @@ -577,12 +650,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { fn enqueue(&mut self, dl: Download<'cfg>, handle: Easy) -> CargoResult<()> { let mut handle = self.set.multi.add(handle)?; + let now = Instant::now(); handle.set_token(dl.token)?; + dl.timed_out.set(None); + dl.updated_at.set(now); + dl.current.set(0); + dl.total.set(0); + dl.next_speed_check.set(now + dl.timeout.dur); + dl.next_speed_check_bytes_threshold.set(dl.timeout.low_speed_limit as u64); self.pending.insert(dl.token, (dl, handle)); Ok(()) } - fn wait_for_curl(&mut self) -> CargoResult<(usize, CargoResult<()>)> { + fn wait_for_curl(&mut self) -> CargoResult<(usize, Result<(), curl::Error>)> { // This is the main workhorse loop. We use libcurl's portable `wait` // method to actually perform blocking. This isn't necessarily too // efficient in terms of fd management, but we should only be juggling @@ -610,7 +690,7 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { let token = msg.token().expect("failed to read token"); let handle = &pending[&token].1; if let Some(result) = msg.result_for(&handle) { - results.push((token, result.map_err(|e| e.into()))); + results.push((token, result)); } else { debug!("message without a result (?)"); } @@ -627,6 +707,52 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> { } } + fn progress(&self, token: usize, total: u64, cur: u64) -> bool { + let dl = &self.pending[&token].0; + dl.total.set(total); + let now = Instant::now(); + if cur != dl.current.get() { + dl.current.set(cur); + dl.updated_at.set(now); + + if dl.current.get() >= dl.next_speed_check_bytes_threshold.get() { + dl.next_speed_check.set(now + dl.timeout.dur); + dl.next_speed_check_bytes_threshold.set( + dl.current.get() + dl.timeout.low_speed_limit as u64, + ); + } + } + if !self.tick(WhyTick::DownloadUpdate).is_ok() { + return false + } + + // If we've spent too long not actually receiving any data we time out. + if now - dl.updated_at.get() > dl.timeout.dur { + let msg = format!("failed to download any data for `{}` within {}s", + dl.id, + dl.timeout.dur.as_secs()); + dl.timed_out.set(Some(msg)); + return false + } + + // If we reached the point in time that we need to check our speed + // limit, see if we've transferred enough data during this threshold. If + // it fails this check then we fail because the download is going too + // slowly. + if now >= dl.next_speed_check.get() { + assert!(dl.current.get() < dl.next_speed_check_bytes_threshold.get()); + let msg = format!("download of `{}` failed to transfer more \ + than {} bytes in {}s", + dl.id, + dl.timeout.low_speed_limit, + dl.timeout.dur.as_secs()); + dl.timed_out.set(Some(msg)); + return false + } + + true + } + fn tick(&self, why: WhyTick) -> CargoResult<()> { let mut progress = self.progress.borrow_mut(); let progress = progress.as_mut().unwrap(); diff --git a/src/cargo/lib.rs b/src/cargo/lib.rs index fa54ebbfe9b..e7fcac41d24 100644 --- a/src/cargo/lib.rs +++ b/src/cargo/lib.rs @@ -22,6 +22,7 @@ extern crate core_foundation; extern crate crates_io as registry; extern crate crossbeam_utils; extern crate curl; +extern crate curl_sys; #[macro_use] extern crate failure; extern crate filetime; diff --git a/src/cargo/ops/mod.rs b/src/cargo/ops/mod.rs index 9c09f14f5ed..3b653b00158 100644 --- a/src/cargo/ops/mod.rs +++ b/src/cargo/ops/mod.rs @@ -15,7 +15,8 @@ pub use self::cargo_package::{package, PackageOpts}; pub use self::registry::{publish, registry_configuration, RegistryConfig}; pub use self::registry::{http_handle, needs_custom_http_transport, registry_login, search}; pub use self::registry::{modify_owners, yank, OwnersOptions, PublishOpts}; -pub use self::registry::configure_http_handle; +pub use self::registry::{configure_http_handle, http_handle_and_timeout}; +pub use self::registry::HttpTimeout; pub use self::cargo_fetch::{fetch, FetchOptions}; pub use self::cargo_pkgid::pkgid; pub use self::resolve::{add_overrides, get_resolved_packages, resolve_with_previous, resolve_ws, diff --git a/src/cargo/ops/registry.rs b/src/cargo/ops/registry.rs index 21d70acb485..3932abdbece 100644 --- a/src/cargo/ops/registry.rs +++ b/src/cargo/ops/registry.rs @@ -330,6 +330,12 @@ pub fn registry( /// Create a new HTTP handle with appropriate global configuration for cargo. pub fn http_handle(config: &Config) -> CargoResult { + let (mut handle, timeout) = http_handle_and_timeout(config)?; + timeout.configure(&mut handle)?; + Ok(handle) +} + +pub fn http_handle_and_timeout(config: &Config) -> CargoResult<(Easy, HttpTimeout)> { if config.frozen() { bail!( "attempting to make an HTTP request, but --frozen was \ @@ -345,33 +351,26 @@ pub fn http_handle(config: &Config) -> CargoResult { // connect phase as well as a "low speed" timeout so if we don't receive // many bytes in a large-ish period of time then we time out. let mut handle = Easy::new(); - configure_http_handle(config, &mut handle)?; - Ok(handle) + let timeout = configure_http_handle(config, &mut handle)?; + Ok((handle, timeout)) } pub fn needs_custom_http_transport(config: &Config) -> CargoResult { let proxy_exists = http_proxy_exists(config)?; - let timeout = http_timeout(config)?; + let timeout = HttpTimeout::new(config)?.is_non_default(); let cainfo = config.get_path("http.cainfo")?; let check_revoke = config.get_bool("http.check-revoke")?; let user_agent = config.get_string("http.user-agent")?; Ok(proxy_exists - || timeout.is_some() + || timeout || cainfo.is_some() || check_revoke.is_some() || user_agent.is_some()) } /// Configure a libcurl http handle with the defaults options for Cargo -pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult<()> { - // The timeout option for libcurl by default times out the entire transfer, - // but we probably don't want this. Instead we only set timeouts for the - // connect phase as well as a "low speed" timeout so if we don't receive - // many bytes in a large-ish period of time then we time out. - handle.connect_timeout(Duration::new(30, 0))?; - handle.low_speed_time(Duration::new(30, 0))?; - handle.low_speed_limit(http_low_speed_limit(config)?)?; +pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult { if let Some(proxy) = http_proxy(config)? { handle.proxy(&proxy)?; } @@ -381,10 +380,6 @@ pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult< if let Some(check) = config.get_bool("http.check-revoke")? { handle.ssl_options(SslOpt::new().no_revoke(!check.val))?; } - if let Some(timeout) = http_timeout(config)? { - handle.connect_timeout(Duration::new(timeout as u64, 0))?; - handle.low_speed_time(Duration::new(timeout as u64, 0))?; - } if let Some(user_agent) = config.get_string("http.user-agent")? { handle.useragent(&user_agent.val)?; } else { @@ -416,15 +411,44 @@ pub fn configure_http_handle(config: &Config, handle: &mut Easy) -> CargoResult< } })?; } - Ok(()) + + HttpTimeout::new(config) } -/// Find an override from config for curl low-speed-limit option, otherwise use default value -fn http_low_speed_limit(config: &Config) -> CargoResult { - if let Some(s) = config.get::>("http.low-speed-limit")? { - return Ok(s); +#[must_use] +pub struct HttpTimeout { + pub dur: Duration, + pub low_speed_limit: u32, +} + +impl HttpTimeout { + pub fn new(config: &Config) -> CargoResult { + let low_speed_limit = config.get::>("http.low-speed-limit")? + .unwrap_or(10); + let seconds = config.get::>("http.timeout")? + .or_else(|| env::var("HTTP_TIMEOUT").ok().and_then(|s| s.parse().ok())) + .unwrap_or(30); + Ok(HttpTimeout { + dur: Duration::new(seconds, 0), + low_speed_limit, + }) + } + + fn is_non_default(&self) -> bool { + self.dur != Duration::new(30, 0) || self.low_speed_limit != 10 + } + + pub fn configure(&self, handle: &mut Easy) -> CargoResult<()> { + // The timeout option for libcurl by default times out the entire + // transfer, but we probably don't want this. Instead we only set + // timeouts for the connect phase as well as a "low speed" timeout so + // if we don't receive many bytes in a large-ish period of time then we + // time out. + handle.connect_timeout(self.dur)?; + handle.low_speed_time(self.dur)?; + handle.low_speed_limit(self.low_speed_limit)?; + Ok(()) } - Ok(10) } /// Find an explicit HTTP proxy if one is available. @@ -463,13 +487,6 @@ fn http_proxy_exists(config: &Config) -> CargoResult { } } -fn http_timeout(config: &Config) -> CargoResult> { - if let Some(s) = config.get_i64("http.timeout")? { - return Ok(Some(s.val)); - } - Ok(env::var("HTTP_TIMEOUT").ok().and_then(|s| s.parse().ok())) -} - pub fn registry_login(config: &Config, token: String, registry: Option) -> CargoResult<()> { let RegistryConfig { token: old_token, .. diff --git a/src/cargo/util/config.rs b/src/cargo/util/config.rs index 99689e40cf4..cf5b22a0dda 100644 --- a/src/cargo/util/config.rs +++ b/src/cargo/util/config.rs @@ -780,7 +780,8 @@ impl Config { { let mut http = http.borrow_mut(); http.reset(); - ops::configure_http_handle(self, &mut http)?; + let timeout = ops::configure_http_handle(self, &mut http)?; + timeout.configure(&mut http)?; } Ok(http) } diff --git a/src/cargo/util/network.rs b/src/cargo/util/network.rs index 60a629ea4dc..4c3fcace3f2 100644 --- a/src/cargo/util/network.rs +++ b/src/cargo/util/network.rs @@ -47,9 +47,11 @@ fn maybe_spurious(err: &Error) -> bool { } } if let Some(curl_err) = e.downcast_ref::() { - if curl_err.is_couldnt_connect() || curl_err.is_couldnt_resolve_proxy() + if curl_err.is_couldnt_connect() + || curl_err.is_couldnt_resolve_proxy() || curl_err.is_couldnt_resolve_host() - || curl_err.is_operation_timedout() || curl_err.is_recv_error() + || curl_err.is_operation_timedout() + || curl_err.is_recv_error() { return true; }