Skip to content

Commit

Permalink
Add storage metrics for opendal storage.
Browse files Browse the repository at this point in the history
  • Loading branch information
fmassot committed Mar 16, 2024
1 parent 33920d6 commit 9a32743
Showing 1 changed file with 25 additions and 2 deletions.
27 changes: 25 additions & 2 deletions quickwit/quickwit-storage/src/opendal_storage/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::io::{AsyncRead, AsyncWriteExt};
use crate::storage::SendableAsync;
use crate::{
BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind,
StorageResolverError, StorageResult,
StorageResolverError, StorageResult, STORAGE_METRICS,
};

/// OpenDAL based storage implementation.
Expand Down Expand Up @@ -88,20 +88,35 @@ impl Storage for OpendalStorage {
tokio::io::copy(&mut payload_reader, &mut storage_writer).await?;
storage_writer.close().await?;

// FIXME: should be payload.len() / ByteSize::mb(8)?
// crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(payload.len());

Ok(())
}

async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> {
let path = path.as_os_str().to_string_lossy();
let mut storage_reader = self.op.reader(&path).await?;
tokio::io::copy(&mut storage_reader, output).await?;
let num_bytes_copied = tokio::io::copy(&mut storage_reader, output).await?;
output.flush().await?;
crate::STORAGE_METRICS.object_storage_get_total.inc();
STORAGE_METRICS
.object_storage_download_num_bytes
.inc_by(num_bytes_copied);

Ok(())
}

async fn get_slice(&self, path: &Path, range: Range<usize>) -> StorageResult<OwnedBytes> {
let path = path.as_os_str().to_string_lossy();
let range = range.start as u64..range.end as u64;
crate::STORAGE_METRICS.object_storage_get_total.inc();
STORAGE_METRICS
.object_storage_download_num_bytes
.inc_by(range.end - range.start);
let storage_content = self.op.read_with(&path).range(range).await?;

Ok(OwnedBytes::new(storage_content))
Expand All @@ -114,14 +129,22 @@ impl Storage for OpendalStorage {
) -> StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
let path = path.as_os_str().to_string_lossy();
let range = range.start as u64..range.end as u64;
crate::STORAGE_METRICS.object_storage_get_total.inc();
STORAGE_METRICS
.object_storage_download_num_bytes
.inc_by(range.end - range.start);
let storage_reader = self.op.reader_with(&path).range(range).await?;

Ok(Box::new(storage_reader))
}

async fn get_all(&self, path: &Path) -> StorageResult<OwnedBytes> {
let path = path.as_os_str().to_string_lossy();
crate::STORAGE_METRICS.object_storage_get_total.inc();
let storage_content = self.op.read(&path).await?;
STORAGE_METRICS
.object_storage_download_num_bytes
.inc_by(storage_content.len() as u64);

Ok(OwnedBytes::new(storage_content))
}
Expand Down

0 comments on commit 9a32743

Please sign in to comment.