Skip to content

Commit

Permalink
perf: avoid cache during compaction (#5135)
Browse files: browse the repository at this point in the history
* Revert "refactor: Avoid wrapping Option for CacheManagerRef (#4996)"

This reverts commit 42bf7e9.

* fix: memory usage during log ingestion

* fix: fmt
  • Loading branch information
v0y4g3r authored Dec 11, 2024
1 parent 2c4ac76 commit a30d918
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 80 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ mod tests {

// Read metadata from write cache
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
- .cache(cache_manager.clone());
+ .cache(Some(cache_manager.clone()));
let reader = builder.build().await.unwrap();

// Check parquet metadata
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,6 @@ pub struct SerializedCompactionOutput {
struct CompactionSstReaderBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
- cache: CacheManagerRef,
inputs: &'a [FileHandle],
append_mode: bool,
filter_deleted: bool,
Expand All @@ -584,7 +583,7 @@ impl<'a> CompactionSstReaderBuilder<'a> {
let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
- .with_cache(self.cache)
+ .with_cache(None)
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
Expand Down
1 change: 0 additions & 1 deletion src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ impl Compactor for DefaultCompactor {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
- cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,
Expand Down
16 changes: 10 additions & 6 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,16 @@ impl EngineInner {
// Get cache.
let cache_manager = self.workers.cache_manager();

- let scan_region =
- ScanRegion::new(version, region.access_layer.clone(), request, cache_manager)
- .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
- .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
- .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
- .with_start_time(query_start);
+ let scan_region = ScanRegion::new(
+ version,
+ region.access_layer.clone(),
+ request,
+ Some(cache_manager),
+ )
+ .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
+ .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
+ .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
+ .with_start_time(query_start);

Ok(scan_region)
}
Expand Down
24 changes: 17 additions & 7 deletions src/mito2/src/read/last_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl RowGroupLastRowCachedReader {
pub(crate) fn new(
file_id: FileId,
row_group_idx: usize,
- cache_manager: CacheManagerRef,
+ cache_manager: Option<CacheManagerRef>,
row_group_reader: RowGroupReader,
) -> Self {
let key = SelectorResultKey {
Expand All @@ -95,6 +95,9 @@ impl RowGroupLastRowCachedReader {
selector: TimeSeriesRowSelector::LastRow,
};

+ let Some(cache_manager) = cache_manager else {
+ return Self::new_miss(key, row_group_reader, None);
+ };
if let Some(value) = cache_manager.get_selector_result(&key) {
let schema_matches = value.projection
== row_group_reader
Expand All @@ -105,10 +108,10 @@ impl RowGroupLastRowCachedReader {
// Schema matches, use cache batches.
Self::new_hit(value)
} else {
- Self::new_miss(key, row_group_reader, cache_manager)
+ Self::new_miss(key, row_group_reader, Some(cache_manager))
}
} else {
- Self::new_miss(key, row_group_reader, cache_manager)
+ Self::new_miss(key, row_group_reader, Some(cache_manager))
}
}

Expand All @@ -122,7 +125,7 @@ impl RowGroupLastRowCachedReader {
fn new_miss(
key: SelectorResultKey,
row_group_reader: RowGroupReader,
- cache_manager: CacheManagerRef,
+ cache_manager: Option<CacheManagerRef>,
) -> Self {
selector_result_cache_miss();
Self::Miss(RowGroupLastRowReader::new(
Expand Down Expand Up @@ -167,13 +170,17 @@ pub(crate) struct RowGroupLastRowReader {
reader: RowGroupReader,
selector: LastRowSelector,
yielded_batches: Vec<Batch>,
- cache_manager: CacheManagerRef,
+ cache_manager: Option<CacheManagerRef>,
/// Index buffer to take a new batch from the last row.
take_index: UInt32Vector,
}

impl RowGroupLastRowReader {
- fn new(key: SelectorResultKey, reader: RowGroupReader, cache_manager: CacheManagerRef) -> Self {
+ fn new(
+ key: SelectorResultKey,
+ reader: RowGroupReader,
+ cache_manager: Option<CacheManagerRef>,
+ ) -> Self {
Self {
key,
reader,
Expand Down Expand Up @@ -213,6 +220,9 @@ impl RowGroupLastRowReader {
// we always expect that row groups yields batches.
return;
}
+ let Some(cache) = &self.cache_manager else {
+ return;
+ };
let value = Arc::new(SelectorResultValue {
result: std::mem::take(&mut self.yielded_batches),
projection: self
Expand All @@ -222,7 +232,7 @@ impl RowGroupLastRowReader {
.projection_indices()
.to_vec(),
});
- self.cache_manager.put_selector_result(self.key, value);
+ cache.put_selector_result(self.key, value);
}
}

Expand Down
24 changes: 13 additions & 11 deletions src/mito2/src/read/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl ProjectionMapper {
pub(crate) fn convert(
&self,
batch: &Batch,
- cache_manager: &CacheManager,
+ cache_manager: Option<&CacheManager>,
) -> common_recordbatch::error::Result<RecordBatch> {
debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
debug_assert!(self
Expand Down Expand Up @@ -204,12 +204,15 @@ impl ProjectionMapper {
match index {
BatchIndex::Tag(idx) => {
let value = &pk_values[*idx];
- let vector = repeated_vector_with_cache(
- &column_schema.data_type,
- value,
- num_rows,
- cache_manager,
- )?;
+ let vector = match cache_manager {
+ Some(cache) => repeated_vector_with_cache(
+ &column_schema.data_type,
+ value,
+ num_rows,
+ cache,
+ )?,
+ None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
+ };
columns.push(vector);
}
BatchIndex::Timestamp => {
Expand Down Expand Up @@ -357,7 +360,7 @@ mod tests {
// With vector cache.
let cache = CacheManager::builder().vector_cache_size(1024).build();
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
- let record_batch = mapper.convert(&batch, &cache).unwrap();
+ let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let expect = "\
+---------------------+----+----+----+----+
| ts | k0 | k1 | v0 | v1 |
Expand All @@ -377,7 +380,7 @@ mod tests {
assert!(cache
.get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
.is_none());
- let record_batch = mapper.convert(&batch, &cache).unwrap();
+ let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
assert_eq!(expect, print_record_batch(record_batch));
}

Expand All @@ -398,8 +401,7 @@ mod tests {
);

let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
- let cache = CacheManager::builder().vector_cache_size(1024).build();
- let record_batch = mapper.convert(&batch, &cache).unwrap();
+ let record_batch = mapper.convert(&batch, None).unwrap();
let expect = "\
+----+----+
| v1 | k0 |
Expand Down
9 changes: 5 additions & 4 deletions src/mito2/src/read/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl RangeMeta {
Self::push_unordered_file_ranges(
input.memtables.len(),
&input.files,
- &input.cache_manager,
+ input.cache_manager.as_deref(),
&mut ranges,
);

Expand Down Expand Up @@ -203,15 +203,16 @@ impl RangeMeta {
fn push_unordered_file_ranges(
num_memtables: usize,
files: &[FileHandle],
- cache: &CacheManager,
+ cache: Option<&CacheManager>,
ranges: &mut Vec<RangeMeta>,
) {
// For append mode, we can parallelize reading row groups.
for (i, file) in files.iter().enumerate() {
let file_index = num_memtables + i;
// Get parquet meta from the cache.
- let parquet_meta =
- cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
+ let parquet_meta = cache.and_then(|c| {
+ c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id())
+ });
if let Some(parquet_meta) = parquet_meta {
// Scans each row group.
for row_group_index in 0..file.meta_ref().num_row_groups {
Expand Down
19 changes: 12 additions & 7 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ pub(crate) struct ScanRegion {
/// Scan request.
request: ScanRequest,
/// Cache.
- cache_manager: CacheManagerRef,
+ cache_manager: Option<CacheManagerRef>,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
parallel_scan_channel_size: usize,
/// Whether to ignore inverted index.
Expand All @@ -184,7 +184,7 @@ impl ScanRegion {
version: VersionRef,
access_layer: AccessLayerRef,
request: ScanRequest,
- cache_manager: CacheManagerRef,
+ cache_manager: Option<CacheManagerRef>,
) -> ScanRegion {
ScanRegion {
version,
Expand Down Expand Up @@ -401,12 +401,17 @@ impl ScanRegion {
}

let file_cache = || -> Option<FileCacheRef> {
- let write_cache = self.cache_manager.write_cache()?;
+ let cache_manager = self.cache_manager.as_ref()?;
+ let write_cache = cache_manager.write_cache()?;
let file_cache = write_cache.file_cache();
Some(file_cache)
}();

- let index_cache = self.cache_manager.index_cache().cloned();
+ let index_cache = self
+ .cache_manager
+ .as_ref()
+ .and_then(|c| c.index_cache())
+ .cloned();

InvertedIndexApplierBuilder::new(
self.access_layer.region_dir().to_string(),
Expand Down Expand Up @@ -477,7 +482,7 @@ pub(crate) struct ScanInput {
/// Handles to SST files to scan.
pub(crate) files: Vec<FileHandle>,
/// Cache.
- pub(crate) cache_manager: CacheManagerRef,
+ pub(crate) cache_manager: Option<CacheManagerRef>,
/// Ignores file not found error.
ignore_file_not_found: bool,
/// Capacity of the channel to send data from parallel scan tasks to the main task.
Expand Down Expand Up @@ -508,7 +513,7 @@ impl ScanInput {
predicate: None,
memtables: Vec::new(),
files: Vec::new(),
- cache_manager: CacheManagerRef::default(),
+ cache_manager: None,
ignore_file_not_found: false,
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
inverted_index_applier: None,
Expand Down Expand Up @@ -551,7 +556,7 @@ impl ScanInput {

/// Sets cache for this query.
#[must_use]
- pub(crate) fn with_cache(mut self, cache: CacheManagerRef) -> Self {
+ pub(crate) fn with_cache(mut self, cache: Option<CacheManagerRef>) -> Self {
self.cache_manager = cache;
self
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl SeqScan {
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
- let cache = &stream_ctx.input.cache_manager;
+ let cache = stream_ctx.input.cache_manager.as_deref();
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
#[cfg(debug_assertions)]
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl UnorderedScan {
let stream = try_stream! {
part_metrics.on_first_poll();

- let cache = &stream_ctx.input.cache_manager;
+ let cache = stream_ctx.input.cache_manager.as_deref();
let range_builder_list = Arc::new(RangeBuilderList::new(
stream_ctx.input.num_memtables(),
stream_ctx.input.num_files(),
Expand Down
10 changes: 5 additions & 5 deletions src/mito2/src/sst/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ mod tests {
.unwrap();

// Enable page cache.
- let cache = Arc::new(
+ let cache = Some(Arc::new(
CacheManager::builder()
.page_cache_size(64 * 1024 * 1024)
.build(),
- );
+ ));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.cache(cache.clone());
for _ in 0..3 {
Expand All @@ -219,15 +219,15 @@ mod tests {

// Doesn't have compressed page cached.
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
- assert!(cache.get_pages(&page_key).is_none());
+ assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());

// Cache 4 row groups.
for i in 0..4 {
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
- assert!(cache.get_pages(&page_key).is_some());
+ assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some());
}
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
- assert!(cache.get_pages(&page_key).is_none());
+ assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none());
}

#[tokio::test]
Expand Down
Loading

0 comments on commit a30d918

Please sign in to comment.