diff --git a/src/server/cluster_support.cc b/src/server/cluster_support.cc
index 97f07fc72ed6..f8ee02433c98 100644
--- a/src/server/cluster_support.cc
+++ b/src/server/cluster_support.cc
@@ -9,6 +9,7 @@ extern "C" {
 #include "base/flags.h"
 #include "base/logging.h"
 #include "cluster_support.h"
+#include "common.h"
 
 using namespace std;
 
@@ -31,22 +32,15 @@ void UniqueSlotChecker::Add(SlotId slot_id) {
     return;
   }
 
-  if (!slot_id_.has_value()) {
+  if (slot_id_ == kEmptySlotId) {
     slot_id_ = slot_id;
-    return;
-  }
-
-  if (*slot_id_ != slot_id) {
+  } else if (slot_id_ != slot_id) {
     slot_id_ = kInvalidSlotId;
   }
 }
 
 optional<SlotId> UniqueSlotChecker::GetUniqueSlotId() const {
-  if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) {
-    return nullopt;
-  }
-
-  return slot_id_;
+  return slot_id_ > kMaxSlotNum ? optional<SlotId>() : slot_id_;
 }
 
 namespace {
diff --git a/src/server/cluster_support.h b/src/server/cluster_support.h
index 8f08ea39212d..5fe7c93980a3 100644
--- a/src/server/cluster_support.h
+++ b/src/server/cluster_support.h
@@ -8,14 +8,10 @@
 #include <optional>
 #include <string_view>
 
-#include "common.h"
-
 namespace dfly {
 
 using SlotId = std::uint16_t;
-
 constexpr SlotId kMaxSlotNum = 0x3FFF;
-constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1;
 
 // A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same.
 // Only works when cluster is enabled.
@@ -26,8 +22,15 @@ class UniqueSlotChecker {
 
   std::optional<SlotId> GetUniqueSlotId() const;
 
+  void Reset() {
+    slot_id_ = kEmptySlotId;
+  }
+
  private:
-  std::optional<SlotId> slot_id_;
+  static constexpr SlotId kEmptySlotId = kMaxSlotNum + 1;
+  static constexpr SlotId kInvalidSlotId = kEmptySlotId + 1;
+
+  SlotId slot_id_ = kEmptySlotId;
 };
 
 SlotId KeySlot(std::string_view key);
diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc
index e918138541aa..df109fecaef3 100644
--- a/src/server/multi_command_squasher.cc
+++ b/src/server/multi_command_squasher.cc
@@ -77,15 +77,14 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, Connectio
   atomic_ = mode != Transaction::NON_ATOMIC;
 }
 
-MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(
-    ShardId sid, optional<SlotId> slot_id) {
+MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
   if (sharded_.empty())
     sharded_.resize(shard_set->size());
 
   auto& sinfo = sharded_[sid];
   if (!sinfo.local_tx) {
     if (IsAtomic()) {
-      sinfo.local_tx = new Transaction{cntx_->transaction, sid, slot_id};
+      sinfo.local_tx = new Transaction{cntx_->transaction, sid, nullopt};
     } else {
       // Non-atomic squashing does not use the transactional framework for fan out, so local
       // transactions have to be fully standalone, check locks and release them immediately.
@@ -121,11 +120,9 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
     return SquashResult::NOT_SQUASHED;
 
   // Check if all commands belong to one shard
-  UniqueSlotChecker slot_checker;
   ShardId last_sid = kInvalidSid;
 
   for (string_view key : keys->Range(args)) {
-    slot_checker.Add(key);
     ShardId sid = Shard(key, shard_set->size());
     if (last_sid == kInvalidSid || last_sid == sid)
       last_sid = sid;
@@ -133,7 +130,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
       return SquashResult::NOT_SQUASHED;  // at least two shards
   }
 
-  auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId());
+  auto& sinfo = PrepareShardInfo(last_sid);
   sinfo.cmds.push_back(cmd);
 
   order_.push_back(last_sid);
diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h
index 9e88335136ac..3230b66e460b 100644
--- a/src/server/multi_command_squasher.h
+++ b/src/server/multi_command_squasher.h
@@ -52,7 +52,7 @@ class MultiCommandSquasher {
                        bool verify_commands, bool error_abort);
 
   // Lazy initialize shard info.
-  ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional<SlotId> slot_id);
+  ShardExecInfo& PrepareShardInfo(ShardId sid);
 
   // Retrun squash flags
   SquashResult TrySquash(StoredCmd* cmd);
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 381f37135743..ca3d15172f85 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -325,6 +325,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
 
   // Stub transactions always operate only on single shard.
   bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
+  unique_slot_checker_.Reset();
 
   if ((key_index.NumArgs() == 1 && !IsAtomicMulti()) || is_stub) {
     DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 756f1d9e86c3..a3faed74458f 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -2582,16 +2582,15 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
     assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
 
 
-@dfly_args({"proactor_threads": 1, "cluster_mode": "yes", "cache_mode": "true"})
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cache_mode": "true"})
 async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_seeder_factory):
     # Check data migration from one node to another with expiration and eviction
     instances = [
         df_factory.create(
             port=next(next_port),
             admin_port=next(next_port),
-            enable_heartbeat_eviction="false",
             vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
-            maxmemory="256mb" if i == 0 else "3G",
+            maxmemory="1G" if i == 0 else "3G",
         )
         for i in range(2)
     ]
@@ -2605,7 +2604,7 @@ async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     logging.debug("Start seeder")
-    await nodes[0].client.execute_command("DEBUG POPULATE 20000 test 10000 RAND SLOTS 0 16383")
+    await nodes[0].client.execute_command("DEBUG POPULATE 80000 test 10000 RAND SLOTS 0 16383")
     # cluster_mode set to off to prevent eviction on the target node after migration is finished
     seeder = df_seeder_factory.create(
         keys=1000,
@@ -2616,28 +2615,9 @@ async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_
         multi_transaction_probability=0,
         max_multikey=1,
     )
-    await seeder.run(target_deviation=0.1)
-
-    # def rand_str(k=3, s=""):
-    #     # Use small k value to reduce mem usage and increase number of ops
-    #     return s.join(random.choices(string.ascii_letters, k=k))
-
-    # generate = True
-    # big_str = rand_str(10000)
-    # async def generator():
-    #     i = 0
-    #     while generate:
-    #         try:
-
-    #             await nodes[0].client.execute_command(f"SET", f"STR_COUNTER{i}", big_str)
-    #         except redis.exceptions.ResponseError:
-    #             continue
-    #         i += 1
-
-    # fill_task = asyncio.create_task(generator())
 
     fill_task = asyncio.create_task(seeder.run())
-
+    # give the node some time to evict data
     await asyncio.sleep(10)
 
     logging.debug("Start migration")
@@ -2649,17 +2629,14 @@ async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_
     await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", 30)
 
     logging.debug("Stop seeder")
-    # await asyncio.sleep(20)
-
     seeder.stop()
-    # generate = False
     await fill_task
 
-    logging.debug("drop migration for 0 node")
+    logging.debug("drop migration for 0th node")
    nodes[0].migrations = []
     await push_config(json.dumps(generate_config(nodes)), [nodes[0].admin_client])
 
-    logging.debug("finish migration")
+    logging.debug("finish migration for 1st node")
     nodes[0].slots = []
     nodes[1].slots = [(0, 16383)]
     await push_config(json.dumps(generate_config(nodes)), [nodes[1].admin_client])
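Reviewer note: the gist of this change is that `UniqueSlotChecker` no longer wraps its state in `std::optional<SlotId>`; instead it encodes "no slot seen yet" and "more than one slot seen" as two out-of-range sentinel values just above `kMaxSlotNum`, which makes the member trivially resettable so `Transaction::InitByKeys` can reuse one checker via `Reset()`. Below is a minimal standalone sketch of that state machine, assuming only what the diff shows; the `IsClusterEnabled()` guard and the rest of the Dragonfly plumbing are omitted, and `main` is illustrative only.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

using SlotId = std::uint16_t;
constexpr SlotId kMaxSlotNum = 0x3FFF;  // valid slots are 0..16383

// Sentinel-based aggregator: "empty" and "invalid" live above kMaxSlotNum,
// so a single SlotId field replaces std::optional<SlotId>.
class UniqueSlotChecker {
 public:
  void Add(SlotId slot_id) {
    if (slot_id_ == kEmptySlotId) {
      slot_id_ = slot_id;  // first slot observed
    } else if (slot_id_ != slot_id) {
      slot_id_ = kInvalidSlotId;  // mixed slots; state is sticky until Reset()
    }
  }

  std::optional<SlotId> GetUniqueSlotId() const {
    // Both sentinels compare greater than kMaxSlotNum, so "empty" and
    // "invalid" each collapse to nullopt, as in the patched code.
    return slot_id_ > kMaxSlotNum ? std::optional<SlotId>() : std::optional<SlotId>(slot_id_);
  }

  void Reset() {
    slot_id_ = kEmptySlotId;
  }

 private:
  static constexpr SlotId kEmptySlotId = kMaxSlotNum + 1;     // 16384
  static constexpr SlotId kInvalidSlotId = kEmptySlotId + 1;  // 16385

  SlotId slot_id_ = kEmptySlotId;
};

int main() {
  UniqueSlotChecker checker;
  std::cout << checker.GetUniqueSlotId().has_value() << '\n';  // 0: empty -> nullopt

  checker.Add(42);
  checker.Add(42);
  std::cout << *checker.GetUniqueSlotId() << '\n';  // 42: all inputs agreed

  checker.Add(7);
  std::cout << checker.GetUniqueSlotId().has_value() << '\n';  // 0: mixed -> nullopt

  checker.Reset();  // back to the empty state, as InitByKeys now does
  checker.Add(7);
  std::cout << *checker.GetUniqueSlotId() << '\n';  // 7
}
```

This also shows why `GetUniqueSlotId` can get away with a single `> kMaxSlotNum` comparison: the two sentinels are deliberately placed adjacent to each other just above the valid slot range.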