From e097248bafbe00b6b0be045d340441c7b6d19d33 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Sat, 22 Jun 2024 00:34:48 +0300 Subject: [PATCH] chore: add replica-priority flag (#3204) * add replica-priority flag * add it on info replication command * add test --- src/server/replica.cc | 3 ++ src/server/server_family.cc | 3 ++ tests/dragonfly/sentinel_test.py | 55 +++++++++++++++++++++++++++++++- 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/src/server/replica.cc b/src/server/replica.cc index 4a5ce135f4ad..d7e02fa1d497 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -42,6 +42,9 @@ ABSL_FLAG(bool, break_replication_on_master_restart, false, "When in replica mode, and master restarts, break replication from master to avoid " "flushing the replica's data."); ABSL_DECLARE_FLAG(int32_t, port); +ABSL_FLAG( + int, replica_priority, 100, + "Published by info command for sentinel to pick replica based on score during a failover"); // TODO: Remove this flag on release >= 1.22 ABSL_FLAG(bool, replica_reconnect_on_master_restart, false, diff --git a/src/server/server_family.cc b/src/server/server_family.cc index ac2a16afd730..5f7c4725bd00 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -132,6 +132,7 @@ ABSL_DECLARE_FLAG(uint32_t, hz); ABSL_DECLARE_FLAG(bool, tls); ABSL_DECLARE_FLAG(string, tls_ca_cert_file); ABSL_DECLARE_FLAG(string, tls_ca_cert_dir); +ABSL_DECLARE_FLAG(int, replica_priority); bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) { #define RETURN_ON_ERROR(cond, m) \ @@ -2284,6 +2285,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("master_last_io_seconds_ago", rinfo.master_last_io_sec); append("master_sync_in_progress", rinfo.full_sync_in_progress); append("master_replid", rinfo.master_id); + append("slave_priority", GetFlag(FLAGS_replica_priority)); + append("slave_read_only", 1); }; replication_info_cb(replica_->GetInfo()); for (const auto& replica : cluster_replicas_) { diff --git a/tests/dragonfly/sentinel_test.py b/tests/dragonfly/sentinel_test.py index d65137c84938..8395225ed24a 100644 --- a/tests/dragonfly/sentinel_test.py +++ b/tests/dragonfly/sentinel_test.py @@ -8,6 +8,7 @@ from datetime import datetime from sys import stderr import logging +from . import dfly_args # Helper function to parse some sentinel cli commands output as key value dictionaries. @@ -63,6 +64,7 @@ def start(self): f"port {self.port}", f"sentinel monitor {self.default_deployment} 127.0.0.1 {self.initial_master_port} 1", f"sentinel down-after-milliseconds {self.default_deployment} 3000", + f"slave-priority 100", ] self.config_file.write_text("\n".join(config)) @@ -228,7 +230,7 @@ async def test_master_failure(df_local_factory, sentinel, port_picker): # Simulate master failure. master.stop() - # Verify replica pormoted. + # Verify replica promoted. await await_for( lambda: sentinel.live_master_port(), lambda p: p == replica.port, @@ -239,3 +241,54 @@ async def test_master_failure(df_local_factory, sentinel, port_picker): # Verify we can now write to replica. await replica_client.set("key", "value") assert await replica_client.get("key") == b"value" + + +@dfly_args({"info_replication_valkey_compatible": True}) +@pytest.mark.asyncio +async def test_priority_on_failover(df_local_factory, sentinel, port_picker): + master = df_local_factory.create(port=sentinel.initial_master_port) + # lower priority is the best candidate for sentinel + low_priority_repl = df_local_factory.create( + port=port_picker.get_available_port(), replica_priority=20 + ) + mid_priority_repl = df_local_factory.create( + port=port_picker.get_available_port(), replica_priority=60 + ) + high_priority_repl = df_local_factory.create( + port=port_picker.get_available_port(), replica_priority=80 + ) + + master.start() + low_priority_repl.start() + mid_priority_repl.start() + high_priority_repl.start() + + high_client = aioredis.Redis(port=high_priority_repl.port) + await high_client.execute_command("REPLICAOF localhost " + str(master.port)) + + mid_client = aioredis.Redis(port=mid_priority_repl.port) + await mid_client.execute_command("REPLICAOF localhost " + str(master.port)) + + low_client = aioredis.Redis(port=low_priority_repl.port) + await low_client.execute_command("REPLICAOF localhost " + str(master.port)) + + assert sentinel.live_master_port() == master.port + + # Verify sentinel picked up replica. + await await_for( + lambda: sentinel.master(), + lambda m: m["num-slaves"] == "3", + timeout_sec=15, + timeout_msg="Timeout waiting for sentinel to pick up replica.", + ) + + # Simulate master failure. + master.stop() + + # Verify replica promoted. + await await_for( + lambda: sentinel.live_master_port(), + lambda p: p == low_priority_repl.port, + timeout_sec=30, + timeout_msg="Timeout waiting for sentinel to report replica as master.", + )