From f569a54af7e03601942c25f5242ea343f44dcfb1 Mon Sep 17 00:00:00 2001 From: Peter Dillinger Date: Mon, 23 Dec 2024 08:56:23 -0800 Subject: [PATCH] Universal test for RangeTombstoneSnapshotMigrateFromLast, refactor Summary: * Expand RangeTombstoneSnapshotMigrateFromLast in tiered_compaction_test (originally from #13124) to reproduce a failure in universal compaciton (as well as leveled), when a specific part of the test is uncommented. * Small refactoring to eliminate unnecessary fields in SubcompactionState. Adding a bool parameter to SubcompactionState::AddToOutput will make more sense in the next PR. * Improve debuggability and performance of some other tests Test Plan: existing tests, updated/expanded tests --- db/compaction/compaction_job.cc | 6 +- db/compaction/subcompaction_state.cc | 21 +++---- db/compaction/subcompaction_state.h | 22 ++----- db/compaction/tiered_compaction_test.cc | 78 ++++++++++++++++++------- 4 files changed, 73 insertions(+), 54 deletions(-) diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 60553e1343e..3a7a19edb3f 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1377,7 +1377,9 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { // and `close_file_func`. // TODO: it would be better to have the compaction file open/close moved // into `CompactionOutputs` which has the output file information. - status = sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func); + status = + sub_compact->AddToOutput(*c_iter, c_iter->output_to_penultimate_level(), + open_file_func, close_file_func); if (!status.ok()) { break; } @@ -1882,7 +1884,7 @@ Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact, // enabled and applicable if (last_level_temp != Temperature::kUnknown && sub_compact->compaction->is_last_level() && - !sub_compact->IsCurrentPenultimateLevel()) { + !outputs.IsPenultimateLevel()) { temperature = last_level_temp; } fo_copy.temperature = temperature; diff --git a/db/compaction/subcompaction_state.cc b/db/compaction/subcompaction_state.cc index 2accc0a94d1..1d45939ad7d 100644 --- a/db/compaction/subcompaction_state.cc +++ b/db/compaction/subcompaction_state.cc @@ -16,7 +16,7 @@ namespace ROCKSDB_NAMESPACE { void SubcompactionState::AggregateCompactionOutputStats( InternalStats::CompactionStatsFull& compaction_stats) const { compaction_stats.stats.Add(compaction_outputs_.stats_); - if (HasPenultimateLevelOutputs()) { + if (penultimate_level_outputs_.HasOutput() || HasRangeDel()) { compaction_stats.has_penultimate_level_output = true; compaction_stats.penultimate_level_stats.Add( penultimate_level_outputs_.stats_); @@ -52,7 +52,7 @@ void SubcompactionState::Cleanup(Cache* cache) { } Slice SubcompactionState::SmallestUserKey() const { - if (has_penultimate_level_outputs_) { + if (penultimate_level_outputs_.HasOutput()) { Slice a = compaction_outputs_.SmallestUserKey(); Slice b = penultimate_level_outputs_.SmallestUserKey(); if (a.empty()) { @@ -74,7 +74,7 @@ Slice SubcompactionState::SmallestUserKey() const { } Slice SubcompactionState::LargestUserKey() const { - if (has_penultimate_level_outputs_) { + if (penultimate_level_outputs_.HasOutput()) { Slice a = compaction_outputs_.LargestUserKey(); Slice b = penultimate_level_outputs_.LargestUserKey(); if (a.empty()) { @@ -96,18 +96,13 @@ Slice SubcompactionState::LargestUserKey() const { } Status SubcompactionState::AddToOutput( - const CompactionIterator& iter, + const CompactionIterator& iter, bool use_penultimate_output, const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func) { - // update target output first - is_current_penultimate_level_ = iter.output_to_penultimate_level(); - current_outputs_ = is_current_penultimate_level_ ? &penultimate_level_outputs_ - : &compaction_outputs_; - if (is_current_penultimate_level_) { - has_penultimate_level_outputs_ = true; - } - - return Current().AddToOutput(iter, open_file_func, close_file_func); + // update target output + current_outputs_ = use_penultimate_output ? &penultimate_level_outputs_ + : &compaction_outputs_; + return current_outputs_->AddToOutput(iter, open_file_func, close_file_func); } } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/subcompaction_state.h b/db/compaction/subcompaction_state.h index 8b8c76f6e15..0927dd9f559 100644 --- a/db/compaction/subcompaction_state.h +++ b/db/compaction/subcompaction_state.h @@ -148,20 +148,11 @@ class SubcompactionState { sub_job_id(state.sub_job_id), compaction_outputs_(std::move(state.compaction_outputs_)), penultimate_level_outputs_(std::move(state.penultimate_level_outputs_)), - is_current_penultimate_level_(state.is_current_penultimate_level_), - has_penultimate_level_outputs_(state.has_penultimate_level_outputs_), range_del_agg_(std::move(state.range_del_agg_)) { - current_outputs_ = is_current_penultimate_level_ - ? &penultimate_level_outputs_ - : &compaction_outputs_; - } - - bool HasPenultimateLevelOutputs() const { - return has_penultimate_level_outputs_ || HasRangeDel(); - } - - bool IsCurrentPenultimateLevel() const { - return is_current_penultimate_level_; + current_outputs_ = + state.current_outputs_ == &state.penultimate_level_outputs_ + ? &penultimate_level_outputs_ + : &compaction_outputs_; } // Add all the new files from this compaction to version_edit @@ -195,6 +186,7 @@ class SubcompactionState { // Add compaction_iterator key/value to the `Current` output group. Status AddToOutput(const CompactionIterator& iter, + bool use_penultimate_output, const CompactionFileOpenFunc& open_file_func, const CompactionFileCloseFunc& close_file_func); @@ -207,11 +199,9 @@ class SubcompactionState { // Call FinishCompactionOutputFile() even if status is not ok: it needs to // close the output file. // CloseOutput() may open new compaction output files. - is_current_penultimate_level_ = true; Status s = penultimate_level_outputs_.CloseOutput( curr_status, per_key ? range_del_agg_.get() : nullptr, open_file_func, close_file_func); - is_current_penultimate_level_ = false; s = compaction_outputs_.CloseOutput( s, per_key ? nullptr : range_del_agg_.get(), open_file_func, close_file_func); @@ -223,8 +213,6 @@ class SubcompactionState { CompactionOutputs compaction_outputs_; CompactionOutputs penultimate_level_outputs_; CompactionOutputs* current_outputs_ = &compaction_outputs_; - bool is_current_penultimate_level_ = false; - bool has_penultimate_level_outputs_ = false; std::unique_ptr range_del_agg_; }; diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc index b1e222da2c9..c66028921a5 100644 --- a/db/compaction/tiered_compaction_test.cc +++ b/db/compaction/tiered_compaction_test.cc @@ -474,12 +474,12 @@ TEST_F(TieredCompactionTest, RangeBasedTieredStorageUniversal) { { MutexLock l(&mutex); hot_start = Key(1); - hot_end = Key(1000); + hot_end = Key(300); } // generate files just enough to trigger compaction for (int i = 0; i < kNumTrigger - 1; i++) { - for (int j = 0; j < 1000; j++) { + for (int j = 0; j < 300; j++) { ASSERT_OK(Put(Key(j), "value" + std::to_string(j))); } ASSERT_OK(Flush()); @@ -1651,7 +1651,8 @@ TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) { ASSERT_EQ("0,0,0,0,0,2,2", FilesPerLevel()); - auto VerifyLogicalState = [&]() { + auto VerifyLogicalState = [&](int line) { + SCOPED_TRACE("Called from line " + std::to_string(line)); // First with snapshot ASSERT_EQ("val2", Get(Key(2), snapshot)); ASSERT_EQ("val3", Get(Key(3), snapshot)); @@ -1665,7 +1666,7 @@ TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) { ASSERT_EQ("val100", Get(Key(100))); }; - VerifyLogicalState(); + VerifyLogicalState(__LINE__); // Try to compact keys up CompactRangeOptions cro; @@ -1676,24 +1677,40 @@ TEST_P(PrecludeLastLevelTest, CheckInternalKeyRange) { // type:1 vs. file #15 smallest key: '6B6579303030303033' seq:104, type:1 ASSERT_OK(CompactRange(cro, Key(1), Key(2))); - VerifyLogicalState(); + VerifyLogicalState(__LINE__); db_->ReleaseSnapshot(snapshot); Close(); } -TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) { +INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTest, PrecludeLastLevelTest, + ::testing::Bool()); + +class PrecludeWithCompactStyleTest : public PrecludeLastLevelTestBase, + public testing::WithParamInterface { + public: + void ApplyConfigChange( + Options* options, + const std::unordered_map& config_change, + const std::unordered_map& db_config_change = + {}) { + // Depends on dynamic config change while holding a snapshot + ApplyConfigChangeImpl(true /*dynamic*/, options, config_change, + db_config_change); + } +}; + +TEST_P(PrecludeWithCompactStyleTest, RangeTombstoneSnapshotMigrateFromLast) { // Reproducer for issue originally described in // https://github.com/facebook/rocksdb/pull/9964/files#r1024449523 - if (!UseDynamicConfig()) { - // Depends on config change while holding a snapshot - return; - } + const bool universal = GetParam(); const int kNumLevels = 7; auto options = CurrentOptions(); options.env = mock_env_.get(); options.last_level_temperature = Temperature::kCold; - options.level_compaction_dynamic_level_bytes = true; + options.compaction_style = + universal ? kCompactionStyleUniversal : kCompactionStyleLevel; + options.compaction_options_universal.allow_trivial_move = true; options.num_levels = kNumLevels; options.statistics = CreateDBStatistics(); options.max_subcompactions = 10; @@ -1713,7 +1730,13 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) { ASSERT_OK(db_->DeleteRange({}, db_->DefaultColumnFamily(), Key(1), Key(5))); ASSERT_OK(Put(Key(1), "val1")); ASSERT_OK(Flush()); - MoveFilesToLevel(6); + + // Send to last level + if (universal) { + ASSERT_OK(CompactRange({}, {}, {})); + } else { + MoveFilesToLevel(6); + } ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); ApplyConfigChange(&options, {{"preclude_last_level_data_seconds", "10000"}}); @@ -1732,10 +1755,16 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) { [&] { mock_clock_->MockSleepForSeconds(static_cast(1000)); }); ASSERT_OK(Flush()); - MoveFilesToLevel(5); + + // Send three files to next-to-last level (if explicitly needed) + if (!universal) { + MoveFilesToLevel(5); + } + ASSERT_EQ("0,0,0,0,0,3,1", FilesPerLevel()); - auto VerifyLogicalState = [&]() { + auto VerifyLogicalState = [&](int line) { + SCOPED_TRACE("Called from line " + std::to_string(line)); // First with snapshot if (snapshot) { ASSERT_EQ("NOT_FOUND", Get(Key(0), snapshot)); @@ -1755,21 +1784,26 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) { ASSERT_EQ("val7", Get(Key(7))); }; - VerifyLogicalState(); + VerifyLogicalState(__LINE__); // Try a limited range compaction - // FIXME: this currently hits the "Unsafe to store Seq later than snapshot" + // FIXME: these currently hit the "Unsafe to store Seq later than snapshot" // error. Needs to work safely for preclude option to be user mutable. - // ASSERT_OK(CompactRange({}, Key(3), Key(4))); + if (universal) { + // uint64_t middle_l5 = GetLevelFileMetadatas(5)[1]->fd.GetNumber(); + // ASSERT_OK(db_->CompactFiles({}, {MakeTableFileName(middle_l5)}, 6)); + } else { + // ASSERT_OK(CompactRange({}, Key(3), Key(4))); + } EXPECT_EQ("0,0,0,0,0,3,1", FilesPerLevel()); - VerifyLogicalState(); + VerifyLogicalState(__LINE__); // Compact everything, but some data still goes to both penultimate and last // levels. A full-range compaction should be safe to "migrate" data from the // last level to penultimate (because of preclude setting change). ASSERT_OK(CompactRange({}, {}, {})); EXPECT_EQ("0,0,0,0,0,1,1", FilesPerLevel()); - VerifyLogicalState(); + VerifyLogicalState(__LINE__); // Key1 should have been migrated out of the last level auto& meta = *GetLevelFileMetadatas(6)[0]; ASSERT_LT(Key(1), meta.smallest.user_key().ToString()); @@ -1781,13 +1815,13 @@ TEST_P(PrecludeLastLevelTest, RangeTombstoneSnapshotMigrateFromLast) { ASSERT_OK(CompactRange({}, {}, {})); EXPECT_EQ("0,0,0,0,0,0,1", FilesPerLevel()); - VerifyLogicalState(); + VerifyLogicalState(__LINE__); Close(); } -INSTANTIATE_TEST_CASE_P(PrecludeLastLevelTest, PrecludeLastLevelTest, - ::testing::Bool()); +INSTANTIATE_TEST_CASE_P(PrecludeWithCompactStyleTest, + PrecludeWithCompactStyleTest, ::testing::Bool()); class TimedPutPrecludeLastLevelTest : public PrecludeLastLevelTestBase,