From 3eb3f140bfeeca796f6094a3320eed4c568467b0 Mon Sep 17 00:00:00 2001 From: Artsiom Mishuta Date: Tue, 26 Nov 2024 13:18:01 +0100 Subject: [PATCH] fix(FullScan): choose non-running_nemesis node this commit has the following changes 1 introduce common targed_node_lock mechanism that can be used in nemesis and Scan operations 2 FullScan operation now run only on free of nemeses node 3 change all node.running_nemesis settings to use common methods set/unset_running_nemesis from common targed_node_lock file (except unit tests) 4 change disrupt_rolling_restart_cluster nemesis to lock all nodes in the cluster befo performing restart fixes: #9284 --- sdcm/cluster.py | 6 +- sdcm/kcl_thread.py | 4 +- sdcm/nemesis.py | 46 ++-- sdcm/scan_operation_thread.py | 260 +++++++++++------------ sdcm/target_node_lock.py | 97 +++++++++ unit_tests/test_nemesis.py | 2 + unit_tests/test_scan_operation_thread.py | 9 +- 7 files changed, 245 insertions(+), 179 deletions(-) create mode 100644 sdcm/target_node_lock.py diff --git a/sdcm/cluster.py b/sdcm/cluster.py index 6a309c5618..178dbe6873 100644 --- a/sdcm/cluster.py +++ b/sdcm/cluster.py @@ -32,7 +32,6 @@ import itertools import json import ipaddress -from importlib import import_module from typing import List, Optional, Dict, Union, Set, Iterable, ContextManager, Any, IO, AnyStr, Callable from datetime import datetime, timezone from textwrap import dedent @@ -66,6 +65,7 @@ from sdcm.mgmt.common import get_manager_repo_from_defaults, get_manager_scylla_backend from sdcm.prometheus import start_metrics_server, PrometheusAlertManagerListener, AlertSilencer from sdcm.log import SDCMAdapter +from sdcm.target_node_lock import run_nemesis from sdcm.provision.common.configuration_script import ConfigurationScriptBuilder from sdcm.provision.common.utils import disable_daily_apt_triggers from sdcm.provision.scylla_yaml import ScyllaYamlNodeAttrBuilder @@ -4579,9 +4579,7 @@ def _rotate_kms_key(kms_key_alias_name, kms_key_rotation_interval, db_cluster): message=f"Failed to rotate AWS KMS key for the '{kms_key_alias_name}' alias", traceback=traceback.format_exc()).publish() try: - nemesis_class = self.nemesis[0] if self.nemesis else getattr( - import_module('sdcm.nemesis'), "Nemesis") - with nemesis_class.run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node: + with run_nemesis(node_list=db_cluster.data_nodes, nemesis_label="KMS encryption check") as target_node: self.log.debug("Target node for 'rotate_kms_key' is %s", target_node.name) ks_cf_list = db_cluster.get_non_system_ks_cf_list( diff --git a/sdcm/kcl_thread.py b/sdcm/kcl_thread.py index fb372c63f0..3f09a582ce 100644 --- a/sdcm/kcl_thread.py +++ b/sdcm/kcl_thread.py @@ -21,7 +21,7 @@ from functools import cached_property from typing import Dict -from sdcm.nemesis import Nemesis +from sdcm.target_node_lock import run_nemesis from sdcm.stress_thread import DockerBasedStressThread from sdcm.stress.base import format_stress_cmd_error from sdcm.utils.docker_remote import RemoteDocker @@ -132,7 +132,7 @@ def _run_stress(self, loader, loader_idx, cpu_idx): end_time = time.time() + self._timeout while not self._stop_event.is_set(): - with Nemesis.run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node: + with run_nemesis(node_list=self.node_list, nemesis_label="Compare tables size by cf-stats") as node: node.run_nodetool('flush') dst_size = node.get_cfstats(dst_table)['Number of partitions (estimate)'] diff --git a/sdcm/nemesis.py b/sdcm/nemesis.py index 9130fb57e5..5231decb8b 100644 --- a/sdcm/nemesis.py +++ b/sdcm/nemesis.py @@ -30,7 +30,7 @@ import json import itertools from distutils.version import LooseVersion -from contextlib import ExitStack, contextmanager +from contextlib import ExitStack from typing import Any, List, Optional, Type, Tuple, Callable, Dict, Set, Union, Iterable from functools import wraps, partial from collections import defaultdict, Counter, namedtuple @@ -65,6 +65,8 @@ ) from sdcm.db_stats import PrometheusDBStats from sdcm.log import SDCMAdapter +from sdcm.target_node_lock import run_nemesis, set_running_nemesis, unset_running_nemesis, \ + NEMESIS_TARGET_SELECTION_LOCK, lock_node, CantAcquireLockException from sdcm.logcollector import save_kallsyms_map from sdcm.mgmt.common import TaskStatus, ScyllaManagerError, get_persistent_snapshots from sdcm.nemesis_publisher import NemesisElasticSearchPublisher @@ -170,8 +172,6 @@ "disrupt_terminate_kubernetes_host_then_decommission_and_add_scylla_node", ) -NEMESIS_TARGET_SELECTION_LOCK = Lock() - class DefaultValue: # pylint: disable=too-few-public-methods """ @@ -337,24 +337,6 @@ def wrapper(self, *args, **kwargs): setattr(cls, func.__name__, wrapper) # bind it to Nemesis class return func # returning func means func can still be used normally - @staticmethod - @contextmanager - def run_nemesis(node_list: list['BaseNode'], nemesis_label: str): - """ - pick a node out of a `node_list`, and mark is as running_nemesis - for the duration of this context - """ - with NEMESIS_TARGET_SELECTION_LOCK: - free_nodes = [node for node in node_list if not node.running_nemesis] - assert free_nodes, f"couldn't find nodes for running:`{nemesis_label}`, are all nodes running nemesis ?" - node = random.choice(free_nodes) - node.running_nemesis = nemesis_label - try: - yield node - finally: - with NEMESIS_TARGET_SELECTION_LOCK: - node.running_nemesis = None - def use_nemesis_seed(self): if nemesis_seed := self.tester.params.get("nemesis_seed"): random.seed(nemesis_seed) @@ -381,14 +363,11 @@ def publish_event(self, disrupt, status=True, data=None): DisruptionEvent(nemesis_name=disrupt, severity=severity, **data).publish() def set_current_running_nemesis(self, node): - with NEMESIS_TARGET_SELECTION_LOCK: - node.running_nemesis = self.current_disruption + set_running_nemesis(node, self.current_disruption) @staticmethod def unset_current_running_nemesis(node): - if node is not None: - with NEMESIS_TARGET_SELECTION_LOCK: - node.running_nemesis = None + unset_running_nemesis(node) def set_target_node_pool(self, nodelist: list[BaseNode] | None = None): """Set pool of nodes to choose target node """ @@ -453,10 +432,10 @@ def set_target_node(self, dc_idx: Optional[int] = None, rack: Optional[int] = No self.target_node = random.choice(nodes) if current_disruption: - self.target_node.running_nemesis = current_disruption + set_running_nemesis(self.target_node, current_disruption) self.set_current_disruption(current_disruption) elif self.current_disruption: - self.target_node.running_nemesis = self.current_disruption + set_running_nemesis(self.target_node, self.current_disruption) else: raise ValueError("current_disruption is not set") self.log.info('Current Target: %s with running nemesis: %s', @@ -984,7 +963,12 @@ def disrupt_soft_reboot_node(self): @decorate_with_context(ignore_ycsb_connection_refused) def disrupt_rolling_restart_cluster(self, random_order=False): - self.cluster.restart_scylla(random_order=random_order) + try: + for node in self.cluster.nodes: + with lock_node(node, self.target_node.running_nemesis, timeout=3000): + self.cluster.restart_scylla(nodes=[node], random_order=random_order) + except CantAcquireLockException as e: + UnsupportedNemesis(e) def disrupt_switch_between_password_authenticator_and_saslauthd_authenticator_and_back(self): """ @@ -4046,7 +4030,7 @@ def decommission_post_action(): terminate_pattern.timeout): stack.enter_context(expected_start_failed_context) with ignore_stream_mutation_fragments_errors(), ignore_raft_topology_cmd_failing(), \ - self.run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \ + run_nemesis(node_list=self.cluster.data_nodes, nemesis_label="DecommissionStreamingErr") as verification_node, \ FailedDecommissionOperationMonitoring(target_node=self.target_node, verification_node=verification_node, timeout=full_operations_timeout): @@ -5240,7 +5224,7 @@ def disrupt_bootstrap_streaming_error(self): decommission_timeout = 7200 monitoring_decommission_timeout = decommission_timeout + 100 un_nodes = self.cluster.get_nodes_up_and_normal() - with Nemesis.run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \ + with run_nemesis(node_list=un_nodes, nemesis_label="BootstrapStreaminError") as verification_node, \ FailedDecommissionOperationMonitoring(target_node=new_node, verification_node=verification_node, timeout=monitoring_decommission_timeout): diff --git a/sdcm/scan_operation_thread.py b/sdcm/scan_operation_thread.py index fa046a1500..22f2ef61d5 100644 --- a/sdcm/scan_operation_thread.py +++ b/sdcm/scan_operation_thread.py @@ -10,7 +10,7 @@ from pathlib import Path from abc import abstractmethod from string import Template -from typing import Optional, Type, NamedTuple, TYPE_CHECKING +from typing import Optional, Type, NamedTuple from pytz import utc from cassandra import ConsistencyLevel, OperationTimedOut, ReadTimeout @@ -18,6 +18,7 @@ from cassandra.query import SimpleStatement # pylint: disable=no-name-in-module from sdcm.remote import LocalCmdRunner +from sdcm.target_node_lock import run_nemesis from sdcm.sct_events import Severity from sdcm.sct_events.database import FullScanEvent, FullPartitionScanReversedOrderEvent, FullPartitionScanEvent, \ FullScanAggregateEvent @@ -27,8 +28,6 @@ from sdcm.test_config import TestConfig from sdcm.utils.decorators import retrying, Retry -if TYPE_CHECKING: - from sdcm.cluster import BaseNode ERROR_SUBSTRINGS = ("timed out", "unpack requires", "timeout", 'Host has been marked down or removed') BYPASS_CACHE_VALUES = [" BYPASS CACHE", ""] @@ -104,13 +103,9 @@ def __init__(self, generator: random.Random, thread_params: ThreadParams, thread self.log.info("FullscanOperationBase scan_event: %s", self.scan_event) self.termination_event = self.fullscan_params.termination_event self.generator = generator - self.db_node = self._get_random_node() self.current_operation_stat = None self.log.info("FullscanOperationBase init finished") - def _get_random_node(self) -> BaseNode: - return self.generator.choice(self.fullscan_params.db_cluster.data_nodes) - @abstractmethod def randomly_form_cql_statement(self) -> str: ... @@ -132,51 +127,49 @@ def run_scan_event(self, cmd: str, | FullPartitionScanReversedOrderEvent] = None) -> OneOperationStat: scan_event = scan_event or self.scan_event cmd = cmd or self.randomly_form_cql_statement() - with scan_event(node=self.db_node.name, ks_cf=self.fullscan_params.ks_cf, message=f"Will run command {cmd}", - user=self.fullscan_params.user, - password=self.fullscan_params.user_password) as scan_op_event: - - self.current_operation_stat = OneOperationStat( - op_type=scan_op_event.__class__.__name__, - nemesis_at_start=self.db_node.running_nemesis, - cmd=cmd - ) - with self.fullscan_params.db_cluster.cql_connection_patient( - node=self.db_node, - connect_timeout=300, - user=self.fullscan_params.user, - password=self.fullscan_params.user_password) as session: - try: - scan_op_event.message = '' - start_time = time.time() - result = self.execute_query(session=session, cmd=cmd, event=scan_op_event) - if result: - self.fetch_result_pages(result=result, read_pages=self.fullscan_stats.read_pages) - if not scan_op_event.message: - scan_op_event.message = f"{type(self).__name__} operation ended successfully" - except Exception as exc: # pylint: disable=broad-except # noqa: BLE001 - self.log.error(traceback.format_exc()) - msg = repr(exc) - self.current_operation_stat.exceptions.append(repr(exc)) - msg = f"{msg} while running " \ - f"Nemesis: {self.db_node.running_nemesis}" if self.db_node.running_nemesis else msg - scan_op_event.message = msg - - if self.db_node.running_nemesis or any(s in msg.lower() for s in ERROR_SUBSTRINGS): - scan_op_event.severity = Severity.WARNING - else: - scan_op_event.severity = Severity.ERROR - finally: - duration = time.time() - start_time - self.fullscan_stats.time_elapsed += duration - self.fullscan_stats.scans_counter += 1 - self.current_operation_stat.nemesis_at_end = self.db_node.running_nemesis - self.current_operation_stat.duration = duration - # success is True if there were no exceptions - self.current_operation_stat.success = not bool(self.current_operation_stat.exceptions) - self.update_stats(self.current_operation_stat) - return self.current_operation_stat # pylint: disable=lost-exception + with run_nemesis(self.fullscan_params.db_cluster.data_nodes, f'run_scan: {cmd}') as db_node: + with scan_event(node=db_node.name, ks_cf=self.fullscan_params.ks_cf, message=f"Will run command {cmd}", + user=self.fullscan_params.user, + password=self.fullscan_params.user_password) as scan_op_event: + self.current_operation_stat = OneOperationStat( + op_type=scan_op_event.__class__.__name__, + nemesis_at_start=db_node.running_nemesis, + cmd=cmd + ) + + with self.fullscan_params.db_cluster.cql_connection_patient( + node=db_node, + connect_timeout=3000, + user=self.fullscan_params.user, + password=self.fullscan_params.user_password) as session: + try: + scan_op_event.message = '' + start_time = time.time() + result = self.execute_query(session=session, cmd=cmd, event=scan_op_event) + if result: + self.fetch_result_pages(result=result, read_pages=self.fullscan_stats.read_pages) + if not scan_op_event.message: + scan_op_event.message = f"{type(self).__name__} operation ended successfully" + except Exception as exc: # pylint: disable=broad-except # noqa: BLE001 + self.log.error(traceback.format_exc()) + msg = repr(exc) + self.current_operation_stat.exceptions.append(repr(exc)) + scan_op_event.message = msg + if any(s in msg.lower() for s in ERROR_SUBSTRINGS): + scan_op_event.severity = Severity.WARNING + else: + scan_op_event.severity = Severity.ERROR + finally: + duration = time.time() - start_time + self.fullscan_stats.time_elapsed += duration + self.fullscan_stats.scans_counter += 1 + self.current_operation_stat.nemesis_at_end = db_node.running_nemesis + self.current_operation_stat.duration = duration + # success is True if there were no exceptions + self.current_operation_stat.success = not bool(self.current_operation_stat.exceptions) + self.update_stats(self.current_operation_stat) + return self.current_operation_stat # pylint: disable=lost-exception def update_stats(self, new_stat): self.fullscan_stats.stats.append(new_stat) @@ -238,19 +231,19 @@ def __init__(self, generator, **kwargs): encoding='utf-8') def get_table_clustering_order(self) -> str: - node = self._get_random_node() - try: - with self.fullscan_params.db_cluster.cql_connection_patient(node=node, connect_timeout=300) as session: - # Using CL ONE. No need for a quorum since querying a constant fixed attribute of a table. - session.default_consistency_level = ConsistencyLevel.ONE - return get_table_clustering_order(ks_cf=self.fullscan_params.ks_cf, - ck_name=self.fullscan_params.ck_name, session=session) - except Exception as error: # pylint: disable=broad-except # noqa: BLE001 - self.log.error(traceback.format_exc()) - self.log.error('Failed getting table %s clustering order through node %s : %s', - self.fullscan_params.ks_cf, node.name, - error) - raise Exception('Failed getting table clustering order from all db nodes') + with run_nemesis(self.fullscan_params.db_cluster.data_nodes, 'get_table_clustering_order') as node: + try: + with self.fullscan_params.db_cluster.cql_connection_patient(node=node, connect_timeout=3000) as session: + # Using CL ONE. No need for a quorum since querying a constant fixed attribute of a table. + session.default_consistency_level = ConsistencyLevel.ONE + return get_table_clustering_order(ks_cf=self.fullscan_params.ks_cf, + ck_name=self.fullscan_params.ck_name, session=session) + except Exception as error: # pylint: disable=broad-except # noqa: BLE001 + self.log.error(traceback.format_exc()) + self.log.error('Failed getting table %s clustering order through node %s : %s', + self.fullscan_params.ks_cf, node.name, + error) + raise Exception('Failed getting table clustering order from all db nodes') def randomly_form_cql_statement(self) -> Optional[tuple[str, str]]: # pylint: disable=too-many-branches """ @@ -263,75 +256,74 @@ def randomly_form_cql_statement(self) -> Optional[tuple[str, str]]: # pylint: d 3) Add a random CK filter with random row values. :return: a CQL reversed-query """ - db_node = self._get_random_node() - - with self.fullscan_params.db_cluster.cql_connection_patient( - node=db_node, connect_timeout=300) as session: - ck_random_min_value = self.generator.randint(a=1, b=self.fullscan_params.rows_count) - ck_random_max_value = self.generator.randint(a=ck_random_min_value, b=self.fullscan_params.rows_count) - self.ck_filter = ck_filter = self.generator.choice(list(self.reversed_query_filter_ck_by.keys())) - - if pks := get_partition_keys(ks_cf=self.fullscan_params.ks_cf, session=session, pk_name=self.fullscan_params.pk_name): - partition_key = self.generator.choice(pks) - # Form a random query out of all options, like: - # select * from scylla_bench.test where pk = 1234 and ck < 4721 and ck > 2549 order by ck desc - # limit 3467 bypass cache - selected_columns = [self.fullscan_params.pk_name, self.fullscan_params.ck_name] - if self.fullscan_params.include_data_column: - selected_columns.append(self.fullscan_params.data_column_name) - reversed_query = f'select {",".join(selected_columns)} from {self.fullscan_params.ks_cf}' + \ - f' where {self.fullscan_params.pk_name} = {partition_key}' - query_suffix = self.limit = '' - # Randomly add CK filtering ( less-than / greater-than / both / non-filter ) - - # example: rows-count = 20, ck > 10, ck < 15, limit = 3 ==> ck_range = [11..14] = 4 - # ==> limit < ck_range - # reversed query is: select * from scylla_bench.test where pk = 1 and ck > 10 - # order by ck desc limit 5 - # normal query should be: select * from scylla_bench.test where pk = 1 and ck > 15 limit 5 - match ck_filter: - case 'lt_and_gt': - # Example: select * from scylla_bench.test where pk = 1 and ck > 10 and ck < 15 order by ck desc - reversed_query += self.reversed_query_filter_ck_by[ck_filter].format( - self.fullscan_params.ck_name, - ck_random_max_value, - self.fullscan_params.ck_name, - ck_random_min_value - ) - - case 'gt': - # example: rows-count = 20, ck > 10, limit = 5 ==> ck_range = 20 - 10 = 10 ==> limit < ck_range - # reversed query is: select * from scylla_bench.test where pk = 1 and ck > 10 - # order by ck desc limit 5 - # normal query should be: select * from scylla_bench.test where pk = 1 and ck > 15 limit 5 - reversed_query += self.reversed_query_filter_ck_by[ck_filter].format( - self.fullscan_params.ck_name, - ck_random_min_value - ) - - case 'lt': - # example: rows-count = 20, ck < 10, limit = 5 ==> limit < ck_random_min_value (ck_range) - # reversed query is: select * from scylla_bench.test where pk = 1 and ck < 10 - # order by ck desc limit 5 - # normal query should be: select * from scylla_bench.test where pk = 1 and ck >= 5 limit 5 - reversed_query += self.reversed_query_filter_ck_by[ck_filter].format( - self.fullscan_params.ck_name, - ck_random_min_value - ) - - query_suffix = f"{query_suffix} {self.generator.choice(BYPASS_CACHE_VALUES)}" - normal_query = reversed_query + query_suffix - if random.choice([False] + [True]): # Randomly add a LIMIT - self.limit = random.randint(a=1, b=self.fullscan_params.rows_count) - query_suffix = f' limit {self.limit}' + query_suffix - reversed_query += f' order by {self.fullscan_params.ck_name} {self.reversed_order}' + query_suffix - self.log.debug('Randomly formed normal query is: %s', normal_query) - self.log.debug('[scan: %s, type: %s] Randomly formed reversed query is: %s', self.fullscan_stats.scans_counter, - ck_filter, reversed_query) - else: - self.log.debug('No partition keys found for table: %s! A reversed query cannot be executed!', - self.fullscan_params.ks_cf) - return None + with run_nemesis(self.fullscan_params.db_cluster.data_nodes, 'randomly_form_cql_statement') as db_node: + with self.fullscan_params.db_cluster.cql_connection_patient( + node=db_node, connect_timeout=3000) as session: + ck_random_min_value = self.generator.randint(a=1, b=self.fullscan_params.rows_count) + ck_random_max_value = self.generator.randint(a=ck_random_min_value, b=self.fullscan_params.rows_count) + self.ck_filter = ck_filter = self.generator.choice(list(self.reversed_query_filter_ck_by.keys())) + + if pks := get_partition_keys(ks_cf=self.fullscan_params.ks_cf, session=session, pk_name=self.fullscan_params.pk_name): + partition_key = self.generator.choice(pks) + # Form a random query out of all options, like: + # select * from scylla_bench.test where pk = 1234 and ck < 4721 and ck > 2549 order by ck desc + # limit 3467 bypass cache + selected_columns = [self.fullscan_params.pk_name, self.fullscan_params.ck_name] + if self.fullscan_params.include_data_column: + selected_columns.append(self.fullscan_params.data_column_name) + reversed_query = f'select {",".join(selected_columns)} from {self.fullscan_params.ks_cf}' + \ + f' where {self.fullscan_params.pk_name} = {partition_key}' + query_suffix = self.limit = '' + # Randomly add CK filtering ( less-than / greater-than / both / non-filter ) + + # example: rows-count = 20, ck > 10, ck < 15, limit = 3 ==> ck_range = [11..14] = 4 + # ==> limit < ck_range + # reversed query is: select * from scylla_bench.test where pk = 1 and ck > 10 + # order by ck desc limit 5 + # normal query should be: select * from scylla_bench.test where pk = 1 and ck > 15 limit 5 + match ck_filter: + case 'lt_and_gt': + # Example: select * from scylla_bench.test where pk = 1 and ck > 10 and ck < 15 order by ck desc + reversed_query += self.reversed_query_filter_ck_by[ck_filter].format( + self.fullscan_params.ck_name, + ck_random_max_value, + self.fullscan_params.ck_name, + ck_random_min_value + ) + + case 'gt': + # example: rows-count = 20, ck > 10, limit = 5 ==> ck_range = 20 - 10 = 10 ==> limit < ck_range + # reversed query is: select * from scylla_bench.test where pk = 1 and ck > 10 + # order by ck desc limit 5 + # normal query should be: select * from scylla_bench.test where pk = 1 and ck > 15 limit 5 + reversed_query += self.reversed_query_filter_ck_by[ck_filter].format( + self.fullscan_params.ck_name, + ck_random_min_value + ) + + case 'lt': + # example: rows-count = 20, ck < 10, limit = 5 ==> limit < ck_random_min_value (ck_range) + # reversed query is: select * from scylla_bench.test where pk = 1 and ck < 10 + # order by ck desc limit 5 + # normal query should be: select * from scylla_bench.test where pk = 1 and ck >= 5 limit 5 + reversed_query += self.reversed_query_filter_ck_by[ck_filter].format( + self.fullscan_params.ck_name, + ck_random_min_value + ) + + query_suffix = f"{query_suffix} {self.generator.choice(BYPASS_CACHE_VALUES)}" + normal_query = reversed_query + query_suffix + if random.choice([False] + [True]): # Randomly add a LIMIT + self.limit = random.randint(a=1, b=self.fullscan_params.rows_count) + query_suffix = f' limit {self.limit}' + query_suffix + reversed_query += f' order by {self.fullscan_params.ck_name} {self.reversed_order}' + query_suffix + self.log.debug('Randomly formed normal query is: %s', normal_query) + self.log.debug('[scan: %s, type: %s] Randomly formed reversed query is: %s', self.fullscan_stats.scans_counter, + ck_filter, reversed_query) + else: + self.log.debug('No partition keys found for table: %s! A reversed query cannot be executed!', + self.fullscan_params.ks_cf) + return None return normal_query, reversed_query def fetch_result_pages(self, result: ResponseFuture, read_pages): @@ -418,7 +410,6 @@ def run_scan_operation(self, cmd: str = None): # pylint: disable=too-many-local full_partition_op_stat = OneOperationStat( op_type=self.__class__.__name__, - nemesis_at_start=self.db_node.running_nemesis, cmd=str(queries) ) @@ -434,7 +425,6 @@ def run_scan_operation(self, cmd: str = None): # pylint: disable=too-many-local self.scan_event = FullPartitionScanEvent regular_op_stat = self.run_scan_event(cmd=normal_query, scan_event=self.scan_event) comparison_result = self._compare_output_files() - full_partition_op_stat.nemesis_at_end = self.db_node.running_nemesis full_partition_op_stat.exceptions.append(regular_op_stat.exceptions) full_partition_op_stat.exceptions.append(reversed_op_stat.exceptions) if comparison_result and not full_partition_op_stat.exceptions: diff --git a/sdcm/target_node_lock.py b/sdcm/target_node_lock.py new file mode 100644 index 0000000000..501362e827 --- /dev/null +++ b/sdcm/target_node_lock.py @@ -0,0 +1,97 @@ +from __future__ import annotations +from threading import RLock +from contextlib import contextmanager, ExitStack +import random +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from sdcm.cluster import BaseNode + +# this lock must use when you are checking id nod free or not(when calling node.lock.locked()) +# so you can mace sure that of node free, only ou you about in and at the acquire() call node still will be free +# for acquire() node that already locked NEMESIS_TARGET_SELECTION_LOCK should not be used +NEMESIS_TARGET_SELECTION_LOCK = RLock() + +# this lock need to support current nemesis logic in "set_target_node "when nemesis call release any lock rather than releasing only own locks +NEMESIS_TARGET_RELEASE_LOCK = RLock() + + +# the following 2 Exception just to avoid using wide AssertionError +class CantAcquireLockException(Exception): + pass + + +class CantReleaseLockException(Exception): + pass + + +class AlreadyAcquireLockException(Exception): + pass + + +def unset_running_nemesis(node, nemesis=None, raise_on_nemesis_mismatching=True): + # unlock node + with NEMESIS_TARGET_RELEASE_LOCK: + if node is not None and node.lock.locked(): + if nemesis and nemesis != node.running_nemesis and raise_on_nemesis_mismatching: + raise CantReleaseLockException( + f"node locked by another nemesis, expected {nemesis}, got {node.running_nemesis}") + node.lock.release() + node.running_nemesis = None + + +def set_running_nemesis(node, nemesis, timeout=30, raise_on_nemesis_already_acquire_lock=True): + # use NEMESIS_TARGET_SELECTION_LOCK in case of locking free node + # if nade are lock, release NEMESIS_TARGET_SELECTION_LOCK and wait for lock by acquire + with NEMESIS_TARGET_SELECTION_LOCK: + if not node.lock.locked(): + #node is free + assert node.lock.acquire(timeout=5) + node.running_nemesis = nemesis + elif node.running_nemesis != nemesis: + # node locked by another nemesis + if not node.lock.acquire(timeout=timeout): + raise CantAcquireLockException(f"cant lock node within given timeout: {timeout}") + node.running_nemesis = nemesis + elif raise_on_nemesis_already_acquire_lock: + # node already locked by this nemesis + raise AlreadyAcquireLockException(f"lock already acquired by nemesis name '{nemesis}'") + + +@contextmanager +def run_nemesis(node_list: list[BaseNode], nemesis_label: str): + """ + pick a free node out of a `node_list`, and lock it + for the duration of this context + """ + with NEMESIS_TARGET_SELECTION_LOCK: + free_nodes = [node for node in node_list if not node.lock.locked()] + assert free_nodes, f"couldn't find nodes for running:`{nemesis_label}`, are all nodes running nemesis ?" + node = random.choice(free_nodes) + set_running_nemesis(node, nemesis_label) + try: + yield node + finally: + unset_running_nemesis(node, nemesis_label) + + +@contextmanager +def lock_node(node: BaseNode, nemesis_label: str, timeout=30): + try: + set_running_nemesis(node, nemesis_label, timeout) + try: + yield + finally: + unset_running_nemesis(node, nemesis_label) + except AlreadyAcquireLockException: + #lock already Acquire somewhere else, do not acquire/release + yield + except CantAcquireLockException: + unset_running_nemesis(node, nemesis_label, False) + raise + + +@contextmanager +def lock_nodes(nodelist, nemesis_label: str, timeout=30): + with ExitStack() as stack: + [stack.enter_context(lock_node(x, nemesis_label, timeout)) for x in nodelist] + yield \ No newline at end of file diff --git a/unit_tests/test_nemesis.py b/unit_tests/test_nemesis.py index 63ffaebfa9..9043f9a977 100644 --- a/unit_tests/test_nemesis.py +++ b/unit_tests/test_nemesis.py @@ -1,5 +1,6 @@ import inspect import logging +import threading from dataclasses import dataclass, field import pytest @@ -22,6 +23,7 @@ @dataclass class Node: running_nemesis = None + lock = threading.Lock() public_ip_address: str = '127.0.0.1' name: str = 'Node1' diff --git a/unit_tests/test_scan_operation_thread.py b/unit_tests/test_scan_operation_thread.py index ad29b7ae90..a98f6825b6 100644 --- a/unit_tests/test_scan_operation_thread.py +++ b/unit_tests/test_scan_operation_thread.py @@ -251,18 +251,13 @@ def execute_async(*args, **kwargs): raise Exception("Exception") -@pytest.mark.parametrize(("running_nemesis", 'severity'), [[True, 'WARNING'], [False, 'ERROR']]) @pytest.mark.parametrize(('mode', 'execute_mock'), [ ['partition', 'execute_async'], ['aggregate', 'execute'], ['table', 'execute']]) -def test_scan_negative_exception(mode, severity, running_nemesis, execute_mock, events, node): +def test_scan_negative_exception(mode, execute_mock, events, node): # pylint: disable=redefined-outer-name # pylint: disable=too-many-arguments - if running_nemesis: - node.running_nemesis = MagicMock() - else: - node.running_nemesis = None if execute_mock == 'execute_async': connection = ExecuteAsyncExceptionMockCqlConnectionPatient() else: @@ -279,4 +274,4 @@ def test_scan_negative_exception(mode, severity, running_nemesis, execute_mock, ScanOperationThread(default_params)._run_next_operation() # pylint: disable=protected-access all_events = get_event_log_file(events) assert "Severity.NORMAL" in all_events[0] and "period_type=begin" in all_events[0] - assert f"Severity.{severity}" in all_events[1] and "period_type=end" in all_events[1] + assert f"Severity.ERROR" in all_events[1] and "period_type=end" in all_events[1]