From f27c34e4bda77d256fd7e093a535bdf116047c8c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 31 Aug 2023 22:43:50 +0100 Subject: [PATCH 1/6] Make LocalFileSystem::copy atomic (#4758) --- object_store/src/local.rs | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 4d57ef1b79e1..18f790e184a3 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -586,7 +586,19 @@ impl ObjectStore for LocalFileSystem { let to = self.config.path_to_filesystem(to)?; maybe_spawn_blocking(move || { - std::fs::copy(&from, &to).context(UnableToCopyFileSnafu { from, to })?; + let mut id = 0; + loop { + let staged = staged_upload_path(&to, &id.to_string()); + if let Err(source) = std::fs::hard_link(&from, &staged) { + if source.kind() == ErrorKind::AlreadyExists { + id += 1; + continue; + } + break Err(source); + } + break std::fs::rename(&staged, &to); + } + .context(UnableToCopyFileSnafu { from, to })?; Ok(()) }) .await @@ -608,7 +620,7 @@ impl ObjectStore for LocalFileSystem { maybe_spawn_blocking(move || { std::fs::hard_link(&from, &to).map_err(|err| match err.kind() { - io::ErrorKind::AlreadyExists => Error::AlreadyExists { + ErrorKind::AlreadyExists => Error::AlreadyExists { path: to.to_str().unwrap().to_string(), source: err, } From 2f02e2120267dca79a19d4ffd153eb53c68c33ab Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 31 Aug 2023 23:19:16 +0100 Subject: [PATCH 2/6] Create sub-directories for copy (#4760) --- object_store/src/lib.rs | 20 ++++++- object_store/src/local.rs | 114 ++++++++++++++++++++------------------ 2 files changed, 78 insertions(+), 56 deletions(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 7496b589cd8a..d1ee83b64d7b 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1105,8 +1105,24 @@ mod tests { files.sort_unstable(); assert_eq!(files, vec![emoji_file.clone(), dst.clone()]); + let dst2 = Path::from("new/nested/foo.parquet"); + storage.copy(&emoji_file, &dst2).await.unwrap(); + let mut files = flatten_list_stream(storage, None).await.unwrap(); + files.sort_unstable(); + assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]); + + let dst3 = Path::from("new/nested2/bar.parquet"); + storage.rename(&dst, &dst3).await.unwrap(); + let mut files = flatten_list_stream(storage, None).await.unwrap(); + files.sort_unstable(); + assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]); + + let err = storage.head(&dst).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. })); + storage.delete(&emoji_file).await.unwrap(); - storage.delete(&dst).await.unwrap(); + storage.delete(&dst3).await.unwrap(); + storage.delete(&dst2).await.unwrap(); let files = flatten_list_stream(storage, Some(&emoji_prefix)) .await .unwrap(); @@ -1605,7 +1621,7 @@ mod tests { pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) { // Create two objects let path1 = Path::from("test1"); - let path2 = Path::from("test2"); + let path2 = Path::from("not_exists_nested/test2"); let contents1 = Bytes::from("cats"); let contents2 = Bytes::from("dogs"); diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 18f790e184a3..a91bae50adb3 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -28,7 +28,7 @@ use chrono::{DateTime, Utc}; use futures::future::BoxFuture; use futures::{stream::BoxStream, StreamExt}; use futures::{FutureExt, TryStreamExt}; -use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use snafu::{ensure, ResultExt, Snafu}; use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions}; use std::io::{ErrorKind, Read, Seek, SeekFrom, Write}; use std::ops::Range; @@ -78,10 +78,10 @@ pub(crate) enum Error { path: PathBuf, }, - #[snafu(display("Unable to create file {}: {}", path.display(), err))] + #[snafu(display("Unable to create file {}: {}", path.display(), source))] UnableToCreateFile { + source: io::Error, path: PathBuf, - err: io::Error, }, #[snafu(display("Unable to delete file {}: {}", path.display(), source))] @@ -336,12 +336,13 @@ impl ObjectStore for LocalFileSystem { // If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object. Ok(file) => return Ok(Box::new(file)), // If the error is that the file was not found, attempt to create the file and any necessary parent directories. - Err(err) if err.kind() == ErrorKind::NotFound => { + Err(source) if source.kind() == ErrorKind::NotFound => { // Get the path to the parent directory of the file. - let parent = path - .parent() - // If the parent directory does not exist, return a `UnableToCreateFileSnafu` error. - .context(UnableToCreateFileSnafu { path: &path, err })?; + let parent = + path.parent().ok_or_else(|| Error::UnableToCreateFile { + path: path.to_path_buf(), + source, + })?; // Create the parent directory and any necessary ancestors. tokio::fs::create_dir_all(parent) @@ -584,22 +585,19 @@ impl ObjectStore for LocalFileSystem { async fn copy(&self, from: &Path, to: &Path) -> Result<()> { let from = self.config.path_to_filesystem(from)?; let to = self.config.path_to_filesystem(to)?; - - maybe_spawn_blocking(move || { - let mut id = 0; - loop { - let staged = staged_upload_path(&to, &id.to_string()); - if let Err(source) = std::fs::hard_link(&from, &staged) { - if source.kind() == ErrorKind::AlreadyExists { - id += 1; - continue; - } - break Err(source); - } - break std::fs::rename(&staged, &to); + let mut id = 0; + maybe_spawn_blocking(move || loop { + let staged = staged_upload_path(&to, &id.to_string()); + match std::fs::hard_link(&from, &staged) + .and_then(|_| std::fs::rename(&staged, &to)) + { + Ok(_) => return Ok(()), + Err(source) => match source.kind() { + ErrorKind::AlreadyExists => id += 1, + ErrorKind::NotFound => create_parent_dirs(&to, source)?, + _ => return Err(Error::UnableToCopyFile { from, to, source }.into()), + }, } - .context(UnableToCopyFileSnafu { from, to })?; - Ok(()) }) .await } @@ -607,9 +605,14 @@ impl ObjectStore for LocalFileSystem { async fn rename(&self, from: &Path, to: &Path) -> Result<()> { let from = self.config.path_to_filesystem(from)?; let to = self.config.path_to_filesystem(to)?; - maybe_spawn_blocking(move || { - std::fs::rename(&from, &to).context(UnableToCopyFileSnafu { from, to })?; - Ok(()) + maybe_spawn_blocking(move || loop { + match std::fs::rename(&from, &to) { + Ok(_) => return Ok(()), + Err(source) => match source.kind() { + ErrorKind::NotFound => create_parent_dirs(&to, source)?, + _ => return Err(Error::UnableToCopyFile { from, to, source }.into()), + }, + } }) .await } @@ -618,25 +621,37 @@ impl ObjectStore for LocalFileSystem { let from = self.config.path_to_filesystem(from)?; let to = self.config.path_to_filesystem(to)?; - maybe_spawn_blocking(move || { - std::fs::hard_link(&from, &to).map_err(|err| match err.kind() { - ErrorKind::AlreadyExists => Error::AlreadyExists { - path: to.to_str().unwrap().to_string(), - source: err, - } - .into(), - _ => Error::UnableToCopyFile { - from, - to, - source: err, - } - .into(), - }) + maybe_spawn_blocking(move || loop { + match std::fs::hard_link(&from, &to) { + Ok(_) => return Ok(()), + Err(source) => match source.kind() { + ErrorKind::AlreadyExists => { + return Err(Error::AlreadyExists { + path: to.to_str().unwrap().to_string(), + source, + } + .into()) + } + ErrorKind::NotFound => create_parent_dirs(&to, source)?, + _ => return Err(Error::UnableToCopyFile { from, to, source }.into()), + }, + } }) .await } } +/// Creates the parent directories of `path` or returns an error based on `source` if no parent +fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> { + let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile { + path: path.to_path_buf(), + source, + })?; + + std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?; + Ok(()) +} + /// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix` /// /// Creates any directories if necessary @@ -648,20 +663,11 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> { let mut options = OpenOptions::new(); match options.read(true).write(true).create_new(true).open(&path) { Ok(f) => return Ok((f, suffix)), - Err(e) if e.kind() == ErrorKind::AlreadyExists => { - multipart_id += 1; - } - Err(err) if err.kind() == ErrorKind::NotFound => { - let parent = path - .parent() - .context(UnableToCreateFileSnafu { path: &path, err })?; - - std::fs::create_dir_all(parent) - .context(UnableToCreateDirSnafu { path: parent })?; - - continue; - } - Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()), + Err(source) => match source.kind() { + ErrorKind::AlreadyExists => multipart_id += 1, + ErrorKind::NotFound => create_parent_dirs(&path, source)?, + _ => return Err(Error::UnableToOpenFile { source, path }.into()), + }, } } } From ebbc718d41ded20ab8176efea388c9796da795ca Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Sep 2023 11:28:03 +0100 Subject: [PATCH 3/6] Fix HttpStore --- object_store/src/http/client.rs | 51 +++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index 1d3df34db9d1..bc0176f3cfed 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -256,31 +256,38 @@ impl Client { } pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> { - let from = self.path_url(from); - let to = self.path_url(to); - let method = Method::from_bytes(b"COPY").unwrap(); - - let mut builder = self - .client - .request(method, from) - .header("Destination", to.as_str()); + let mut retry = false; + loop { + let method = Method::from_bytes(b"COPY").unwrap(); - if !overwrite { - builder = builder.header("Overwrite", "F"); - } + let mut builder = self + .client + .request(method, self.path_url(from)) + .header("Destination", self.path_url(to).as_str()); - match builder.send_retry(&self.retry_config).await { - Ok(_) => Ok(()), - Err(e) - if !overwrite - && matches!(e.status(), Some(StatusCode::PRECONDITION_FAILED)) => - { - Err(crate::Error::AlreadyExists { - path: to.to_string(), - source: Box::new(e), - }) + if !overwrite { + builder = builder.header("Overwrite", "F"); } - Err(source) => Err(Error::Request { source }.into()), + + return match builder.send_retry(&self.retry_config).await { + Ok(_) => Ok(()), + Err(source) => Err(match source.status() { + Some(StatusCode::PRECONDITION_FAILED) if !overwrite => { + crate::Error::AlreadyExists { + path: to.to_string(), + source: Box::new(source), + } + .into() + } + // Some implementations return 404 instead of 409 + Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => { + retry = true; + self.create_parent_directories(&to).await?; + continue; + } + _ => Error::Request { source }.into(), + }), + }; } } } From 72f46c4e733dff3794cd90fd99d0e4c475108f77 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Sep 2023 11:34:46 +0100 Subject: [PATCH 4/6] Clippy --- object_store/src/http/client.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs index bc0176f3cfed..93cd4ee0ea09 100644 --- a/object_store/src/http/client.rs +++ b/object_store/src/http/client.rs @@ -277,12 +277,11 @@ impl Client { path: to.to_string(), source: Box::new(source), } - .into() } // Some implementations return 404 instead of 409 Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => { retry = true; - self.create_parent_directories(&to).await?; + self.create_parent_directories(to).await?; continue; } _ => Error::Request { source }.into(), From 3cda90757294db346197d4bdd5dd8f713e6c27ff Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Sep 2023 11:43:38 +0100 Subject: [PATCH 5/6] Tweak error propagation --- object_store/src/local.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index a91bae50adb3..a7889089f40e 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -588,10 +588,12 @@ impl ObjectStore for LocalFileSystem { let mut id = 0; maybe_spawn_blocking(move || loop { let staged = staged_upload_path(&to, &id.to_string()); - match std::fs::hard_link(&from, &staged) - .and_then(|_| std::fs::rename(&staged, &to)) - { - Ok(_) => return Ok(()), + match std::fs::hard_link(&from, &staged) { + Ok(_) => { + return std::fs::rename(&staged, &to).map_err(|source| { + Error::UnableToCopyFile { from, to, source }.into() + }) + } Err(source) => match source.kind() { ErrorKind::AlreadyExists => id += 1, ErrorKind::NotFound => create_parent_dirs(&to, source)?, From 57deb67f3d4ea0d50726096d33cf77a4e9756f18 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 1 Sep 2023 11:45:25 +0100 Subject: [PATCH 6/6] Add doc --- object_store/src/local.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index a7889089f40e..495bb4f9c4aa 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -586,6 +586,12 @@ impl ObjectStore for LocalFileSystem { let from = self.config.path_to_filesystem(from)?; let to = self.config.path_to_filesystem(to)?; let mut id = 0; + // In order to make this atomic we: + // + // - hard link to a hidden temporary file + // - atomically rename this temporary file into place + // + // This is necessary because hard_link returns an error if the destination already exists maybe_spawn_blocking(move || loop { let staged = staged_upload_path(&to, &id.to_string()); match std::fs::hard_link(&from, &staged) {