diff --git a/docs/Gcs.md b/docs/Gcs.md index f4cc3b5a6..bb91971ca 100644 --- a/docs/Gcs.md +++ b/docs/Gcs.md @@ -65,6 +65,16 @@ export SCCACHE_GCS_KEY_PATH=secret-gcp-storage.json Cache location GCS, bucket: Bucket(name=), key_prefix: (none) ``` +## Lifecycle management + +Sccache updates the `CustomTime` metadata field of cache objects every time +there is a cache hit. +This can be used to implement automatic cleanup in GCS using the +["Custom time before"](https://cloud.google.com/storage/docs/lifecycle#customtimebefore) +or ["Days since custom time"](https://cloud.google.com/storage/docs/lifecycle#dayssincecustomtime) +conditions on the bucket in order to remove cache entries which have not been +actively used for a certain amount of time. + ## Deprecation `SCCACHE_GCS_OAUTH_URL` have been deprecated and not supported, please use `SCCACHE_GCS_SERVICE_ACCOUNT` instead. diff --git a/src/cache/cache.rs b/src/cache/cache.rs index 6c63d84e3..5802e52b8 100644 --- a/src/cache/cache.rs +++ b/src/cache/cache.rs @@ -16,7 +16,7 @@ use crate::cache::azure::AzureBlobCache; use crate::cache::disk::DiskCache; #[cfg(feature = "gcs")] -use crate::cache::gcs::GCSCache; +use crate::cache::gcs; #[cfg(feature = "gha")] use crate::cache::gha::GHACache; #[cfg(feature = "memcached")] @@ -51,7 +51,9 @@ use std::io::{self, Cursor, Read, Seek, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use tempfile::NamedTempFile; +use tokio::sync::RwLock as TokioRwLock; use zip::write::FileOptions; use zip::{CompressionMethod, ZipArchive, ZipWriter}; @@ -355,7 +357,8 @@ pub trait Storage: Send + Sync { /// /// - `Ok(CacheMode::ReadOnly)` means cache can only be used to `get` /// cache. - /// - `Ok(CacheMode::ReadWrite)` means cache can do both `get` and `put`. + /// - `Ok(CacheMode::ReadWrite)` means cache can do all `get`, `put`, and + /// `timestamp_cache_hit` methods. 
/// - `Err(err)` means cache is not setup correctly or not match with /// users input (for example, user try to use `ReadWrite` but cache /// is `ReadOnly`). @@ -367,6 +370,21 @@ pub trait Storage: Send + Sync { Ok(CacheMode::ReadWrite) } + /// Stamp the custom "access time" or "custom time" record for an entry in + /// the cache, if present. + /// + /// It is not always generally possible or practical to query this + /// information within sccache itself. + /// + /// Returns a `Future` that will provide the result or error when the stamp + /// request finished. In case the operation is supported and successfully + /// completed, an `Ok(Some(Duration))` will be present as a `Result`. In case + /// the operation can not be performed for configuration reasons an + /// `Ok(None)` will be returned. In a context where it is assumed that the + /// operation will succeed and any kind of error occurs, the `Err` is + /// returned as the `Result`. + async fn timestamp_cache_hit(&self, key: &str) -> Result>; + /// Get the storage location. fn location(&self) -> String; @@ -402,6 +420,32 @@ pub trait Storage: Send + Sync { } } +/// An interface to least recent usage time (`atime`-like) timestamp updates. +#[async_trait] +pub trait TimestampUpdater: Send + Sync { + /// Returns whether the current implementation can update the timestamp. + /// This might be `false` due to configuration reasons, or the lack of + /// necessary rights. + fn can_update(&self) -> bool { + true + } + + /// Returns whether the `TimestampUpdater` needs (re-)initialization. + /// A `true` value should indicate that a reinitialization is required or + /// it can not be determined if such a reinitialization is required. + /// A `false` value shall only be returned if it is deterministically + /// known that reinitialization can be skipped. + async fn needs_init(&self) -> Result; + + /// (Re-)initializes the timestamp updater's runtime data, such as + /// authentication tokens. 
+ async fn init(&mut self) -> Result<()>; + + /// Updates the least recent use timestamp (if applicable) of the cache + /// entry identified by `key` to the current timestamp. + async fn update(&self, key: &str) -> Result<()>; +} + /// Configuration switches for preprocessor cache mode. #[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] @@ -453,10 +497,9 @@ impl PreprocessorCacheModeConfig { } } -/// Implement storage for operator. +/// Implement `Storage` for `opendal::Operator`. #[cfg(any( feature = "azure", - feature = "gcs", feature = "gha", feature = "memcached", feature = "redis", @@ -480,7 +523,7 @@ impl Storage for opendal::Operator { } async fn put(&self, key: &str, entry: CacheWrite) -> Result { - let start = std::time::Instant::now(); + let start = Instant::now(); self.write(&normalize_key(key), entry.finish()?).await?; @@ -533,6 +576,29 @@ impl Storage for opendal::Operator { Ok(mode) } + #[cfg(any( + feature = "azure", + feature = "gha", + feature = "memcached", + feature = "redis", + feature = "s3", + feature = "webdav", + ))] + async fn timestamp_cache_hit(&self, _key: &str) -> Result> { + let scheme = self.info().scheme(); + match scheme { + #[allow(unreachable_patterns)] + // If we build only with `cargo build --no-default-features`, we + // only want to use sccache with a local cache and no remote storage + // support. Also, lack of support for `timestamp_cache_hit` is not + // a problem for provider cases non-implemented in here. + _ => { + debug!("timestamp_cache_hit is not supported for {scheme}"); + Err(anyhow!("Not implemented.")) + } + } + } + fn location(&self) -> String { let meta = self.info(); format!( @@ -552,6 +618,92 @@ impl Storage for opendal::Operator { } } +/// Wrapper object for `Storage` implementations where a `TimestampUpdater` +/// implementation is also available. 
+#[derive(Debug)] +pub struct TimestampUpdatingStorage { + pub storage: S, + pub updater: Arc>, +} + +/// Implement `Storage` for `opendal::Operator` that also retained a +/// `TimestampUpdater`. +/// +/// Normally, this implementation calls the usual `Storage` trait methods. +#[cfg(any( + feature = "gcs", +))] +#[async_trait] +impl Storage +for TimestampUpdatingStorage { + async fn get(&self, key: &str) -> Result { + self.storage.get(key).await + } + + async fn put(&self, key: &str, entry: CacheWrite) -> Result { + self.storage.put(key, entry).await + } + + async fn check(&self) -> Result { + let as_storage: &dyn Storage = &self.storage; + as_storage.check().await + } + + #[cfg(any( + feature = "gcs", + ))] + async fn timestamp_cache_hit(&self, key: &str) -> Result> { + let scheme = self.storage.info().scheme(); + match scheme { + #[cfg(feature = "gcs")] + opendal::Scheme::Gcs => { + let start = Instant::now(); + { + // Try to run the update without reinitialization if + // possible. This saves us taking the exclusive write lock + // and speeds up overall performance if everyone can take + // a const reference to `updater`. + let updater = self.updater.read().await; + if !updater.can_update() { + // The inability to update the cache, if from a known + // and verifiable property, is not an error. + return Ok(None); + } + if !updater.needs_init().await? 
{ + updater.update(key).await?; + return Ok(Some(start.elapsed())); + } + } + + { + let mut updater = self.updater.write().await; + updater.init().await?; + updater.update(key).await?; + + Ok(Some(start.elapsed())) + } + } + #[allow(unreachable_patterns)] + _ => { + self.storage.timestamp_cache_hit(key).await + } + } + } + + fn location(&self) -> String { + self.storage.location() + } + + async fn current_size(&self) -> Result> { + self.storage.current_size().await + } + + async fn max_size(&self) -> Result> { + self.storage.max_size().await + } +} + + /// Normalize key `abcdef` into `a/b/c/abcdef` pub(in crate::cache) fn normalize_key(key: &str) -> String { format!("{}/{}/{}/{}", &key[0..1], &key[1..2], &key[2..3], &key) @@ -587,7 +739,7 @@ pub fn storage_from_config( }) => { debug!("Init gcs cache with bucket {bucket}, key_prefix {key_prefix}"); - let storage = GCSCache::build( + let storage = gcs::GCSCache::build( bucket, key_prefix, cred_path.as_deref(), @@ -596,8 +748,21 @@ pub fn storage_from_config( credential_url.as_deref(), ) .map_err(|err| anyhow!("create gcs cache failed: {err:?}"))?; + let updater = gcs::GCSCustomTimeUpdater::new( + bucket, + key_prefix, + cred_path.as_deref(), + service_account.as_deref(), + (*rw_mode).into(), + credential_url.as_deref(), + ); - return Ok(Arc::new(storage)); + let storage_with_updater = TimestampUpdatingStorage { + storage, + updater: Arc::new(TokioRwLock::new(updater)), + }; + + return Ok(Arc::new(storage_with_updater)); } #[cfg(feature = "gha")] CacheType::GHA(config::GHACacheConfig { ref version, .. }) => { diff --git a/src/cache/disk.rs b/src/cache/disk.rs index 36d3be8ef..cc96e80ce 100644 --- a/src/cache/disk.rs +++ b/src/cache/disk.rs @@ -168,6 +168,11 @@ impl Storage for DiskCache { Ok(self.rw_mode) } + async fn timestamp_cache_hit(&self, _key: &str) -> Result> { + // Not supported. 
+ Ok(None) + } + fn location(&self) -> String { format!("Local disk: {:?}", self.lru.lock().unwrap().path()) } diff --git a/src/cache/gcs.rs b/src/cache/gcs.rs index 6adcc570a..0c0709c6d 100644 --- a/src/cache/gcs.rs +++ b/src/cache/gcs.rs @@ -13,13 +13,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::cache::CacheMode; +use crate::cache::{CacheMode, TimestampUpdater, normalize_key}; use crate::errors::*; +use async_trait::async_trait; use opendal::Operator; use opendal::{layers::LoggingLayer, services::Gcs}; -use reqsign::{GoogleToken, GoogleTokenLoad}; +use reqsign::{GoogleCredentialLoader, + GoogleToken, + GoogleTokenLoad, + GoogleTokenLoader, + GoogleSigner, +}; use reqwest::Client; use serde::Deserialize; +use serde_json::json; use url::Url; use super::http_client::set_user_agent; @@ -73,6 +80,190 @@ impl GCSCache { .finish(); Ok(op) } + +} + +/// An updater that updates entries in Google Cloud Storage with a custom +/// timestamp. 
+pub struct GCSCustomTimeUpdater { + bucket: String, + key_prefix: String, + cred_path: Option, + service_account: Option, + rw_mode: CacheMode, + credential_url: Option, + + client: reqwest::Client, + signer: GoogleSigner, + token: Option, + token_last_loaded_at: Option> +} + +impl GCSCustomTimeUpdater { + pub fn new ( + bucket: &str, + key_prefix: &str, + cred_path: Option<&str>, + service_account: Option<&str>, + rw_mode: CacheMode, + credential_url: Option<&str>, + ) -> Self { + GCSCustomTimeUpdater { + bucket: bucket.to_string(), + key_prefix: key_prefix.to_string(), + cred_path: cred_path.map(|s| s.to_string()), + service_account: service_account.map(|s| s.to_string()), + rw_mode, + credential_url: credential_url.map(|s| s.to_string()), + + client: reqwest::Client::new(), + signer: GoogleSigner::new("storage"), + token: None, + token_last_loaded_at: None, + } + } + + /// Initializes the request token that will be used to update the cache + /// hits, as the API requires authenticated requests. + pub async fn init(&mut self) -> Result<()> { + match self.load_token().await { + Ok(t) => { + self.token = Some(t); + self.token_last_loaded_at = Some(chrono::Utc::now()); + } + Err(err) => { + error!("failed to load token: {err}"); + return Err(anyhow!("gcs: failed to load token")); + } + } + + Ok(()) + } + + /// Loads a new token using the Google Cloud API for the timestamp update + /// requests. 
+ async fn load_token(&mut self) -> Result { + if let Some(cred_url) = &self.credential_url { + let _ = Url::parse(cred_url) + .map_err(|err| anyhow!("gcs credential url is invalid: {err:?}"))?; + + let tc_loader = TaskClusterTokenLoader { + scope: rw_to_scope(self.rw_mode).to_string(), + url: cred_url.to_string(), + }; + + if let Ok(Some(tok)) = tc_loader.load(self.client.clone()).await { + return Ok(tok) + } + } + + if let Some(cred_path) = &self.cred_path { + let cred_loader = GoogleCredentialLoader::default() + .with_disable_env() + .with_disable_well_known_location() + .with_path(cred_path); + let creds = cred_loader.load()?.unwrap(); + + let mut token_loader = GoogleTokenLoader::new( + // devstorage.full_control is required to use the metadata PATCH + "https://www.googleapis.com/auth/devstorage.full_control", + Client::new()) + .with_credentials(creds); + if let Some(srv_account) = &self.service_account { + token_loader = token_loader.with_service_account(srv_account); + } + + if let Some(token_) = token_loader.load().await? { + return Ok(token_); + } + } + + error!("failed to load credential token: no valid cases"); + Err(anyhow!("gcs: failed to load credential token: no valid cases")) + } +} + +/// In raw API requests, `/` in paths must be URL-encoded. +fn encode_key_to_api_path(key: String) -> String { + key.replace("/", "%2F") +} + +#[async_trait] +impl TimestampUpdater for GCSCustomTimeUpdater { + fn can_update(&self) -> bool { + // Assume no rights to update metadata if the user may only read the + // cache. 
+ CacheMode::from(self.rw_mode.clone()) == CacheMode::ReadWrite + } + + async fn needs_init(&self) -> Result { + if !self.can_update() { + return Ok(false); + } + + if self.token.is_none() || self.token_last_loaded_at.is_none() { + debug!("needs initialization because a token was never loaded"); + return Ok(true); + } + + if (chrono::Utc::now() - self.token_last_loaded_at.as_ref().unwrap()) + .num_seconds() > 3600 { + // Google Cloud tokens are valid for 3600 seconds by default. + // Unfortunately, reqsign explicitly forbids querying the otherwise + // available `expires_in` field from the `Token`... + debug!("needs reinitialization because the token likely expired"); + return Ok(true); + } + + Ok(false) + } + + async fn init(&mut self) -> Result<()> { + self.init().await + } + + // Update the `CustomTime` timestamp for an object in the cache through + // [the API](https://cloud.google.com/storage/docs/metadata#custom-time). + // + // `CustomTime` can only grow incrementally. In case an error is reported + // because someone else set a time ahead of us, the error is silently + // ignored. 
+ async fn update(&self, key: &str) -> Result<()> { + if !self.can_update() { + return Err(anyhow!("gcs: update timestamp is not supported for a read-only cache")); + } + + let url = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o/{}", + self.bucket, + encode_key_to_api_path(self.key_prefix.clone() + &normalize_key(key)) + ); + let payload = json!({ + "customTime": chrono::Utc::now().to_rfc3339(), + }); + let mut request = self.client.patch(&url).json(&payload).build()?; + self.signer.sign(&mut request, self.token.as_ref().unwrap())?; + + let resp = self.client.execute(request).await?; + if !resp.status().is_success() { + let status_code = resp.status(); + let content = resp.text().await?; + + if status_code == reqwest::StatusCode::BAD_REQUEST && + content.contains("Custom time cannot be decreased.") { + // Do not report custom time changes as an error, the local + // clock may be out of sync with another sccache server using + // the same cache. + return Ok(()); + } + + return Err(anyhow!( + "gcs: failed to update timestamp for {key}: code: {status_code}, {content}" + )); + } + + Ok(()) + } } /// TaskClusterTokenLoeader is used to load tokens from [TaskCluster](https://taskcluster.net/) diff --git a/src/cache/readonly.rs b/src/cache/readonly.rs index 90431c4fb..b101722ec 100644 --- a/src/cache/readonly.rs +++ b/src/cache/readonly.rs @@ -44,6 +44,11 @@ impl Storage for ReadOnlyStorage { Ok(CacheMode::ReadOnly) } + async fn timestamp_cache_hit(&self, _key: &str) -> Result> { + // Not supported. + Ok(None) + } + /// Get the storage location. 
fn location(&self) -> String { self.0.location() diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs index 2ae421679..5f5f035bf 100644 --- a/src/compiler/compiler.rs +++ b/src/compiler/compiler.rs @@ -563,6 +563,25 @@ where match lookup { CacheLookupResult::Success(compile_result, output) => { + let out_pretty3 = out_pretty.clone(); + match storage.timestamp_cache_hit(&key).await { + Ok(None) => {} // Not an error, intentionally left empty. + Ok(Some(duration_timestamp)) => { + debug!( + "[{}]: Updated cache hit timestamp {}", + out_pretty3, + fmt_duration_as_secs(&duration_timestamp) + ); + } + Err(e) => { + // `{:#}` prints the error and the causes in a single line. + let errmsg = format!("{:#}", e); + warn!( + "[{}]: Failed to update cache hit timestamp: {}", + out_pretty3, errmsg); + } + } + Ok::<_, Error>((compile_result, output)) } CacheLookupResult::Miss(miss_type) => { diff --git a/src/test/mock_storage.rs b/src/test/mock_storage.rs index 00a6aa7c8..5daa12838 100644 --- a/src/test/mock_storage.rs +++ b/src/test/mock_storage.rs @@ -65,6 +65,9 @@ impl Storage for MockStorage { Duration::from_secs(0) }) } + async fn timestamp_cache_hit(&self, _key: &str) -> Result { + Ok(Duration::from_secs(0)) + } fn location(&self) -> String { "Mock Storage".to_string() }