Skip to content

Commit

Permalink
fix(meta): fix time travel GC bug (#20108)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored and zwang28 committed Jan 11, 2025
1 parent 7a73f28 commit fb71554
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 28 deletions.
35 changes: 9 additions & 26 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::cmp;
use std::collections::HashSet;
use std::ops::Bound::{Excluded, Included};
use std::ops::DerefMut;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, SystemTime};
Expand Down Expand Up @@ -223,30 +222,8 @@ impl HummockManager {
) -> Result<Vec<HummockSstableObjectId>> {
// This lock ensures `commit_epoch` and `report_compat_task` can see the latest GC history during sanity check.
let versioning = self.versioning.read().await;
let tracked_object_ids: HashSet<HummockSstableObjectId> = {
let context_info = self.context_info.read().await;
// object ids in checkpoint version
let mut tracked_object_ids = versioning.checkpoint.version.get_object_ids();
// add object ids added between checkpoint version and current version
for (_, delta) in versioning.hummock_version_deltas.range((
Excluded(versioning.checkpoint.version.id),
Included(versioning.current_version.id),
)) {
tracked_object_ids.extend(delta.newly_added_object_ids());
}
// add stale object ids before the checkpoint version
let min_pinned_version_id = context_info.min_pinned_version_id();
tracked_object_ids.extend(
versioning
.checkpoint
.stale_objects
.iter()
.filter(|(version_id, _)| **version_id >= min_pinned_version_id)
.flat_map(|(_, objects)| objects.id.iter())
.cloned(),
);
tracked_object_ids
};
let tracked_object_ids: HashSet<HummockSstableObjectId> = versioning
.get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id());
let to_delete = object_ids.filter(|object_id| !tracked_object_ids.contains(object_id));
self.write_gc_history(to_delete.clone()).await?;
Ok(to_delete.collect())
Expand Down Expand Up @@ -556,9 +533,15 @@ impl HummockManager {
};
// Objects pinned by either meta backup or time travel should be filtered out.
let backup_pinned: HashSet<_> = backup_manager.list_pinned_ssts();
// The version_pinned is obtained after the candidate object_ids for deletion, which is new enough for filtering purpose.
let version_pinned = {
let versioning = self.versioning.read().await;
versioning
.get_tracked_object_ids(self.context_info.read().await.min_pinned_version_id())
};
let object_ids = object_ids
.into_iter()
.filter(|s| !backup_pinned.contains(s));
.filter(|s| !version_pinned.contains(s) && !backup_pinned.contains(s));
let object_ids = self.filter_out_objects_by_time_travel(object_ids).await?;
// Retry is not necessary. Full GC will handle these objects eventually.
self.delete_objects(object_ids.into_iter().collect())
Expand Down
24 changes: 22 additions & 2 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,18 @@ impl HummockManager {
txn.commit().await?;
return Ok(());
};
let (latest_valid_version_id, latest_valid_version_sst_ids) =
{ (latest_valid_version.id, latest_valid_version.get_sst_ids()) };
let (
latest_valid_version_id,
latest_valid_version_sst_ids,
latest_valid_version_object_ids,
) = {
(
latest_valid_version.id,
latest_valid_version.get_sst_ids(),
latest_valid_version.get_object_ids(),
)
};
let mut object_ids_to_delete: HashSet<_> = HashSet::default();
let version_ids_to_delete: Vec<risingwave_meta_model::HummockVersionId> =
hummock_time_travel_version::Entity::find()
.select_only()
Expand Down Expand Up @@ -153,6 +163,8 @@ impl HummockManager {
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
.exec(&txn)
.await?;
let new_object_ids = delta_to_delete.newly_added_object_ids();
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
tracing::debug!(
delta_id = delta_to_delete.id.to_u64(),
"delete {} rows from hummock_sstable_info",
Expand Down Expand Up @@ -182,13 +194,21 @@ impl HummockManager {
.filter(hummock_sstable_info::Column::SstId.is_in(sst_ids_to_delete))
.exec(&txn)
.await?;
let new_object_ids = prev_version.get_object_ids();
object_ids_to_delete.extend(&new_object_ids - &latest_valid_version_object_ids);
tracing::debug!(
prev_version_id,
"delete {} rows from hummock_sstable_info",
res.rows_affected
);
next_version_sst_ids = sst_ids;
}
if !object_ids_to_delete.is_empty() {
// IMPORTANT: object_ids_to_delete may include objects that are still being used by SSTs not included in time travel metadata.
// So it's crucial to filter out those objects before actually deleting them, i.e. when using `try_take_may_delete_object_ids`.
self.gc_manager
.add_may_delete_object_ids(object_ids_to_delete.into_iter());
}

let res = hummock_time_travel_version::Entity::delete_many()
.filter(
Expand Down
26 changes: 26 additions & 0 deletions src/meta/src/hummock/manager/versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::cmp;
use std::collections::Bound::{Excluded, Included};
use std::collections::{BTreeMap, HashMap, HashSet};

use itertools::Itertools;
Expand Down Expand Up @@ -82,6 +83,31 @@ impl Versioning {
pub(super) fn mark_next_time_travel_version_snapshot(&mut self) {
self.time_travel_snapshot_interval_counter = u64::MAX;
}

pub fn get_tracked_object_ids(
&self,
min_pinned_version_id: HummockVersionId,
) -> HashSet<HummockSstableObjectId> {
// object ids in checkpoint version
let mut tracked_object_ids = self.checkpoint.version.get_object_ids();
// add object ids added between checkpoint version and current version
for (_, delta) in self.hummock_version_deltas.range((
Excluded(self.checkpoint.version.id),
Included(self.current_version.id),
)) {
tracked_object_ids.extend(delta.newly_added_object_ids());
}
// add stale object ids before the checkpoint version
tracked_object_ids.extend(
self.checkpoint
.stale_objects
.iter()
.filter(|(version_id, _)| **version_id >= min_pinned_version_id)
.flat_map(|(_, objects)| objects.id.iter())
.cloned(),
);
tracked_object_ids
}
}

impl HummockManager {
Expand Down

0 comments on commit fb71554

Please sign in to comment.