full_storage_utilization_test: Storage utilization at 90% cluster size #9018

Open · wants to merge 1 commit into base: master
3 changes: 3 additions & 0 deletions defaults/test_default.yaml
@@ -274,3 +274,6 @@ skip_test_stages: {}
n_db_zero_token_nodes: 0
zero_token_instance_type_db: 'i4i.large'
use_zero_nodes: false
# Storage utilization default values
diskusage_softlimit: 70
diskusage_hardlimit: 90
3 changes: 3 additions & 0 deletions docs/configuration_options.md
@@ -371,3 +371,6 @@
| **<a href="#user-content-n_db_zero_token_nodes" name="n_db_zero_token_nodes">n_db_zero_token_nodes</a>** | Number of zero token nodes in cluster. Value should be set as "0 1 1"<br>for multidc configuration in same manner as 'n_db_nodes' and should be equal<br>number of regions | N/A | SCT_N_DB_ZERO_TOKEN_NODES
| **<a href="#user-content-zero_token_instance_type_db" name="zero_token_instance_type_db">zero_token_instance_type_db</a>** | Instance type for zero token node | i4i.large | SCT_ZERO_TOKEN_INSTANCE_TYPE_DB
| **<a href="#user-content-use_zero_nodes" name="use_zero_nodes">use_zero_nodes</a>** | If True, enable support in sct of zero nodes(configuration, nemesis) | false | SCT_USE_ZERO_NODES
| **<a href="#user-content-scaling_action_type" name="scaling_action_type">scaling_action_type</a>** | Refers to scaling task like scaleout, scalein etc that needs to be performed when storage reaches specific threshold| N/A | SCT_SCALING_ACTION_TYPE
| **<a href="#user-content-diskusage_softlimit" name="diskusage_softlimit">diskusage_softlimit</a>** | Soft disk usage limit until this limit data will be written as 10% of available diskspace, after this chunksize will be 1GB| 70 | SCT_DISKUSAGE_SOFTLIMIT
| **<a href="#user-content-diskusage_hardlimit" name="diskusage_hardlimit">diskusage_hardlimit</a>** | Hard disk usage limit where scaling tasks will be performed | 90 | SCT_DISKUSAGE_HARDLIMIT
155 changes: 155 additions & 0 deletions full_storage_utilization_test.py
Contributor:
Most of this file just duplicates functions we already have in performance_regression.py and could be integrated directly there.
Not only does this avoid duplication, it also benefits from all the automation (such as decorators) we already have for those tests.

Reviewer:
Thanks @roydahan for pointing this out! @Lakshmipathi, could you reuse the existing code where possible? And do not worry about the code you have to delete - I am sure writing it was a great learning experience and you will benefit from it for a long time.

Author:
Sure, I will check performance_regression.py and reuse it wherever required. Less code is better :)

@@ -0,0 +1,155 @@
from enum import Enum
import time
from sdcm.tester import ClusterTester
from sdcm.utils.tablets.common import wait_for_tablets_balanced
from sdcm.utils.full_storage_utils import DiskUtils, StressUtils


class ScalingActionType(Enum):
SCALE_OUT = "scale_out"
SCALE_IN = "scale_in"


class FullStorageUtilizationTest(ClusterTester):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num_stress_threads = 10
self.sleep_seconds_before_scale = 120
self.sleep_time_fill_disk = 1800
self.softlimit = self.params.get('diskusage_softlimit')
self.hardlimit = self.params.get('diskusage_hardlimit')
self.stress_cmd_w = self.params.get('stress_cmd_w')
self.stress_cmd_r = self.params.get('stress_cmd_r')
self.add_node_cnt = self.params.get('add_node_cnt')
self.scaling_action_type = self.params.get('scaling_action_type')
self.stress_utils = None

def setUp(self):
super().setUp()
self.start_time = time.time()
self.stress_utils = StressUtils(
db_node=self.db_cluster.nodes[0], cluster_tester=self)

def start_throttle_write(self):
self.stress_cmd_w = self.stress_cmd_w.replace(
"<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
self.run_stress_thread(stress_cmd=self.stress_cmd_w)
'''
During the scaling operation, make sure there are some ongoing read/write
operations to simulate a real-world workload. Wait for 2 minutes so that the
cassandra-stress tool has started running.
'''
time.sleep(self.sleep_seconds_before_scale)

def start_throttle_read(self):
self.stress_cmd_r = self.stress_cmd_r.replace(
"<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
self.run_stress_thread(stress_cmd=self.stress_cmd_r)
time.sleep(self.sleep_seconds_before_scale)

def start_throttle_rw(self):
self.start_throttle_write()
self.start_throttle_read()

def scale_out(self):
self.start_throttle_rw()
self.log.info("Started adding a new node")
start_time = time.time()
self.add_new_nodes()
duration = time.time() - start_time
self.log.info(f"Adding a node finished with duration: {duration}")

def scale_in(self):
self.start_throttle_rw()
self.log.info("Started removing a node")
start_time = time.time()
self.remove_node()
duration = time.time() - start_time
self.log.info(f"Removing a node finished with duration: {duration}")

def drop_data(self, keyspace_name):
'''
Drop keyspace and clear snapshots.
'''
node = self.db_cluster.nodes[0]
self.log.info("Dropping some data")
query = f"DROP KEYSPACE {keyspace_name}"
with self.db_cluster.cql_connection_patient(node) as session:
session.execute(query)
# node.run_nodetool(f"clearsnapshot")
DiskUtils.log_disk_usage(self.db_cluster.nodes)

def perform_scale_in(self):
'''
If the test was configured to stop populating data at 90% utilization, first scale out, then
drop 20% of the data to make space for the scale-in operation.

If the test was configured to stop populating data at 67% disk utilization, scale in directly,
without scaling out or dropping data.
'''
if self.hardlimit == 90:
self.scale_out()
'''
Before removing a node, we should make sure the
other nodes have enough space so that they can
accommodate the data from the removed node.
'''
# Remove 20% of data from the cluster.
self.drop_data("keyspace_large1")
self.drop_data("keyspace_large2")
self.scale_in()
elif self.hardlimit == 67:
self.scale_in()

def perform_action(self):
DiskUtils.log_disk_usage(self.db_cluster.nodes)
# Trigger specific action
if self.scaling_action_type == ScalingActionType.SCALE_OUT.value:
self.scale_out()
elif self.scaling_action_type == ScalingActionType.SCALE_IN.value:
self.perform_scale_in()
else:
self.log.info(f"Invalid ActionType {self.scaling_action_type}")
DiskUtils.log_disk_usage(self.db_cluster.nodes)

def test_storage_utilization(self):
"""
Write data until disk usage reaches the configured hard limit.
Sleep for a few minutes.
Perform the configured scaling action.
"""
self.run_stress(self.softlimit, sleep_seconds=self.sleep_time_fill_disk)
self.run_stress(self.hardlimit, sleep_seconds=self.sleep_time_fill_disk)
self.perform_action()

def run_stress(self, target_usage, sleep_seconds=600):
target_used_size = DiskUtils.determine_storage_limit(
self.db_cluster.nodes, target_usage)
self.stress_utils.run_stress_until_target(
target_used_size, target_usage, self.hardlimit, self.softlimit)

DiskUtils.log_disk_usage(self.db_cluster.nodes)
self.log.info(f"Wait for {sleep_seconds} seconds")
time.sleep(sleep_seconds)
DiskUtils.log_disk_usage(self.db_cluster.nodes)

def add_new_nodes(self):
new_nodes = self.db_cluster.add_nodes(
count=self.add_node_cnt, enable_auto_bootstrap=True)
self.db_cluster.wait_for_init(node_list=new_nodes)
self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
total_nodes_in_cluster = len(self.db_cluster.nodes)
self.log.info(
f"New node added, total nodes in cluster: {total_nodes_in_cluster}")
self.monitors.reconfigure_scylla_monitoring()
wait_for_tablets_balanced(self.db_cluster.nodes[0])

def remove_node(self):
self.log.info('Removing the second node from the cluster')
node_to_remove = self.db_cluster.nodes[1]
self.log.info(f"Node to be removed: {node_to_remove.name}")
self.db_cluster.decommission(node_to_remove)
self.log.info(
f"Node {node_to_remove.name} has been removed from the cluster")
self.monitors.reconfigure_scylla_monitoring()
wait_for_tablets_balanced(self.db_cluster.nodes[0])
12 changes: 12 additions & 0 deletions jenkins-pipelines/oss/full-storage-utilization.jenkinsfile
@@ -0,0 +1,12 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
backend: 'aws',
region: 'eu-west-1',
test_name: 'full_storage_utilization_test.FullStorageUtilizationTest.test_storage_utilization',
test_config: 'test-cases/scale/full-storage-utilization.yaml',
timeout: [time: 300, unit: 'MINUTES']
)
8 changes: 8 additions & 0 deletions sdcm/cluster.py
@@ -4451,6 +4451,14 @@ def get_nodes_up_and_normal(self, verification_node=None):
up_nodes.append(node)
return up_nodes

def flush_all_nodes(self):
"""
This function will connect to all db nodes in the cluster and run "nodetool flush" command.
:return:
"""
for node in self.nodes:
node.run_nodetool("flush")

def get_node_status_dictionary(self, ip_address=None, verification_node=None):
"""Get node status dictionary via nodetool (in case it's not found return None)"""
node_status = None
8 changes: 8 additions & 0 deletions sdcm/sct_config.py
@@ -1233,6 +1233,14 @@ class SCTConfiguration(dict):
dict(name="perf_gradual_throttle_steps", env="SCT_PERF_GRADUAL_THROTTLE_STEPS", type=dict,
help="Used for gradual performance test. Define throttle for load step in ops. Example: {'read': ['100000', '150000'], 'mixed': ['300']}"),

# StorageUtilizationTest
dict(name="scaling_action_type", env="SCT_SCALING_ACTION_TYPE", type=str,
help="Refers to scaling task like scaleout, scalein etc that needs to be performed when storage reaches specific threshold."),
dict(name="diskusage_softlimit", env="SCT_DISKUSAGE_SOFTLIMIT", type=int,
help="Soft disk usage limit until this limit data will be written as 10% of available diskspace, after this chunksize will be 1GB."),
dict(name="diskusage_hardlimit", env="SCT_DISKUSAGE_HARDLIMIT", type=int,
help="Hard disk usage limit where scaling tasks will be performed."),

# RefreshTest
dict(name="skip_download", env="SCT_SKIP_DOWNLOAD", type=boolean,
help=""),
130 changes: 130 additions & 0 deletions sdcm/utils/full_storage_utils.py
@@ -0,0 +1,130 @@
import logging

LOGGER = logging.getLogger(__name__)


class DiskUtils:
@staticmethod
def get_disk_info(node):
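# df reports size/used/avail/use% for the Scylla data mount in whole gigabytes;
# the sed calls drop the header row and strip the 'G' and '%' suffixes so the
# fields can be parsed as plain integers below.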
result = node.remoter.run(
"df -h -BG --output=size,used,avail,pcent /var/lib/scylla | sed 1d | sed 's/G//g' | sed 's/%//'")
size, used, avail, pcent = result.stdout.strip().split()
return {
'total': int(size),
'used': int(used),
'available': int(avail),
'used_percent': int(pcent)
}

@staticmethod
def get_max_disk_usage(nodes):
max_used_percent = 0
max_used = 0
for node in nodes:
info = DiskUtils.get_disk_info(node)
max_used_percent = max(max_used_percent, info["used_percent"])
max_used = max(max_used, info["used"])
return max_used_percent, max_used

@staticmethod
def determine_storage_limit(nodes, target_percent):
"""
Calculate the target disk usage limit in GB that needs to be reached
in the cluster.
"""
max_total = 0
for node in nodes:
info = DiskUtils.get_disk_info(node)
max_total = max(max_total, info['total'])

target_used_size = (target_percent / 100) * max_total
current_usage, current_used = DiskUtils.get_max_disk_usage(nodes)
additional_usage_needed = target_used_size - current_used

LOGGER.info(f"Current max disk usage: {current_usage:.2f}%")
LOGGER.info(f"Current max used space: {current_used:.2f} GB")
LOGGER.info(f"Max total disk space: {max_total:.2f} GB")
LOGGER.info(
f"Target disk usage to reach: {target_percent}% ({target_used_size:.2f} GB)")
LOGGER.info(
f"Additional space to be populated: {additional_usage_needed:.2f} GB")

# Target disk usage in GB (either softlimit or hardlimit value)
return target_used_size

@staticmethod
def log_disk_usage(nodes):
for node in nodes:
info = DiskUtils.get_disk_info(node)
LOGGER.info(f"Disk usage for node {node.name}:")
LOGGER.info(f" Total: {info['total']} GB")
LOGGER.info(f" Used: {info['used']} GB")
LOGGER.info(f" Available: {info['available']} GB")
LOGGER.info(f" Used %: {info['used_percent']}%")


class StressUtils():
def __init__(self, db_node, cluster_tester):
self.db_node = db_node
self.db_cluster = self.db_node.parent_cluster
self.cluster_tester = cluster_tester
self.large_ks_cnt = 0
self.small_ks_cnt = 0
cores = self.db_cluster.nodes[0].cpu_cores
self.num_stress_threads = 8 if not cores else int(cores) * 8

def prepare_dataset_layout(self, dataset_size_gb, row_size=10240):
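# n is the number of rows needed to write roughly dataset_size_gb of data at
# row_size bytes per row; the population range is 100x larger than the row count
# so the uniform key distribution below spreads writes across a wide key space.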
n = dataset_size_gb * 1024 * 1024 * 1024 // row_size
seq_end = n * 100

return f'cassandra-stress write cl=ONE n={n} -mode cql3 native -rate threads={self.num_stress_threads} ' \
f'-pop dist="uniform(1..{seq_end})" ' \
f'-col "size=FIXED({row_size}) n=FIXED(1)" ' \
f'-schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'

def run_stress_until_target(self, target_used_size, target_usage, hardlimit, softlimit):
current_usage, current_used = DiskUtils.get_max_disk_usage(
self.db_cluster.nodes)
smaller_dataset = False

space_needed = target_used_size - current_used
# Calculate chunk size as 10% of space needed
chunk_size = int(space_needed * 0.1)

while current_used < target_used_size and current_usage < target_usage:
# Write a smaller dataset near the threshold (within the hardlimit-softlimit margin or within 30GB of the target)
smaller_dataset = (((target_used_size - current_used) < 30) or
((target_usage - current_usage) <= (hardlimit - softlimit)))

if not smaller_dataset:
self.large_ks_cnt += 1
else:
self.small_ks_cnt += 1

# Use 1GB chunks near threshold, otherwise use 10% of remaining space
dataset_size_gb = 1 if smaller_dataset else chunk_size
ks_name = "keyspace_small" if smaller_dataset else "keyspace_large"
num = self.small_ks_cnt if smaller_dataset else self.large_ks_cnt

LOGGER.info(f"Writing chunk of size: {dataset_size_gb} GB")
stress_cmd = self.prepare_dataset_layout(dataset_size_gb)
stress_queue = self.cluster_tester.run_stress_thread(
stress_cmd=stress_cmd,
keyspace_name=f"{ks_name}{num}",
stress_num=1,
keyspace_num=num
)

self.cluster_tester.verify_stress_thread(
cs_thread_pool=stress_queue)
self.cluster_tester.get_stress_results(queue=stress_queue)

self.db_cluster.flush_all_nodes()
# time.sleep(60) if smaller_dataset else time.sleep(600)

current_usage, current_used = DiskUtils.get_max_disk_usage(
self.db_cluster.nodes)
LOGGER.info(
f"Current max disk usage after writing to keyspace{num}: "
f"{current_usage}% ({current_used} GB / {target_used_size} GB)"
)
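
For orientation, here is a minimal, self-contained sketch of the chunk-size logic used by StressUtils.run_stress_until_target above. The numbers (a 500 GB data disk, 100 GB already used, 70/90 soft/hard limits) and the helper name plan_chunks are made up for illustration; the real test derives these values from df output on the cluster nodes.

# Hypothetical walk-through of the chunking strategy, illustration only (not part of the test code).
def plan_chunks(total_gb=500, used_gb=100, target_usage=90, hardlimit=90, softlimit=70):
    target_used = total_gb * target_usage / 100     # 450 GB of used space to reach
    chunk = int((target_used - used_gb) * 0.1)      # 10% of the space still needed -> 35 GB
    plan = []
    usage_pct = used_gb / total_gb * 100
    while used_gb < target_used and usage_pct < target_usage:
        near_threshold = ((target_used - used_gb) < 30 or
                          (target_usage - usage_pct) <= (hardlimit - softlimit))
        size = 1 if near_threshold else chunk       # 1 GB chunks near the threshold
        plan.append(size)
        used_gb += size                             # assume the whole chunk lands on disk
        usage_pct = used_gb / total_gb * 100
    return plan

print(plan_chunks())  # several 35 GB chunks, then 1 GB chunks up to ~450 GB used

In the actual test, each chunk is written by a separate cassandra-stress invocation into its own keyspace (keyspace_largeN or keyspace_smallN), which is what later lets perform_scale_in free space by dropping whole keyspaces (keyspace_large1 and keyspace_large2).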
17 changes: 17 additions & 0 deletions test-cases/scale/full-storage-utilization-scalein.yaml
@@ -0,0 +1,17 @@
test_duration: 3600
n_db_nodes: 4
n_loaders: 1
n_monitor_nodes: 1
user_prefix: 'storage-utilization'
instance_type_db: 'i4i.large'
instance_provision: 'spot'

enterprise_disable_kms: true
scaling_action_type: "scale_in"
diskusage_softlimit: 45
diskusage_hardlimit: 67

stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
append_scylla_yaml:
enable_tablets: true
18 changes: 18 additions & 0 deletions test-cases/scale/full-storage-utilization-scaleout.yaml
@@ -0,0 +1,18 @@
test_duration: 3600
n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
user_prefix: 'storage-utilization'
instance_type_db: 'i4i.large'
instance_provision: 'spot'
add_node_cnt: 3

enterprise_disable_kms: true
scaling_action_type: "scale_out"
diskusage_softlimit: 70
diskusage_hardlimit: 90

stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
append_scylla_yaml:
enable_tablets: true