Skip to content

Commit

Permalink
chore: add replica-priority flag (#3204)
Browse files Browse the repository at this point in the history
* add replica-priority flag
* add it on info replication command
* add test
  • Loading branch information
kostasrim authored Jun 21, 2024
1 parent 40ede6f commit e097248
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ ABSL_FLAG(bool, break_replication_on_master_restart, false,
"When in replica mode, and master restarts, break replication from master to avoid "
"flushing the replica's data.");
ABSL_DECLARE_FLAG(int32_t, port);
ABSL_FLAG(
int, replica_priority, 100,
"Published by info command for sentinel to pick replica based on score during a failover");

// TODO: Remove this flag on release >= 1.22
ABSL_FLAG(bool, replica_reconnect_on_master_restart, false,
Expand Down
3 changes: 3 additions & 0 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ ABSL_DECLARE_FLAG(uint32_t, hz);
ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
ABSL_DECLARE_FLAG(int, replica_priority);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(cond, m) \
Expand Down Expand Up @@ -2284,6 +2285,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("master_last_io_seconds_ago", rinfo.master_last_io_sec);
append("master_sync_in_progress", rinfo.full_sync_in_progress);
append("master_replid", rinfo.master_id);
append("slave_priority", GetFlag(FLAGS_replica_priority));
append("slave_read_only", 1);
};
replication_info_cb(replica_->GetInfo());
for (const auto& replica : cluster_replicas_) {
Expand Down
55 changes: 54 additions & 1 deletion tests/dragonfly/sentinel_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime
from sys import stderr
import logging
from . import dfly_args


# Helper function to parse some sentinel cli commands output as key value dictionaries.
Expand Down Expand Up @@ -63,6 +64,7 @@ def start(self):
f"port {self.port}",
f"sentinel monitor {self.default_deployment} 127.0.0.1 {self.initial_master_port} 1",
f"sentinel down-after-milliseconds {self.default_deployment} 3000",
f"slave-priority 100",
]
self.config_file.write_text("\n".join(config))

Expand Down Expand Up @@ -228,7 +230,7 @@ async def test_master_failure(df_local_factory, sentinel, port_picker):
# Simulate master failure.
master.stop()

# Verify replica pormoted.
# Verify replica promoted.
await await_for(
lambda: sentinel.live_master_port(),
lambda p: p == replica.port,
Expand All @@ -239,3 +241,54 @@ async def test_master_failure(df_local_factory, sentinel, port_picker):
# Verify we can now write to replica.
await replica_client.set("key", "value")
assert await replica_client.get("key") == b"value"


@dfly_args({"info_replication_valkey_compatible": True})
@pytest.mark.asyncio
async def test_priority_on_failover(df_local_factory, sentinel, port_picker):
master = df_local_factory.create(port=sentinel.initial_master_port)
# lower priority is the best candidate for sentinel
low_priority_repl = df_local_factory.create(
port=port_picker.get_available_port(), replica_priority=20
)
mid_priority_repl = df_local_factory.create(
port=port_picker.get_available_port(), replica_priority=60
)
high_priority_repl = df_local_factory.create(
port=port_picker.get_available_port(), replica_priority=80
)

master.start()
low_priority_repl.start()
mid_priority_repl.start()
high_priority_repl.start()

high_client = aioredis.Redis(port=high_priority_repl.port)
await high_client.execute_command("REPLICAOF localhost " + str(master.port))

mid_client = aioredis.Redis(port=mid_priority_repl.port)
await mid_client.execute_command("REPLICAOF localhost " + str(master.port))

low_client = aioredis.Redis(port=low_priority_repl.port)
await low_client.execute_command("REPLICAOF localhost " + str(master.port))

assert sentinel.live_master_port() == master.port

# Verify sentinel picked up replica.
await await_for(
lambda: sentinel.master(),
lambda m: m["num-slaves"] == "3",
timeout_sec=15,
timeout_msg="Timeout waiting for sentinel to pick up replica.",
)

# Simulate master failure.
master.stop()

# Verify replica promoted.
await await_for(
lambda: sentinel.live_master_port(),
lambda p: p == low_priority_repl.port,
timeout_sec=30,
timeout_msg="Timeout waiting for sentinel to report replica as master.",
)

0 comments on commit e097248

Please sign in to comment.