Skip to content

Commit

Permalink
feat(cluster): Support STICK bit in slot migration (#3200)
Browse files Browse the repository at this point in the history
  • Loading branch information
chakaz authored Jun 21, 2024
1 parent c8f2f25 commit 6024d79
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 48 deletions.
95 changes: 52 additions & 43 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,57 @@ ::io::Result<size_t> InMemSource::ReadSome(const iovec* v, uint32_t len) {
return read_total;
}

class RestoreArgs {
private:
static constexpr time_t NO_EXPIRATION = 0;

time_t expiration_ = NO_EXPIRATION;
bool abs_time_ = false;
bool replace_ = false; // if true, over-ride existing key
bool sticky_ = false;

public:
RestoreArgs() = default;

RestoreArgs(time_t expiration, bool abs_time, bool replace)
: expiration_(expiration), abs_time_(abs_time), replace_(replace) {
}

constexpr bool Replace() const {
return replace_;
}

constexpr bool Sticky() const {
return sticky_;
}

uint64_t ExpirationTime() const {
DCHECK_GE(expiration_, 0);
return expiration_;
}

[[nodiscard]] constexpr bool Expired() const {
return expiration_ < 0;
}

[[nodiscard]] constexpr bool HasExpiration() const {
return expiration_ != NO_EXPIRATION;
}

[[nodiscard]] bool UpdateExpiration(int64_t now_msec);

static OpResult<RestoreArgs> TryFrom(const CmdArgList& args);
};

class RdbRestoreValue : protected RdbLoaderBase {
public:
RdbRestoreValue(RdbVersion rdb_version) {
rdb_version_ = rdb_version;
}

std::optional<DbSlice::ItAndUpdater> Add(std::string_view payload, std::string_view key,
DbSlice& db_slice, DbIndex index, uint64_t expire_ms);
DbSlice& db_slice, DbIndex index,
const RestoreArgs& args);

private:
std::optional<OpaqueObj> Parse(std::string_view payload);
Expand All @@ -134,7 +177,7 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view

std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
std::string_view key, DbSlice& db_slice,
DbIndex index, uint64_t expire_ms) {
DbIndex index, const RestoreArgs& args) {
auto opaque_res = Parse(data);
if (!opaque_res) {
return std::nullopt;
Expand All @@ -147,50 +190,15 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
return std::nullopt;
}

auto res = db_slice.AddNew(DbContext{index, GetCurrentTimeMs()}, key, std::move(pv), expire_ms);
auto res = db_slice.AddNew(DbContext{index, GetCurrentTimeMs()}, key, std::move(pv),
args.ExpirationTime());
res->it->first.SetSticky(args.Sticky());
if (res) {
return std::move(res.value());
}
return std::nullopt;
}

class RestoreArgs {
private:
static constexpr time_t NO_EXPIRATION = 0;

time_t expiration_ = NO_EXPIRATION;
bool abs_time_ = false;
bool replace_ = false; // if true, over-ride existing key

public:
RestoreArgs() = default;

RestoreArgs(time_t expiration, bool abs_time, bool replace)
: expiration_(expiration), abs_time_(abs_time), replace_(replace) {
}

constexpr bool Replace() const {
return replace_;
}

uint64_t ExpirationTime() const {
DCHECK_GE(expiration_, 0);
return expiration_;
}

[[nodiscard]] constexpr bool Expired() const {
return expiration_ < 0;
}

[[nodiscard]] constexpr bool HasExpiration() const {
return expiration_ != NO_EXPIRATION;
}

[[nodiscard]] bool UpdateExpiration(int64_t now_msec);

static OpResult<RestoreArgs> TryFrom(const CmdArgList& args);
};

[[nodiscard]] bool RestoreArgs::UpdateExpiration(int64_t now_msec) {
if (HasExpiration()) {
int64_t ttl = abs_time_ ? expiration_ - now_msec : expiration_;
Expand Down Expand Up @@ -230,6 +238,8 @@ OpResult<RestoreArgs> RestoreArgs::TryFrom(const CmdArgList& args) {
out_args.replace_ = true;
} else if (cur_arg == "ABSTTL") {
out_args.abs_time_ = true;
} else if (cur_arg == "STICK") {
out_args.sticky_ = true;
} else if (cur_arg == "IDLETIME" && additional) {
++i;
cur_arg = ArgS(args, i);
Expand Down Expand Up @@ -434,7 +444,7 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {

RdbRestoreValue loader(serialized_value_.version.value());
auto restored_dest_it = loader.Add(serialized_value_.value, dest_key_, db_slice,
op_args.db_cntx.db_index, restore_args.ExpirationTime());
op_args.db_cntx.db_index, restore_args);

if (restored_dest_it) {
auto& dest_it = restored_dest_it->it;
Expand Down Expand Up @@ -523,8 +533,7 @@ OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::strin
}

RdbRestoreValue loader(rdb_version);
auto res =
loader.Add(payload, key, db_slice, op_args.db_cntx.db_index, restore_args.ExpirationTime());
auto res = loader.Add(payload, key, db_slice, op_args.db_cntx.db_index, restore_args);
return res.has_value();
}

Expand Down
11 changes: 8 additions & 3 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
expire = db_slice_->ExpireTime(eit);
}

WriteEntry(key, pv, expire);
WriteEntry(key, it->first, pv, expire);
}
}
}
Expand All @@ -326,8 +326,9 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
}
}

void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms) {
absl::InlinedVector<string_view, 4> args;
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) {
absl::InlinedVector<string_view, 5> args;
args.push_back(key);

string expire_str = absl::StrCat(expire_ms);
Expand All @@ -339,6 +340,10 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t

args.push_back("ABSTTL"); // Means expire string is since epoch

if (pk.IsSticky()) {
args.push_back("STICK");
}

WriteCommand(journal::Entry::Payload("RESTORE", ArgSlice(args)));
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class RestoreStreamer : public JournalStreamer {

// Returns whether anything was written
bool WriteBucket(PrimeTable::bucket_iterator it);
void WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms);
void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
void WriteCommand(journal::Entry::Payload cmd_payload);

DbSlice* db_slice_;
Expand Down
7 changes: 6 additions & 1 deletion tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,7 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])


@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes", "cache_mode": "true"})
async def test_migration_with_key_ttl(df_local_factory):
instances = [
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
Expand All @@ -1166,6 +1166,8 @@ async def test_migration_with_key_ttl(df_local_factory):

await nodes[0].client.execute_command("set k_with_ttl v1 EX 2")
await nodes[0].client.execute_command("set k_without_ttl v2")
await nodes[0].client.execute_command("set k_sticky v3")
assert await nodes[0].client.execute_command("stick k_sticky") == 1

nodes[0].migrations.append(
MigrationInfo("127.0.0.1", instances[1].port, [(0, 16383)], nodes[1].id)
Expand All @@ -1183,15 +1185,18 @@ async def test_migration_with_key_ttl(df_local_factory):

assert await nodes[1].client.execute_command("get k_with_ttl") == "v1"
assert await nodes[1].client.execute_command("get k_without_ttl") == "v2"
assert await nodes[1].client.execute_command("get k_sticky") == "v3"
assert await nodes[1].client.execute_command("ttl k_with_ttl") > 0
assert await nodes[1].client.execute_command("ttl k_without_ttl") == -1
assert await nodes[1].client.execute_command("stick k_sticky") == 0 # Sticky bit already set

await asyncio.sleep(2) # Force expiration

assert await nodes[1].client.execute_command("get k_with_ttl") == None
assert await nodes[1].client.execute_command("get k_without_ttl") == "v2"
assert await nodes[1].client.execute_command("ttl k_with_ttl") == -2
assert await nodes[1].client.execute_command("ttl k_without_ttl") == -1
assert await nodes[1].client.execute_command("stick k_sticky") == 0

await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])

Expand Down

0 comments on commit 6024d79

Please sign in to comment.