Skip to content

Commit

Permalink
feat: bump opendal and switch prometheus layer to the upstream impl (#…
Browse files Browse the repository at this point in the history
…5179)

* feat: bump opendal and switch prometheus layer to the upstream impl

Signed-off-by: Ruihang Xia <[email protected]>

* remove unused files

Signed-off-by: Ruihang Xia <[email protected]>

* fix tests

Signed-off-by: Ruihang Xia <[email protected]>

* remove unused things

Signed-off-by: Ruihang Xia <[email protected]>

* remove root dir on recovering cache

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 18, 2024
1 parent 218236c commit 8305480
Show file tree
Hide file tree
Showing 22 changed files with 119 additions and 708 deletions.
25 changes: 7 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn build_fs_backend(root: &str) -> Result<ObjectStore> {
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.finish();
Ok(object_store)
}
2 changes: 1 addition & 1 deletion src/common/datasource/src/object_store/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub fn build_s3_backend(
DefaultLoggingInterceptor,
))
.layer(object_store::layers::TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer::new(true))
.layer(object_store::layers::build_prometheus_metrics_layer(true))
.finish())
}

Expand Down
8 changes: 6 additions & 2 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::ObjectStore;
use object_store::{EntryMode, ObjectStore};
use tokio::sync::mpsc;

use super::*;
Expand Down Expand Up @@ -578,7 +578,11 @@ mod tests {
) {
let dir = proc_path!(procedure_store, "{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister.into_iter().map(|de| de.name().to_string()).collect();
let mut files_in_dir: Vec<_> = lister
.into_iter()
.filter(|x| x.metadata().mode() == EntryMode::FILE)
.map(|de| de.name().to_string())
.collect();
files_in_dir.sort_unstable();
assert_eq!(files, files_in_dir);
}
Expand Down
15 changes: 12 additions & 3 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to build http client"))]
BuildHttpClient {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: reqwest::Error,
},

#[snafu(display("Missing required field: {}", name))]
MissingRequiredField {
name: String,
Expand Down Expand Up @@ -406,9 +414,10 @@ impl ErrorExt for Error {
| MissingKvBackend { .. }
| TomlFormat { .. } => StatusCode::InvalidArguments,

PayloadNotExist { .. } | Unexpected { .. } | WatchAsyncTaskChange { .. } => {
StatusCode::Unexpected
}
PayloadNotExist { .. }
| Unexpected { .. }
| WatchAsyncTaskChange { .. }
| BuildHttpClient { .. } => StatusCode::Unexpected,

AsyncTaskExecute { source, .. } => source.status_code(),

Expand Down
5 changes: 3 additions & 2 deletions src/datanode/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, O
use snafu::prelude::*;

use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
use crate::error::{self, CreateDirSnafu, Result};
use crate::error::{self, BuildHttpClientSnafu, CreateDirSnafu, Result};

pub(crate) async fn new_raw_object_store(
store: &ObjectStoreConfig,
Expand Down Expand Up @@ -236,7 +236,8 @@ pub(crate) fn build_http_client(config: &HttpClientConfig) -> Result<HttpClient>
builder.timeout(config.timeout)
};

HttpClient::build(http_builder).context(error::InitBackendSnafu)
let client = http_builder.build().context(BuildHttpClientSnafu)?;
Ok(HttpClient::with(client))
}
struct PrintDetailedError;

Expand Down
2 changes: 1 addition & 1 deletion src/file-engine/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl FileRegionManifest {
pub async fn store(&self, region_dir: &str, object_store: &ObjectStore) -> Result<()> {
let path = &region_manifest_path(region_dir);
let exist = object_store
.is_exist(path)
.exists(path)
.await
.context(CheckObjectSnafu { path })?;
ensure!(!exist, ManifestExistsSnafu { path });
Expand Down
6 changes: 3 additions & 3 deletions src/file-engine/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ mod tests {
assert_eq!(region.metadata.primary_key, vec![1]);

assert!(object_store
.is_exist("create_region_dir/manifest/_file_manifest")
.exists("create_region_dir/manifest/_file_manifest")
.await
.unwrap());

Expand Down Expand Up @@ -198,13 +198,13 @@ mod tests {
.unwrap();

assert!(object_store
.is_exist("drop_region_dir/manifest/_file_manifest")
.exists("drop_region_dir/manifest/_file_manifest")
.await
.unwrap());

FileRegion::drop(&region, &object_store).await.unwrap();
assert!(!object_store
.is_exist("drop_region_dir/manifest/_file_manifest")
.exists("drop_region_dir/manifest/_file_manifest")
.await
.unwrap());

Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ mod test {
let region_dir = "test_metric_region";
// assert metadata region's dir
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
let exist = object_store.is_exist(&metadata_region_dir).await.unwrap();
let exist = object_store.exists(&metadata_region_dir).await.unwrap();
assert!(exist);

// assert data region's dir
let data_region_dir = join_dir(region_dir, DATA_REGION_SUBDIR);
let exist = object_store.is_exist(&data_region_dir).await.unwrap();
let exist = object_store.exists(&data_region_dir).await.unwrap();
assert!(exist);

// check mito engine
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/cache/file_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl FileCache {
}

async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.is_exist(file_path).await? {
if self.local_store.exists(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
} else {
Ok(None)
Expand Down Expand Up @@ -480,7 +480,7 @@ mod tests {
cache.memory_index.run_pending_tasks().await;

// The file also not exists.
assert!(!local_store.is_exist(&file_path).await.unwrap());
assert!(!local_store.exists(&file_path).await.unwrap());
assert_eq!(0, cache.memory_index.weighted_size());
}

Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/create_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ async fn test_engine_create_with_custom_store() {
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(region_dir)
.exists(region_dir)
.await
.unwrap());
assert!(!object_store_manager
.default_object_store()
.is_exist(region_dir)
.exists(region_dir)
.await
.unwrap());
}
Expand Down
12 changes: 6 additions & 6 deletions src/mito2/src/engine/drop_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn test_engine_drop_region() {
assert!(!env
.get_object_store()
.unwrap()
.is_exist(&join_path(&region_dir, DROPPING_MARKER_FILE))
.exists(&join_path(&region_dir, DROPPING_MARKER_FILE))
.await
.unwrap());

Expand All @@ -93,7 +93,7 @@ async fn test_engine_drop_region() {
listener.wait().await;

let object_store = env.get_object_store().unwrap();
assert!(!object_store.is_exist(&region_dir).await.unwrap());
assert!(!object_store.exists(&region_dir).await.unwrap());
}

#[tokio::test]
Expand Down Expand Up @@ -167,13 +167,13 @@ async fn test_engine_drop_region_for_custom_store() {
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(&custom_region_dir)
.exists(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.is_exist(&global_region_dir)
.exists(&global_region_dir)
.await
.unwrap());

Expand All @@ -190,13 +190,13 @@ async fn test_engine_drop_region_for_custom_store() {
assert!(!object_store_manager
.find("Gcs")
.unwrap()
.is_exist(&custom_region_dir)
.exists(&custom_region_dir)
.await
.unwrap());
assert!(object_store_manager
.find("default")
.unwrap()
.is_exist(&global_region_dir)
.exists(&global_region_dir)
.await
.unwrap());
}
4 changes: 2 additions & 2 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,13 @@ async fn test_engine_region_open_with_custom_store() {
let object_store_manager = env.get_object_store_manager().unwrap();
assert!(!object_store_manager
.default_object_store()
.is_exist(region.access_layer.region_dir())
.exists(region.access_layer.region_dir())
.await
.unwrap());
assert!(object_store_manager
.find("Gcs")
.unwrap()
.is_exist(region.access_layer.region_dir())
.exists(region.access_layer.region_dir())
.await
.unwrap());
}
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ async fn manager_without_checkpoint() {

// check files
let mut expected = vec![
"/",
"00000000000000000010.json",
"00000000000000000009.json",
"00000000000000000008.json",
Expand Down Expand Up @@ -130,6 +131,7 @@ async fn manager_with_checkpoint_distance_1() {

// check files
let mut expected = vec![
"/",
"00000000000000000009.checkpoint",
"00000000000000000010.checkpoint",
"00000000000000000010.json",
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {

scheduler.stop(true).await.unwrap();

assert!(!object_store.is_exist(&path).await.unwrap());
assert!(!object_store.exists(&path).await.unwrap());
}

#[tokio::test]
Expand Down Expand Up @@ -247,7 +247,7 @@ mod tests {

scheduler.stop(true).await.unwrap();

assert!(!object_store.is_exist(&path).await.unwrap());
assert!(!object_store.is_exist(&index_path).await.unwrap());
assert!(!object_store.exists(&path).await.unwrap());
assert!(!object_store.exists(&index_path).await.unwrap());
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Check if this region is pending drop. And clean the entire dir if so.
if !self.dropping_regions.is_region_exists(region_id)
&& object_store
.is_exist(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
.await
.context(OpenDalSnafu)?
{
Expand Down
3 changes: 2 additions & 1 deletion src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.49", features = [
opendal = { version = "0.50", features = [
"layers-tracing",
"layers-prometheus",
"services-azblob",
"services-fs",
"services-gcs",
Expand Down
33 changes: 31 additions & 2 deletions src/object-store/src/layers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,37 @@
// limitations under the License.

mod lru_cache;
mod prometheus;

pub use lru_cache::*;
pub use opendal::layers::*;
pub use prometheus::PrometheusMetricsLayer;
pub use prometheus::build_prometheus_metrics_layer;

mod prometheus {
use std::sync::{Mutex, OnceLock};

use opendal::layers::PrometheusLayer;

static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();

pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
PROMETHEUS_LAYER
.get_or_init(|| {
// This logical tries to extract parent path from the object storage operation
// the function also relies on assumption that the region path is built from
// pattern `<data|index>/catalog/schema/table_id/....`
//
// We'll get the data/catalog/schema from path.
let path_level = if with_path_label { 3 } else { 0 };

let layer = PrometheusLayer::builder()
.path_label(path_level)
.register_default()
.unwrap();

Mutex::new(layer)
})
.lock()
.unwrap()
.clone()
}
}
Loading

0 comments on commit 8305480

Please sign in to comment.