From 0d8231329fdb4ad518ac6876b440f1ded0969dd4 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 3 Jan 2024 09:22:17 +0000 Subject: [PATCH] feat(inverted_index): implement apply for SstIndexApplier Signed-off-by: Zhenchi --- Cargo.lock | 2 + Cargo.toml | 1 + .../src/inverted_index/search/index_apply.rs | 9 +- .../search/index_apply/predicates_apply.rs | 17 +- src/mito2/Cargo.toml | 2 + src/mito2/src/access_layer.rs | 12 +- src/mito2/src/error.rs | 33 ++- src/mito2/src/metrics.rs | 48 ++++ src/mito2/src/sst.rs | 3 +- src/mito2/src/sst/file.rs | 9 +- src/mito2/src/sst/file_purger.rs | 10 +- src/mito2/src/sst/index.rs | 3 + src/mito2/src/sst/index/applier.rs | 168 ++++++++++- src/mito2/src/sst/index/store.rs | 261 ++++++++++++++++++ src/mito2/src/sst/location.rs | 53 ++++ 15 files changed, 598 insertions(+), 33 deletions(-) create mode 100644 src/mito2/src/sst/index/store.rs create mode 100644 src/mito2/src/sst/location.rs diff --git a/Cargo.lock b/Cargo.lock index b042227a293b..7a5dabdef624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4986,8 +4986,10 @@ dependencies = [ "object-store", "parquet", "paste", + "pin-project", "prometheus", "prost 0.12.3", + "puffin", "regex", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index a3413aa9d48d..e6ebf7c620a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,6 +180,7 @@ operator = { path = "src/operator" } partition = { path = "src/partition" } plugins = { path = "src/plugins" } promql = { path = "src/promql" } +puffin = { path = "src/puffin" } query = { path = "src/query" } script = { path = "src/script" } servers = { path = "src/servers" } diff --git a/src/index/src/inverted_index/search/index_apply.rs b/src/index/src/inverted_index/search/index_apply.rs index e87472bdeb0a..0dd2f7a5473b 100644 --- a/src/index/src/inverted_index/search/index_apply.rs +++ b/src/index/src/inverted_index/search/index_apply.rs @@ -14,6 +14,8 @@ mod predicates_apply; +use std::collections::BTreeSet; + use async_trait::async_trait; pub use predicates_apply::PredicatesIndexApplier; @@ -24,15 +26,16 @@ use crate::inverted_index::format::reader::InvertedIndexReader; /// /// Applier instances are reusable and work with various `InvertedIndexReader` instances, /// avoiding repeated compilation of fixed predicates such as regex patterns. +#[mockall::automock] #[async_trait] pub trait IndexApplier { /// Applies the predefined predicates to the data read by the given index reader, returning /// a list of relevant indices (e.g., post IDs, group IDs, row IDs). - async fn apply( + async fn apply<'a>( &self, context: SearchContext, - reader: &mut dyn InvertedIndexReader, - ) -> Result>; + reader: &mut (dyn InvertedIndexReader + 'a), + ) -> Result>; /// Returns the memory usage of the applier. fn memory_usage(&self) -> usize; diff --git a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs index addf1a07446d..aba2f8c99962 100644 --- a/src/index/src/inverted_index/search/index_apply/predicates_apply.rs +++ b/src/index/src/inverted_index/search/index_apply/predicates_apply.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeSet; use std::mem::size_of; use async_trait::async_trait; @@ -43,11 +44,11 @@ pub struct PredicatesIndexApplier { impl IndexApplier for PredicatesIndexApplier { /// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual /// bitmaps obtained for each index to result in a final set of indices. - async fn apply( + async fn apply<'a>( &self, context: SearchContext, - reader: &mut dyn InvertedIndexReader, - ) -> Result> { + reader: &mut (dyn InvertedIndexReader + 'a), + ) -> Result> { let metadata = reader.metadata().await?; let mut bitmap = Self::bitmap_full_range(&metadata); @@ -60,7 +61,7 @@ impl IndexApplier for PredicatesIndexApplier { let Some(meta) = metadata.metas.get(name) else { match context.index_not_found_strategy { IndexNotFoundStrategy::ReturnEmpty => { - return Ok(vec![]); + return Ok(BTreeSet::default()); } IndexNotFoundStrategy::Ignore => { continue; @@ -209,7 +210,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, vec![0, 2, 4, 6]); + assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6])); // An index reader with a single tag "tag-0" but without value "tag-0_value-0" let mut mock_reader = MockInvertedIndexReader::new(); @@ -263,7 +264,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, vec![0, 4, 6]); + assert_eq!(indices, BTreeSet::from_iter([0, 4, 6])); } #[tokio::test] @@ -281,7 +282,7 @@ mod tests { .apply(SearchContext::default(), &mut mock_reader) .await .unwrap(); - assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan + assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan } #[tokio::test] @@ -353,7 +354,7 @@ mod tests { ) .await .unwrap(); - assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); + assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); } #[test] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index e4868b047552..917785fcff16 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -48,8 +48,10 @@ num_cpus = "1.13" object-store.workspace = true parquet = { workspace = true, features = ["async"] } paste.workspace = true +pin-project.workspace = true prometheus.workspace = true prost.workspace = true +puffin.workspace = true regex = "1.5" serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 4c0e8bbde50b..1f4fd731efa7 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -14,13 +14,14 @@ use std::sync::Arc; -use object_store::{util, ObjectStore}; +use object_store::ObjectStore; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use crate::error::{DeleteSstSnafu, Result}; use crate::read::Source; use crate::sst::file::{FileHandle, FileId}; +use crate::sst::location; use crate::sst::parquet::reader::ParquetReaderBuilder; use crate::sst::parquet::writer::ParquetWriter; @@ -61,7 +62,7 @@ impl AccessLayer { /// Deletes a SST file with given file id. pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> { - let path = self.sst_file_path(&file_id.as_parquet()); + let path = location::sst_file_path(&self.region_dir, file_id); self.object_store .delete(&path) .await @@ -81,12 +82,7 @@ impl AccessLayer { metadata: RegionMetadataRef, source: Source, ) -> ParquetWriter { - let path = self.sst_file_path(&file_id.as_parquet()); + let path = location::sst_file_path(&self.region_dir, file_id); ParquetWriter::new(path, metadata, source, self.object_store.clone()) } - - /// Returns the `file_path` for the `file_name` in the object store. - fn sst_file_path(&self, file_name: &str) -> String { - util::join_path(&self.region_dir, file_name) - } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 68a35123ea39..044a4be5848d 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -440,6 +440,33 @@ pub enum Error { source: datatypes::error::Error, location: Location, }, + + #[snafu(display("Failed to apply index"))] + ApplyIndex { + #[snafu(source)] + source: index::inverted_index::error::Error, + location: Location, + }, + + #[snafu(display("Failed to read puffin metadata"))] + PuffinReadMetadata { + #[snafu(source)] + source: puffin::error::Error, + location: Location, + }, + + #[snafu(display("Failed to read puffin blob"))] + PuffinReadBlob { + #[snafu(source)] + source: puffin::error::Error, + location: Location, + }, + + #[snafu(display("Blob type not found, blob_type: {blob_type}"))] + PuffinBlobTypeNotFound { + blob_type: String, + location: Location, + }, } pub type Result = std::result::Result; @@ -477,6 +504,7 @@ impl ErrorExt for Error { | RegionCorrupted { .. } | CreateDefault { .. } | InvalidParquet { .. } + | PuffinBlobTypeNotFound { .. } | UnexpectedReplay { .. } => StatusCode::Unexpected, RegionNotFound { .. } => StatusCode::RegionNotFound, ObjectStoreNotFound { .. } @@ -522,8 +550,11 @@ impl ErrorExt for Error { JsonOptions { .. } => StatusCode::InvalidArguments, EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound, ArrowReader { .. } => StatusCode::StorageUnavailable, - BuildIndexApplier { source, .. } => source.status_code(), ConvertValue { source, .. } => source.status_code(), + BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(), + PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => { + source.status_code() + } } } diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index d53cbd495dd5..2dcf49f6e9fe 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage"; pub const TYPE_LABEL: &str = "type"; /// Reason to flush. pub const FLUSH_REASON: &str = "reason"; +/// File type label. +pub const FILE_TYPE_LABEL: &str = "file_type"; lazy_static! { /// Global write buffer size in bytes. @@ -143,4 +145,50 @@ lazy_static! { &[TYPE_LABEL] ) .unwrap(); + // ------- End of cache metrics. + + // Index metrics. + /// Timer of index application. + pub static ref INDEX_APPLY_COST_TIME: Histogram = register_histogram!( + "index_apply_cost_time", + "index apply cost time", + ) + .unwrap(); + /// Gauge of index apply memory usage. + pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!( + "index_apply_memory_usage", + "index apply memory usage", + ) + .unwrap(); + /// Counter of r/w bytes on index related IO operations. + pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "index_io_bytes_total", + "index io bytes total", + &[TYPE_LABEL, FILE_TYPE_LABEL] + ) + .unwrap(); + /// Counter of read bytes on puffin files. + pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL + .with_label_values(&["read", "puffin"]); + + /// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush. + pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!( + "index_io_op_total", + "index io op total", + &[TYPE_LABEL, FILE_TYPE_LABEL] + ) + .unwrap(); + /// Counter of read operations on puffin files. + pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["read", "puffin"]); + /// Counter of seek operations on puffin files. + pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["seek", "puffin"]); + /// Counter of write operations on puffin files. + pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["write", "puffin"]); + /// Counter of flush operations on puffin files. + pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL + .with_label_values(&["flush", "puffin"]); + // ------- End of index metrics. } diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index 55939c2d246a..94e0cb205bc2 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -16,6 +16,7 @@ pub mod file; pub mod file_purger; -mod index; +pub mod index; +pub mod location; pub mod parquet; pub(crate) mod version; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index a16987690d09..d32133d1bcc6 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -20,13 +20,13 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use common_time::Timestamp; -use object_store::util::join_path; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; use store_api::storage::RegionId; use uuid::Uuid; use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; +use crate::sst::location; /// Type to store SST level. pub type Level = u8; @@ -57,6 +57,11 @@ impl FileId { pub fn as_parquet(&self) -> String { format!("{}{}", self, ".parquet") } + + /// Append `.puffin` to file id to make a complete file name + pub fn as_puffin(&self) -> String { + format!("{}{}", self, ".puffin") + } } impl fmt::Display for FileId { @@ -131,7 +136,7 @@ impl FileHandle { /// Returns the complete file path of the file. pub fn file_path(&self, file_dir: &str) -> String { - join_path(file_dir, &self.file_id().as_parquet()) + location::sst_file_path(file_dir, self.file_id()) } /// Returns the time range of the file. diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs index 15c3df6cc703..059b1956d7e2 100644 --- a/src/mito2/src/sst/file_purger.rs +++ b/src/mito2/src/sst/file_purger.rs @@ -102,12 +102,13 @@ impl FilePurger for LocalFilePurger { mod tests { use common_test_util::temp_dir::create_temp_dir; use object_store::services::Fs; - use object_store::{util, ObjectStore}; + use object_store::ObjectStore; use super::*; use crate::access_layer::AccessLayer; use crate::schedule::scheduler::{LocalScheduler, Scheduler}; use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange}; + use crate::sst::location; #[tokio::test] async fn test_file_purge() { @@ -119,7 +120,7 @@ mod tests { let object_store = ObjectStore::new(builder).unwrap().finish(); let sst_file_id = FileId::random(); let sst_dir = "table1"; - let path = util::join_path(sst_dir, &sst_file_id.as_parquet()); + let path = location::sst_file_path(sst_dir, sst_file_id); object_store.write(&path, vec![0; 4096]).await.unwrap(); @@ -145,9 +146,6 @@ mod tests { scheduler.stop(true).await.unwrap(); - assert!(!object_store - .is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet())) - .await - .unwrap()); + assert!(!object_store.is_exist(&path).await.unwrap()); } } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index baffda27aa6e..1f89612deef5 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -16,3 +16,6 @@ pub mod applier; mod codec; +mod store; + +const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1"; diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index 95ca25ba003d..7d8b63c9e71f 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -14,8 +14,29 @@ pub mod builder; -use index::inverted_index::search::index_apply::IndexApplier; +use std::collections::BTreeSet; + +use futures::{AsyncRead, AsyncSeek}; +use index::inverted_index::format::reader::InvertedIndexBlobReader; +use index::inverted_index::search::index_apply::{ + IndexApplier, IndexNotFoundStrategy, SearchContext, +}; use object_store::ObjectStore; +use puffin::file_format::reader::{PuffinAsyncReader, PuffinFileReader}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{ + ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu, + Result, +}; +use crate::metrics::{ + INDEX_APPLY_COST_TIME, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL, + INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_SEEK_OP_TOTAL, +}; +use crate::sst::file::FileId; +use crate::sst::index::store::InstrumentedStore; +use crate::sst::index::INDEX_BLOB_TYPE; +use crate::sst::location; /// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files /// and returning the relevant row group ids for further scan. @@ -23,8 +44,8 @@ pub struct SstIndexApplier { /// The root directory of the region. region_dir: String, - /// Object store responsible for accessing SST files. - object_store: ObjectStore, + /// Store responsible for accessing SST files. + store: InstrumentedStore, /// Predefined index applier used to apply predicates to index files /// and return the relevant row group ids for further scan. @@ -38,10 +59,149 @@ impl SstIndexApplier { object_store: ObjectStore, index_applier: Box, ) -> Self { + INDEX_APPLY_MEMORY_USAGE.add(index_applier.memory_usage() as i64); + Self { region_dir, - object_store, + store: InstrumentedStore::new(object_store), index_applier, } } + + /// Applies predicates to the provided SST file id and returns the relevant row group ids + pub async fn apply(&self, file_id: FileId) -> Result> { + let _timer = INDEX_APPLY_COST_TIME.start_timer(); + + let mut puffin_reader = self.puffin_reader(file_id).await?; + let blob_reader = Self::index_blob_reader(&mut puffin_reader).await?; + let mut index_reader = InvertedIndexBlobReader::new(blob_reader); + + let context = SearchContext { + // Encountering a non-existing column indicates that it doesn't match predicates. + index_not_found_strategy: IndexNotFoundStrategy::ReturnEmpty, + }; + self.index_applier + .apply(context, &mut index_reader) + .await + .context(ApplyIndexSnafu) + } + + /// Helper function to create a [`PuffinFileReader`] for the provided SST file id. + async fn puffin_reader( + &self, + file_id: FileId, + ) -> Result> { + let file_path = location::index_file_path(&self.region_dir, file_id); + let file_reader = self + .store + .reader( + &file_path, + &INDEX_PUFFIN_READ_BYTES_TOTAL, + &INDEX_PUFFIN_READ_OP_TOTAL, + &INDEX_PUFFIN_SEEK_OP_TOTAL, + ) + .await?; + Ok(PuffinFileReader::new(file_reader)) + } + + /// Helper function to create a [`PuffinBlobReader`] for the index blob of the provided index file reader. + async fn index_blob_reader( + puffin_reader: &mut PuffinFileReader, + ) -> Result { + let file_meta = puffin_reader + .metadata() + .await + .context(PuffinReadMetadataSnafu)?; + let blob_meta = file_meta + .blobs + .iter() + .find(|blob| blob.blob_type == INDEX_BLOB_TYPE) + .context(PuffinBlobTypeNotFoundSnafu { + blob_type: INDEX_BLOB_TYPE, + })?; + puffin_reader + .blob_reader(blob_meta) + .context(PuffinReadBlobSnafu) + } +} + +impl Drop for SstIndexApplier { + fn drop(&mut self) { + INDEX_APPLY_MEMORY_USAGE.sub(self.index_applier.memory_usage() as i64); + } +} + +#[cfg(test)] +mod tests { + use futures::io::Cursor; + use index::inverted_index::search::index_apply::MockIndexApplier; + use object_store::services::Memory; + use puffin::file_format::writer::{Blob, PuffinAsyncWriter, PuffinFileWriter}; + + use super::*; + use crate::error::Error; + + #[tokio::test] + async fn test_index_applier_apply_basic() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_id = FileId::random(); + let region_dir = "region_dir".to_string(); + let path = location::index_file_path(®ion_dir, file_id); + + let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + puffin_writer + .add_blob(Blob { + blob_type: INDEX_BLOB_TYPE.to_string(), + data: Cursor::new(vec![]), + properties: Default::default(), + }) + .await + .unwrap(); + puffin_writer.finish().await.unwrap(); + + let mut mock_index_applier = MockIndexApplier::new(); + mock_index_applier.expect_memory_usage().returning(|| 100); + mock_index_applier + .expect_apply() + .returning(|_, _| Ok(BTreeSet::from_iter([1, 2, 3]))); + + let sst_index_applier = SstIndexApplier::new( + region_dir.clone(), + object_store, + Box::new(mock_index_applier), + ); + let ids = sst_index_applier.apply(file_id).await.unwrap(); + assert_eq!(ids, BTreeSet::from_iter([1, 2, 3])); + } + + #[tokio::test] + async fn test_index_applier_apply_invalid_blob_type() { + let object_store = ObjectStore::new(Memory::default()).unwrap().finish(); + let file_id = FileId::random(); + let region_dir = "region_dir".to_string(); + let path = location::index_file_path(®ion_dir, file_id); + + let mut puffin_writer = PuffinFileWriter::new(object_store.writer(&path).await.unwrap()); + puffin_writer + .add_blob(Blob { + blob_type: "invalid_blob_type".to_string(), + data: Cursor::new(vec![]), + properties: Default::default(), + }) + .await + .unwrap(); + puffin_writer.finish().await.unwrap(); + + let mut mock_index_applier = MockIndexApplier::new(); + mock_index_applier.expect_memory_usage().returning(|| 100); + mock_index_applier.expect_apply().never(); + + let sst_index_applier = SstIndexApplier::new( + region_dir.clone(), + object_store, + Box::new(mock_index_applier), + ); + let res = sst_index_applier.apply(file_id).await; + assert!(matches!(res, Err(Error::PuffinBlobTypeNotFound { .. }))); + } } diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs new file mode 100644 index 000000000000..dbdbcbf78033 --- /dev/null +++ b/src/mito2/src/sst/index/store.rs @@ -0,0 +1,261 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::{AsyncRead, AsyncSeek, AsyncWrite}; +use object_store::ObjectStore; +use pin_project::pin_project; +use prometheus::IntCounter; +use snafu::ResultExt; + +use crate::error::{OpenDalSnafu, Result}; + +/// A wrapper around [`ObjectStore`] that adds instrumentation for monitoring +/// metrics such as bytes read, bytes written, and the number of seek operations. +#[derive(Clone)] +pub(crate) struct InstrumentedStore { + /// The underlying object store. + object_store: ObjectStore, +} + +impl InstrumentedStore { + /// Create a new `InstrumentedStore`. + pub fn new(object_store: ObjectStore) -> Self { + Self { object_store } + } + + /// Returns an [`InstrumentedAsyncRead`] for the given path. + /// Metrics like the number of bytes read, read and seek operations + /// are recorded using the provided `IntCounter`s. + pub async fn reader<'a>( + &self, + path: &str, + read_byte_count: &'a IntCounter, + read_count: &'a IntCounter, + seek_count: &'a IntCounter, + ) -> Result> { + let reader = self.object_store.reader(path).await.context(OpenDalSnafu)?; + Ok(InstrumentedAsyncRead::new( + reader, + read_byte_count, + read_count, + seek_count, + )) + } + + /// Returns an [`InstrumentedAsyncWrite`] for the given path. + /// Metrics like the number of bytes written, write and flush operations + /// are recorded using the provided `IntCounter`s. + pub async fn writer<'a>( + &self, + path: &str, + write_byte_count: &'a IntCounter, + write_count: &'a IntCounter, + flush_count: &'a IntCounter, + ) -> Result> { + let writer = self.object_store.writer(path).await.context(OpenDalSnafu)?; + Ok(InstrumentedAsyncWrite::new( + writer, + write_byte_count, + write_count, + flush_count, + )) + } +} + +/// A wrapper around [`AsyncRead`] that adds instrumentation for monitoring +#[pin_project] +pub(crate) struct InstrumentedAsyncRead<'a, R> { + #[pin] + inner: R, + read_byte_count: CounterGuard<'a>, + read_count: CounterGuard<'a>, + seek_count: CounterGuard<'a>, +} + +impl<'a, R> InstrumentedAsyncRead<'a, R> { + /// Create a new `InstrumentedAsyncRead`. + fn new( + inner: R, + read_byte_count: &'a IntCounter, + read_count: &'a IntCounter, + seek_count: &'a IntCounter, + ) -> Self { + Self { + inner, + read_byte_count: CounterGuard::new(read_byte_count), + read_count: CounterGuard::new(read_count), + seek_count: CounterGuard::new(seek_count), + } + } +} + +impl<'a, R: AsyncRead + Unpin + Send> AsyncRead for InstrumentedAsyncRead<'a, R> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_read(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.read_count.inc_by(1); + self.read_byte_count.inc_by(*n); + } + poll + } +} + +impl<'a, R: AsyncSeek + Unpin + Send> AsyncSeek for InstrumentedAsyncRead<'a, R> { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: io::SeekFrom, + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_seek(cx, pos); + if let Poll::Ready(Ok(_)) = &poll { + self.seek_count.inc_by(1); + } + poll + } +} + +/// A wrapper around [`AsyncWrite`] that adds instrumentation for monitoring +#[pin_project] +pub(crate) struct InstrumentedAsyncWrite<'a, W> { + #[pin] + inner: W, + write_byte_count: CounterGuard<'a>, + write_count: CounterGuard<'a>, + flush_count: CounterGuard<'a>, +} + +impl<'a, W> InstrumentedAsyncWrite<'a, W> { + /// Create a new `InstrumentedAsyncWrite`. + fn new( + inner: W, + write_byte_count: &'a IntCounter, + write_count: &'a IntCounter, + flush_count: &'a IntCounter, + ) -> Self { + Self { + inner, + write_byte_count: CounterGuard::new(write_byte_count), + write_count: CounterGuard::new(write_count), + flush_count: CounterGuard::new(flush_count), + } + } +} + +impl<'a, W: AsyncWrite + Unpin + Send> AsyncWrite for InstrumentedAsyncWrite<'a, W> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let poll = self.as_mut().project().inner.poll_write(cx, buf); + if let Poll::Ready(Ok(n)) = &poll { + self.write_count.inc_by(1); + self.write_byte_count.inc_by(*n); + } + poll + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let poll = self.as_mut().project().inner.poll_flush(cx); + if let Poll::Ready(Ok(())) = &poll { + self.flush_count.inc_by(1); + } + poll + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx) + } +} + +/// A guard that increments a counter when dropped. +struct CounterGuard<'a> { + count: usize, + counter: &'a IntCounter, +} + +impl<'a> CounterGuard<'a> { + /// Create a new `CounterGuard`. + fn new(counter: &'a IntCounter) -> Self { + Self { count: 0, counter } + } + + /// Increment the counter by `n`. + fn inc_by(&mut self, n: usize) { + self.count += n; + } +} + +impl<'a> Drop for CounterGuard<'a> { + fn drop(&mut self) { + if self.count > 0 { + self.counter.inc_by(self.count as _); + } + } +} + +#[cfg(test)] +mod tests { + use futures::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; + use object_store::services::Memory; + + use super::*; + + #[tokio::test] + async fn test_instrumented_store_read_write() { + let instrumented_store = + InstrumentedStore::new(ObjectStore::new(Memory::default()).unwrap().finish()); + + let read_byte_count = IntCounter::new("read_byte_count", "read_byte_count").unwrap(); + let read_count = IntCounter::new("read_count", "read_count").unwrap(); + let seek_count = IntCounter::new("seek_count", "seek_count").unwrap(); + let write_byte_count = IntCounter::new("write_byte_count", "write_byte_count").unwrap(); + let write_count = IntCounter::new("write_count", "write_count").unwrap(); + let flush_count = IntCounter::new("flush_count", "flush_count").unwrap(); + + let mut writer = instrumented_store + .writer("my_file", &write_byte_count, &write_count, &flush_count) + .await + .unwrap(); + writer.write_all(b"hello").await.unwrap(); + writer.flush().await.unwrap(); + writer.close().await.unwrap(); + drop(writer); + + let mut reader = instrumented_store + .reader("my_file", &read_byte_count, &read_count, &seek_count) + .await + .unwrap(); + let mut buf = vec![0; 5]; + reader.read_exact(&mut buf).await.unwrap(); + reader.seek(io::SeekFrom::Start(0)).await.unwrap(); + reader.read_exact(&mut buf).await.unwrap(); + drop(reader); + + assert_eq!(read_byte_count.get(), 10); + assert_eq!(read_count.get(), 2); + assert_eq!(seek_count.get(), 1); + assert_eq!(write_byte_count.get(), 5); + assert_eq!(write_count.get(), 1); + assert_eq!(flush_count.get(), 1); + } +} diff --git a/src/mito2/src/sst/location.rs b/src/mito2/src/sst/location.rs new file mode 100644 index 000000000000..179e9159c94c --- /dev/null +++ b/src/mito2/src/sst/location.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use object_store::util; + +use crate::sst::file::FileId; + +/// Returns the path of the SST file in the object store: +/// `{region_dir}/{sst_file_id}.parquet` +pub fn sst_file_path(region_dir: &str, sst_file_id: FileId) -> String { + util::join_path(region_dir, &sst_file_id.as_parquet()) +} + +/// Returns the path of the index file in the object store: +/// `{region_dir}/index/{sst_file_id}.puffin` +pub fn index_file_path(region_dir: &str, sst_file_id: FileId) -> String { + let dir = util::join_dir(region_dir, "index"); + util::join_path(&dir, &sst_file_id.as_puffin()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_sst_file_path() { + let file_id = FileId::random(); + assert_eq!( + sst_file_path("region_dir", file_id), + format!("region_dir/{file_id}.parquet") + ); + } + + #[test] + fn test_index_file_path() { + let file_id = FileId::random(); + assert_eq!( + index_file_path("region_dir", file_id), + format!("region_dir/index/{file_id}.puffin") + ); + } +}