fix(FullScan): choose only a non-disruptive node
This commit introduces the following changes:

1. Introduce a common target_node_lock mechanism
   that can be used by nemesis and scan operations.

2. The common run_nemesis wrapper can now provide a node that is not under
   a disruptive nemesis, in addition to providing a node with no nemesis at
   all. This allows non-disruptive operations to pick the same node (a usage
   sketch follows below).

3. Change all node.running_nemesis assignments to use the common
   set_running_nemesis/unset_running_nemesis helpers from the
   target_node_lock module (except in unit tests).

fixes: scylladb#9284
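
For illustration, under the new mechanism a non-disruptive operation such as a full scan would acquire its target node like this (hypothetical call site; the actual FullScan code is not among the hunks shown below, and run_full_scan is a placeholder name):

    # hypothetical usage in a scan operation
    from sdcm.target_node_lock import run_nemesis

    with run_nemesis(node_list=cluster.data_nodes, nemesis_label="FullScan") as node:
        # `node` is not under a disruptive nemesis for the duration of this
        # block and is released automatically on exit
        run_full_scan(node)  # placeholder for the actual scan work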
temichus committed Nov 26, 2024
1 parent 57e5dd0 commit fc4727f
Showing 5 changed files with 187 additions and 161 deletions.
6 changes: 3 additions & 3 deletions sdcm/cluster.py
@@ -66,6 +66,7 @@
 from sdcm.mgmt.common import get_manager_repo_from_defaults, get_manager_scylla_backend
 from sdcm.prometheus import start_metrics_server, PrometheusAlertManagerListener, AlertSilencer
 from sdcm.log import SDCMAdapter
+from sdcm.target_node_lock import run_nemesis
 from sdcm.provision.common.configuration_script import ConfigurationScriptBuilder
 from sdcm.provision.common.utils import disable_daily_apt_triggers
 from sdcm.provision.scylla_yaml import ScyllaYamlNodeAttrBuilder
@@ -292,6 +293,7 @@ def __init__(self, name, parent_cluster, ssh_login_info=None, base_logdir=None,
         self.lock = threading.Lock()
 
         self.running_nemesis = None
+        self.is_under_disruptive_nemesis = False
 
         # We should disable bootstrap when we create nodes to establish the cluster,
         # if we want to add more nodes when the cluster already exists, then we should
@@ -4579,9 +4581,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster):
                     message=f"Failed to rotate AWS KMS key for the '{kms_key_alias_name}' alias",
                     traceback=traceback.format_exc()).publish()
             try:
-                nemesis_class = self.nemesis[0] if self.nemesis else getattr(
-                    import_module('sdcm.nemesis'), "Nemesis")
-                with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
+                with run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node:
                     self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name)
 
                     ks_cf_list = db_cluster.get_non_system_ks_cf_list(
4 changes: 2 additions & 2 deletions sdcm/kcl_thread.py
@@ -21,7 +21,7 @@
 from functools import cached_property
 from typing import Dict
 
-from sdcm.nemesis import Nemesis
+from sdcm.target_node_lock import run_nemesis
 from sdcm.stress_thread import DockerBasedStressThread
 from sdcm.stress.base import format_stress_cmd_error
 from sdcm.utils.docker_remote import RemoteDocker
@@ -132,7 +132,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx):
         end_time = time.time() + self._timeout
 
         while not self._stop_event.is_set():
-            with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
+            with run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node:
                 node.run_nodetool('flush')
 
                 dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)']
36 changes: 7 additions & 29 deletions sdcm/nemesis.py
@@ -65,6 +65,7 @@
 )
 from sdcm.db_stats import PrometheusDBStats
 from sdcm.log import SDCMAdapter
+from sdcm.target_node_lock import run_nemesis, set_running_nemesis, unset_running_nemesis, NEMESIS_TARGET_SELECTION_LOCK
 from sdcm.logcollector import save_kallsyms_map
 from sdcm.mgmt.common import TaskStatus, ScyllaManagerError, get_persistent_snapshots
 from sdcm.nemesis_publisher import NemesisElasticSearchPublisher
@@ -170,8 +171,6 @@
     "disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node",
 )
 
-NEMESIS_TARGET_SELECTION_LOCK = Lock()
-
 
 class DefaultValue:  # pylint: disable=too-few-public-methods
     """
@@ -337,24 +336,6 @@ def wrapper(self, *args, **kwargs):
         setattr(cls, func.__name__, wrapper)  # bind it to Nemesis class
         return func  # returning func means func can still be used normally
 
-    @staticmethod
-    @contextmanager
-    def run_nemesis(node_list: list['BaseNode'], nemesis_label: str):
-        """
-        pick a node out of a `node_list`, and mark is as running_nemesis
-        for the duration of this context
-        """
-        with NEMESIS_TARGET_SELECTION_LOCK:
-            free_nodes = [node for node in node_list if not node.running_nemesis]
-            assert free_nodes, f"couldn't find nodes for running:`{nemesis_label}`, are all nodes running nemesis ?"
-            node = random.choice(free_nodes)
-            node.running_nemesis = nemesis_label
-        try:
-            yield node
-        finally:
-            with NEMESIS_TARGET_SELECTION_LOCK:
-                node.running_nemesis = None
-
     def use_nemesis_seed(self):
         if nemesis_seed := self.tester.params.get("nemesis_seed"):
             random.seed(nemesis_seed)
@@ -381,14 +362,11 @@ def publish_event(self, disrupt, status=True, data=None):
         DisruptionEvent(nemesis_name=disrupt, severity=severity, **data).publish()
 
     def set_current_running_nemesis(self, node):
-        with NEMESIS_TARGET_SELECTION_LOCK:
-            node.running_nemesis = self.current_disruption
+        set_running_nemesis(node, self.current_disruption, self.disruptive)
 
     @staticmethod
     def unset_current_running_nemesis(node):
-        if node is not None:
-            with NEMESIS_TARGET_SELECTION_LOCK:
-                node.running_nemesis = None
+        unset_running_nemesis(node)
 
     def set_target_node_pool(self, nodelist: list[BaseNode] | None = None):
         """Set pool of nodes to choose target node """
@@ -453,10 +431,10 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = No
             self.target_node = random.choice(nodes)
 
         if current_disruption:
-            self.target_node.running_nemesis = current_disruption
+            set_running_nemesis(self.target_node, current_disruption, self.disruptive)
             self.set_current_disruption(current_disruption)
         elif self.current_disruption:
-            self.target_node.running_nemesis = self.current_disruption
+            set_running_nemesis(self.target_node, self.current_disruption, self.disruptive)
         else:
             raise ValueError("current_disruption is not set")
         self.log.info('Current Target: %s with running nemesis: %s',
@@ -4046,7 +4024,7 @@ def decommission_post_action():
                               terminate_pattern.timeout):
             stack.enter_context(expected_start_failed_context)
         with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \
-                self.run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
+                run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \
                 FailedDecommissionOperationMonitoring(target_node=self.target_node,
                                                       verification_node=verification_node,
                                                       timeout=full_operations_timeout):
@@ -5240,7 +5218,7 @@ def disrupt_bootstrap_streaming_error(self):
         decommission_timeout = 7200
         monitoring_decommission_timeout = decommission_timeout + 100
         un_nodes = self.cluster.get_nodes_up_and_normal()
-        with Nemesis.run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
+        with run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \
                 FailedDecommissionOperationMonitoring(target_node=new_node, verification_node=verification_node,
                                                       timeout=monitoring_decommission_timeout):
 
(Diffs for the remaining two changed files, including the new sdcm/target_node_lock.py module, are not loaded in this view.)
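
Since the new module's diff is not expanded here, the following is a plausible sketch of it, reconstructed from the imports and call sites above and from the run_nemesis staticmethod removed from sdcm/nemesis.py. The two-tier selection (prefer idle nodes, otherwise fall back to nodes running only a non-disruptive nemesis) is an assumption drawn from point 2 of the commit message:

    # sdcm/target_node_lock.py -- sketch, not the actual file from this commit
    import random
    from contextlib import contextmanager
    from threading import Lock

    NEMESIS_TARGET_SELECTION_LOCK = Lock()


    def set_running_nemesis(node, nemesis_label, disruptive=False):
        """Mark `node` as the target of a running nemesis (thread-safe)."""
        with NEMESIS_TARGET_SELECTION_LOCK:
            node.running_nemesis = nemesis_label
            node.is_under_disruptive_nemesis = disruptive


    def unset_running_nemesis(node):
        """Release `node` so other nemeses and operations can pick it."""
        if node is not None:
            with NEMESIS_TARGET_SELECTION_LOCK:
                node.running_nemesis = None
                node.is_under_disruptive_nemesis = False


    @contextmanager
    def run_nemesis(node_list, nemesis_label):
        """Pick a target node and mark it as running `nemesis_label`
        for the duration of the context.

        Idle nodes are preferred; failing that, a node that is only under
        a non-disruptive nemesis may be shared (assumption).
        """
        with NEMESIS_TARGET_SELECTION_LOCK:
            free_nodes = [node for node in node_list if not node.running_nemesis]
            if not free_nodes:
                free_nodes = [node for node in node_list
                              if not node.is_under_disruptive_nemesis]
            assert free_nodes, (
                f"couldn't find nodes for running: `{nemesis_label}`, "
                "are all nodes under disruptive nemeses?")
            node = random.choice(free_nodes)
            node.running_nemesis = nemesis_label
        try:
            yield node
        finally:
            unset_running_nemesis(node)

The real module may name things differently; only run_nemesis, set_running_nemesis, unset_running_nemesis, and NEMESIS_TARGET_SELECTION_LOCK are confirmed by the import in sdcm/nemesis.py.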
