full_storage_utilization_test: Storage utilization at 90% cluster size
Populate data until disk storage usage exceeds 90% and then perform db and cluster operations.

Signed-off-by: Lakshmipathi <[email protected]>
Lakshmipathi committed Nov 21, 2024
1 parent 93db67c commit 9d5bac1
Showing 9 changed files with 332 additions and 0 deletions.
3 changes: 3 additions & 0 deletions defaults/test_default.yaml
@@ -274,3 +274,6 @@ skip_test_stages: {}
n_db_zero_token_nodes: 0
zero_token_instance_type_db: 'i4i.large'
use_zero_nodes: false
# Storage utilization default values
diskusage_softlimit: 70
diskusage_hardlimit: 90
3 changes: 3 additions & 0 deletions docs/configuration_options.md
@@ -371,3 +371,6 @@
| **<a href="#user-content-n_db_zero_token_nodes" name="n_db_zero_token_nodes">n_db_zero_token_nodes</a>** | Number of zero token nodes in cluster. Value should be set as "0 1 1"<br>for multidc configuration in same manner as 'n_db_nodes' and should be equal<br>number of regions | N/A | SCT_N_DB_ZERO_TOKEN_NODES
| **<a href="#user-content-zero_token_instance_type_db" name="zero_token_instance_type_db">zero_token_instance_type_db</a>** | Instance type for zero token node | i4i.large | SCT_ZERO_TOKEN_INSTANCE_TYPE_DB
| **<a href="#user-content-use_zero_nodes" name="use_zero_nodes">use_zero_nodes</a>** | If True, enable support in sct of zero nodes(configuration, nemesis) | false | SCT_USE_ZERO_NODES
| **<a href="#user-content-scaling_action_type" name="scaling_action_type">scaling_action_type</a>** | Refers to scaling task like scaleout, scalein etc that needs to be performed when storage reaches specific threshold| N/A | SCT_AUTO_SCALING_ACTION_TYPE
| **<a href="#user-content-diskusage_softlimit" name="diskusage_softlimit">diskusage_softlimit</a>** | Soft disk usage limit until this limit data will be written as 10% of available diskspace, after this chunksize will be 1%| 70 | SCT_DISKUSAGE_SOFTLIMIT
| **<a href="#user-content-diskusage_hardlimit" name="diskusage_hardlimit">diskusage_hardlimit</a>** | Maximum disk usage limit where auto-scale tasks will be performed | 90 | SCT_DISKUSAGE_HARDLIMIT
146 changes: 146 additions & 0 deletions full_storage_utilization_test.py
@@ -0,0 +1,146 @@
from enum import Enum
import time
from sdcm.tester import ClusterTester
from sdcm.utils.tablets.common import wait_for_tablets_balanced
from sdcm.utils.full_storage_utils import DiskUtils, StressUtils

class ScalingActionType(Enum):
    SCALE_OUT = "scale_out"
    SCALE_IN = "scale_in"

class FullStorageUtilizationTest(ClusterTester):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.num_stress_threads = 10
        self.sleep_time_before_scale = 120
        self.sleep_time_fill_disk = 1800
        self.softlimit = self.params.get('diskusage_softlimit')
        self.hardlimit = self.params.get('diskusage_hardlimit')
        self.stress_cmd_w = self.params.get('stress_cmd_w')
        self.stress_cmd_r = self.params.get('stress_cmd_r')
        self.add_node_cnt = self.params.get('add_node_cnt')
        self.scaling_action_type = self.params.get('scaling_action_type')
        self.stress_utils = None

    def setUp(self):
        super().setUp()
        self.start_time = time.time()
        self.stress_utils = StressUtils(db_node=self.db_cluster.nodes[0], cluster_tester=self)

    def start_throttle_write(self):
        self.stress_cmd_w = self.stress_cmd_w.replace("<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
        self.run_stress_thread(stress_cmd=self.stress_cmd_w)
        '''
        During a scaling operation, make sure there are some ongoing read/write
        operations to simulate a real-world workload. Wait for 2 minutes so that
        the c-s tool has started running.
        '''
        time.sleep(self.sleep_time_before_scale)

    def start_throttle_read(self):
        self.stress_cmd_r = self.stress_cmd_r.replace("<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
        self.run_stress_thread(stress_cmd=self.stress_cmd_r)
        time.sleep(self.sleep_time_before_scale)

    def start_throttle_rw(self):
        self.start_throttle_write()
        self.start_throttle_read()

    def scale_out(self):
        self.start_throttle_rw()
        self.log.info("Started adding a new node")
        start_time = time.time()
        self.add_new_node()
        duration = time.time() - start_time
        self.log.info(f"Adding a node finished with time: {duration}")

    def scale_in(self):
        self.start_throttle_rw()
        self.log.info("Started removing a node")
        start_time = time.time()
        self.remove_node()
        duration = time.time() - start_time
        self.log.info(f"Removing a node finished with time: {duration}")

    def drop_data(self, keyspace_name):
        '''
        Drop a keyspace to release disk space.
        '''
        node = self.db_cluster.nodes[0]
        self.log.info("Dropping some data")
        query = f"DROP KEYSPACE {keyspace_name}"
        with self.db_cluster.cql_connection_patient(node) as session:
            session.execute(query)
        # node.run_nodetool("clearsnapshot")
        DiskUtils.log_disk_usage(self.db_cluster.nodes)

    def perform_scale_in(self):
        '''
        If we are already at 90% disk utilization, first scale out and then
        drop 20% of the data to make room for the scale-in operation.
        If we are at 67% disk utilization, scale in without scaling out or
        dropping data.
        '''
        if self.hardlimit == 90:
            self.scale_out()
            '''
            Before removing a node, make sure the other nodes have enough
            space to accommodate the data streamed from the removed node.
            '''
            # Remove 20% of data from the cluster.
            self.drop_data("keyspace_large1")
            self.drop_data("keyspace_large2")
            self.scale_in()
        elif self.hardlimit == 67:
            self.scale_in()

    def perform_action(self):
        DiskUtils.log_disk_usage(self.db_cluster.nodes)
        # Trigger the specific action
        if self.scaling_action_type == ScalingActionType.SCALE_OUT.value:
            self.scale_out()
        elif self.scaling_action_type == ScalingActionType.SCALE_IN.value:
            self.perform_scale_in()
        else:
            self.log.info(f"Invalid ActionType {self.scaling_action_type}")
        DiskUtils.log_disk_usage(self.db_cluster.nodes)

    def test_storage_utilization(self):
        """
        Write data until the soft limit and then the 90% hard limit of disk
        usage is reached, sleeping for 30 minutes after each fill stage, then
        perform the configured scaling action.
        """
        self.run_stress(self.softlimit, sleep_time=self.sleep_time_fill_disk)
        self.run_stress(self.hardlimit, sleep_time=self.sleep_time_fill_disk)
        self.perform_action()

    def run_stress(self, target_usage, sleep_time=600):
        target_used_size = DiskUtils.calculate_target_used_size(
            self.db_cluster.nodes, target_usage)
        self.stress_utils.run_stress_until_target(target_used_size, target_usage)

        DiskUtils.log_disk_usage(self.db_cluster.nodes)
        self.log.info(f"Wait for {sleep_time} seconds")
        time.sleep(sleep_time)
        DiskUtils.log_disk_usage(self.db_cluster.nodes)

    def add_new_node(self):
        new_nodes = self.db_cluster.add_nodes(count=self.add_node_cnt, enable_auto_bootstrap=True)
        self.db_cluster.wait_for_init(node_list=new_nodes)
        self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
        total_nodes_in_cluster = len(self.db_cluster.nodes)
        self.log.info(f"New node added, total nodes in cluster: {total_nodes_in_cluster}")
        self.monitors.reconfigure_scylla_monitoring()
        wait_for_tablets_balanced(self.db_cluster.nodes[0])

    def remove_node(self):
        self.log.info('Removing the second node from the cluster')
        node_to_remove = self.db_cluster.nodes[1]
        self.log.info(f"Node to be removed: {node_to_remove.name}")
        self.db_cluster.decommission(node_to_remove)
        self.log.info(f"Node {node_to_remove.name} has been removed from the cluster")
        self.monitors.reconfigure_scylla_monitoring()
        wait_for_tablets_balanced(self.db_cluster.nodes[0])
12 changes: 12 additions & 0 deletions jenkins-pipelines/oss/full-storage-utilization.jenkinsfile
@@ -0,0 +1,12 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
    backend: 'aws',
    region: 'eu-west-1',
    test_name: 'full_storage_utilization_test.FullStorageUtilizationTest.test_storage_utilization',
    test_config: 'test-cases/scale/full-storage-utilization.yaml',
    timeout: [time: 300, unit: 'MINUTES']
)
8 changes: 8 additions & 0 deletions sdcm/cluster.py
@@ -4451,6 +4451,14 @@ def get_nodes_up_and_normal(self, verification_node=None):
                up_nodes.append(node)
        return up_nodes

    def flush_all_nodes(self):
        """
        Connect to all db nodes in the cluster and run the "nodetool flush" command.
        :return:
        """
        for node in self.nodes:
            node.run_nodetool("flush")

    def get_node_status_dictionary(self, ip_address=None, verification_node=None):
        """Get node status dictionary via nodetool (in case it's not found return None)"""
        node_status = None
8 changes: 8 additions & 0 deletions sdcm/sct_config.py
@@ -1233,6 +1233,14 @@ class SCTConfiguration(dict):
        dict(name="perf_gradual_throttle_steps", env="SCT_PERF_GRADUAL_THROTTLE_STEPS", type=dict,
             help="Used for gradual performance test. Define throttle for load step in ops. Example: {'read': ['100000', '150000'], 'mixed': ['300']}"),

        # StorageUtilizationTest
        dict(name="scaling_action_type", env="SCT_SCALING_ACTION_TYPE", type=str,
             help="Type of scaling action (scale_in, scale_out, etc.) to perform when the configured disk usage threshold is reached."),
        dict(name="diskusage_softlimit", env="SCT_DISKUSAGE_SOFTLIMIT", type=int,
             help="Soft limit threshold; data is populated in 10% chunks until this limit is reached."),
        dict(name="diskusage_hardlimit", env="SCT_DISKUSAGE_HARDLIMIT", type=int,
             help="Maximum disk usage limit."),

        # RefreshTest
        dict(name="skip_download", env="SCT_SKIP_DOWNLOAD", type=boolean,
             help=""),
117 changes: 117 additions & 0 deletions sdcm/utils/full_storage_utils.py
@@ -0,0 +1,117 @@
import logging

LOGGER = logging.getLogger(__name__)

class DiskUtils:
    @staticmethod
    def get_disk_info(node):
        result = node.remoter.run("df -h -BG --output=size,used,avail,pcent /var/lib/scylla | sed 1d | sed 's/G//g' | sed 's/%//'")
        size, used, avail, pcent = result.stdout.strip().split()
        return {
            'total': int(size),
            'used': int(used),
            'available': int(avail),
            'used_percent': int(pcent)
        }

    @staticmethod
    def get_max_disk_usage(nodes):
        max_usage = 0
        max_used = 0
        for node in nodes:
            info = DiskUtils.get_disk_info(node)
            max_usage = max(max_usage, info["used_percent"])
            max_used = max(max_used, info["used"])
        return max_usage, max_used

    @staticmethod
    def calculate_target_used_size(nodes, target_percent):
        max_total = 0
        for node in nodes:
            info = DiskUtils.get_disk_info(node)
            max_total = max(max_total, info['total'])

        target_used_size = (target_percent / 100) * max_total
        current_usage, current_used = DiskUtils.get_max_disk_usage(nodes)
        additional_usage_needed = target_used_size - current_used

        LOGGER.info(f"Current max disk usage: {current_usage:.2f}%")
        LOGGER.info(f"Current max used space: {current_used:.2f} GB")
        LOGGER.info(f"Max total disk space: {max_total:.2f} GB")
        LOGGER.info(f"Target used space to reach {target_percent}%: {target_used_size:.2f} GB")
        LOGGER.info(f"Additional space to be used: {additional_usage_needed:.2f} GB")

        return target_used_size

    @staticmethod
    def log_disk_usage(nodes):
        for node in nodes:
            info = DiskUtils.get_disk_info(node)
            LOGGER.info(f"Disk usage for node {node.name}:")
            LOGGER.info(f"  Total: {info['total']} GB")
            LOGGER.info(f"  Used: {info['used']} GB")
            LOGGER.info(f"  Available: {info['available']} GB")
            LOGGER.info(f"  Used %: {info['used_percent']}%")

class StressUtils():
    def __init__(self, db_node, cluster_tester):
        self.db_node = db_node
        self.db_cluster = self.db_node.parent_cluster
        self.cluster_tester = cluster_tester
        self.total_large_ks = 0
        self.total_small_ks = 0
        cores = self.db_cluster.nodes[0].cpu_cores
        self.num_stress_threads = 8 if not cores else int(cores) * 8

    def prepare_dataset_layout(self, dataset_size, row_size=10240):
        n = dataset_size * 1024 * 1024 * 1024 // row_size
        seq_end = n * 100

        return f'cassandra-stress write cl=ONE n={n} -mode cql3 native -rate threads={self.num_stress_threads} ' \
               f'-pop dist="uniform(1..{seq_end})" ' \
               f'-col "size=FIXED({row_size}) n=FIXED(1)" ' \
               f'-schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'

    def run_stress_until_target(self, target_used_size, target_usage):
        current_usage, current_used = DiskUtils.get_max_disk_usage(self.db_cluster.nodes)
        smaller_dataset = False

        space_needed = target_used_size - current_used
        # Calculate chunk size as 10% of space needed
        chunk_size = int(space_needed * 0.1)

        while current_used < target_used_size and current_usage < target_usage:
            # Write smaller dataset near the threshold (15% or 30GB of the target)
            smaller_dataset = (((target_used_size - current_used) < 30) or
                               ((target_usage - current_usage) <= 15))

            if not smaller_dataset:
                self.total_large_ks += 1
            else:
                self.total_small_ks += 1

            # Use 1GB chunks near threshold, otherwise use 10% of remaining space
            dataset_size = 1 if smaller_dataset else chunk_size
            ks_name = "keyspace_small" if smaller_dataset else "keyspace_large"
            num = self.total_small_ks if smaller_dataset else self.total_large_ks

            LOGGER.info(f"Writing chunk of size: {dataset_size} GB")
            stress_cmd = self.prepare_dataset_layout(dataset_size)
            stress_queue = self.cluster_tester.run_stress_thread(
                stress_cmd=stress_cmd,
                keyspace_name=f"{ks_name}{num}",
                stress_num=1,
                keyspace_num=num
            )

            self.cluster_tester.verify_stress_thread(cs_thread_pool=stress_queue)
            self.cluster_tester.get_stress_results(queue=stress_queue)

            self.db_cluster.flush_all_nodes()
            # time.sleep(60) if smaller_dataset else time.sleep(600)

            current_usage, current_used = DiskUtils.get_max_disk_usage(self.db_cluster.nodes)
            LOGGER.info(
                f"Current max disk usage after writing to keyspace{num}: "
                f"{current_usage}% ({current_used} GB / {target_used_size} GB)"
            )
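
As a quick, hypothetical walk-through of the arithmetic used by prepare_dataset_layout and run_stress_until_target above (the 500 GB volume and 120 GB of already-written data are assumed values, not taken from this commit):

row_size = 10240                                    # 10 KB rows, as in prepare_dataset_layout()
dataset_size = 5                                    # a hypothetical 5 GB chunk
n = dataset_size * 1024 * 1024 * 1024 // row_size   # 524288 rows of 10 KB each
seq_end = n * 100                                   # 52428800; keys are drawn from uniform(1..seq_end)

total_gb, used_gb, target_percent = 500, 120, 90    # assumed per-node disk size, current usage, hard limit
target_used_size = target_percent / 100 * total_gb  # 450.0 GB, as in calculate_target_used_size()
space_needed = target_used_size - used_gb           # 330.0 GB still to be written
chunk_size = int(space_needed * 0.1)                # 33 GB per "keyspace_large" write round
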
17 changes: 17 additions & 0 deletions test-cases/scale/full-storage-utilization-scalein.yaml
@@ -0,0 +1,17 @@
test_duration: 3600
n_db_nodes: 4
n_loaders: 1
n_monitor_nodes: 1
user_prefix: 'storage-utilization'
instance_type_db: 'i4i.large'
instance_provision: 'spot'

enterprise_disable_kms: true
scaling_action_type: "scale_in"
diskusage_softlimit: 45
diskusage_hardlimit: 67

stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
append_scylla_yaml:
  enable_tablets: true
18 changes: 18 additions & 0 deletions test-cases/scale/full-storage-utilization-scaleout.yaml
@@ -0,0 +1,18 @@
test_duration: 3600
n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
user_prefix: 'storage-utilization'
instance_type_db: 'i4i.large'
instance_provision: 'spot'
add_node_cnt: 3

enterprise_disable_kms: true
scaling_action_type: "scale_out"
diskusage_softlimit: 70
diskusage_hardlimit: 90

stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
append_scylla_yaml:
  enable_tablets: true
