From 933c9f0b1c09818cad74eab2e115bd376d872319 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 8 Jan 2025 11:38:13 +0200 Subject: [PATCH] refactor: remove transaction lib on cluster code dependency (#4417) --- src/server/CMakeLists.txt | 4 +- src/server/cluster/cluster_config.cc | 10 ++- src/server/cluster/cluster_defs.cc | 62 ++--------------- src/server/cluster/cluster_defs.h | 14 +--- src/server/cluster/cluster_family.cc | 2 +- src/server/cluster/cluster_utility.cc | 35 ++-------- src/server/cluster/cluster_utility.h | 16 ----- src/server/cluster_support.cc | 99 +++++++++++++++++++++++++++ src/server/cluster_support.h | 41 +++++++++++ src/server/common.h | 1 + src/server/db_slice.cc | 27 ++++---- src/server/db_slice.h | 2 +- src/server/debugcmd.cc | 8 +-- src/server/engine_shard.cc | 2 +- src/server/generic_family.cc | 3 +- src/server/journal/journal.cc | 2 +- src/server/journal/journal.h | 2 +- src/server/journal/streamer.cc | 4 +- src/server/journal/streamer.h | 2 +- src/server/journal/types.cc | 2 - src/server/journal/types.h | 13 ++-- src/server/main_service.cc | 29 ++++---- src/server/multi_command_squasher.cc | 5 +- src/server/multi_command_squasher.h | 2 +- src/server/rdb_load.cc | 5 +- src/server/server_family.cc | 6 +- src/server/table.cc | 6 +- src/server/transaction.cc | 5 +- src/server/transaction.h | 8 +-- src/server/tx_base.cc | 2 +- 30 files changed, 221 insertions(+), 198 deletions(-) create mode 100644 src/server/cluster_support.cc create mode 100644 src/server/cluster_support.h diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index bc323ef97764..ebf0e147db1f 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -27,7 +27,7 @@ endif() add_library(dfly_transaction db_slice.cc blocking_controller.cc - command_registry.cc cluster/cluster_utility.cc + command_registry.cc cluster_support.cc journal/cmd_serializer.cc journal/tx_executor.cc namespaces.cc common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc @@ -60,7 +60,7 @@ add_library(dragonfly_lib bloom_family.cc ${DF_SEARCH_SRCS} ${DF_LINUX_SRCS} cluster/cluster_config.cc cluster/cluster_family.cc cluster/incoming_slot_migration.cc - cluster/outgoing_slot_migration.cc cluster/cluster_defs.cc + cluster/outgoing_slot_migration.cc cluster/cluster_defs.cc cluster/cluster_utility.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc) diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index ae5bb331a254..a71b9f150297 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -1,3 +1,7 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + #include "cluster_config.h" #include @@ -39,7 +43,7 @@ bool HasValidNodeIds(const ClusterShardInfos& new_config) { bool IsConfigValid(const ClusterShardInfos& new_config) { // Make sure that all slots are set exactly once. - vector slots_found(cluster::kMaxSlotNum + 1); + vector slots_found(kMaxSlotNum + 1); if (!HasValidNodeIds(new_config)) { return false; @@ -309,7 +313,7 @@ std::shared_ptr ClusterConfig::CloneWithoutMigrations() const { } bool ClusterConfig::IsMySlot(SlotId id) const { - if (id > cluster::kMaxSlotNum) { + if (id > kMaxSlotNum) { DCHECK(false) << "Requesting a non-existing slot id " << id; return false; } @@ -322,7 +326,7 @@ bool ClusterConfig::IsMySlot(std::string_view key) const { } ClusterNodeInfo ClusterConfig::GetMasterNodeForSlot(SlotId id) const { - CHECK_LE(id, cluster::kMaxSlotNum) << "Requesting a non-existing slot id " << id; + CHECK_LE(id, kMaxSlotNum) << "Requesting a non-existing slot id " << id; for (const auto& shard : config_) { if (shard.slot_ranges.Contains(id)) { diff --git a/src/server/cluster/cluster_defs.cc b/src/server/cluster/cluster_defs.cc index 36e77b52c195..5c63fa0fa3ae 100644 --- a/src/server/cluster/cluster_defs.cc +++ b/src/server/cluster/cluster_defs.cc @@ -1,27 +1,20 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// -extern "C" { -#include "redis/crc16.h" -} +#include "cluster_defs.h" #include #include -#include "base/flags.h" -#include "base/logging.h" -#include "cluster_defs.h" #include "facade/error.h" #include "slot_set.h" -#include "src/server/common.h" // TODO remove when tl_cluster_config will be moved out from it #include "server/cluster/cluster_family.h" using namespace std; -ABSL_FLAG(string, cluster_mode, "", - "Cluster mode supported. Possible values are " - "'emulated', 'yes' or ''"); - namespace dfly::cluster { std::string SlotRange::ToString() const { return absl::StrCat("[", start, ", ", end, "]"); @@ -69,53 +62,6 @@ ClusterShardInfos::ClusterShardInfos(std::vector infos) std::sort(infos_.begin(), infos_.end()); } -namespace { -enum class ClusterMode { - kUninitialized, - kNoCluster, - kEmulatedCluster, - kRealCluster, -}; - -ClusterMode cluster_mode = ClusterMode::kUninitialized; -} // namespace - -void InitializeCluster() { - string cluster_mode_str = absl::GetFlag(FLAGS_cluster_mode); - - if (cluster_mode_str == "emulated") { - cluster_mode = ClusterMode::kEmulatedCluster; - } else if (cluster_mode_str == "yes") { - cluster_mode = ClusterMode::kRealCluster; - } else if (cluster_mode_str.empty()) { - cluster_mode = ClusterMode::kNoCluster; - } else { - LOG(ERROR) << "Invalid value for flag --cluster_mode. Exiting..."; - exit(1); - } -} - -bool IsClusterEnabled() { - return cluster_mode == ClusterMode::kRealCluster; -} - -bool IsClusterEmulated() { - return cluster_mode == ClusterMode::kEmulatedCluster; -} - -SlotId KeySlot(std::string_view key) { - string_view tag = LockTagOptions::instance().Tag(key); - return crc16(tag.data(), tag.length()) & kMaxSlotNum; -} - -bool IsClusterEnabledOrEmulated() { - return IsClusterEnabled() || IsClusterEmulated(); -} - -bool IsClusterShardedByTag() { - return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled; -} - facade::ErrorReply SlotOwnershipError(SlotId slot_id) { const cluster::ClusterConfig* cluster_config = ClusterFamily::cluster_config(); if (!cluster_config) diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index 7ef509d49c2f..4d6130288ac1 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -11,14 +11,10 @@ #include #include "facade/facade_types.h" +#include "server/cluster_support.h" namespace dfly::cluster { -using SlotId = uint16_t; - -constexpr SlotId kMaxSlotNum = 0x3FFF; -constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1; - struct SlotRange { static constexpr SlotId kMaxSlotId = 0x3FFF; SlotId start = 0; @@ -169,15 +165,7 @@ enum class MigrationState : uint8_t { C_FINISHED, }; -SlotId KeySlot(std::string_view key); - // return error message if slot doesn't belong to this node facade::ErrorReply SlotOwnershipError(SlotId slot_id); -void InitializeCluster(); -bool IsClusterEnabled(); -bool IsClusterEmulated(); -bool IsClusterEnabledOrEmulated(); -bool IsClusterShardedByTag(); - } // namespace dfly::cluster diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 3d71befbcd1b..5caa6cd3b23b 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -406,7 +406,7 @@ void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) { return builder->SendError(WrongNumArgsError("CLUSTER KEYSLOT")); } - SlotId id = cluster::KeySlot(ArgS(args, 1)); + SlotId id = dfly::KeySlot(ArgS(args, 1)); return builder->SendLong(id); } diff --git a/src/server/cluster/cluster_utility.cc b/src/server/cluster/cluster_utility.cc index 9fe22876bbb6..c0dedc6d3702 100644 --- a/src/server/cluster/cluster_utility.cc +++ b/src/server/cluster/cluster_utility.cc @@ -1,3 +1,7 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + #include "server/cluster/cluster_utility.h" #include "server/cluster/cluster_defs.h" @@ -8,37 +12,6 @@ using namespace std; namespace dfly::cluster { -void UniqueSlotChecker::Add(std::string_view key) { - if (!IsClusterEnabled()) { - return; - } - - Add(KeySlot(key)); -} - -void UniqueSlotChecker::Add(SlotId slot_id) { - if (!IsClusterEnabled()) { - return; - } - - if (!slot_id_.has_value()) { - slot_id_ = slot_id; - return; - } - - if (*slot_id_ != slot_id) { - slot_id_ = kInvalidSlotId; - } -} - -optional UniqueSlotChecker::GetUniqueSlotId() const { - if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) { - return nullopt; - } - - return slot_id_; -} - uint64_t GetKeyCount(const SlotRanges& slots) { std::atomic_uint64_t keys = 0; diff --git a/src/server/cluster/cluster_utility.h b/src/server/cluster/cluster_utility.h index 31ef71d8c0a3..7e13456207ae 100644 --- a/src/server/cluster/cluster_utility.h +++ b/src/server/cluster/cluster_utility.h @@ -4,26 +4,10 @@ #pragma once -#include -#include - #include "server/cluster/cluster_defs.h" namespace dfly::cluster { -// A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same. -// Only works when cluster is enabled. -class UniqueSlotChecker { - public: - void Add(std::string_view key); - void Add(SlotId slot_id); - - std::optional GetUniqueSlotId() const; - - private: - std::optional slot_id_; -}; - uint64_t GetKeyCount(const SlotRanges& slots); } // namespace dfly::cluster diff --git a/src/server/cluster_support.cc b/src/server/cluster_support.cc new file mode 100644 index 000000000000..97f07fc72ed6 --- /dev/null +++ b/src/server/cluster_support.cc @@ -0,0 +1,99 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +extern "C" { +#include "redis/crc16.h" +} + +#include "base/flags.h" +#include "base/logging.h" +#include "cluster_support.h" + +using namespace std; + +ABSL_FLAG(string, cluster_mode, "", + "Cluster mode supported. Possible values are " + "'emulated', 'yes' or ''"); + +namespace dfly { + +void UniqueSlotChecker::Add(std::string_view key) { + if (!IsClusterEnabled()) { + return; + } + + Add(KeySlot(key)); +} + +void UniqueSlotChecker::Add(SlotId slot_id) { + if (!IsClusterEnabled()) { + return; + } + + if (!slot_id_.has_value()) { + slot_id_ = slot_id; + return; + } + + if (*slot_id_ != slot_id) { + slot_id_ = kInvalidSlotId; + } +} + +optional UniqueSlotChecker::GetUniqueSlotId() const { + if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) { + return nullopt; + } + + return slot_id_; +} + +namespace { +enum class ClusterMode { + kUninitialized, + kNoCluster, + kEmulatedCluster, + kRealCluster, +}; + +ClusterMode cluster_mode = ClusterMode::kUninitialized; +} // namespace + +void InitializeCluster() { + string cluster_mode_str = absl::GetFlag(FLAGS_cluster_mode); + + if (cluster_mode_str == "emulated") { + cluster_mode = ClusterMode::kEmulatedCluster; + } else if (cluster_mode_str == "yes") { + cluster_mode = ClusterMode::kRealCluster; + } else if (cluster_mode_str.empty()) { + cluster_mode = ClusterMode::kNoCluster; + } else { + LOG(ERROR) << "Invalid value for flag --cluster_mode. Exiting..."; + exit(1); + } +} + +bool IsClusterEnabled() { + return cluster_mode == ClusterMode::kRealCluster; +} + +bool IsClusterEmulated() { + return cluster_mode == ClusterMode::kEmulatedCluster; +} + +SlotId KeySlot(std::string_view key) { + string_view tag = LockTagOptions::instance().Tag(key); + return crc16(tag.data(), tag.length()) & kMaxSlotNum; +} + +bool IsClusterEnabledOrEmulated() { + return IsClusterEnabled() || IsClusterEmulated(); +} + +bool IsClusterShardedByTag() { + return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled; +} + +} // namespace dfly diff --git a/src/server/cluster_support.h b/src/server/cluster_support.h new file mode 100644 index 000000000000..8f08ea39212d --- /dev/null +++ b/src/server/cluster_support.h @@ -0,0 +1,41 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include +#include + +#include "common.h" + +namespace dfly { + +using SlotId = std::uint16_t; + +constexpr SlotId kMaxSlotNum = 0x3FFF; +constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1; + +// A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same. +// Only works when cluster is enabled. +class UniqueSlotChecker { + public: + void Add(std::string_view key); + void Add(SlotId slot_id); + + std::optional GetUniqueSlotId() const; + + private: + std::optional slot_id_; +}; + +SlotId KeySlot(std::string_view key); + +void InitializeCluster(); +bool IsClusterEnabled(); +bool IsClusterEmulated(); +bool IsClusterEnabledOrEmulated(); +bool IsClusterShardedByTag(); + +} // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 2b66b29ee38b..67165e92911d 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -32,6 +32,7 @@ constexpr int64_t kMaxExpireDeadlineMs = kMaxExpireDeadlineSec * 1000; using LSN = uint64_t; using TxId = uint64_t; using TxClock = uint64_t; +using SlotId = std::uint16_t; using facade::ArgS; using facade::CmdArgList; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index d765caf5cb5d..4f2b5aa43795 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -10,7 +10,6 @@ #include "base/logging.h" #include "search/doc_index.h" #include "server/channel_store.h" -#include "server/cluster/cluster_defs.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" @@ -63,8 +62,8 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* stats.AddTypeMemoryUsage(type, size); - if (cluster::IsClusterEnabled()) { - db->slots_stats[cluster::KeySlot(key)].memory_bytes += size; + if (IsClusterEnabled()) { + db->slots_stats[KeySlot(key)].memory_bytes += size; } } @@ -312,7 +311,7 @@ auto DbSlice::GetStats() const -> Stats { return s; } -SlotStats DbSlice::GetSlotStats(cluster::SlotId sid) const { +SlotStats DbSlice::GetSlotStats(SlotId sid) const { CHECK(db_arr_[0]); return db_arr_[0]->slots_stats[sid]; } @@ -487,8 +486,8 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: break; case UpdateStatsMode::kReadStats: events_.hits++; - if (cluster::IsClusterEnabled()) { - db.slots_stats[cluster::KeySlot(key)].total_reads++; + if (IsClusterEnabled()) { + db.slots_stats[KeySlot(key)].total_reads++; } if (res.it->second.IsExternal()) { if (res.it->second.IsCool()) @@ -651,8 +650,8 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt events_.stash_unloaded = db.prime.stash_unloaded(); events_.evicted_keys += evp.evicted(); events_.garbage_checked += evp.checked(); - if (cluster::IsClusterEnabled()) { - cluster::SlotId sid = cluster::KeySlot(key); + if (IsClusterEnabled()) { + SlotId sid = KeySlot(key); db.slots_stats[sid].key_count += 1; } @@ -696,7 +695,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { std::string tmp; auto del_entry_cb = [&](PrimeTable::iterator it) { std::string_view key = it->first.GetSlice(&tmp); - cluster::SlotId sid = cluster::KeySlot(key); + SlotId sid = KeySlot(key); if (slot_ids.Contains(sid) && it.GetVersion() < next_version) { PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get()); } @@ -1073,8 +1072,8 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size ++events_.update; - if (cluster::IsClusterEnabled()) { - db.slots_stats[cluster::KeySlot(key)].total_writes += 1; + if (IsClusterEnabled()) { + db.slots_stats[KeySlot(key)].total_writes += 1; } SendInvalidationTrackingMessage(key); @@ -1377,7 +1376,7 @@ void DbSlice::InvalidateDbWatches(DbIndex db_indx) { void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) { for (const auto& [key, conn_list] : db_arr_[0]->watched_keys) { - cluster::SlotId sid = cluster::KeySlot(key); + SlotId sid = KeySlot(key); if (!slot_ids.Contains(sid)) { continue; } @@ -1500,8 +1499,8 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl --stats.listpack_blob_cnt; } - if (cluster::IsClusterEnabled()) { - cluster::SlotId sid = cluster::KeySlot(del_it.key()); + if (IsClusterEnabled()) { + SlotId sid = KeySlot(del_it.key()); table->slots_stats[sid].key_count -= 1; } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 4f442910e157..b6457bb8c7e7 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -239,7 +239,7 @@ class DbSlice { Stats GetStats() const; // Returns slot statistics for db 0. - SlotStats GetSlotStats(cluster::SlotId sid) const; + SlotStats GetSlotStats(SlotId sid) const; void UpdateExpireBase(uint64_t now, unsigned generation) { expire_base_[generation & 1] = now; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 3c72e57514d4..61620fc96b3c 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -726,7 +726,7 @@ optional DebugCmd::ParsePopulateArgs(CmdArgList args, if (!absl::SimpleAtoi(slot_str, &slot_id)) { return facade::OpStatus::INVALID_INT; } - if (slot_id > cluster::kMaxSlotNum) { + if (slot_id > kMaxSlotNum) { return facade::OpStatus::INVALID_VALUE; } return slot_id; @@ -742,8 +742,8 @@ optional DebugCmd::ParsePopulateArgs(CmdArgList args, builder->SendError(end.status()); return nullopt; } - options.slot_range = cluster::SlotRange{.start = static_cast(start.value()), - .end = static_cast(end.value())}; + options.slot_range = cluster::SlotRange{.start = static_cast(start.value()), + .end = static_cast(end.value())}; } else { builder->SendError(kSyntaxErr); @@ -815,7 +815,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys, // : and continue until num_of_keys are added. // Add keys only in slot range. - cluster::SlotId sid = cluster::KeySlot(key); + SlotId sid = KeySlot(key); if (sid < options.slot_range->start || sid > options.slot_range->end) { ++index; continue; diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index ddb67476057b..e1e9f04ec7e4 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -260,7 +260,7 @@ __thread EngineShard* EngineShard::shard_ = nullptr; uint64_t TEST_current_time_ms = 0; ShardId Shard(string_view v, ShardId shard_num) { - if (cluster::IsClusterShardedByTag()) { + if (IsClusterShardedByTag()) { v = LockTagOptions::instance().Tag(v); } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index efe4fea221c2..3c37e5135b00 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -20,7 +20,6 @@ extern "C" { #include "redis/rdb.h" #include "server/acl/acl_commands_def.h" #include "server/blocking_controller.h" -#include "server/cluster/cluster_defs.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/container_utils.h" @@ -1652,7 +1651,7 @@ void GenericFamily::Select(CmdArgList args, const CommandContext& cmd_cntx) { if (!absl::SimpleAtoi(key, &index)) { return builder->SendError(kInvalidDbIndErr); } - if (cluster::IsClusterEnabled() && index != 0) { + if (IsClusterEnabled() && index != 0) { return builder->SendError("SELECT is not allowed in cluster mode"); } if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) { diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 3e4212eb7e89..cf04bc790405 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -84,7 +84,7 @@ LSN Journal::GetLsn() const { } void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - std::optional slot, Entry::Payload payload) { + std::optional slot, Entry::Payload payload) { journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}); } diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index ae275471a21d..0b74c6417c42 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -35,7 +35,7 @@ class Journal { LSN GetLsn() const; void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - std::optional slot, Entry::Payload payload); + std::optional slot, Entry::Payload payload); void SetFlushMode(bool allow_flush); diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index e54bfa2d2b95..e2b04116bfa0 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -278,10 +278,10 @@ bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const { } bool RestoreStreamer::ShouldWrite(std::string_view key) const { - return ShouldWrite(cluster::KeySlot(key)); + return ShouldWrite(KeySlot(key)); } -bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const { +bool RestoreStreamer::ShouldWrite(SlotId slot_id) const { return my_slots_.Contains(slot_id); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 907b6e65eee2..6677bdcb08ca 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -99,7 +99,7 @@ class RestoreStreamer : public JournalStreamer { void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); bool ShouldWrite(const journal::JournalItem& item) const override; bool ShouldWrite(std::string_view key) const; - bool ShouldWrite(cluster::SlotId slot_id) const; + bool ShouldWrite(SlotId slot_id) const; // Returns whether anything was written void WriteBucket(PrimeTable::bucket_iterator it); diff --git a/src/server/journal/types.cc b/src/server/journal/types.cc index 4c0e273f7469..ef03b08b13db 100644 --- a/src/server/journal/types.cc +++ b/src/server/journal/types.cc @@ -4,8 +4,6 @@ #include "server/journal/types.h" -#include "server/cluster/cluster_defs.h" - namespace dfly::journal { using namespace std; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index b460cfeb542b..f6eed5bf90b7 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -7,7 +7,6 @@ #include #include -#include "server/cluster/cluster_defs.h" #include "server/common.h" #include "server/table.h" @@ -30,7 +29,7 @@ struct EntryBase { DbIndex dbid; uint32_t shard_cnt; // This field is no longer used by the replica, but we continue to serialize // and deserialize it to maintain backward compatibility. - std::optional slot; + std::optional slot; LSN lsn{0}; }; @@ -52,12 +51,12 @@ struct Entry : public EntryBase { } }; - Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, - std::optional slot_id, Payload pl) + Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, std::optional slot_id, + Payload pl) : EntryBase{txid, opcode, dbid, shard_cnt, slot_id}, payload{pl} { } - Entry(journal::Op opcode, DbIndex dbid, std::optional slot_id) + Entry(journal::Op opcode, DbIndex dbid, std::optional slot_id) : EntryBase{0, opcode, dbid, 0, slot_id, 0} { } @@ -65,7 +64,7 @@ struct Entry : public EntryBase { } Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt, - std::optional slot_id) + std::optional slot_id) : EntryBase{txid, opcode, dbid, shard_cnt, slot_id, 0} { } @@ -93,7 +92,7 @@ struct JournalItem { Op opcode; std::string data; std::string_view cmd; - std::optional slot; + std::optional slot; }; using ChangeCallback = std::function; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 0a21339a0972..2af84299892d 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -39,7 +39,6 @@ extern "C" { #include "server/bloom_family.h" #include "server/channel_store.h" #include "server/cluster/cluster_family.h" -#include "server/cluster/cluster_utility.h" #include "server/conn_context.h" #include "server/error.h" #include "server/generic_family.h" @@ -573,15 +572,13 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send, auto print_kb = [&](string_view k, bool v) { print_kv(k, v ? "True" : "False"); }; - print_kv("Mode", cluster::IsClusterEmulated() ? "Emulated" - : cluster::IsClusterEnabled() ? "Enabled" - : "Disabled"); + print_kv("Mode", IsClusterEmulated() ? "Emulated" : IsClusterEnabled() ? "Enabled" : "Disabled"); - if (cluster::IsClusterEnabledOrEmulated()) { + if (IsClusterEnabledOrEmulated()) { print_kb("Lock on hashtags", LockTagOptions::instance().enabled); } - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { if (cluster::ClusterFamily::cluster_config() == nullptr) { resp.body() += "

Not yet configured.

\n"; } else { @@ -932,11 +929,11 @@ optional Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis } const auto& key_index = *key_index_res; - optional keys_slot; + optional keys_slot; bool cross_slot = false; // Iterate keys and check to which slot they belong. for (string_view key : key_index.Range(args)) { - if (cluster::SlotId slot = cluster::KeySlot(key); keys_slot && slot != *keys_slot) { + if (SlotId slot = KeySlot(key); keys_slot && slot != *keys_slot) { cross_slot = true; // keys belong to different slots break; } else { @@ -1104,7 +1101,7 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA return ErrorReply{absl::StrCat("'", cmd_name, "' inside MULTI is not allowed")}; } - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { if (auto err = CheckKeysOwnership(cid, tail_args, dfly_cntx); err) return err; } @@ -1935,7 +1932,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret optional sid; - cluster::UniqueSlotChecker slot_checker; + UniqueSlotChecker slot_checker; for (size_t i = 0; i < eval_args.keys.size(); ++i) { string_view key = ArgS(eval_args.keys, i); slot_checker.Add(key); @@ -2255,7 +2252,7 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) { } void Service::Publish(CmdArgList args, const CommandContext& cmd_cntx) { - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { return cmd_cntx.rb->SendError("PUBLISH is not supported in cluster mode yet"); } string_view channel = ArgS(args, 0); @@ -2266,7 +2263,7 @@ void Service::Publish(CmdArgList args, const CommandContext& cmd_cntx) { } void Service::Subscribe(CmdArgList args, const CommandContext& cmd_cntx) { - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { return cmd_cntx.rb->SendError("SUBSCRIBE is not supported in cluster mode yet"); } cmd_cntx.conn_cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args), @@ -2275,7 +2272,7 @@ void Service::Subscribe(CmdArgList args, const CommandContext& cmd_cntx) { void Service::Unsubscribe(CmdArgList args, const CommandContext& cmd_cntx) { auto* rb = static_cast(cmd_cntx.rb); - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { return cmd_cntx.rb->SendError("UNSUBSCRIBE is not supported in cluster mode yet"); } @@ -2289,7 +2286,7 @@ void Service::Unsubscribe(CmdArgList args, const CommandContext& cmd_cntx) { void Service::PSubscribe(CmdArgList args, const CommandContext& cmd_cntx) { auto* rb = static_cast(cmd_cntx.rb); - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { return rb->SendError("PSUBSCRIBE is not supported in cluster mode yet"); } cmd_cntx.conn_cntx->ChangePSubscription(true, true, args, rb); @@ -2298,7 +2295,7 @@ void Service::PSubscribe(CmdArgList args, const CommandContext& cmd_cntx) { void Service::PUnsubscribe(CmdArgList args, const CommandContext& cmd_cntx) { auto* rb = static_cast(cmd_cntx.rb); - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { return rb->SendError("PUNSUBSCRIBE is not supported in cluster mode yet"); } if (args.size() == 0) { @@ -2353,7 +2350,7 @@ void Service::Monitor(CmdArgList args, const CommandContext& cmd_cntx) { void Service::Pubsub(CmdArgList args, const CommandContext& cmd_cntx) { auto* rb = static_cast(cmd_cntx.rb); - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { return rb->SendError("PUBSUB is not supported in cluster mode yet"); } if (args.size() < 1) { diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 432cbc986108..e918138541aa 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -9,7 +9,6 @@ #include "base/logging.h" #include "core/overloaded.h" #include "facade/dragonfly_connection.h" -#include "server/cluster/cluster_utility.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" @@ -79,7 +78,7 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, Connectio } MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo( - ShardId sid, optional slot_id) { + ShardId sid, optional slot_id) { if (sharded_.empty()) sharded_.resize(shard_set->size()); @@ -122,7 +121,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return SquashResult::NOT_SQUASHED; // Check if all commands belong to one shard - cluster::UniqueSlotChecker slot_checker; + UniqueSlotChecker slot_checker; ShardId last_sid = kInvalidSid; for (string_view key : keys->Range(args)) { diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index e269cc52256e..9e88335136ac 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -52,7 +52,7 @@ class MultiCommandSquasher { bool verify_commands, bool error_abort); // Lazy initialize shard info. - ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional slot_id); + ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional slot_id); // Retrun squash flags SquashResult TrySquash(StoredCmd* cmd); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 383b218dab7a..d49555009b57 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -34,7 +34,6 @@ extern "C" { #include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" -#include "server/cluster/cluster_defs.h" #include "server/cluster/cluster_family.h" #include "server/container_utils.h" #include "server/engine_shard_set.h" @@ -2464,7 +2463,7 @@ error_code RdbLoader::HandleAux() { if (absl::SimpleAtoi(auxval, &usedmem)) { VLOG(1) << "RDB memory usage when created " << strings::HumanReadableNumBytes(usedmem); if (usedmem > ssize_t(max_memory_limit)) { - if (cluster::IsClusterEnabled()) { + if (IsClusterEnabled()) { LOG(INFO) << "Attempting to load a snapshot of size " << usedmem << ", despite memory limit of " << max_memory_limit; } else { @@ -2709,7 +2708,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { } bool RdbLoader::ShouldDiscardKey(std::string_view key, ObjSettings* settings) const { - if (!load_unowned_slots_ && cluster::IsClusterEnabled()) { + if (!load_unowned_slots_ && IsClusterEnabled()) { const cluster::ClusterConfig* cluster_config = cluster::ClusterFamily::cluster_config(); if (cluster_config != nullptr && !cluster_config->IsMySlot(key)) { return true; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 5674ac30bba6..e897e1f2f423 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -583,7 +583,7 @@ std::string_view GetOSString() { } string_view GetRedisMode() { - return cluster::IsClusterEnabledOrEmulated() ? "cluster"sv : "standalone"sv; + return IsClusterEnabledOrEmulated() ? "cluster"sv : "standalone"sv; } struct ReplicaOfArgs { @@ -622,7 +622,7 @@ optional ReplicaOfArgs::FromCmdArgs(CmdArgList args, SinkReplyBui return nullopt; } if (parser.HasNext()) { - auto [slot_start, slot_end] = parser.Next(); + auto [slot_start, slot_end] = parser.Next(); replicaof_args.slot_range = cluster::SlotRange{slot_start, slot_end}; if (auto err = parser.Error(); err || !replicaof_args.slot_range->IsValid()) { builder->SendError("Invalid slot range"); @@ -2636,7 +2636,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) { #endif if (should_enter("CLUSTER")) { - append("cluster_enabled", cluster::IsClusterEnabledOrEmulated()); + append("cluster_enabled", IsClusterEnabledOrEmulated()); } auto* rb = static_cast(cmd_cntx.rb); rb->SendVerbatimString(info); diff --git a/src/server/table.cc b/src/server/table.cc index 1cd65e212642..c48ad1773911 100644 --- a/src/server/table.cc +++ b/src/server/table.cc @@ -6,7 +6,7 @@ #include "base/flags.h" #include "base/logging.h" -#include "server/cluster/cluster_defs.h" +#include "server/cluster_support.h" #include "server/server_state.h" ABSL_FLAG(bool, enable_top_keys_tracking, false, @@ -86,8 +86,8 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index) mcflag(0, detail::ExpireTablePolicy{}, mr), top_keys({.enabled = absl::GetFlag(FLAGS_enable_top_keys_tracking)}), index(db_index) { - if (cluster::IsClusterEnabled()) { - slots_stats.resize(cluster::kMaxSlotNum + 1); + if (IsClusterEnabled()) { + slots_stats.resize(kMaxSlotNum + 1); } thread_index = ServerState::tlocal()->thread_index(); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index ddfc54021195..381f37135743 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -184,8 +184,7 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { } } -Transaction::Transaction(const Transaction* parent, ShardId shard_id, - std::optional slot_id) +Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optional slot_id) : multi_{make_unique()}, txid_{parent->txid()}, unique_shard_cnt_{1}, @@ -1017,7 +1016,7 @@ ShardId Transaction::GetUniqueShard() const { return unique_shard_id_; } -optional Transaction::GetUniqueSlotId() const { +optional Transaction::GetUniqueSlotId() const { return unique_slot_checker_.GetUniqueSlotId(); } diff --git a/src/server/transaction.h b/src/server/transaction.h index 22fa79374fc4..90f3f73a7dd2 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -18,7 +18,6 @@ #include "core/intent_lock.h" #include "core/tx_queue.h" #include "facade/op_status.h" -#include "server/cluster/cluster_utility.h" #include "server/common.h" #include "server/journal/types.h" #include "server/namespaces.h" @@ -184,8 +183,7 @@ class Transaction { explicit Transaction(const CommandId* cid); // Initialize transaction for squashing placed on a specific shard with a given parent tx - explicit Transaction(const Transaction* parent, ShardId shard_id, - std::optional slot_id); + explicit Transaction(const Transaction* parent, ShardId shard_id, std::optional slot_id); // Initialize from command (args) on specific db. OpStatus InitByArgs(Namespace* ns, DbIndex index, CmdArgList args); @@ -290,7 +288,7 @@ class Transaction { // This method is meaningless if GetUniqueShardCnt() != 1. ShardId GetUniqueShard() const; - std::optional GetUniqueSlotId() const; + std::optional GetUniqueSlotId() const; bool IsMulti() const { return bool(multi_); @@ -633,7 +631,7 @@ class Transaction { uint32_t unique_shard_cnt_{0}; // Number of unique shards active ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1 - cluster::UniqueSlotChecker unique_slot_checker_; + UniqueSlotChecker unique_slot_checker_; // Barrier for waking blocking transactions that ensures exclusivity of waking operation. BatonBarrier blocking_barrier_{}; diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 83bfefa9336a..c91769a919fb 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -66,7 +66,7 @@ void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); - journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), + journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, KeySlot(key), Payload("DEL", ArgSlice{key})); }