Skip to content

Commit

Permalink
feat(inverted_index): implement apply for SstIndexApplier
Browse files Browse the repository at this point in the history
Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc committed Jan 3, 2024
1 parent 611a8aa commit 0d82313
Show file tree
Hide file tree
Showing 15 changed files with 598 additions and 33 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ operator = { path = "src/operator" }
partition = { path = "src/partition" }
plugins = { path = "src/plugins" }
promql = { path = "src/promql" }
puffin = { path = "src/puffin" }
query = { path = "src/query" }
script = { path = "src/script" }
servers = { path = "src/servers" }
Expand Down
9 changes: 6 additions & 3 deletions src/index/src/inverted_index/search/index_apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

mod predicates_apply;

use std::collections::BTreeSet;

use async_trait::async_trait;
pub use predicates_apply::PredicatesIndexApplier;

Expand All @@ -24,15 +26,16 @@ use crate::inverted_index::format::reader::InvertedIndexReader;
///
/// Applier instances are reusable and work with various `InvertedIndexReader` instances,
/// avoiding repeated compilation of fixed predicates such as regex patterns.
#[mockall::automock]
#[async_trait]
pub trait IndexApplier {
/// Applies the predefined predicates to the data read by the given index reader, returning
/// a list of relevant indices (e.g., post IDs, group IDs, row IDs).
async fn apply(
async fn apply<'a>(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>>;
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>>;

/// Returns the memory usage of the applier.
fn memory_usage(&self) -> usize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeSet;
use std::mem::size_of;

use async_trait::async_trait;
Expand Down Expand Up @@ -43,11 +44,11 @@ pub struct PredicatesIndexApplier {
impl IndexApplier for PredicatesIndexApplier {
/// Applies all `FstApplier`s to the data in the inverted index reader, intersecting the individual
/// bitmaps obtained for each index to result in a final set of indices.
async fn apply(
async fn apply<'a>(
&self,
context: SearchContext,
reader: &mut dyn InvertedIndexReader,
) -> Result<Vec<usize>> {
reader: &mut (dyn InvertedIndexReader + 'a),
) -> Result<BTreeSet<usize>> {
let metadata = reader.metadata().await?;

let mut bitmap = Self::bitmap_full_range(&metadata);
Expand All @@ -60,7 +61,7 @@ impl IndexApplier for PredicatesIndexApplier {
let Some(meta) = metadata.metas.get(name) else {
match context.index_not_found_strategy {
IndexNotFoundStrategy::ReturnEmpty => {
return Ok(vec![]);
return Ok(BTreeSet::default());
}
IndexNotFoundStrategy::Ignore => {
continue;
Expand Down Expand Up @@ -209,7 +210,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 2, 4, 6]);
assert_eq!(indices, BTreeSet::from_iter([0, 2, 4, 6]));

// An index reader with a single tag "tag-0" but without value "tag-0_value-0"
let mut mock_reader = MockInvertedIndexReader::new();
Expand Down Expand Up @@ -263,7 +264,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 4, 6]);
assert_eq!(indices, BTreeSet::from_iter([0, 4, 6]));
}

#[tokio::test]
Expand All @@ -281,7 +282,7 @@ mod tests {
.apply(SearchContext::default(), &mut mock_reader)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]); // full range to scan
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7])); // full range to scan
}

#[tokio::test]
Expand Down Expand Up @@ -353,7 +354,7 @@ mod tests {
)
.await
.unwrap();
assert_eq!(indices, vec![0, 1, 2, 3, 4, 5, 6, 7]);
assert_eq!(indices, BTreeSet::from_iter([0, 1, 2, 3, 4, 5, 6, 7]));
}

#[test]
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ num_cpus = "1.13"
object-store.workspace = true
parquet = { workspace = true, features = ["async"] }
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
prost.workspace = true
puffin.workspace = true
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json.workspace = true
Expand Down
12 changes: 4 additions & 8 deletions src/mito2/src/access_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@

use std::sync::Arc;

use object_store::{util, ObjectStore};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use crate::error::{DeleteSstSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::location;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;

Expand Down Expand Up @@ -61,7 +62,7 @@ impl AccessLayer {

/// Deletes a SST file with given file id.
pub(crate) async fn delete_sst(&self, file_id: FileId) -> Result<()> {
let path = self.sst_file_path(&file_id.as_parquet());
let path = location::sst_file_path(&self.region_dir, file_id);
self.object_store
.delete(&path)
.await
Expand All @@ -81,12 +82,7 @@ impl AccessLayer {
metadata: RegionMetadataRef,
source: Source,
) -> ParquetWriter {
let path = self.sst_file_path(&file_id.as_parquet());
let path = location::sst_file_path(&self.region_dir, file_id);
ParquetWriter::new(path, metadata, source, self.object_store.clone())
}

/// Returns the `file_path` for the `file_name` in the object store.
fn sst_file_path(&self, file_name: &str) -> String {
util::join_path(&self.region_dir, file_name)
}
}
33 changes: 32 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,33 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},

#[snafu(display("Failed to apply index"))]
ApplyIndex {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},

#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Failed to read puffin blob"))]
PuffinReadBlob {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},

#[snafu(display("Blob type not found, blob_type: {blob_type}"))]
PuffinBlobTypeNotFound {
blob_type: String,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -477,6 +504,7 @@ impl ErrorExt for Error {
| RegionCorrupted { .. }
| CreateDefault { .. }
| InvalidParquet { .. }
| PuffinBlobTypeNotFound { .. }
| UnexpectedReplay { .. } => StatusCode::Unexpected,
RegionNotFound { .. } => StatusCode::RegionNotFound,
ObjectStoreNotFound { .. }
Expand Down Expand Up @@ -522,8 +550,11 @@ impl ErrorExt for Error {
JsonOptions { .. } => StatusCode::InvalidArguments,
EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
ArrowReader { .. } => StatusCode::StorageUnavailable,
BuildIndexApplier { source, .. } => source.status_code(),
ConvertValue { source, .. } => source.status_code(),
BuildIndexApplier { source, .. } | ApplyIndex { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions src/mito2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub const STAGE_LABEL: &str = "stage";
pub const TYPE_LABEL: &str = "type";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// File type label.
pub const FILE_TYPE_LABEL: &str = "file_type";

lazy_static! {
/// Global write buffer size in bytes.
Expand Down Expand Up @@ -143,4 +145,50 @@ lazy_static! {
&[TYPE_LABEL]
)
.unwrap();
// ------- End of cache metrics.

// Index metrics.
/// Timer of index application.
pub static ref INDEX_APPLY_COST_TIME: Histogram = register_histogram!(
"index_apply_cost_time",
"index apply cost time",
)
.unwrap();
/// Gauge of index apply memory usage.
pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!(
"index_apply_memory_usage",
"index apply memory usage",
)
.unwrap();
/// Counter of r/w bytes on index related IO operations.
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_bytes_total",
"index io bytes total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
/// Counter of read bytes on puffin files.
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);

/// Counter of r/w operations on index related IO operations, e.g. read, write, seek and flush.
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
"index_io_op_total",
"index io op total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
/// Counter of read operations on puffin files.
pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "puffin"]);
/// Counter of seek operations on puffin files.
pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "puffin"]);
/// Counter of write operations on puffin files.
pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "puffin"]);
/// Counter of flush operations on puffin files.
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
// ------- End of index metrics.
}
3 changes: 2 additions & 1 deletion src/mito2/src/sst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
pub mod file;
pub mod file_purger;
mod index;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;
9 changes: 7 additions & 2 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use common_time::Timestamp;
use object_store::util::join_path;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;

use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::sst::location;

/// Type to store SST level.
pub type Level = u8;
Expand Down Expand Up @@ -57,6 +57,11 @@ impl FileId {
pub fn as_parquet(&self) -> String {
format!("{}{}", self, ".parquet")
}

/// Append `.puffin` to file id to make a complete file name
pub fn as_puffin(&self) -> String {
format!("{}{}", self, ".puffin")
}
}

impl fmt::Display for FileId {
Expand Down Expand Up @@ -131,7 +136,7 @@ impl FileHandle {

/// Returns the complete file path of the file.
pub fn file_path(&self, file_dir: &str) -> String {
join_path(file_dir, &self.file_id().as_parquet())
location::sst_file_path(file_dir, self.file_id())
}

/// Returns the time range of the file.
Expand Down
10 changes: 4 additions & 6 deletions src/mito2/src/sst/file_purger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ impl FilePurger for LocalFilePurger {
mod tests {
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::{util, ObjectStore};
use object_store::ObjectStore;

use super::*;
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange};
use crate::sst::location;

#[tokio::test]
async fn test_file_purge() {
Expand All @@ -119,7 +120,7 @@ mod tests {
let object_store = ObjectStore::new(builder).unwrap().finish();
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = util::join_path(sst_dir, &sst_file_id.as_parquet());
let path = location::sst_file_path(sst_dir, sst_file_id);

object_store.write(&path, vec![0; 4096]).await.unwrap();

Expand All @@ -145,9 +146,6 @@ mod tests {

scheduler.stop(true).await.unwrap();

assert!(!object_store
.is_exist(&format!("{}/{}", sst_dir, sst_file_id.as_parquet()))
.await
.unwrap());
assert!(!object_store.is_exist(&path).await.unwrap());
}
}
3 changes: 3 additions & 0 deletions src/mito2/src/sst/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@

pub mod applier;
mod codec;
mod store;

const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
Loading

0 comments on commit 0d82313

Please sign in to comment.