Skip to content

Commit

Permalink
chore: minor refactorings around dense_set deletions (#4390)
Browse files Browse the repository at this point in the history
chore: refactorings around deletions

Done as a preparation to introduce asynchronous deletions for sets/zsets/hmaps.
1. Restrict the interface around DbSlice::Del. Now it requires for the iterator to be valid and the checks should
be explicit before the call. Most callers already provides a valid iterator.

2. Some minor refactoring in compact_object_test.
3. Expose DenseSet::ClearStep to allow iterative deletions of elements.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jan 2, 2025
1 parent 3b082e4 commit 7a68528
Show file tree
Hide file tree
Showing 14 changed files with 115 additions and 92 deletions.
4 changes: 2 additions & 2 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,8 @@ uint64_t CompactObj::HashCode() const {
}

if (encoded) {
GetString(&tl.tmp_str);
return XXH3_64bits_withSeed(tl.tmp_str.data(), tl.tmp_str.size(), kHashSeed);
string_view sv = GetSlice(&tl.tmp_str);
return XXH3_64bits_withSeed(sv.data(), sv.size(), kHashSeed);
}

switch (taglen_) {
Expand Down
39 changes: 24 additions & 15 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,38 @@ void DeallocateAtRandom(size_t steps, std::vector<void*>* ptrs) {
}
}

static void InitThreadStructs() {
auto* tlh = mi_heap_get_backing();
init_zmalloc_threadlocal(tlh);
SmallString::InitThreadLocal(tlh);
thread_local MiMemoryResource mi_resource(tlh);
CompactObj::InitThreadLocal(&mi_resource);
};

static void CheckEverythingDeallocated() {
mi_heap_collect(mi_heap_get_backing(), true);

auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
size_t block_size, void* arg) {
LOG(ERROR) << "Unfreed allocations: block_size " << block_size
<< ", allocated: " << area->used * block_size;
return true;
};

mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit,
nullptr);
}

class CompactObjectTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
InitRedisTables(); // to initialize server struct.

auto* tlh = mi_heap_get_backing();
init_zmalloc_threadlocal(tlh);
SmallString::InitThreadLocal(tlh);
CompactObj::InitThreadLocal(PMR_NS::get_default_resource());
InitThreadStructs();
}

static void TearDownTestSuite() {
mi_heap_collect(mi_heap_get_backing(), true);

auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
size_t block_size, void* arg) {
LOG(ERROR) << "Unfreed allocations: block_size " << block_size
<< ", allocated: " << area->used * block_size;
return true;
};

mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit,
nullptr);
CheckEverythingDeallocated();
}

CompactObj cobj_;
Expand Down
5 changes: 3 additions & 2 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ auto DenseSet::PopPtrFront(DenseSet::ChainVectorIterator it) -> DensePtr {
return front;
}

uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) {
uint32_t DenseSet::ClearStep(uint32_t start, uint32_t count) {
constexpr unsigned kArrLen = 32;
ClearItem arr[kArrLen];
unsigned len = 0;

size_t end = min<size_t>(entries_.size(), start + count);
for (size_t i = start; i < end; ++i) {
DensePtr ptr = entries_[i];
DensePtr& ptr = entries_[i];
if (ptr.IsEmpty())
continue;

Expand All @@ -190,6 +190,7 @@ uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) {
dest.ptr = ptr;
dest.obj = nullptr;
}
ptr.Reset();
if (len == kArrLen) {
ClearBatch(kArrLen, arr);
len = 0;
Expand Down
11 changes: 5 additions & 6 deletions src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,13 @@ class DenseSet {
virtual ~DenseSet();

void Clear() {
ClearInternal(0, entries_.size());
ClearStep(0, entries_.size());
}

// Returns the next bucket index that should be cleared.
// Returns BucketCount when all objects are erased.
uint32_t ClearStep(uint32_t start, uint32_t count);

// Returns the number of elements in the map. Note that it might be that some of these elements
// have expired and can't be accessed.
size_t UpperBoundSize() const {
Expand Down Expand Up @@ -303,11 +307,6 @@ class DenseSet {

void* PopInternal();

// Note this does not free any dynamic allocations done by derived classes, that a DensePtr
// in the set may point to. This function only frees the allocated DenseLinkKeys created by
// DenseSet. All data allocated by a derived class should be freed before calling this
uint32_t ClearInternal(uint32_t start, uint32_t count);

void IncreaseMallocUsed(size_t delta) {
obj_malloc_used_ += delta;
}
Expand Down
10 changes: 3 additions & 7 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -669,10 +669,8 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
CreateDb(db_ind);
}

bool DbSlice::Del(Context cntx, Iterator it) {
if (!IsValid(it)) {
return false;
}
void DbSlice::Del(Context cntx, Iterator it) {
CHECK(IsValid(it));

auto& db = db_arr_[cntx.db_index];
auto obj_type = it->second.ObjType();
Expand All @@ -683,8 +681,6 @@ bool DbSlice::Del(Context cntx, Iterator it) {
doc_del_cb_(key, cntx, it->second);
}
PerformDeletion(it, db.get());

return true;
}

void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
Expand Down Expand Up @@ -917,7 +913,7 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it,
}

if (rel_msec <= 0) { // implicit - don't persist
CHECK(Del(cntx, prime_it));
Del(cntx, prime_it);
return -1;
} else if (IsValid(expire_it) && !params.persist) {
auto current = ExpireTime(expire_it);
Expand Down
3 changes: 2 additions & 1 deletion src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ class DbSlice {
// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);

bool Del(Context cntx, Iterator it);
// Deletes the iterator. The iterator must be valid.
void Del(Context cntx, Iterator it);

constexpr static DbIndex kDbAll = 0xFFFF;

Expand Down
5 changes: 2 additions & 3 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,11 @@ bool EngineShard::DoDefrag() {
// priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;
if (!namespaces) {
return util::ProactorBase::kOnIdleMaxLevel;
return kRunAtLowPriority;
}

constexpr uint32_t kRunAtLowPriority = 0u;

if (defrag_state_.CheckRequired()) {
VLOG(2) << shard_id_ << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
Expand Down
69 changes: 39 additions & 30 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ OpStatus Renamer::DelSrc(Transaction* t, EngineShard* shard) {
DVLOG(1) << "Rename: removing the key '" << src_key_;

res.post_updater.Run();
CHECK(db_slice.Del(t->GetDbContext(), it));
db_slice.Del(t->GetDbContext(), it);
if (shard->journal()) {
RecordJournal(t->GetOpArgs(shard), "DEL"sv, ArgSlice{src_key_}, 2);
}
Expand All @@ -462,7 +462,7 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
if (dest_found_) {
DVLOG(1) << "Rename: deleting the destiny key '" << dest_key_;
dest_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, dest_res.it));
db_slice.Del(op_args.db_cntx, dest_res.it);
}

if (restore_args.Expired()) {
Expand Down Expand Up @@ -554,7 +554,7 @@ OpResult<bool> OpRestore(const OpArgs& op_args, std::string_view key, std::strin
VLOG(1) << "restore command is running with replace, found old key '" << key
<< "' and removing it";
res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, res.it));
db_slice.Del(op_args.db_cntx, res.it);
} else {
// we are not allowed to replace it.
return OpStatus::KEY_EXISTS;
Expand Down Expand Up @@ -812,7 +812,7 @@ OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
// Restore expire flag after std::move.
from_res.it->second.SetExpire(IsValid(from_res.exp_it));

CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
db_slice.Del(op_args.db_cntx, from_res.it);
auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
RETURN_ON_BAD_STATUS(op_result);
auto& add_res = *op_result;
Expand Down Expand Up @@ -868,13 +868,13 @@ OpResult<void> OpRen(const OpArgs& op_args, string_view from_key, string_view to
to_res.post_updater.Run();

from_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
db_slice.Del(op_args.db_cntx, from_res.it);
} else {
// Here we first delete from_it because AddNew below could invalidate from_it.
// On the other hand, AddNew does not rely on the iterators - this is why we keep
// the value in `from_obj`.
from_res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, from_res.it));
db_slice.Del(op_args.db_cntx, from_res.it);
auto op_result = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts);
RETURN_ON_BAD_STATUS(op_result);
to_res = std::move(*op_result);
Expand Down Expand Up @@ -995,35 +995,14 @@ std::optional<int32_t> ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl
return flags;
}

} // namespace

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
DVLOG(1) << "Del: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();

uint32_t res = 0;

for (string_view key : keys) {
auto fres = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(fres.it))
continue;
fres.post_updater.Run();
res += int(db_slice.Del(op_args.db_cntx, fres.it));
}

return res;
}

void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0);

void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) {
atomic_uint32_t result{0};
auto* builder = cmd_cntx.rb;
bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE);

auto cb = [&result](const Transaction* t, EngineShard* shard) {
ShardArgs args = t->GetShardArgs(shard->shard_id());
auto res = OpDel(t->GetOpArgs(shard), args);
auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args);
result.fetch_add(res.value_or(0), memory_order_relaxed);

return OpStatus::OK;
Expand All @@ -1049,6 +1028,36 @@ void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
}
}

} // namespace

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
DVLOG(1) << "Del: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();

uint32_t res = 0;

for (string_view key : keys) {
auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately
if (!IsValid(it))
continue;

db_slice.Del(op_args.db_cntx, it);
++res;
}

return res;
}

void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0);

DeleteGeneric(args, cmd_cntx);
}

void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) {
DeleteGeneric(args, cmd_cntx);
}

void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (args.size() > 1) {
Expand Down Expand Up @@ -1886,7 +1895,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, acl::kTime}.HFUNC(Time)
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, acl::kType}.HFUNC(Type)
<< CI{"DUMP", CO::READONLY, 2, 1, 1, acl::kDump}.HFUNC(Dump)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Del)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Unlink)
<< CI{"STICK", CO::WRITE, -2, 1, -1, acl::kStick}.HFUNC(Stick)
<< CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC(
Expand Down
1 change: 1 addition & 0 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class GenericFamily {
using SinkReplyBuilder = facade::SinkReplyBuilder;

static void Del(CmdArgList args, const CommandContext& cmd_cntx);
static void Unlink(CmdArgList args, const CommandContext& cmd_cntx);
static void Ping(CmdArgList args, const CommandContext& cmd_cntx);
static void Exists(CmdArgList args, const CommandContext& cmd_cntx);
static void Expire(CmdArgList args, const CommandContext& cmd_cntx);
Expand Down
19 changes: 9 additions & 10 deletions src/server/hset_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -585,10 +585,10 @@ OpResult<vector<string>> OpGetAll(const OpArgs& op_args, string_view key, uint8_
// and the enconding is guaranteed to be a DenseSet since we only support expiring
// value with that enconding.
if (res.empty()) {
auto mutable_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH);
// Run postupdater, it means that we deleted the keys
mutable_res->post_updater.Run();
db_slice.Del(op_args.db_cntx, mutable_res->it);
// post_updater will run immediately
auto it = db_slice.FindMutable(op_args.db_cntx, key).it;

db_slice.Del(op_args.db_cntx, it);
}

return res;
Expand Down Expand Up @@ -1169,10 +1169,10 @@ void HSetFamily::HRandField(CmdArgList args, const CommandContext& cmd_cntx) {
}
}

if (string_map->Empty()) {
auto it_mutable = db_slice.FindMutable(db_context, key, OBJ_HASH);
it_mutable->post_updater.Run();
db_slice.Del(db_context, it_mutable->it);
if (string_map->Empty()) { // Can happen if we use a TTL on hash members.
// post_updater will run immediately
auto it = db_slice.FindMutable(db_context, key).it;
db_slice.Del(db_context, it);
return facade::OpStatus::KEY_NOTFOUND;
}
} else if (pv.Encoding() == kEncodingListPack) {
Expand Down Expand Up @@ -1207,8 +1207,7 @@ void HSetFamily::HRandField(CmdArgList args, const CommandContext& cmd_cntx) {
}
}
} else {
LOG(ERROR) << "Invalid encoding " << pv.Encoding();
return OpStatus::INVALID_VALUE;
LOG(FATAL) << "Invalid encoding " << pv.Encoding();
}
return str_vec;
};
Expand Down
7 changes: 6 additions & 1 deletion src/server/json_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -913,8 +913,13 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path,
if (json_path.RefersToRootElement()) {
auto& db_slice = op_args.GetDbSlice();
auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately
return static_cast<long>(db_slice.Del(op_args.db_cntx, it));
if (IsValid(it)) {
db_slice.Del(op_args.db_cntx, it);
return 1;
}
return 0;
}

JsonMemTracker tracker;
// FindMutable because we need to run the AutoUpdater at the end which will account
// the deltas calculated from the MemoryTracker
Expand Down
Loading

0 comments on commit 7a68528

Please sign in to comment.