Skip to content

Commit

Permalink
fix: Correctly grow dense_set in the Reserve call (#2087)
Browse files Browse the repository at this point in the history
Fixes #2066

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Oct 29, 2023
1 parent 502efd8 commit 47d92fb
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 23 deletions.
26 changes: 20 additions & 6 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,22 @@ void DenseSet::ClearInternal() {
for (auto it = entries_.begin(); it != entries_.end(); ++it) {
while (!it->IsEmpty()) {
bool has_ttl = it->HasTtl();
bool is_displ = it->IsDisplaced();
void* obj = PopDataFront(it);
int32_t delta = int32_t(BucketId(obj, 0)) - int32_t(it - entries_.begin());
if (is_displ) {
DCHECK(delta < 2 || delta > -2);
} else {
DCHECK_EQ(delta, 0);
}
ObjDelete(obj, has_ttl);
}
}

entries_.clear();
num_used_buckets_ = 0;
num_chain_entries_ = 0;
size_ = 0;
}

bool DenseSet::Equal(DensePtr dptr, const void* ptr, uint32_t cookie) const {
Expand Down Expand Up @@ -234,16 +244,14 @@ void DenseSet::Reserve(size_t sz) {

sz = absl::bit_ceil(sz);
if (sz > entries_.size()) {
size_t prev_size = entries_.size();
entries_.resize(sz);
capacity_log_ = absl::bit_width(sz) - 1;
Grow(prev_size);
}
}

void DenseSet::Grow() {
size_t prev_size = entries_.size();
entries_.resize(prev_size * 2);
++capacity_log_;

void DenseSet::Grow(size_t prev_size) {
// perform rehashing of items in the set
for (long i = prev_size - 1; i >= 0; --i) {
DensePtr* curr = &entries_[i];
Expand Down Expand Up @@ -299,6 +307,7 @@ void DenseSet::Grow() {
}

DVLOG(2) << " Pushing to " << bid << " " << dptr.GetObject();
DCHECK_EQ(BucketId(dptr.GetObject(), 0), bid);
PushFront(dest, dptr);

dest->ClearDisplaced();
Expand Down Expand Up @@ -371,7 +380,11 @@ void DenseSet::AddUnique(void* obj, bool has_ttl, uint64_t hashcode) {
break;
}

Grow();
size_t prev_size = entries_.size();
entries_.resize(prev_size * 2);
++capacity_log_;

Grow(prev_size);
bucket_id = BucketId(hashcode);
}

Expand Down Expand Up @@ -403,6 +416,7 @@ void DenseSet::AddUnique(void* obj, bool has_ttl, uint64_t hashcode) {
++num_chain_entries_;
}

DCHECK_EQ(BucketId(to_insert.GetObject(), 0), bucket_id);
ChainVectorIterator list = entries_.begin() + bucket_id;
PushFront(list, to_insert);
obj_malloc_used_ += ObjectAllocSize(obj);
Expand Down
2 changes: 1 addition & 1 deletion src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ class DenseSet {
// return if bucket has no item which is not displaced and right/left bucket has no displaced item
// belong to given bid
bool NoItemBelongsBucket(uint32_t bid) const;
void Grow();
void Grow(size_t prev_size);

// ============ Pseudo Linked List Functions for interacting with Chains ==================
size_t PushFront(ChainVectorIterator, void* obj, bool has_ttl);
Expand Down
14 changes: 13 additions & 1 deletion src/core/string_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pair<sds, uint64_t> CreateEntry(string_view field, string_view value, uint32_t t
} // namespace

StringMap::~StringMap() {
Clear();
ClearInternal();
}

bool StringMap::AddOrUpdate(string_view field, string_view value, uint32_t ttl_sec) {
Expand Down Expand Up @@ -276,4 +276,16 @@ detail::SdsPair StringMap::iterator::BreakToPair(void* obj) {
return detail::SdsPair(f, GetValue(f));
}

bool StringMap::iterator::ReallocIfNeeded(float ratio) {
// Unwrap all links to correctly call SetObject()
auto* ptr = curr_entry_;
while (ptr->IsLink())
ptr = ptr->AsLink();

auto* obj = ptr->GetObject();
auto [new_obj, realloced] = static_cast<StringMap*>(owner_)->ReallocIfNeeded(obj, ratio);
ptr->SetObject(new_obj);
return realloced;
}

} // namespace dfly
12 changes: 1 addition & 11 deletions src/core/string_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,7 @@ class StringMap : public DenseSet {

// Try reducing memory fragmentation of the value by re-allocating. Returns true if
// re-allocation happened.
bool ReallocIfNeeded(float ratio) {
// Unwrap all links to correctly call SetObject()
auto* ptr = curr_entry_;
while (ptr->IsLink())
ptr = ptr->AsLink();

auto* obj = ptr->GetObject();
auto [new_obj, realloced] = static_cast<StringMap*>(owner_)->ReallocIfNeeded(obj, ratio);
ptr->SetObject(new_obj);
return realloced;
}
bool ReallocIfNeeded(float ratio);

iterator& operator++() {
Advance();
Expand Down
30 changes: 30 additions & 0 deletions src/core/string_set_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -418,4 +418,34 @@ TEST_F(StringSetTest, Ttl) {
}
}

TEST_F(StringSetTest, Grow) {
mt19937 generator(0);

for (size_t j = 0; j < 10; ++j) {
for (size_t i = 0; i < 4098; ++i) {
ss_->Reserve(generator() % 256);
auto str = random_string(generator, 3);
ss_->Add(str);
}
ss_->Clear();
}
}

TEST_F(StringSetTest, Reserve) {
vector<string> strs;
mt19937 generator(0);

for (size_t i = 0; i < 10; ++i) {
strs.push_back(random_string(generator, 10));
ss_->Add(strs.back());
}

for (size_t j = 2; j < 20; j += 3) {
ss_->Reserve(j * 20);
for (size_t i = 0; i < 10; ++i) {
ASSERT_TRUE(ss_->Contains(strs[i]));
}
}
}

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/dfly_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,8 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {

// Start the acceptor loop and wait for the server to shutdown.
acceptor->Run();
google::FlushLogFiles(google::INFO); // Flush the header.

acceptor->Wait();

version_monitor.Shutdown();
Expand Down
6 changes: 4 additions & 2 deletions src/server/hset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,10 +683,12 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
bool added;

for (size_t i = 0; i < values.size(); i += 2) {
string_view field = ToSV(values[i]);
string_view value = ToSV(values[i + 1]);
if (op_sp.skip_if_exists)
added = sm->AddOrSkip(ToSV(values[i]), ToSV(values[i + 1]), op_sp.ttl);
added = sm->AddOrSkip(field, value, op_sp.ttl);
else
added = sm->AddOrUpdate(ToSV(values[i]), ToSV(values[i + 1]), op_sp.ttl);
added = sm->AddOrUpdate(field, value, op_sp.ttl);

created += unsigned(added);
}
Expand Down
3 changes: 2 additions & 1 deletion src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
return;

if (!string_map->AddOrSkip(key, val)) {
LOG(ERROR) << "Duplicate hash fields detected";
LOG(ERROR) << "Duplicate hash fields detected for field " << key;
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
Expand Down Expand Up @@ -2272,6 +2272,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
for (const auto* item : ib) {
PrimeValue pv;
if (ec_ = FromOpaque(item->val, &pv); ec_) {
LOG(ERROR) << "Could not load value for key '" << item->key << "' in DB " << db_ind;
stop_early_ = true;
break;
}
Expand Down
4 changes: 3 additions & 1 deletion src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,10 @@ io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValu
return make_unexpected(ec);

ec = SaveValue(pv);
if (ec)
if (ec) {
LOG(ERROR) << "Problems saving value for key " << key << " in dbid=" << dbid;
return make_unexpected(ec);
}

return rdb_type;
}
Expand Down

0 comments on commit 47d92fb

Please sign in to comment.