Skip to content

Commit

Permalink
chore: add traces
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Nov 8, 2024
1 parent 00bb2ea commit 72625bb
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ impl ScanInput {
}

/// Prunes a file to scan and returns the builder to build readers.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn prune_file(
&self,
file_index: usize,
Expand Down Expand Up @@ -818,6 +819,7 @@ impl StreamContext {
}

/// Creates file ranges to scan.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn build_file_ranges(
&self,
index: RowGroupIndex,
Expand Down Expand Up @@ -892,6 +894,7 @@ impl RangeBuilderList {
}

/// Builds file ranges to read the row group at `index`.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn build_file_ranges(
&self,
input: &ScanInput,
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/read/scan_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};
use futures::Stream;
use store_api::storage::RegionId;

Expand Down Expand Up @@ -148,6 +148,7 @@ pub(crate) fn scan_mem_ranges(
}

/// Scans file ranges at `index`.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) fn scan_file_ranges(
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ impl fmt::Debug for SeqScan {
}

/// Builds sources for the partition range.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
fn build_sources(
stream_ctx: &Arc<StreamContext>,
part_range: &PartitionRange,
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/sst/parquet/file_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use common_telemetry::{error, tracing};
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use parquet::arrow::arrow_reader::RowSelection;
Expand Down Expand Up @@ -80,6 +80,7 @@ impl FileRange {
}

/// Returns a reader to read the [FileRange].
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn reader(
&self,
selector: Option<TimeSeriesRowSelector>,
Expand Down
8 changes: 7 additions & 1 deletion src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::time::{Duration, Instant};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_telemetry::{debug, tracing, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
Expand Down Expand Up @@ -183,6 +183,7 @@ impl ParquetReaderBuilder {
/// Builds a [FileRangeContext] and collects row groups to read.
///
/// This needs to perform IO operation.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn build_reader_input(
&self,
metrics: &mut ReaderMetrics,
Expand Down Expand Up @@ -336,6 +337,7 @@ impl ParquetReaderBuilder {
}

/// Computes row groups to read, along with their respective row selections.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn row_groups_to_read(
&self,
read_format: &ReadFormat,
Expand Down Expand Up @@ -381,6 +383,7 @@ impl ParquetReaderBuilder {
}

/// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn prune_row_groups_by_fulltext_index(
&self,
row_group_size: usize,
Expand Down Expand Up @@ -461,6 +464,7 @@ impl ParquetReaderBuilder {
/// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
/// as an escape route in case of index issues, and it can be used to test
/// the correctness of the index.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn prune_row_groups_by_inverted_index(
&self,
row_group_size: usize,
Expand Down Expand Up @@ -528,6 +532,7 @@ impl ParquetReaderBuilder {
}

/// Prunes row groups by min-max index.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
fn prune_row_groups_by_minmax(
&self,
read_format: &ReadFormat,
Expand Down Expand Up @@ -869,6 +874,7 @@ impl RowGroupReaderBuilder {
}

/// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn build(
&self,
row_group_idx: usize,
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/sst/parquet/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::ops::Range;
use std::sync::Arc;

use bytes::{Buf, Bytes};
use common_telemetry::tracing;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
use parquet::arrow::ProjectionMask;
Expand Down Expand Up @@ -97,6 +98,7 @@ impl<'a> InMemoryRowGroup<'a> {
}

/// Fetches the necessary column data into memory
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn fetch(
&mut self,
projection: &ProjectionMask,
Expand Down Expand Up @@ -235,6 +237,7 @@ impl<'a> InMemoryRowGroup<'a> {

/// Fetches pages for columns if cache is enabled.
/// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) {
let _timer = READ_STAGE_FETCH_PAGES.start_timer();
self.column_chunks
Expand Down Expand Up @@ -276,6 +279,7 @@ impl<'a> InMemoryRowGroup<'a> {

/// Try to fetch data from WriteCache,
/// if not in WriteCache, fetch data from object store directly.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn fetch_bytes(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet);
match self.fetch_ranges_from_write_cache(key, ranges).await {
Expand Down

0 comments on commit 72625bb

Please sign in to comment.