diff --git a/defaults/test_default.yaml b/defaults/test_default.yaml
index 020b021033..6e1a735271 100644
--- a/defaults/test_default.yaml
+++ b/defaults/test_default.yaml
@@ -274,3 +274,6 @@ skip_test_stages: {}
n_db_zero_token_nodes: 0
zero_token_instance_type_db: 'i4i.large'
use_zero_nodes: false
+# Storage utilization default values
+diskusage_softlimit: 70
+diskusage_hardlimit: 90
diff --git a/docs/configuration_options.md b/docs/configuration_options.md
index a5b868d6c6..3eb03739b9 100644
--- a/docs/configuration_options.md
+++ b/docs/configuration_options.md
@@ -371,3 +371,6 @@
| **n_db_zero_token_nodes** | Number of zero token nodes in cluster. Value should be set as "0 1 1" for multidc configuration in same manner as 'n_db_nodes' and should be equal number of regions | N/A | SCT_N_DB_ZERO_TOKEN_NODES
| **zero_token_instance_type_db** | Instance type for zero token node | i4i.large | SCT_ZERO_TOKEN_INSTANCE_TYPE_DB
| **use_zero_nodes** | If True, enable support in sct of zero nodes(configuration, nemesis) | false | SCT_USE_ZERO_NODES
+| **scaling_action_type** | Scaling action (e.g. scale_out, scale_in) to perform once storage reaches the configured threshold | N/A | SCT_SCALING_ACTION_TYPE
+| **diskusage_softlimit** | Soft disk usage limit (in percent). Until this limit is reached, data is written in chunks of 10% of the available disk space; beyond it, the chunk size drops to 1GB | 70 | SCT_DISKUSAGE_SOFTLIMIT
+| **diskusage_hardlimit** | Hard disk usage limit (in percent) at which the scaling action is performed | 90 | SCT_DISKUSAGE_HARDLIMIT
diff --git a/full_storage_utilization_test.py b/full_storage_utilization_test.py
new file mode 100644
index 0000000000..8f3809a84e
--- /dev/null
+++ b/full_storage_utilization_test.py
@@ -0,0 +1,155 @@
+from enum import Enum
+import time
+from sdcm.tester import ClusterTester
+from sdcm.utils.tablets.common import wait_for_tablets_balanced
+from sdcm.utils.full_storage_utils import DiskUtils, StressUtils
+
+
+class ScalingActionType(Enum):
+ SCALE_OUT = "scale_out"
+ SCALE_IN = "scale_in"
+
+
+class FullStorageUtilizationTest(ClusterTester):
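+    """
+    Fill the cluster's storage up to the configured soft and hard disk-usage limits,
+    then perform a scaling action (scale-out or scale-in) while a light
+    read/write load is running.
+    """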
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.num_stress_threads = 10
+ self.sleep_seconds_before_scale = 120
+ self.sleep_time_fill_disk = 1800
+ self.softlimit = self.params.get('diskusage_softlimit')
+ self.hardlimit = self.params.get('diskusage_hardlimit')
+ self.stress_cmd_w = self.params.get('stress_cmd_w')
+ self.stress_cmd_r = self.params.get('stress_cmd_r')
+ self.add_node_cnt = self.params.get('add_node_cnt')
+ self.scaling_action_type = self.params.get('scaling_action_type')
+ self.stress_utils = None
+
+ def setUp(self):
+ super().setUp()
+ self.start_time = time.time()
+ self.stress_utils = StressUtils(
+ db_node=self.db_cluster.nodes[0], cluster_tester=self)
+
+ def start_throttle_write(self):
+        # NOTE: the thread-count placeholder name below is an assumption; it must
+        # match the placeholder used in 'stress_cmd_w' in the test-case yaml.
+        self.stress_cmd_w = self.stress_cmd_w.replace(
+            "<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
+ self.run_stress_thread(stress_cmd=self.stress_cmd_w)
+        '''
+        During the scaling operation, keep some read/write traffic running to
+        simulate a real-world workload. Wait 2 minutes so that the c-s tool
+        has started running.
+        '''
+ time.sleep(self.sleep_seconds_before_scale)
+
+ def start_throttle_read(self):
+        # See the placeholder note in start_throttle_write().
+        self.stress_cmd_r = self.stress_cmd_r.replace(
+            "<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
+ self.run_stress_thread(stress_cmd=self.stress_cmd_r)
+ time.sleep(self.sleep_seconds_before_scale)
+
+ def start_throttle_rw(self):
+ self.start_throttle_write()
+ self.start_throttle_read()
+
+ def scale_out(self):
+ self.start_throttle_rw()
+ self.log.info("Started adding a new node")
+ start_time = time.time()
+ self.add_new_nodes()
+ duration = time.time() - start_time
+ self.log.info(f"Adding a node finished with duration: {duration}")
+
+ def scale_in(self):
+ self.start_throttle_rw()
+ self.log.info("Started removing a node")
+ start_time = time.time()
+ self.remove_node()
+ duration = time.time() - start_time
+ self.log.info(f"Removing a node finished with duration: {duration}")
+
+ def drop_data(self, keyspace_name):
+        '''
+        Drop the given keyspace to free disk space.
+        '''
+ node = self.db_cluster.nodes[0]
+ self.log.info("Dropping some data")
+ query = f"DROP KEYSPACE {keyspace_name}"
+ with self.db_cluster.cql_connection_patient(node) as session:
+ session.execute(query)
+ # node.run_nodetool(f"clearsnapshot")
+ DiskUtils.log_disk_usage(self.db_cluster.nodes)
+
+ def perform_scale_in(self):
+        '''
+        If the test was configured to stop populating data at 90% utilization, first scale out and
+        then drop 20% of the data to make room for the scale-in operation.
+
+        If the test was configured to stop populating data at 67% disk utilization, scale in directly,
+        without scaling out or dropping data.
+        '''
+ if self.hardlimit == 90:
+ self.scale_out()
+            '''
+            Before removing a node, make sure the other nodes
+            have enough free space to accommodate the data
+            streamed from the removed node.
+            '''
+ # Remove 20% of data from the cluster.
+ self.drop_data("keyspace_large1")
+ self.drop_data("keyspace_large2")
+ self.scale_in()
+ elif self.hardlimit == 67:
+ self.scale_in()
+
+ def perform_action(self):
+ DiskUtils.log_disk_usage(self.db_cluster.nodes)
+ # Trigger specific action
+ if self.scaling_action_type == ScalingActionType.SCALE_OUT.value:
+ self.scale_out()
+ elif self.scaling_action_type == ScalingActionType.SCALE_IN.value:
+ self.perform_scale_in()
+ else:
+ self.log.info(f"Invalid ActionType {self.scaling_action_type}")
+ DiskUtils.log_disk_usage(self.db_cluster.nodes)
+
+ def test_storage_utilization(self):
+ """
+ Write data until disk usage reaches specified hardlimit.
+ Sleep for few minutes.
+ Perform specific action.
+ """
+ self.run_stress(self.softlimit, sleep_seconds=self.sleep_time_fill_disk)
+ self.run_stress(self.hardlimit, sleep_seconds=self.sleep_time_fill_disk)
+ self.perform_action()
+
+ def run_stress(self, target_usage, sleep_seconds=600):
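+        """
+        Write data until the cluster's max disk usage reaches `target_usage` percent,
+        then idle for `sleep_seconds` before measuring disk usage again.
+        """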
+ target_used_size = DiskUtils.determine_storage_limit(
+ self.db_cluster.nodes, target_usage)
+ self.stress_utils.run_stress_until_target(
+ target_used_size, target_usage, self.hardlimit, self.softlimit)
+
+ DiskUtils.log_disk_usage(self.db_cluster.nodes)
+ self.log.info(f"Wait for {sleep_seconds} seconds")
+ time.sleep(sleep_seconds)
+ DiskUtils.log_disk_usage(self.db_cluster.nodes)
+
+ def add_new_nodes(self):
+ new_nodes = self.db_cluster.add_nodes(
+ count=self.add_node_cnt, enable_auto_bootstrap=True)
+ self.db_cluster.wait_for_init(node_list=new_nodes)
+ self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
+ total_nodes_in_cluster = len(self.db_cluster.nodes)
+ self.log.info(
+ f"New node added, total nodes in cluster: {total_nodes_in_cluster}")
+ self.monitors.reconfigure_scylla_monitoring()
+ wait_for_tablets_balanced(self.db_cluster.nodes[0])
+
+ def remove_node(self):
+ self.log.info('Removing a second node from the cluster')
+ node_to_remove = self.db_cluster.nodes[1]
+ self.log.info(f"Node to be removed: {node_to_remove.name}")
+ self.db_cluster.decommission(node_to_remove)
+ self.log.info(
+ f"Node {node_to_remove.name} has been removed from the cluster")
+ self.monitors.reconfigure_scylla_monitoring()
+ wait_for_tablets_balanced(self.db_cluster.nodes[0])
diff --git a/jenkins-pipelines/oss/full-storage-utilization.jenkinsfile b/jenkins-pipelines/oss/full-storage-utilization.jenkinsfile
new file mode 100644
index 0000000000..b62422eb0b
--- /dev/null
+++ b/jenkins-pipelines/oss/full-storage-utilization.jenkinsfile
@@ -0,0 +1,12 @@
+#!groovy
+
+// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
+def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)
+
+longevityPipeline(
+ backend: 'aws',
+ region: 'eu-west-1',
+ test_name: 'full_storage_utilization_test.FullStorageUtilizationTest.test_storage_utilization',
+ test_config: 'test-cases/scale/full-storage-utilization.yaml',
+ timeout: [time: 300, unit: 'MINUTES']
+)
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
index 86395f823c..0ac284ed89 100644
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -4451,6 +4451,14 @@ def get_nodes_up_and_normal(self, verification_node=None):
up_nodes.append(node)
return up_nodes
+ def flush_all_nodes(self):
+ """
+        Run the "nodetool flush" command on every db node in the cluster
+        so that memtables are persisted to disk.
+        :return:
+ """
+ for node in self.nodes:
+ node.run_nodetool("flush")
+
def get_node_status_dictionary(self, ip_address=None, verification_node=None):
"""Get node status dictionary via nodetool (in case it's not found return None)"""
node_status = None
diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py
index 7a7e97d9d1..6914a2a59f 100644
--- a/sdcm/sct_config.py
+++ b/sdcm/sct_config.py
@@ -1233,6 +1233,14 @@ class SCTConfiguration(dict):
dict(name="perf_gradual_throttle_steps", env="SCT_PERF_GRADUAL_THROTTLE_STEPS", type=dict,
help="Used for gradual performance test. Define throttle for load step in ops. Example: {'read': ['100000', '150000'], 'mixed': ['300']}"),
+        # StorageUtilizationTest
+        dict(name="scaling_action_type", env="SCT_SCALING_ACTION_TYPE", type=str,
+             help="Scaling action (e.g. scale_out, scale_in) to perform once storage reaches the configured threshold."),
+        dict(name="diskusage_softlimit", env="SCT_DISKUSAGE_SOFTLIMIT", type=int,
+             help="Soft disk usage limit (percent). Until this limit is reached, data is written in chunks of 10% of the "
+                  "available disk space; beyond it, the chunk size drops to 1GB."),
+        dict(name="diskusage_hardlimit", env="SCT_DISKUSAGE_HARDLIMIT", type=int,
+             help="Hard disk usage limit (percent) at which the scaling action is performed."),
+
# RefreshTest
dict(name="skip_download", env="SCT_SKIP_DOWNLOAD", type=boolean,
help=""),
diff --git a/sdcm/utils/full_storage_utils.py b/sdcm/utils/full_storage_utils.py
new file mode 100644
index 0000000000..283e83cc64
--- /dev/null
+++ b/sdcm/utils/full_storage_utils.py
@@ -0,0 +1,130 @@
+import logging
+
+LOGGER = logging.getLogger(__name__)
+
+
+class DiskUtils:
+ @staticmethod
+ def get_disk_info(node):
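+        """
+        Return total, used and available space (in GB) plus the used percentage
+        for the Scylla data mount (/var/lib/scylla) on the given node.
+        """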
+ result = node.remoter.run(
+ "df -h -BG --output=size,used,avail,pcent /var/lib/scylla | sed 1d | sed 's/G//g' | sed 's/%//'")
+ size, used, avail, pcent = result.stdout.strip().split()
+ return {
+ 'total': int(size),
+ 'used': int(used),
+ 'available': int(avail),
+ 'used_percent': int(pcent)
+ }
+
+ @staticmethod
+ def get_max_disk_usage(nodes):
+ max_used_percent = 0
+ max_used = 0
+ for node in nodes:
+ info = DiskUtils.get_disk_info(node)
+ max_used_percent = max(max_used_percent, info["used_percent"])
+ max_used = max(max_used, info["used"])
+ return max_used_percent, max_used
+
+ @staticmethod
+ def determine_storage_limit(nodes, target_percent):
+ """
+ Calculate the target disk usage limit in GB that needs to be reached
+ in the cluster.
+ """
+ max_total = 0
+ for node in nodes:
+ info = DiskUtils.get_disk_info(node)
+ max_total = max(max_total, info['total'])
+
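+        # Convert the target percentage into an absolute size, based on the largest node's total disk.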
+ target_used_size = (target_percent / 100) * max_total
+ current_usage, current_used = DiskUtils.get_max_disk_usage(nodes)
+ additional_usage_needed = target_used_size - current_used
+
+ LOGGER.info(f"Current max disk usage: {current_usage:.2f}%")
+ LOGGER.info(f"Current max used space: {current_used:.2f} GB")
+ LOGGER.info(f"Max total disk space: {max_total:.2f} GB")
+        LOGGER.info(
+            f"Target disk usage to reach: {target_percent}% ({target_used_size:.2f} GB)")
+ LOGGER.info(
+ f"Additional space to be populated: {additional_usage_needed:.2f} GB")
+
+ # Target disk usage in GB (either softlimit or hardlimit value)
+ return target_used_size
+
+ @staticmethod
+ def log_disk_usage(nodes):
+ for node in nodes:
+ info = DiskUtils.get_disk_info(node)
+ LOGGER.info(f"Disk usage for node {node.name}:")
+ LOGGER.info(f" Total: {info['total']} GB")
+ LOGGER.info(f" Used: {info['used']} GB")
+ LOGGER.info(f" Available: {info['available']} GB")
+ LOGGER.info(f" Used %: {info['used_percent']}%")
+
+
+class StressUtils:
+ def __init__(self, db_node, cluster_tester):
+ self.db_node = db_node
+ self.db_cluster = self.db_node.parent_cluster
+ self.cluster_tester = cluster_tester
+ self.large_ks_cnt = 0
+ self.small_ks_cnt = 0
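+        # Scale the c-s thread count with the node's core count (8 threads per core, defaulting to 8).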
+ cores = self.db_cluster.nodes[0].cpu_cores
+ self.num_stress_threads = 8 if not cores else int(cores) * 8
+
+ def prepare_dataset_layout(self, dataset_size_gb, row_size=10240):
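+        # Number of rows needed for ~dataset_size_gb of data at row_size bytes per row; keys are
+        # sampled from a population 100x the row count so writes hit mostly distinct partitions.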
+ n = dataset_size_gb * 1024 * 1024 * 1024 // row_size
+ seq_end = n * 100
+
+ return f'cassandra-stress write cl=ONE n={n} -mode cql3 native -rate threads={self.num_stress_threads} ' \
+ f'-pop dist="uniform(1..{seq_end})" ' \
+ f'-col "size=FIXED({row_size}) n=FIXED(1)" ' \
+ f'-schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
+
+ def run_stress_until_target(self, target_used_size, target_usage, hardlimit, softlimit):
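+        """
+        Keep writing data until the cluster reaches target_used_size GB (or target_usage percent):
+        chunks of ~10% of the initially required space while far from the target,
+        then 1GB chunks once close to it.
+        """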
+ current_usage, current_used = DiskUtils.get_max_disk_usage(
+ self.db_cluster.nodes)
+ smaller_dataset = False
+
+ space_needed = target_used_size - current_used
+ # Calculate chunk size as 10% of space needed
+ chunk_size = int(space_needed * 0.1)
+
+ while current_used < target_used_size and current_usage < target_usage:
+            # Switch to smaller chunks when within 30 GB of the target size, or within
+            # (hardlimit - softlimit) percent of the target usage.
+            smaller_dataset = (((target_used_size - current_used) < 30) or
+                               ((target_usage - current_usage) <= (hardlimit - softlimit)))
+
+ if not smaller_dataset:
+ self.large_ks_cnt += 1
+ else:
+ self.small_ks_cnt += 1
+
+ # Use 1GB chunks near threshold, otherwise use 10% of remaining space
+ dataset_size_gb = 1 if smaller_dataset else chunk_size
+ ks_name = "keyspace_small" if smaller_dataset else "keyspace_large"
+ num = self.small_ks_cnt if smaller_dataset else self.large_ks_cnt
+
+ LOGGER.info(f"Writing chunk of size: {dataset_size_gb} GB")
+ stress_cmd = self.prepare_dataset_layout(dataset_size_gb)
+ stress_queue = self.cluster_tester.run_stress_thread(
+ stress_cmd=stress_cmd,
+ keyspace_name=f"{ks_name}{num}",
+ stress_num=1,
+ keyspace_num=num
+ )
+
+ self.cluster_tester.verify_stress_thread(
+ cs_thread_pool=stress_queue)
+ self.cluster_tester.get_stress_results(queue=stress_queue)
+
+ self.db_cluster.flush_all_nodes()
+ # time.sleep(60) if smaller_dataset else time.sleep(600)
+
+ current_usage, current_used = DiskUtils.get_max_disk_usage(
+ self.db_cluster.nodes)
+ LOGGER.info(
+ f"Current max disk usage after writing to keyspace{num}: "
+ f"{current_usage}% ({current_used} GB / {target_used_size} GB)"
+ )
diff --git a/test-cases/scale/full-storage-utilization-scalein.yaml b/test-cases/scale/full-storage-utilization-scalein.yaml
new file mode 100644
index 0000000000..c8a82e84dc
--- /dev/null
+++ b/test-cases/scale/full-storage-utilization-scalein.yaml
@@ -0,0 +1,17 @@
+test_duration: 3600
+n_db_nodes: 4
+n_loaders: 1
+n_monitor_nodes: 1
+user_prefix: 'storage-utilization'
+instance_type_db: 'i4i.large'
+instance_provision: 'spot'
+
+enterprise_disable_kms: true
+scaling_action_type: "scale_in"
+diskusage_softlimit: 45
+diskusage_hardlimit: 67
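+# 67% hard limit: perform_scale_in() scales in directly, without a prior scale-out or data drop.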
+
+# NOTE: <THREADS_PLACE_HOLDER> (name assumed) is replaced by the test with the computed thread count.
+stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
+stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
+append_scylla_yaml:
+ enable_tablets: true
diff --git a/test-cases/scale/full-storage-utilization-scaleout.yaml b/test-cases/scale/full-storage-utilization-scaleout.yaml
new file mode 100644
index 0000000000..f7a51d85e6
--- /dev/null
+++ b/test-cases/scale/full-storage-utilization-scaleout.yaml
@@ -0,0 +1,18 @@
+test_duration: 3600
+n_db_nodes: 3
+n_loaders: 1
+n_monitor_nodes: 1
+user_prefix: 'storage-utilization'
+instance_type_db: 'i4i.large'
+instance_provision: 'spot'
+add_node_cnt: 3
+
+enterprise_disable_kms: true
+scaling_action_type: "scale_out"
+diskusage_softlimit: 70
+diskusage_hardlimit: 90
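+# Fill the cluster to 90% disk usage before triggering the scale-out action.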
+
+# NOTE: <THREADS_PLACE_HOLDER> (name assumed) is replaced by the test with the computed thread count.
+stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
+stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
+append_scylla_yaml:
+ enable_tablets: true