From 8305480c13143309a819c7ae0f083fe8e3f31461 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 18 Dec 2024 17:19:05 +0800 Subject: [PATCH] feat: bump opendal and switch prometheus layer to the upstream impl (#5179) * feat: bump opendal and switch prometheus layer to the upstream impl Signed-off-by: Ruihang Xia * remove unused files Signed-off-by: Ruihang Xia * fix tests Signed-off-by: Ruihang Xia * remove unused things Signed-off-by: Ruihang Xia * remove root dir on recovering cache Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 25 +- src/common/datasource/src/object_store/fs.rs | 2 +- src/common/datasource/src/object_store/s3.rs | 2 +- src/common/procedure/src/local/runner.rs | 8 +- src/datanode/src/error.rs | 15 +- src/datanode/src/store.rs | 5 +- src/file-engine/src/manifest.rs | 2 +- src/file-engine/src/region.rs | 6 +- src/metric-engine/src/test_util.rs | 4 +- src/mito2/src/cache/file_cache.rs | 4 +- src/mito2/src/engine/create_test.rs | 4 +- src/mito2/src/engine/drop_test.rs | 12 +- src/mito2/src/engine/open_test.rs | 4 +- src/mito2/src/manifest/tests/checkpoint.rs | 2 + src/mito2/src/sst/file_purger.rs | 6 +- src/mito2/src/worker/handle_open.rs | 2 +- src/object-store/Cargo.toml | 3 +- src/object-store/src/layers.rs | 33 +- .../src/layers/lru_cache/read_cache.rs | 9 +- src/object-store/src/layers/prometheus.rs | 584 ------------------ src/object-store/src/util.rs | 49 +- src/object-store/tests/object_store_test.rs | 46 +- 22 files changed, 119 insertions(+), 708 deletions(-) delete mode 100644 src/object-store/src/layers/prometheus.rs diff --git a/Cargo.lock b/Cargo.lock index a0225cf27dbe..fa8ba34d1a3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -866,18 +866,6 @@ dependencies = [ "rand", ] -[[package]] -name = "backon" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0" -dependencies = [ - "fastrand", - "futures-core", - "pin-project", - "tokio", -] - [[package]] name = "backon" version = "1.2.0" @@ -2228,7 +2216,7 @@ version = "0.12.0" dependencies = [ "async-stream", "async-trait", - "backon 1.2.0", + "backon", "common-base", "common-error", "common-macro", @@ -7386,13 +7374,13 @@ checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" [[package]] name = "opendal" -version = "0.49.2" +version = "0.50.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b04d09b9822c2f75a1d2fc513a2c1279c70e91e7407936fffdf6a6976ec530a" +checksum = "cb28bb6c64e116ceaf8dd4e87099d3cfea4a58e85e62b104fef74c91afba0f44" dependencies = [ "anyhow", "async-trait", - "backon 0.4.4", + "backon", "base64 0.22.1", "bytes", "chrono", @@ -7405,6 +7393,7 @@ dependencies = [ "md-5", "once_cell", "percent-encoding", + "prometheus", "quick-xml 0.36.2", "reqsign", "reqwest", @@ -9387,9 +9376,9 @@ dependencies = [ [[package]] name = "reqsign" -version = "0.16.0" +version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa" +checksum = "eb0075a66c8bfbf4cc8b70dca166e722e1f55a3ea9250ecbb85f4d92a5f64149" dependencies = [ "anyhow", "async-trait", diff --git a/src/common/datasource/src/object_store/fs.rs b/src/common/datasource/src/object_store/fs.rs index f87311f517b7..5ffbbfa3148a 100644 --- a/src/common/datasource/src/object_store/fs.rs +++ b/src/common/datasource/src/object_store/fs.rs @@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result { DefaultLoggingInterceptor, )) .layer(object_store::layers::TracingLayer) - .layer(object_store::layers::PrometheusMetricsLayer::new(true)) + .layer(object_store::layers::build_prometheus_metrics_layer(true)) .finish(); Ok(object_store) } diff --git a/src/common/datasource/src/object_store/s3.rs b/src/common/datasource/src/object_store/s3.rs index e141621b899b..0d83eb7a98b8 100644 --- a/src/common/datasource/src/object_store/s3.rs +++ b/src/common/datasource/src/object_store/s3.rs @@ -89,7 +89,7 @@ pub fn build_s3_backend( DefaultLoggingInterceptor, )) .layer(object_store::layers::TracingLayer) - .layer(object_store::layers::PrometheusMetricsLayer::new(true)) + .layer(object_store::layers::build_prometheus_metrics_layer(true)) .finish()) } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index c2d15001fba3..bf277a0e72e5 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -544,7 +544,7 @@ mod tests { use common_test_util::temp_dir::create_temp_dir; use futures_util::future::BoxFuture; use futures_util::FutureExt; - use object_store::ObjectStore; + use object_store::{EntryMode, ObjectStore}; use tokio::sync::mpsc; use super::*; @@ -578,7 +578,11 @@ mod tests { ) { let dir = proc_path!(procedure_store, "{procedure_id}/"); let lister = object_store.list(&dir).await.unwrap(); - let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect(); + let mut files_in_dir: Vec<_> = lister + .into_iter() + .filter(|x| x.metadata().mode() == EntryMode::FILE) + .map(|de| de.name().to_string()) + .collect(); files_in_dir.sort_unstable(); assert_eq!(files, files_in_dir); } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 9fbd46e16009..61a4eae12883 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -193,6 +193,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to build http client"))] + BuildHttpClient { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: reqwest::Error, + }, + #[snafu(display("Missing required field: {}", name))] MissingRequiredField { name: String, @@ -406,9 +414,10 @@ impl ErrorExt for Error { | MissingKvBackend { .. } | TomlFormat { .. } => StatusCode::InvalidArguments, - PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => { - StatusCode::Unexpected - } + PayloadNotExist { .. } + | Unexpected { .. } + | WatchAsyncTaskChange { .. } + | BuildHttpClient { .. } => StatusCode::Unexpected, AsyncTaskExecute { source, .. } => source.status_code(), diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index c78afe448e0c..52a1cba982e1 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O use snafu::prelude::*; use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; -use crate::error::{self, CreateDirSnafu, Result}; +use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result}; pub(crate) async fn new_raw_object_store( store: &ObjectStoreConfig, @@ -236,7 +236,8 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result builder.timeout(config.timeout) }; - HttpClient::build(http_builder).context(error::InitBackendSnafu) + let client = http_builder.build().context(BuildHttpClientSnafu)?; + Ok(HttpClient::with(client)) } struct PrintDetailedError; diff --git a/src/file-engine/src/manifest.rs b/src/file-engine/src/manifest.rs index 6310c3ccb912..6bf5ee104ba2 100644 --- a/src/file-engine/src/manifest.rs +++ b/src/file-engine/src/manifest.rs @@ -46,7 +46,7 @@ impl FileRegionManifest { pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> { let path = ®ion_manifest_path(region_dir); let exist = object_store - .is_exist(path) + .exists(path) .await .context(CheckObjectSnafu { path })?; ensure!(!exist, ManifestExistsSnafu { path }); diff --git a/src/file-engine/src/region.rs b/src/file-engine/src/region.rs index a5af6822285e..673d352b1e63 100644 --- a/src/file-engine/src/region.rs +++ b/src/file-engine/src/region.rs @@ -130,7 +130,7 @@ mod tests { assert_eq!(region.metadata.primary_key, vec![1]); assert!(object_store - .is_exist("create_region_dir/manifest/_file_manifest") + .exists("create_region_dir/manifest/_file_manifest") .await .unwrap()); @@ -198,13 +198,13 @@ mod tests { .unwrap(); assert!(object_store - .is_exist("drop_region_dir/manifest/_file_manifest") + .exists("drop_region_dir/manifest/_file_manifest") .await .unwrap()); FileRegion::drop(®ion, &object_store).await.unwrap(); assert!(!object_store - .is_exist("drop_region_dir/manifest/_file_manifest") + .exists("drop_region_dir/manifest/_file_manifest") .await .unwrap()); diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index c5f7a2b4a32c..d0f8cf5028e6 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -313,12 +313,12 @@ mod test { let region_dir = "test_metric_region"; // assert metadata region's dir let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR); - let exist = object_store.is_exist(&metadata_region_dir).await.unwrap(); + let exist = object_store.exists(&metadata_region_dir).await.unwrap(); assert!(exist); // assert data region's dir let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR); - let exist = object_store.is_exist(&data_region_dir).await.unwrap(); + let exist = object_store.exists(&data_region_dir).await.unwrap(); assert!(exist); // check mito engine diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 9e5742ca0410..eb112530cad7 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -286,7 +286,7 @@ impl FileCache { } async fn get_reader(&self, file_path: &str) -> object_store::Result> { - if self.local_store.is_exist(file_path).await? { + if self.local_store.exists(file_path).await? { Ok(Some(self.local_store.reader(file_path).await?)) } else { Ok(None) @@ -480,7 +480,7 @@ mod tests { cache.memory_index.run_pending_tasks().await; // The file also not exists. - assert!(!local_store.is_exist(&file_path).await.unwrap()); + assert!(!local_store.exists(&file_path).await.unwrap()); assert_eq!(0, cache.memory_index.weighted_size()); } diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index 48b04dc86d91..4bcc55934034 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() { assert!(object_store_manager .find("Gcs") .unwrap() - .is_exist(region_dir) + .exists(region_dir) .await .unwrap()); assert!(!object_store_manager .default_object_store() - .is_exist(region_dir) + .exists(region_dir) .await .unwrap()); } diff --git a/src/mito2/src/engine/drop_test.rs b/src/mito2/src/engine/drop_test.rs index 7d719f778be9..5d0c5afbf06e 100644 --- a/src/mito2/src/engine/drop_test.rs +++ b/src/mito2/src/engine/drop_test.rs @@ -71,7 +71,7 @@ async fn test_engine_drop_region() { assert!(!env .get_object_store() .unwrap() - .is_exist(&join_path(®ion_dir, DROPPING_MARKER_FILE)) + .exists(&join_path(®ion_dir, DROPPING_MARKER_FILE)) .await .unwrap()); @@ -93,7 +93,7 @@ async fn test_engine_drop_region() { listener.wait().await; let object_store = env.get_object_store().unwrap(); - assert!(!object_store.is_exist(®ion_dir).await.unwrap()); + assert!(!object_store.exists(®ion_dir).await.unwrap()); } #[tokio::test] @@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() { assert!(object_store_manager .find("Gcs") .unwrap() - .is_exist(&custom_region_dir) + .exists(&custom_region_dir) .await .unwrap()); assert!(object_store_manager .find("default") .unwrap() - .is_exist(&global_region_dir) + .exists(&global_region_dir) .await .unwrap()); @@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() { assert!(!object_store_manager .find("Gcs") .unwrap() - .is_exist(&custom_region_dir) + .exists(&custom_region_dir) .await .unwrap()); assert!(object_store_manager .find("default") .unwrap() - .is_exist(&global_region_dir) + .exists(&global_region_dir) .await .unwrap()); } diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index 6752bbd04b12..a3b51514c287 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() { let object_store_manager = env.get_object_store_manager().unwrap(); assert!(!object_store_manager .default_object_store() - .is_exist(region.access_layer.region_dir()) + .exists(region.access_layer.region_dir()) .await .unwrap()); assert!(object_store_manager .find("Gcs") .unwrap() - .is_exist(region.access_layer.region_dir()) + .exists(region.access_layer.region_dir()) .await .unwrap()); } diff --git a/src/mito2/src/manifest/tests/checkpoint.rs b/src/mito2/src/manifest/tests/checkpoint.rs index 692f40422b17..6f2c92bc5e09 100644 --- a/src/mito2/src/manifest/tests/checkpoint.rs +++ b/src/mito2/src/manifest/tests/checkpoint.rs @@ -84,6 +84,7 @@ async fn manager_without_checkpoint() { // check files let mut expected = vec![ + "/", "00000000000000000010.json", "00000000000000000009.json", "00000000000000000008.json", @@ -130,6 +131,7 @@ async fn manager_with_checkpoint_distance_1() { // check files let mut expected = vec![ + "/", "00000000000000000009.checkpoint", "00000000000000000010.checkpoint", "00000000000000000010.json", diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 76c7a7150328..81251c91a564 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -185,7 +185,7 @@ mod tests { scheduler.stop(true).await.unwrap(); - assert!(!object_store.is_exist(&path).await.unwrap()); + assert!(!object_store.exists(&path).await.unwrap()); } #[tokio::test] @@ -247,7 +247,7 @@ mod tests { scheduler.stop(true).await.unwrap(); - assert!(!object_store.is_exist(&path).await.unwrap()); - assert!(!object_store.is_exist(&index_path).await.unwrap()); + assert!(!object_store.exists(&path).await.unwrap()); + assert!(!object_store.exists(&index_path).await.unwrap()); } } diff --git a/src/mito2/src/worker/handle_open.rs b/src/mito2/src/worker/handle_open.rs index d4a13a134597..01eaf1765224 100644 --- a/src/mito2/src/worker/handle_open.rs +++ b/src/mito2/src/worker/handle_open.rs @@ -51,7 +51,7 @@ impl RegionWorkerLoop { // Check if this region is pending drop. And clean the entire dir if so. if !self.dropping_regions.is_region_exists(region_id) && object_store - .is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) + .exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE)) .await .context(OpenDalSnafu)? { diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 72e0e2bfbe46..b82be7376a72 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -17,8 +17,9 @@ futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.49", features = [ +opendal = { version = "0.50", features = [ "layers-tracing", + "layers-prometheus", "services-azblob", "services-fs", "services-gcs", diff --git a/src/object-store/src/layers.rs b/src/object-store/src/layers.rs index b2145aa6b0e5..20108ab63c52 100644 --- a/src/object-store/src/layers.rs +++ b/src/object-store/src/layers.rs @@ -13,8 +13,37 @@ // limitations under the License. mod lru_cache; -mod prometheus; pub use lru_cache::*; pub use opendal::layers::*; -pub use prometheus::PrometheusMetricsLayer; +pub use prometheus::build_prometheus_metrics_layer; + +mod prometheus { + use std::sync::{Mutex, OnceLock}; + + use opendal::layers::PrometheusLayer; + + static PROMETHEUS_LAYER: OnceLock> = OnceLock::new(); + + pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer { + PROMETHEUS_LAYER + .get_or_init(|| { + // This logical tries to extract parent path from the object storage operation + // the function also relies on assumption that the region path is built from + // pattern `/catalog/schema/table_id/....` + // + // We'll get the data/catalog/schema from path. + let path_level = if with_path_label { 3 } else { 0 }; + + let layer = PrometheusLayer::builder() + .path_label(path_level) + .register_default() + .unwrap(); + + Mutex::new(layer) + }) + .lock() + .unwrap() + .clone() + } +} diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index f88b36784d15..874b17280d9c 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -156,9 +156,12 @@ impl ReadCache { let size = entry.metadata().content_length(); OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64); - self.mem_cache - .insert(read_key.to_string(), ReadResult::Success(size as u32)) - .await; + // ignore root path + if entry.path() != "/" { + self.mem_cache + .insert(read_key.to_string(), ReadResult::Success(size as u32)) + .await; + } } Ok(self.cache_stat().await) diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs deleted file mode 100644 index fef83a91468a..000000000000 --- a/src/object-store/src/layers/prometheus.rs +++ /dev/null @@ -1,584 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//! code originally from , make a tiny change to avoid crash in multi thread env - -use std::fmt::{Debug, Formatter}; - -use common_telemetry::debug; -use lazy_static::lazy_static; -use opendal::raw::*; -use opendal::{Buffer, ErrorKind}; -use prometheus::{ - exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, - Histogram, HistogramTimer, HistogramVec, IntCounterVec, -}; - -use crate::util::extract_parent_path; - -type Result = std::result::Result; - -lazy_static! { - static ref REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!( - "opendal_requests_total", - "Total times of all kinds of operation being called", - &["scheme", "operation", "path"], - ) - .unwrap(); - static ref REQUESTS_DURATION_SECONDS: HistogramVec = register_histogram_vec!( - histogram_opts!( - "opendal_requests_duration_seconds", - "Histogram of the time spent on specific operation", - exponential_buckets(0.01, 2.0, 16).unwrap() - ), - &["scheme", "operation", "path"] - ) - .unwrap(); - static ref BYTES_TOTAL: HistogramVec = register_histogram_vec!( - histogram_opts!( - "opendal_bytes_total", - "Total size of sync or async Read/Write", - exponential_buckets(0.01, 2.0, 16).unwrap() - ), - &["scheme", "operation", "path"] - ) - .unwrap(); -} - -#[inline] -fn increment_errors_total(op: Operation, kind: ErrorKind) { - debug!( - "Prometheus statistics metrics error, operation {} error {}", - op.into_static(), - kind.into_static() - ); -} - -/// Please refer to [prometheus](https://docs.rs/prometheus) for every operation. -/// -/// # Prometheus Metrics -/// -/// In this section, we will introduce three metrics that are currently being exported by opendal. These metrics are essential for understanding the behavior and performance of opendal. -/// -/// -/// | Metric Name | Type | Description | Labels | -/// |-----------------------------------|-----------|------------------------------------------------------|---------------------| -/// | opendal_requests_total | Counter | Total times of all kinds of operation being called | scheme, operation | -/// | opendal_requests_duration_seconds | Histogram | Histogram of the time spent on specific operation | scheme, operation | -/// | opendal_bytes_total | Histogram | Total size of sync or async Read/Write | scheme, operation | -/// -/// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/). -/// -/// # Histogram Configuration -/// -/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration. -#[derive(Default, Debug, Clone)] -pub struct PrometheusMetricsLayer { - pub path_label: bool, -} - -impl PrometheusMetricsLayer { - pub fn new(path_label: bool) -> Self { - Self { path_label } - } -} - -impl Layer for PrometheusMetricsLayer { - type LayeredAccess = PrometheusAccess; - - fn layer(&self, inner: A) -> Self::LayeredAccess { - let meta = inner.info(); - let scheme = meta.scheme(); - - PrometheusAccess { - inner, - scheme: scheme.to_string(), - path_label: self.path_label, - } - } -} - -#[derive(Clone)] -pub struct PrometheusAccess { - inner: A, - scheme: String, - path_label: bool, -} - -impl PrometheusAccess { - fn get_path_label<'a>(&self, path: &'a str) -> &'a str { - if self.path_label { - extract_parent_path(path) - } else { - "" - } - } -} - -impl Debug for PrometheusAccess { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PrometheusAccessor") - .field("inner", &self.inner) - .finish_non_exhaustive() - } -} - -impl LayeredAccess for PrometheusAccess { - type Inner = A; - type Reader = PrometheusMetricWrapper; - type BlockingReader = PrometheusMetricWrapper; - type Writer = PrometheusMetricWrapper; - type BlockingWriter = PrometheusMetricWrapper; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::CreateDir.into_static(), path_label]) - .start_timer(); - let create_res = self.inner.create_dir(path, args).await; - - timer.observe_duration(); - create_res.inspect_err(|e| { - increment_errors_total(Operation::CreateDir, e.kind()); - }) - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static(), path_label]) - .start_timer(); - - let (rp, r) = self.inner.read(path, args).await.inspect_err(|e| { - increment_errors_total(Operation::Read, e.kind()); - })?; - - Ok(( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - BYTES_TOTAL.with_label_values(&[ - &self.scheme, - Operation::Read.into_static(), - path_label, - ]), - timer, - ), - )) - } - - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Write.into_static(), path_label]) - .start_timer(); - - let (rp, r) = self.inner.write(path, args).await.inspect_err(|e| { - increment_errors_total(Operation::Write, e.kind()); - })?; - - Ok(( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - BYTES_TOTAL.with_label_values(&[ - &self.scheme, - Operation::Write.into_static(), - path_label, - ]), - timer, - ), - )) - } - - async fn stat(&self, path: &str, args: OpStat) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label]) - .inc(); - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Stat.into_static(), path_label]) - .start_timer(); - - let stat_res = self.inner.stat(path, args).await; - timer.observe_duration(); - stat_res.inspect_err(|e| { - increment_errors_total(Operation::Stat, e.kind()); - }) - } - - async fn delete(&self, path: &str, args: OpDelete) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Delete.into_static(), path_label]) - .start_timer(); - - let delete_res = self.inner.delete(path, args).await; - timer.observe_duration(); - delete_res.inspect_err(|e| { - increment_errors_total(Operation::Delete, e.kind()); - }) - } - - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::List.into_static(), path_label]) - .start_timer(); - - let list_res = self.inner.list(path, args).await; - - timer.observe_duration(); - list_res.inspect_err(|e| { - increment_errors_total(Operation::List, e.kind()); - }) - } - - async fn batch(&self, args: OpBatch) -> Result { - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Batch.into_static(), ""]) - .start_timer(); - let result = self.inner.batch(args).await; - - timer.observe_duration(); - result.inspect_err(|e| { - increment_errors_total(Operation::Batch, e.kind()); - }) - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Presign.into_static(), path_label]) - .start_timer(); - let result = self.inner.presign(path, args).await; - timer.observe_duration(); - - result.inspect_err(|e| { - increment_errors_total(Operation::Presign, e.kind()); - }) - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[ - &self.scheme, - Operation::BlockingCreateDir.into_static(), - path_label, - ]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[ - &self.scheme, - Operation::BlockingCreateDir.into_static(), - path_label, - ]) - .start_timer(); - let result = self.inner.blocking_create_dir(path, args); - - timer.observe_duration(); - - result.inspect_err(|e| { - increment_errors_total(Operation::BlockingCreateDir, e.kind()); - }) - } - - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[ - &self.scheme, - Operation::BlockingRead.into_static(), - path_label, - ]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[ - &self.scheme, - Operation::BlockingRead.into_static(), - path_label, - ]) - .start_timer(); - - self.inner - .blocking_read(path, args) - .map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingRead, - BYTES_TOTAL.with_label_values(&[ - &self.scheme, - Operation::BlockingRead.into_static(), - path_label, - ]), - timer, - ), - ) - }) - .inspect_err(|e| { - increment_errors_total(Operation::BlockingRead, e.kind()); - }) - } - - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[ - &self.scheme, - Operation::BlockingWrite.into_static(), - path_label, - ]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[ - &self.scheme, - Operation::BlockingWrite.into_static(), - path_label, - ]) - .start_timer(); - - self.inner - .blocking_write(path, args) - .map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingWrite, - BYTES_TOTAL.with_label_values(&[ - &self.scheme, - Operation::BlockingWrite.into_static(), - path_label, - ]), - timer, - ), - ) - }) - .inspect_err(|e| { - increment_errors_total(Operation::BlockingWrite, e.kind()); - }) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[ - &self.scheme, - Operation::BlockingStat.into_static(), - path_label, - ]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[ - &self.scheme, - Operation::BlockingStat.into_static(), - path_label, - ]) - .start_timer(); - let result = self.inner.blocking_stat(path, args); - timer.observe_duration(); - result.inspect_err(|e| { - increment_errors_total(Operation::BlockingStat, e.kind()); - }) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[ - &self.scheme, - Operation::BlockingDelete.into_static(), - path_label, - ]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[ - &self.scheme, - Operation::BlockingDelete.into_static(), - path_label, - ]) - .start_timer(); - let result = self.inner.blocking_delete(path, args); - timer.observe_duration(); - - result.inspect_err(|e| { - increment_errors_total(Operation::BlockingDelete, e.kind()); - }) - } - - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - let path_label = self.get_path_label(path); - REQUESTS_TOTAL - .with_label_values(&[ - &self.scheme, - Operation::BlockingList.into_static(), - path_label, - ]) - .inc(); - - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[ - &self.scheme, - Operation::BlockingList.into_static(), - path_label, - ]) - .start_timer(); - let result = self.inner.blocking_list(path, args); - timer.observe_duration(); - - result.inspect_err(|e| { - increment_errors_total(Operation::BlockingList, e.kind()); - }) - } -} - -pub struct PrometheusMetricWrapper { - inner: R, - - op: Operation, - bytes_counter: Histogram, - _requests_duration_timer: HistogramTimer, - bytes: u64, -} - -impl Drop for PrometheusMetricWrapper { - fn drop(&mut self) { - self.bytes_counter.observe(self.bytes as f64); - } -} - -impl PrometheusMetricWrapper { - fn new( - inner: R, - op: Operation, - bytes_counter: Histogram, - requests_duration_timer: HistogramTimer, - ) -> Self { - Self { - inner, - op, - bytes_counter, - _requests_duration_timer: requests_duration_timer, - bytes: 0, - } - } -} - -impl oio::Read for PrometheusMetricWrapper { - async fn read(&mut self) -> Result { - self.inner.read().await.inspect_err(|err| { - increment_errors_total(self.op, err.kind()); - }) - } -} - -impl oio::BlockingRead for PrometheusMetricWrapper { - fn read(&mut self) -> opendal::Result { - self.inner.read().inspect_err(|err| { - increment_errors_total(self.op, err.kind()); - }) - } -} - -impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result<()> { - let bytes = bs.len(); - match self.inner.write(bs).await { - Ok(_) => { - self.bytes += bytes as u64; - Ok(()) - } - Err(err) => { - increment_errors_total(self.op, err.kind()); - Err(err) - } - } - } - - async fn close(&mut self) -> Result<()> { - self.inner.close().await.inspect_err(|err| { - increment_errors_total(self.op, err.kind()); - }) - } - - async fn abort(&mut self) -> Result<()> { - self.inner.close().await.inspect_err(|err| { - increment_errors_total(self.op, err.kind()); - }) - } -} - -impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result<()> { - let bytes = bs.len(); - self.inner - .write(bs) - .map(|_| { - self.bytes += bytes as u64; - }) - .inspect_err(|err| { - increment_errors_total(self.op, err.kind()); - }) - } - - fn close(&mut self) -> Result<()> { - self.inner.close().inspect_err(|err| { - increment_errors_total(self.op, err.kind()); - }) - } -} diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index fc0a031ab953..271da33e853c 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -15,19 +15,12 @@ use std::fmt::Display; use common_telemetry::{debug, error, trace}; -use futures::TryStreamExt; use opendal::layers::{LoggingInterceptor, LoggingLayer, TracingLayer}; use opendal::raw::{AccessorInfo, Operation}; -use opendal::{Entry, ErrorKind, Lister}; +use opendal::ErrorKind; -use crate::layers::PrometheusMetricsLayer; use crate::ObjectStore; -/// Collect all entries from the [Lister]. -pub async fn collect(stream: Lister) -> Result, opendal::Error> { - stream.try_collect::>().await -} - /// Join two paths and normalize the output dir. /// /// The output dir is always ends with `/`. e.g. @@ -127,26 +120,12 @@ pub fn normalize_path(path: &str) -> String { p } -// This logical tries to extract parent path from the object storage operation -// the function also relies on assumption that the region path is built from -// pattern `/catalog/schema/table_id/....` -// -// this implementation tries to extract at most 3 levels of parent path -pub(crate) fn extract_parent_path(path: &str) -> &str { - // split the path into `catalog`, `schema` and others - path.char_indices() - .filter(|&(_, c)| c == '/') - // we get the data/catalog/schema from path, split at the 3rd / - .nth(2) - .map_or(path, |(i, _)| &path[..i]) -} - /// Attaches instrument layers to the object store. pub fn with_instrument_layers(object_store: ObjectStore, path_label: bool) -> ObjectStore { object_store .layer(LoggingLayer::new(DefaultLoggingInterceptor)) .layer(TracingLayer) - .layer(PrometheusMetricsLayer::new(path_label)) + .layer(crate::layers::build_prometheus_metrics_layer(path_label)) } static LOGGING_TARGET: &str = "opendal::services"; @@ -263,28 +242,4 @@ mod tests { assert_eq!("/abc", join_path("//", "/abc")); assert_eq!("abc/def", join_path("abc/", "//def")); } - - #[test] - fn test_path_extraction() { - assert_eq!( - "data/greptime/public", - extract_parent_path("data/greptime/public/1024/1024_0000000000/") - ); - - assert_eq!( - "data/greptime/public", - extract_parent_path("data/greptime/public/1/") - ); - - assert_eq!( - "data/greptime/public", - extract_parent_path("data/greptime/public") - ); - - assert_eq!("data/greptime/", extract_parent_path("data/greptime/")); - - assert_eq!("data/", extract_parent_path("data/")); - - assert_eq!("/", extract_parent_path("/")); - } } diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 497decffabfc..847a48fe04a0 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -66,23 +66,23 @@ async fn test_object_list(store: &ObjectStore) -> Result<()> { // List objects let entries = store.list("/").await?; - assert_eq!(3, entries.len()); + assert_eq!(4, entries.len()); // plus the dir itself store.delete(p1).await?; store.delete(p3).await?; // List objects again - // Only o2 is exists + // Only o2 and root exist let entries = store.list("/").await?; - assert_eq!(1, entries.len()); - assert_eq!(p2, entries.first().unwrap().path()); + assert_eq!(2, entries.len()); + assert_eq!(p2, entries[1].path()); let content = store.read(p2).await?; assert_eq!("Hello, object2!", String::from_utf8(content.to_vec())?); store.delete(p2).await?; let entries = store.list("/").await?; - assert!(entries.is_empty()); + assert_eq!(1, entries.len()); assert!(store.read(p1).await.is_err()); assert!(store.read(p2).await.is_err()); @@ -252,7 +252,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { for file_name in file_names { - assert!(cache_layer.contains_file(file_name).await); + assert!(cache_layer.contains_file(file_name).await, "{file_name}"); } } @@ -264,7 +264,9 @@ async fn assert_cache_files( let (_, mut lister) = store.list("/", OpList::default()).await?; let mut objects = vec![]; while let Some(e) = lister.next().await? { - objects.push(e); + if e.mode() == EntryMode::FILE { + objects.push(e); + } } // compare the cache file with the expected cache file; ignore orders @@ -332,9 +334,9 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_cache_files( &cache_store, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", ], &["Hello, object1!", "object2!", "Hello, object2!"], ) @@ -342,9 +344,9 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-14", - "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-14", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=7-", + "ecfe0dce85de452eb0a325158e7bfb75.cache-bytes=0-", ], ) .await; @@ -355,13 +357,13 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_eq!(cache_layer.read_cache_stat().await, (1, 15)); assert_cache_files( &cache_store, - &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], &["Hello, object1!"], ) .await?; assert_lru_cache( &cache_layer, - &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14"], + &["6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-"], ) .await; @@ -388,8 +390,8 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_cache_files( &cache_store, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["Hello, object1!", "Hello, object3!", "Hello"], @@ -398,8 +400,8 @@ async fn test_object_store_cache_policy() -> Result<()> { assert_lru_cache( &cache_layer, &[ - "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", + "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=0-", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], ) @@ -416,7 +418,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_store, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], &["ello, object1!", "Hello, object3!", "Hello"], @@ -426,7 +428,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], ) @@ -448,7 +450,7 @@ async fn test_object_store_cache_policy() -> Result<()> { &cache_layer, &[ "6d29752bdc6e4d5ba5483b96615d6c48.cache-bytes=1-14", - "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-14", + "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-", "a8b1dc21e24bb55974e3e68acc77ed52.cache-bytes=0-4", ], )