full_storage_utilization_test: Storage utilization at 90% cluster size #9018
Open
Lakshmipathi wants to merge 1 commit into scylladb:master from Lakshmipathi:wip/full_storage_utilization
full_storage_utilization_test.py
@@ -0,0 +1,155 @@
from enum import Enum
import time
from sdcm.tester import ClusterTester
from sdcm.utils.tablets.common import wait_for_tablets_balanced
from sdcm.utils.full_storage_utils import DiskUtils, StressUtils


class ScalingActionType(Enum):
    SCALE_OUT = "scale_out"
    SCALE_IN = "scale_in"


class FullStorageUtilizationTest(ClusterTester):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.num_stress_threads = 10
        self.sleep_seconds_before_scale = 120
        self.sleep_time_fill_disk = 1800
        self.softlimit = self.params.get('diskusage_softlimit')
        self.hardlimit = self.params.get('diskusage_hardlimit')
        self.stress_cmd_w = self.params.get('stress_cmd_w')
        self.stress_cmd_r = self.params.get('stress_cmd_r')
        self.add_node_cnt = self.params.get('add_node_cnt')
        self.scaling_action_type = self.params.get('scaling_action_type')
        self.stress_utils = None

    def setUp(self):
        super().setUp()
        self.start_time = time.time()
        self.stress_utils = StressUtils(
            db_node=self.db_cluster.nodes[0], cluster_tester=self)

    def start_throttle_write(self):
        self.stress_cmd_w = self.stress_cmd_w.replace(
            "<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
        self.run_stress_thread(stress_cmd=self.stress_cmd_w)
        '''
        During the scaling operation, keep some read/write operations running
        to simulate a real-world workload. Wait two minutes so that the
        cassandra-stress tool has started running.
        '''
        time.sleep(self.sleep_seconds_before_scale)

    def start_throttle_read(self):
        self.stress_cmd_r = self.stress_cmd_r.replace(
            "<THREADS_PLACE_HOLDER>", str(self.num_stress_threads))
        self.run_stress_thread(stress_cmd=self.stress_cmd_r)
        time.sleep(self.sleep_seconds_before_scale)

    def start_throttle_rw(self):
        self.start_throttle_write()
        self.start_throttle_read()

    def scale_out(self):
        self.start_throttle_rw()
        self.log.info("Started adding a new node")
        start_time = time.time()
        self.add_new_nodes()
        duration = time.time() - start_time
        self.log.info(f"Adding a node finished with duration: {duration}")

    def scale_in(self):
        self.start_throttle_rw()
        self.log.info("Started removing a node")
        start_time = time.time()
        self.remove_node()
        duration = time.time() - start_time
        self.log.info(f"Removing a node finished with duration: {duration}")

    def drop_data(self, keyspace_name):
        '''
        Drop a keyspace to free up disk space.
        '''
        node = self.db_cluster.nodes[0]
        self.log.info("Dropping some data")
        query = f"DROP KEYSPACE {keyspace_name}"
        with self.db_cluster.cql_connection_patient(node) as session:
            session.execute(query)
        # node.run_nodetool("clearsnapshot")
        DiskUtils.log_disk_usage(self.db_cluster.nodes)

    def perform_scale_in(self):
        '''
        If the test was configured to stop populating data at 90% utilization,
        scale out first and then drop 20% of the data to make room for the
        scale-in operation.

        If the test was configured to stop populating data at 67% disk
        utilization, scale in directly, without scaling out or dropping data.
        '''
        if self.hardlimit == 90:
            self.scale_out()
            '''
            Before removing a node, make sure the other nodes have enough
            space to accommodate the data streamed from the removed node.
            '''
            # Remove 20% of data from the cluster.
            self.drop_data("keyspace_large1")
            self.drop_data("keyspace_large2")
            self.scale_in()
        elif self.hardlimit == 67:
            self.scale_in()

    def perform_action(self):
        DiskUtils.log_disk_usage(self.db_cluster.nodes)
        # Trigger the configured action
        if self.scaling_action_type == ScalingActionType.SCALE_OUT.value:
            self.scale_out()
        elif self.scaling_action_type == ScalingActionType.SCALE_IN.value:
            self.perform_scale_in()
        else:
            self.log.info(f"Invalid ActionType {self.scaling_action_type}")
        DiskUtils.log_disk_usage(self.db_cluster.nodes)

    def test_storage_utilization(self):
        """
        Write data until disk usage reaches the configured hard limit,
        sleep for a few minutes, then perform the configured scaling action.
        """
        self.run_stress(self.softlimit, sleep_seconds=self.sleep_time_fill_disk)
        self.run_stress(self.hardlimit, sleep_seconds=self.sleep_time_fill_disk)
        self.perform_action()

    def run_stress(self, target_usage, sleep_seconds=600):
        target_used_size = DiskUtils.determine_storage_limit(
            self.db_cluster.nodes, target_usage)
        self.stress_utils.run_stress_until_target(
            target_used_size, target_usage, self.hardlimit, self.softlimit)

        DiskUtils.log_disk_usage(self.db_cluster.nodes)
        self.log.info(f"Wait for {sleep_seconds} seconds")
        time.sleep(sleep_seconds)
        DiskUtils.log_disk_usage(self.db_cluster.nodes)

    def add_new_nodes(self):
        new_nodes = self.db_cluster.add_nodes(
            count=self.add_node_cnt, enable_auto_bootstrap=True)
        self.db_cluster.wait_for_init(node_list=new_nodes)
        self.db_cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
        total_nodes_in_cluster = len(self.db_cluster.nodes)
        self.log.info(
            f"New node added, total nodes in cluster: {total_nodes_in_cluster}")
        self.monitors.reconfigure_scylla_monitoring()
        wait_for_tablets_balanced(self.db_cluster.nodes[0])

    def remove_node(self):
        self.log.info('Removing the second node from the cluster')
        node_to_remove = self.db_cluster.nodes[1]
        self.log.info(f"Node to be removed: {node_to_remove.name}")
        self.db_cluster.decommission(node_to_remove)
        self.log.info(
            f"Node {node_to_remove.name} has been removed from the cluster")
        self.monitors.reconfigure_scylla_monitoring()
        wait_for_tablets_balanced(self.db_cluster.nodes[0])
jenkins-pipelines/oss/full-storage-utilization.jenkinsfile (12 additions)
@@ -0,0 +1,12 @@
#!groovy

// trick from https://github.com/jenkinsci/workflow-cps-global-lib-plugin/pull/43
def lib = library identifier: 'sct@snapshot', retriever: legacySCM(scm)

longevityPipeline(
    backend: 'aws',
    region: 'eu-west-1',
    test_name: 'full_storage_utilization_test.FullStorageUtilizationTest.test_storage_utilization',
    test_config: 'test-cases/scale/full-storage-utilization.yaml',
    timeout: [time: 300, unit: 'MINUTES']
)
sdcm/utils/full_storage_utils.py
@@ -0,0 +1,130 @@
import logging

LOGGER = logging.getLogger(__name__)


class DiskUtils:
    @staticmethod
    def get_disk_info(node):
        # df reports size/used/avail in GB plus used percentage for the Scylla data mount;
        # the sed filters strip the header row, the 'G' suffixes and the '%' sign.
        result = node.remoter.run(
            "df -h -BG --output=size,used,avail,pcent /var/lib/scylla | sed 1d | sed 's/G//g' | sed 's/%//'")
        size, used, avail, pcent = result.stdout.strip().split()
        return {
            'total': int(size),
            'used': int(used),
            'available': int(avail),
            'used_percent': int(pcent)
        }

    @staticmethod
    def get_max_disk_usage(nodes):
        max_used_percent = 0
        max_used = 0
        for node in nodes:
            info = DiskUtils.get_disk_info(node)
            max_used_percent = max(max_used_percent, info["used_percent"])
            max_used = max(max_used, info["used"])
        return max_used_percent, max_used

    @staticmethod
    def determine_storage_limit(nodes, target_percent):
        """
        Calculate the target disk usage (in GB) that needs to be reached
        in the cluster.
        """
        max_total = 0
        for node in nodes:
            info = DiskUtils.get_disk_info(node)
            max_total = max(max_total, info['total'])

        target_used_size = (target_percent / 100) * max_total
        current_usage, current_used = DiskUtils.get_max_disk_usage(nodes)
        additional_usage_needed = target_used_size - current_used

        LOGGER.info(f"Current max disk usage: {current_usage:.2f}%")
        LOGGER.info(f"Current max used space: {current_used:.2f} GB")
        LOGGER.info(f"Max total disk space: {max_total:.2f} GB")
        LOGGER.info(
            f"Target disk usage limit to reach: {target_percent}% ({target_used_size:.2f} GB)")
        LOGGER.info(
            f"Additional space to be populated: {additional_usage_needed:.2f} GB")

        # Target disk usage in GB (either the softlimit or the hardlimit value)
        return target_used_size

    @staticmethod
    def log_disk_usage(nodes):
        for node in nodes:
            info = DiskUtils.get_disk_info(node)
            LOGGER.info(f"Disk usage for node {node.name}:")
            LOGGER.info(f"  Total: {info['total']} GB")
            LOGGER.info(f"  Used: {info['used']} GB")
            LOGGER.info(f"  Available: {info['available']} GB")
            LOGGER.info(f"  Used %: {info['used_percent']}%")


class StressUtils:
    def __init__(self, db_node, cluster_tester):
        self.db_node = db_node
        self.db_cluster = self.db_node.parent_cluster
        self.cluster_tester = cluster_tester
        self.large_ks_cnt = 0
        self.small_ks_cnt = 0
        cores = self.db_cluster.nodes[0].cpu_cores
        self.num_stress_threads = 8 if not cores else int(cores) * 8

    def prepare_dataset_layout(self, dataset_size_gb, row_size=10240):
        # Number of rows needed to write dataset_size_gb of data with row_size-byte rows
        n = dataset_size_gb * 1024 * 1024 * 1024 // row_size
        seq_end = n * 100

        return f'cassandra-stress write cl=ONE n={n} -mode cql3 native -rate threads={self.num_stress_threads} ' \
               f'-pop dist="uniform(1..{seq_end})" ' \
               f'-col "size=FIXED({row_size}) n=FIXED(1)" ' \
               f'-schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'

    def run_stress_until_target(self, target_used_size, target_usage, hardlimit, softlimit):
        current_usage, current_used = DiskUtils.get_max_disk_usage(
            self.db_cluster.nodes)
        smaller_dataset = False

        space_needed = target_used_size - current_used
        # Calculate the chunk size as 10% of the space still needed
        chunk_size = int(space_needed * 0.1)

        while current_used < target_used_size and current_usage < target_usage:
            # Switch to smaller datasets near the threshold: within 30 GB of the target,
            # or within (hardlimit - softlimit) percentage points of it
            smaller_dataset = (((target_used_size - current_used) < 30) or
                               ((target_usage - current_usage) <= (hardlimit - softlimit)))

            if not smaller_dataset:
                self.large_ks_cnt += 1
            else:
                self.small_ks_cnt += 1

            # Use 1 GB chunks near the threshold, otherwise 10% of the remaining space
            dataset_size_gb = 1 if smaller_dataset else chunk_size
            ks_name = "keyspace_small" if smaller_dataset else "keyspace_large"
            num = self.small_ks_cnt if smaller_dataset else self.large_ks_cnt

            LOGGER.info(f"Writing chunk of size: {dataset_size_gb} GB")
            stress_cmd = self.prepare_dataset_layout(dataset_size_gb)
            stress_queue = self.cluster_tester.run_stress_thread(
                stress_cmd=stress_cmd,
                keyspace_name=f"{ks_name}{num}",
                stress_num=1,
                keyspace_num=num
            )

            self.cluster_tester.verify_stress_thread(cs_thread_pool=stress_queue)
            self.cluster_tester.get_stress_results(queue=stress_queue)

            self.db_cluster.flush_all_nodes()
            # time.sleep(60) if smaller_dataset else time.sleep(600)

            current_usage, current_used = DiskUtils.get_max_disk_usage(
                self.db_cluster.nodes)
            LOGGER.info(
                f"Current max disk usage after writing to {ks_name}{num}: "
                f"{current_usage}% ({current_used} GB / {target_used_size} GB)"
            )
Test-case configuration (scale-in scenario)
@@ -0,0 +1,17 @@
test_duration: 3600
n_db_nodes: 4
n_loaders: 1
n_monitor_nodes: 1
user_prefix: 'storage-utilization'
instance_type_db: 'i4i.large'
instance_provision: 'spot'

enterprise_disable_kms: true
scaling_action_type: "scale_in"
diskusage_softlimit: 45
diskusage_hardlimit: 67

stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> "throttle=1400/s" -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
append_scylla_yaml:
  enable_tablets: true
Test-case configuration (scale-out scenario)
@@ -0,0 +1,18 @@
test_duration: 3600
n_db_nodes: 3
n_loaders: 1
n_monitor_nodes: 1
user_prefix: 'storage-utilization'
instance_type_db: 'i4i.large'
instance_provision: 'spot'
add_node_cnt: 3

enterprise_disable_kms: true
scaling_action_type: "scale_out"
diskusage_softlimit: 70
diskusage_hardlimit: 90

stress_cmd_w: 'cassandra-stress write duration=30m -rate threads=<THREADS_PLACE_HOLDER> -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
stress_cmd_r: 'cassandra-stress read duration=30m -rate threads=<THREADS_PLACE_HOLDER> -mode cql3 native -pop seq=1..5000000 -col "size=FIXED(10240) n=FIXED(1)" -schema "replication(strategy=NetworkTopologyStrategy,replication_factor=3)"'
append_scylla_yaml:
  enable_tablets: true
Review comments:

Most of this file just duplicates functions we already have in performance_regression.py and could be integrated directly there. Not only does this avoid duplication, it also benefits from all the automation (such as decorators) we already have for those functions.
Thanks @roydahan for pointing this out! @Lakshmipathi, could you reuse the existing code where possible? And do not worry about the code you have to delete - I am sure writing it was a great learning experience that you will benefit from for a long time.
Sure, I will check performance_regression.py and reuse it wherever required. Less code is better :)
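A minimal sketch of the direction suggested above, assuming performance_regression.py exposes a reusable test base class; the import path and the helper shown are hypothetical placeholders, not the actual API in that module:

# Hypothetical sketch only: PerformanceRegressionTest and preload_data() are
# placeholders for whatever performance_regression.py actually provides; the point
# is that the disk-fill and stress helpers would be inherited rather than re-implemented here.
from performance_regression import PerformanceRegressionTest  # assumed import path


class FullStorageUtilizationTest(PerformanceRegressionTest):
    def test_storage_utilization(self):
        self.preload_data(target_usage=self.hardlimit)  # shared fill helper (placeholder)
        self.perform_action()                           # scaling logic specific to this test stays here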