
Commit

refactor: reuse ReaderFilterMetrics
evenyag committed Aug 19, 2024
1 parent 2ea40d0 commit d0e4b97
Showing 3 changed files with 76 additions and 78 deletions.
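
For orientation before the diff: the commit pulls the eight per-filter counters that were previously duplicated in ScannerMetrics and ReaderMetrics into a single ReaderFilterMetrics struct that owns both accumulation (merge_from) and reporting (observe), and both owners now embed it as a filter_metrics field. The sketch below is a minimal, self-contained illustration of that composition, not the crate code itself: it keeps only two representative counters, prints instead of incrementing the Prometheus counters used in mito2, and the scanner method name on_scan_done is illustrative.

// Simplified stand-in for the extracted struct; the real one has eight
// counters and reports them to Prometheus (READ_ROW_GROUPS_TOTAL, etc.).
#[derive(Debug, Default, Clone, Copy)]
struct ReaderFilterMetrics {
    num_row_groups_before_filtering: usize,
    num_rows_precise_filtered: usize,
}

impl ReaderFilterMetrics {
    // Accumulates `other` into `self`.
    fn merge_from(&mut self, other: &ReaderFilterMetrics) {
        self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
        self.num_rows_precise_filtered += other.num_rows_precise_filtered;
    }

    // Reports the counters; the real code increments Prometheus metrics here.
    fn observe(&self) {
        println!(
            "row_groups_before_filtering={} rows_precise_filtered={}",
            self.num_row_groups_before_filtering, self.num_rows_precise_filtered
        );
    }
}

// Per-file reader metrics embed the filter metrics instead of repeating
// every counter field.
#[derive(Debug, Default, Clone)]
struct ReaderMetrics {
    filter_metrics: ReaderFilterMetrics,
    num_record_batches: usize,
}

impl ReaderMetrics {
    fn merge_from(&mut self, other: &ReaderMetrics) {
        // Counter accumulation is delegated to ReaderFilterMetrics.
        self.filter_metrics.merge_from(&other.filter_metrics);
        self.num_record_batches += other.num_record_batches;
    }
}

// Scanner-side metrics reuse the same struct by value (it is Copy):
// overwrite the embedded copy with the reader's totals, then report once.
#[derive(Debug, Default)]
struct ScannerMetrics {
    filter_metrics: ReaderFilterMetrics,
}

impl ScannerMetrics {
    fn on_scan_done(&mut self, reader_metrics: &ReaderMetrics) {
        self.filter_metrics = reader_metrics.filter_metrics;
        self.filter_metrics.observe();
    }
}

fn main() {
    // Merge metrics collected from one file into the running total ...
    let mut total = ReaderMetrics::default();
    total.merge_from(&ReaderMetrics {
        filter_metrics: ReaderFilterMetrics {
            num_row_groups_before_filtering: 4,
            num_rows_precise_filtered: 128,
        },
        num_record_batches: 10,
    });

    // ... and report everything the filters did with a single call.
    let mut scanner = ScannerMetrics::default();
    scanner.on_scan_done(&total);
}
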
37 changes: 7 additions & 30 deletions src/mito2/src/read.rs
@@ -56,7 +56,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::sst::parquet::reader::ReaderMetrics;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -753,22 +753,8 @@ pub(crate) struct ScannerMetrics {
num_batches: usize,
/// Number of rows returned.
num_rows: usize,
/// Number of row groups before filtering.
num_row_groups_before_filtering: usize,
/// Number of row groups filtered by fulltext index.
num_row_groups_fulltext_index_filtered: usize,
/// Number of row groups filtered by inverted index.
num_row_groups_inverted_index_filtered: usize,
/// Number of row groups filtered by min-max index.
num_row_groups_min_max_filtered: usize,
/// Number of rows filtered by precise filter.
num_rows_precise_filtered: usize,
/// Number of rows in row group before filtering.
num_rows_in_row_group_before_filtering: usize,
/// Number of rows in row group filtered by fulltext index.
num_rows_in_row_group_fulltext_index_filtered: usize,
/// Number of rows in row group filtered by inverted index.
num_rows_in_row_group_inverted_index_filtered: usize,
/// Filter related metrics for readers.
filter_metrics: ReaderFilterMetrics,
}

impl ScannerMetrics {
Expand All @@ -784,19 +770,10 @@ impl ScannerMetrics {
.with_label_values(&["build_parts"])
.observe(self.build_parts_cost.as_secs_f64());

self.num_row_groups_before_filtering = reader_metrics.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered =
reader_metrics.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered =
reader_metrics.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered = reader_metrics.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered = reader_metrics.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering =
reader_metrics.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered =
reader_metrics.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered =
reader_metrics.num_rows_in_row_group_inverted_index_filtered;
// We only call this once so we overwrite it directly.
self.filter_metrics = reader_metrics.filter_metrics;
// Observes filter metrics.
self.filter_metrics.observe();
}

/// Observes metrics on scanner finish.
4 changes: 2 additions & 2 deletions src/mito2/src/read/prune.rs
@@ -97,13 +97,13 @@ impl PruneReader {
let num_rows_before_filter = batch.num_rows();
let Some(batch_filtered) = self.context.precise_filter(batch)? else {
// the entire batch is filtered out
self.metrics.num_rows_precise_filtered += num_rows_before_filter;
self.metrics.filter_metrics.num_rows_precise_filtered += num_rows_before_filter;
return Ok(None);
};

// update metric
let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
self.metrics.num_rows_precise_filtered += filtered_rows;
self.metrics.filter_metrics.num_rows_precise_filtered += filtered_rows;

if !batch_filtered.is_empty() {
Ok(Some(batch_filtered))
113 changes: 67 additions & 46 deletions src/mito2/src/sst/parquet/reader.rs
@@ -225,7 +225,7 @@ impl ParquetReaderBuilder {
.context(ReadParquetSnafu { path: &file_path })?;

let row_groups = self
.row_groups_to_read(&read_format, &parquet_meta, metrics)
.row_groups_to_read(&read_format, &parquet_meta, &mut metrics.filter_metrics)
.await;

let reader_builder = RowGroupReaderBuilder {
@@ -339,7 +339,7 @@ impl ParquetReaderBuilder {
&self,
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> BTreeMap<usize, Option<RowSelection>> {
let num_row_groups = parquet_meta.num_row_groups();
let num_rows = parquet_meta.file_metadata().num_rows();
@@ -385,7 +385,7 @@ impl ParquetReaderBuilder {
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.fulltext_index_applier else {
return false;
@@ -465,7 +465,7 @@ impl ParquetReaderBuilder {
row_group_size: usize,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(index_applier) = &self.inverted_index_applier else {
return false;
@@ -532,7 +532,7 @@ impl ParquetReaderBuilder {
read_format: &ReadFormat,
parquet_meta: &ParquetMetaData,
output: &mut BTreeMap<usize, Option<RowSelection>>,
metrics: &mut ReaderMetrics,
metrics: &mut ReaderFilterMetrics,
) -> bool {
let Some(predicate) = &self.predicate else {
return false;
@@ -727,9 +727,9 @@ fn time_range_to_predicate(
Ok(predicates)
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
/// Metrics of filtering row groups and rows.
#[derive(Debug, Default, Clone, Copy)]
pub(crate) struct ReaderFilterMetrics {
/// Number of row groups before filtering.
pub(crate) num_row_groups_before_filtering: usize,
/// Number of row groups filtered by fulltext index.
Expand All @@ -746,6 +746,57 @@ pub(crate) struct ReaderMetrics {
pub(crate) num_rows_in_row_group_fulltext_index_filtered: usize,
/// Number of rows in row group filtered by inverted index.
pub(crate) num_rows_in_row_group_inverted_index_filtered: usize,
}

impl ReaderFilterMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &ReaderFilterMetrics) {
self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered += other.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered +=
other.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered +=
other.num_rows_in_row_group_inverted_index_filtered;
}

/// Reports metrics.
pub(crate) fn observe(&self) {
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.num_row_groups_fulltext_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.num_row_groups_inverted_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(self.num_row_groups_min_max_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(self.num_rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(self.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(self.num_rows_in_row_group_fulltext_index_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(self.num_rows_in_row_group_inverted_index_filtered as u64);
}
}

/// Parquet reader metrics.
#[derive(Debug, Default, Clone)]
pub(crate) struct ReaderMetrics {
/// Filtered row groups and rows metrics.
pub(crate) filter_metrics: ReaderFilterMetrics,
/// Duration to build the parquet reader.
pub(crate) build_cost: Duration,
/// Duration to scan the reader.
Expand All @@ -761,16 +812,7 @@ pub(crate) struct ReaderMetrics {
impl ReaderMetrics {
/// Adds `other` metrics to this metrics.
pub(crate) fn merge_from(&mut self, other: &ReaderMetrics) {
self.num_row_groups_before_filtering += other.num_row_groups_before_filtering;
self.num_row_groups_fulltext_index_filtered += other.num_row_groups_fulltext_index_filtered;
self.num_row_groups_inverted_index_filtered += other.num_row_groups_inverted_index_filtered;
self.num_row_groups_min_max_filtered += other.num_row_groups_min_max_filtered;
self.num_rows_precise_filtered += other.num_rows_precise_filtered;
self.num_rows_in_row_group_before_filtering += other.num_rows_in_row_group_before_filtering;
self.num_rows_in_row_group_fulltext_index_filtered +=
other.num_rows_in_row_group_fulltext_index_filtered;
self.num_rows_in_row_group_inverted_index_filtered +=
other.num_rows_in_row_group_inverted_index_filtered;
self.filter_metrics.merge_from(&other.filter_metrics);
self.build_cost += other.build_cost;
self.scan_cost += other.scan_cost;
self.num_record_batches += other.num_record_batches;
@@ -1009,10 +1051,12 @@ impl Drop for ParquetReader {
self.context.reader_builder().file_handle.region_id(),
self.context.reader_builder().file_handle.file_id(),
self.context.reader_builder().file_handle.time_range(),
metrics.num_row_groups_before_filtering
- metrics.num_row_groups_inverted_index_filtered
- metrics.num_row_groups_min_max_filtered,
metrics.num_row_groups_before_filtering,
metrics.filter_metrics.num_row_groups_before_filtering
- metrics
.filter_metrics
.num_row_groups_inverted_index_filtered
- metrics.filter_metrics.num_row_groups_min_max_filtered,
metrics.filter_metrics.num_row_groups_before_filtering,
metrics
);

Expand All @@ -1026,30 +1070,7 @@ impl Drop for ParquetReader {
READ_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(metrics.num_rows as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_row_groups_before_filtering as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(metrics.num_row_groups_fulltext_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_row_groups_inverted_index_filtered as u64);
READ_ROW_GROUPS_TOTAL
.with_label_values(&["minmax_index_filtered"])
.inc_by(metrics.num_row_groups_min_max_filtered as u64);
PRECISE_FILTER_ROWS_TOTAL
.with_label_values(&["parquet"])
.inc_by(metrics.num_rows_precise_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["before_filtering"])
.inc_by(metrics.num_rows_in_row_group_before_filtering as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["fulltext_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_fulltext_index_filtered as u64);
READ_ROWS_IN_ROW_GROUP_TOTAL
.with_label_values(&["inverted_index_filtered"])
.inc_by(metrics.num_rows_in_row_group_inverted_index_filtered as u64);
metrics.filter_metrics.observe();
}
}

