diff --git a/Cargo.lock b/Cargo.lock index 6a0d9cdd09f14f..5364929003631e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -254,6 +254,16 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "async-attributes" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3203e79f4dd9bdda415ed03cf14dae5a2bf775c683a00f94e9cd1faf0f596e5" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "async-channel" version = "1.8.0" @@ -386,6 +396,7 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" dependencies = [ + "async-attributes", "async-channel", "async-global-executor", "async-io", @@ -1221,12 +1232,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" -[[package]] -name = "castaway" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6" - [[package]] name = "castaway" version = "0.2.2" @@ -1521,7 +1526,7 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" dependencies = [ - "castaway 0.2.2", + "castaway", "cfg-if", "itoa", "ryu", @@ -2164,37 +2169,6 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b365fabc795046672053e29c954733ec3b05e4be654ab130fe8f1f94d7051f35" -[[package]] -name = "curl" -version = "0.4.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "509bd11746c7ac09ebd19f0b17782eae80aadee26237658a6b4808afb5c11a22" -dependencies = [ - "curl-sys", - "libc", - "openssl-probe", - "openssl-sys", - "schannel", - "socket2 0.4.9", - "winapi", -] - -[[package]] -name = "curl-sys" -version = "0.4.60+curl-7.88.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "717abe2cb465a5da6ce06617388a3980c9a2844196734bec8ccb8e575250f13f" -dependencies = [ - "cc", - "libc", - "libnghttp2-sys", - "libz-sys", - "openssl-sys", - "pkg-config", - "vcpkg", - "winapi", -] - [[package]] name = "darling" version = "0.14.4" @@ -3430,19 +3404,19 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "httpmock" -version = "0.6.8" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b02e044d3b4c2f94936fb05f9649efa658ca788f44eb6b87554e2033fc8ce93" +checksum = "08ec9586ee0910472dec1a1f0f8acf52f0fdde93aea74d70d4a3107b4be0fd5b" dependencies = [ "assert-json-diff", "async-object-pool", + "async-std", "async-trait", "base64 0.21.4", "crossbeam-utils", "form_urlencoded", "futures-util", "hyper", - "isahc", "lazy_static", "levenshtein", "log", @@ -3471,6 +3445,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "human_format" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3b1f728c459d27b12448862017b96ad4767b1ec2ec5e6434e99f1577f085b8" + [[package]] name = "humantime" version = "2.1.0" @@ -3494,7 +3474,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.4", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -3854,33 +3834,6 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "616cde7c720bb2bb5824a224687d8f77bfd38922027f01d825cd7453be5099fb" -[[package]] -name = "isahc" -version = "1.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "334e04b4d781f436dc315cb1e7515bd96826426345d498149e4bde36b67f8ee9" -dependencies = [ - "async-channel", - "castaway 0.1.2", - "crossbeam-utils", - "curl", - "curl-sys", - "encoding_rs", - "event-listener", - "futures-lite", - "http 0.2.11", - "log", - "mime", - "once_cell", - "polling", - "slab", - "sluice", - "tracing", - "tracing-futures", - "url", - "waker-fn", -] - [[package]] name = "itertools" version = "0.10.5" @@ -4218,16 +4171,6 @@ dependencies = [ "libc", ] -[[package]] -name = "libnghttp2-sys" -version = "0.1.7+1.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ed28aba195b38d5ff02b9170cbff627e336a20925e43b4945390401c5dc93f" -dependencies = [ - "cc", - "libc", -] - [[package]] name = "libz-sys" version = "1.1.8" @@ -4656,9 +4599,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "log", @@ -5515,18 +5458,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", @@ -6283,14 +6226,14 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.5" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.8", - "regex-syntax 0.7.5", + "regex-automata 0.4.5", + "regex-syntax 0.8.2", ] [[package]] @@ -6302,17 +6245,6 @@ dependencies = [ "regex-syntax 0.6.29", ] -[[package]] -name = "regex-automata" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax 0.7.5", -] - [[package]] name = "regex-automata" version = "0.4.5" @@ -6330,12 +6262,6 @@ version = "0.6.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" -[[package]] -name = "regex-syntax" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" - [[package]] name = "regex-syntax" version = "0.8.2" @@ -6418,10 +6344,12 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.22.6", "winreg", @@ -7247,9 +7175,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" [[package]] name = "similar" -version = "2.2.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" +checksum = "fa42c91313f1d05da9b26f267f931cf178d4aba455b4c4622dd7355eb80c6640" dependencies = [ "bstr 0.2.17", "unicode-segmentation", @@ -7276,17 +7204,6 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03b634d87b960ab1a38c4fe143b508576f075e7c978bfad18217645ebfdfa2ec" -[[package]] -name = "sluice" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d7400c0eff44aa2fcb5e31a5f24ba9716ed90138769e4977a2ba6014ae63eb5" -dependencies = [ - "async-channel", - "futures-core", - "futures-io", -] - [[package]] name = "smallvec" version = "1.13.1" @@ -7325,12 +7242,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -9484,9 +9401,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", "bytes", @@ -9496,7 +9413,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.7", "tokio-macros", "tracing", "windows-sys 0.48.0", @@ -9514,9 +9431,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", @@ -9567,9 +9484,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite", @@ -10951,8 +10868,10 @@ name = "turborepo-api-client" version = "0.1.0" dependencies = [ "anyhow", + "bytes", "chrono", "http 0.2.11", + "httpmock", "lazy_static", "port_scanner", "regex", @@ -10963,6 +10882,8 @@ dependencies = [ "test-case", "thiserror", "tokio", + "tokio-stream", + "tokio-util", "tracing", "turbopath", "turborepo-ci", @@ -11006,6 +10927,7 @@ version = "0.1.0" dependencies = [ "anyhow", "base64 0.21.4", + "bytes", "camino", "futures", "hmac", @@ -11013,6 +10935,7 @@ dependencies = [ "os_str_bytes", "path-clean 1.0.1", "petgraph", + "pin-project", "port_scanner", "reqwest", "serde", @@ -11023,6 +10946,8 @@ dependencies = [ "test-case", "thiserror", "tokio", + "tokio-stream", + "tokio-util", "tracing", "turbopath", "turborepo-analytics", @@ -11158,6 +11083,7 @@ dependencies = [ "globwatch", "go-parse-duration", "hex", + "human_format", "humantime", "ignore", "itertools 0.10.5", @@ -12067,6 +11993,19 @@ dependencies = [ "leb128", ] +[[package]] +name = "wasm-streams" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmer" version = "4.2.5" diff --git a/crates/turborepo-api-client/Cargo.toml b/crates/turborepo-api-client/Cargo.toml index c900b52ae24118..306ecb9656edee 100644 --- a/crates/turborepo-api-client/Cargo.toml +++ b/crates/turborepo-api-client/Cargo.toml @@ -21,15 +21,18 @@ workspace = true [dependencies] anyhow = { workspace = true } +bytes.workspace = true chrono = { workspace = true, features = ["serde"] } lazy_static = { workspace = true } regex = { workspace = true } -reqwest = { workspace = true, features = ["json"] } +reqwest = { workspace = true, features = ["json", "stream"] } rustc_version_runtime = "0.2.1" serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-stream = "0.1.15" +tokio-util = { version = "0.7.10", features = ["codec"] } tracing = { workspace = true } turbopath = { workspace = true } turborepo-ci = { workspace = true } diff --git a/crates/turborepo-api-client/src/error.rs b/crates/turborepo-api-client/src/error.rs index 5bd378db7b9383..422d8d5762039d 100644 --- a/crates/turborepo-api-client/src/error.rs +++ b/crates/turborepo-api-client/src/error.rs @@ -7,6 +7,8 @@ use crate::CachingStatus; #[derive(Debug, Error)] pub enum Error { + #[error("Error reading from disk: {0}")] + ReadError(#[from] std::io::Error), #[error("Error making HTTP request: {0}")] ReqwestError(#[from] reqwest::Error), #[error("skipping HTTP Request, too many failures have occurred.\nLast error: {0}")] diff --git a/crates/turborepo-api-client/src/lib.rs b/crates/turborepo-api-client/src/lib.rs index 1a659ec471c0d1..a2160505a2ef5b 100644 --- a/crates/turborepo-api-client/src/lib.rs +++ b/crates/turborepo-api-client/src/lib.rs @@ -8,7 +8,7 @@ use std::{backtrace::Backtrace, env, future::Future, time::Duration}; use lazy_static::lazy_static; use regex::Regex; pub use reqwest::Response; -use reqwest::{Method, RequestBuilder, StatusCode}; +use reqwest::{Body, Method, RequestBuilder, StatusCode}; use serde::Deserialize; use turborepo_ci::{is_ci, Vendor}; use turborepo_vercel_api::{ @@ -74,7 +74,7 @@ pub trait CacheClient { fn put_artifact( &self, hash: &str, - artifact_body: &[u8], + artifact_body: impl tokio_stream::Stream> + Send + Sync + 'static, duration: u64, tag: Option<&str>, token: &str, @@ -358,7 +358,7 @@ impl CacheClient for APIClient { async fn put_artifact( &self, hash: &str, - artifact_body: &[u8], + artifact_body: impl tokio_stream::Stream> + Send + Sync + 'static, duration: u64, tag: Option<&str>, token: &str, @@ -382,13 +382,15 @@ impl CacheClient for APIClient { request_url = preflight_response.location.clone(); } + let stream = Body::wrap_stream(artifact_body); + let mut request_builder = self .cache_client .put(request_url) .header("Content-Type", "application/octet-stream") .header("x-artifact-duration", duration.to_string()) .header("User-Agent", self.user_agent.clone()) - .body(artifact_body.to_vec()); + .body(stream); if allow_auth { request_builder = request_builder.header("Authorization", format!("Bearer {}", token)); diff --git a/crates/turborepo-cache/Cargo.toml b/crates/turborepo-cache/Cargo.toml index 39db432bcf79df..0a79d1bd10f112 100644 --- a/crates/turborepo-cache/Cargo.toml +++ b/crates/turborepo-cache/Cargo.toml @@ -24,12 +24,14 @@ workspace = true [dependencies] base64 = "0.21.0" +bytes.workspace = true camino = { workspace = true } futures = { workspace = true } hmac = "0.12.1" os_str_bytes = "6.5.0" path-clean = { workspace = true } petgraph = "0.6.3" +pin-project = "1.1.5" reqwest = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } @@ -37,6 +39,8 @@ sha2 = { workspace = true } tar = "0.4.38" thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-stream = "0.1.15" +tokio-util = { version = "0.7.10", features = ["codec"] } tracing = { workspace = true } turbopath = { workspace = true } turborepo-analytics = { workspace = true } diff --git a/crates/turborepo-cache/src/async_cache.rs b/crates/turborepo-cache/src/async_cache.rs index 94c32a0ad8da67..e25d2b5a350cd0 100644 --- a/crates/turborepo-cache/src/async_cache.rs +++ b/crates/turborepo-cache/src/async_cache.rs @@ -1,13 +1,15 @@ -use std::sync::{atomic::AtomicU8, Arc}; +use std::sync::{atomic::AtomicU8, Arc, Mutex}; use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::{mpsc, oneshot, Semaphore}; use tracing::{warn, Instrument, Level}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; use turborepo_api_client::{APIAuth, APIClient}; -use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts}; +use crate::{ + http::UploadMap, multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOpts, +}; const WARNING_CUTOFF: u8 = 4; @@ -24,8 +26,11 @@ enum WorkerRequest { duration: u64, files: Vec, }, - Flush(tokio::sync::oneshot::Sender<()>), - Shutdown(tokio::sync::oneshot::Sender<()>), + Flush(oneshot::Sender<()>), + /// Shutdown the cache. The first oneshot notifies when shutdown starts and + /// allows the user to inspect the status of the uploads. The second + /// oneshot notifies when the shutdown is complete. + Shutdown(oneshot::Sender>>, oneshot::Sender<()>), } impl AsyncCache { @@ -95,8 +100,8 @@ impl AsyncCache { } drop(callback); } - WorkerRequest::Shutdown(callback) => { - shutdown_callback = Some(callback); + WorkerRequest::Shutdown(callback, callback2) => { + shutdown_callback = Some((callback, callback2)); break; } }; @@ -104,12 +109,20 @@ impl AsyncCache { // Drop write consumer to immediately notify callers that cache is shutting down drop(write_consumer); + let done = if let Some((closing, done)) = shutdown_callback { + closing.send(real_cache.requests().unwrap_or_default()).ok(); + Some(done) + } else { + None + }; + // wait for all writers to finish while let Some(worker) = workers.next().await { let _ = worker; } - if let Some(callback) = shutdown_callback { - callback.send(()).ok(); + + if let Some(done) = done { + done.send(()).ok(); } }); @@ -162,7 +175,7 @@ impl AsyncCache { // before checking the cache. #[tracing::instrument(skip_all)] pub async fn wait(&self) -> Result<(), CacheError> { - let (tx, rx) = tokio::sync::oneshot::channel(); + let (tx, rx) = oneshot::channel(); self.writer_sender .send(WorkerRequest::Flush(tx)) .await @@ -172,14 +185,31 @@ impl AsyncCache { Ok(()) } + /// Shut down the cache, waiting for all workers to finish writing. + /// This function returns as soon as the shut down has started, + /// returning a channel through which workers can report on their + /// progress. #[tracing::instrument(skip_all)] - pub async fn shutdown(&self) -> Result<(), CacheError> { - let (tx, rx) = tokio::sync::oneshot::channel(); + pub async fn start_shutdown( + &self, + ) -> Result<(Arc>, oneshot::Receiver<()>), CacheError> { + let (closing_tx, closing_rx) = oneshot::channel::>>(); + let (closed_tx, closed_rx) = oneshot::channel::<()>(); self.writer_sender - .send(WorkerRequest::Shutdown(tx)) + .send(WorkerRequest::Shutdown(closing_tx, closed_tx)) .await .map_err(|_| CacheError::CacheShuttingDown)?; - rx.await.ok(); + Ok((closing_rx.await.unwrap(), closed_rx)) // todo + } + + /// Shut down the cache, waiting for all workers to finish writing. + /// This function returns as soon as the shut down has started, + /// returning a channel through which workers can report on their + /// progress. + #[tracing::instrument(skip_all)] + pub async fn shutdown(&self) -> Result<(), CacheError> { + let (_, closed_rx) = self.start_shutdown().await?; + closed_rx.await.ok(); Ok(()) } } diff --git a/crates/turborepo-cache/src/http.rs b/crates/turborepo-cache/src/http.rs index 510fa1e8ee7954..00eabc75943dec 100644 --- a/crates/turborepo-cache/src/http.rs +++ b/crates/turborepo-cache/src/http.rs @@ -1,5 +1,11 @@ -use std::{backtrace::Backtrace, io::Write}; +use std::{ + backtrace::Backtrace, + collections::HashMap, + io::{Cursor, Write}, + sync::{Arc, Mutex}, +}; +use tokio_stream::StreamExt; use tracing::debug; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; @@ -11,15 +17,19 @@ use turborepo_api_client::{ use crate::{ cache_archive::{CacheReader, CacheWriter}, signature_authentication::ArtifactSignatureAuthenticator, + upload_progress::{UploadProgress, UploadProgressQuery}, CacheError, CacheHitMetadata, CacheOpts, CacheSource, }; +pub type UploadMap = HashMap>; + pub struct HTTPCache { client: APIClient, signer_verifier: Option, repo_root: AbsoluteSystemPathBuf, api_auth: APIAuth, analytics_recorder: Option, + uploads: Arc>, } impl HTTPCache { @@ -53,6 +63,7 @@ impl HTTPCache { client, signer_verifier, repo_root, + uploads: Arc::new(Mutex::new(HashMap::new())), api_auth, analytics_recorder, } @@ -68,6 +79,7 @@ impl HTTPCache { ) -> Result<(), CacheError> { let mut artifact_body = Vec::new(); self.write(&mut artifact_body, anchor, files).await?; + let bytes = artifact_body.len(); let tag = self .signer_verifier @@ -75,13 +87,29 @@ impl HTTPCache { .map(|signer| signer.generate_tag(hash.as_bytes(), &artifact_body)) .transpose()?; + let stream = tokio_util::codec::FramedRead::new( + Cursor::new(artifact_body), + tokio_util::codec::BytesCodec::new(), + ) + .map(|res| { + res.map(|bytes| bytes.freeze()) + .map_err(|e| turborepo_api_client::Error::from(e)) + }); + + let (progress, query) = UploadProgress::<10, 100, _>::new(stream, Some(bytes)); + + { + let mut uploads = self.uploads.lock().unwrap(); + uploads.insert(hash.to_string(), query); + } + tracing::debug!("uploading {}", hash); match self .client .put_artifact( hash, - &artifact_body, + progress, duration, tag.as_deref(), &self.api_auth.token, @@ -237,6 +265,10 @@ impl HTTPCache { ))) } + pub fn requests(&self) -> Arc> { + self.uploads.clone() + } + #[tracing::instrument(skip_all)] pub(crate) fn restore_tar( root: &AbsoluteSystemPath, diff --git a/crates/turborepo-cache/src/lib.rs b/crates/turborepo-cache/src/lib.rs index 0f9fbd00de00de..efcafe5124f9b8 100644 --- a/crates/turborepo-cache/src/lib.rs +++ b/crates/turborepo-cache/src/lib.rs @@ -19,6 +19,7 @@ mod multiplexer; pub mod signature_authentication; #[cfg(test)] mod test_cases; +mod upload_progress; use std::{backtrace, backtrace::Backtrace}; diff --git a/crates/turborepo-cache/src/multiplexer.rs b/crates/turborepo-cache/src/multiplexer.rs index 33a8bccb359e2a..4a02954a5b4374 100644 --- a/crates/turborepo-cache/src/multiplexer.rs +++ b/crates/turborepo-cache/src/multiplexer.rs @@ -1,11 +1,18 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, +}; use tracing::{debug, warn}; use turbopath::{AbsoluteSystemPath, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; use turborepo_api_client::{APIAuth, APIClient}; -use crate::{fs::FSCache, http::HTTPCache, CacheError, CacheHitMetadata, CacheOpts}; +use crate::{ + fs::FSCache, + http::{HTTPCache, UploadMap}, + CacheError, CacheHitMetadata, CacheOpts, +}; pub struct CacheMultiplexer { // We use an `AtomicBool` instead of removing the cache because that would require @@ -82,6 +89,10 @@ impl CacheMultiplexer { } } + pub fn requests(&self) -> Option>> { + self.http.as_ref().map(|http| http.requests()) + } + #[tracing::instrument(skip_all)] pub async fn put( &self, diff --git a/crates/turborepo-cache/src/upload_progress.rs b/crates/turborepo-cache/src/upload_progress.rs new file mode 100644 index 00000000000000..4d0384ebce9556 --- /dev/null +++ b/crates/turborepo-cache/src/upload_progress.rs @@ -0,0 +1,162 @@ +use std::{ + pin::Pin, + sync::{Arc, Mutex, Weak}, + task::{Context, Poll}, + time::Instant, +}; + +use futures::Stream; +use pin_project::pin_project; + +/// Consists of a total file upload time and a ring buffer of bytes sent per +/// second over some time interval. +#[pin_project] +pub struct UploadProgress { + /// A pair of bucket generation and bytes uploaded in that bucket. + /// + /// We need to store the generation to ensure that we don't accidentally + /// read from an expired bucket if there is a gap in writing. + state: Arc>, + start: Instant, + #[pin] + inner: S, +} + +impl UploadProgress { + /// Create a new `UploadProgress` with the given stream and interval. + pub fn new(inner: S, size: Option) -> (Self, UploadProgressQuery) { + let state = Arc::new(Mutex::new((0, [(0, 0); BUCKETS]))); + let now = Instant::now(); + let query = UploadProgressQuery::new(now, Arc::downgrade(&state), size); + + ( + Self { + state, + start: now, + inner, + }, + query, + ) + } +} + +impl Stream + for UploadProgress +where + S::Item: ProgressLen, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.as_mut().project(); + match this.inner.poll_next(cx) { + Poll::Ready(Some(item)) => { + // same as `curr_gen_index` but we can't borrow `self` twice + let (curr_gen, index) = { + // usize fits 570 million years of milliseconds since start on 64 bit + let gen = (this.start.elapsed().as_millis() as usize) / INTERVAL; + (gen, gen % BUCKETS) + }; + let mut state = this.state.lock().unwrap(); + let (gen, value) = &mut state.1[index]; + if *gen != curr_gen { + *gen = curr_gen; + *value = item.len(); + } else { + *value += item.len(); + } + + state.0 += item.len(); + + Poll::Ready(Some(item)) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} + +trait ProgressLen { + fn len(&self) -> usize; +} + +impl ProgressLen for bytes::Bytes { + fn len(&self) -> usize { + self.len() + } +} + +impl ProgressLen for Result { + fn len(&self) -> usize { + match self { + Ok(t) => t.len(), + Err(_) => 0, + } + } +} + +#[derive(Clone)] +pub struct UploadProgressQuery { + start: Instant, + state: Weak>, + size: Option, +} + +impl UploadProgressQuery { + fn new( + start: Instant, + state: Weak>, + size: Option, + ) -> Self { + Self { start, state, size } + } + + // Note: this usize is since the upload started so, on 64 bit systems, it + // should be good for 584.5 million years. Downcasting is probably safe... + fn curr_gen(&self) -> usize { + let since = self.start.elapsed().as_millis() as usize; + since / self.interval_ms() + } + + pub const fn interval_ms(&self) -> usize { + INTERVAL + } + + /// Get the total number of bytes uploaded. + /// + /// Returns `None` if the `UploadProgress` has been dropped. + pub fn bytes(&self) -> Option { + self.state.upgrade().map(|s| s.lock().unwrap().0) + } + + pub fn size(&self) -> Option { + self.size + } + + pub fn done(&self) -> bool { + self.state.strong_count() == 0 + } + + /// Get the average bytes per second over the last `SIZE` intervals. + /// + /// Returns `None` if the `UploadProgress` has been dropped. + pub fn average_bps(&self) -> Option { + let curr_gen = self.curr_gen(); + let min_gen = curr_gen.saturating_sub(BUCKETS); + self.state.upgrade().map(|s| { + let s = s.lock().unwrap(); + let total_bytes = + s.1.iter() + .filter(|(gen, _)| *gen >= min_gen) + .map(|(_, bytes)| *bytes) + .sum::(); + + // buckets * interval = milliseconds, so we multiply by 1000 to get seconds + (total_bytes as f64 / (BUCKETS * INTERVAL) as f64) * 1000.0 + }) + } +} diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 041a1fd31e9a59..27eb0ca243c0ba 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -61,6 +61,7 @@ globwalk = { version = "0.1.0", path = "../turborepo-globwalk" } globwatch = { path = "../turborepo-globwatch" } go-parse-duration = "0.1.1" hex = "0.4.3" +human_format = "1.1.0" humantime = "2.1.0" ignore = "0.4.22" itertools = { workspace = true } diff --git a/crates/turborepo-lib/src/run/cache.rs b/crates/turborepo-lib/src/run/cache.rs index d4e50b90747837..4f49e9da3789d4 100644 --- a/crates/turborepo-lib/src/run/cache.rs +++ b/crates/turborepo-lib/src/run/cache.rs @@ -1,10 +1,15 @@ -use std::{io::Write, sync::Arc, time::Duration}; +use std::{ + io::Write, + sync::{Arc, Mutex}, + time::Duration, +}; +use tokio::sync::oneshot; use tracing::{debug, error}; use turbopath::{ AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPath, AnchoredSystemPathBuf, }; -use turborepo_cache::{AsyncCache, CacheError, CacheHitMetadata, CacheSource}; +use turborepo_cache::{http::UploadMap, AsyncCache, CacheError, CacheHitMetadata, CacheSource}; use turborepo_repository::package_graph::PackageInfo; use turborepo_scm::SCM; use turborepo_telemetry::events::{task::PackageTaskEventBuilder, TrackedErrors}; @@ -120,9 +125,11 @@ impl RunCache { } } - pub async fn shutdown_cache(&self) { + pub async fn shutdown_cache( + &self, + ) -> Result<(Arc>, oneshot::Receiver<()>), CacheError> { // Ignore errors coming from cache already shutting down - self.cache.shutdown().await.ok(); + self.cache.start_shutdown().await } } diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index c0d90017204f04..8dca9683abf721 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -17,6 +17,7 @@ use std::{collections::HashSet, io::Write, sync::Arc}; pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache}; use chrono::{DateTime, Local}; use rayon::iter::ParallelBridge; +use tokio::select; use tracing::debug; use turbopath::AbsoluteSystemPathBuf; use turborepo_api_client::{APIAuth, APIClient}; @@ -126,7 +127,55 @@ impl Run { tokio::spawn(async move { let _guard = subscriber.listen().await; let spinner = turborepo_ui::start_spinner("...Finishing writing to cache..."); - run_cache.shutdown_cache().await; + if let Ok((status, closed)) = run_cache.shutdown_cache().await { + let fut = async { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + // loop through hashmap, extract items that are still running, + // sum up bit per second + let (bytes_per_second, bytes_uploaded, bytes_total) = { + let status = status.lock().unwrap(); + let total_bps: f64 = status + .iter() + .filter_map(|(_hash, task)| task.average_bps()) + .sum(); + let bytes_uploaded: usize = + status.iter().filter_map(|(_hash, task)| task.bytes()).sum(); + let bytes_total: usize = status + .iter() + .filter(|(_hash, task)| !task.done()) + .filter_map(|(_hash, task)| task.size()) + .sum(); + (total_bps, bytes_uploaded, bytes_total) + }; + + if bytes_total == 0 { + continue; + } + + // convert to human readable + let mut formatter = human_format::Formatter::new(); + let formatter = formatter.with_decimals(2).with_separator(""); + let bytes_per_second = + formatter.with_units("B/s").format(bytes_per_second); + let bytes_remaining = formatter + .with_units("B") + .format(bytes_total.saturating_sub(bytes_uploaded) as f64); + + spinner.set_message(format!( + "...Finishing writing to cache... ({} remaining, {})", + bytes_remaining, bytes_per_second + )); + } + }; + select! { + _ = closed => {} + _ = fut => {} + _ = tokio::signal::ctrl_c() => {tracing::debug!("received ctrl-c, exiting");} + } + } else { + tracing::warn!("could not start shutdown, exiting"); + } spinner.finish_and_clear(); }); }