From 3aa19eabc54ee41fd0df3d4a6ae2a45e74283894 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 31 Dec 2024 21:54:14 +0200 Subject: [PATCH] chore: minor refactorings around deletions No functional changes. Done as a preparation to introduce asynchronous deletions for sets/zsets/hmaps. Signed-off-by: Roman Gershman --- src/core/compact_object.cc | 4 +-- src/core/compact_object_test.cc | 39 ++++++++++++++--------- src/core/dense_set.cc | 5 +-- src/core/dense_set.h | 11 +++---- src/server/db_slice.cc | 2 ++ src/server/engine_shard.cc | 5 ++- src/server/generic_family.cc | 56 +++++++++++++++++++-------------- src/server/generic_family.h | 1 + src/server/set_family.cc | 8 +++-- 9 files changed, 76 insertions(+), 55 deletions(-) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index d30f6f44cf64..261a7f4bdfbd 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -791,8 +791,8 @@ uint64_t CompactObj::HashCode() const { } if (encoded) { - GetString(&tl.tmp_str); - return XXH3_64bits_withSeed(tl.tmp_str.data(), tl.tmp_str.size(), kHashSeed); + string_view sv = GetSlice(&tl.tmp_str); + return XXH3_64bits_withSeed(sv.data(), sv.size(), kHashSeed); } switch (taglen_) { diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 207e67f1cfd4..eedce4fee71f 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -76,29 +76,38 @@ void DeallocateAtRandom(size_t steps, std::vector* ptrs) { } } +static void InitThreadStructs() { + auto* tlh = mi_heap_get_backing(); + init_zmalloc_threadlocal(tlh); + SmallString::InitThreadLocal(tlh); + thread_local MiMemoryResource mi_resource(tlh); + CompactObj::InitThreadLocal(&mi_resource); +}; + +static void CheckEverythingDeallocated() { + mi_heap_collect(mi_heap_get_backing(), true); + + auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block, + size_t block_size, void* arg) { + LOG(ERROR) << "Unfreed allocations: block_size " << block_size + << ", allocated: " << area->used * block_size; + return true; + }; + + mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit, + nullptr); +} + class CompactObjectTest : public ::testing::Test { protected: static void SetUpTestSuite() { InitRedisTables(); // to initialize server struct. - auto* tlh = mi_heap_get_backing(); - init_zmalloc_threadlocal(tlh); - SmallString::InitThreadLocal(tlh); - CompactObj::InitThreadLocal(PMR_NS::get_default_resource()); + InitThreadStructs(); } static void TearDownTestSuite() { - mi_heap_collect(mi_heap_get_backing(), true); - - auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block, - size_t block_size, void* arg) { - LOG(ERROR) << "Unfreed allocations: block_size " << block_size - << ", allocated: " << area->used * block_size; - return true; - }; - - mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit, - nullptr); + CheckEverythingDeallocated(); } CompactObj cobj_; diff --git a/src/core/dense_set.cc b/src/core/dense_set.cc index 0b8b7479d4d9..9750ded2a061 100644 --- a/src/core/dense_set.cc +++ b/src/core/dense_set.cc @@ -168,14 +168,14 @@ auto DenseSet::PopPtrFront(DenseSet::ChainVectorIterator it) -> DensePtr { return front; } -uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) { +uint32_t DenseSet::ClearStep(uint32_t start, uint32_t count) { constexpr unsigned kArrLen = 32; ClearItem arr[kArrLen]; unsigned len = 0; size_t end = min(entries_.size(), start + count); for (size_t i = start; i < end; ++i) { - DensePtr ptr = entries_[i]; + DensePtr& ptr = entries_[i]; if (ptr.IsEmpty()) continue; @@ -190,6 +190,7 @@ uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) { dest.ptr = ptr; dest.obj = nullptr; } + ptr.Reset(); if (len == kArrLen) { ClearBatch(kArrLen, arr); len = 0; diff --git a/src/core/dense_set.h b/src/core/dense_set.h index 1cc5ac135b78..0ac73a47bc0c 100644 --- a/src/core/dense_set.h +++ b/src/core/dense_set.h @@ -215,9 +215,13 @@ class DenseSet { virtual ~DenseSet(); void Clear() { - ClearInternal(0, entries_.size()); + ClearStep(0, entries_.size()); } + // Returns the next bucket index that should be cleared. + // Returns BucketCount when all objects are erased. + uint32_t ClearStep(uint32_t start, uint32_t count); + // Returns the number of elements in the map. Note that it might be that some of these elements // have expired and can't be accessed. size_t UpperBoundSize() const { @@ -303,11 +307,6 @@ class DenseSet { void* PopInternal(); - // Note this does not free any dynamic allocations done by derived classes, that a DensePtr - // in the set may point to. This function only frees the allocated DenseLinkKeys created by - // DenseSet. All data allocated by a derived class should be freed before calling this - uint32_t ClearInternal(uint32_t start, uint32_t count); - void IncreaseMallocUsed(size_t delta) { obj_malloc_used_ += delta; } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 88f18696b555..633b8a899296 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -670,6 +670,8 @@ void DbSlice::ActivateDb(DbIndex db_ind) { } bool DbSlice::Del(Context cntx, Iterator it) { + DCHECK(IsValid(it)); + if (!IsValid(it)) { return false; } diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 5396b8f6f50d..f637f1a116a2 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -426,12 +426,11 @@ bool EngineShard::DoDefrag() { // priority. // otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { + constexpr uint32_t kRunAtLowPriority = 0u; if (!namespaces) { - return util::ProactorBase::kOnIdleMaxLevel; + return kRunAtLowPriority; } - constexpr uint32_t kRunAtLowPriority = 0u; - if (defrag_state_.CheckRequired()) { VLOG(2) << shard_id_ << ": need to run defrag memory cursor state: " << defrag_state_.cursor; if (DoDefrag()) { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 94a0319a004e..d529906fc1ff 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -995,35 +995,14 @@ std::optional ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl return flags; } -} // namespace - -OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) { - DVLOG(1) << "Del: " << keys.Front(); - auto& db_slice = op_args.GetDbSlice(); - - uint32_t res = 0; - - for (string_view key : keys) { - auto fres = db_slice.FindMutable(op_args.db_cntx, key); - if (!IsValid(fres.it)) - continue; - fres.post_updater.Run(); - res += int(db_slice.Del(op_args.db_cntx, fres.it)); - } - - return res; -} - -void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { - VLOG(1) << "Del " << ArgS(args, 0); - +void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) { atomic_uint32_t result{0}; auto* builder = cmd_cntx.rb; bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE); auto cb = [&result](const Transaction* t, EngineShard* shard) { ShardArgs args = t->GetShardArgs(shard->shard_id()); - auto res = OpDel(t->GetOpArgs(shard), args); + auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -1049,6 +1028,35 @@ void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { } } +} // namespace + +OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) { + DVLOG(1) << "Del: " << keys.Front(); + auto& db_slice = op_args.GetDbSlice(); + + uint32_t res = 0; + + for (string_view key : keys) { + auto fres = db_slice.FindMutable(op_args.db_cntx, key); + if (!IsValid(fres.it)) + continue; + fres.post_updater.Run(); + res += int(db_slice.Del(op_args.db_cntx, fres.it)); + } + + return res; +} + +void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { + VLOG(1) << "Del " << ArgS(args, 0); + + DeleteGeneric(args, cmd_cntx); +} + +void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) { + DeleteGeneric(args, cmd_cntx); +} + void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) { auto* rb = static_cast(cmd_cntx.rb); if (args.size() > 1) { @@ -1886,7 +1894,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, acl::kTime}.HFUNC(Time) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, acl::kType}.HFUNC(Type) << CI{"DUMP", CO::READONLY, 2, 1, 1, acl::kDump}.HFUNC(Dump) - << CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Del) + << CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Unlink) << CI{"STICK", CO::WRITE, -2, 1, -1, acl::kStick}.HFUNC(Stick) << CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort) << CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC( diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 40af99bedfff..0fb231ec80eb 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -35,6 +35,7 @@ class GenericFamily { using SinkReplyBuilder = facade::SinkReplyBuilder; static void Del(CmdArgList args, const CommandContext& cmd_cntx); + static void Unlink(CmdArgList args, const CommandContext& cmd_cntx); static void Ping(CmdArgList args, const CommandContext& cmd_cntx); static void Exists(CmdArgList args, const CommandContext& cmd_cntx); static void Expire(CmdArgList args, const CommandContext& cmd_cntx); diff --git a/src/server/set_family.cc b/src/server/set_family.cc index a2ab2cbcf66b..a41f21948325 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -443,9 +443,11 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, const NewE // key if it exists. if (overwrite && (vals_it.begin() == vals_it.end())) { auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - db_slice.Del(op_args.db_cntx, it); - if (journal_update && op_args.shard->journal()) { - RecordJournal(op_args, "DEL"sv, ArgSlice{key}); + if (IsValid(it)) { + db_slice.Del(op_args.db_cntx, it); + if (journal_update && op_args.shard->journal()) { + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); + } } return 0; }