From 9aa8a55ff5211b3ddbeecf078e32ae10d70b82f7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 1 Nov 2023 07:59:12 +0000 Subject: [PATCH] Remove ObjectStore::append --- object_store/Cargo.toml | 5 -- object_store/src/lib.rs | 31 +-------- object_store/src/limit.rs | 7 -- object_store/src/local.rs | 126 ----------------------------------- object_store/src/memory.rs | 99 --------------------------- object_store/src/prefix.rs | 6 -- object_store/src/throttle.rs | 4 -- 7 files changed, 1 insertion(+), 277 deletions(-) diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index cb820b509ada..c8cf4e280236 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -53,11 +53,6 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng" reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"], optional = true } ring = { version = "0.17", default-features = false, features = ["std"], optional = true } rustls-pemfile = { version = "1.0", default-features = false, optional = true } - -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] -tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util", "fs"] } - -[target.'cfg(target_arch = "wasm32")'.dependencies] tokio = { version = "1.25.0", features = ["sync", "macros", "rt", "time", "io-util"] } [target.'cfg(target_family="unix")'.dev-dependencies] diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 69db9d97bc2c..1b94f816b1af 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -94,8 +94,7 @@ //! //! This provides some compelling advantages: //! -//! * Except where explicitly stated otherwise, operations are atomic, and readers -//! cannot observe partial and/or failed writes +//! * All operations are atomic, and readers cannot observe partial and/or failed writes //! * Methods map directly to object store APIs, providing both efficiency and predictability //! * Abstracts away filesystem and operating system specific quirks, ensuring portability //! * Allows for functionality not native to filesystems, such as operation preconditions @@ -559,30 +558,6 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// vary by object store. async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()>; - /// Returns an [`AsyncWrite`] that can be used to append to the object at `location` - /// - /// A new object will be created if it doesn't already exist, otherwise it will be - /// opened, with subsequent writes appended to the end. - /// - /// This operation cannot be supported by all stores, most use-cases should prefer - /// [`ObjectStore::put`] and [`ObjectStore::put_multipart`] for better portability - /// and stronger guarantees - /// - /// This API is not guaranteed to be atomic, in particular - /// - /// * On error, `location` may contain partial data - /// * Concurrent calls to [`ObjectStore::list`] may return partially written objects - /// * Concurrent calls to [`ObjectStore::get`] may return partially written data - /// * Concurrent calls to [`ObjectStore::put`] may result in data loss / corruption - /// * Concurrent calls to [`ObjectStore::append`] may result in data loss / corruption - /// - /// Additionally some stores, such as Azure, may only support appending to objects created - /// with [`ObjectStore::append`], and not with [`ObjectStore::put`], [`ObjectStore::copy`], or - /// [`ObjectStore::put_multipart`] - async fn append(&self, _location: &Path) -> Result> { - Err(Error::NotImplemented) - } - /// Return the bytes that are stored at the specified location. async fn get(&self, location: &Path) -> Result { self.get_opts(location, GetOptions::default()).await @@ -779,10 +754,6 @@ macro_rules! as_ref_impl { self.as_ref().abort_multipart(location, multipart_id).await } - async fn append(&self, location: &Path) -> Result> { - self.as_ref().append(location).await - } - async fn get(&self, location: &Path) -> Result { self.as_ref().get(location).await } diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs index 39cc605c4768..d1363d9a4d46 100644 --- a/object_store/src/limit.rs +++ b/object_store/src/limit.rs @@ -94,13 +94,6 @@ impl ObjectStore for LimitStore { let _permit = self.semaphore.acquire().await.unwrap(); self.inner.abort_multipart(location, multipart_id).await } - - async fn append(&self, location: &Path) -> Result> { - let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); - let write = self.inner.append(location).await?; - Ok(Box::new(PermitWrapper::new(write, permit))) - } - async fn get(&self, location: &Path) -> Result { let permit = Arc::clone(&self.semaphore).acquire_owned().await.unwrap(); let r = self.inner.get(location).await?; diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 919baf71b0a8..1a87dc33c7a3 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -350,45 +350,6 @@ impl ObjectStore for LocalFileSystem { .await } - async fn append(&self, location: &Path) -> Result> { - // Get the path to the file from the configuration. - let path = self.config.path_to_filesystem(location)?; - loop { - // Create new `OpenOptions`. - let mut options = tokio::fs::OpenOptions::new(); - - // Attempt to open the file with the given options. - match options - .truncate(false) - .append(true) - .create(true) - .open(&path) - .await - { - // If the file was successfully opened, return it wrapped in a boxed `AsyncWrite` trait object. - Ok(file) => return Ok(Box::new(file)), - // If the error is that the file was not found, attempt to create the file and any necessary parent directories. - Err(source) if source.kind() == ErrorKind::NotFound => { - // Get the path to the parent directory of the file. - let parent = path.parent().ok_or_else(|| Error::UnableToCreateFile { - path: path.to_path_buf(), - source, - })?; - - // Create the parent directory and any necessary ancestors. - tokio::fs::create_dir_all(parent) - .await - // If creating the directory fails, return a `UnableToCreateDirSnafu` error. - .context(UnableToCreateDirSnafu { path: parent })?; - // Try again to open the file. - continue; - } - // If any other error occurs, return a `UnableToOpenFile` error. - Err(source) => return Err(Error::UnableToOpenFile { source, path }.into()), - } - } - } - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let location = location.clone(); let path = self.config.path_to_filesystem(&location)?; @@ -1449,97 +1410,10 @@ mod tests { mod not_wasm_tests { use crate::local::LocalFileSystem; use crate::{ObjectStore, Path}; - use bytes::Bytes; use std::time::Duration; use tempfile::TempDir; use tokio::io::AsyncWriteExt; - #[tokio::test] - async fn creates_dir_if_not_present_append() { - let root = TempDir::new().unwrap(); - let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); - - let location = Path::from("nested/file/test_file"); - - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - - let mut writer = integration.append(&location).await.unwrap(); - - writer.write_all(data.as_ref()).await.unwrap(); - - writer.flush().await.unwrap(); - - let read_data = integration - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } - - #[tokio::test] - async fn unknown_length_append() { - let root = TempDir::new().unwrap(); - let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); - - let location = Path::from("some_file"); - - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - let mut writer = integration.append(&location).await.unwrap(); - - writer.write_all(data.as_ref()).await.unwrap(); - writer.flush().await.unwrap(); - - let read_data = integration - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } - - #[tokio::test] - async fn multiple_append() { - let root = TempDir::new().unwrap(); - let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); - - let location = Path::from("some_file"); - - let data = vec![ - Bytes::from("arbitrary"), - Bytes::from("data"), - Bytes::from("gnz"), - ]; - - let mut writer = integration.append(&location).await.unwrap(); - for d in &data { - writer.write_all(d).await.unwrap(); - } - writer.flush().await.unwrap(); - - let mut writer = integration.append(&location).await.unwrap(); - for d in &data { - writer.write_all(d).await.unwrap(); - } - writer.flush().await.unwrap(); - - let read_data = integration - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - let expected_data = Bytes::from("arbitrarydatagnzarbitrarydatagnz"); - assert_eq!(&*read_data, expected_data); - } - #[tokio::test] async fn test_cleanup_intermediate_files() { let root = TempDir::new().unwrap(); diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs index 9d79a798ad1f..382300123846 100644 --- a/object_store/src/memory.rs +++ b/object_store/src/memory.rs @@ -205,14 +205,6 @@ impl ObjectStore for InMemory { Ok(()) } - async fn append(&self, location: &Path) -> Result> { - Ok(Box::new(InMemoryAppend { - location: location.clone(), - data: Vec::::new(), - storage: SharedStorage::clone(&self.storage), - })) - } - async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { let entry = self.entry(location).await?; let e_tag = entry.e_tag.to_string(); @@ -443,53 +435,8 @@ impl AsyncWrite for InMemoryUpload { } } -struct InMemoryAppend { - location: Path, - data: Vec, - storage: Arc>, -} - -impl AsyncWrite for InMemoryAppend { - fn poll_write( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> Poll> { - self.data.extend_from_slice(buf); - Poll::Ready(Ok(buf.len())) - } - - fn poll_flush( - mut self: Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> Poll> { - let storage = Arc::clone(&self.storage); - - let mut writer = storage.write(); - - if let Some(entry) = writer.map.remove(&self.location) { - let buf = std::mem::take(&mut self.data); - let concat = Bytes::from_iter(entry.data.into_iter().chain(buf)); - writer.insert(&self.location, concat); - } else { - let data = Bytes::from(std::mem::take(&mut self.data)); - writer.insert(&self.location, data); - }; - Poll::Ready(Ok(())) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - self.poll_flush(cx) - } -} - #[cfg(test)] mod tests { - use tokio::io::AsyncWriteExt; - use super::*; use crate::tests::*; @@ -577,50 +524,4 @@ mod tests { panic!("unexpected error type: {err:?}"); } } - - #[tokio::test] - async fn test_append_new() { - let in_memory = InMemory::new(); - let location = Path::from("some_file"); - let data = Bytes::from("arbitrary data"); - let expected_data = data.clone(); - - let mut writer = in_memory.append(&location).await.unwrap(); - writer.write_all(&data).await.unwrap(); - writer.flush().await.unwrap(); - - let read_data = in_memory - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } - - #[tokio::test] - async fn test_append_existing() { - let in_memory = InMemory::new(); - let location = Path::from("some_file"); - let data = Bytes::from("arbitrary"); - let data_appended = Bytes::from(" data"); - let expected_data = Bytes::from("arbitrary data"); - - let mut writer = in_memory.append(&location).await.unwrap(); - writer.write_all(&data).await.unwrap(); - writer.flush().await.unwrap(); - - writer.write_all(&data_appended).await.unwrap(); - writer.flush().await.unwrap(); - - let read_data = in_memory - .get(&location) - .await - .unwrap() - .bytes() - .await - .unwrap(); - assert_eq!(&*read_data, expected_data); - } } diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs index 68101307fbdf..38f9b07bbd05 100644 --- a/object_store/src/prefix.rs +++ b/object_store/src/prefix.rs @@ -103,12 +103,6 @@ impl ObjectStore for PrefixStore { let full_path = self.full_path(location); self.inner.abort_multipart(&full_path, multipart_id).await } - - async fn append(&self, location: &Path) -> Result> { - let full_path = self.full_path(location); - self.inner.append(&full_path).await - } - async fn get(&self, location: &Path) -> Result { let full_path = self.full_path(location); self.inner.get(&full_path).await diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs index dcd2c04bcf05..252256a4599e 100644 --- a/object_store/src/throttle.rs +++ b/object_store/src/throttle.rs @@ -169,10 +169,6 @@ impl ObjectStore for ThrottledStore { Err(super::Error::NotImplemented) } - async fn append(&self, _location: &Path) -> Result> { - Err(super::Error::NotImplemented) - } - async fn get(&self, location: &Path) -> Result { sleep(self.config().wait_get_per_call).await;