Skip to content

Commit

Permalink
FIFO compaction style
Browse files Browse the repository at this point in the history
Summary:
Introducing new compaction style -- FIFO.

FIFO compaction style has write amplification of 1 (+1 for WAL) and it deletes the oldest files when the total DB size exceeds pre-configured values.

FIFO compaction style is suited for storing high-frequency event logs.

Test Plan: Added a unit test

Reviewers: dhruba, haobo, sdong

Reviewed By: dhruba

Subscribers: alberts, leveldb

Differential Revision: https://reviews.facebook.net/D18765
  • Loading branch information
igorcanadi committed May 21, 2014
1 parent 220132b commit 6de6a06
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 33 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### New Features
* Hash index for block-based table will be materialized and reconstructed more efficiently. Previously, the hash index was constructed by scanning the whole table on every table open.
* FIFO compaction style

## 3.0.0 (05/05/2014)

Expand Down
22 changes: 18 additions & 4 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>
#include <string>
#include <algorithm>
#include <limits>

#include "db/db_impl.h"
#include "db/version_set.h"
Expand Down Expand Up @@ -116,6 +117,15 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp,
collector_factories.push_back(
std::make_shared<InternalKeyPropertiesCollectorFactory>());

if (result.compaction_style == kCompactionStyleFIFO) {
result.num_levels = 1;
// since we delete level0 files in FIFO compaction when there are too many
// of them, these options don't really mean anything
result.level0_file_num_compaction_trigger = std::numeric_limits<int>::max();
result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
result.level0_stop_writes_trigger = std::numeric_limits<int>::max();
}

return result;
}

Expand Down Expand Up @@ -196,7 +206,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,
options_(*db_options, SanitizeOptions(&internal_comparator_,
&internal_filter_policy_, options)),
mem_(nullptr),
imm_(options.min_write_buffer_number_to_merge),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
super_version_number_(0),
local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)),
Expand All @@ -209,16 +219,20 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id,

// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
internal_stats_.reset(new InternalStats(options.num_levels, db_options->env,
db_options->statistics.get()));
internal_stats_.reset(new InternalStats(
options_.num_levels, db_options->env, db_options->statistics.get()));
table_cache_.reset(
new TableCache(dbname, &options_, storage_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
} else {
} else if (options_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
} else {
assert(options_.compaction_style == kCompactionStyleFIFO);
compaction_picker_.reset(
new FIFOCompactionPicker(&options_, &internal_comparator_));
}

Log(options_.info_log, "Options for column family \"%s\":\n",
Expand Down
8 changes: 7 additions & 1 deletion db/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
Compaction::Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes,
bool seek_compaction, bool enable_compression)
bool seek_compaction, bool enable_compression,
bool deletion_compaction)
: level_(level),
out_level_(out_level),
max_output_file_size_(target_file_size),
Expand All @@ -39,6 +40,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level,
cfd_(input_version_->cfd_),
seek_compaction_(seek_compaction),
enable_compression_(enable_compression),
deletion_compaction_(deletion_compaction),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0),
Expand Down Expand Up @@ -83,6 +85,8 @@ bool Compaction::IsTrivialMove() const {
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
}

// Returns the deletion-compaction flag supplied at construction time. When it
// is true, the compaction is meant to simply delete the files in inputs_[0].
bool Compaction::IsDeletionCompaction() const {
  return deletion_compaction_;
}

void Compaction::AddInputDeletions(VersionEdit* edit) {
for (int which = 0; which < 2; which++) {
for (size_t i = 0; i < inputs_[which].size(); i++) {
Expand All @@ -92,6 +96,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
}

bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
return bottommost_level_;
}
Expand Down Expand Up @@ -155,6 +160,7 @@ void Compaction::MarkFilesBeingCompacted(bool value) {

// Is this compaction producing files at the bottommost level?
void Compaction::SetupBottomMostLevel(bool isManual) {
assert(cfd_->options()->compaction_style != kCompactionStyleFIFO);
if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
// If universal compaction style is used and manual
// compaction is occurring, then we are guaranteed that
Expand Down
9 changes: 8 additions & 1 deletion db/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class Compaction {
// moving a single input file to the next level (no merging or splitting)
bool IsTrivialMove() const;

// If true, just delete all files in inputs_[0]
bool IsDeletionCompaction() const;

// Add all inputs to this compaction as delete operations to *edit.
void AddInputDeletions(VersionEdit* edit);

Expand Down Expand Up @@ -91,11 +94,13 @@ class Compaction {
private:
friend class CompactionPicker;
friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker;
friend class LevelCompactionPicker;

Compaction(Version* input_version, int level, int out_level,
uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes,
bool seek_compaction = false, bool enable_compression = true);
bool seek_compaction = false, bool enable_compression = true,
bool deletion_compaction = false);

int level_;
int out_level_; // levels to which output files are stored
Expand All @@ -108,6 +113,8 @@ class Compaction {

bool seek_compaction_;
bool enable_compression_;
// if true, just delete files in inputs_[0]
bool deletion_compaction_;

// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
Expand Down
71 changes: 71 additions & 0 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

#include "db/compaction_picker.h"

#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <limits>
#include "util/log_buffer.h"
#include "util/statistics.h"
Expand Down Expand Up @@ -307,6 +309,9 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
// FIFOCompactionPicker has its own implementation of CompactRange
assert(options_->compaction_style != kCompactionStyleFIFO);

std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true;

Expand Down Expand Up @@ -886,4 +891,70 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return c;
}

// Picks a FIFO "deletion compaction" for `version`, or returns nullptr when
// there is nothing to do.
//
// The total size of all level-0 files is compared against
// compaction_options_fifo.max_table_files_size. If the limit is exceeded and
// no other compaction is already registered for level 0, files are taken from
// the back of the level-0 list (the old files, per the FIFO policy) until the
// remaining total fits within the limit.
//
// The returned compaction is created with the deletion flag set, its inputs
// are marked as being compacted, and it is registered in
// compactions_in_progress_[0]. Progress messages go to `log_buffer`.
Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
                                                 LogBuffer* log_buffer) {
  assert(version->NumberLevels() == 1);

  const auto& level0_files = version->files_[0];
  const auto size_limit =
      options_->compaction_options_fifo.max_table_files_size;

  uint64_t total_size = 0;
  for (const auto& meta : level0_files) {
    total_size += meta->file_size;
  }

  const bool within_limit = total_size <= size_limit;
  if (within_limit || level0_files.empty()) {
    // total size not exceeded
    LogToBuffer(log_buffer,
                "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
                ", max size %" PRIu64 "\n",
                version->cfd_->GetName().c_str(), total_size, size_limit);
    return nullptr;
  }

  if (!compactions_in_progress_[0].empty()) {
    LogToBuffer(log_buffer,
                "[%s] FIFO compaction: Already executing compaction. No need "
                "to run parallel compactions since compactions are very fast",
                version->cfd_->GetName().c_str());
    return nullptr;
  }

  Compaction* deletion = new Compaction(
      version, /* level */ 0, /* out_level */ 0, /* target_file_size */ 0,
      /* max_grandparent_overlap_bytes */ 0, /* seek_compaction */ false,
      /* enable_compression */ false, /* deletion_compaction */ true);

  // Delete old files (FIFO): walk the level-0 list backwards and stop as soon
  // as the remaining size fits. total_size is known to exceed the limit here,
  // so at least one file is always picked.
  for (auto oldest = level0_files.rbegin();
       oldest != level0_files.rend() && total_size > size_limit; ++oldest) {
    auto victim = *oldest;
    total_size -= victim->file_size;
    deletion->inputs_[0].push_back(victim);

    char human_size[16];
    AppendHumanBytes(victim->file_size, human_size, sizeof(human_size));
    LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
                            " with size %s for deletion",
                version->cfd_->GetName().c_str(), victim->number, human_size);
  }

  deletion->MarkFilesBeingCompacted(true);
  compactions_in_progress_[0].insert(deletion);

  return deletion;
}

// Manual-compaction entry point for the FIFO picker.
//
// FIFO keeps everything in level 0, so both `input_level` and `output_level`
// must be 0. The `begin` and `end` keys are not consulted: the call simply
// delegates to PickCompaction(). `*compaction_end` is always set to nullptr.
// Messages produced while picking are collected in a local LogBuffer and
// flushed to options_->info_log before returning.
//
// Returns whatever PickCompaction() returns (possibly nullptr).
Compaction* FIFOCompactionPicker::CompactRange(Version* version,
                                               int input_level,
                                               int output_level,
                                               const InternalKey* begin,
                                               const InternalKey* end,
                                               InternalKey** compaction_end) {
  assert(input_level == 0);
  assert(output_level == 0);

  *compaction_end = nullptr;

  LogBuffer picker_log(InfoLogLevel::INFO_LEVEL, options_->info_log.get());
  Compaction* const picked = PickCompaction(version, &picker_log);
  picker_log.FlushBufferToLog();

  return picked;
}

} // namespace rocksdb
22 changes: 19 additions & 3 deletions db/compaction_picker.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ class CompactionPicker {
// compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
Compaction* CompactRange(Version* version, int input_level, int output_level,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
virtual Compaction* CompactRange(Version* version, int input_level,
int output_level, const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);

// Free up the files that participated in a compaction
void ReleaseCompactionFiles(Compaction* c, Status status);
Expand Down Expand Up @@ -162,4 +163,19 @@ class LevelCompactionPicker : public CompactionPicker {
Compaction* PickCompactionBySize(Version* version, int level, double score);
};

// Compaction picker used for kCompactionStyleFIFO.
//
// Instead of merging files, it produces "deletion compactions" that drop
// level-0 files once their total size exceeds
// compaction_options_fifo.max_table_files_size.
class FIFOCompactionPicker : public CompactionPicker {
 public:
  FIFOCompactionPicker(const Options* options,
                       const InternalKeyComparator* icmp)
      : CompactionPicker(options, icmp) {}

  // Returns a deletion compaction for `version`, or nullptr when the size
  // limit is not exceeded or a compaction is already in progress.
  Compaction* PickCompaction(Version* version, LogBuffer* log_buffer) override;

  // Manual compaction: requires input_level == output_level == 0, ignores the
  // key range, sets *compaction_end to nullptr and delegates to
  // PickCompaction().
  Compaction* CompactRange(Version* version, int input_level, int output_level,
                           const InternalKey* begin, const InternalKey* end,
                           InternalKey** compaction_end) override;
};

} // namespace rocksdb
38 changes: 31 additions & 7 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1590,7 +1590,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
return s;
}

int max_level_with_files = 1;
int max_level_with_files = 0;
{
MutexLock l(&mutex_);
Version* base = cfd->current();
Expand All @@ -1604,6 +1604,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
// in case the compaction is universal or if we're compacting the
// bottom-most level, the output level will be the same as input one
if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO ||
level == max_level_with_files) {
s = RunManualCompaction(cfd, level, level, begin, end);
} else {
Expand Down Expand Up @@ -1754,14 +1755,16 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
// For universal compaction, we enforce every manual compaction to compact
// all files.
if (begin == nullptr ||
cfd->options()->compaction_style == kCompactionStyleUniversal) {
cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
manual.begin = nullptr;
} else {
begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
manual.begin = &begin_storage;
}
if (end == nullptr ||
cfd->options()->compaction_style == kCompactionStyleUniversal) {
cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
manual.end = nullptr;
} else {
end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
Expand Down Expand Up @@ -2150,6 +2153,24 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!c) {
// Nothing to do
LogToBuffer(log_buffer, "Compaction nothing to do");
} else if (c->IsDeletionCompaction()) {
// TODO(icanadi) Do we want to honor snapshots here? i.e. not delete old
// file if there is alive snapshot pointing to it
assert(c->num_input_files(1) == 0);
assert(c->level() == 0);
assert(c->column_family_data()->options()->compaction_style ==
kCompactionStyleFIFO);
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->number);
}
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state);
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
c->ReleaseCompactionFiles(status);
*madeProgress = true;
} else if (!is_manual && c->IsTrivialMove()) {
// Move file to next level
assert(c->num_input_files(0) == 1);
Expand Down Expand Up @@ -2219,8 +2240,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
if (!m->done) {
// We only compacted part of the requested range. Update *m
// to the range that is left to be compacted.
// Universal compaction should always compact the whole range
// Universal and FIFO compactions should always compact the whole range
assert(m->cfd->options()->compaction_style != kCompactionStyleUniversal);
assert(m->cfd->options()->compaction_style != kCompactionStyleFIFO);
m->tmp_storage = *manual_end;
m->begin = &m->tmp_storage;
}
Expand Down Expand Up @@ -4468,13 +4490,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,

if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (cfd->options()->compaction_style == kCompactionStyleUniversal) {
if (cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO) {
Version* current = cfd->current();
for (int i = 1; i < current->NumberLevels(); ++i) {
int num_files = current->NumLevelFiles(i);
if (num_files > 0) {
s = Status::InvalidArgument("Not all files are at level 0. Cannot "
"open with universal compaction style.");
s = Status::InvalidArgument(
"Not all files are at level 0. Cannot "
"open with universal or FIFO compaction style.");
break;
}
}
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
cfd = cfh->cfd();
}
int output_level =
(cfd->options()->compaction_style == kCompactionStyleUniversal)
(cfd->options()->compaction_style == kCompactionStyleUniversal ||
cfd->options()->compaction_style == kCompactionStyleFIFO)
? level
: level + 1;
return RunManualCompaction(cfd, level, output_level, begin, end);
Expand Down
Loading

0 comments on commit 6de6a06

Please sign in to comment.