diff --git a/src/common/datasource/src/object_store/fs.rs b/src/common/datasource/src/object_store/fs.rs index 6d342f8eb30b..16e30b0044c0 100644 --- a/src/common/datasource/src/object_store/fs.rs +++ b/src/common/datasource/src/object_store/fs.rs @@ -31,7 +31,7 @@ pub fn build_fs_backend(root: &str) -> Result { .expect("input error level must be valid"), ) .layer(object_store::layers::TracingLayer) - .layer(object_store::layers::PrometheusMetricsLayer) + .layer(object_store::layers::PrometheusMetricsLayer::new(true)) .finish(); Ok(object_store) } diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs index 2b6ac7c2ee73..6efc6474c45d 100644 --- a/src/common/datasource/src/object_store/s3.rs +++ b/src/common/datasource/src/object_store/s3.rs @@ -94,7 +94,7 @@ pub fn build_s3_backend( .expect("input error level must be valid"), ) .layer(object_store::layers::TracingLayer) - .layer(object_store::layers::PrometheusMetricsLayer) + .layer(object_store::layers::PrometheusMetricsLayer::new(true)) .finish()) } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 6e6e5bea6813..9946ec44839a 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -60,7 +60,7 @@ pub(crate) async fn new_object_store( object_store }; - let store = with_instrument_layers(object_store); + let store = with_instrument_layers(object_store, true); Ok(store) } diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 47f5af9241e6..a72527fb3351 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -208,18 +208,15 @@ pub(crate) struct SstWriteRequest { pub(crate) fulltext_index_config: FulltextIndexConfig, } -/// Creates a fs object store with atomic write dir. -pub(crate) async fn new_fs_object_store(root: &str) -> Result { +pub(crate) async fn new_fs_cache_store(root: &str) -> Result { let atomic_write_dir = join_dir(root, ".tmp/"); clean_dir(&atomic_write_dir).await?; let mut builder = Fs::default(); builder.root(root).atomic_write_dir(&atomic_write_dir); - let object_store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish(); + let store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish(); - // Add layers. - let object_store = with_instrument_layers(object_store); - Ok(object_store) + Ok(with_instrument_layers(store, false)) } /// Clean the directory. diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index ff544662f905..a95bcff15f45 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -24,7 +24,7 @@ use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use snafu::ResultExt; -use crate::access_layer::{new_fs_object_store, SstWriteRequest}; +use crate::access_layer::{new_fs_cache_store, SstWriteRequest}; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL}; @@ -86,7 +86,7 @@ impl WriteCache { ) -> Result { info!("Init write cache on {cache_dir}, capacity: {cache_capacity}"); - let local_store = new_fs_object_store(cache_dir).await?; + let local_store = new_fs_cache_store(cache_dir).await?; Self::new( local_store, object_store_manager, diff --git a/src/mito2/src/sst/index/intermediate.rs b/src/mito2/src/sst/index/intermediate.rs index 02095eda348b..1568261e206f 100644 --- a/src/mito2/src/sst/index/intermediate.rs +++ b/src/mito2/src/sst/index/intermediate.rs @@ -19,7 +19,7 @@ use object_store::util::{self, normalize_dir}; use store_api::storage::{ColumnId, RegionId}; use uuid::Uuid; -use crate::access_layer::new_fs_object_store; +use crate::access_layer::new_fs_cache_store; use crate::error::Result; use crate::sst::file::FileId; use crate::sst::index::store::InstrumentedStore; @@ -37,7 +37,7 @@ impl IntermediateManager { /// Create a new `IntermediateManager` with the given root path. /// It will clean up all garbage intermediate files from previous runs. pub async fn init_fs(aux_path: impl AsRef) -> Result { - let store = new_fs_object_store(&normalize_dir(aux_path.as_ref())).await?; + let store = new_fs_cache_store(&normalize_dir(aux_path.as_ref())).await?; let store = InstrumentedStore::new(store); // Remove all garbage intermediate files from previous runs. diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 080ace52bff6..5a2d0b603261 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -84,7 +84,15 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) { /// /// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration. #[derive(Default, Debug, Clone)] -pub struct PrometheusMetricsLayer; +pub struct PrometheusMetricsLayer { + pub path_label: bool, +} + +impl PrometheusMetricsLayer { + pub fn new(path_label: bool) -> Self { + Self { path_label } + } +} impl Layer for PrometheusMetricsLayer { type LayeredAccess = PrometheusAccess; @@ -96,6 +104,7 @@ impl Layer for PrometheusMetricsLayer { PrometheusAccess { inner, scheme: scheme.to_string(), + path_label: self.path_label, } } } @@ -104,6 +113,17 @@ impl Layer for PrometheusMetricsLayer { pub struct PrometheusAccess { inner: A, scheme: String, + path_label: bool, +} + +impl PrometheusAccess { + fn get_path_label<'a>(&self, path: &'a str) -> &'a str { + if self.path_label { + extract_parent_path(path) + } else { + "" + } + } } impl Debug for PrometheusAccess { @@ -128,7 +148,7 @@ impl LayeredAccess for PrometheusAccess { } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label]) .inc(); @@ -146,7 +166,7 @@ impl LayeredAccess for PrometheusAccess { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label]) .inc(); @@ -176,7 +196,7 @@ impl LayeredAccess for PrometheusAccess { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label]) .inc(); @@ -206,7 +226,7 @@ impl LayeredAccess for PrometheusAccess { } async fn stat(&self, path: &str, args: OpStat) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label]) .inc(); @@ -223,7 +243,7 @@ impl LayeredAccess for PrometheusAccess { } async fn delete(&self, path: &str, args: OpDelete) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label]) .inc(); @@ -241,7 +261,7 @@ impl LayeredAccess for PrometheusAccess { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label]) .inc(); @@ -277,7 +297,7 @@ impl LayeredAccess for PrometheusAccess { } async fn presign(&self, path: &str, args: OpPresign) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label]) .inc(); @@ -295,7 +315,7 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[ &self.scheme, @@ -322,7 +342,7 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[ &self.scheme, @@ -363,7 +383,7 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[ &self.scheme, @@ -404,7 +424,7 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[ &self.scheme, @@ -429,7 +449,7 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[ &self.scheme, @@ -455,7 +475,7 @@ impl LayeredAccess for PrometheusAccess { } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - let path_label = extract_parent_path(path); + let path_label = self.get_path_label(path); REQUESTS_TOTAL .with_label_values(&[ &self.scheme, diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 376e1941c589..c8f7ac893fa4 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -138,7 +138,7 @@ pub(crate) fn extract_parent_path(path: &str) -> &str { } /// Attaches instrument layers to the object store. -pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore { +pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore { object_store .layer( LoggingLayer::default() @@ -148,7 +148,7 @@ pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore { .expect("input error level must be valid"), ) .layer(TracingLayer) - .layer(PrometheusMetricsLayer) + .layer(PrometheusMetricsLayer::new(path_label)) } #[cfg(test)]