Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ObjectStore::copy Atomic and Automatically Create Parent Directories (#4758) (#4760) #4759

Merged
merged 6 commits into from
Sep 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions object_store/src/http/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,31 +256,37 @@ impl Client {
}

pub async fn copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
let from = self.path_url(from);
let to = self.path_url(to);
let method = Method::from_bytes(b"COPY").unwrap();

let mut builder = self
.client
.request(method, from)
.header("Destination", to.as_str());
let mut retry = false;
loop {
let method = Method::from_bytes(b"COPY").unwrap();

if !overwrite {
builder = builder.header("Overwrite", "F");
}
let mut builder = self
.client
.request(method, self.path_url(from))
.header("Destination", self.path_url(to).as_str());

match builder.send_retry(&self.retry_config).await {
Ok(_) => Ok(()),
Err(e)
if !overwrite
&& matches!(e.status(), Some(StatusCode::PRECONDITION_FAILED)) =>
{
Err(crate::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
})
if !overwrite {
builder = builder.header("Overwrite", "F");
}
Err(source) => Err(Error::Request { source }.into()),

return match builder.send_retry(&self.retry_config).await {
Ok(_) => Ok(()),
Err(source) => Err(match source.status() {
Some(StatusCode::PRECONDITION_FAILED) if !overwrite => {
crate::Error::AlreadyExists {
path: to.to_string(),
source: Box::new(source),
}
}
// Some implementations return 404 instead of 409
Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
retry = true;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We copy the loop protection from the other HTTP methods. This is necessary here because HTTP implementations may return `StatusCode::CONFLICT` for other reasons.

self.create_parent_directories(to).await?;
continue;
}
_ => Error::Request { source }.into(),
}),
};
}
}
}
Expand Down
20 changes: 18 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,8 +1105,24 @@ mod tests {
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);

let dst2 = Path::from("new/nested/foo.parquet");
storage.copy(&emoji_file, &dst2).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);

let dst3 = Path::from("new/nested2/bar.parquet");
storage.rename(&dst, &dst3).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);

let err = storage.head(&dst).await.unwrap_err();
assert!(matches!(err, Error::NotFound { .. }));

storage.delete(&emoji_file).await.unwrap();
storage.delete(&dst).await.unwrap();
storage.delete(&dst3).await.unwrap();
storage.delete(&dst2).await.unwrap();
let files = flatten_list_stream(storage, Some(&emoji_prefix))
.await
.unwrap();
Expand Down Expand Up @@ -1605,7 +1621,7 @@ mod tests {
pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let path2 = Path::from("not_exists_nested/test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");

Expand Down
112 changes: 69 additions & 43 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::{stream::BoxStream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};
use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
Expand Down Expand Up @@ -78,10 +78,10 @@ pub(crate) enum Error {
path: PathBuf,
},

#[snafu(display("Unable to create file {}: {}", path.display(), err))]
#[snafu(display("Unable to create file {}: {}", path.display(), source))]
UnableToCreateFile {
source: io::Error,
path: PathBuf,
err: io::Error,
},

#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
Expand Down Expand Up @@ -336,12 +336,13 @@ impl ObjectStore for LocalFileSystem {
// If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object.
Ok(file) => return Ok(Box::new(file)),
// If the error is that the file was not found, attempt to create the file and any necessary parent directories.
Err(err) if err.kind() == ErrorKind::NotFound => {
Err(source) if source.kind() == ErrorKind::NotFound => {
// Get the path to the parent directory of the file.
let parent = path
.parent()
// If the parent directory does not exist, return a `UnableToCreateFileSnafu` error.
.context(UnableToCreateFileSnafu { path: &path, err })?;
let parent =
path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

// Create the parent directory and any necessary ancestors.
tokio::fs::create_dir_all(parent)
Expand Down Expand Up @@ -584,20 +585,42 @@ impl ObjectStore for LocalFileSystem {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;

maybe_spawn_blocking(move || {
std::fs::copy(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
Ok(())
let mut id = 0;
// In order to make this atomic we:
//
// - hard link to a hidden temporary file
// - atomically rename this temporary file into place
//
// This is necessary because hard_link returns an error if the destination already exists
maybe_spawn_blocking(move || loop {
let staged = staged_upload_path(&to, &id.to_string());
match std::fs::hard_link(&from, &staged) {
Ok(_) => {
return std::fs::rename(&staged, &to).map_err(|source| {
Error::UnableToCopyFile { from, to, source }.into()
})
}
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;
maybe_spawn_blocking(move || {
std::fs::rename(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
Ok(())
maybe_spawn_blocking(move || loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
Expand All @@ -606,25 +629,37 @@ impl ObjectStore for LocalFileSystem {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;

maybe_spawn_blocking(move || {
std::fs::hard_link(&from, &to).map_err(|err| match err.kind() {
io::ErrorKind::AlreadyExists => Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source: err,
}
.into(),
_ => Error::UnableToCopyFile {
from,
to,
source: err,
}
.into(),
})
maybe_spawn_blocking(move || loop {
match std::fs::hard_link(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => {
return Err(Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source,
}
.into())
}
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
}

/// Creates the parent directories of `path` or returns an error based on `source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?;
Ok(())
}

/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix`
///
/// Creates any directories if necessary
Expand All @@ -636,20 +671,11 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
let mut options = OpenOptions::new();
match options.read(true).write(true).create_new(true).open(&path) {
Ok(f) => return Ok((f, suffix)),
Err(e) if e.kind() == ErrorKind::AlreadyExists => {
multipart_id += 1;
}
Err(err) if err.kind() == ErrorKind::NotFound => {
let parent = path
.parent()
.context(UnableToCreateFileSnafu { path: &path, err })?;

std::fs::create_dir_all(parent)
.context(UnableToCreateDirSnafu { path: parent })?;

continue;
}
Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
_ => return Err(Error::UnableToOpenFile { source, path }.into()),
},
}
}
}
Expand Down
Loading