From a30d918df2bf4b57e65b59d3eef26765ad96f6b6 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Wed, 11 Dec 2024 16:24:41 +0800
Subject: [PATCH] perf: avoid cache during compaction (#5135)

* Revert "refactor: Avoid wrapping Option for CacheManagerRef (#4996)"

This reverts commit 42bf7e99655bf842a08c657d1d601c0a8a9f41f2.

* fix: memory usage during log ingestion

* fix: fmt

---
 src/mito2/src/cache/write_cache.rs     |  2 +-
 src/mito2/src/compaction.rs            |  3 +-
 src/mito2/src/compaction/compactor.rs  |  1 -
 src/mito2/src/engine.rs                | 16 ++++++----
 src/mito2/src/read/last_row.rs         | 24 ++++++++++----
 src/mito2/src/read/projection.rs       | 24 +++++++-------
 src/mito2/src/read/range.rs            |  9 +++---
 src/mito2/src/read/scan_region.rs      | 19 +++++++----
 src/mito2/src/read/seq_scan.rs         |  2 +-
 src/mito2/src/read/unordered_scan.rs   |  2 +-
 src/mito2/src/sst/parquet.rs           | 10 +++---
 src/mito2/src/sst/parquet/reader.rs    | 32 +++++++++----------
 src/mito2/src/sst/parquet/row_group.rs | 44 +++++++++++++++-----------
 13 files changed, 108 insertions(+), 80 deletions(-)

diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 4e2fe357fd09..8a431f22a63d 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -501,7 +501,7 @@ mod tests {
 
         // Read metadata from write cache
         let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
-            .cache(cache_manager.clone());
+            .cache(Some(cache_manager.clone()));
         let reader = builder.build().await.unwrap();
 
         // Check parquet metadata
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 5f462f33a111..2b70f455d815 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -570,7 +570,6 @@ pub struct SerializedCompactionOutput {
 struct CompactionSstReaderBuilder<'a> {
     metadata: RegionMetadataRef,
     sst_layer: AccessLayerRef,
-    cache: CacheManagerRef,
     inputs: &'a [FileHandle],
     append_mode: bool,
     filter_deleted: bool,
@@ -584,7 +583,7 @@ impl<'a> CompactionSstReaderBuilder<'a> {
         let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
             .with_files(self.inputs.to_vec())
             .with_append_mode(self.append_mode)
-            .with_cache(self.cache)
+            .with_cache(None)
             .with_filter_deleted(self.filter_deleted)
             // We ignore file not found error during compaction.
             .with_ignore_file_not_found(true)
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 792634b2e4a2..91ab34c961cf 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -296,7 +296,6 @@ impl Compactor for DefaultCompactor {
                 let reader = CompactionSstReaderBuilder {
                     metadata: region_metadata.clone(),
                     sst_layer: sst_layer.clone(),
-                    cache: cache_manager.clone(),
                     inputs: &output.inputs,
                     append_mode,
                     filter_deleted: output.filter_deleted,
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index a518da32535d..9b912318e16b 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -424,12 +424,16 @@ impl EngineInner {
 
         // Get cache.
         let cache_manager = self.workers.cache_manager();
-        let scan_region =
-            ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
-                .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
-                .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
-                .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
-                .with_start_time(query_start);
+        let scan_region = ScanRegion::new(
+            version,
+            region.access_layer.clone(),
+            request,
+            Some(cache_manager),
+        )
+        .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
+        .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
+        .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
+        .with_start_time(query_start);
 
         Ok(scan_region)
     }
diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs
index ee775a8ec2ba..79d035e03271 100644
--- a/src/mito2/src/read/last_row.rs
+++ b/src/mito2/src/read/last_row.rs
@@ -86,7 +86,7 @@ impl RowGroupLastRowCachedReader {
     pub(crate) fn new(
         file_id: FileId,
         row_group_idx: usize,
-        cache_manager: CacheManagerRef,
+        cache_manager: Option<CacheManagerRef>,
         row_group_reader: RowGroupReader,
     ) -> Self {
         let key = SelectorResultKey {
@@ -95,6 +95,9 @@ impl RowGroupLastRowCachedReader {
             selector: TimeSeriesRowSelector::LastRow,
         };
 
+        let Some(cache_manager) = cache_manager else {
+            return Self::new_miss(key, row_group_reader, None);
+        };
         if let Some(value) = cache_manager.get_selector_result(&key) {
             let schema_matches = value.projection
                 == row_group_reader
@@ -105,10 +108,10 @@ impl RowGroupLastRowCachedReader {
                 // Schema matches, use cache batches.
                 Self::new_hit(value)
             } else {
-                Self::new_miss(key, row_group_reader, cache_manager)
+                Self::new_miss(key, row_group_reader, Some(cache_manager))
             }
         } else {
-            Self::new_miss(key, row_group_reader, cache_manager)
+            Self::new_miss(key, row_group_reader, Some(cache_manager))
         }
     }
 
@@ -122,7 +125,7 @@ impl RowGroupLastRowCachedReader {
     fn new_miss(
         key: SelectorResultKey,
         row_group_reader: RowGroupReader,
-        cache_manager: CacheManagerRef,
+        cache_manager: Option<CacheManagerRef>,
     ) -> Self {
         selector_result_cache_miss();
         Self::Miss(RowGroupLastRowReader::new(
@@ -167,13 +170,17 @@ pub(crate) struct RowGroupLastRowReader {
     reader: RowGroupReader,
     selector: LastRowSelector,
     yielded_batches: Vec<Batch>,
-    cache_manager: CacheManagerRef,
+    cache_manager: Option<CacheManagerRef>,
     /// Index buffer to take a new batch from the last row.
     take_index: UInt32Vector,
 }
 
 impl RowGroupLastRowReader {
-    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_manager: CacheManagerRef) -> Self {
+    fn new(
+        key: SelectorResultKey,
+        reader: RowGroupReader,
+        cache_manager: Option<CacheManagerRef>,
+    ) -> Self {
         Self {
             key,
             reader,
@@ -213,6 +220,9 @@ impl RowGroupLastRowReader {
             // we always expect that row groups yields batches.
             return;
         }
+        let Some(cache) = &self.cache_manager else {
+            return;
+        };
         let value = Arc::new(SelectorResultValue {
             result: std::mem::take(&mut self.yielded_batches),
             projection: self
@@ -222,7 +232,7 @@ impl RowGroupLastRowReader {
                 .read_format()
                 .projection_indices()
                 .to_vec(),
         });
-        self.cache_manager.put_selector_result(self.key, value);
+        cache.put_selector_result(self.key, value);
     }
 }
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index 78866f0c1ba0..9ba5f6eccf1e 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -171,7 +171,7 @@ impl ProjectionMapper {
     pub(crate) fn convert(
         &self,
         batch: &Batch,
-        cache_manager: &CacheManager,
+        cache_manager: Option<&CacheManager>,
     ) -> common_recordbatch::error::Result<RecordBatch> {
         debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
         debug_assert!(self
@@ -204,12 +204,15 @@ impl ProjectionMapper {
             match index {
                 BatchIndex::Tag(idx) => {
                     let value = &pk_values[*idx];
-                    let vector = repeated_vector_with_cache(
-                        &column_schema.data_type,
-                        value,
-                        num_rows,
-                        cache_manager,
-                    )?;
+                    let vector = match cache_manager {
+                        Some(cache) => repeated_vector_with_cache(
+                            &column_schema.data_type,
+                            value,
+                            num_rows,
+                            cache,
+                        )?,
+                        None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
+                    };
                     columns.push(vector);
                 }
                 BatchIndex::Timestamp => {
@@ -357,7 +360,7 @@ mod tests {
         // With vector cache.
        let cache = CacheManager::builder().vector_cache_size(1024).build();
         let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
         let expect = "\
 +---------------------+----+----+----+----+
 | ts                  | k0 | k1 | v0 | v1 |
@@ -377,7 +380,7 @@ mod tests {
 
         assert!(cache
             .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
             .is_none());
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
         assert_eq!(expect, print_record_batch(record_batch));
     }
@@ -398,8 +401,7 @@ mod tests {
         );
 
         let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
-        let cache = CacheManager::builder().vector_cache_size(1024).build();
-        let record_batch = mapper.convert(&batch, &cache).unwrap();
+        let record_batch = mapper.convert(&batch, None).unwrap();
         let expect = "\
 +----+----+
 | v1 | k0 |
diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs
index 554751830ffc..bdad5f8fef0c 100644
--- a/src/mito2/src/read/range.rs
+++ b/src/mito2/src/read/range.rs
@@ -112,7 +112,7 @@ impl RangeMeta {
         Self::push_unordered_file_ranges(
             input.memtables.len(),
             &input.files,
-            &input.cache_manager,
+            input.cache_manager.as_deref(),
             &mut ranges,
         );
 
@@ -203,15 +203,16 @@ impl RangeMeta {
     fn push_unordered_file_ranges(
         num_memtables: usize,
         files: &[FileHandle],
-        cache: &CacheManager,
+        cache: Option<&CacheManager>,
         ranges: &mut Vec<RangeMeta>,
     ) {
         // For append mode, we can parallelize reading row groups.
         for (i, file) in files.iter().enumerate() {
             let file_index = num_memtables + i;
             // Get parquet meta from the cache.
-            let parquet_meta =
-                cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
+            let parquet_meta = cache.and_then(|c| {
+                c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
+            });
             if let Some(parquet_meta) = parquet_meta {
                 // Scans each row group.
                 for row_group_index in 0..file.meta_ref().num_row_groups {
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index 471cc1a8e5d4..19324f119f3e 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -167,7 +167,7 @@ pub(crate) struct ScanRegion {
     /// Scan request.
     request: ScanRequest,
     /// Cache.
-    cache_manager: CacheManagerRef,
+    cache_manager: Option<CacheManagerRef>,
     /// Capacity of the channel to send data from parallel scan tasks to the main task.
     parallel_scan_channel_size: usize,
     /// Whether to ignore inverted index.
@@ -184,7 +184,7 @@ impl ScanRegion {
         version: VersionRef,
         access_layer: AccessLayerRef,
         request: ScanRequest,
-        cache_manager: CacheManagerRef,
+        cache_manager: Option<CacheManagerRef>,
     ) -> ScanRegion {
         ScanRegion {
             version,
@@ -401,12 +401,17 @@ impl ScanRegion {
         }
 
         let file_cache = || -> Option<FileCacheRef> {
-            let write_cache = self.cache_manager.write_cache()?;
+            let cache_manager = self.cache_manager.as_ref()?;
+            let write_cache = cache_manager.write_cache()?;
             let file_cache = write_cache.file_cache();
             Some(file_cache)
         }();
 
-        let index_cache = self.cache_manager.index_cache().cloned();
+        let index_cache = self
+            .cache_manager
+            .as_ref()
+            .and_then(|c| c.index_cache())
+            .cloned();
 
         InvertedIndexApplierBuilder::new(
             self.access_layer.region_dir().to_string(),
@@ -477,7 +482,7 @@ pub(crate) struct ScanInput {
     /// Handles to SST files to scan.
     pub(crate) files: Vec<FileHandle>,
     /// Cache.
-    pub(crate) cache_manager: CacheManagerRef,
+    pub(crate) cache_manager: Option<CacheManagerRef>,
     /// Ignores file not found error.
     ignore_file_not_found: bool,
     /// Capacity of the channel to send data from parallel scan tasks to the main task.
@@ -508,7 +513,7 @@ impl ScanInput {
             predicate: None,
             memtables: Vec::new(),
             files: Vec::new(),
-            cache_manager: CacheManagerRef::default(),
+            cache_manager: None,
             ignore_file_not_found: false,
             parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
             inverted_index_applier: None,
@@ -551,7 +556,7 @@ impl ScanInput {
 
     /// Sets cache for this query.
     #[must_use]
-    pub(crate) fn with_cache(mut self, cache: CacheManagerRef) -> Self {
+    pub(crate) fn with_cache(mut self, cache: Option<CacheManagerRef>) -> Self {
         self.cache_manager = cache;
         self
     }
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index d8732cb93df2..bdf3a7d6b8bb 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -257,7 +257,7 @@ impl SeqScan {
             .await
             .map_err(BoxedError::new)
             .context(ExternalSnafu)?;
-        let cache = &stream_ctx.input.cache_manager;
+        let cache = stream_ctx.input.cache_manager.as_deref();
         let mut metrics = ScannerMetrics::default();
         let mut fetch_start = Instant::now();
         #[cfg(debug_assertions)]
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 97db9b86592c..60e5ca5c7cdb 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -149,7 +149,7 @@ impl UnorderedScan {
         let stream = try_stream! {
             part_metrics.on_first_poll();
 
-            let cache = &stream_ctx.input.cache_manager;
+            let cache = stream_ctx.input.cache_manager.as_deref();
             let range_builder_list = Arc::new(RangeBuilderList::new(
                 stream_ctx.input.num_memtables(),
                 stream_ctx.input.num_files(),
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index c94ae600735f..ae51a0d37c29 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -195,11 +195,11 @@ mod tests {
             .unwrap();
 
         // Enable page cache.
-        let cache = Arc::new(
+        let cache = Some(Arc::new(
             CacheManager::builder()
                 .page_cache_size(64 * 1024 * 1024)
                 .build(),
-        );
+        ));
         let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
             .cache(cache.clone());
         for _ in 0..3 {
@@ -219,15 +219,15 @@ mod tests {
 
         // Doesn't have compressed page cached.
         let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
-        assert!(cache.get_pages(&page_key).is_none());
+        assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
 
         // Cache 4 row groups.
         for i in 0..4 {
             let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
-            assert!(cache.get_pages(&page_key).is_some());
+            assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
         }
         let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
-        assert!(cache.get_pages(&page_key).is_none());
+        assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
     }
 
     #[tokio::test]
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index cd219f47ccd6..b73026a7a6e3 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -82,7 +82,7 @@ pub struct ParquetReaderBuilder {
     /// can contain columns not in the parquet file.
     projection: Option<Vec<ColumnId>>,
     /// Manager that caches SST data.
-    cache_manager: CacheManagerRef,
+    cache_manager: Option<CacheManagerRef>,
     /// Index appliers.
     inverted_index_applier: Option<InvertedIndexApplierRef>,
     fulltext_index_applier: Option<FulltextIndexApplierRef>,
@@ -106,7 +106,7 @@ impl ParquetReaderBuilder {
             predicate: None,
             time_range: None,
             projection: None,
-            cache_manager: CacheManagerRef::default(),
+            cache_manager: None,
             inverted_index_applier: None,
             fulltext_index_applier: None,
             expected_metadata: None,
@@ -138,7 +138,7 @@ impl ParquetReaderBuilder {
 
     /// Attaches the cache to the builder.
     #[must_use]
-    pub fn cache(mut self, cache: CacheManagerRef) -> ParquetReaderBuilder {
+    pub fn cache(mut self, cache: Option<CacheManagerRef>) -> ParquetReaderBuilder {
         self.cache_manager = cache;
         self
     }
@@ -313,12 +313,10 @@ impl ParquetReaderBuilder {
         let region_id = self.file_handle.region_id();
         let file_id = self.file_handle.file_id();
         // Tries to get from global cache.
-        if let Some(metadata) = self
-            .cache_manager
-            .get_parquet_meta_data(region_id, file_id)
-            .await
-        {
-            return Ok(metadata);
+        if let Some(manager) = &self.cache_manager {
+            if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await {
+                return Ok(metadata);
+            }
         }
 
         // Cache miss, load metadata directly.
@@ -326,11 +324,13 @@ impl ParquetReaderBuilder {
         let metadata = metadata_loader.load().await?;
         let metadata = Arc::new(metadata);
         // Cache the metadata.
-        self.cache_manager.put_parquet_meta_data(
-            self.file_handle.region_id(),
-            self.file_handle.file_id(),
-            metadata.clone(),
-        );
+        if let Some(cache) = &self.cache_manager {
+            cache.put_parquet_meta_data(
+                self.file_handle.region_id(),
+                self.file_handle.file_id(),
+                metadata.clone(),
+            );
+        }
 
         Ok(metadata)
     }
@@ -846,7 +846,7 @@ pub(crate) struct RowGroupReaderBuilder {
     /// Field levels to read.
     field_levels: FieldLevels,
     /// Cache.
-    cache_manager: CacheManagerRef,
+    cache_manager: Option<CacheManagerRef>,
 }
 
 impl RowGroupReaderBuilder {
@@ -864,7 +864,7 @@ impl RowGroupReaderBuilder {
         &self.parquet_meta
     }
 
-    pub(crate) fn cache_manager(&self) -> &CacheManagerRef {
+    pub(crate) fn cache_manager(&self) -> &Option<CacheManagerRef> {
         &self.cache_manager
     }
 
diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs
index dd572d8863f8..73382c06d9b3 100644
--- a/src/mito2/src/sst/parquet/row_group.rs
+++ b/src/mito2/src/sst/parquet/row_group.rs
@@ -48,7 +48,7 @@ pub struct InMemoryRowGroup<'a> {
     region_id: RegionId,
     file_id: FileId,
     row_group_idx: usize,
-    cache_manager: CacheManagerRef,
+    cache_manager: Option<CacheManagerRef>,
     /// Row group level cached pages for each column.
     ///
     /// These pages are uncompressed pages of a row group.
@@ -69,7 +69,7 @@ impl<'a> InMemoryRowGroup<'a> {
         file_id: FileId,
         parquet_meta: &'a ParquetMetaData,
         row_group_idx: usize,
-        cache_manager: CacheManagerRef,
+        cache_manager: Option<CacheManagerRef>,
         file_path: &'a str,
         object_store: ObjectStore,
     ) -> Self {
@@ -208,18 +208,19 @@ impl<'a> InMemoryRowGroup<'a> {
                     };
 
                     let column = self.metadata.column(idx);
-
-                    if !cache_uncompressed_pages(column) {
-                        // For columns that have multiple uncompressed pages, we only cache the compressed page
-                        // to save memory.
-                        let page_key = PageKey::new_compressed(
-                            self.region_id,
-                            self.file_id,
-                            self.row_group_idx,
-                            idx,
-                        );
-                        self.cache_manager
-                            .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
+                    if let Some(cache) = &self.cache_manager {
+                        if !cache_uncompressed_pages(column) {
+                            // For columns that have multiple uncompressed pages, we only cache the compressed page
+                            // to save memory.
+                            let page_key = PageKey::new_compressed(
+                                self.region_id,
+                                self.file_id,
+                                self.row_group_idx,
+                                idx,
+                            );
+                            cache
+                                .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone())));
+                        }
                     }
 
                     *chunk = Some(Arc::new(ColumnChunkData::Dense {
@@ -241,6 +242,9 @@ impl<'a> InMemoryRowGroup<'a> {
             .enumerate()
             .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx))
             .for_each(|(idx, chunk)| {
+                let Some(cache) = &self.cache_manager else {
+                    return;
+                };
                 let column = self.metadata.column(idx);
                 if cache_uncompressed_pages(column) {
                     // Fetches uncompressed pages for the row group.
@@ -250,7 +254,7 @@ impl<'a> InMemoryRowGroup<'a> {
                         self.row_group_idx,
                         idx,
                     );
-                    self.column_uncompressed_pages[idx] = self.cache_manager.get_pages(&page_key);
+                    self.column_uncompressed_pages[idx] = cache.get_pages(&page_key);
                 } else {
                     // Fetches the compressed page from the cache.
                     let page_key = PageKey::new_compressed(
@@ -260,7 +264,7 @@ impl<'a> InMemoryRowGroup<'a> {
                         idx,
                     );
 
-                    *chunk = self.cache_manager.get_pages(&page_key).map(|page_value| {
+                    *chunk = cache.get_pages(&page_key).map(|page_value| {
                         Arc::new(ColumnChunkData::Dense {
                             offset: column.byte_range().0 as usize,
                             data: page_value.compressed.clone(),
@@ -296,7 +300,7 @@ impl<'a> InMemoryRowGroup<'a> {
         key: IndexKey,
         ranges: &[Range<u64>],
     ) -> Option<Vec<Bytes>> {
-        if let Some(cache) = self.cache_manager.write_cache() {
+        if let Some(cache) = self.cache_manager.as_ref()?.write_cache() {
             return cache.file_cache().read_ranges(key, ranges).await;
         }
         None
@@ -327,6 +331,10 @@ impl<'a> InMemoryRowGroup<'a> {
             }
         };
 
+        let Some(cache) = &self.cache_manager else {
+            return Ok(Box::new(page_reader));
+        };
+
         let column = self.metadata.column(i);
         if cache_uncompressed_pages(column) {
             // This column use row group level page cache.
@@ -335,7 +343,7 @@ impl<'a> InMemoryRowGroup<'a> {
             let page_value = Arc::new(PageValue::new_row_group(pages));
             let page_key =
                 PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i);
-            self.cache_manager.put_pages(page_key, page_value.clone());
+            cache.put_pages(page_key, page_value.clone());
             return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group)));
         }
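
The pattern this patch applies throughout is: thread an Option<CacheManagerRef> into the read
path, and skip every cache lookup and cache population when it is None, which is what the
compaction path now passes via .with_cache(None). The sketch below illustrates that pattern in
isolation. It is a minimal, self-contained example: CacheManager, Reader, and main here are
simplified hypothetical stand-ins, not the real mito2 types, whose cache holds parquet metadata,
pages, and vectors rather than raw byte buffers.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for mito2's CacheManager.
#[derive(Default)]
struct CacheManager {
    pages: Mutex<HashMap<u64, Arc<Vec<u8>>>>,
}

type CacheManagerRef = Arc<CacheManager>;

impl CacheManager {
    fn get_pages(&self, key: u64) -> Option<Arc<Vec<u8>>> {
        self.pages.lock().unwrap().get(&key).cloned()
    }

    fn put_pages(&self, key: u64, value: Arc<Vec<u8>>) {
        self.pages.lock().unwrap().insert(key, value);
    }
}

// Hypothetical reader mirroring the patch: the cache is optional, and every
// cache interaction first checks whether a cache is attached.
struct Reader {
    cache_manager: Option<CacheManagerRef>,
}

impl Reader {
    fn with_cache(cache: Option<CacheManagerRef>) -> Self {
        Self { cache_manager: cache }
    }

    fn read(&self, key: u64) -> Arc<Vec<u8>> {
        // Try the cache first; a None cache (the compaction path) skips it.
        if let Some(cache) = &self.cache_manager {
            if let Some(pages) = cache.get_pages(key) {
                return pages;
            }
        }
        // Pretend this was decoded from an SST file.
        let pages = Arc::new(vec![0u8; 8]);
        // Populate the cache only when one is attached, so bulk compaction
        // reads cannot evict entries that queries still need.
        if let Some(cache) = &self.cache_manager {
            cache.put_pages(key, pages.clone());
        }
        pages
    }
}

fn main() {
    let cache = Arc::new(CacheManager::default());

    // Query path: pass Some(cache) so reads populate and hit the cache.
    let query_reader = Reader::with_cache(Some(cache.clone()));
    query_reader.read(1);
    assert!(cache.get_pages(1).is_some());

    // Compaction path: pass None so bulk reads never touch the cache.
    let compaction_reader = Reader::with_cache(None);
    compaction_reader.read(2);
    assert!(cache.get_pages(2).is_none());
}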