Skip to content

Commit

Permalink
chore: minor refactorings around deletions
Browse files Browse the repository at this point in the history
No functional changes. Done as a preparation to introduce asynchronous deletions for sets/zsets/hmaps.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Jan 1, 2025
1 parent 14fda35 commit 3aa19ea
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 55 deletions.
4 changes: 2 additions & 2 deletions src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,8 @@ uint64_t CompactObj::HashCode() const {
}

if (encoded) {
GetString(&tl.tmp_str);
return XXH3_64bits_withSeed(tl.tmp_str.data(), tl.tmp_str.size(), kHashSeed);
string_view sv = GetSlice(&tl.tmp_str);
return XXH3_64bits_withSeed(sv.data(), sv.size(), kHashSeed);
}

switch (taglen_) {
Expand Down
39 changes: 24 additions & 15 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,38 @@ void DeallocateAtRandom(size_t steps, std::vector<void*>* ptrs) {
}
}

static void InitThreadStructs() {
auto* tlh = mi_heap_get_backing();
init_zmalloc_threadlocal(tlh);
SmallString::InitThreadLocal(tlh);
thread_local MiMemoryResource mi_resource(tlh);
CompactObj::InitThreadLocal(&mi_resource);
};

static void CheckEverythingDeallocated() {
mi_heap_collect(mi_heap_get_backing(), true);

auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
size_t block_size, void* arg) {
LOG(ERROR) << "Unfreed allocations: block_size " << block_size
<< ", allocated: " << area->used * block_size;
return true;
};

mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit,
nullptr);
}

class CompactObjectTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
InitRedisTables(); // to initialize server struct.

auto* tlh = mi_heap_get_backing();
init_zmalloc_threadlocal(tlh);
SmallString::InitThreadLocal(tlh);
CompactObj::InitThreadLocal(PMR_NS::get_default_resource());
InitThreadStructs();
}

static void TearDownTestSuite() {
mi_heap_collect(mi_heap_get_backing(), true);

auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
size_t block_size, void* arg) {
LOG(ERROR) << "Unfreed allocations: block_size " << block_size
<< ", allocated: " << area->used * block_size;
return true;
};

mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit,
nullptr);
CheckEverythingDeallocated();
}

CompactObj cobj_;
Expand Down
5 changes: 3 additions & 2 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ auto DenseSet::PopPtrFront(DenseSet::ChainVectorIterator it) -> DensePtr {
return front;
}

uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) {
uint32_t DenseSet::ClearStep(uint32_t start, uint32_t count) {
constexpr unsigned kArrLen = 32;
ClearItem arr[kArrLen];
unsigned len = 0;

size_t end = min<size_t>(entries_.size(), start + count);
for (size_t i = start; i < end; ++i) {
DensePtr ptr = entries_[i];
DensePtr& ptr = entries_[i];
if (ptr.IsEmpty())
continue;

Expand All @@ -190,6 +190,7 @@ uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) {
dest.ptr = ptr;
dest.obj = nullptr;
}
ptr.Reset();
if (len == kArrLen) {
ClearBatch(kArrLen, arr);
len = 0;
Expand Down
11 changes: 5 additions & 6 deletions src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,13 @@ class DenseSet {
virtual ~DenseSet();

void Clear() {
ClearInternal(0, entries_.size());
ClearStep(0, entries_.size());
}

// Returns the next bucket index that should be cleared.
// Returns BucketCount when all objects are erased.
uint32_t ClearStep(uint32_t start, uint32_t count);

// Returns the number of elements in the map. Note that it might be that some of these elements
// have expired and can't be accessed.
size_t UpperBoundSize() const {
Expand Down Expand Up @@ -303,11 +307,6 @@ class DenseSet {

void* PopInternal();

// Note this does not free any dynamic allocations done by derived classes, that a DensePtr
// in the set may point to. This function only frees the allocated DenseLinkKeys created by
// DenseSet. All data allocated by a derived class should be freed before calling this
uint32_t ClearInternal(uint32_t start, uint32_t count);

void IncreaseMallocUsed(size_t delta) {
obj_malloc_used_ += delta;
}
Expand Down
2 changes: 2 additions & 0 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,8 @@ void DbSlice::ActivateDb(DbIndex db_ind) {
}

bool DbSlice::Del(Context cntx, Iterator it) {
DCHECK(IsValid(it));

if (!IsValid(it)) {
return false;
}
Expand Down
5 changes: 2 additions & 3 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,11 @@ bool EngineShard::DoDefrag() {
// priority.
// otherwise lower the task priority so that it would not use the CPU when not required
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;
if (!namespaces) {
return util::ProactorBase::kOnIdleMaxLevel;
return kRunAtLowPriority;
}

constexpr uint32_t kRunAtLowPriority = 0u;

if (defrag_state_.CheckRequired()) {
VLOG(2) << shard_id_ << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
Expand Down
56 changes: 32 additions & 24 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -995,35 +995,14 @@ std::optional<int32_t> ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl
return flags;
}

} // namespace

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
DVLOG(1) << "Del: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();

uint32_t res = 0;

for (string_view key : keys) {
auto fres = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(fres.it))
continue;
fres.post_updater.Run();
res += int(db_slice.Del(op_args.db_cntx, fres.it));
}

return res;
}

void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0);

void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) {
atomic_uint32_t result{0};
auto* builder = cmd_cntx.rb;
bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE);

auto cb = [&result](const Transaction* t, EngineShard* shard) {
ShardArgs args = t->GetShardArgs(shard->shard_id());
auto res = OpDel(t->GetOpArgs(shard), args);
auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args);
result.fetch_add(res.value_or(0), memory_order_relaxed);

return OpStatus::OK;
Expand All @@ -1049,6 +1028,35 @@ void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
}
}

} // namespace

OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
DVLOG(1) << "Del: " << keys.Front();
auto& db_slice = op_args.GetDbSlice();

uint32_t res = 0;

for (string_view key : keys) {
auto fres = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(fres.it))
continue;
fres.post_updater.Run();
res += int(db_slice.Del(op_args.db_cntx, fres.it));
}

return res;
}

void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) {
VLOG(1) << "Del " << ArgS(args, 0);

DeleteGeneric(args, cmd_cntx);
}

void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) {
DeleteGeneric(args, cmd_cntx);
}

void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (args.size() > 1) {
Expand Down Expand Up @@ -1886,7 +1894,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, acl::kTime}.HFUNC(Time)
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, acl::kType}.HFUNC(Type)
<< CI{"DUMP", CO::READONLY, 2, 1, 1, acl::kDump}.HFUNC(Dump)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Del)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Unlink)
<< CI{"STICK", CO::WRITE, -2, 1, -1, acl::kStick}.HFUNC(Stick)
<< CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC(
Expand Down
1 change: 1 addition & 0 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class GenericFamily {
using SinkReplyBuilder = facade::SinkReplyBuilder;

static void Del(CmdArgList args, const CommandContext& cmd_cntx);
static void Unlink(CmdArgList args, const CommandContext& cmd_cntx);
static void Ping(CmdArgList args, const CommandContext& cmd_cntx);
static void Exists(CmdArgList args, const CommandContext& cmd_cntx);
static void Expire(CmdArgList args, const CommandContext& cmd_cntx);
Expand Down
8 changes: 5 additions & 3 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,11 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
// key if it exists.
if (overwrite && (vals_it.begin() == vals_it.end())) {
auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately
db_slice.Del(op_args.db_cntx, it);
if (journal_update && op_args.shard->journal()) {
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
if (IsValid(it)) {
db_slice.Del(op_args.db_cntx, it);
if (journal_update && op_args.shard->journal()) {
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
}
}
return 0;
}
Expand Down

0 comments on commit 3aa19ea

Please sign in to comment.