From 6de6a066313876c0142db643a75272c3578b39f6 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 21 May 2014 11:43:35 -0700 Subject: [PATCH] FIFO compaction style Summary: Introducing new compaction style -- FIFO. FIFO compaction style has write amplification of 1 (+1 for WAL) and it deletes the oldest files when the total DB size exceeds pre-configured values. FIFO compaction style is suited for storing high-frequency event logs. Test Plan: Added a unit test Reviewers: dhruba, haobo, sdong Reviewed By: dhruba Subscribers: alberts, leveldb Differential Revision: https://reviews.facebook.net/D18765 --- HISTORY.md | 1 + db/column_family.cc | 22 +++++++++--- db/compaction.cc | 8 ++++- db/compaction.h | 9 ++++- db/compaction_picker.cc | 71 +++++++++++++++++++++++++++++++++++++++ db/compaction_picker.h | 22 ++++++++++-- db/db_impl.cc | 38 +++++++++++++++++---- db/db_impl_debug.cc | 3 +- db/db_test.cc | 64 ++++++++++++++++++++++++++++++----- db/version_set.cc | 22 ++++++++---- db/version_set.h | 1 + include/rocksdb/options.h | 17 ++++++++-- util/options.cc | 3 ++ 13 files changed, 248 insertions(+), 33 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f9bdd7c9832..43025b722e7 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ ### New Features * Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open. +* FIFO compaction style ## 3.0.0 (05/05/2014) diff --git a/db/column_family.cc b/db/column_family.cc index 39c37b9e80f..9cf0c0d49a7 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include "db/db_impl.h" #include "db/version_set.h" @@ -116,6 +117,15 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, collector_factories.push_back( std::make_shared()); + if (result.compaction_style == kCompactionStyleFIFO) { + result.num_levels = 1; + // since we delete level0 files in FIFO compaction when there are too many + // of them, these options don't really mean anything + result.level0_file_num_compaction_trigger = std::numeric_limits::max(); + result.level0_slowdown_writes_trigger = std::numeric_limits::max(); + result.level0_stop_writes_trigger = std::numeric_limits::max(); + } + return result; } @@ -196,7 +206,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, options_(*db_options, SanitizeOptions(&internal_comparator_, &internal_filter_policy_, options)), mem_(nullptr), - imm_(options.min_write_buffer_number_to_merge), + imm_(options_.min_write_buffer_number_to_merge), super_version_(nullptr), super_version_number_(0), local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), @@ -209,16 +219,20 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, // if dummy_versions is nullptr, then this is a dummy column family. if (dummy_versions != nullptr) { - internal_stats_.reset(new InternalStats(options.num_levels, db_options->env, - db_options->statistics.get())); + internal_stats_.reset(new InternalStats( + options_.num_levels, db_options->env, db_options->statistics.get())); table_cache_.reset( new TableCache(dbname, &options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( new UniversalCompactionPicker(&options_, &internal_comparator_)); - } else { + } else if (options_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( new LevelCompactionPicker(&options_, &internal_comparator_)); + } else { + assert(options_.compaction_style == kCompactionStyleFIFO); + compaction_picker_.reset( + new FIFOCompactionPicker(&options_, &internal_comparator_)); } Log(options_.info_log, "Options for column family \"%s\":\n", diff --git a/db/compaction.cc b/db/compaction.cc index 962ce123266..a8caa59efd2 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector& files) { Compaction::Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction, bool enable_compression) + bool seek_compaction, bool enable_compression, + bool deletion_compaction) : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), @@ -39,6 +40,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, cfd_(input_version_->cfd_), seek_compaction_(seek_compaction), enable_compression_(enable_compression), + deletion_compaction_(deletion_compaction), grandparent_index_(0), seen_key_(false), overlapped_bytes_(0), @@ -83,6 +85,8 @@ bool Compaction::IsTrivialMove() const { TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); } +bool Compaction::IsDeletionCompaction() const { return deletion_compaction_; } + void Compaction::AddInputDeletions(VersionEdit* edit) { for (int which = 0; which < 2; which++) { for (size_t i = 0; i < inputs_[which].size(); i++) { @@ -92,6 +96,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) { } bool Compaction::IsBaseLevelForKey(const Slice& user_key) { + assert(cfd_->options()->compaction_style != kCompactionStyleFIFO); if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; } @@ -155,6 +160,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) { // Is this compaction producing files at the bottommost level? void Compaction::SetupBottomMostLevel(bool isManual) { + assert(cfd_->options()->compaction_style != kCompactionStyleFIFO); if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { // If universal compaction style is used and manual // compaction is occuring, then we are guaranteed that diff --git a/db/compaction.h b/db/compaction.h index 8fd95f909a8..aaa40230388 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -54,6 +54,9 @@ class Compaction { // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; + // If true, just delete all files in inputs_[0] + bool IsDeletionCompaction() const; + // Add all inputs to this compaction as delete operations to *edit. void AddInputDeletions(VersionEdit* edit); @@ -91,11 +94,13 @@ class Compaction { private: friend class CompactionPicker; friend class UniversalCompactionPicker; + friend class FIFOCompactionPicker; friend class LevelCompactionPicker; Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction = false, bool enable_compression = true); + bool seek_compaction = false, bool enable_compression = true, + bool deletion_compaction = false); int level_; int out_level_; // levels to which output files are stored @@ -108,6 +113,8 @@ class Compaction { bool seek_compaction_; bool enable_compression_; + // if true, just delete files in inputs_[0] + bool deletion_compaction_; // Each compaction reads inputs from "level_" and "level_+1" std::vector inputs_[2]; // The two sets of inputs diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index a8700bbbc3c..3416a0bac9e 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -9,6 +9,8 @@ #include "db/compaction_picker.h" +#define __STDC_FORMAT_MACROS +#include #include #include "util/log_buffer.h" #include "util/statistics.h" @@ -307,6 +309,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { + // CompactionPickerFIFO has its own implementation of compact range + assert(options_->compaction_style != kCompactionStyleFIFO); + std::vector inputs; bool covering_the_whole_range = true; @@ -886,4 +891,70 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( return c; } +Compaction* FIFOCompactionPicker::PickCompaction(Version* version, + LogBuffer* log_buffer) { + assert(version->NumberLevels() == 1); + uint64_t total_size = 0; + for (const auto& file : version->files_[0]) { + total_size += file->file_size; + } + + if (total_size <= options_->compaction_options_fifo.max_table_files_size || + version->files_[0].size() == 0) { + // total size not exceeded + LogToBuffer(log_buffer, + "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 + ", max size %" PRIu64 "\n", + version->cfd_->GetName().c_str(), total_size, + options_->compaction_options_fifo.max_table_files_size); + return nullptr; + } + + if (compactions_in_progress_[0].size() > 0) { + LogToBuffer(log_buffer, + "[%s] FIFO compaction: Already executing compaction. No need " + "to run parallel compactions since compactions are very fast", + version->cfd_->GetName().c_str()); + return nullptr; + } + + Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false, + true /* is deletion compaction */); + // delete old files (FIFO) + for (auto ritr = version->files_[0].rbegin(); + ritr != version->files_[0].rend(); ++ritr) { + auto f = *ritr; + total_size -= f->file_size; + c->inputs_[0].push_back(f); + char tmp_fsize[16]; + AppendHumanBytes(f->file_size, tmp_fsize, sizeof(tmp_fsize)); + LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 + " with size %s for deletion", + version->cfd_->GetName().c_str(), f->number, tmp_fsize); + if (total_size <= options_->compaction_options_fifo.max_table_files_size) { + break; + } + } + + c->MarkFilesBeingCompacted(true); + compactions_in_progress_[0].insert(c); + + return c; +} + +Compaction* FIFOCompactionPicker::CompactRange(Version* version, + int input_level, + int output_level, + const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end) { + assert(input_level == 0); + assert(output_level == 0); + *compaction_end = nullptr; + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get()); + auto c = PickCompaction(version, &log_buffer); + log_buffer.FlushBufferToLog(); + return c; +} + } // namespace rocksdb diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 6527ef9677b..65b1bc37ace 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -47,9 +47,10 @@ class CompactionPicker { // compaction_end will be set to nullptr. // Client is responsible for compaction_end storage -- when called, // *compaction_end should point to valid InternalKey! - Compaction* CompactRange(Version* version, int input_level, int output_level, - const InternalKey* begin, const InternalKey* end, - InternalKey** compaction_end); + virtual Compaction* CompactRange(Version* version, int input_level, + int output_level, const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end); // Free up the files that participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); @@ -162,4 +163,19 @@ class LevelCompactionPicker : public CompactionPicker { Compaction* PickCompactionBySize(Version* version, int level, double score); }; +class FIFOCompactionPicker : public CompactionPicker { + public: + FIFOCompactionPicker(const Options* options, + const InternalKeyComparator* icmp) + : CompactionPicker(options, icmp) {} + + virtual Compaction* PickCompaction(Version* version, + LogBuffer* log_buffer) override; + + virtual Compaction* CompactRange(Version* version, int input_level, + int output_level, const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end) override; +}; + } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index bdc1832dc33..f8744276749 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1590,7 +1590,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, return s; } - int max_level_with_files = 1; + int max_level_with_files = 0; { MutexLock l(&mutex_); Version* base = cfd->current(); @@ -1604,6 +1604,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, // in case the compaction is unversal or if we're compacting the // bottom-most level, the output level will be the same as input one if (cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO || level == max_level_with_files) { s = RunManualCompaction(cfd, level, level, begin, end); } else { @@ -1754,14 +1755,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, // For universal compaction, we enforce every manual compaction to compact // all files. if (begin == nullptr || - cfd->options()->compaction_style == kCompactionStyleUniversal) { + cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) { manual.begin = nullptr; } else { begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); manual.begin = &begin_storage; } if (end == nullptr || - cfd->options()->compaction_style == kCompactionStyleUniversal) { + cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) { manual.end = nullptr; } else { end_storage = InternalKey(*end, 0, static_cast(0)); @@ -2150,6 +2153,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (!c) { // Nothing to do LogToBuffer(log_buffer, "Compaction nothing to do"); + } else if (c->IsDeletionCompaction()) { + // TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old + // file if there is alive snapshot pointing to it + assert(c->num_input_files(1) == 0); + assert(c->level() == 0); + assert(c->column_family_data()->options()->compaction_style == + kCompactionStyleFIFO); + for (const auto& f : *c->inputs(0)) { + c->edit()->DeleteFile(c->level(), f->number); + } + status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, + db_directory_.get()); + InstallSuperVersion(c->column_family_data(), deletion_state); + LogToBuffer(log_buffer, "[%s] Deleted %d files\n", + c->column_family_data()->GetName().c_str(), + c->num_input_files(0)); + c->ReleaseCompactionFiles(status); + *madeProgress = true; } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); @@ -2219,8 +2240,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (!m->done) { // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. - // Universal compaction should always compact the whole range + // Universal and FIFO compactions should always compact the whole range assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal); + assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO); m->tmp_storage = *manual_end; m->begin = &m->tmp_storage; } @@ -4468,13 +4490,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - if (cfd->options()->compaction_style == kCompactionStyleUniversal) { + if (cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) { Version* current = cfd->current(); for (int i = 1; i < current->NumberLevels(); ++i) { int num_files = current->NumLevelFiles(i); if (num_files > 0) { - s = Status::InvalidArgument("Not all files are at level 0. Cannot " - "open with universal compaction style."); + s = Status::InvalidArgument( + "Not all files are at level 0. Cannot " + "open with universal or FIFO compaction style."); break; } } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index d6551b45a48..927a01a043e 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -81,7 +81,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin, cfd = cfh->cfd(); } int output_level = - (cfd->options()->compaction_style == kCompactionStyleUniversal) + (cfd->options()->compaction_style == kCompactionStyleUniversal || + cfd->options()->compaction_style == kCompactionStyleFIFO) ? level : level + 1; return RunManualCompaction(cfd, level, output_level, begin, end); diff --git a/db/db_test.cc b/db/db_test.cc index 05403fc070b..5e30b33f7e9 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -317,6 +317,7 @@ class DBTest { kCompressedBlockCache, kInfiniteMaxOpenFiles, kxxHashChecksum, + kFIFOCompaction, kEnd }; int option_config_; @@ -339,7 +340,8 @@ class DBTest { kSkipPlainTable = 8, kSkipHashIndex = 16, kSkipNoSeekToLast = 32, - kSkipHashCuckoo = 64 + kSkipHashCuckoo = 64, + kSkipFIFOCompaction = 128, }; DBTest() : option_config_(kDefault), @@ -391,6 +393,10 @@ class DBTest { if ((skip_mask & kSkipHashCuckoo) && (option_config_ == kHashCuckoo)) { continue; } + if ((skip_mask & kSkipFIFOCompaction) && + option_config_ == kFIFOCompaction) { + continue; + } break; } @@ -503,6 +509,10 @@ class DBTest { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); break; } + case kFIFOCompaction: { + options.compaction_style = kCompactionStyleFIFO; + break; + } case kBlockBasedTableWithPrefixHashIndex: { BlockBasedTableOptions table_options; table_options.index_type = BlockBasedTableOptions::kHashSearch; @@ -1394,7 +1404,7 @@ TEST(DBTest, GetEncountersEmptyLevel) { env_->SleepForMicroseconds(1000000); ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); // XXX - } while (ChangeOptions(kSkipUniversalCompaction)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction)); } // KeyMayExist can lead to a few false positives, but not false negatives. @@ -1460,7 +1470,8 @@ TEST(DBTest, KeyMayExist) { // KeyMayExist function only checks data in block caches, which is not used // by plain table format. - } while (ChangeOptions(kSkipPlainTable | kSkipHashIndex)); + } while ( + ChangeOptions(kSkipPlainTable | kSkipHashIndex | kSkipFIFOCompaction)); } TEST(DBTest, NonBlockingIteration) { @@ -4387,7 +4398,8 @@ TEST(DBTest, ApproximateSizes) { ASSERT_GT(NumTableFilesAtLevel(1, 1), 0); } // ApproximateOffsetOf() is not yet implemented in plain table format. - } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction | + kSkipPlainTable)); } TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { @@ -4531,8 +4543,8 @@ TEST(DBTest, HiddenValuesAreRemoved) { // ApproximateOffsetOf() is not yet implemented in plain table format, // which is used by Size(). // skip HashCuckooRep as it does not support snapshot - } while (ChangeOptions(kSkipUniversalCompaction | kSkipPlainTable | - kSkipHashCuckoo)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction | + kSkipPlainTable | kSkipHashCuckoo)); } TEST(DBTest, CompactBetweenSnapshots) { @@ -4588,7 +4600,7 @@ TEST(DBTest, CompactBetweenSnapshots) { ASSERT_EQ("sixth", Get(1, "foo")); ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]"); // skip HashCuckooRep as it does not support snapshot - } while (ChangeOptions(kSkipHashCuckoo)); + } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction)); } TEST(DBTest, DeletionMarkers1) { @@ -4694,7 +4706,7 @@ TEST(DBTest, OverlapInLevel0) { Flush(1); ASSERT_EQ("3", FilesPerLevel(1)); ASSERT_EQ("NOT_FOUND", Get(1, "600")); - } while (ChangeOptions(kSkipUniversalCompaction)); + } while (ChangeOptions(kSkipUniversalCompaction | kSkipFIFOCompaction)); } TEST(DBTest, L0_CompactionBug_Issue44_a) { @@ -6797,6 +6809,42 @@ TEST(DBTest, ChecksumTest) { ASSERT_EQ("f", Get("e")); ASSERT_EQ("h", Get("g")); } + +TEST(DBTest, FIFOCompactionTest) { + for (int iter = 0; iter < 2; ++iter) { + // first iteration -- auto compaction + // second iteration -- manual compaction + Options options; + options.compaction_style = kCompactionStyleFIFO; + options.write_buffer_size = 100 << 10; // 100KB + options.compaction_options_fifo.max_table_files_size = 500 << 10; // 500KB + options.compression = kNoCompression; + options.create_if_missing = true; + if (iter == 1) { + options.disable_auto_compactions = true; + } + DestroyAndReopen(&options); + + Random rnd(301); + for (int i = 0; i < 6; ++i) { + for (int j = 0; j < 100; ++j) { + ASSERT_OK(Put(std::to_string(i * 100 + j), RandomString(&rnd, 1024))); + } + // flush should happen here + } + if (iter == 0) { + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + } else { + ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + } + // only 5 files should survive + ASSERT_EQ(NumTableFilesAtLevel(0), 5); + for (int i = 0; i < 50; ++i) { + // these keys should be deleted in previous compaction + ASSERT_EQ("NOT_FOUND", Get(std::to_string(i))); + } + } +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/version_set.cc b/db/version_set.cc index 02e9aa15237..5327cf55fd9 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -711,7 +711,8 @@ void Version::ComputeCompactionScore( int max_score_level = 0; int num_levels_to_check = - (cfd_->options()->compaction_style != kCompactionStyleUniversal) + (cfd_->options()->compaction_style != kCompactionStyleUniversal && + cfd_->options()->compaction_style != kCompactionStyleFIFO) ? NumberLevels() - 1 : 1; @@ -730,14 +731,18 @@ void Version::ComputeCompactionScore( // setting, or very high compression ratios, or lots of // overwrites/deletions). int numfiles = 0; + uint64_t total_size = 0; for (unsigned int i = 0; i < files_[level].size(); i++) { if (!files_[level][i]->being_compacted) { + total_size += files_[level][i]->file_size; numfiles++; } } - - // If we are slowing down writes, then we better compact that first - if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { + if (cfd_->options()->compaction_style == kCompactionStyleFIFO) { + score = static_cast(total_size) / + cfd_->options()->compaction_options_fifo.max_table_files_size; + } else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { + // If we are slowing down writes, then we better compact that first score = 1000000; } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { score = 10000; @@ -803,6 +808,10 @@ bool CompareSeqnoDescending(const Version::Fsize& first, } // anonymous namespace void Version::UpdateFilesBySize() { + if (cfd_->options()->compaction_style == kCompactionStyleFIFO) { + // don't need this + return; + } // No need to sort the highest level because it is never compacted. int max_level = (cfd_->options()->compaction_style == kCompactionStyleUniversal) @@ -871,7 +880,8 @@ bool Version::NeedsCompaction() const { // TODO(sdong): improve this function to be accurate for universal // compactions. int num_levels_to_check = - (cfd_->options()->compaction_style != kCompactionStyleUniversal) + (cfd_->options()->compaction_style != kCompactionStyleUniversal && + cfd_->options()->compaction_style != kCompactionStyleFIFO) ? NumberLevels() - 1 : 1; for (int i = 0; i < num_levels_to_check; i++) { @@ -1253,7 +1263,7 @@ struct VersionSet::ManifestWriter { class VersionSet::Builder { private: // Helper to sort v->files_ - // kLevel0LevelCompaction -- NewestFirst + // kLevel0LevelCompaction -- NewestFirst (also used for FIFO compaction) // kLevel0UniversalCompaction -- NewestFirstBySeqNo // kLevelNon0 -- BySmallestKey struct FileComparator { diff --git a/db/version_set.h b/db/version_set.h index 13a138341b7..ffadb581330 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -217,6 +217,7 @@ class Version { friend class CompactionPicker; friend class LevelCompactionPicker; friend class UniversalCompactionPicker; + friend class FIFOCompactionPicker; class LevelFileNumIterator; class LevelFileIteratorState; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e26ecde5163..9ba6a522cfa 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -53,8 +53,18 @@ enum CompressionType : char { }; enum CompactionStyle : char { - kCompactionStyleLevel = 0x0, // level based compaction style - kCompactionStyleUniversal = 0x1 // Universal compaction style + kCompactionStyleLevel = 0x0, // level based compaction style + kCompactionStyleUniversal = 0x1, // Universal compaction style + kCompactionStyleFIFO = 0x2, // FIFO compaction style +}; + +struct CompactionOptionsFIFO { + // once the total sum of table files reaches this, we will delete the oldest + // table file + // Default: 1GB + uint64_t max_table_files_size; + + CompactionOptionsFIFO() : max_table_files_size(1 * 1024 * 1024 * 1024) {} }; // Compression options for different compression algorithms like Zlib @@ -429,6 +439,9 @@ struct ColumnFamilyOptions { // The options needed to support Universal Style compactions CompactionOptionsUniversal compaction_options_universal; + // The options for FIFO compaction style + CompactionOptionsFIFO compaction_options_fifo; + // Use KeyMayExist API to filter deletes when this is true. // If KeyMayExist returns false, i.e. the key definitely does not exist, then // the delete is a noop. KeyMayExist only incurs in-memory look up. diff --git a/util/options.cc b/util/options.cc index 22952f587b0..4fe8b219ecd 100644 --- a/util/options.cc +++ b/util/options.cc @@ -135,6 +135,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) compaction_style(options.compaction_style), verify_checksums_in_compaction(options.verify_checksums_in_compaction), compaction_options_universal(options.compaction_options_universal), + compaction_options_fifo(options.compaction_options_fifo), filter_deletes(options.filter_deletes), max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), @@ -413,6 +414,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { Log(log, "Options.compaction_options_universal.compression_size_percent: %u", compaction_options_universal.compression_size_percent); + Log(log, "Options.compaction_options_fifo.max_table_files_size: %" PRIu64, + compaction_options_fifo.max_table_files_size); std::string collector_names; for (const auto& collector_factory : table_properties_collector_factories) { collector_names.append(collector_factory->Name());