Skip to content

Commit

Permalink
chore: add more traces
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 12, 2024
1 parent ab7bee2 commit 3797f01
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::time::Duration;

use api::v1::OpType;
use async_trait::async_trait;
use common_telemetry::tracing;
use common_time::Timestamp;
use datafusion_common::arrow::array::UInt8Array;
use datatypes::arrow;
Expand Down Expand Up @@ -877,6 +878,7 @@ pub enum Source {

impl Source {
/// Returns next [Batch] from this data source.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
match self {
Source::Reader(reader) => reader.next_batch().await,
Expand Down Expand Up @@ -910,6 +912,7 @@ pub type BoxedBatchStream = BoxStream<'static, Result<Batch>>;

#[async_trait::async_trait]
impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
(**self).next_batch().await
}
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/read/last_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use common_telemetry::tracing;
use datatypes::vectors::UInt32Vector;
use store_api::storage::TimeSeriesRowSelector;

Expand Down Expand Up @@ -138,6 +139,7 @@ impl RowGroupLastRowCachedReader {

#[async_trait]
impl BatchReader for RowGroupLastRowCachedReader {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
match self {
RowGroupLastRowCachedReader::Hit(r) => r.next_batch().await,
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/read/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::mem;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_telemetry::debug;
use common_telemetry::{debug, tracing};

use crate::error::Result;
use crate::memtable::BoxedBatchIterator;
Expand Down Expand Up @@ -276,6 +276,7 @@ impl MergeReaderBuilder {
}

/// Builds and initializes the reader, then resets the builder.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub async fn build(&mut self) -> Result<MergeReader> {
let sources = mem::take(&mut self.sources);
MergeReader::new(sources).await
Expand Down Expand Up @@ -313,6 +314,7 @@ impl Node {
/// Initialize a node.
///
/// It tries to fetch one batch from the `source`.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
// Ensures batch is not empty.
let start = Instant::now();
Expand Down
3 changes: 3 additions & 0 deletions src/mito2/src/read/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use common_telemetry::tracing;
use common_time::Timestamp;
use datatypes::scalars::ScalarVectorBuilder;
use datatypes::vectors::BooleanVectorBuilder;
Expand All @@ -30,6 +31,7 @@ pub enum Source {
}

impl Source {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
match self {
Source::RowGroup(r) => r.next_batch().await,
Expand Down Expand Up @@ -79,6 +81,7 @@ impl PruneReader {
}
}

#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
while let Some(b) = self.source.next_batch().await? {
match self.prune(b)? {
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ impl RowGroupReader {

#[async_trait::async_trait]
impl BatchReader for RowGroupReader {
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
async fn next_batch(&mut self) -> Result<Option<Batch>> {
let scan_start = Instant::now();
if let Some(batch) = self.batches.pop_front() {
Expand Down

0 comments on commit 3797f01

Please sign in to comment.