diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index d8ac5ce46b62..55765099b615 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -32,6 +32,7 @@ use std::time::Duration; use api::v1::OpType; use async_trait::async_trait; +use common_telemetry::tracing; use common_time::Timestamp; use datafusion_common::arrow::array::UInt8Array; use datatypes::arrow; @@ -877,6 +878,7 @@ pub enum Source { impl Source { /// Returns next [Batch] from this data source. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn next_batch(&mut self) -> Result> { match self { Source::Reader(reader) => reader.next_batch().await, @@ -910,6 +912,7 @@ pub type BoxedBatchStream = BoxStream<'static, Result>; #[async_trait::async_trait] impl BatchReader for Box { + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn next_batch(&mut self) -> Result> { (**self).next_batch().await } diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs index 79d035e03271..7da9009e365a 100644 --- a/src/mito2/src/read/last_row.rs +++ b/src/mito2/src/read/last_row.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_trait::async_trait; +use common_telemetry::tracing; use datatypes::vectors::UInt32Vector; use store_api::storage::TimeSeriesRowSelector; @@ -138,6 +139,7 @@ impl RowGroupLastRowCachedReader { #[async_trait] impl BatchReader for RowGroupLastRowCachedReader { + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn next_batch(&mut self) -> Result> { match self { RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await, diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs index 8060c534052b..576a35735b44 100644 --- a/src/mito2/src/read/merge.rs +++ b/src/mito2/src/read/merge.rs @@ -20,7 +20,7 @@ use std::mem; use std::time::{Duration, Instant}; use async_trait::async_trait; -use common_telemetry::debug; +use common_telemetry::{debug, tracing}; use crate::error::Result; use crate::memtable::BoxedBatchIterator; @@ -276,6 +276,7 @@ impl MergeReaderBuilder { } /// Builds and initializes the reader, then resets the builder. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub async fn build(&mut self) -> Result { let sources = mem::take(&mut self.sources); MergeReader::new(sources).await @@ -313,6 +314,7 @@ impl Node { /// Initialize a node. /// /// It tries to fetch one batch from the `source`. + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn new(mut source: Source, metrics: &mut Metrics) -> Result { // Ensures batch is not empty. let start = Instant::now(); diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index cb0066e73472..d0765d819ddb 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_telemetry::tracing; use common_time::Timestamp; use datatypes::scalars::ScalarVectorBuilder; use datatypes::vectors::BooleanVectorBuilder; @@ -30,6 +31,7 @@ pub enum Source { } impl Source { + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn next_batch(&mut self) -> Result> { match self { Source::RowGroup(r) => r.next_batch().await, @@ -79,6 +81,7 @@ impl PruneReader { } } + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] pub(crate) async fn next_batch(&mut self) -> Result> { while let Some(b) = self.source.next_batch().await? { match self.prune(b)? { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index c8c351228023..40d839f4df0c 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -1164,6 +1164,7 @@ impl RowGroupReader { #[async_trait::async_trait] impl BatchReader for RowGroupReader { + #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] async fn next_batch(&mut self) -> Result> { let scan_start = Instant::now(); if let Some(batch) = self.batches.pop_front() {