From 72625bb4a007f61f055f753b3b2ccd9b37278e68 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Fri, 8 Nov 2024 09:27:38 +0000 Subject: [PATCH] chore: add traces --- src/mito2/src/read/scan_region.rs | 3 +++ src/mito2/src/read/scan_util.rs | 3 ++- src/mito2/src/read/seq_scan.rs | 1 + src/mito2/src/sst/parquet/file_range.rs | 3 ++- src/mito2/src/sst/parquet/reader.rs | 8 +++++++- src/mito2/src/sst/parquet/row_group.rs | 4 ++++ 6 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 1a7fb29b2e56..823db8279ab2 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -660,6 +660,7 @@ impl ScanInput { } /// Prunes a file to scan and returns the builder to build readers. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn prune_file( &self, file_index: usize, @@ -818,6 +819,7 @@ impl StreamContext { } /// Creates file ranges to scan. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn build_file_ranges( &self, index: RowGroupIndex, @@ -892,6 +894,7 @@ impl RangeBuilderList { } /// Builds file ranges to read the row group at `index`. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn build_file_ranges( &self, input: &ScanInput, diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index d27468521a7d..9e5d0290f508 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -18,7 +18,7 @@ use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; use async_stream::try_stream; -use common_telemetry::debug; +use common_telemetry::{debug, tracing}; use futures::Stream; use store_api::storage::RegionId; @@ -148,6 +148,7 @@ pub(crate) fn scan_mem_ranges( } /// Scans file ranges at `index`. +#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) fn scan_file_ranges( stream_ctx: Arc, part_metrics: PartitionMetrics, diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9b7a71a36c51..c4bf95b1493f 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -348,6 +348,7 @@ impl fmt::Debug for SeqScan { } /// Builds sources for the partition range. +#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] fn build_sources( stream_ctx: &Arc, part_range: &PartitionRange, diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 0976c2402f08..d724309f8d27 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -19,7 +19,7 @@ use std::ops::BitAnd; use std::sync::Arc; use api::v1::{OpType, SemanticType}; -use common_telemetry::error; +use common_telemetry::{error, tracing}; use datatypes::arrow::array::BooleanArray; use datatypes::arrow::buffer::BooleanBuffer; use parquet::arrow::arrow_reader::RowSelection; @@ -80,6 +80,7 @@ impl FileRange { } /// Returns a reader to read the [FileRange]. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn reader( &self, selector: Option, diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b73026a7a6e3..ed19ff8c93a0 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -22,7 +22,7 @@ use std::time::{Duration, Instant}; use api::v1::SemanticType; use async_trait::async_trait; use common_recordbatch::filter::SimpleFilterEvaluator; -use common_telemetry::{debug, warn}; +use common_telemetry::{debug, tracing, warn}; use common_time::range::TimestampRange; use common_time::timestamp::TimeUnit; use common_time::Timestamp; @@ -183,6 +183,7 @@ impl ParquetReaderBuilder { /// Builds a [FileRangeContext] and collects row groups to read. /// /// This needs to perform IO operation. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn build_reader_input( &self, metrics: &mut ReaderMetrics, @@ -336,6 +337,7 @@ impl ParquetReaderBuilder { } /// Computes row groups to read, along with their respective row selections. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn row_groups_to_read( &self, read_format: &ReadFormat, @@ -381,6 +383,7 @@ impl ParquetReaderBuilder { } /// Prunes row groups by fulltext index. Returns `true` if the row groups are pruned. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn prune_row_groups_by_fulltext_index( &self, row_group_size: usize, @@ -461,6 +464,7 @@ impl ParquetReaderBuilder { /// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices /// as an escape route in case of index issues, and it can be used to test /// the correctness of the index. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn prune_row_groups_by_inverted_index( &self, row_group_size: usize, @@ -528,6 +532,7 @@ impl ParquetReaderBuilder { } /// Prunes row groups by min-max index. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] fn prune_row_groups_by_minmax( &self, read_format: &ReadFormat, @@ -869,6 +874,7 @@ impl RowGroupReaderBuilder { } /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn build( &self, row_group_idx: usize, diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index 73382c06d9b3..9b2aa57e59c7 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -18,6 +18,7 @@ use std::ops::Range; use std::sync::Arc; use bytes::{Buf, Bytes}; +use common_telemetry::tracing; use object_store::ObjectStore; use parquet::arrow::arrow_reader::{RowGroups, RowSelection}; use parquet::arrow::ProjectionMask; @@ -97,6 +98,7 @@ impl<'a> InMemoryRowGroup<'a> { } /// Fetches the necessary column data into memory + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub async fn fetch( &mut self, projection: &ProjectionMask, @@ -235,6 +237,7 @@ impl<'a> InMemoryRowGroup<'a> { /// Fetches pages for columns if cache is enabled. /// If the page is in the cache, sets the column chunk or `column_uncompressed_pages` for the column. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] fn fetch_pages_from_cache(&mut self, projection: &ProjectionMask) { let _timer = READ_STAGE_FETCH_PAGES.start_timer(); self.column_chunks @@ -276,6 +279,7 @@ impl<'a> InMemoryRowGroup<'a> { /// Try to fetch data from WriteCache, /// if not in WriteCache, fetch data from object store directly. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn fetch_bytes(&self, ranges: &[Range]) -> Result> { let key = IndexKey::new(self.region_id, self.file_id, FileType::Parquet); match self.fetch_ranges_from_write_cache(key, ranges).await {