feat: collect filter metrics
evenyag committed Aug 19, 2024
1 parent 975b8c6 commit 2ea40d0
Showing 5 changed files with 49 additions and 13 deletions.
33 changes: 32 additions & 1 deletion src/mito2/src/read.rs
@@ -56,6 +56,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::sst::parquet::reader::ReaderMetrics;

/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -752,11 +753,27 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Number of row groups before filtering.
num_row_groups_before_filtering: usize,
/// Number of row groups filtered by fulltext index.
num_row_groups_fulltext_index_filtered: usize,
/// Number of row groups filtered by inverted index.
num_row_groups_inverted_index_filtered: usize,
/// Number of row groups filtered by min-max index.
num_row_groups_min_max_filtered: usize,
/// Number of rows filtered by precise filter.
num_rows_precise_filtered: usize,
/// Number of rows in row group before filtering.
num_rows_in_row_group_before_filtering: usize,
/// Number of rows in row group filtered by fulltext index.
num_rows_in_row_group_fulltext_index_filtered: usize,
/// Number of rows in row group filtered by inverted index.
num_rows_in_row_group_inverted_index_filtered: usize,
}

impl ScannerMetrics {
/// Sets and observes metrics on initializing parts.
fn observe_init_part(&mut self, build_parts_cost: Duration) {
fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) {
self.build_parts_cost = build_parts_cost;

// Observes metrics.
@@ -766,6 +783,20 @@ impl ScannerMetrics
READ_STAGE_ELAPSED
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());

self.num_row_groups_before_filtering = reader_metrics.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered =
reader_metrics.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered =
reader_metrics.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered = reader_metrics.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered = reader_metrics.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering =
reader_metrics.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered =
reader_metrics.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered =
reader_metrics.num_rows_in_row_group_inverted_index_filtered;
}

/// Observes metrics on scanner finish.
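The assignments in observe_init_part imply the shape of the reader-side counters. A minimal sketch of ReaderMetrics as suggested by this hunk: the field names mirror the reader_metrics.* reads above and Default is implied by the ReaderMetrics::default() calls later in the diff, while the visibility and other derives are assumptions; the real struct lives in src/mito2/src/sst/parquet/reader.rs and may carry additional counters (e.g. timing) not shown here.

// Sketch only: field names come from `observe_init_part` above; visibility
// and the Debug derive are assumptions, Default is used elsewhere in the diff.
#[derive(Debug, Default)]
pub(crate) struct ReaderMetrics {
    /// Number of row groups before filtering.
    pub(crate) num_row_groups_before_filtering: usize,
    /// Number of row groups filtered by fulltext index.
    pub(crate) num_row_groups_fulltext_index_filtered: usize,
    /// Number of row groups filtered by inverted index.
    pub(crate) num_row_groups_inverted_index_filtered: usize,
    /// Number of row groups filtered by min-max index.
    pub(crate) num_row_groups_min_max_filtered: usize,
    /// Number of rows filtered by precise filter.
    pub(crate) num_rows_precise_filtered: usize,
    /// Number of rows in row groups before filtering.
    pub(crate) num_rows_in_row_group_before_filtering: usize,
    /// Number of rows in row groups filtered by fulltext index.
    pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
    /// Number of rows in row groups filtered by inverted index.
    pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
}
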
8 changes: 5 additions & 3 deletions src/mito2/src/read/scan_region.rs
@@ -50,6 +50,7 @@ use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
use crate::sst::parquet::reader::ReaderMetrics;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
@@ -606,8 +607,9 @@ impl ScanInput
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
) -> Result<()> {
) -> Result<ReaderMetrics> {
let mut file_prune_cost = Duration::ZERO;
let mut reader_metrics = ReaderMetrics::default();
for file in &self.files {
let prune_start = Instant::now();
let res = self
@@ -620,7 +622,7 @@ impl ScanInput
.inverted_index_applier(self.inverted_index_applier.clone())
.fulltext_index_applier(self.fulltext_index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.build_reader_input(&mut reader_metrics)
.await;
file_prune_cost += prune_start.elapsed();
let (mut file_range_ctx, row_groups) = match res {
@@ -665,7 +667,7 @@
file_prune_cost
);

Ok(())
Ok(reader_metrics)
}

/// Scans the input source in another task and sends batches to the sender.
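Because the same ReaderMetrics is passed by mutable reference into build_reader_input for every file in the loop, the value returned by prune_file_ranges holds totals accumulated across all SSTs in the scan. As a hypothetical illustration (not part of this commit), a caller could derive an overall row-group skip ratio from the returned struct; only fields that appear in the diff are used, and the helper name is invented.

// Hypothetical helper, assuming the ReaderMetrics fields shown in read.rs.
fn row_group_skip_ratio(m: &ReaderMetrics) -> f64 {
    let skipped = m.num_row_groups_fulltext_index_filtered
        + m.num_row_groups_inverted_index_filtered
        + m.num_row_groups_min_max_filtered;
    if m.num_row_groups_before_filtering == 0 {
        0.0
    } else {
        skipped as f64 / m.num_row_groups_before_filtering as f64
    }
}

// e.g. call `row_group_skip_ratio(&reader_metrics)` after `prune_file_ranges`
// to see how much work the index filters saved for the whole scan.
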
4 changes: 2 additions & 2 deletions src/mito2/src/read/seq_scan.rs
@@ -439,7 +439,7 @@ impl SeqScan
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = SeqDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
@@ -451,7 +451,7 @@
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
4 changes: 2 additions & 2 deletions src/mito2/src/read/unordered_scan.rs
@@ -263,7 +263,7 @@ async fn maybe_init_parts(
if part_list.0.is_none() {
let now = Instant::now();
let mut distributor = UnorderedDistributor::default();
input.prune_file_ranges(&mut distributor).await?;
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
distributor.append_mem_ranges(
&input.memtables,
Some(input.mapper.column_ids()),
@@ -275,7 +275,7 @@
let build_part_cost = now.elapsed();
part_list.1 = build_part_cost;

metrics.observe_init_part(build_part_cost);
metrics.observe_init_part(build_part_cost, &reader_metrics);
} else {
// Updates the cost of building parts.
metrics.build_parts_cost = part_list.1;
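seq_scan.rs and unordered_scan.rs change in exactly the same way: the metrics returned by the prune step are handed to observe_init_part when the parts are first built. Condensed, the flow added by this commit looks roughly like this (names follow the hunks above; error handling elided).

// 1. Prune file ranges, accumulating filter counters across every SST file.
let reader_metrics = input.prune_file_ranges(&mut distributor).await?;
// 2. Copy the accumulated counters into ScannerMetrics together with the
//    build-parts cost.
metrics.observe_init_part(build_part_cost, &reader_metrics);
// 3. ScannerMetrics presumably reports these counters when the scan finishes
//    (see the "Observes metrics on scanner finish" method in read.rs, which
//    is not part of this diff).
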
13 changes: 8 additions & 5 deletions src/mito2/src/sst/parquet/reader.rs
@@ -174,14 +174,19 @@ impl ParquetReaderBuilder
///
/// This needs to perform IO operation.
pub async fn build(&self) -> Result<ParquetReader> {
let (context, row_groups) = self.build_reader_input().await?;
let mut metrics = ReaderMetrics::default();

let (context, row_groups) = self.build_reader_input(&mut metrics).await?;
ParquetReader::new(Arc::new(context), row_groups).await
}

/// Builds a [FileRangeContext] and collects row groups to read.
///
/// This needs to perform IO operation.
pub(crate) async fn build_reader_input(&self) -> Result<(FileRangeContext, RowGroupMap)> {
pub(crate) async fn build_reader_input(
&self,
metrics: &mut ReaderMetrics,
) -> Result<(FileRangeContext, RowGroupMap)> {
let start = Instant::now();

let file_path = self.file_handle.file_path(&self.file_dir);
@@ -219,10 +224,8 @@ impl ParquetReaderBuilder
parquet_to_arrow_field_levels(parquet_schema_desc, projection_mask.clone(), hint)
.context(ReadParquetSnafu { path: &file_path })?;

let mut metrics = ReaderMetrics::default();

let row_groups = self
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics)
.row_groups_to_read(&read_format, &parquet_meta, metrics)
.await;

let reader_builder = RowGroupReaderBuilder {
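The reader-side refactor keeps the public build path unchanged while moving ownership of the accumulator to the caller: build allocates a throwaway ReaderMetrics, and build_reader_input now writes into whatever metrics it is given instead of creating its own. A condensed sketch of the two call styles; the signatures are taken from the diff, but builder and per_file_builders are stand-in names for this illustration.

// Single-file path (ParquetReaderBuilder::build): a local accumulator that
// nobody outside the builder ever sees.
let mut metrics = ReaderMetrics::default();
let (context, row_groups) = builder.build_reader_input(&mut metrics).await?;

// Multi-file path (ScanInput::prune_file_ranges): one accumulator reused for
// every SST, so `row_groups_to_read` keeps updating the same counters.
let mut reader_metrics = ReaderMetrics::default();
for builder in per_file_builders {
    let (context, row_groups) = builder.build_reader_input(&mut reader_metrics).await?;
    // ... hand `context` / `row_groups` to the file-range collector ...
}
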
