From f39529f0cbb1086564d438a938382074d96fcd6c Mon Sep 17 00:00:00 2001
From: lyang24
Date: Thu, 29 Aug 2024 14:06:19 -0700
Subject: [PATCH] feat: add more span on mito engine

---
 src/mito2/src/read.rs                   | 2 ++
 src/mito2/src/read/scan_region.rs       | 4 +++-
 src/mito2/src/read/seq_scan.rs          | 6 +++++-
 src/mito2/src/sst/index/store.rs        | 3 +++
 src/mito2/src/sst/parquet/file_range.rs | 3 ++-
 src/mito2/src/sst/parquet/reader.rs     | 6 +++++-
 src/mito2/src/sst/parquet/row_group.rs  | 2 ++
 src/mito2/src/sst/parquet/writer.rs     | 3 +++
 src/mito2/src/worker/handle_write.rs    | 2 ++
 9 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index edde28dab80c..ff9e4bf7b04e 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -30,6 +30,7 @@ use std::time::Duration;
 
 use api::v1::OpType;
 use async_trait::async_trait;
+use common_telemetry::tracing;
 use common_time::Timestamp;
 use datafusion_common::arrow::array::UInt8Array;
 use datatypes::arrow;
@@ -727,6 +728,7 @@ pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;
 
 #[async_trait::async_trait]
 impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
+    #[tracing::instrument(skip_all)]
     async fn next_batch(&mut self) -> Result<Option<Batch>> {
         (**self).next_batch().await
     }
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index d0bef554727f..f95935164b28 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
 
 use common_error::ext::BoxedError;
 use common_recordbatch::SendableRecordBatchStream;
-use common_telemetry::{debug, error, warn};
+use common_telemetry::{debug, error, tracing, warn};
 use common_time::range::TimestampRange;
 use common_time::Timestamp;
 use datafusion::physical_plan::DisplayFormatType;
@@ -62,6 +62,7 @@ pub(crate) enum Scanner {
 
 impl Scanner {
     /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
+    #[tracing::instrument(skip_all)]
     pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
         match self {
             Scanner::Seq(seq_scan) => seq_scan.build_stream(),
@@ -70,6 +71,7 @@ impl Scanner {
     }
 
     /// Returns a [RegionScanner] to scan the region.
+    #[tracing::instrument(skip_all)]
     pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
         match self {
             Scanner::Seq(seq_scan) => Ok(Box::new(seq_scan)),
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 954580986ec2..ae40e1ce2600 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -23,7 +23,7 @@ use common_error::ext::BoxedError;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::util::ChainedRecordBatchStream;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
-use common_telemetry::debug;
+use common_telemetry::{debug, tracing};
 use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use smallvec::smallvec;
@@ -119,6 +119,7 @@ impl SeqScan {
     }
 
     /// Builds sources from a [ScanPart].
+    #[tracing::instrument(skip(part, sources, row_selector))]
     fn build_part_sources(
         part: &ScanPart,
         sources: &mut Vec<Source>,
@@ -204,6 +205,7 @@ impl SeqScan {
     /// Builds a merge reader that reads data from one [`PartitionRange`].
     ///
     /// If the `range_id` is out of bound, returns None.
+    #[tracing::instrument(skip_all)]
     async fn build_merge_reader(
         stream_ctx: &StreamContext,
         range_id: usize,
@@ -244,6 +246,7 @@ impl SeqScan {
         maybe_reader
     }
 
+    #[tracing::instrument(skip_all)]
     async fn build_reader_from_sources(
         stream_ctx: &StreamContext,
         mut sources: Vec<Source>,
@@ -500,6 +503,7 @@ impl RegionScanner for SeqScan {
         self.stream_ctx.input.mapper.output_schema()
     }
 
+    #[tracing::instrument(skip_all)]
     fn scan_partition(&self, partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
         self.uncached_scan_part_impl(partition)
     }
diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs
index 7dfcdc253cd6..6596d024e433 100644
--- a/src/mito2/src/sst/index/store.rs
+++ b/src/mito2/src/sst/index/store.rs
@@ -16,6 +17,7 @@ use std::io;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use common_telemetry::tracing;
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use object_store::ObjectStore;
 use pin_project::pin_project;
@@ -54,6 +55,7 @@ impl InstrumentedStore {
     /// Returns an [`InstrumentedAsyncRead`] for the given path.
     /// Metrics like the number of bytes read, read and seek operations
     /// are recorded using the provided `IntCounter`s.
+    #[tracing::instrument(skip_all)]
     pub async fn reader<'a>(
         &self,
         path: &str,
@@ -81,6 +83,7 @@ impl InstrumentedStore {
     /// Returns an [`InstrumentedAsyncWrite`] for the given path.
     /// Metrics like the number of bytes written, write and flush operations
     /// are recorded using the provided `IntCounter`s.
+    #[tracing::instrument(skip_all)]
     pub async fn writer<'a>(
         &self,
         path: &str,
diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs
index 0976c2402f08..98ad74609680 100644
--- a/src/mito2/src/sst/parquet/file_range.rs
+++ b/src/mito2/src/sst/parquet/file_range.rs
@@ -19,7 +19,7 @@ use std::ops::BitAnd;
 use std::sync::Arc;
 
 use api::v1::{OpType, SemanticType};
-use common_telemetry::error;
+use common_telemetry::{error, tracing};
 use datatypes::arrow::array::BooleanArray;
 use datatypes::arrow::buffer::BooleanBuffer;
 use parquet::arrow::arrow_reader::RowSelection;
@@ -80,6 +80,7 @@ impl FileRange {
     }
 
     /// Returns a reader to read the [FileRange].
+    #[tracing::instrument(skip_all)]
     pub(crate) async fn reader(
         &self,
         selector: Option<TimeSeriesRowSelector>,
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 766dab04290e..b497cea9fa8f 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -22,7 +22,7 @@ use std::time::{Duration, Instant};
 use api::v1::SemanticType;
 use async_trait::async_trait;
 use common_recordbatch::filter::SimpleFilterEvaluator;
-use common_telemetry::{debug, warn};
+use common_telemetry::{debug, tracing, warn};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
@@ -173,6 +173,7 @@ impl ParquetReaderBuilder {
     /// Builds a [ParquetReader].
     ///
     /// This needs to perform IO operation.
+    #[tracing::instrument(skip_all)]
     pub async fn build(&self) -> Result<ParquetReader> {
         let mut metrics = ReaderMetrics::default();
 
@@ -183,6 +184,7 @@ impl ParquetReaderBuilder {
     /// Builds a [FileRangeContext] and collects row groups to read.
     ///
     /// This needs to perform IO operation.
+    #[tracing::instrument(skip_all)]
     pub(crate) async fn build_reader_input(
         &self,
         metrics: &mut ReaderMetrics,
@@ -273,6 +275,7 @@ impl ParquetReaderBuilder {
     }
 
     /// Decodes region metadata from key value.
+    #[tracing::instrument(skip_all)]
     fn get_region_metadata(
         file_path: &str,
         key_value_meta: Option<&Vec<KeyValue>>,
@@ -300,6 +303,7 @@ impl ParquetReaderBuilder {
     }
 
     /// Reads parquet metadata of specific file.
+    #[tracing::instrument(skip_all)]
     async fn read_parquet_metadata(
         &self,
         file_path: &str,
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index 68a91e55fef4..9d0c450d07e4 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -18,6 +18,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use bytes::{Buf, Bytes};
+use common_telemetry::tracing;
 use object_store::ObjectStore;
 use parquet::arrow::arrow_reader::{RowGroups, RowSelection};
 use parquet::arrow::ProjectionMask;
@@ -93,6 +94,7 @@ impl<'a> InMemoryRowGroup<'a> {
     }
 
     /// Fetches the necessary column data into memory
+    #[tracing::instrument(skip_all)]
     pub async fn fetch(
         &mut self,
         projection: &ProjectionMask,
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 1d63f5e3d01b..345c2648a4c6 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -20,6 +20,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use common_telemetry::tracing;
 use common_time::Timestamp;
 use datatypes::arrow::datatypes::SchemaRef;
 use object_store::{FuturesAsyncWriter, ObjectStore};
@@ -65,6 +66,7 @@ pub struct ObjectStoreWriterFactory {
 impl WriterFactory for ObjectStoreWriterFactory {
     type Writer = Compat<FuturesAsyncWriter>;
 
+    #[tracing::instrument(skip_all)]
     async fn create(&mut self) -> Result<Self::Writer> {
         self.object_store
             .writer_with(&self.path)
@@ -109,6 +111,7 @@ where
     /// Iterates source and writes all rows to Parquet file.
     ///
     /// Returns the [SstInfo] if the SST is written.
+    #[tracing::instrument(skip_all)]
     pub async fn write_all(
         &mut self,
         mut source: Source,
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index 85ce49f3150f..a1c3b2454ada 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -18,6 +18,7 @@ use std::collections::{hash_map, HashMap};
 use std::sync::Arc;
 
 use api::v1::OpType;
+use common_telemetry::tracing;
 use snafu::ensure;
 use store_api::logstore::LogStore;
 use store_api::metadata::RegionMetadata;
@@ -31,6 +32,7 @@ use crate::worker::RegionWorkerLoop;
 
 impl<S: LogStore> RegionWorkerLoop<S> {
     /// Takes and handles all write requests.
+    #[tracing::instrument(skip_all)]
     pub(crate) async fn handle_write_requests(
         &mut self,
         mut write_requests: Vec<SenderWriteRequest>,
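Note on the attribute this patch adds throughout: `#[tracing::instrument]` wraps a function so that each call runs inside a tracing span named after the function; `skip_all` records none of the arguments as span fields (cheap, and no `Debug` bound required), while `skip(part, sources, row_selector)` omits only the listed arguments, as on `build_part_sources` above. Below is a minimal standalone sketch of the same mechanism, using the upstream `tracing` crate directly (the patch reaches it through the `common_telemetry::tracing` re-export) with simplified, hypothetical function signatures; `tokio` and `tracing-subscriber` are assumed as dependencies.

use tracing::instrument;

// `skip_all` attaches no arguments to the span; the function body still runs
// inside a span named `build_merge_reader`, so nested spans and events are
// attributed to it.
#[instrument(skip_all)]
async fn build_merge_reader(range_id: usize) -> Option<String> {
    tracing::debug!(range_id, "building merge reader");
    None
}

// `skip(payload)` records `request_id` as a span field but keeps the bulky
// `payload` argument out of the span.
#[instrument(skip(payload))]
fn handle_payload(request_id: u64, payload: Vec<u8>) -> usize {
    tracing::debug!(len = payload.len(), "handling payload");
    payload.len()
}

#[tokio::main]
async fn main() {
    // Spans are only collected once a subscriber is installed.
    tracing_subscriber::fmt::init();
    let _ = build_merge_reader(42).await;
    let _ = handle_payload(1, vec![1, 2, 3]);
}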