Skip to content

Commit

Permalink
fix(FullScan): choose non-running_nemesis node
Browse files Browse the repository at this point in the history
this commit has the following changes

1 introduce common target_node_lock mechanism
that can be used in nemesis and Scan operations

2 FullScan operations now run only on nodes that are free of nemeses

3 change all node.running_nemesis settings to use the common methods
set/unset_running_nemesis from the common target_node_lock file (except unit tests)

4 change disrupt_rolling_restart_cluster nemesis to lock all nodes in
the cluster before performing the restart

fixes: scylladb#9284
  • Loading branch information
temichus committed Dec 11, 2024
1 parent d199206 commit 267cc0a
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 180 deletions.
6 changes: 2 additions & 4 deletions sdcm/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import itertools
import json
import ipaddress
from importlib import import_module
from typing import List, Optional, Dict, Union, Set, Iterable, ContextManager, Any, IO, AnyStr, Callable
from datetime import datetime, timezone
from textwrap import dedent
Expand Down Expand Up @@ -66,6 +65,7 @@
from sdcm.mgmt.common import get_manager_repo_from_defaults, get_manager_scylla_backend
from sdcm.prometheus import start_metrics_server, PrometheusAlertManagerListener, AlertSilencer
from sdcm.log import SDCMAdapter
from sdcm.target_node_lock import run_nemesis
from sdcm.provision.common.configuration_script import ConfigurationScriptBuilder
from sdcm.provision.common.utils import disable_daily_apt_triggers
from sdcm.provision.scylla_yaml import ScyllaYamlNodeAttrBuilder
Expand Down Expand Up @@ -4585,9 +4585,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
message=f"Failed to rotate AWS KMS key for the '{kms_key_alias_name}' alias",
traceback=traceback.format_exc()).publish()
try:
nemesis_class = self.nemesis[0] if self.nemesis else getattr(
import_module('sdcm.nemesis'), "Nemesis")
with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
with run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)

ks_cf_list = db_cluster.get_non_system_ks_cf_list(
Expand Down
4 changes: 2 additions & 2 deletions sdcm/kcl_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from functools import cached_property
from typing import Dict

from sdcm.nemesis import Nemesis
from sdcm.target_node_lock import run_nemesis
from sdcm.stress_thread import DockerBasedStressThread
from sdcm.stress.base import format_stress_cmd_error
from sdcm.utils.docker_remote import RemoteDocker
Expand Down Expand Up @@ -132,7 +132,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
end_time = time.time() + self._timeout

while not self._stop_event.is_set():
with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
with run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
node.run_nodetool('flush')

dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
Expand Down
48 changes: 16 additions & 32 deletions sdcm/nemesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import itertools
import enum
from distutils.version import LooseVersion
from contextlib import ExitStack, contextmanager
from contextlib import ExitStack
from typing import Any, List, Optional, Type, Tuple, Callable, Dict, Set, Union, Iterable
from functools import wraps, partial
from collections import defaultdict, Counter, namedtuple
Expand Down Expand Up @@ -66,6 +66,8 @@
)
from sdcm.db_stats import PrometheusDBStats
from sdcm.log import SDCMAdapter
from sdcm.target_node_lock import run_nemesis, set_running_nemesis, unset_running_nemesis, \
NEMESIS_TARGET_SELECTION_LOCK, CantAcquireLockException
from sdcm.logcollector import save_kallsyms_map
from sdcm.mgmt.common import TaskStatus, ScyllaManagerError, get_persistent_snapshots
from sdcm.nemesis_publisher import NemesisElasticSearchPublisher
Expand Down Expand Up @@ -106,6 +108,7 @@
update_authenticator, ParallelObject,
ParallelObjectResult, sleep_for_percent_of_duration, get_views_of_base_table)
from sdcm.utils.features import is_tablets_feature_enabled
from sdcm.utils.nemesis_thread_safe_operations import safe_cluster_start_stop, safe_node_restart
from sdcm.utils.quota import configure_quota_on_node_for_scylla_user_context, is_quota_enabled_on_node, enable_quota_on_node, \
write_data_to_reach_end_of_quota
from sdcm.utils.compaction_ops import CompactionOps, StartStopCompactionArgs
Expand Down Expand Up @@ -172,7 +175,6 @@
"disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node",
)

NEMESIS_TARGET_SELECTION_LOCK = Lock()
DISRUPT_POOL_PROPERTY_NAME = "target_pool"


Expand Down Expand Up @@ -328,24 +330,6 @@ def wrapper(self, *args, **kwargs):
setattr(cls, func.__name__, wrapper) # bind it to Nemesis class
return func # returning func means func can still be used normally

@staticmethod
@contextmanager
def run_nemesis(node_list: list['BaseNode'], nemesis_label: str):
"""
pick a node out of a `node_list`, and mark is as running_nemesis
for the duration of this context
"""
with NEMESIS_TARGET_SELECTION_LOCK:
free_nodes = [node for node in node_list if not node.running_nemesis]
assert free_nodes, f"couldn't find nodes for running:`{nemesis_label}`, are all nodes running nemesis ?"
node = random.choice(free_nodes)
node.running_nemesis = nemesis_label
try:
yield node
finally:
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None

def use_nemesis_seed(self):
if nemesis_seed := self.tester.params.get("nemesis_seed"):
random.seed(nemesis_seed)
Expand Down Expand Up @@ -379,14 +363,11 @@ def switch_target_node(self, node: BaseNode):
self.target_node = node

def set_current_running_nemesis(self, node):
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = self.current_disruption
set_running_nemesis(node, self.current_disruption)

@staticmethod
def unset_current_running_nemesis(node):
if node is not None:
with NEMESIS_TARGET_SELECTION_LOCK:
node.running_nemesis = None
unset_running_nemesis(node)

def set_target_node_pool_type(self, pool_type: NEMESIS_TARGET_POOLS = NEMESIS_TARGET_POOLS.data_nodes):
"""Set pool type to choose nodes for target node """
Expand Down Expand Up @@ -449,10 +430,10 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = No
self.target_node = random.choice(nodes)

if current_disruption:
self.target_node.running_nemesis = current_disruption
set_running_nemesis(self.target_node, current_disruption)
self.set_current_disruption(current_disruption)
elif self.current_disruption:
self.target_node.running_nemesis = self.current_disruption
set_running_nemesis(self.target_node, self.current_disruption)
else:
raise ValueError("current_disruption is not set")
self.log.info('Current Target: %s with running nemesis: %s',
Expand Down Expand Up @@ -980,7 +961,10 @@ def disrupt_soft_reboot_node(self):

@decorate_with_context(ignore_ycsb_connection_refused)
def disrupt_rolling_restart_cluster(self, random_order=False):
    """Perform a rolling restart of scylla across the whole cluster.

    Delegates to the thread-safe helper which locks every node before
    restarting, so no other nemesis can pick a node mid-restart.

    :param random_order: restart nodes in random order when True.
        NOTE(review): no longer forwarded to the helper -- confirm
        safe_cluster_start_stop handles ordering itself.
    :raises UnsupportedNemesis: when the cluster-wide node lock cannot be
        acquired (another nemesis currently owns one of the nodes).
    """
    try:
        safe_cluster_start_stop(self.target_node.running_nemesis, self.cluster)
    except CantAcquireLockException as e:
        # Bug fix: the original code built the exception object but never
        # raised it, so a lock-acquisition failure was silently swallowed.
        raise UnsupportedNemesis(str(e)) from e

def disrupt_switch_between_password_authenticator_and_saslauthd_authenticator_and_back(self):
"""
Expand Down Expand Up @@ -3076,7 +3060,7 @@ def execute_data_validation_thread(command_template, keyspace_name, number_of_ro
f'Schema restoration of {chosen_snapshot_tag} has failed!'

with ignore_ycsb_connection_refused():
self.cluster.restart_scylla() # After schema restoration, you should restart the nodes
safe_cluster_start_stop(self.target_node.running_nemesis, self.cluster)
self.tester.set_ks_strategy_to_network_and_rf_according_to_cluster(
keyspace=chosen_snapshot_info["keyspace_name"], repair_after_alter=False)

Expand Down Expand Up @@ -4055,7 +4039,7 @@ def decommission_post_action():
terminate_pattern.timeout):
stack.enter_context(expected_start_failed_context)
with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \
self.run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=self.target_node,
verification_node=verification_node,
timeout=full_operations_timeout):
Expand Down Expand Up @@ -4372,7 +4356,7 @@ def _enable_disable_table_encryption(self, enable_kms_key_rotation, additional_s
}
is_restart_needed = True
if is_restart_needed:
node.restart_scylla()
safe_node_restart(self.target_node.running_nemesis, node)

# Create table with encryption
keyspace_name, table_name = self.cluster.get_test_keyspaces()[0], 'tmp_encrypted_table'
Expand Down Expand Up @@ -5258,7 +5242,7 @@ def disrupt_bootstrap_streaming_error(self):
decommission_timeout = 7200
monitoring_decommission_timeout = decommission_timeout + 100
un_nodes = self.cluster.get_nodes_up_and_normal()
with Nemesis.run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
with run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
FailedDecommissionOperationMonitoring(target_node=new_node, verification_node=verification_node,
timeout=monitoring_decommission_timeout):

Expand Down
Loading

0 comments on commit 267cc0a

Please sign in to comment.