From c2b8c03a3120775179529b17f3e4d4bcf4958164 Mon Sep 17 00:00:00 2001
From: Jake Shadle
Date: Fri, 29 Sep 2023 13:26:33 +0200
Subject: [PATCH] Add support for OS locking (#33)

As pointed out in #30, gix's locking mechanism is insufficient in the face of SIGKILL or other process interruptions that cannot be caught by gix's signal handler. There is also the giant footgun that tame-index doesn't hook that signal handler for you, so applications that forget to hook it themselves run into the same problem.

In addition, I raised #17 while working on https://github.com/crate-ci/cargo-release/pull/693 (https://github.com/crate-ci/cargo-release/pull/693/files/012d3e9a7be23db14096e6a0d41cea7528f9348c#r1301688472), as right now tame-index can mutate an index concurrently with cargo itself, or read partial data while it is being written.

This PR resolves both issues by forcing all read/write operations on indexes to take a `&FileLock` argument, and by providing a `LockOptions` "builder" for creating file locks. These locks are created with `flock` on unix and `LockFileEx` on windows, so unlike gix's locks they are properly cleaned up by the OS in all situations, including SIGKILL, power loss, etc. This is also the same mechanism cargo uses for its global package lock, so downstream users can ensure they play nicely with cargo.

The lock facilities are part of tame-index's public API because I opted to roll my own implementation rather than use fslock, which is quite outdated and doesn't support timeouts. This does mean a fair amount of unsafe has been added, but it is tested and not _too_ bad. The implementation could potentially be moved out into a separate crate in the future, but is fine here for now. Being public also means it could be used to resolve https://github.com/rustsec/rustsec/issues/1011, and is something I will use in cargo-deny for the same purpose: protecting access to the advisory database during mutation.

It should also be noted that one can construct a `FileLock::unlocked()` to satisfy the API without performing any actual locking, for cases where it isn't needed, for testing, etc.
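As a rough illustration of the intended usage (this sketch is not part of the diff below; the wait message is arbitrary, and the 10 minute timeout just mirrors the gix lock policy being removed by this patch):

```rust
use std::time::Duration;
use tame_index::utils::flock::{FileLock, LockOptions};

fn acquire_cargo_lock() -> Result<FileLock, tame_index::Error> {
    // Take cargo's global package lock ($CARGO_HOME/.package-cache), falling
    // back to a shared lock on read-only filesystems, and waiting up to 10
    // minutes for any other process (eg. cargo) to release it
    LockOptions::cargo_package_lock(None)?
        .exclusive(true)
        .lock(|path| {
            eprintln!("waiting for lock on '{path}'...");
            Some(Duration::from_secs(60 * 10))
        })
}

// For tests, or callers that synchronize by other means, the lock argument
// can be satisfied without doing any actual locking
fn no_lock() -> FileLock {
    FileLock::unlocked()
}
```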
Resolves: #17 Resolves: #30 --- .gitignore | 3 +- Cargo.toml | 16 +- benches/sparse.rs | 15 +- src/error.rs | 3 + src/index.rs | 9 +- src/index/cache.rs | 17 +- src/index/combo.rs | 21 ++- src/index/git.rs | 13 +- src/index/git_remote.rs | 40 ++--- src/index/local.rs | 7 +- src/index/sparse.rs | 18 +- src/index/sparse_remote.rs | 51 ++++-- src/utils.rs | 3 +- src/utils/flock.rs | 301 ++++++++++++++++++++++++++++++++ src/utils/flock/bindings.toml | 26 +++ src/utils/flock/unix.rs | 155 ++++++++++++++++ src/utils/flock/win_bindings.rs | 74 ++++++++ src/utils/flock/windows.rs | 138 +++++++++++++++ tests/cache.rs | 6 +- tests/flock.rs | 172 ++++++++++++++++++ tests/flock/Cargo.toml | 8 + tests/flock/src/main.rs | 32 ++++ tests/git.rs | 46 ++--- tests/local.rs | 4 +- tests/sparse.rs | 29 +-- tests/utils.rs | 5 + 26 files changed, 1106 insertions(+), 106 deletions(-) create mode 100644 src/utils/flock.rs create mode 100644 src/utils/flock/bindings.toml create mode 100644 src/utils/flock/unix.rs create mode 100644 src/utils/flock/win_bindings.rs create mode 100644 src/utils/flock/windows.rs create mode 100644 tests/flock.rs create mode 100644 tests/flock/Cargo.toml create mode 100644 tests/flock/src/main.rs diff --git a/.gitignore b/.gitignore index 2f88dba..a6fb271 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target **/*.rs.bk -Cargo.lock \ No newline at end of file +Cargo.lock +tests/flock/target diff --git a/Cargo.toml b/Cargo.toml index 671c366..6ad9155 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,17 +53,15 @@ tokio = { version = "1.0", default-features = false, features = [ "time", ], optional = true } # cargo config parsing -toml = "0.7" +toml = "0.8" # Faster hashing twox-hash = { version = "1.6", default-features = false } [dependencies.gix] optional = true -version = "0.53.1" +version = "0.54" default-features = false -features = [ - "blocking-http-transport-reqwest", -] +features = ["blocking-http-transport-reqwest"] [dependencies.reqwest] optional = true @@ -71,8 +69,14 @@ version = "0.11" default-features = false features = ["blocking", "gzip"] +[target.'cfg(unix)'.dependencies] +libc = "0.2" + +[target.'cfg(windows)'.dependencies] +windows-targets = "0.48" + [dev-dependencies] -cargo_metadata = "0.17" +cargo_metadata = "0.18" rayon = "1.7" tempfile = "3.6" tiny-bench = "0.3" diff --git a/benches/sparse.rs b/benches/sparse.rs index ffe858c..2216c2f 100644 --- a/benches/sparse.rs +++ b/benches/sparse.rs @@ -60,7 +60,11 @@ fn main() { } fn blocking(rsi: &tame_index::index::RemoteSparseIndex, krates: &KrateSet) { - let krates = rsi.krates(krates.clone(), true); + let krates = rsi.krates( + krates.clone(), + true, + &tame_index::index::FileLock::unlocked(), + ); for (krate, res) in krates { if let Err(err) = res { @@ -70,7 +74,14 @@ fn blocking(rsi: &tame_index::index::RemoteSparseIndex, krates: &KrateSet) { } fn asunc(rsi: &tame_index::index::AsyncRemoteSparseIndex, krates: &KrateSet) { - let krates = rsi.krates_blocking(krates.clone(), true, None).unwrap(); + let krates = rsi + .krates_blocking( + krates.clone(), + true, + None, + &tame_index::index::FileLock::unlocked(), + ) + .unwrap(); for (krate, res) in krates { if let Err(err) = res { diff --git a/src/error.rs b/src/error.rs index 7c9bd4e..d26b660 100644 --- a/src/error.rs +++ b/src/error.rs @@ -50,6 +50,9 @@ pub enum Error { #[cfg(feature = "local")] #[error(transparent)] Local(#[from] LocalRegistryError), + /// Failed to lock a file + #[error(transparent)] + Lock(#[from] crate::utils::flock::FileLockError), } impl 
From for Error { diff --git a/src/index.rs b/src/index.rs index 3888058..314d056 100644 --- a/src/index.rs +++ b/src/index.rs @@ -29,6 +29,8 @@ pub use sparse::SparseIndex; #[cfg(feature = "sparse")] pub use sparse_remote::{AsyncRemoteSparseIndex, RemoteSparseIndex}; +pub use crate::utils::flock::FileLock; + /// Global configuration of an index, reflecting the [contents of config.json](https://doc.rust-lang.org/cargo/reference/registries.html#index-format). #[derive(Eq, PartialEq, PartialOrd, Ord, Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct IndexConfig { @@ -119,12 +121,13 @@ impl ComboIndexCache { pub fn cached_krate( &self, name: crate::KrateName<'_>, + lock: &FileLock, ) -> Result, Error> { match self { - Self::Git(index) => index.cached_krate(name), - Self::Sparse(index) => index.cached_krate(name), + Self::Git(index) => index.cached_krate(name, lock), + Self::Sparse(index) => index.cached_krate(name, lock), #[cfg(feature = "local")] - Self::Local(lr) => lr.cached_krate(name), + Self::Local(lr) => lr.cached_krate(name, lock), } } diff --git a/src/index/cache.rs b/src/index/cache.rs index c9ddab0..caf2e81 100644 --- a/src/index/cache.rs +++ b/src/index/cache.rs @@ -32,6 +32,7 @@ pub const INDEX_V_MAX: u32 = 2; /// The byte representation of [`INDEX_V_MAX`] const INDEX_V_MAX_BYTES: [u8; 4] = INDEX_V_MAX.to_le_bytes(); +use super::FileLock; use crate::{CacheError, Error, IndexKrate, KrateName, PathBuf}; /// A wrapper around a byte buffer that has been (partially) validated to be a @@ -227,8 +228,9 @@ impl IndexCache { &self, name: KrateName<'_>, revision: Option<&str>, + lock: &FileLock, ) -> Result, Error> { - let Some(contents) = self.read_cache_file(name)? else { + let Some(contents) = self.read_cache_file(name, lock)? else { return Ok(None); }; @@ -237,7 +239,12 @@ impl IndexCache { } /// Writes the specified crate and revision to the cache - pub fn write_to_cache(&self, krate: &IndexKrate, revision: &str) -> Result { + pub fn write_to_cache( + &self, + krate: &IndexKrate, + revision: &str, + _lock: &FileLock, + ) -> Result { let name = krate.name().try_into()?; let cache_path = self.cache_path(name); @@ -280,7 +287,11 @@ impl IndexCache { /// /// It is recommended to use [`Self::cached_krate`] #[inline] - pub fn read_cache_file(&self, name: KrateName<'_>) -> Result>, Error> { + pub fn read_cache_file( + &self, + name: KrateName<'_>, + _lock: &FileLock, + ) -> Result>, Error> { let cache_path = self.cache_path(name); match std::fs::read(&cache_path) { diff --git a/src/index/combo.rs b/src/index/combo.rs index 58595cd..d537a26 100644 --- a/src/index/combo.rs +++ b/src/index/combo.rs @@ -1,7 +1,7 @@ #[cfg(feature = "local")] use crate::index::LocalRegistry; use crate::{ - index::{RemoteGitIndex, RemoteSparseIndex}, + index::{FileLock, RemoteGitIndex, RemoteSparseIndex}, Error, IndexKrate, KrateName, }; @@ -29,23 +29,28 @@ impl ComboIndex { &self, name: KrateName<'_>, write_cache_entry: bool, + lock: &FileLock, ) -> Result, Error> { match self { - Self::Git(index) => index.krate(name, write_cache_entry), - Self::Sparse(index) => index.krate(name, write_cache_entry), + Self::Git(index) => index.krate(name, write_cache_entry, lock), + Self::Sparse(index) => index.krate(name, write_cache_entry, lock), #[cfg(feature = "local")] - Self::Local(lr) => lr.cached_krate(name), + Self::Local(lr) => lr.cached_krate(name, lock), } } /// Retrieves the cached crate metadata if it exists #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { + pub fn 
cached_krate( + &self, + name: KrateName<'_>, + lock: &FileLock, + ) -> Result, Error> { match self { - Self::Git(index) => index.cached_krate(name), - Self::Sparse(index) => index.cached_krate(name), + Self::Git(index) => index.cached_krate(name, lock), + Self::Sparse(index) => index.cached_krate(name, lock), #[cfg(feature = "local")] - Self::Local(lr) => lr.cached_krate(name), + Self::Local(lr) => lr.cached_krate(name, lock), } } } diff --git a/src/index/git.rs b/src/index/git.rs index 32043f4..a8c72ff 100644 --- a/src/index/git.rs +++ b/src/index/git.rs @@ -1,4 +1,4 @@ -use super::IndexCache; +use super::{FileLock, IndexCache}; use crate::{Error, IndexKrate, KrateName, PathBuf}; /// The URL of the crates.io index for use with git @@ -66,8 +66,12 @@ impl GitIndex { /// There are no guarantees around freshness, and no network I/O will be /// performed. #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { - self.cache.cached_krate(name, self.head_commit()) + pub fn cached_krate( + &self, + name: KrateName<'_>, + lock: &FileLock, + ) -> Result, Error> { + self.cache.cached_krate(name, self.head_commit(), lock) } /// Writes the specified crate to the cache. @@ -79,10 +83,11 @@ impl GitIndex { &self, krate: &IndexKrate, blob_id: Option<&str>, + lock: &FileLock, ) -> Result, Error> { let Some(id) = blob_id.or_else(|| self.head_commit()) else { return Ok(None); }; - self.cache.write_to_cache(krate, id).map(Some) + self.cache.write_to_cache(krate, id, lock).map(Some) } } diff --git a/src/index/git_remote.rs b/src/index/git_remote.rs index 730e2c4..15cf307 100644 --- a/src/index/git_remote.rs +++ b/src/index/git_remote.rs @@ -1,4 +1,4 @@ -use super::GitIndex; +use super::{FileLock, GitIndex}; use crate::{Error, IndexKrate, KrateName}; use std::sync::atomic::AtomicBool; @@ -30,14 +30,12 @@ impl RemoteGitIndex { /// if the process is interrupted with Ctrl+C. To support `panic = abort` you also need to register /// the `gix` signal handler to clean up the locks, see [`gix::interrupt::init_handler`]. 
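To make the new signatures concrete, here is a hedged sketch of driving the git index once a lock is held; it assumes the `git` feature is enabled and that the `GitIndex` is constructed elsewhere via the existing crate API:

```rust
use tame_index::{index::{FileLock, RemoteGitIndex}, Error, GitIndex};

// Sketch only: every operation that reads or writes the on-disk index now
// threads the caller's &FileLock through
fn refresh(index: GitIndex, lock: &FileLock) -> Result<(), Error> {
    let mut rgi = RemoteGitIndex::new(index, lock)?;

    // Performs network I/O, updating HEAD and invalidating stale cache entries
    rgi.fetch(lock)?;

    // Reads the blob from git (writing a cache entry), or the cache if valid
    if let Some(krate) = rgi.krate("serde".try_into()?, true, lock)? {
        println!("serde has {} published versions", krate.versions.len());
    }
    Ok(())
}
```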
#[inline] - pub fn new(index: GitIndex) -> Result { + pub fn new(index: GitIndex, lock: &FileLock) -> Result { Self::with_options( index, gix::progress::Discard, &gix::interrupt::IS_INTERRUPTED, - gix::lock::acquire::Fail::AfterDurationWithBackoff(std::time::Duration::from_secs( - 60 * 10, /* 10 minutes */ - )), + lock, ) } @@ -60,7 +58,7 @@ impl RemoteGitIndex { mut index: GitIndex, progress: P, should_interrupt: &AtomicBool, - lock_policy: gix::lock::acquire::Fail, + _lock: &FileLock, ) -> Result where P: gix::NestedProgress, @@ -82,14 +80,6 @@ impl RemoteGitIndex { mapping.reduced = open_with_complete_config.clone(); mapping.full = open_with_complete_config.clone(); - let _lock = gix::lock::Marker::acquire_to_hold_resource( - index.cache.path.with_extension("tame-index"), - lock_policy, - Some(std::path::PathBuf::from_iter(Some( - std::path::Component::RootDir, - ))), - )?; - // Attempt to open the repository, if it fails for any reason, // attempt to perform a fresh clone instead let repo = gix::ThreadSafeRepository::discover_opts( @@ -235,8 +225,9 @@ impl RemoteGitIndex { &self, name: KrateName<'_>, write_cache_entry: bool, + lock: &FileLock, ) -> Result, Error> { - if let Ok(Some(cached)) = self.cached_krate(name) { + if let Ok(Some(cached)) = self.cached_krate(name, lock) { return Ok(Some(cached)); } @@ -252,7 +243,7 @@ impl RemoteGitIndex { let gix::ObjectId::Sha1(sha1) = blob.id; let blob_id = crate::utils::encode_hex(&sha1, &mut hex_id); - let _ = self.index.write_to_cache(&krate, Some(blob_id)); + let _ = self.index.write_to_cache(&krate, Some(blob_id), lock); } Ok(Some(krate)) @@ -302,8 +293,12 @@ impl RemoteGitIndex { /// advantage of that though as it does not have access to git and thus /// cannot know the blob id. #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { - let Some(cached) = self.index.cache.read_cache_file(name)? else { + pub fn cached_krate( + &self, + name: KrateName<'_>, + lock: &FileLock, + ) -> Result, Error> { + let Some(cached) = self.index.cache.read_cache_file(name, lock)? else { return Ok(None); }; let valid = crate::index::cache::ValidCacheEntry::read(&cached)?; @@ -329,8 +324,12 @@ impl RemoteGitIndex { /// /// This method performs network I/O. #[inline] - pub fn fetch(&mut self) -> Result<(), Error> { - self.fetch_with_options(gix::progress::Discard, &gix::interrupt::IS_INTERRUPTED) + pub fn fetch(&mut self, lock: &FileLock) -> Result<(), Error> { + self.fetch_with_options( + gix::progress::Discard, + &gix::interrupt::IS_INTERRUPTED, + lock, + ) } /// Same as [`Self::fetch`] but allows specifying a progress implementation @@ -339,6 +338,7 @@ impl RemoteGitIndex { &mut self, mut progress: P, should_interrupt: &AtomicBool, + _lock: &FileLock, ) -> Result<(), Error> where P: gix::NestedProgress, diff --git a/src/index/local.rs b/src/index/local.rs index 0388406..45fb08b 100644 --- a/src/index/local.rs +++ b/src/index/local.rs @@ -1,5 +1,6 @@ //! 
Contains code for reading and writing [local registries](https://doc.rust-lang.org/cargo/reference/source-replacement.html#local-registry-sources) +use super::FileLock; use crate::{Error, IndexKrate, KrateName, Path, PathBuf}; use smol_str::SmolStr; @@ -130,7 +131,11 @@ impl LocalRegistry { /// Note this naming is just to be consistent with [`crate::SparseIndex`] and /// [`crate::GitIndex`], local registries do not have a .cache in the index #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { + pub fn cached_krate( + &self, + name: KrateName<'_>, + _lock: &FileLock, + ) -> Result, Error> { let index_path = make_path(&self.path, name); let buf = match std::fs::read(&index_path) { diff --git a/src/index/sparse.rs b/src/index/sparse.rs index d1a1b90..43d595e 100644 --- a/src/index/sparse.rs +++ b/src/index/sparse.rs @@ -1,4 +1,4 @@ -use super::{cache::ValidCacheEntry, IndexCache}; +use super::{cache::ValidCacheEntry, FileLock, IndexCache}; use crate::{Error, HttpError, IndexKrate, KrateName}; /// The default URL of the crates.io HTTP index @@ -72,8 +72,12 @@ impl SparseIndex { /// Attempts to read the locally cached crate information #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { - self.cache.cached_krate(name, None) + pub fn cached_krate( + &self, + name: KrateName<'_>, + lock: &FileLock, + ) -> Result, Error> { + self.cache.cached_krate(name, None, lock) } /// Creates an HTTP request that can be sent via your HTTP client of choice @@ -92,6 +96,7 @@ impl SparseIndex { &self, name: KrateName<'_>, etag: Option<&str>, + lock: &FileLock, ) -> Result, Error> { use http::header; @@ -129,7 +134,7 @@ impl SparseIndex { // If we're unable to get the cache version we can just ignore setting the // header, guaranteeing we'll get the full index contents if the crate exists let set_cache_version = |headers: &mut header::HeaderMap| -> Option<()> { - let contents = self.cache.read_cache_file(name).ok()??; + let contents = self.cache.read_cache_file(name, lock).ok()??; let valid = ValidCacheEntry::read(&contents).ok()?; let (key, value) = valid.revision.split_once(':')?; @@ -182,6 +187,7 @@ impl SparseIndex { name: KrateName<'_>, response: http::Response>, write_cache_entry: bool, + lock: &FileLock, ) -> Result, Error> { use http::{header, StatusCode}; let (parts, body) = response.into_parts(); @@ -210,14 +216,14 @@ impl SparseIndex { // It's unfortunate if we can't write to the cache, but we // don't treat it as a hard error since we still have the // index metadata - let _err = self.cache.write_to_cache(&krate, &revision); + let _err = self.cache.write_to_cache(&krate, &revision, lock); } Ok(Some(krate)) } // The local cache entry is up to date with the latest entry on the // server, we can just return the local one - StatusCode::NOT_MODIFIED => self.cache.cached_krate(name, None), + StatusCode::NOT_MODIFIED => self.cache.cached_krate(name, None, lock), // The server requires authorization but the user didn't provide it StatusCode::UNAUTHORIZED => Err(HttpError::StatusCode { code: StatusCode::UNAUTHORIZED, diff --git a/src/index/sparse_remote.rs b/src/index/sparse_remote.rs index 682a093..a06885b 100644 --- a/src/index/sparse_remote.rs +++ b/src/index/sparse_remote.rs @@ -1,4 +1,4 @@ -use super::SparseIndex; +use super::{FileLock, SparseIndex}; use crate::{Error, IndexKrate, KrateName}; pub use reqwest::blocking::Client; pub use reqwest::Client as AsyncClient; @@ -30,8 +30,9 @@ impl RemoteSparseIndex { &self, name: KrateName<'_>, 
write_cache_entry: bool, + lock: &FileLock, ) -> Result, Error> { - let req = self.index.make_remote_request(name, None)?; + let req = self.index.make_remote_request(name, None, lock)?; let req = req.try_into()?; let res = self.client.execute(req)?; @@ -49,7 +50,7 @@ impl RemoteSparseIndex { let res = builder.body(body.to_vec())?; self.index - .parse_remote_response(name, res, write_cache_entry) + .parse_remote_response(name, res, write_cache_entry, lock) } /// Attempts to read the locally cached crate information @@ -58,8 +59,12 @@ impl RemoteSparseIndex { /// guarantee that the cache information is up to date with the latest in /// the remote index #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { - self.index.cached_krate(name) + pub fn cached_krate( + &self, + name: KrateName<'_>, + lock: &FileLock, + ) -> Result, Error> { + self.index.cached_krate(name, lock) } /// Helper method for downloading multiple crates in parallel @@ -71,6 +76,7 @@ impl RemoteSparseIndex { &self, krates: BTreeSet, write_cache_entries: bool, + lock: &FileLock, ) -> BTreeMap, Error>> { use rayon::prelude::*; krates @@ -78,7 +84,7 @@ impl RemoteSparseIndex { .map(|kname| { let res = || { let name = kname.as_str().try_into()?; - self.krate(name, write_cache_entries) + self.krate(name, write_cache_entries, lock) }; let res = res(); (kname, res) @@ -108,12 +114,16 @@ impl AsyncRemoteSparseIndex { &self, name: KrateName<'_>, write_cache_entry: bool, + lock: &FileLock, ) -> Result, Error> { - let req = self.index.make_remote_request(name, None)?.try_into()?; + let req = self + .index + .make_remote_request(name, None, lock)? + .try_into()?; let res = Self::exec_request(&self.client, req).await?; self.index - .parse_remote_response(name, res, write_cache_entry) + .parse_remote_response(name, res, write_cache_entry, lock) } async fn exec_request( @@ -151,8 +161,12 @@ impl AsyncRemoteSparseIndex { /// guarantee that the cache information is up to date with the latest in /// the remote index #[inline] - pub fn cached_krate(&self, name: KrateName<'_>) -> Result, Error> { - self.index.cached_krate(name) + pub fn cached_krate( + &self, + name: KrateName<'_>, + lock: &FileLock, + ) -> Result, Error> { + self.index.cached_krate(name, lock) } /// Helper method for downloading multiples crates concurrently @@ -173,16 +187,18 @@ impl AsyncRemoteSparseIndex { krates: BTreeSet, write_cache_entries: bool, individual_timeout: Option, + lock: &FileLock, ) -> BTreeMap, Error>> { let mut tasks = tokio::task::JoinSet::new(); let mut results = BTreeMap::new(); for kname in krates { - match kname - .as_str() - .try_into() - .and_then(|name| Ok(self.index.make_remote_request(name, None)?.try_into()?)) - { + match kname.as_str().try_into().and_then(|name| { + Ok(self + .index + .make_remote_request(name, None, lock)? + .try_into()?) 
+ }) { Ok(req) => { let client = self.client.clone(); tasks.spawn(async move { @@ -224,7 +240,7 @@ impl AsyncRemoteSparseIndex { .try_into() .expect("this was already validated"); self.index - .parse_remote_response(name, res, write_cache_entries) + .parse_remote_response(name, res, write_cache_entries, lock) }); results.lock().unwrap().insert(kname, res); @@ -244,11 +260,12 @@ impl AsyncRemoteSparseIndex { krates: BTreeSet, write_cache_entries: bool, individual_timeout: Option, + lock: &FileLock, ) -> Result, Error>>, tokio::runtime::TryCurrentError> { let current = tokio::runtime::Handle::try_current()?; Ok(current.block_on(async { - self.krates(krates, write_cache_entries, individual_timeout) + self.krates(krates, write_cache_entries, individual_timeout, lock) .await })) } diff --git a/src/utils.rs b/src/utils.rs index b0fca2d..c8f0fa2 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -3,10 +3,11 @@ use crate::{Error, InvalidUrl, InvalidUrlError, PathBuf}; +pub mod flock; #[cfg(feature = "git")] pub mod git; -/// Returns the storage directory (in utf-8) used by Cargo, often knowns as +/// Returns the storage directory (in utf-8) used by Cargo, often known as /// `.cargo` or `CARGO_HOME` #[inline] pub fn cargo_home() -> Result { diff --git a/src/utils/flock.rs b/src/utils/flock.rs new file mode 100644 index 0000000..87b4e3f --- /dev/null +++ b/src/utils/flock.rs @@ -0,0 +1,301 @@ +//! Provides facilities for file locks on unix and windows + +use crate::{Error, Path, PathBuf}; +use std::{fs, time::Duration}; + +#[cfg_attr(unix, path = "flock/unix.rs")] +#[cfg_attr(windows, path = "flock/windows.rs")] +mod sys; + +/// An error pertaining to a failed file lock +#[derive(Debug, thiserror::Error)] +#[error("failed to obtain lock file '{path}'")] +pub struct FileLockError { + /// The path of the file lock + pub path: PathBuf, + /// The underlying failure reason + pub source: LockError, +} + +/// Errors that can occur when attempting to acquire a [`FileLock`] +#[derive(Debug, thiserror::Error)] +pub enum LockError { + /// An I/O error occurred attempting to open the lock file + #[error(transparent)] + Open(std::io::Error), + /// Exclusive locks cannot be take on read-only file systems + #[error("attempted to take an exclusive lock on a read-only path")] + Readonly, + /// Failed to create parents directories to lock file + #[error("failed to create parent directories for lock path")] + CreateDir(std::io::Error), + /// Locking is not supported if the lock file is on an NFS, though note this + /// is a bit more nuanced as NFSv4 _does_ support file locking, but is out + /// of scope, at least for now + #[error("NFS do not support locking")] + Nfs, + /// This could happen on eg. 
_extremely_ old and outdated OSes or some filesystems + /// and is only present for completeness + #[error("locking is not supported on the filesystem and/or in the kernel")] + NotSupported, + /// An I/O error occurred attempting to un/lock the file + #[error("failed to acquire or release file lock")] + Lock(std::io::Error), + /// The lock could not be acquired within the caller provided timeout + #[error("failed to acquire lock within the specified duration")] + TimedOut, + /// The lock is currently held by another + #[error("the lock is currently held by another")] + Contested, +} + +/// Provides options for creating a [`FileLock`] +pub struct LockOptions<'pb> { + path: std::borrow::Cow<'pb, Path>, + exclusive: bool, + shared_fallback: bool, +} + +impl<'pb> LockOptions<'pb> { + /// Creates a new [`Self`] for the specified path + #[inline] + pub fn new(path: &'pb Path) -> Self { + Self { + path: path.into(), + exclusive: false, + shared_fallback: false, + } + } + + /// Creates a new [`Self`] for locking cargo's global package lock + /// + /// If specified, the path is used as the root, otherwise it is rooted at + /// the path determined by `$CARGO_HOME` + #[inline] + pub fn cargo_package_lock(root: Option) -> Result { + let mut path = if let Some(root) = root { + root + } else { + crate::utils::cargo_home()? + }; + path.push(".package-cache"); + + Ok(Self { + path: path.into(), + exclusive: false, + shared_fallback: false, + }) + } + + /// Will attempt to acquire a shared lock rather than an exclusive one + #[inline] + pub fn shared(mut self) -> Self { + self.exclusive = false; + self + } + + /// Will attempt to acquire an exclusive lock, which can optionally fallback + /// to a shared lock if the lock file is for a read only filesystem + #[inline] + pub fn exclusive(mut self, shared_fallback: bool) -> Self { + self.exclusive = true; + self.shared_fallback = shared_fallback; + self + } + + /// Attempts to acquire a lock, but fails immediately if the lock is currently + /// held + #[inline] + pub fn try_lock(&self) -> Result { + self.open_and_lock(Option:: Option>::None) + } + + /// Attempts to acquire a lock, waiting if the lock is currently held. + /// + /// Unlike [`Self::try_lock`], if the lock is currently held, the specified + /// callback is called to inform the caller that a wait is about to + /// be performed, then waits for the amount of time specified by the return + /// of the callback, or infinitely in the case of `None`. 
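As a rough illustration of the intended split between the two acquisition methods (error matching is based on the `LockError` variants above; the timeout is arbitrary):

```rust
use std::time::Duration;
use tame_index::utils::flock::{FileLock, LockError, LockOptions};

// Sketch: try a non-blocking acquire first, and only fall back to a bounded
// wait if the lock is currently held by someone else
fn acquire(opts: &LockOptions<'_>) -> Result<FileLock, tame_index::Error> {
    match opts.try_lock() {
        Err(tame_index::Error::Lock(fle)) if matches!(fle.source, LockError::Contested) => {
            opts.lock(|path| {
                eprintln!("'{path}' is locked by another process, waiting...");
                Some(Duration::from_secs(30))
            })
        }
        other => other,
    }
}
```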
+ #[inline] + pub fn lock(&self, wait: impl Fn(&Path) -> Option) -> Result { + self.open_and_lock(Some(wait)) + } + + fn open(&self, opts: &fs::OpenOptions) -> Result { + opts.open(self.path.as_std_path()).or_else(|err| { + if err.kind() == std::io::ErrorKind::NotFound && self.exclusive { + fs::create_dir_all(self.path.parent().unwrap()).map_err(|e| FileLockError { + path: self.path.parent().unwrap().to_owned(), + source: LockError::CreateDir(e), + })?; + self.open(opts) + } else { + // Note we just use the 30 EROFS constant here, which won't work on WASI, Haiku, or some other + // niche targets, but none of them are intended targets for this crate, but can be fixed later + // if someone actually uses them + let source = if err.kind() == std::io::ErrorKind::PermissionDenied + || cfg!(unix) && err.raw_os_error() == Some(30 /* EROFS */) + { + LockError::Readonly + } else { + LockError::Open(err) + }; + + Err(FileLockError { + path: self.path.as_ref().to_owned(), + source, + }) + } + }) + } + + fn open_and_lock( + &self, + wait: Option Option>, + ) -> Result { + let (state, file) = if self.exclusive { + match self.open(&sys::open_opts(true)) { + Ok(file) => (LockState::Exclusive, file), + Err(err) => { + // If the user requested it, check if the error is due to a read only error, + // and if so, fallback to a shared lock instead of an exclusive lock, just + // as cargo does + // + // https://github.com/rust-lang/cargo/blob/0b6cc3c75f1813df857fb54421edf7f8fee548e3/src/cargo/util/config/mod.rs#L1907-L1935 + if self.shared_fallback && matches!(err.source, LockError::Readonly) { + (LockState::Shared, self.open(&sys::open_opts(false))?) + } else { + return Err(err.into()); + } + } + } + } else { + (LockState::Shared, self.open(&sys::open_opts(false))?) + }; + + self.do_lock(state, &file, wait) + .map_err(|source| FileLockError { + path: self.path.as_ref().to_owned(), + source, + })?; + + Ok(FileLock { + file: Some(file), + state, + }) + } + + fn do_lock( + &self, + state: LockState, + file: &fs::File, + wait: Option Option>, + ) -> Result<(), LockError> { + #[cfg(all(target_os = "linux", not(target_env = "musl")))] + fn is_on_nfs_mount(path: &crate::Path) -> bool { + use std::os::unix::prelude::*; + + let path = match std::ffi::CString::new(path.as_os_str().as_bytes()) { + Ok(path) => path, + Err(_) => return false, + }; + + #[allow(unsafe_code)] + unsafe { + let mut buf: libc::statfs = std::mem::zeroed(); + let r = libc::statfs(path.as_ptr(), &mut buf); + + r == 0 && buf.f_type as u32 == libc::NFS_SUPER_MAGIC as u32 + } + } + + #[cfg(any(not(target_os = "linux"), target_env = "musl"))] + fn is_on_nfs_mount(_path: &crate::Path) -> bool { + false + } + + // File locking on Unix is currently implemented via `flock`, which is known + // to be broken on NFS. We could in theory just ignore errors that happen on + // NFS, but apparently the failure mode [1] for `flock` on NFS is **blocking + // forever**, even if the "non-blocking" flag is passed! + // + // As a result, we just skip all file locks entirely on NFS mounts. That + // should avoid calling any `flock` functions at all, and it wouldn't work + // there anyway. + // + // [1]: https://github.com/rust-lang/cargo/issues/2615 + if is_on_nfs_mount(&self.path) { + return Err(LockError::Nfs); + } + + match sys::try_lock(file, state) { + Ok(()) => return Ok(()), + + // In addition to ignoring NFS which is commonly not working we also + // just ignore locking on filesystems that look like they don't + // implement file locking. 
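For callers that prefer cargo's lenient behavior over a hard error when the filesystem cannot support locking, one possible (hedged) pattern on top of this API:

```rust
use tame_index::{
    utils::flock::{FileLock, LockError, LockOptions},
    Error,
};

// Sketch: proceed without a lock when the filesystem can't support one
// (NFS, or no locking support at all), but still surface real I/O errors
fn lock_or_unlocked(opts: &LockOptions<'_>) -> Result<FileLock, Error> {
    match opts.try_lock() {
        Err(Error::Lock(fle))
            if matches!(fle.source, LockError::Nfs | LockError::NotSupported) =>
        {
            Ok(FileLock::unlocked())
        }
        other => other,
    }
}
```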
+ Err(e) if sys::is_unsupported(&e) => return Err(LockError::NotSupported), + + Err(e) => { + if !sys::is_contended(&e) { + return Err(LockError::Lock(e)); + } + } + } + + // Signal to the caller that we are about to enter a blocking operation + // and whether they want to assign a timeout to it + if let Some(wait) = wait { + let timeout = wait(&self.path); + + sys::lock(file, state, timeout).map_err(|e| { + if sys::is_timed_out(&e) { + LockError::TimedOut + } else { + LockError::Lock(e) + } + }) + } else { + Err(LockError::Contested) + } + } +} + +#[derive(PartialEq, Copy, Clone, Debug)] +enum LockState { + Exclusive, + Shared, + Unlocked, +} + +/// A currently held file lock. +/// +/// The lock is released when this is dropped, or the program exits for any reason, +/// including `SIGKILL` or power loss +pub struct FileLock { + file: Option, + state: LockState, +} + +impl FileLock { + /// Creates a [`Self`] in an unlocked state. + /// + /// This allows for easy testing or use in situations where you don't care + /// about file locking, or have other ways to ensure something is uncontested + pub fn unlocked() -> Self { + Self { + file: None, + state: LockState::Unlocked, + } + } +} + +impl Drop for FileLock { + fn drop(&mut self) { + if self.state != LockState::Unlocked { + if let Some(f) = self.file.take() { + let _ = sys::unlock(&f); + } + } + } +} diff --git a/src/utils/flock/bindings.toml b/src/utils/flock/bindings.toml new file mode 100644 index 0000000..5b5ce1f --- /dev/null +++ b/src/utils/flock/bindings.toml @@ -0,0 +1,26 @@ +output = "win_bindings.rs" +binds = [ + "CloseHandle", + "CreateEventA", + "ERROR_INVALID_FUNCTION", + "ERROR_IO_PENDING", + "ERROR_LOCK_VIOLATION", + "FILE_FLAG_OVERLAPPED", + "INFINITE", + "LockFileEx", + "LOCKFILE_EXCLUSIVE_LOCK", + "LOCKFILE_FAIL_IMMEDIATELY", + "UnlockFile", + "WaitForSingleObject", + "WAIT_IO_COMPLETION", + "WAIT_OBJECT_0", + "WAIT_TIMEOUT", +] + +[bind-mode] +mode = "minwin" + +[bind-mode.config] +enum-style = "minwin" +fix-naming = true +use-rust-casing = true diff --git a/src/utils/flock/unix.rs b/src/utils/flock/unix.rs new file mode 100644 index 0000000..0808edc --- /dev/null +++ b/src/utils/flock/unix.rs @@ -0,0 +1,155 @@ +#![allow(unsafe_code)] + +use super::LockState; +use std::{fs::File, io::Error, os::unix::io::AsRawFd, time::Duration}; + +type Result = std::io::Result<()>; + +macro_rules! flock_flag { + ($state:expr) => { + match $state { + LockState::Shared => libc::LOCK_SH, + LockState::Exclusive => libc::LOCK_EX, + _ => unreachable!(), + } + }; +} + +macro_rules! error { + ($func:expr) => { + if $func != 0 { + return Err(Error::last_os_error()); + } + }; +} + +#[inline] +pub(super) fn open_opts(exclusive: bool) -> std::fs::OpenOptions { + let mut o = std::fs::OpenOptions::new(); + o.read(true); + + if exclusive { + o.write(true).create(true); + } + + o +} + +#[inline] +pub(super) fn try_lock(file: &File, state: LockState) -> Result { + flock(file, flock_flag!(state) | libc::LOCK_NB) +} + +#[inline] +pub(super) fn lock(file: &File, state: LockState, timeout: Option) -> Result { + if let Some(timeout) = timeout { + static SIG_HANDLER: std::sync::Once = std::sync::Once::new(); + + // Unfortunately due the global nature of signal handlers, we need to + // register a signal handler that just ignores the signal we use to kill + // the thread performing the lock if it is timed out, as otherwise the + // default signal handler will terminate the process, not just interrupt + // the one thread the signal was sent to. 
+ SIG_HANDLER.call_once(|| { + unsafe { + let mut act: libc::sigaction = std::mem::zeroed(); + if libc::sigemptyset(&mut act.sa_mask) != 0 { + eprintln!("unable to clear action mask: {:?}", Error::last_os_error()); + return; + } + + unsafe extern "C" fn on_sig( + _sig: libc::c_int, + _info: *mut libc::siginfo_t, + _uc: *mut libc::c_void, + ) { + } + + act.sa_flags = libc::SA_SIGINFO; + act.sa_sigaction = on_sig as usize; + + // Register the action with the signal handler + if libc::sigaction(libc::SIGUSR1, &act, std::ptr::null_mut()) != 0 { + eprintln!( + "unable to register signal handler: {:?}", + Error::last_os_error() + ); + } + } + }); + + // We _could_ do this with a timer sending a SIGALRM to interupt the + // syscall, but that involves global state, and is just generally not + // nice, so we use a simpler approach of just spawning a thread and sending + // a signal to it ourselves. Less efficient probably, but IMO cleaner + let file_ptr = file as *const _ as usize; + let mut thread_id: libc::pthread_t = 0; + let tid = &mut thread_id as *mut _ as usize; + let (tx, rx) = std::sync::mpsc::channel(); + let lock_thread = std::thread::Builder::new() + .name("flock wait".into()) + .spawn(move || unsafe { + *(tid as *mut _) = libc::pthread_self(); + let res = flock(&*(file_ptr as *const _), flock_flag!(state)); + tx.send(res).unwrap(); + })?; + + match rx.recv_timeout(timeout) { + Ok(res) => { + let _ = lock_thread.join(); + res + } + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => { + // Send a signal to interrupt the lock syscall + // Note that there is an edge case here, where the thread could be + // finished and we just got unlucky by timing out at the exact + // same moment. According to https://man7.org/linux/man-pages/man3/pthread_kill.3.html, + // at least for glibc, pthread_kill _should_ set ESRCH if the thread + // has already finished, but other implementations _might_ just + // cause a SIGSEGV + unsafe { libc::pthread_kill(thread_id, libc::SIGUSR1) }; + + // Now that we've sent the signal, we can be fairly sure the thread + // syscall will finish/has already finished so we block this time + let res = rx.recv().unwrap(); + let _ = lock_thread.join(); + res + } + Err(_) => unreachable!(), + } + } else { + flock(file, flock_flag!(state)) + } +} + +#[inline] +pub(super) fn unlock(file: &File) -> Result { + flock(file, libc::LOCK_UN) +} + +#[inline] +fn flock(file: &File, flag: libc::c_int) -> Result { + error!(unsafe { libc::flock(file.as_raw_fd(), flag) }); + Ok(()) +} + +#[inline] +pub(super) fn is_unsupported(err: &std::io::Error) -> bool { + match err.raw_os_error() { + // Unfortunately, depending on the target, these may or may not be the same. + // For targets in which they are the same, the duplicate pattern causes a warning. + #[allow(unreachable_patterns)] + Some(libc::ENOTSUP | libc::EOPNOTSUPP | libc::ENOSYS) => true, + _ => false, + } +} + +#[inline] +pub(super) fn is_contended(err: &Error) -> bool { + err.raw_os_error().map_or(false, |x| x == libc::EWOULDBLOCK) +} + +#[inline] +pub(super) fn is_timed_out(err: &Error) -> bool { + err.raw_os_error().map_or(false, |x| x == libc::EINTR) +} diff --git a/src/utils/flock/win_bindings.rs b/src/utils/flock/win_bindings.rs new file mode 100644 index 0000000..da24ba3 --- /dev/null +++ b/src/utils/flock/win_bindings.rs @@ -0,0 +1,74 @@ +//! 
Bindings generated by `minwin` 0.1.0 +#![allow( + non_snake_case, + non_upper_case_globals, + non_camel_case_types, + clippy::upper_case_acronyms +)] +::windows_targets::link!( + "kernel32.dll" "system" "CloseHandle" fn close_handle(object : Handle) -> Bool +); +::windows_targets::link!( + "kernel32.dll" "system" "CreateEventA" fn create_event_a(event_attributes : * const + SecurityAttributes, manual_reset : Bool, initial_state : Bool, name : Pcstr) -> + Handle +); +::windows_targets::link!( + "kernel32.dll" "system" "LockFileEx" fn lock_file_ex(file : Handle, flags : + LockFileFlags::Enum, reserved : u32, number_of_bytes_to_lock_low : u32, + number_of_bytes_to_lock_high : u32, overlapped : * mut Overlapped) -> Bool +); +::windows_targets::link!( + "kernel32.dll" "system" "UnlockFile" fn unlock_file(file : Handle, file_offset_low : + u32, file_offset_high : u32, number_of_bytes_to_unlock_low : u32, + number_of_bytes_to_unlock_high : u32) -> Bool +); +::windows_targets::link!( + "kernel32.dll" "system" "WaitForSingleObject" fn wait_for_single_object(handle : + Handle, milliseconds : u32) -> Win32Error::Enum +); +pub const Infinite: u32 = 4294967295; +pub type Bool = i32; +pub mod FileFlagsAndAttributes { + pub type Enum = u32; + pub const FileFlagOverlapped: Enum = 1073741824; +} +pub type Handle = isize; +pub mod LockFileFlags { + pub type Enum = u32; + pub const LockfileFailImmediately: Enum = 1; + pub const LockfileExclusiveLock: Enum = 2; +} +#[repr(C)] +pub struct Overlapped { + pub internal: usize, + pub internal_high: usize, + pub anonymous: Overlapped_0, + pub event: Handle, +} +#[repr(C)] +pub union Overlapped_0 { + pub anonymous: ::std::mem::ManuallyDrop, + pub pointer: *mut ::core::ffi::c_void, +} +#[repr(C)] +pub struct Overlapped_0_0 { + pub offset: u32, + pub offset_high: u32, +} +pub type Pcstr = *const u8; +#[repr(C)] +pub struct SecurityAttributes { + pub length: u32, + pub security_descriptor: *mut ::core::ffi::c_void, + pub inherit_handle: Bool, +} +pub mod Win32Error { + pub type Enum = u32; + pub const WaitObject0: Enum = 0; + pub const ErrorInvalidFunction: Enum = 1; + pub const ErrorLockViolation: Enum = 33; + pub const WaitIoCompletion: Enum = 192; + pub const WaitTimeout: Enum = 258; + pub const ErrorIoPending: Enum = 997; +} diff --git a/src/utils/flock/windows.rs b/src/utils/flock/windows.rs new file mode 100644 index 0000000..f51d1ca --- /dev/null +++ b/src/utils/flock/windows.rs @@ -0,0 +1,138 @@ +#![allow(unsafe_code)] + +//! Support for windows file locking. This implementation mainly pulls from +//! +//! in addition to cargo + +use super::LockState; +use std::{fs::File, io::Error, os::windows::io::AsRawHandle, time::Duration}; + +type Result = std::io::Result<()>; + +#[path = "win_bindings.rs"] +mod bindings; + +use bindings::*; + +macro_rules! 
flock_flag { + ($state:expr) => { + match $state { + LockState::Shared => 0, + LockState::Exclusive => LockFileFlags::LockfileExclusiveLock, + _ => unreachable!(), + } + }; +} + +#[inline] +pub(super) fn open_opts(exclusive: bool) -> std::fs::OpenOptions { + let mut o = std::fs::OpenOptions::new(); + o.read(true); + + if exclusive { + o.write(true).create(true); + } + + // Since we do async I/O with waits, we need to open the file with overlapped + // as otherwise LockFileEx will just hang until it can take the lock + use std::os::windows::fs::OpenOptionsExt; + o.custom_flags(FileFlagsAndAttributes::FileFlagOverlapped); + + o +} + +#[inline] +pub(super) fn try_lock(file: &File, state: LockState) -> Result { + flock( + file, + flock_flag!(state) | LockFileFlags::LockfileFailImmediately, + None, + ) +} + +#[inline] +pub(super) fn lock(file: &File, state: LockState, timeout: Option) -> Result { + flock(file, flock_flag!(state), timeout) +} + +fn flock(file: &File, flags: u32, timeout: Option) -> Result { + unsafe { + let mut overlapped: Overlapped = std::mem::zeroed(); + overlapped.event = create_event_a(std::ptr::null(), 0, 0, std::ptr::null()); + + if overlapped.event == 0 { + return Err(Error::last_os_error()); + } + + let res = if lock_file_ex( + file.as_raw_handle() as Handle, + flags, + 0, + !0, + !0, + &mut overlapped, + ) == 0 + { + let err = Error::last_os_error(); + + if err + .raw_os_error() + .map_or(false, |x| x == Win32Error::ErrorIoPending as i32) + { + let timeout = timeout.map_or(0, |dur| { + let millis = dur.as_millis(); + if millis >= Infinite as u128 { + u32::MAX + } else { + millis as u32 + } + }); + + match wait_for_single_object(overlapped.event, timeout) { + Win32Error::WaitObject0 => Ok(()), + Win32Error::WaitTimeout => { + Err(Error::from_raw_os_error(Win32Error::WaitTimeout as _)) + } + _ => Err(Error::last_os_error()), + } + } else { + Err(err) + } + } else { + Ok(()) + }; + + close_handle(overlapped.event); + res + } +} + +pub(super) fn unlock(file: &File) -> Result { + unsafe { + let ret = unlock_file(file.as_raw_handle() as Handle, 0, 0, !0, !0); + if ret == 0 { + Err(Error::last_os_error()) + } else { + Ok(()) + } + } +} + +#[inline] +pub(super) fn is_contended(err: &Error) -> bool { + err.raw_os_error() + .map_or(false, |x| x == Win32Error::ErrorLockViolation as i32) +} + +#[inline] +pub(super) fn is_unsupported(err: &Error) -> bool { + err.raw_os_error() + .map_or(false, |x| x == Win32Error::ErrorInvalidFunction as i32) +} + +#[inline] +pub(super) fn is_timed_out(err: &Error) -> bool { + err.raw_os_error().map_or(false, |x| { + x == Win32Error::WaitTimeout as i32 || x == Win32Error::WaitIoCompletion as i32 + }) +} diff --git a/tests/cache.rs b/tests/cache.rs index 77b27ec..56454c0 100644 --- a/tests/cache.rs +++ b/tests/cache.rs @@ -11,9 +11,10 @@ use tame_index::{index::cache::ValidCacheEntry, utils::get_index_details, IndexC fn parses_current_cargo_cache() { let (path, _url) = get_index_details(tame_index::CRATES_IO_HTTP_INDEX, None).unwrap(); let cache = IndexCache::at_path(path); + let lock = &utils::unlocked(); let cached = cache - .read_cache_file("camino".try_into().unwrap()) + .read_cache_file("camino".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("cache file not found"); @@ -49,9 +50,10 @@ fn parses_current_cargo_cache() { fn serializes_current_cargo_cache() { let (path, _url) = get_index_details(tame_index::CRATES_IO_HTTP_INDEX, None).unwrap(); let cache = IndexCache::at_path(path); + let lock = &utils::unlocked(); let 
cargos = cache - .read_cache_file("camino".try_into().unwrap()) + .read_cache_file("camino".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("cache file not found"); diff --git a/tests/flock.rs b/tests/flock.rs new file mode 100644 index 0000000..2191d0f --- /dev/null +++ b/tests/flock.rs @@ -0,0 +1,172 @@ +use std::time::Duration; +use tame_index::utils::flock::LockOptions; + +mod utils; + +enum LockKind { + Exclusive, + Shared, +} + +impl LockKind { + fn as_str(&self) -> &'static str { + match self { + Self::Exclusive => "exclusive", + Self::Shared => "shared", + } + } +} + +fn spawn(kind: LockKind, path: &tame_index::Path) -> std::process::Child { + let mut cmd = std::process::Command::new("cargo"); + cmd.env("RUST_BACKTRACE", "1") + .args([ + "run", + "-q", + "--manifest-path", + "tests/flock/Cargo.toml", + "--", + ]) + .stdout(std::process::Stdio::piped()) + .arg(kind.as_str()) + .arg(path); + + let mut child = cmd.spawn().expect("failed to spawn flock"); + + // Wait for the child to actually take the lock + { + use std::io::Read; + let mut output = child.stdout.take().unwrap(); + + let mut buff = [0u8; 4]; + output.read_exact(&mut buff).expect("failed to read output"); + + assert_eq!( + '🔒', + char::from_u32(u32::from_le_bytes(buff)).expect("invalid char") + ); + } + + child +} + +fn kill(mut child: std::process::Child) { + child.kill().expect("failed to kill child"); + child.wait().expect("failed to wait for child"); +} + +/// Validates we can take a lock we know is uncontended +#[test] +fn can_take_lock() { + let td = utils::tempdir(); + let ctl = td.path().join("can-take-lock"); + + let lo = LockOptions::new(&ctl).exclusive(false); + + let _lf = lo + .lock(|_p| unreachable!("lock is uncontested")) + .expect("failed to acquire lock"); +} + +/// Validates we can create parent directories for a lock file if they don't exist +#[test] +fn can_take_lock_in_non_existant_directory() { + let td = utils::tempdir(); + let ctl = td.path().join("sub/dir/can-take-lock"); + + let lo = LockOptions::new(&ctl).exclusive(false); + + let _lf = lo.try_lock().expect("failed to acquire lock"); +} + +/// Validates we can take multiple shared locks of the same file +#[test] +fn can_take_shared_lock() { + let td = utils::tempdir(); + let ctl = td.path().join("can-take-shared-lock"); + + let _ = std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(&ctl) + .expect("failed to create lock file"); + + let child = spawn(LockKind::Shared, &ctl); + + let mut lo = LockOptions::new(&ctl); + + lo = lo.shared(); + lo.try_lock().expect("failed to acquire shared lock"); + + lo = lo.exclusive(false); + if lo.try_lock().is_ok() { + panic!("we acquired an exclusive lock but we shouldn't have been able to"); + } + + kill(child); + + lo.try_lock().expect("failed to acquire exclusive lock"); +} + +/// Validates we can wait for a lock to be released +#[test] +fn waits_lock() { + let td = utils::tempdir(); + let ctl = td.path().join("waits-lock"); + + let _ = std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(&ctl) + .expect("failed to create lock file"); + + let child = spawn(LockKind::Exclusive, &ctl); + + std::thread::scope(|s| { + s.spawn(|| { + LockOptions::new(&ctl) + .lock(|_p| { + println!("waiting on lock"); + Some(Duration::from_millis(200)) + }) + .expect("failed to acquire shared lock"); + }); + s.spawn(|| { + std::thread::sleep(Duration::from_millis(100)); + kill(child); + }); + }); +} + +/// Ensures we can timeout if it takes too long to acquire the lock 
+#[test] +fn wait_lock_times_out() { + let td = utils::tempdir(); + let ctl = td.path().join("wait-lock-times-out"); + + let _ = std::fs::OpenOptions::new() + .create(true) + .write(true) + .open(&ctl) + .expect("failed to create lock file"); + + let child = spawn(LockKind::Exclusive, &ctl); + + if let Err(err) = LockOptions::new(&ctl).shared().lock(|_p| { + println!("waiting on lock"); + Some(Duration::from_millis(100)) + }) { + let tame_index::Error::Lock(le) = err else { + panic!("unexpected error type {err:#?}"); + }; + + assert!(matches!( + le.source, + tame_index::utils::flock::LockError::TimedOut + )); + } else { + panic!("we should not be able to take the lock"); + } + + kill(child); +} diff --git a/tests/flock/Cargo.toml b/tests/flock/Cargo.toml new file mode 100644 index 0000000..c050e6e --- /dev/null +++ b/tests/flock/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "flock" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +tame-index = { path = "../.." } diff --git a/tests/flock/src/main.rs b/tests/flock/src/main.rs new file mode 100644 index 0000000..3c45801 --- /dev/null +++ b/tests/flock/src/main.rs @@ -0,0 +1,32 @@ +#![allow(missing_docs)] + +use tame_index::utils::flock; + +fn main() { + let mut args = std::env::args().skip(1); + let kind = args.next().unwrap(); + let path = args.next().unwrap(); + + let lo = flock::LockOptions::new(tame_index::Path::new(&path)); + + let lo = match kind.as_str() { + "shared" => lo.shared(), + "exclusive" => lo.exclusive(false), + _ => panic!("unknown lock kind '{kind}'"), + }; + + let _fl = lo.try_lock().expect("failed to acquire lock"); + { + use std::io::Write; + let mut stdout = std::io::stdout(); + stdout.write(&('🔒' as u32).to_le_bytes()).unwrap(); + stdout.flush().unwrap(); + } + + // If the test that spawned this process fails it won't reap this process, so + // don't loop forever + std::thread::sleep(std::time::Duration::from_secs(30)); + + // Unnecessary, we shouldn't ever get here unless the test that called us failed + std::process::exit(1); +} diff --git a/tests/git.rs b/tests/git.rs index b2bc9db..a9db8d0 100644 --- a/tests/git.rs +++ b/tests/git.rs @@ -15,6 +15,7 @@ fn remote_index( root: IndexPath::Exact(path.as_ref().to_owned()), }) .unwrap(), + &utils::unlocked(), ) .unwrap() } @@ -251,11 +252,12 @@ impl FakeRemote { #[test] fn clones_new() { let remote = FakeRemote::new(); + let lock = &utils::unlocked(); let (rgi, _td) = remote.local(); assert!(rgi - .cached_krate("clones_new".try_into().unwrap()) + .cached_krate("clones_new".try_into().unwrap(), lock) .unwrap() .is_none()); } @@ -264,6 +266,7 @@ fn clones_new() { #[test] fn opens_existing() { let mut remote = FakeRemote::new(); + let lock = &utils::unlocked(); let krate = utils::fake_krate("opens-existing", 4); let expected_head = remote.commit(&krate); @@ -278,7 +281,7 @@ fn opens_existing() { // This should not be in the cache assert_eq!( first - .krate("opens-existing".try_into().unwrap(), true) + .krate("opens-existing".try_into().unwrap(), true, lock) .expect("failed to read git blob") .expect("expected krate"), krate, @@ -294,7 +297,7 @@ fn opens_existing() { // This should be in the cache as it is file based not memory based assert_eq!( first - .cached_krate("opens-existing".try_into().unwrap()) + .cached_krate("opens-existing".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected cached krate"), krate, @@ -305,6 +308,7 @@ fn opens_existing() { #[test] fn updates_cache() { let mut remote = FakeRemote::new(); + let 
lock = &utils::unlocked(); let krate = utils::fake_krate("updates-cache", 4); let expected_head = remote.commit(&krate); @@ -318,14 +322,14 @@ fn updates_cache() { // This should not be in the cache assert_eq!( - rgi.krate("updates-cache".try_into().unwrap(), true) + rgi.krate("updates-cache".try_into().unwrap(), true, lock) .expect("failed to read git blob") .expect("expected krate"), krate, ); assert_eq!( - rgi.cached_krate("updates-cache".try_into().unwrap()) + rgi.cached_krate("updates-cache".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected krate"), krate, @@ -337,6 +341,7 @@ fn updates_cache() { #[test] fn fetch_invalidates_cache() { let mut remote = FakeRemote::new(); + let lock = &utils::unlocked(); let krate = utils::fake_krate("invalidates-cache", 4); let same = utils::fake_krate("will-be-cached", 2); @@ -352,13 +357,13 @@ fn fetch_invalidates_cache() { // These should not be in the cache assert_eq!( - rgi.krate("invalidates-cache".try_into().unwrap(), true) + rgi.krate("invalidates-cache".try_into().unwrap(), true, lock) .expect("failed to read git blob") .expect("expected krate"), krate, ); assert_eq!( - rgi.krate("will-be-cached".try_into().unwrap(), true) + rgi.krate("will-be-cached".try_into().unwrap(), true, lock) .expect("failed to read git blob") .expect("expected krate"), same, @@ -369,20 +374,20 @@ fn fetch_invalidates_cache() { let new_head = remote.commit(&new_krate); assert_eq!( - rgi.cached_krate("invalidates-cache".try_into().unwrap()) + rgi.cached_krate("invalidates-cache".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected krate"), krate, ); assert_eq!( - rgi.cached_krate("will-be-cached".try_into().unwrap()) + rgi.cached_krate("will-be-cached".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected krate"), same, ); // Perform fetch, which should invalidate the cache - rgi.fetch().unwrap(); + rgi.fetch(lock).unwrap(); assert_eq!( rgi.local().head_commit().unwrap(), @@ -390,12 +395,12 @@ fn fetch_invalidates_cache() { ); assert!(rgi - .cached_krate("invalidates-cache".try_into().unwrap()) + .cached_krate("invalidates-cache".try_into().unwrap(), lock) .unwrap() .is_none()); assert_eq!( - rgi.krate("invalidates-cache".try_into().unwrap(), true) + rgi.krate("invalidates-cache".try_into().unwrap(), true, lock) .expect("failed to read git blob") .expect("expected krate"), new_krate, @@ -403,7 +408,7 @@ fn fetch_invalidates_cache() { // This crate _should_ still be cached as it was not changed in the fetch assert_eq!( - rgi.cached_krate("will-be-cached".try_into().unwrap()) + rgi.cached_krate("will-be-cached".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected krate"), same, @@ -411,10 +416,10 @@ fn fetch_invalidates_cache() { // We haven't made new commits, so the fetch should not move HEAD and thus // cache entries should still be valid - rgi.fetch().unwrap(); + rgi.fetch(lock).unwrap(); assert_eq!( - rgi.cached_krate("invalidates-cache".try_into().unwrap()) + rgi.cached_krate("invalidates-cache".try_into().unwrap(), lock) .unwrap() .unwrap(), new_krate @@ -426,7 +431,7 @@ fn fetch_invalidates_cache() { let krate4 = utils::fake_krate("krate-4", 4); let expected_head = remote.commit(&krate4); - rgi.fetch().unwrap(); + rgi.fetch(lock).unwrap(); assert_eq!( rgi.local().head_commit().unwrap(), @@ -434,13 +439,13 @@ fn fetch_invalidates_cache() { ); assert_eq!( - rgi.cached_krate("invalidates-cache".try_into().unwrap()) + 
rgi.cached_krate("invalidates-cache".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected krate"), new_krate, ); assert_eq!( - rgi.cached_krate("will-be-cached".try_into().unwrap()) + rgi.cached_krate("will-be-cached".try_into().unwrap(), lock) .expect("failed to read cache file") .expect("expected krate"), same, @@ -522,14 +527,15 @@ fn non_main_local_branch() { } let mut rgi = remote_index(&local_td, &remote.td); + let lock = &utils::unlocked(); let first = utils::fake_krate("first", 1); remote.commit(&first); - rgi.fetch().unwrap(); + rgi.fetch(lock).unwrap(); assert_eq!( - rgi.krate("first".try_into().unwrap(), true) + rgi.krate("first".try_into().unwrap(), true, lock) .unwrap() .unwrap(), first diff --git a/tests/local.rs b/tests/local.rs index 5d1c483..6a0faf0 100644 --- a/tests/local.rs +++ b/tests/local.rs @@ -27,13 +27,15 @@ fn builds_local_registry() { versions: Vec, } + let lock = tame_index::utils::flock::FileLock::unlocked(); + for pkg in &md.packages { if pkg.name == "tame-index" { continue; } let ip = krates.entry(pkg.name.clone()).or_insert_with(|| { let ik = sparse - .cached_krate(pkg.name.as_str().try_into().unwrap()) + .cached_krate(pkg.name.as_str().try_into().unwrap(), &lock) .unwrap() .unwrap(); diff --git a/tests/sparse.rs b/tests/sparse.rs index e762a2b..4b38d6d 100644 --- a/tests/sparse.rs +++ b/tests/sparse.rs @@ -27,9 +27,10 @@ fn opens_crates_io() { #[test] fn make_request_without_cache() { let index = crates_io(env!("CARGO_MANIFEST_DIR")); + let lock = &utils::unlocked(); let req = index - .make_remote_request("serde".try_into().unwrap(), None) + .make_remote_request("serde".try_into().unwrap(), None, lock) .unwrap(); let hdrs = req.headers(); @@ -50,18 +51,18 @@ const DATE: &str = "Thu, 22 Oct 2023 09:40:03 GMT"; #[test] fn make_request_with_cache() { let td = utils::tempdir(); - let index = crates_io(&td); + let lock = &utils::unlocked(); { let etag_krate = utils::fake_krate("etag-krate", 2); index .cache() - .write_to_cache(&etag_krate, &format!("{}: {ETAG}", header::ETAG)) + .write_to_cache(&etag_krate, &format!("{}: {ETAG}", header::ETAG), lock) .unwrap(); let req = index - .make_remote_request("etag-krate".try_into().unwrap(), None) + .make_remote_request("etag-krate".try_into().unwrap(), None, lock) .unwrap(); assert_eq!(req.headers().get(header::IF_NONE_MATCH).unwrap(), ETAG); @@ -69,7 +70,7 @@ fn make_request_with_cache() { { let req = index - .make_remote_request("etag-specified-krate".try_into().unwrap(), Some(ETAG)) + .make_remote_request("etag-specified-krate".try_into().unwrap(), Some(ETAG), lock) .unwrap(); assert_eq!(req.headers().get(header::IF_NONE_MATCH).unwrap(), ETAG); @@ -82,11 +83,12 @@ fn make_request_with_cache() { .write_to_cache( &modified_krate, &format!("{}: {DATE}", header::LAST_MODIFIED), + lock, ) .unwrap(); let req = index - .make_remote_request("modified-krate".try_into().unwrap(), None) + .make_remote_request("modified-krate".try_into().unwrap(), None, lock) .unwrap(); assert_eq!(req.headers().get(header::IF_MODIFIED_SINCE).unwrap(), DATE); @@ -98,11 +100,12 @@ fn make_request_with_cache() { fn parse_unmodified_response() { let td = utils::tempdir(); let index = crates_io(&td); + let lock = &utils::unlocked(); let etag_krate = utils::fake_krate("etag-krate", 2); index .cache() - .write_to_cache(&etag_krate, &format!("{}: {ETAG}", header::ETAG)) + .write_to_cache(&etag_krate, &format!("{}: {ETAG}", header::ETAG), lock) .unwrap(); let response = http::Response::builder() @@ -112,7 +115,7 @@ fn 
parse_unmodified_response() { .unwrap(); let cached_krate = index - .parse_remote_response("etag-krate".try_into().unwrap(), response, true) + .parse_remote_response("etag-krate".try_into().unwrap(), response, true, lock) .unwrap() .expect("cached krate"); @@ -124,6 +127,7 @@ fn parse_unmodified_response() { fn parse_modified_response() { let td = utils::tempdir(); let index = crates_io(&td); + let lock = &utils::unlocked(); { let etag_krate = utils::fake_krate("etag-krate", 3); @@ -137,7 +141,7 @@ fn parse_modified_response() { .unwrap(); let new_krate = index - .parse_remote_response("etag-krate".try_into().unwrap(), response, true) + .parse_remote_response("etag-krate".try_into().unwrap(), response, true, lock) .unwrap() .expect("new response"); @@ -148,6 +152,7 @@ fn parse_modified_response() { .cached_krate( "etag-krate".try_into().unwrap(), Some(&format!("{}: {ETAG}", header::ETAG)), + lock, ) .unwrap() .expect("cached krate"); @@ -167,7 +172,7 @@ fn parse_modified_response() { .unwrap(); let new_krate = index - .parse_remote_response("modified-krate".try_into().unwrap(), response, true) + .parse_remote_response("modified-krate".try_into().unwrap(), response, true, lock) .unwrap() .expect("new response"); @@ -178,6 +183,7 @@ fn parse_modified_response() { .cached_krate( "modified-krate".try_into().unwrap(), Some(&format!("{}: {DATE}", header::LAST_MODIFIED)), + lock, ) .unwrap() .expect("cached krate"); @@ -192,13 +198,14 @@ fn parse_modified_response() { fn end_to_end() { let td = utils::tempdir(); let index = crates_io(&td); + let lock = &utils::unlocked(); let client = reqwest::blocking::Client::builder().build().unwrap(); let rsi = tame_index::index::RemoteSparseIndex::new(index, client); let spdx_krate = rsi - .krate("spdx".try_into().unwrap(), true) + .krate("spdx".try_into().unwrap(), true, lock) .expect("failed to retrieve spdx") .expect("failed to find spdx"); diff --git a/tests/utils.rs b/tests/utils.rs index 8ba3a88..9cabc3a 100644 --- a/tests/utils.rs +++ b/tests/utils.rs @@ -47,6 +47,11 @@ pub fn tempdir() -> TempDir { TempDir::default() } +#[inline] +pub fn unlocked() -> tame_index::index::FileLock { + tame_index::index::FileLock::unlocked() +} + pub fn fake_krate(name: &str, num_versions: u8) -> IndexKrate { assert!(num_versions > 0); let mut version = semver::Version::new(0, 0, 0);
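Finally, a hedged sketch for the HTTP sparse index mirroring the `end_to_end` test above; it assumes the `sparse` feature is enabled and that the `SparseIndex` is constructed via the existing crate API:

```rust
use tame_index::index::{FileLock, RemoteSparseIndex, SparseIndex};

fn fetch_spdx(index: SparseIndex, lock: &FileLock) -> Result<(), tame_index::Error> {
    let client = reqwest::blocking::Client::builder().build().unwrap();
    let rsi = RemoteSparseIndex::new(index, client);

    // The remote request, the cache write, and later cache reads all happen
    // under the caller provided lock
    if let Some(krate) = rsi.krate("spdx".try_into()?, true, lock)? {
        println!("retrieved '{}' from the sparse index", krate.name());
    }
    let _cached = rsi.cached_krate("spdx".try_into()?, lock)?;
    Ok(())
}
```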