fix: memory usage during log ingestion
v0y4g3r committed Dec 10, 2024
1 parent f24ebbd commit ec67fcb
Showing 3 changed files with 8 additions and 9 deletions.
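In short, the commit stops compaction reads from populating the cache manager: the last-row reader's cache handle becomes `Option<CacheManagerRef>`, compaction passes `None`, and selector results are only cached when a cache is actually attached. Below is a minimal, self-contained sketch of that pattern; all names in it (`CacheManager`, `Batch`, `maybe_cache_result`, and so on) are simplified stand-ins invented for illustration, not the real mito2 implementation:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Stand-ins for the real mito2 types (Batch, SelectorResultKey, CacheManagerRef).
type Batch = Vec<u8>;
type SelectorResultKey = u64;

#[derive(Default)]
struct CacheManager {
    selector_results: Mutex<HashMap<SelectorResultKey, Arc<Vec<Batch>>>>,
}

impl CacheManager {
    fn put_selector_result(&self, key: SelectorResultKey, value: Arc<Vec<Batch>>) {
        self.selector_results.lock().unwrap().insert(key, value);
    }
}

type CacheManagerRef = Arc<CacheManager>;

struct RowGroupLastRowReader {
    key: SelectorResultKey,
    yielded_batches: Vec<Batch>,
    // After the change: `None` on the compaction path, `Some` on the query path.
    cache_manager: Option<CacheManagerRef>,
}

impl RowGroupLastRowReader {
    fn maybe_cache_result(&mut self) {
        if self.yielded_batches.is_empty() {
            return;
        }
        // The added guard: with no cache attached, return before moving the
        // yielded batches into a cache value, so they are dropped normally.
        let Some(cache) = &self.cache_manager else {
            return;
        };
        let value = Arc::new(std::mem::take(&mut self.yielded_batches));
        cache.put_selector_result(self.key, value);
    }
}

fn main() {
    let cache: CacheManagerRef = Arc::new(CacheManager::default());

    // Query path: cache attached, the selector result is stored for reuse.
    let mut query = RowGroupLastRowReader {
        key: 1,
        yielded_batches: vec![vec![0u8; 1024]],
        cache_manager: Some(cache.clone()),
    };
    query.maybe_cache_result();
    assert_eq!(cache.selector_results.lock().unwrap().len(), 1);

    // Compaction path: no cache, so nothing from the one-shot read is retained.
    let mut compaction = RowGroupLastRowReader {
        key: 2,
        yielded_batches: vec![vec![0u8; 1024]],
        cache_manager: None,
    };
    compaction.maybe_cache_result();
    assert_eq!(cache.selector_results.lock().unwrap().len(), 1);
}
```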
3 changes: 1 addition & 2 deletions src/mito2/src/compaction.rs
@@ -570,7 +570,6 @@ pub struct SerializedCompactionOutput {
 struct CompactionSstReaderBuilder<'a> {
     metadata: RegionMetadataRef,
     sst_layer: AccessLayerRef,
-    cache: Option<CacheManagerRef>,
     inputs: &'a [FileHandle],
     append_mode: bool,
     filter_deleted: bool,
@@ -584,7 +583,7 @@ impl<'a> CompactionSstReaderBuilder<'a> {
         let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
             .with_files(self.inputs.to_vec())
             .with_append_mode(self.append_mode)
-            .with_cache(self.cache)
+            .with_cache(None)
             .with_filter_deleted(self.filter_deleted)
             // We ignore file not found error during compaction.
             .with_ignore_file_not_found(true)
1 change: 0 additions & 1 deletion src/mito2/src/compaction/compactor.rs
@@ -296,7 +296,6 @@ impl Compactor for DefaultCompactor {
         let reader = CompactionSstReaderBuilder {
             metadata: region_metadata.clone(),
             sst_layer: sst_layer.clone(),
-            cache: Some(cache_manager.clone()),
             inputs: &output.inputs,
             append_mode,
             filter_deleted: output.filter_deleted,
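Together with the compaction.rs change above, this removes the cache handle from the compaction read path entirely: `CompactionSstReaderBuilder` loses its `cache` field and the compaction `ScanInput` is built with `with_cache(None)`. A plausible reading of the rationale: compaction scans each input SST once and writes new output files, so caching batches from that one-shot read gives no reuse but keeps them alive in memory, which hurts under heavy log ingestion.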
13 changes: 7 additions & 6 deletions src/mito2/src/read/last_row.rs
@@ -108,10 +108,10 @@ impl RowGroupLastRowCachedReader {
                 // Schema matches, use cache batches.
                 Self::new_hit(value)
             } else {
-                Self::new_miss(key, row_group_reader, cache_manager)
+                Self::new_miss(key, row_group_reader, Some(cache_manager))
             }
         } else {
-            Self::new_miss(key, row_group_reader, cache_manager)
+            Self::new_miss(key, row_group_reader, Some(cache_manager))
         }
     }

@@ -125,7 +125,7 @@
     fn new_miss(
         key: SelectorResultKey,
         row_group_reader: RowGroupReader,
-        cache_manager: CacheManagerRef,
+        cache_manager: Option<CacheManagerRef>,
     ) -> Self {
         selector_result_cache_miss();
         Self::Miss(RowGroupLastRowReader::new(
@@ -170,13 +170,13 @@ pub(crate) struct RowGroupLastRowReader {
     reader: RowGroupReader,
     selector: LastRowSelector,
     yielded_batches: Vec<Batch>,
-    cache_manager: CacheManagerRef,
+    cache_manager: Option<CacheManagerRef>,
     /// Index buffer to take a new batch from the last row.
     take_index: UInt32Vector,
 }

 impl RowGroupLastRowReader {
-    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_manager: CacheManagerRef) -> Self {
+    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_manager: Option<CacheManagerRef>) -> Self {
         Self {
             key,
             reader,
@@ -216,6 +216,7 @@ impl RowGroupLastRowReader {
             // we always expect that row groups yields batches.
             return;
         }
+        let Some(cache) = &self.cache_manager else { return; };
         let value = Arc::new(SelectorResultValue {
             result: std::mem::take(&mut self.yielded_batches),
             projection: self
@@ -225,7 +226,7 @@
                 .projection_indices()
                 .to_vec(),
         });
-        self.cache_manager.put_selector_result(self.key, value);
+        cache.put_selector_result(self.key, value);
     }
 }
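For readers unfamiliar with the `let ... else` guard added in the last hunk, here is a tiny standalone sketch (all names are hypothetical): the pattern binds the payload on `Some` and must diverge, here via `return`, on `None`.

```rust
fn maybe_cache(cache: Option<&str>, result: Vec<u8>) -> bool {
    // Bind `name` when a cache is attached; otherwise diverge immediately,
    // so `result` is dropped instead of being retained anywhere.
    let Some(name) = cache else {
        return false;
    };
    println!("storing {} bytes in `{name}`", result.len());
    true
}

fn main() {
    // Compaction-like path: no cache attached, nothing retained.
    assert!(!maybe_cache(None, vec![1, 2, 3]));
    // Query-like path: cache attached, the result would be stored.
    assert!(maybe_cache(Some("selector_result"), vec![1, 2, 3]));
}
```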
