
Commit

fix: incorrect cluster slots calculation during squashing
BorysTheDev committed Jan 14, 2025
1 parent 7d19787 commit a023cae
Showing 6 changed files with 23 additions and 51 deletions.
14 changes: 4 additions & 10 deletions src/server/cluster_support.cc
@@ -9,6 +9,7 @@ extern "C" {
 #include "base/flags.h"
 #include "base/logging.h"
 #include "cluster_support.h"
+#include "common.h"
 
 using namespace std;
 
@@ -31,22 +32,15 @@ void UniqueSlotChecker::Add(SlotId slot_id) {
     return;
   }
 
-  if (!slot_id_.has_value()) {
+  if (slot_id_ == kEmptySlotId) {
     slot_id_ = slot_id;
-    return;
-  }
-
-  if (*slot_id_ != slot_id) {
+  } else if (slot_id_ != slot_id) {
     slot_id_ = kInvalidSlotId;
   }
 }
 
 optional<SlotId> UniqueSlotChecker::GetUniqueSlotId() const {
-  if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) {
-    return nullopt;
-  }
-
-  return slot_id_;
+  return slot_id_ > kMaxSlotNum ? optional<SlotId>() : slot_id_;
 }
 
 namespace {
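Editor's note: taken together with the header change below, the fix replaces the std::optional<SlotId> member with a raw SlotId plus two out-of-band sentinel values. A minimal self-contained sketch of the resulting three-state checker, with names mirroring the diff; the IsClusterEnabled() gate and the string_view overload are omitted here:

#include <cstdint>
#include <optional>

using SlotId = std::uint16_t;
constexpr SlotId kMaxSlotNum = 0x3FFF;  // valid hash slots are 0..16383

class UniqueSlotChecker {
 public:
  void Add(SlotId slot_id) {
    if (slot_id_ == kEmptySlotId) {
      slot_id_ = slot_id;  // first slot seen: remember it
    } else if (slot_id_ != slot_id) {
      slot_id_ = kInvalidSlotId;  // a second, different slot: poison the state
    }
  }

  std::optional<SlotId> GetUniqueSlotId() const {
    // Both sentinels are > kMaxSlotNum, so "empty" and "invalid" map to nullopt.
    return slot_id_ > kMaxSlotNum ? std::optional<SlotId>() : std::optional<SlotId>(slot_id_);
  }

  void Reset() {
    slot_id_ = kEmptySlotId;  // back to the "no slot seen yet" state
  }

 private:
  static constexpr SlotId kEmptySlotId = kMaxSlotNum + 1;     // no slot seen yet
  static constexpr SlotId kInvalidSlotId = kEmptySlotId + 1;  // conflicting slots seen

  SlotId slot_id_ = kEmptySlotId;
};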
13 changes: 8 additions & 5 deletions src/server/cluster_support.h
@@ -8,14 +8,10 @@
 #include <optional>
 #include <string_view>
 
 #include "common.h"
 
 namespace dfly {
 
-using SlotId = std::uint16_t;
-
-constexpr SlotId kMaxSlotNum = 0x3FFF;
-constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1;
 
 // A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same.
 // Only works when cluster is enabled.
@@ -26,8 +22,15 @@ class UniqueSlotChecker {
 
   std::optional<SlotId> GetUniqueSlotId() const;
 
+  void Reset() {
+    slot_id_ = kEmptySlotId;
+  }
+
  private:
-  std::optional<SlotId> slot_id_;
+  static constexpr SlotId kEmptySlotId = kMaxSlotNum + 1;
+  static constexpr SlotId kInvalidSlotId = kEmptySlotId + 1;
+
+  SlotId slot_id_ = kEmptySlotId;
 };
 
 SlotId KeySlot(std::string_view key);
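Editor's note: a short usage sketch against the checker class sketched above (slot numbers are made up) showing the three states and why the new Reset() matters once a checker instance is reused:

#include <cassert>

int main() {
  UniqueSlotChecker checker;
  assert(!checker.GetUniqueSlotId().has_value());  // empty state

  checker.Add(100);
  checker.Add(100);
  assert(checker.GetUniqueSlotId() == std::optional<SlotId>(100));  // one unique slot

  checker.Add(200);  // a second, different slot
  assert(!checker.GetUniqueSlotId().has_value());  // invalid state: nullopt

  checker.Reset();  // without this, the stale state above would leak into reuse
  checker.Add(200);
  assert(checker.GetUniqueSlotId() == std::optional<SlotId>(200));
  return 0;
}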
9 changes: 3 additions & 6 deletions src/server/multi_command_squasher.cc
@@ -77,15 +77,14 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, Connectio
   atomic_ = mode != Transaction::NON_ATOMIC;
 }
 
-MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(
-    ShardId sid, optional<SlotId> slot_id) {
+MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
   if (sharded_.empty())
     sharded_.resize(shard_set->size());
 
   auto& sinfo = sharded_[sid];
   if (!sinfo.local_tx) {
     if (IsAtomic()) {
-      sinfo.local_tx = new Transaction{cntx_->transaction, sid, slot_id};
+      sinfo.local_tx = new Transaction{cntx_->transaction, sid, nullopt};
     } else {
       // Non-atomic squashing does not use the transactional framework for fan out, so local
       // transactions have to be fully standalone, check locks and release them immediately.
@@ -121,19 +120,17 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) {
     return SquashResult::NOT_SQUASHED;
 
   // Check if all commands belong to one shard
-  UniqueSlotChecker slot_checker;
   ShardId last_sid = kInvalidSid;
 
   for (string_view key : keys->Range(args)) {
-    slot_checker.Add(key);
     ShardId sid = Shard(key, shard_set->size());
     if (last_sid == kInvalidSid || last_sid == sid)
      last_sid = sid;
    else
      return SquashResult::NOT_SQUASHED;  // at least two shards
  }
 
-  auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId());
+  auto& sinfo = PrepareShardInfo(last_sid);
 
   sinfo.cmds.push_back(cmd);
   order_.push_back(last_sid);
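Editor's note: with the per-command slot checker gone from TrySquash, what remains is a plain "do all keys hash to one shard" scan. A standalone sketch of that pattern; Shard() here is a stand-in hash and SingleShardOf() a hypothetical helper, not Dragonfly's actual API:

#include <cstddef>
#include <cstdint>
#include <functional>
#include <limits>
#include <optional>
#include <string_view>
#include <vector>

using ShardId = std::uint16_t;
constexpr ShardId kInvalidSid = std::numeric_limits<ShardId>::max();

// Stand-in for Dragonfly's Shard(): any stable key -> shard mapping works here.
ShardId Shard(std::string_view key, std::size_t shard_count) {
  return static_cast<ShardId>(std::hash<std::string_view>{}(key) % shard_count);
}

// Returns the single shard all keys map to, or nullopt if they span shards.
std::optional<ShardId> SingleShardOf(const std::vector<std::string_view>& keys,
                                     std::size_t shard_count) {
  ShardId last_sid = kInvalidSid;
  for (std::string_view key : keys) {
    ShardId sid = Shard(key, shard_count);
    if (last_sid == kInvalidSid || last_sid == sid)
      last_sid = sid;  // first key, or same shard as the previous keys
    else
      return std::nullopt;  // at least two shards: the command cannot be squashed
  }
  return last_sid == kInvalidSid ? std::nullopt : std::optional<ShardId>(last_sid);
}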
2 changes: 1 addition & 1 deletion src/server/multi_command_squasher.h
@@ -52,7 +52,7 @@ class MultiCommandSquasher {
                       bool verify_commands, bool error_abort);
 
   // Lazy initialize shard info.
-  ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional<SlotId> slot_id);
+  ShardExecInfo& PrepareShardInfo(ShardId sid);
 
   // Retrun squash flags
   SquashResult TrySquash(StoredCmd* cmd);
1 change: 1 addition & 0 deletions src/server/transaction.cc
@@ -325,6 +325,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   // Stub transactions always operate only on single shard.
   bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
 
+  unique_slot_checker_.Reset();
   if ((key_index.NumArgs() == 1 && !IsAtomicMulti()) || is_stub) {
     DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);
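Editor's note: this one-line Reset() appears to be the heart of the fix: unique_slot_checker_ is a Transaction member that survives across InitByKeys() calls, so a slot recorded for one squashed command batch could previously leak into the next. A hedged illustration of that failure mode, using the checker sketch from the first file (slot numbers are made up):

UniqueSlotChecker checker;  // stands in for Transaction::unique_slot_checker_

// First batch of keys, all in slot 100.
checker.Add(100);
// GetUniqueSlotId() == 100: correct for this batch.

// The Transaction object is reused for a second batch, all in slot 200.
// Before the fix there was no Reset() here, so the stale slot 100 stayed
// recorded and Add(200) drove the checker into the invalid state, yielding
// nullopt instead of 200 for the new batch.
checker.Reset();  // the fix: each InitByKeys() now starts from the empty state
checker.Add(200);
// GetUniqueSlotId() == 200: correct again.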
35 changes: 6 additions & 29 deletions tests/dragonfly/cluster_test.py
@@ -2582,16 +2582,15 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
     assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
 
 
-@dfly_args({"proactor_threads": 1, "cluster_mode": "yes", "cache_mode": "true"})
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "cache_mode": "true"})
 async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_seeder_factory):
     # Check data migration from one node to another with expiration and eviction
     instances = [
         df_factory.create(
             port=next(next_port),
             admin_port=next(next_port),
             enable_heartbeat_eviction="false",
-            vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
-            maxmemory="256mb" if i == 0 else "3G",
+            maxmemory="1G" if i == 0 else "3G",
         )
         for i in range(2)
     ]
@@ -2605,7 +2604,7 @@ async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_seeder_factory):
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     logging.debug("Start seeder")
-    await nodes[0].client.execute_command("DEBUG POPULATE 20000 test 10000 RAND SLOTS 0 16383")
+    await nodes[0].client.execute_command("DEBUG POPULATE 80000 test 10000 RAND SLOTS 0 16383")
     # cluster_mode set to off to prevent eviction on the target node after migration is finished
     seeder = df_seeder_factory.create(
         keys=1000,
@@ -2616,28 +2615,9 @@ async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_seeder_factory):
         multi_transaction_probability=0,
         max_multikey=1,
     )
-    await seeder.run(target_deviation=0.1)
-
-    # def rand_str(k=3, s=""):
-    #     # Use small k value to reduce mem usage and increase number of ops
-    #     return s.join(random.choices(string.ascii_letters, k=k))
-
-    # generate = True
-    # big_str = rand_str(10000)
-    # async def generator():
-    #     i = 0
-    #     while generate:
-    #         try:
-
-    #             await nodes[0].client.execute_command(f"SET", f"STR_COUNTER{i}", big_str)
-    #         except redis.exceptions.ResponseError:
-    #             continue
-    #         i += 1
-
-    # fill_task = asyncio.create_task(generator())
-
     fill_task = asyncio.create_task(seeder.run())
 
+    # some time to evict data
     await asyncio.sleep(10)
 
     logging.debug("Start migration")
@@ -2649,17 +2629,14 @@ async def test_cache_cluster_data_migration(df_factory: DflyInstanceFactory, df_seeder_factory):
     await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", 30)
 
     logging.debug("Stop seeder")
-    # await asyncio.sleep(20)
-
     seeder.stop()
-    # generate = False
     await fill_task
 
-    logging.debug("drop migration for 0 node")
+    logging.debug("drop migration for 0th node")
     nodes[0].migrations = []
     await push_config(json.dumps(generate_config(nodes)), [nodes[0].admin_client])
 
-    logging.debug("finish migration")
+    logging.debug("finish migration for 1st")
     nodes[0].slots = []
     nodes[1].slots = [(0, 16383)]
     await push_config(json.dumps(generate_config(nodes)), [nodes[1].admin_client])
