Skip to content

Commit

Permalink
Add abstractions for streaming storage.
Browse files Browse the repository at this point in the history
Implementations are still missing.
  • Loading branch information
rdettai committed Sep 22, 2023
1 parent 0156a81 commit 9a74f5a
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 33 deletions.
1 change: 0 additions & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -416,4 +416,3 @@ sasl2-sys = { git = "https://github.com/quickwit-oss/rust-sasl/", rev = "daca921
#tracing-log = { git = "https://github.com/trinity-1686a/tracing.git", rev = "6806cac3" }
#tracing-opentelemetry = { git = "https://github.com/trinity-1686a/tracing.git", rev = "6806cac3" }
#tracing-subscriber = { git = "https://github.com/trinity-1686a/tracing.git", rev = "6806cac3" }

10 changes: 9 additions & 1 deletion quickwit/quickwit-storage/src/debouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ use futures::future::{BoxFuture, WeakShared};
use futures::{Future, FutureExt};
use quickwit_common::uri::Uri;
use tantivy::directory::OwnedBytes;
use tokio::io::AsyncRead;

use crate::storage::SendableAsync;
use crate::storage::{SendableAsync, StreamableStorage};
use crate::{BulkDeleteError, Storage, StorageResult};

/// The AsyncDebouncer debounces inflight Futures, so that concurrent async request to the same data
Expand Down Expand Up @@ -191,6 +192,13 @@ impl<T: Storage> Storage for DebouncedStorage<T> {
}
}

// Streaming support for `DebouncedStorage`: available whenever the wrapped storage `T` is
// itself a `StreamableStorage`.
#[async_trait]
impl<T: StreamableStorage> StreamableStorage for DebouncedStorage<T> {
/// Forwards the call, with `path` and `offset` unchanged, to the underlying storage and
/// returns the stream it produces.
async fn get_stream(&self, path: &Path, offset: usize) -> Box<dyn AsyncRead> {
self.underlying.get_stream(path, offset).await
}
}

#[cfg(test)]
mod tests {

Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-storage/src/local_file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ use quickwit_common::ignore_error_kind;
use quickwit_common::uri::Uri;
use quickwit_config::StorageBackend;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tracing::warn;

use crate::storage::SendableAsync;
use crate::storage::{SendableAsync, StreamableStorage};
use crate::{
BulkDeleteError, DebouncedStorage, DeleteFailure, OwnedBytes, Storage, StorageError,
StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
Expand Down Expand Up @@ -335,6 +335,13 @@ impl Storage for LocalFileStorage {
}
}

// Placeholder `StreamableStorage` implementation for `LocalFileStorage`.
#[async_trait]
impl StreamableStorage for LocalFileStorage {
/// Not implemented yet.
///
/// # Panics
///
/// Always panics through `unimplemented!()`; both arguments are currently ignored.
async fn get_stream(&self, _path: &Path, _offset: usize) -> Box<dyn AsyncRead> {
unimplemented!()
}
}

/// A File storage resolver
#[derive(Clone, Debug, Default)]
pub struct LocalFileStorageFactory;
Expand All @@ -345,7 +352,7 @@ impl StorageFactory for LocalFileStorageFactory {
StorageBackend::File
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn StreamableStorage>, StorageResolverError> {
let storage = LocalFileStorage::from_uri(uri)?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
Expand Down
13 changes: 10 additions & 3 deletions quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ use quickwit_config::{AzureStorageConfig, StorageBackend};
use regex::Regex;
use tantivy::directory::OwnedBytes;
use thiserror::Error;
use tokio::io::{AsyncWriteExt, BufReader};
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{instrument, warn};

use crate::debouncer::DebouncedStorage;
use crate::storage::SendableAsync;
use crate::storage::{SendableAsync, StreamableStorage};
use crate::{
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
StorageErrorKind, StorageFactory, StorageResolverError, StorageResult, STORAGE_METRICS,
Expand All @@ -72,7 +72,7 @@ impl StorageFactory for AzureBlobStorageFactory {
StorageBackend::Azure
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn StreamableStorage>, StorageResolverError> {
let storage = AzureBlobStorage::from_uri(&self.storage_config, uri)?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
Expand Down Expand Up @@ -462,6 +462,13 @@ impl Storage for AzureBlobStorage {
}
}

// Placeholder `StreamableStorage` implementation for `AzureBlobStorage`.
#[async_trait]
impl StreamableStorage for AzureBlobStorage {
/// Not implemented yet.
///
/// # Panics
///
/// Always panics through `unimplemented!()`; both arguments are currently ignored.
async fn get_stream(&self, _path: &Path, _offset: usize) -> Box<dyn AsyncRead> {
unimplemented!()
}
}

/// Copy range of payload into `Bytes` and return the computed md5.
async fn extract_range_data_and_hash(
payload: Box<dyn PutPayload>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tokio::sync::Semaphore;
use tracing::{info, instrument, warn};

use crate::object_storage::MultiPartPolicy;
use crate::storage::SendableAsync;
use crate::storage::{SendableAsync, StreamableStorage};
use crate::{
BulkDeleteError, DeleteFailure, OwnedBytes, Storage, StorageError, StorageErrorKind,
StorageResolverError, StorageResult, STORAGE_METRICS,
Expand Down Expand Up @@ -787,6 +787,13 @@ impl Storage for S3CompatibleObjectStorage {
}
}

// Placeholder `StreamableStorage` implementation for `S3CompatibleObjectStorage`.
#[async_trait]
impl StreamableStorage for S3CompatibleObjectStorage {
/// Not implemented yet.
///
/// # Panics
///
/// Always panics through `unimplemented!()`; both arguments are currently ignored.
async fn get_stream(&self, _path: &Path, _offset: usize) -> Box<dyn AsyncRead> {
unimplemented!()
}
}

#[cfg(test)]
mod tests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{S3StorageConfig, StorageBackend};

use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};
use crate::storage::StreamableStorage;
use crate::{DebouncedStorage, S3CompatibleObjectStorage, StorageFactory, StorageResolverError};

/// S3 compatible object storage resolver.
pub struct S3CompatibleObjectStorageFactory {
Expand All @@ -45,7 +44,7 @@ impl StorageFactory for S3CompatibleObjectStorageFactory {
StorageBackend::S3
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn StreamableStorage>, StorageResolverError> {
let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
Expand Down
24 changes: 16 additions & 8 deletions quickwit/quickwit-storage/src/prefix_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use tokio::io::AsyncRead;

use crate::storage::SendableAsync;
use crate::storage::{SendableAsync, StreamableStorage};
use crate::{BulkDeleteError, OwnedBytes, Storage};

/// This storage acts as a proxy to another storage that simply modifies each API call
/// by preceding each path with a given prefix.
struct PrefixStorage {
pub storage: Arc<dyn Storage>,
struct PrefixStorage<T: ?Sized = dyn Storage> {
pub storage: Arc<T>,
pub prefix: PathBuf,
uri: Uri,
}

impl fmt::Debug for PrefixStorage {
impl<T: ?Sized> fmt::Debug for PrefixStorage<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PrefixStorage")
.field("uri", &self.uri)
Expand All @@ -46,7 +47,7 @@ impl fmt::Debug for PrefixStorage {
}

#[async_trait]
impl Storage for PrefixStorage {
impl<T: Storage + ?Sized> Storage for PrefixStorage<T> {
async fn check_connectivity(&self) -> anyhow::Result<()> {
self.storage.check_connectivity().await
}
Expand Down Expand Up @@ -110,12 +111,19 @@ impl Storage for PrefixStorage {
}
}

#[async_trait]
impl<T: StreamableStorage + ?Sized> StreamableStorage for PrefixStorage<T> {
    /// Opens a stream on the object located at `path`, resolved relative to this
    /// storage's prefix, by delegating to the wrapped storage.
    ///
    /// `offset` is forwarded unchanged to the wrapped storage.
    async fn get_stream(&self, path: &Path, offset: usize) -> Box<dyn AsyncRead> {
        // `PrefixStorage` exists to prepend `prefix` to every path before delegating.
        // Forwarding the bare `path` would address a different object in the wrapped
        // storage than the one the caller asked for.
        let prefixed_path = self.prefix.join(path);
        self.storage.get_stream(&prefixed_path, offset).await
    }
}

/// Creates a [`PrefixStorage`] using an underlying storage and a prefix.
pub(crate) fn add_prefix_to_storage(
storage: Arc<dyn Storage>,
pub(crate) fn add_prefix_to_streamable_storage(
storage: Arc<dyn StreamableStorage>,
prefix: PathBuf,
uri: Uri,
) -> Arc<dyn Storage> {
) -> Arc<dyn StreamableStorage> {
Arc::new(PrefixStorage {
storage,
prefix,
Expand Down
19 changes: 13 additions & 6 deletions quickwit/quickwit-storage/src/ram_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use std::sync::Arc;
use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::StorageBackend;
use tokio::io::AsyncWriteExt;
use tokio::io::{AsyncRead, AsyncWriteExt};
use tokio::sync::RwLock;

use crate::prefix_storage::add_prefix_to_storage;
use crate::storage::SendableAsync;
use crate::prefix_storage::add_prefix_to_streamable_storage;
use crate::storage::{SendableAsync, StreamableStorage};
use crate::{
BulkDeleteError, OwnedBytes, Storage, StorageErrorKind, StorageFactory, StorageResolverError,
StorageResult,
Expand Down Expand Up @@ -152,6 +152,13 @@ impl Storage for RamStorage {
}
}

// Placeholder `StreamableStorage` implementation for `RamStorage`.
#[async_trait]
impl StreamableStorage for RamStorage {
/// Not implemented yet.
///
/// # Panics
///
/// Always panics through `unimplemented!()`; both arguments are currently ignored.
async fn get_stream(&self, _path: &Path, _offset: usize) -> Box<dyn AsyncRead> {
unimplemented!()
}
}

/// Builder to create a prepopulated [`RamStorage`]. This is mostly useful for tests.
#[derive(Default)]
pub struct RamStorageBuilder {
Expand All @@ -177,7 +184,7 @@ impl RamStorageBuilder {

/// Storage resolver for [`RamStorage`].
pub struct RamStorageFactory {
ram_storage: Arc<dyn Storage>,
ram_storage: Arc<RamStorage>,
}

impl Default for RamStorageFactory {
Expand All @@ -194,9 +201,9 @@ impl StorageFactory for RamStorageFactory {
StorageBackend::Ram
}

async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn StreamableStorage>, StorageResolverError> {
match uri.filepath() {
Some(prefix) if uri.protocol().is_ram() => Ok(add_prefix_to_storage(
Some(prefix) if uri.protocol().is_ram() => Ok(add_prefix_to_streamable_storage(
self.ram_storage.clone(),
prefix.to_path_buf(),
uri.clone(),
Expand Down
20 changes: 19 additions & 1 deletion quickwit/quickwit-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use std::fmt;
use std::io::{self, ErrorKind};
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use tempfile::TempPath;
use tokio::fs::File;
use tokio::io::AsyncWrite;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::error;

use crate::{BulkDeleteError, OwnedBytes, PutPayload, StorageErrorKind, StorageResult};
Expand Down Expand Up @@ -135,6 +136,23 @@ pub trait Storage: fmt::Debug + Send + Sync + 'static {
fn uri(&self) -> &Uri;
}

/// A [`Storage`] that can additionally hand out an object as an asynchronous reader.
///
/// The [`AsStorage`] supertrait makes it possible to turn an `Arc` of a streamable storage
/// into an `Arc<dyn Storage>`.
#[async_trait]
pub trait StreamableStorage: Storage + AsStorage {
/// Returns a boxed [`AsyncRead`] for the object at `path`.
///
/// NOTE(review): `offset` is presumably the byte position at which the stream starts;
/// confirm once a backend actually implements this method.
async fn get_stream(&self, path: &Path, offset: usize) -> Box<dyn AsyncRead>;
}

/// Conversion of a reference-counted value into a reference-counted [`Storage`] trait object.
pub trait AsStorage {
/// Consumes the `Arc<Self>` and returns it as an `Arc<dyn Storage + 'a>`.
///
/// The `Self: 'a` bound ties the lifetime of the returned trait object to that of `Self`.
fn as_storage<'a>(self: Arc<Self>) -> Arc<dyn Storage + 'a>
where Self: 'a;
}

// Blanket implementation: every sized type implementing `Storage` gets `as_storage`.
impl<T: Storage + Sized> AsStorage for T {
fn as_storage<'a>(self: Arc<Self>) -> Arc<dyn Storage + 'a>
where Self: 'a {
// `Arc<T>` is coerced to `Arc<dyn Storage + 'a>` here; this unsized coercion is
// possible because `T` is a concrete, sized type implementing `Storage`.
self
}
}

async fn default_copy_to_file<S: Storage + ?Sized>(
storage: &S,
path: &Path,
Expand Down
10 changes: 7 additions & 3 deletions quickwit/quickwit-storage/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::StorageBackend;

use crate::{Storage, StorageResolverError};
use crate::storage::StreamableStorage;
use crate::StorageResolverError;

/// A storage factory builds a [`Storage`] object for a target [`StorageBackend`] from a
/// [`Uri`].
Expand All @@ -34,7 +35,7 @@ pub trait StorageFactory: Send + Sync + 'static {
fn backend(&self) -> StorageBackend;

/// Returns the appropriate [`Storage`] object for the URI.
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError>;
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn StreamableStorage>, StorageResolverError>;
}

/// A storage factory for handling unsupported or unavailable storage backends.
Expand All @@ -57,7 +58,10 @@ impl StorageFactory for UnsupportedStorage {
self.backend
}

async fn resolve(&self, _uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(
&self,
_uri: &Uri,
) -> Result<Arc<dyn StreamableStorage>, StorageResolverError> {
Err(StorageResolverError::UnsupportedBackend(
self.message.to_string(),
))
Expand Down
17 changes: 15 additions & 2 deletions quickwit/quickwit-storage/src/storage_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use quickwit_config::{StorageBackend, StorageConfigs};

use crate::local_file_storage::LocalFileStorageFactory;
use crate::ram_storage::RamStorageFactory;
use crate::storage::StreamableStorage;
#[cfg(feature = "azure")]
use crate::AzureBlobStorageFactory;
use crate::{S3CompatibleObjectStorageFactory, Storage, StorageFactory, StorageResolverError};
Expand All @@ -51,8 +52,11 @@ impl StorageResolver {
StorageResolverBuilder::default()
}

/// Resolves the given URI.
pub async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
/// Resolves the given URI into a storage that can also be used in a streaming fashion.
pub async fn resolve_streamable(
&self,
uri: &Uri,
) -> Result<Arc<dyn StreamableStorage>, StorageResolverError> {
let backend = match uri.protocol() {
Protocol::Azure => StorageBackend::Azure,
Protocol::File => StorageBackend::File,
Expand All @@ -74,6 +78,15 @@ impl StorageResolver {
Ok(storage)
}

/// Resolves the given URI into a plain [`Storage`].
///
/// This is a thin convenience wrapper around `resolve_streamable`: the storage is
/// resolved as a streamable one and then converted with `as_storage`, so callers that
/// only need an `Arc<dyn Storage>` do not have to perform the conversion themselves.
pub async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
    let streamable_storage = self.resolve_streamable(uri).await?;
    Ok(streamable_storage.as_storage())
}

/// Creates and returns a default [`StorageResolver`] with the default storage configuration for
/// each backend. Note that if the environment (env vars, instance metadata, ...) fails to
/// provide the necessary credentials, the default Azure or S3 storage returned by this
Expand Down

0 comments on commit 9a74f5a

Please sign in to comment.