From 7de14dfb99b0af9f44454a46cd483222a084d44d Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Mon, 23 Dec 2024 10:12:05 -0800 Subject: [PATCH] Update `cluster_manager` script (#2353) * update script Signed-off-by: Yury-Fridlyand --- .../tests/Integration/IntegrationTestBase.cs | 2 +- go/integTest/glide_test_suite_test.go | 4 +- java/integTest/build.gradle | 4 +- utils/cluster_manager.py | 315 ++++++++---------- 4 files changed, 145 insertions(+), 180 deletions(-) mode change 100644 => 100755 utils/cluster_manager.py diff --git a/csharp/tests/Integration/IntegrationTestBase.cs b/csharp/tests/Integration/IntegrationTestBase.cs index 10d9872c4f..6df910175c 100644 --- a/csharp/tests/Integration/IntegrationTestBase.cs +++ b/csharp/tests/Integration/IntegrationTestBase.cs @@ -73,7 +73,7 @@ internal List StartRedis(bool cluster, bool tls = false, string? name = nu /// internal void StopRedis(bool keepLogs, string? name = null) { - string cmd = $"stop --prefix {name ?? "redis-cluster"} {(keepLogs ? "--keep-folder" : "")}"; + string cmd = $"stop --prefix {name ?? "cluster"} {(keepLogs ? "--keep-folder" : "")}"; _ = RunClusterManager(cmd, true); } diff --git a/go/integTest/glide_test_suite_test.go b/go/integTest/glide_test_suite_test.go index 520037dad0..51efe6d7fd 100644 --- a/go/integTest/glide_test_suite_test.go +++ b/go/integTest/glide_test_suite_test.go @@ -37,7 +37,7 @@ var ( func (suite *GlideTestSuite) SetupSuite() { // Stop cluster in case previous test run was interrupted or crashed and didn't stop. // If an error occurs, we ignore it in case the servers actually were stopped before running this. - runClusterManager(suite, []string{"stop", "--prefix", "redis-cluster"}, true) + runClusterManager(suite, []string{"stop", "--prefix", "cluster"}, true) // Delete dirs if stop failed due to https://github.com/valkey-io/valkey-glide/issues/849 err := os.RemoveAll("../../utils/clusters") @@ -205,7 +205,7 @@ func TestGlideTestSuite(t *testing.T) { } func (suite *GlideTestSuite) TearDownSuite() { - runClusterManager(suite, []string{"stop", "--prefix", "redis-cluster", "--keep-folder"}, false) + runClusterManager(suite, []string{"stop", "--prefix", "cluster", "--keep-folder"}, false) } func (suite *GlideTestSuite) TearDownTest() { diff --git a/java/integTest/build.gradle b/java/integTest/build.gradle index 3e97f58f10..663c19eb52 100644 --- a/java/integTest/build.gradle +++ b/java/integTest/build.gradle @@ -49,7 +49,7 @@ ext { tasks.register('stopAllAfterTests', Exec) { workingDir "${project.rootDir}/../utils" - commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'redis-cluster', '--keep-folder' + commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'cluster', '--keep-folder' } // We need to call for stop before and after the test, but gradle doesn't support executing a task @@ -57,7 +57,7 @@ tasks.register('stopAllAfterTests', Exec) { // We need to call for stop in case if previous test run was interrupted/crashed and didn't stop. tasks.register('stopAllBeforeTests', Exec) { workingDir "${project.rootDir}/../utils" - commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'redis-cluster' + commandLine 'python3', 'cluster_manager.py', 'stop', '--prefix', 'cluster' ignoreExitValue true // ignore fail if servers are stopped before } diff --git a/utils/cluster_manager.py b/utils/cluster_manager.py old mode 100644 new mode 100755 index dc196bcd4f..36e23723d2 --- a/utils/cluster_manager.py +++ b/utils/cluster_manager.py @@ -3,16 +3,16 @@ # Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 import argparse +import json import logging -import os, signal +import os import random +import re +import signal import socket import string import subprocess import time -import json -import re - from datetime import datetime, timezone from pathlib import Path from typing import List, Optional, Tuple @@ -26,14 +26,35 @@ "debug": logging.DEBUG, } -GLIDE_HOME_DIR = os.getenv("GLIDE_HOME_DIR") or f"{__file__}/../.." +GLIDE_HOME_DIR = os.getenv("GLIDE_HOME_DIR") or f"{__file__}/.." CLUSTERS_FOLDER = os.getenv("CLUSTERS_FOLDER") or os.path.abspath( - f"{GLIDE_HOME_DIR}/utils/clusters" + f"{GLIDE_HOME_DIR}/clusters" ) -TLS_FOLDER = os.path.abspath(f"{GLIDE_HOME_DIR}/utils/tls_crts") +TLS_FOLDER = os.path.abspath(f"{GLIDE_HOME_DIR}/tls_crts") CA_CRT = f"{TLS_FOLDER}/ca.crt" -REDIS_CRT = f"{TLS_FOLDER}/redis.crt" -REDIS_KEY = f"{TLS_FOLDER}/redis.key" +SERVER_CRT = f"{TLS_FOLDER}/server.crt" +SERVER_KEY = f"{TLS_FOLDER}/server.key" + + +def get_command(commands: List[str]) -> str: + for command in commands: + try: + result = subprocess.run( + ["which", command], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + if result.returncode == 0: + return command + except Exception as e: + logging.error(f"Error checking {command}: {e}") + raise Exception(f"Neither ${'nor'.join(command)} found in the system.") + + +# Determine which server to use by checking `valkey-server` and `redis-server` +SERVER_COMMAND = get_command(["valkey-server", "redis-server"]) +CLI_COMMAND = get_command(["valkey-cli", "redis-cli"]) def init_logger(logfile: str): @@ -67,18 +88,16 @@ def should_generate_new_tls_certs() -> bool: try: Path(TLS_FOLDER).mkdir(exist_ok=False) except FileExistsError: - files_list = [CA_CRT, REDIS_KEY, REDIS_CRT] + files_list = [CA_CRT, SERVER_KEY, SERVER_CRT] for file in files_list: - if check_if_tls_cert_exist(file) and check_if_tls_cert_is_valid( - file - ): + if check_if_tls_cert_exist(file) and check_if_tls_cert_is_valid(file): return False return True def generate_tls_certs(): - # Based on shell script in redis's server tests - # https://github.com/redis/redis/blob/8c291b97b95f2e011977b522acf77ead23e26f55/utils/gen-test-certs.sh + # Based on shell script in valkey's server tests + # https://github.com/valkey-io/valkey/blob/0d2ba9b94d28d4022ea475a2b83157830982c941/utils/gen-test-certs.sh logging.debug("## Generating TLS certificates") tic = time.perf_counter() ca_key = f"{TLS_FOLDER}/ca.key" @@ -111,8 +130,8 @@ def make_key(name: str, size: int): # Build CA key make_key(ca_key, 4096) - # Build redis key - make_key(REDIS_KEY, 2048) + # Build server key + make_key(SERVER_KEY, 2048) # Build CA Cert p = subprocess.Popen( @@ -128,7 +147,7 @@ def make_key(name: str, size: int): "-days", "3650", "-subj", - "/O=Redis Test/CN=Certificate Authority", + "/O=Valkey GLIDE Test/CN=Certificate Authority", "-out", CA_CRT, ], @@ -142,7 +161,7 @@ def make_key(name: str, size: int): f"Failed to make create CA cert. Executed: {str(p.args)}:\n{err}" ) - # Read Redis key + # Read server key p1 = subprocess.Popen( [ "openssl", @@ -150,21 +169,19 @@ def make_key(name: str, size: int): "-new", "-sha256", "-subj", - "/O=Redis Test/CN=Generic-cert", + "/O=Valkey GLIDE Test/CN=Generic-cert", "-key", - REDIS_KEY, + SERVER_KEY, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) - _redis_key_output, err = p.communicate(timeout=10) + _key_output, err = p.communicate(timeout=10) if p.returncode != 0: - raise Exception( - f"Failed to read Redis key. Executed: {str(p.args)}:\n{err}" - ) + raise Exception(f"Failed to read server key. Executed: {str(p.args)}:\n{err}") - # Build redis cert + # Build server cert p = subprocess.Popen( [ "openssl", @@ -183,7 +200,7 @@ def make_key(name: str, size: int): "-extfile", ext_file, "-out", - REDIS_CRT, + SERVER_CRT, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -193,23 +210,23 @@ def make_key(name: str, size: int): output, err = p.communicate(timeout=10) if p.returncode != 0: raise Exception( - f"Failed to create redis cert. Executed: {str(p.args)}:\n{err}" + f"Failed to create server cert. Executed: {str(p.args)}:\n{err}" ) toc = time.perf_counter() logging.debug(f"generate_tls_certs() Elapsed time: {toc - tic:0.4f}") - logging.debug(f"TLS files= {REDIS_CRT}, {REDIS_KEY}, {CA_CRT}") + logging.debug(f"TLS files= {SERVER_CRT}, {SERVER_KEY}, {CA_CRT}") -def get_redis_cli_option_args( +def get_cli_option_args( cluster_folder: str, use_tls: bool, auth: Optional[str] = None ) -> List[str]: args = ( [ "--tls", "--cert", - REDIS_CRT, + SERVER_CRT, "--key", - REDIS_KEY, + SERVER_KEY, "--cacert", CA_CRT, ] @@ -227,7 +244,7 @@ def get_random_string(length): return result_str -class RedisServer: +class Server: def __init__(self, host: str, port: int) -> None: self.host = host self.port = port @@ -255,7 +272,7 @@ def set_primary(self, is_primary: bool): self.is_primary = is_primary -def print_servers_json(servers: List[RedisServer]): +def print_servers_json(servers: List[Server]): """ Print the list of servers to the stdout as JSON array """ @@ -279,9 +296,7 @@ def next_free_port( sock.bind(("127.0.0.1", port)) sock.close() toc = time.perf_counter() - logging.debug( - f"next_free_port() is {port} Elapsed time: {toc - tic:0.4f}" - ) + logging.debug(f"next_free_port() is {port} Elapsed time: {toc - tic:0.4f}") return port except OSError as e: logging.warning(f"next_free_port error for port {port}: {e}") @@ -310,7 +325,7 @@ def create_cluster_folder(path: str, prefix: str) -> str: return cluster_folder -def start_redis_server( +def start_server( host: str, port: Optional[int], cluster_folder: str, @@ -318,7 +333,7 @@ def start_redis_server( tls_args: List[str], cluster_mode: bool, load_module: Optional[List[str]] = None, -) -> Tuple[RedisServer, str]: +) -> Tuple[Server, str]: port = port if port else next_free_port() logging.debug(f"Creating server {host}:{port}") @@ -326,29 +341,10 @@ def start_redis_server( node_folder = f"{cluster_folder}/{port}" Path(node_folder).mkdir(exist_ok=True) - # Determine which server to use by checking `valkey-server` and `redis-server` - def get_server_command() -> str: - for server in ["valkey-server", "redis-server"]: - try: - result = subprocess.run( - ["which", server], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - if result.returncode == 0: - return server - except Exception as e: - logging.error(f"Error checking {server}: {e}") - raise Exception( - "Neither valkey-server nor redis-server found in the system." - ) - - server_name = get_server_command() - logfile = f"{node_folder}/redis.log" # Define command arguments + logfile = f"{node_folder}/server.log" cmd_args = [ - server_name, + SERVER_COMMAND, f"{'--tls-port' if tls else '--port'}", str(port), "--cluster-enabled", @@ -359,6 +355,8 @@ def get_server_command() -> str: "yes", "--logfile", logfile, + "--protected-mode", + "no", ] if load_module: if len(load_module) == 0: @@ -380,12 +378,12 @@ def get_server_command() -> str: f"Failed to execute command: {str(p.args)}\n Return code: {p.returncode}\n Error: {err}" ) - server = RedisServer(host, port) + server = Server(host, port) # Read the process ID from the log file # Note that `p.pid` is not good here since we daemonize the process process_id = wait_for_regex_in_log( - logfile, "version=(.*?)pid=([\d]+), just started", 2 + logfile, r"version=(.*?)pid=([\d]+), just started", 2 ) if process_id: server.set_process_id(int(process_id)) @@ -403,10 +401,10 @@ def create_servers( cluster_mode: bool, load_module: Optional[List[str]] = None, json_output: bool = False, -) -> List[RedisServer]: +) -> List[Server]: tic = time.perf_counter() logging.debug("## Creating servers") - ready_servers: List[RedisServer] = [] + ready_servers: List[Server] = [] nodes_count = shard_count * (1 + replica_count) tls_args = [] if tls is True: @@ -416,9 +414,9 @@ def create_servers( "--tls-cluster", "yes", "--tls-cert-file", - REDIS_CRT, + SERVER_CRT, "--tls-key-file", - REDIS_KEY, + SERVER_KEY, "--tls-ca-cert-file", CA_CRT, "--tls-auth-clients", # Make it so client doesn't have to send cert @@ -436,30 +434,24 @@ def create_servers( for i in range(nodes_count): port = ports[i] if ports else None servers_to_check.add( - start_redis_server( - host, - port, - cluster_folder, - tls, - tls_args, - cluster_mode, - load_module, + start_server( + host, port, cluster_folder, tls, tls_args, cluster_mode, load_module ) ) # Check all servers while len(servers_to_check) > 0: server, node_folder = servers_to_check.pop() logging.debug(f"Checking server {server.host}:{server.port}") - if is_address_already_in_use(server, f"{node_folder}/redis.log"): + if is_address_already_in_use(server, f"{node_folder}/server.log"): remove_folder(node_folder) if ports is not None: # The user passed a taken port, exit with an error raise Exception( - f"Couldn't start redis on {server.host}:{server.port}, address already in use" + f"Couldn't start server on {server.host}:{server.port}, address already in use" ) # The port was already taken, try to find a new free one servers_to_check.add( - start_redis_server( + start_server( server.host, None, cluster_folder, @@ -473,7 +465,7 @@ def create_servers( if not wait_for_server(server, cluster_folder, tls): raise Exception( f"Waiting for server {server.host}:{server.port} to start exceeded timeout.\n" - f"See {node_folder}/redis.log for more information" + f"See {node_folder}/server.log for more information" ) ready_servers.append(server) logging.debug("All servers are up!") @@ -483,7 +475,7 @@ def create_servers( def create_cluster( - servers: List[RedisServer], + servers: List[Server], shard_count: int, replica_count: int, cluster_folder: str, @@ -494,8 +486,8 @@ def create_cluster( logging.debug("## Starting cluster creation...") p = subprocess.Popen( [ - "redis-cli", - *get_redis_cli_option_args(cluster_folder, use_tls), + CLI_COMMAND, + *get_cli_option_args(cluster_folder, use_tls), "--cluster", "create", *servers_tuple, @@ -511,25 +503,21 @@ def create_cluster( if err or "[OK] All 16384 slots covered." not in output: raise Exception(f"Failed to create cluster: {err if err else output}") - wait_for_a_message_in_redis_logs( - cluster_folder, "Cluster state changed: ok" - ) + wait_for_a_message_in_logs(cluster_folder, "Cluster state changed: ok") wait_for_all_topology_views(servers, cluster_folder, use_tls) print_servers_json(servers) logging.debug("The cluster was successfully created!") toc = time.perf_counter() - logging.debug( - f"create_cluster {cluster_folder} Elapsed time: {toc - tic:0.4f}" - ) + logging.debug(f"create_cluster {cluster_folder} Elapsed time: {toc - tic:0.4f}") def create_standalone_replication( - servers: List[RedisServer], + servers: List[Server], cluster_folder: str, use_tls: bool, ): - # Sets up replication among Redis servers, making them replicas of the primary server. + # Sets up replication among servers, making them replicas of the primary server. tic = time.perf_counter() primary_server = servers[0] @@ -539,8 +527,8 @@ def create_standalone_replication( if i == 0: continue # Skip the primary server replica_of_command = [ - "redis-cli", - *get_redis_cli_option_args(cluster_folder, use_tls), + CLI_COMMAND, + *get_cli_option_args(cluster_folder, use_tls), "-h", str(server.host), "-p", @@ -561,7 +549,7 @@ def create_standalone_replication( f"Failed to set up replication for server {server}: {err if err else output}" ) servers_ports = [str(server.port) for server in servers] - wait_for_a_message_in_redis_logs( + wait_for_a_message_in_logs( cluster_folder, "sync: Finished with success", servers_ports[1:], @@ -574,7 +562,7 @@ def create_standalone_replication( logging.debug(f"create_replication Elapsed time: {toc - tic:0.4f}") -def wait_for_a_message_in_redis_logs( +def wait_for_a_message_in_logs( cluster_folder: str, message: str, server_ports: Optional[List[str]] = None, @@ -582,17 +570,14 @@ def wait_for_a_message_in_redis_logs( for dir in Path(cluster_folder).rglob("*"): if not dir.is_dir(): continue - log_file = f"{dir}/redis.log" + log_file = f"{dir}/server.log" - if ( - server_ports - and os.path.basename(os.path.normpath(dir)) not in server_ports - ): + if server_ports and os.path.basename(os.path.normpath(dir)) not in server_ports: continue if not wait_for_message(log_file, message, 10): raise Exception( f"During the timeout duration, the server logs associated with port {dir} did not contain the message:{message}." - f"See {dir}/redis.log for more information" + f"See {dir}/server.log for more information" ) @@ -649,7 +634,7 @@ def redis_cli_run_command(cmd_args: List[str]) -> Optional[str]: def wait_for_all_topology_views( - servers: List[RedisServer], cluster_folder: str, use_tls: bool + servers: List[Server], cluster_folder: str, use_tls: bool ): """ Wait for each of the nodes to have a topology view that contains all nodes. @@ -657,12 +642,12 @@ def wait_for_all_topology_views( """ for server in servers: cmd_args = [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls), + *get_cli_option_args(cluster_folder, use_tls), "cluster", "slots", ] @@ -670,9 +655,7 @@ def wait_for_all_topology_views( retries = 60 while retries >= 0: output = redis_cli_run_command(cmd_args) - if output is not None and output.count(f"{server.host}") == len( - servers - ): + if output is not None and output.count(f"{server.host}") == len(servers): # Server is ready, get the node's role cmd_args = [ "redis-cli", @@ -680,7 +663,7 @@ def wait_for_all_topology_views( server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls), + *get_cli_option_args(cluster_folder, use_tls), "cluster", "nodes", ] @@ -703,7 +686,7 @@ def wait_for_all_topology_views( def wait_for_server( - server: RedisServer, + server: Server, cluster_folder: str, use_tls: bool, timeout: int = 10, @@ -713,12 +696,12 @@ def wait_for_server( while time.time() < timeout_start + timeout: p = subprocess.Popen( [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls), + *get_cli_option_args(cluster_folder, use_tls), "PING", ], stdout=subprocess.PIPE, @@ -750,15 +733,13 @@ def wait_for_message( timeout_start = time.time() while time.time() < timeout_start + timeout: with open(log_file, "r") as f: - redis_log = f.read() - if message in redis_log: + server_log = f.read() + if message in server_log: return True else: time.sleep(0.1) continue - logging.warn( - f"Timeout exceeded trying to check if {log_file} contains {message}" - ) + logging.warn(f"Timeout exceeded trying to check if {log_file} contains {message}") return False @@ -790,7 +771,7 @@ def wait_for_regex_in_log( def is_address_already_in_use( - server: RedisServer, + server: Server, log_file: str, timeout: int = 5, ): @@ -798,11 +779,11 @@ def is_address_already_in_use( timeout_start = time.time() while time.time() < timeout_start + timeout: with open(log_file, "r") as f: - redis_log = f.read() - if "Address already in use" in redis_log: + server_log = f.read() + if "Address already in use" in server_log: logging.debug(f"Address is already bind for server {server}") return True - elif "Ready" in redis_log: + elif "Ready" in server_log: logging.debug(f"Address is free for server {server}!") return False else: @@ -826,17 +807,15 @@ def dir_path(path: str): raise NotADirectoryError(path) -def stop_server( - server: RedisServer, cluster_folder: str, use_tls: bool, auth: str -): +def stop_server(server: Server, cluster_folder: str, use_tls: bool, auth: str): logging.debug(f"Stopping server {server}") cmd_args = [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls, auth), + *get_cli_option_args(cluster_folder, use_tls, auth), "shutdown", "nosave", ] @@ -853,32 +832,28 @@ def stop_server( ) output, err = p.communicate(timeout=5) if err and "Warning: Using a password with '-a'" not in err: - err_msg = f"Failed to shutdown host {server.host}:{server.port}:\n {err}" + err_msg = ( + f"Failed to shutdown host {server.host}:{server.port}:\n {err}" + ) logging.error(err_msg) raise Exception( f"Failed to execute command: {str(p.args)}\n Return code: {p.returncode}\n Error: {err}" ) - if not wait_for_server_shutdown( - server, cluster_folder, use_tls, auth - ): - err_msg = ( - "Timeout elapsed while waiting for the node to shutdown" - ) + if not wait_for_server_shutdown(server, cluster_folder, use_tls, auth): + err_msg = "Timeout elapsed while waiting for the node to shutdown" logging.error(err_msg) raise Exception(err_msg) return except subprocess.TimeoutExpired as e: raise_err = e retries -= 1 - err_msg = ( - f"Failed to shutdown host {server.host}:{server.port}: {raise_err}" - ) + err_msg = f"Failed to shutdown host {server.host}:{server.port}: {raise_err}" logging.error(err_msg) raise Exception(err_msg) def wait_for_server_shutdown( - server: RedisServer, + server: Server, cluster_folder: str, use_tls: bool, auth: str, @@ -890,12 +865,12 @@ def wait_for_server_shutdown( while time.time() < timeout_start + timeout: p = subprocess.Popen( [ - "redis-cli", + CLI_COMMAND, "-h", server.host, "-p", str(server.port), - *get_redis_cli_option_args(cluster_folder, use_tls, auth), + *get_cli_option_args(cluster_folder, use_tls, auth), "PING", ], stdout=subprocess.PIPE, @@ -949,16 +924,6 @@ def stop_clusters( keep_folder: bool, pids: Optional[str], ): - if pids: - pid_arr = pids.split(",") - for pid in pid_arr: - try: - # Kill the process group - os.killpg(int(pid), signal.SIGKILL) - except ProcessLookupError as e: - logging.debug(f"Could not kill server with PID: {pid}. {e}") - pass - if cluster_folder: cluster_folders = [cluster_folder] else: @@ -970,12 +935,19 @@ def stop_clusters( and dirname.startswith(prefix) ] - # request for graceful shutdown only if PID list was not provided - graceful_shutdown = pids is None + # request for graceful shutdown for folder in cluster_folders: - stop_cluster( - host, folder, use_tls, auth, logfile, keep_folder, graceful_shutdown - ) + stop_cluster(host, folder, use_tls, auth, logfile, keep_folder) + + if pids: + pid_arr = pids.split(",") + for pid in pid_arr: + try: + # Kill the process + os.kill(int(pid), signal.SIGKILL) + except ProcessLookupError as e: + logging.debug(f"Could not kill server with PID: {pid}. {e}") + pass def stop_cluster( @@ -985,23 +957,20 @@ def stop_cluster( auth: str, logfile: Optional[str], keep_folder: bool, - graceful_shutdown: bool, ): - if graceful_shutdown: - logfile = ( - f"{cluster_folder}/cluster_manager.log" if not logfile else logfile - ) - init_logger(logfile) - logging.debug(f"## Stopping cluster in path {cluster_folder}") - for it in os.scandir(cluster_folder): - if it.is_dir() and it.name.isdigit(): - port = it.name - stop_server( - RedisServer(host, int(port)), cluster_folder, use_tls, auth - ) - logging.debug("All hosts were stopped") - else: - logging.debug("Servers terminated using kill") + logfile = f"{cluster_folder}/cluster_manager.log" if not logfile else logfile + init_logger(logfile) + logging.debug(f"## Stopping cluster in path {cluster_folder}") + all_stopped = True + for it in os.scandir(cluster_folder): + if it.is_dir() and it.name.isdigit(): + port = it.name + try: + stop_server(Server(host, int(port)), cluster_folder, use_tls, auth) + except Exception: + all_stopped = False + if all_stopped: + logging.debug("All hosts were stopped gracefully") if not keep_folder: remove_folder(cluster_folder) @@ -1101,21 +1070,19 @@ def main(): type=str, help="Prefix to be used for the cluster folder name " "(default without TLS: %(default)s, default with TLS: tls-%(default)s)", - default="redis-cluster", + default="cluster", required=False, ) parser_start.add_argument( "--load-module", action="append", - help="The paths of the redis modules to load.", + help="The paths of the server modules to load.", required=False, ) # Stop parser - parser_stop = subparsers.add_parser( - "stop", help="Shutdown a running cluster" - ) + parser_stop = subparsers.add_parser("stop", help="Shutdown a running cluster") parser_stop.add_argument( "--folder-path", type=dir_path, @@ -1163,9 +1130,7 @@ def main(): f" -- must be one of: {' | '.join(LOG_LEVELS.keys())}" ) logging.root.setLevel(level=level) - logging.info( - f"## Executing cluster_manager.py with the following args:\n {args}" - ) + logging.info(f"## Executing cluster_manager.py with the following args:\n {args}") if args.action == "start": if not args.cluster_mode: