Skip to content

Commit

Permalink
Create sub-directories for copy (#4760)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 31, 2023
1 parent f27c34e commit 2f02e21
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 56 deletions.
20 changes: 18 additions & 2 deletions object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,8 +1105,24 @@ mod tests {
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone()]);

let dst2 = Path::from("new/nested/foo.parquet");
storage.copy(&emoji_file, &dst2).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst.clone(), dst2.clone()]);

let dst3 = Path::from("new/nested2/bar.parquet");
storage.rename(&dst, &dst3).await.unwrap();
let mut files = flatten_list_stream(storage, None).await.unwrap();
files.sort_unstable();
assert_eq!(files, vec![emoji_file.clone(), dst2.clone(), dst3.clone()]);

let err = storage.head(&dst).await.unwrap_err();
assert!(matches!(err, Error::NotFound { .. }));

storage.delete(&emoji_file).await.unwrap();
storage.delete(&dst).await.unwrap();
storage.delete(&dst3).await.unwrap();
storage.delete(&dst2).await.unwrap();
let files = flatten_list_stream(storage, Some(&emoji_prefix))
.await
.unwrap();
Expand Down Expand Up @@ -1605,7 +1621,7 @@ mod tests {
pub(crate) async fn copy_if_not_exists(storage: &DynObjectStore) {
// Create two objects
let path1 = Path::from("test1");
let path2 = Path::from("test2");
let path2 = Path::from("not_exists_nested/test2");
let contents1 = Bytes::from("cats");
let contents2 = Bytes::from("dogs");

Expand Down
114 changes: 60 additions & 54 deletions object_store/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::{stream::BoxStream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};
use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
use std::ops::Range;
Expand Down Expand Up @@ -78,10 +78,10 @@ pub(crate) enum Error {
path: PathBuf,
},

#[snafu(display("Unable to create file {}: {}", path.display(), err))]
#[snafu(display("Unable to create file {}: {}", path.display(), source))]
UnableToCreateFile {
source: io::Error,
path: PathBuf,
err: io::Error,
},

#[snafu(display("Unable to delete file {}: {}", path.display(), source))]
Expand Down Expand Up @@ -336,12 +336,13 @@ impl ObjectStore for LocalFileSystem {
// If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object.
Ok(file) => return Ok(Box::new(file)),
// If the error is that the file was not found, attempt to create the file and any necessary parent directories.
Err(err) if err.kind() == ErrorKind::NotFound => {
Err(source) if source.kind() == ErrorKind::NotFound => {
// Get the path to the parent directory of the file.
let parent = path
.parent()
// If the parent directory does not exist, return a `UnableToCreateFileSnafu` error.
.context(UnableToCreateFileSnafu { path: &path, err })?;
let parent =
path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

// Create the parent directory and any necessary ancestors.
tokio::fs::create_dir_all(parent)
Expand Down Expand Up @@ -584,32 +585,34 @@ impl ObjectStore for LocalFileSystem {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;

maybe_spawn_blocking(move || {
let mut id = 0;
loop {
let staged = staged_upload_path(&to, &id.to_string());
if let Err(source) = std::fs::hard_link(&from, &staged) {
if source.kind() == ErrorKind::AlreadyExists {
id += 1;
continue;
}
break Err(source);
}
break std::fs::rename(&staged, &to);
let mut id = 0;
maybe_spawn_blocking(move || loop {
let staged = staged_upload_path(&to, &id.to_string());
match std::fs::hard_link(&from, &staged)
.and_then(|_| std::fs::rename(&staged, &to))
{
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => id += 1,
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
.context(UnableToCopyFileSnafu { from, to })?;
Ok(())
})
.await
}

async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;
maybe_spawn_blocking(move || {
std::fs::rename(&from, &to).context(UnableToCopyFileSnafu { from, to })?;
Ok(())
maybe_spawn_blocking(move || loop {
match std::fs::rename(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
Expand All @@ -618,25 +621,37 @@ impl ObjectStore for LocalFileSystem {
let from = self.config.path_to_filesystem(from)?;
let to = self.config.path_to_filesystem(to)?;

maybe_spawn_blocking(move || {
std::fs::hard_link(&from, &to).map_err(|err| match err.kind() {
ErrorKind::AlreadyExists => Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source: err,
}
.into(),
_ => Error::UnableToCopyFile {
from,
to,
source: err,
}
.into(),
})
maybe_spawn_blocking(move || loop {
match std::fs::hard_link(&from, &to) {
Ok(_) => return Ok(()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => {
return Err(Error::AlreadyExists {
path: to.to_str().unwrap().to_string(),
source,
}
.into())
}
ErrorKind::NotFound => create_parent_dirs(&to, source)?,
_ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
},
}
})
.await
}
}

/// Creates the parent directories of `path` or returns an error based on `source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile {
path: path.to_path_buf(),
source,
})?;

std::fs::create_dir_all(parent).context(UnableToCreateDirSnafu { path: parent })?;
Ok(())
}

/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `suffix`
///
/// Creates any directories if necessary
Expand All @@ -648,20 +663,11 @@ fn new_staged_upload(base: &std::path::Path) -> Result<(File, String)> {
let mut options = OpenOptions::new();
match options.read(true).write(true).create_new(true).open(&path) {
Ok(f) => return Ok((f, suffix)),
Err(e) if e.kind() == ErrorKind::AlreadyExists => {
multipart_id += 1;
}
Err(err) if err.kind() == ErrorKind::NotFound => {
let parent = path
.parent()
.context(UnableToCreateFileSnafu { path: &path, err })?;

std::fs::create_dir_all(parent)
.context(UnableToCreateDirSnafu { path: parent })?;

continue;
}
Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()),
Err(source) => match source.kind() {
ErrorKind::AlreadyExists => multipart_id += 1,
ErrorKind::NotFound => create_parent_dirs(&path, source)?,
_ => return Err(Error::UnableToOpenFile { source, path }.into()),
},
}
}
}
Expand Down

0 comments on commit 2f02e21

Please sign in to comment.