From 63bbfd04c75e00e24f41a8a61af282a961c4d9c4 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 19 Nov 2024 20:43:30 +0800 Subject: [PATCH] fix: prune memtable/files range independently in each partition (#4998) * feat: prune in each partition * chore: change pick log to trace * chore: add in progress partition scan to metrics * feat: seqscan support pruning in partition * chore: remove commented codes --- src/mito2/src/compaction/twcs.rs | 4 +- src/mito2/src/metrics.rs | 9 ++ src/mito2/src/read/range.rs | 163 ++++++++++++++++++++++++- src/mito2/src/read/scan_region.rs | 173 ++------------------------- src/mito2/src/read/scan_util.rs | 21 +++- src/mito2/src/read/seq_scan.rs | 30 ++++- src/mito2/src/read/unordered_scan.rs | 23 +++- 7 files changed, 245 insertions(+), 178 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index c28b30ac05d3..96fc755a7a70 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -16,7 +16,7 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::Debug; -use common_telemetry::{debug, info}; +use common_telemetry::{info, trace}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; @@ -114,7 +114,7 @@ impl TwcsPicker { // Files in window exceeds file num limit vec![enforce_file_num(&files.files, max_files)] } else { - debug!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); + trace!("Skip building compaction output, active window: {:?}, current window: {}, max runs: {}, found runs: {}, ", active_window, *window, max_runs, found_runs); continue; }; diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index c50bbbdc7847..e7c1c7272ef8 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -26,6 +26,8 @@ pub const FLUSH_REASON: &str = 
"reason"; pub const FILE_TYPE_LABEL: &str = "file_type"; /// Region worker id label. pub const WORKER_LABEL: &str = "worker"; +/// Partition label. +pub const PARTITION_LABEL: &str = "partition"; lazy_static! { /// Global write buffer size in bytes. @@ -134,6 +136,13 @@ lazy_static! { ) .unwrap(); pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]); + /// Number of in-progress scan per partition. + pub static ref IN_PROGRESS_SCAN: IntGaugeVec = register_int_gauge_vec!( + "greptime_mito_in_progress_scan", + "mito in progress scan per partition", + &[TYPE_LABEL, PARTITION_LABEL] + ) + .unwrap(); /// Counter of rows read from different source. pub static ref READ_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap(); diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 4bf9314915b9..1944d171dd19 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -14,15 +14,22 @@ //! Structs for partition ranges. +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; + use common_time::Timestamp; +use parquet::arrow::arrow_reader::RowSelection; use smallvec::{smallvec, SmallVec}; use store_api::region_engine::PartitionRange; use crate::cache::CacheManager; -use crate::memtable::MemtableRef; +use crate::error::Result; +use crate::memtable::{MemtableRange, MemtableRef}; use crate::read::scan_region::ScanInput; use crate::sst::file::{overlaps, FileHandle, FileTimeRange}; +use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef}; use crate::sst::parquet::format::parquet_row_group_time_range; +use crate::sst::parquet::reader::ReaderMetrics; use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE; const ALL_ROW_GROUPS: i64 = -1; @@ -334,6 +341,160 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec) -> Vec { new_ranges } +/// Builder to create file ranges. 
+#[derive(Default)] +pub(crate) struct FileRangeBuilder { + /// Context for the file. + /// None indicates nothing to read. + context: Option, + /// Row selections for each row group to read. + /// It skips the row group if it is not in the map. + row_groups: BTreeMap>, +} + +impl FileRangeBuilder { + /// Builds a file range builder from context and row groups. + pub(crate) fn new( + context: FileRangeContextRef, + row_groups: BTreeMap>, + ) -> Self { + Self { + context: Some(context), + row_groups, + } + } + + /// Builds file ranges to read. + /// Negative `row_group_index` indicates all row groups. + pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { + let Some(context) = self.context.clone() else { + return; + }; + if row_group_index >= 0 { + let row_group_index = row_group_index as usize; + // Scans one row group. + let Some(row_selection) = self.row_groups.get(&row_group_index) else { + return; + }; + ranges.push(FileRange::new( + context, + row_group_index, + row_selection.clone(), + )); + } else { + // Scans all row groups. + ranges.extend( + self.row_groups + .iter() + .map(|(row_group_index, row_selection)| { + FileRange::new(context.clone(), *row_group_index, row_selection.clone()) + }), + ); + } + } +} + +/// Builder to create mem ranges. +pub(crate) struct MemRangeBuilder { + /// Ranges of a memtable. + row_groups: BTreeMap, +} + +impl MemRangeBuilder { + /// Builds a mem range builder from row groups. + pub(crate) fn new(row_groups: BTreeMap) -> Self { + Self { row_groups } + } + + /// Builds mem ranges to read in the memtable. + /// Negative `row_group_index` indicates all row groups. + fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) { + if row_group_index >= 0 { + let row_group_index = row_group_index as usize; + // Scans one row group. 
+ let Some(range) = self.row_groups.get(&row_group_index) else { + return; + }; + ranges.push(range.clone()); + } else { + ranges.extend(self.row_groups.values().cloned()); + } + } +} + +/// List that manages the builders to create memtable and file ranges. +/// Each scan partition should have its own list. Mutex inside this list is used to allow moving +/// the list to different streams in the same partition. +pub(crate) struct RangeBuilderList { + num_memtables: usize, + mem_builders: Mutex>>, + file_builders: Mutex>>>, +} + +impl RangeBuilderList { + /// Creates a new [RangeBuilderList] with the given number of memtables and files. + pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self { + let mem_builders = (0..num_memtables).map(|_| None).collect(); + let file_builders = (0..num_files).map(|_| None).collect(); + Self { + num_memtables, + mem_builders: Mutex::new(mem_builders), + file_builders: Mutex::new(file_builders), + } + } + + /// Builds file ranges to read the row group at `index`. + pub(crate) async fn build_file_ranges( + &self, + input: &ScanInput, + index: RowGroupIndex, + reader_metrics: &mut ReaderMetrics, + ) -> Result> { + let mut ranges = SmallVec::new(); + let file_index = index.index - self.num_memtables; + let builder_opt = self.get_file_builder(file_index); + match builder_opt { + Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges), + None => { + let builder = input.prune_file(file_index, reader_metrics).await?; + builder.build_ranges(index.row_group_index, &mut ranges); + self.set_file_builder(file_index, Arc::new(builder)); + } + } + Ok(ranges) + } + + /// Builds mem ranges to read the row group at `index`. 
+ pub(crate) fn build_mem_ranges( + &self, + input: &ScanInput, + index: RowGroupIndex, + ) -> SmallVec<[MemtableRange; 2]> { + let mut ranges = SmallVec::new(); + let mut mem_builders = self.mem_builders.lock().unwrap(); + match &mut mem_builders[index.index] { + Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges), + None => { + let builder = input.prune_memtable(index.index); + builder.build_ranges(index.row_group_index, &mut ranges); + mem_builders[index.index] = Some(builder); + } + } + + ranges + } + + fn get_file_builder(&self, index: usize) -> Option> { + let file_builders = self.file_builders.lock().unwrap(); + file_builders[index].clone() + } + + fn set_file_builder(&self, index: usize, builder: Arc) { + let mut file_builders = self.file_builders.lock().unwrap(); + file_builders[index] = Some(builder); + } +} + #[cfg(test)] mod tests { use common_time::timestamp::TimeUnit; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 0241ba72037e..7da80806f22e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -14,9 +14,9 @@ //! Scans a region according to the scan request. 
-use std::collections::{BTreeMap, HashSet}; +use std::collections::HashSet; use std::fmt; -use std::sync::{Arc, Mutex as StdMutex}; +use std::sync::Arc; use std::time::Instant; use common_error::ext::BoxedError; @@ -24,23 +24,21 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; use datafusion_expr::utils::expr_to_columns; -use parquet::arrow::arrow_reader::RowSelection; -use smallvec::SmallVec; use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ScanRequest, TimeSeriesRowSelector}; use table::predicate::{build_time_range_predicate, Predicate}; -use tokio::sync::{mpsc, Mutex, Semaphore}; +use tokio::sync::{mpsc, Semaphore}; use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::FileCacheRef; use crate::cache::CacheManagerRef; use crate::error::Result; -use crate::memtable::{MemtableRange, MemtableRef}; +use crate::memtable::MemtableRef; use crate::metrics::READ_SST_COUNT; use crate::read::compat::{self, CompatBatch}; use crate::read::projection::ProjectionMapper; -use crate::read::range::{RangeMeta, RowGroupIndex}; +use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex}; use crate::read::seq_scan::SeqScan; use crate::read::unordered_scan::UnorderedScan; use crate::read::{Batch, Source}; @@ -51,7 +49,6 @@ use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBui use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef; -use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef}; use crate::sst::parquet::reader::ReaderMetrics; /// A scanner scans a region and returns a [SendableRecordBatchStream]. 
@@ -639,14 +636,14 @@ impl ScanInput { } /// Prunes a memtable to scan and returns the builder to build readers. - fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder { + pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder { let memtable = &self.memtables[mem_index]; let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone()); - MemRangeBuilder { row_groups } + MemRangeBuilder::new(row_groups) } /// Prunes a file to scan and returns the builder to build readers. - async fn prune_file( + pub(crate) async fn prune_file( &self, file_index: usize, reader_metrics: &mut ReaderMetrics, @@ -687,10 +684,7 @@ impl ScanInput { )?; file_range_ctx.set_compat_batch(Some(compat)); } - Ok(FileRangeBuilder { - context: Some(Arc::new(file_range_ctx)), - row_groups, - }) + Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups)) } /// Scans the input source in another task and sends batches to the sender. @@ -759,8 +753,6 @@ pub(crate) struct StreamContext { pub(crate) input: ScanInput, /// Metadata for partition ranges. pub(crate) ranges: Vec, - /// Lists of range builders. - range_builders: RangeBuilderList, // Metrics: /// The start time of the query. 
@@ -773,12 +765,10 @@ impl StreamContext { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::seq_scan_ranges(&input); READ_SST_COUNT.observe(input.num_files() as f64); - let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files()); Self { input, ranges, - range_builders, query_start, } } @@ -788,12 +778,10 @@ impl StreamContext { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::unordered_scan_ranges(&input); READ_SST_COUNT.observe(input.num_files() as f64); - let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files()); Self { input, ranges, - range_builders, query_start, } } @@ -803,27 +791,6 @@ impl StreamContext { self.input.num_memtables() > index.index } - /// Creates file ranges to scan. - pub(crate) async fn build_file_ranges( - &self, - index: RowGroupIndex, - reader_metrics: &mut ReaderMetrics, - ) -> Result> { - let mut ranges = SmallVec::new(); - self.range_builders - .build_file_ranges(&self.input, index, &mut ranges, reader_metrics) - .await?; - Ok(ranges) - } - - /// Creates memtable ranges to scan. - pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> { - let mut ranges = SmallVec::new(); - self.range_builders - .build_mem_ranges(&self.input, index, &mut ranges); - ranges - } - /// Retrieves the partition ranges. pub(crate) fn partition_ranges(&self) -> Vec { self.ranges @@ -859,125 +826,3 @@ impl StreamContext { Ok(()) } } - -/// List to manages the builders to create file ranges. -struct RangeBuilderList { - mem_builders: Vec>>, - file_builders: Vec>>, -} - -impl RangeBuilderList { - /// Creates a new [ReaderBuilderList] with the given number of memtables and files. 
- fn new(num_memtables: usize, num_files: usize) -> Self { - let mem_builders = (0..num_memtables).map(|_| StdMutex::new(None)).collect(); - let file_builders = (0..num_files).map(|_| Mutex::new(None)).collect(); - Self { - mem_builders, - file_builders, - } - } - - /// Builds file ranges to read the row group at `index`. - async fn build_file_ranges( - &self, - input: &ScanInput, - index: RowGroupIndex, - ranges: &mut SmallVec<[FileRange; 2]>, - reader_metrics: &mut ReaderMetrics, - ) -> Result<()> { - let file_index = index.index - self.mem_builders.len(); - let mut builder_opt = self.file_builders[file_index].lock().await; - match &mut *builder_opt { - Some(builder) => builder.build_ranges(index.row_group_index, ranges), - None => { - let builder = input.prune_file(file_index, reader_metrics).await?; - builder.build_ranges(index.row_group_index, ranges); - *builder_opt = Some(builder); - } - } - Ok(()) - } - - /// Builds mem ranges to read the row group at `index`. - fn build_mem_ranges( - &self, - input: &ScanInput, - index: RowGroupIndex, - ranges: &mut SmallVec<[MemtableRange; 2]>, - ) { - let mut builder_opt = self.mem_builders[index.index].lock().unwrap(); - match &mut *builder_opt { - Some(builder) => builder.build_ranges(index.row_group_index, ranges), - None => { - let builder = input.prune_memtable(index.index); - builder.build_ranges(index.row_group_index, ranges); - *builder_opt = Some(builder); - } - } - } -} - -/// Builder to create file ranges. -#[derive(Default)] -struct FileRangeBuilder { - /// Context for the file. - /// None indicates nothing to read. - context: Option, - /// Row selections for each row group to read. - /// It skips the row group if it is not in the map. - row_groups: BTreeMap>, -} - -impl FileRangeBuilder { - /// Builds file ranges to read. - /// Negative `row_group_index` indicates all row groups. 
- fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { - let Some(context) = self.context.clone() else { - return; - }; - if row_group_index >= 0 { - let row_group_index = row_group_index as usize; - // Scans one row group. - let Some(row_selection) = self.row_groups.get(&row_group_index) else { - return; - }; - ranges.push(FileRange::new( - context, - row_group_index, - row_selection.clone(), - )); - } else { - // Scans all row groups. - ranges.extend( - self.row_groups - .iter() - .map(|(row_group_index, row_selection)| { - FileRange::new(context.clone(), *row_group_index, row_selection.clone()) - }), - ); - } - } -} - -/// Builder to create mem ranges. -struct MemRangeBuilder { - /// Ranges of a memtable. - row_groups: BTreeMap, -} - -impl MemRangeBuilder { - /// Builds mem ranges to read in the memtable. - /// Negative `row_group_index` indicates all row groups. - fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) { - if row_group_index >= 0 { - let row_group_index = row_group_index as usize; - // Scans one row group. 
- let Some(range) = self.row_groups.get(&row_group_index) else { - return; - }; - ranges.push(range.clone()); - } else { - ranges.extend(self.row_groups.values().cloned()); - } - } -} diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index d27468521a7d..6e6fbf48db60 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -20,10 +20,12 @@ use std::time::{Duration, Instant}; use async_stream::try_stream; use common_telemetry::debug; use futures::Stream; +use prometheus::IntGauge; use store_api::storage::RegionId; use crate::error::Result; -use crate::read::range::RowGroupIndex; +use crate::metrics::IN_PROGRESS_SCAN; +use crate::read::range::{RangeBuilderList, RowGroupIndex}; use crate::read::scan_region::StreamContext; use crate::read::{Batch, ScannerMetrics, Source}; use crate::sst::file::FileTimeRange; @@ -41,6 +43,7 @@ struct PartitionMetricsInner { first_poll: Duration, metrics: ScannerMetrics, reader_metrics: ReaderMetrics, + in_progress_scan: IntGauge, } impl PartitionMetricsInner { @@ -56,6 +59,7 @@ impl Drop for PartitionMetricsInner { fn drop(&mut self) { self.on_finish(); self.metrics.observe_metrics(); + self.in_progress_scan.dec(); debug!( "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}", @@ -76,6 +80,8 @@ impl PartitionMetrics { query_start: Instant, metrics: ScannerMetrics, ) -> Self { + let partition_str = partition.to_string(); + let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]); let inner = PartitionMetricsInner { region_id, partition, @@ -84,6 +90,7 @@ impl PartitionMetrics { first_poll: Duration::default(), metrics, reader_metrics: ReaderMetrics::default(), + in_progress_scan, }; Self(Arc::new(Mutex::new(inner))) } @@ -130,9 +137,10 @@ pub(crate) fn scan_mem_ranges( part_metrics: PartitionMetrics, index: RowGroupIndex, time_range: FileTimeRange, + range_builder_list: Arc, ) -> impl Stream> { 
try_stream! { - let ranges = stream_ctx.build_mem_ranges(index); + let ranges = range_builder_list.build_mem_ranges(&stream_ctx.input, index); part_metrics.inc_num_mem_ranges(ranges.len()); for range in ranges { let build_reader_start = Instant::now(); @@ -153,17 +161,18 @@ pub(crate) fn scan_file_ranges( part_metrics: PartitionMetrics, index: RowGroupIndex, read_type: &'static str, + range_builder: Arc, ) -> impl Stream> { try_stream! { let mut reader_metrics = ReaderMetrics::default(); - let ranges = stream_ctx - .build_file_ranges(index, &mut reader_metrics) - .await?; + let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?; part_metrics.inc_num_file_ranges(ranges.len()); + for range in ranges { let build_reader_start = Instant::now(); let reader = range.reader(None).await?; - part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); + let build_cost = build_reader_start.elapsed(); + part_metrics.inc_build_reader_cost(build_cost); let compat_batch = range.compat_batch(); let mut source = Source::PruneReader(reader); while let Some(mut batch) = source.next_batch().await? 
{ diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 345d1d615ba5..9498078ddbc4 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -36,6 +36,7 @@ use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; +use crate::read::range::RangeBuilderList; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source}; @@ -131,12 +132,17 @@ impl SeqScan { part_metrics: &PartitionMetrics, ) -> Result { let mut sources = Vec::new(); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + stream_ctx.input.num_files(), + )); for part_range in partition_ranges { build_sources( stream_ctx, part_range, compaction, part_metrics, + range_builder_list.clone(), &mut sources, ); } @@ -219,10 +225,21 @@ impl SeqScan { let stream = try_stream! { part_metrics.on_first_poll(); + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + stream_ctx.input.num_files(), + )); // Scans each part. for part_range in partition_ranges { let mut sources = Vec::new(); - build_sources(&stream_ctx, &part_range, compaction, &part_metrics, &mut sources); + build_sources( + &stream_ctx, + &part_range, + compaction, + &part_metrics, + range_builder_list.clone(), + &mut sources, + ); let mut reader = Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone()) @@ -353,6 +370,7 @@ fn build_sources( part_range: &PartitionRange, compaction: bool, part_metrics: &PartitionMetrics, + range_builder_list: Arc, sources: &mut Vec, ) { // Gets range meta. 
@@ -365,6 +383,7 @@ fn build_sources( part_metrics.clone(), *index, range_meta.time_range, + range_builder_list.clone(), ); Box::pin(stream) as _ } else { @@ -373,8 +392,13 @@ fn build_sources( } else { "seq_scan_files" }; - let stream = - scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type); + let stream = scan_file_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + read_type, + range_builder_list.clone(), + ); Box::pin(stream) as _ }; sources.push(Source::Stream(stream)); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 7a2ce12e62ca..c1ee34b08e5d 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -30,6 +30,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; use crate::error::{PartitionOutOfRangeSnafu, Result}; +use crate::read::range::RangeBuilderList; use crate::read::scan_region::{ScanInput, StreamContext}; use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; use crate::read::{Batch, ScannerMetrics}; @@ -84,18 +85,31 @@ impl UnorderedScan { stream_ctx: Arc, part_range_id: usize, part_metrics: PartitionMetrics, + range_builder_list: Arc, ) -> impl Stream> { stream! { // Gets range meta. 
let range_meta = &stream_ctx.ranges[part_range_id]; for index in &range_meta.row_group_indices { if stream_ctx.is_mem_range_index(*index) { - let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range); + let stream = scan_mem_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + range_meta.time_range, + range_builder_list.clone(), + ); for await batch in stream { yield batch; } } else { - let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files"); + let stream = scan_file_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + "unordered_scan_files", + range_builder_list.clone(), + ); for await batch in stream { yield batch; } @@ -136,6 +150,10 @@ impl UnorderedScan { part_metrics.on_first_poll(); let cache = &stream_ctx.input.cache_manager; + let range_builder_list = Arc::new(RangeBuilderList::new( + stream_ctx.input.num_memtables(), + stream_ctx.input.num_files(), + )); // Scans each part. for part_range in part_ranges { let mut metrics = ScannerMetrics::default(); @@ -149,6 +167,7 @@ impl UnorderedScan { stream_ctx.clone(), part_range.identifier, part_metrics.clone(), + range_builder_list.clone(), ); for await batch in stream { let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;