From 8ef92629c58135b3e39752fe0b2ac7fc123254ff Mon Sep 17 00:00:00 2001 From: Leonardo Mello Date: Tue, 27 Feb 2024 11:03:21 -0300 Subject: [PATCH] feat(generic_family): implement RANDOMKEY command (#2639) Signed-off-by: Leonardo Mello --- src/core/dash.h | 13 ++++++++ src/server/generic_family.cc | 55 ++++++++++++++++++++++++++++++- src/server/generic_family.h | 1 + src/server/generic_family_test.cc | 8 +++++ 4 files changed, 76 insertions(+), 1 deletion(-) diff --git a/src/core/dash.h b/src/core/dash.h index bd3d4c3c6042..4328b5b4fd5b 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -5,6 +5,7 @@ #include +#include "absl/random/random.h" #include "base/pmr/memory_resource.h" #include "core/dash_internal.h" @@ -225,6 +226,10 @@ class DashTable : public detail::DashTableBase { return double(size()) / (SegmentType::capacity() * unique_segments()); } + // Gets a random cursor based on the available segments and buckets. + // Returns: cursor with a random position + Cursor GetRandomCursor(absl::BitGen* bitgen); + // Traverses over a single bucket in table and calls cb(iterator) 0 or more // times. if cursor=0 starts traversing from the beginning, otherwise continues from where it // stopped. returns 0 if the supplied cursor reached end of traversal. Traverse iterates at bucket @@ -916,6 +921,14 @@ auto DashTable<_Key, _Value, Policy>::TraverseBySegmentOrder(Cursor curs, Cb&& c return Cursor{global_depth_, sid, bid}; } +template +auto DashTable<_Key, _Value, Policy>::GetRandomCursor(absl::BitGen* bitgen) -> Cursor { + uint32_t sid = absl::Uniform(*bitgen, 0, segment_.size()); + uint8_t bid = absl::Uniform(*bitgen, 0, kLogicalBucketNum); + + return Cursor{global_depth_, sid, bid}; +} + template template auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 8303cf057131..8a7d45343374 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -4,6 +4,10 @@ #include "server/generic_family.h" +#include + +#include "facade/reply_builder.h" + extern "C" { #include "redis/crc64.h" #include "redis/util.h" @@ -1509,6 +1513,54 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t return OpStatus::OK; } +void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) { + const static size_t kMaxAttempts = 3; + + absl::BitGen bitgen; + atomic_size_t candidates_counter{0}; + DbContext db_cntx{.db_index = cntx->conn_state.db_index}; + ScanOpts scan_opts; + scan_opts.limit = 3; // number of entries per shard + std::vector candidates_collection(shard_set->size()); + + shard_set->RunBriefInParallel( + [&](EngineShard* shard) { + auto [prime_table, expire_table] = shard->db_slice().GetTables(db_cntx.db_index); + if (prime_table->size() == 0) { + return; + } + + StringVec* candidates = &candidates_collection[shard->shard_id()]; + + for (size_t i = 0; i <= kMaxAttempts; ++i) { + if (!candidates->empty()) { + break; + } + uint64_t cursor = 0; // scans from the start of the shard after reaching kMaxAttemps + if (i < kMaxAttempts) { + cursor = prime_table->GetRandomCursor(&bitgen).value(); + } + OpScan({shard, 0u, db_cntx}, scan_opts, &cursor, candidates); + } + + candidates_counter.fetch_add(candidates->size(), memory_order_relaxed); + }, + [&](ShardId) { return true; }); + + auto candidates_count = candidates_counter.load(memory_order_relaxed); + std::optional random_key = std::nullopt; + auto random_idx = absl::Uniform(bitgen, 0, candidates_count); + auto* rb = static_cast(cntx->reply_builder()); + for (const auto& candidate : candidates_collection) { + if (random_idx >= candidate.size()) { + random_idx -= candidate.size(); + } else { + return rb->SendBulkString(candidate[random_idx]); + } + } + rb->SendNull(); +} + using CI = CommandId; #define HFUNC(x) SetHandler(&GenericFamily::x) @@ -1580,7 +1632,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort) << CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC( Move) - << CI{"RESTORE", CO::WRITE, -4, 1, 1, acl::kRestore}.HFUNC(Restore); + << CI{"RESTORE", CO::WRITE, -4, 1, 1, acl::kRestore}.HFUNC(Restore) + << CI{"RANDOMKEY", CO::READONLY, 1, 0, 0, 0}.HFUNC(RandomKey); } } // namespace dfly diff --git a/src/server/generic_family.h b/src/server/generic_family.h index e5581cebaebd..f7e888c6707b 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -68,6 +68,7 @@ class GenericFamily { static void Type(CmdArgList args, ConnectionContext* cntx); static void Dump(CmdArgList args, ConnectionContext* cntx); static void Restore(CmdArgList args, ConnectionContext* cntx); + static void RandomKey(CmdArgList args, ConnectionContext* cntx); static void FieldTtl(CmdArgList args, ConnectionContext* cntx); static OpResult RenameGeneric(CmdArgList args, bool skip_exist_dest, diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 966958015b1c..4eb323a1590b 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -734,4 +734,12 @@ TEST_F(GenericFamilyTest, FieldTtl) { EXPECT_EQ(-3, CheckedInt({"fieldttl", "k2", "f4"})); } +TEST_F(GenericFamilyTest, RandomKey) { + auto resp = Run({"randomkey"}); + EXPECT_EQ(resp.type, RespExpr::NIL); + + resp = Run({"set", "k1", "1"}); + EXPECT_EQ(Run({"randomkey"}), "k1"); +} + } // namespace dfly