diff --git a/defaults/test_default.yaml b/defaults/test_default.yaml
index 020b021033..6e1a735271 100644
--- a/defaults/test_default.yaml
+++ b/defaults/test_default.yaml
@@ -274,3 +274,6 @@ skip_test_stages: {}
 n_db_zero_token_nodes: 0
 zero_token_instance_type_db: 'i4i.large'
 use_zero_nodes: false
+# Storage utilization default values
+diskusage_softlimit: 70
+diskusage_hardlimit: 90
diff --git a/docs/configuration_options.md b/docs/configuration_options.md
index a5b868d6c6..3eb03739b9 100644
--- a/docs/configuration_options.md
+++ b/docs/configuration_options.md
@@ -371,3 +371,6 @@
 | **n_db_zero_token_nodes** | Number of zero token nodes in cluster. Value should be set as "0 1 1"<br>for multidc configuration in same manner as 'n_db_nodes' and should be equal<br>number of regions | N/A | SCT_N_DB_ZERO_TOKEN_NODES
 | **zero_token_instance_type_db** | Instance type for zero token node | i4i.large | SCT_ZERO_TOKEN_INSTANCE_TYPE_DB
 | **use_zero_nodes** | If True, enable support in sct of zero nodes(configuration, nemesis) | false | SCT_USE_ZERO_NODES
+| **scaling_action_type** | Scaling action (scale_out or scale_in) to perform once storage usage reaches the configured threshold | N/A | SCT_SCALING_ACTION_TYPE
+| **diskusage_softlimit** | Soft disk usage limit (%). Until this limit is reached, data is written in chunks of ~10% of the available disk space; after it the chunk size drops to 1 GB | 70 | SCT_DISKUSAGE_SOFTLIMIT
+| **diskusage_hardlimit** | Hard disk usage limit (%) at which the scaling action is performed | 90 | SCT_DISKUSAGE_HARDLIMIT
diff --git a/full_storage_utilization_test.py b/full_storage_utilization_test.py
new file mode 100644
index 0000000000..8f3809a84e
--- /dev/null
+++ b/full_storage_utilization_test.py
@@ -0,0 +1,155 @@
+from enum import Enum
+import time
+from sdcm.tester import ClusterTester
+from sdcm.utils.tablets.common import wait_for_tablets_balanced
+from sdcm.utils.full_storage_utils import DiskUtils, StressUtils
+
+
+class ScalingActionType(Enum):
+    SCALE_OUT = "scale_out"
+    SCALE_IN = "scale_in"
+
+
+class FullStorageUtilizationTest(ClusterTester):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.num_stress_threads = 10
+        self.sleep_seconds_before_scale = 120
+        self.sleep_time_fill_disk = 1800
+        self.softlimit = self.params.get('diskusage_softlimit')
+        self.hardlimit = self.params.get('diskusage_hardlimit')
+        self.stress_cmd_w = self.params.get('stress_cmd_w')
+        self.stress_cmd_r = self.params.get('stress_cmd_r')
+        self.add_node_cnt = self.params.get('add_node_cnt')
+        self.scaling_action_type = self.params.get('scaling_action_type')
+        self.stress_utils = None
+
+    def setUp(self):
+        super().setUp()
+        self.start_time = time.time()
+        self.stress_utils = StressUtils(
+            db_node=self.db_cluster.nodes[0], cluster_tester=self)
+
+    def start_throttle_write(self):
+        self.stress_cmd_w = self.stress_cmd_w.replace(
+            "<threads>", str(self.num_stress_threads))
+        self.run_stress_thread(stress_cmd=self.stress_cmd_w)
+        '''
+        During the scaling operation, keep some read/write operations going to
+        simulate a real-world workload. Wait for 2 minutes so that the
+        cassandra-stress tool has started running.
+        '''
+        time.sleep(self.sleep_seconds_before_scale)
+
+    def start_throttle_read(self):
+        self.stress_cmd_r = self.stress_cmd_r.replace(
+            "<threads>", str(self.num_stress_threads))
+        self.run_stress_thread(stress_cmd=self.stress_cmd_r)
+        time.sleep(self.sleep_seconds_before_scale)
+
+    def start_throttle_rw(self):
+        self.start_throttle_write()
+        self.start_throttle_read()
+
+    def scale_out(self):
+        self.start_throttle_rw()
+        self.log.info("Started adding a new node")
+        start_time = time.time()
+        self.add_new_nodes()
+        duration = time.time() - start_time
+        self.log.info(f"Adding a node finished with duration: {duration}")
+
+    def scale_in(self):
+        self.start_throttle_rw()
+        self.log.info("Started removing a node")
+        start_time = time.time()
+        self.remove_node()
+        duration = time.time() - start_time
+        self.log.info(f"Removing a node finished with duration: {duration}")
+
+    def drop_data(self, keyspace_name):
+        '''
+        Drop a keyspace and clear snapshots.
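+        Note: with auto_snapshot enabled (the default), dropping a keyspace leaves
+        a snapshot of its data behind, so the disk space is only reclaimed once the
+        snapshots are cleared (e.g. via "nodetool clearsnapshot").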
+        '''
+        node = self.db_cluster.nodes[0]
+        self.log.info("Dropping some data")
+        query = f"DROP KEYSPACE {keyspace_name}"
+        with self.db_cluster.cql_connection_patient(node) as session:
+            session.execute(query)
+        # node.run_nodetool(f"clearsnapshot")
+        DiskUtils.log_disk_usage(self.db_cluster.nodes)
+
+    def perform_scale_in(self):
+        '''
+        If the test was configured to stop populating data at 90% utilization,
+        first scale out, then drop 20% of the data to make room for the
+        scale-in operation.
+
+        If the test was configured to stop populating data at 67% disk
+        utilization, scale in without scaling out or dropping data.
+        '''
+        if self.hardlimit == 90:
+            self.scale_out()
+            '''
+            Before removing a node, make sure the other nodes have enough
+            space to accommodate the data streamed from the removed node.
+            '''
+            # Remove 20% of data from the cluster.
+            self.drop_data("keyspace_large1")
+            self.drop_data("keyspace_large2")
+            self.scale_in()
+        elif self.hardlimit == 67:
+            self.scale_in()
+
+    def perform_action(self):
+        DiskUtils.log_disk_usage(self.db_cluster.nodes)
+        # Trigger the configured scaling action
+        if self.scaling_action_type == ScalingActionType.SCALE_OUT.value:
+            self.scale_out()
+        elif self.scaling_action_type == ScalingActionType.SCALE_IN.value:
+            self.perform_scale_in()
+        else:
+            self.log.info(f"Invalid ActionType {self.scaling_action_type}")
+        DiskUtils.log_disk_usage(self.db_cluster.nodes)
+
+    def test_storage_utilization(self):
+        """
+        Write data until disk usage reaches the specified hardlimit.
+        Sleep for a few minutes.
+        Perform the configured scaling action.
+        """
+        self.run_stress(self.softlimit, sleep_seconds=self.sleep_time_fill_disk)
+        self.run_stress(self.hardlimit, sleep_seconds=self.sleep_time_fill_disk)
+        self.perform_action()
+
+    def run_stress(self, target_usage, sleep_seconds=600):
+        target_used_size = DiskUtils.determine_storage_limit(
+            self.db_cluster.nodes, target_usage)
+        self.stress_utils.run_stress_until_target(
+            target_used_size, target_usage, self.hardlimit, self.softlimit)
+
+        DiskUtils.log_disk_usage(self.db_cluster.nodes)
+        self.log.info(f"Waiting for {sleep_seconds} seconds")
+        time.sleep(sleep_seconds)
+        DiskUtils.log_disk_usage(self.db_cluster.nodes)
+
+    def add_new_nodes(self):
+        new_nodes = self.db_cluster.add_nodes(
+            count=self.add_node_cnt, enable_auto_bootstrap=True)
+        self.db_cluster.wait_for_init(node_list=new_nodes)
+        self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
+        total_nodes_in_cluster = len(self.db_cluster.nodes)
+        self.log.info(
+            f"New node added, total nodes in cluster: {total_nodes_in_cluster}")
+        self.monitors.reconfigure_scylla_monitoring()
+        wait_for_tablets_balanced(self.db_cluster.nodes[0])
+
+    def remove_node(self):
+        self.log.info('Removing the second node from the cluster')
+        node_to_remove = self.db_cluster.nodes[1]
+        self.log.info(f"Node to be removed: {node_to_remove.name}")
+        self.db_cluster.decommission(node_to_remove)
+        self.log.info(
+            f"Node {node_to_remove.name} has been removed from the cluster")
+        self.monitors.reconfigure_scylla_monitoring()
+        wait_for_tablets_balanced(self.db_cluster.nodes[0])
diff --git a/jenkins-pipelines/oss/full-storage-utilization.jenkinsfile b/jenkins-pipelines/oss/full-storage-utilization.jenkinsfile
new file mode 100644
index 0000000000..b62422eb0b
--- /dev/null
+++ b/jenkins-pipelines/oss/full-storage-utilization.jenkinsfile
@@ -0,0 +1,12 @@
+#!groovy
+
+// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
+def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)
+
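+// The scale-out and scale-in scenarios added in this change are configured via
+// 'test-cases/scale/full-storage-utilization-scaleout.yaml' and
+// 'test-cases/scale/full-storage-utilization-scalein.yaml' (scaling_action_type:
+// "scale_out" / "scale_in" respectively).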
+longevityPipeline(
+    backend: 'aws',
+    region: 'eu-west-1',
+    test_name: 'full_storage_utilization_test.FullStorageUtilizationTest.test_storage_utilization',
+    test_config: 'test-cases/scale/full-storage-utilization.yaml',
+    timeout: [time: 300, unit: 'MINUTES']
+)
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
index 86395f823c..0ac284ed89 100644
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -4451,6 +4451,14 @@ def get_nodes_up_and_normal(self, verification_node=None):
                 up_nodes.append(node)
         return up_nodes
 
+    def flush_all_nodes(self):
+        """
+        Connect to all DB nodes in the cluster and run the "nodetool flush" command.
+        :return:
+        """
+        for node in self.nodes:
+            node.run_nodetool("flush")
+
     def get_node_status_dictionary(self, ip_address=None, verification_node=None):
         """Get node status dictionary via nodetool (in case it's not found return None)"""
         node_status = None
diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py
index 7a7e97d9d1..6914a2a59f 100644
--- a/sdcm/sct_config.py
+++ b/sdcm/sct_config.py
@@ -1233,6 +1233,14 @@ class SCTConfiguration(dict):
         dict(name="perf_gradual_throttle_steps", env="SCT_PERF_GRADUAL_THROTTLE_STEPS", type=dict,
              help="Used for gradual performance test. Define throttle for load step in ops. Example: {'read': ['100000', '150000'], 'mixed': ['300']}"),
 
+        # StorageUtilizationTest
+        dict(name="scaling_action_type", env="SCT_SCALING_ACTION_TYPE", type=str,
+             help="Scaling action (scale_out or scale_in) to perform when storage usage reaches the configured threshold."),
+        dict(name="diskusage_softlimit", env="SCT_DISKUSAGE_SOFTLIMIT", type=int,
+             help="Soft disk usage limit (%). Until this limit is reached, data is written in chunks of ~10% of the available disk space; after it the chunk size drops to 1 GB."),
+        dict(name="diskusage_hardlimit", env="SCT_DISKUSAGE_HARDLIMIT", type=int,
+             help="Hard disk usage limit (%) at which the scaling action is performed."),
+
         # RefreshTest
         dict(name="skip_download", env="SCT_SKIP_DOWNLOAD", type=boolean,
              help=""),
diff --git a/sdcm/utils/full_storage_utils.py b/sdcm/utils/full_storage_utils.py
new file mode 100644
index 0000000000..283e83cc64
--- /dev/null
+++ b/sdcm/utils/full_storage_utils.py
@@ -0,0 +1,130 @@
+import logging
+
+LOGGER = logging.getLogger(__name__)
+
+
+class DiskUtils:
+    @staticmethod
+    def get_disk_info(node):
+        result = node.remoter.run(
+            "df -h -BG --output=size,used,avail,pcent /var/lib/scylla | sed 1d | sed 's/G//g' | sed 's/%//'")
+        size, used, avail, pcent = result.stdout.strip().split()
+        return {
+            'total': int(size),
+            'used': int(used),
+            'available': int(avail),
+            'used_percent': int(pcent)
+        }
+
+    @staticmethod
+    def get_max_disk_usage(nodes):
+        max_used_percent = 0
+        max_used = 0
+        for node in nodes:
+            info = DiskUtils.get_disk_info(node)
+            max_used_percent = max(max_used_percent, info["used_percent"])
+            max_used = max(max_used, info["used"])
+        return max_used_percent, max_used
+
+    @staticmethod
+    def determine_storage_limit(nodes, target_percent):
+        """
+        Calculate the target disk usage limit in GB that needs to be reached
+        in the cluster.
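+
+        Illustrative example: with a 100 GB data volume, target_percent=70 gives a
+        target_used_size of 70 GB; if the fullest node already uses 25 GB, roughly
+        45 GB more need to be written to reach the target.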
+ """ + max_total = 0 + for node in nodes: + info = DiskUtils.get_disk_info(node) + max_total = max(max_total, info['total']) + + target_used_size = (target_percent / 100) * max_total + current_usage, current_used = DiskUtils.get_max_disk_usage(nodes) + additional_usage_needed = target_used_size - current_used + + LOGGER.info(f"Current max disk usage: {current_usage:.2f}%") + LOGGER.info(f"Current max used space: {current_used:.2f} GB") + LOGGER.info(f"Max total disk space: {max_total:.2f} GB") + LOGGER.info( + f"Target harddisk limit to reach {target_percent}% or {target_used_size:.2f} GB") + LOGGER.info( + f"Additional space to be populated: {additional_usage_needed:.2f} GB") + + # Target disk usage in GB (either softlimit or hardlimit value) + return target_used_size + + @staticmethod + def log_disk_usage(nodes): + for node in nodes: + info = DiskUtils.get_disk_info(node) + LOGGER.info(f"Disk usage for node {node.name}:") + LOGGER.info(f" Total: {info['total']} GB") + LOGGER.info(f" Used: {info['used']} GB") + LOGGER.info(f" Available: {info['available']} GB") + LOGGER.info(f" Used %: {info['used_percent']}%") + + +class StressUtils(): + def __init__(self, db_node, cluster_tester): + self.db_node = db_node + self.db_cluster = self.db_node.parent_cluster + self.cluster_tester = cluster_tester + self.large_ks_cnt = 0 + self.small_ks_cnt = 0 + cores = self.db_cluster.nodes[0].cpu_cores + self.num_stress_threads = 8 if not cores else int(cores) * 8 + + def prepare_dataset_layout(self, dataset_size_gb, row_size=10240): + n = dataset_size_gb * 1024 * 1024 * 1024 // row_size + seq_end = n * 100 + + return f'cassandra-stress write cl=ONE n={n} -mode cql3 native -rate threads={self.num_stress_threads} ' \ + f'-pop dist="uniform(1..{seq_end})" ' \ + f'-col "size=FIXED({row_size}) n=FIXED(1)" ' \ + f'-schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"' + + def run_stress_until_target(self, target_used_size, target_usage, hardlimit, softlimit): + current_usage, current_used = DiskUtils.get_max_disk_usage( + self.db_cluster.nodes) + smaller_dataset = False + + space_needed = target_used_size - current_used + # Calculate chunk size as 10% of space needed + chunk_size = int(space_needed * 0.1) + + while current_used < target_used_size and current_usage < target_usage: + # Write smaller dataset near the threshold (15% or 30GB of the target) + smaller_dataset = (((target_used_size - current_used) < 30) or + ((target_usage - current_usage) <= (hardlimit - softlimit))) + + if not smaller_dataset: + self.large_ks_cnt += 1 + else: + self.small_ks_cnt += 1 + + # Use 1GB chunks near threshold, otherwise use 10% of remaining space + dataset_size_gb = 1 if smaller_dataset else chunk_size + ks_name = "keyspace_small" if smaller_dataset else "keyspace_large" + num = self.small_ks_cnt if smaller_dataset else self.large_ks_cnt + + LOGGER.info(f"Writing chunk of size: {dataset_size_gb} GB") + stress_cmd = self.prepare_dataset_layout(dataset_size_gb) + stress_queue = self.cluster_tester.run_stress_thread( + stress_cmd=stress_cmd, + keyspace_name=f"{ks_name}{num}", + stress_num=1, + keyspace_num=num + ) + + self.cluster_tester.verify_stress_thread( + cs_thread_pool=stress_queue) + self.cluster_tester.get_stress_results(queue=stress_queue) + + self.db_cluster.flush_all_nodes() + # time.sleep(60) if smaller_dataset else time.sleep(600) + + current_usage, current_used = DiskUtils.get_max_disk_usage( + self.db_cluster.nodes) + LOGGER.info( + f"Current max disk usage after writing to keyspace{num}: 
" + f"{current_usage}% ({current_used} GB / {target_used_size} GB)" + ) diff --git a/test-cases/scale/full-storage-utilization-scalein.yaml b/test-cases/scale/full-storage-utilization-scalein.yaml new file mode 100644 index 0000000000..c8a82e84dc --- /dev/null +++ b/test-cases/scale/full-storage-utilization-scalein.yaml @@ -0,0 +1,17 @@ +test_duration: 3600 +n_db_nodes: 4 +n_loaders: 1 +n_monitor_nodes: 1 +user_prefix: 'storage-utilization' +instance_type_db: 'i4i.large' +instance_provision: 'spot' + +enterprise_disable_kms: true +scaling_action_type: "scale_in" +diskusage_softlimit: 45 +diskusage_hardlimit: 67 + +stress_cmd_w: 'cassandra-stress write duration=30m -rate threads= "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"' +stress_cmd_r: 'cassandra-stress read duration=30m -rate threads= "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"' +append_scylla_yaml: + enable_tablets: true diff --git a/test-cases/scale/full-storage-utilization-scaleout.yaml b/test-cases/scale/full-storage-utilization-scaleout.yaml new file mode 100644 index 0000000000..f7a51d85e6 --- /dev/null +++ b/test-cases/scale/full-storage-utilization-scaleout.yaml @@ -0,0 +1,18 @@ +test_duration: 3600 +n_db_nodes: 3 +n_loaders: 1 +n_monitor_nodes: 1 +user_prefix: 'storage-utilization' +instance_type_db: 'i4i.large' +instance_provision: 'spot' +add_node_cnt: 3 + +enterprise_disable_kms: true +scaling_action_type: "scale_out" +diskusage_softlimit: 70 +diskusage_hardlimit: 90 + +stress_cmd_w: 'cassandra-stress write duration=30m -rate threads= -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"' +stress_cmd_r: 'cassandra-stress read duration=30m -rate threads= -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"' +append_scylla_yaml: + enable_tablets: true