diff --git a/add_new_dc_test.py b/add_new_dc_test.py
index 9937474768e..4b29f6c2f07 100644
--- a/add_new_dc_test.py
+++ b/add_new_dc_test.py
@@ -2,14 +2,17 @@
 from typing import Tuple, List
 
 from cassandra import ConsistencyLevel
-from cassandra.query import SimpleStatement  # pylint: disable=no-name-in-module
+from cassandra.query import SimpleStatement
+import pytest  # pylint: disable=no-name-in-module
 
 from longevity_test import LongevityTest
-from sdcm.cluster import BaseNode
+from sdcm.cluster import BaseNode, NodeSetupFailed, NodeSetupTimeout
 from sdcm.stress_thread import CassandraStressThread
 from sdcm.utils.common import skip_optional_stage
-from sdcm.utils.decorators import optional_stage
+from sdcm.utils.decorators import optional_stage, skip_on_capacity_issues
 from sdcm.utils.replication_strategy_utils import NetworkTopologyReplicationStrategy
+from sdcm.exceptions import ReadBarrierErrorException
+from sdcm.utils.adaptive_timeouts import Operations, adaptive_timeout
 
 warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)
 
@@ -71,54 +74,77 @@ def test_add_new_dc_with_zero_nodes(self):
         if not self.db_cluster.nodes[0].raft.is_consistent_topology_changes_enabled:
             system_keyspaces.insert(0, "system_auth")
-        status = self.db_cluster.get_nodetool_status()
-        self.reconfigure_keyspaces_to_use_network_topology_strategy(
-            keyspaces=system_keyspaces,
-            replication_factors={dc: len(status[dc].keys()) for dc in status}
-        )
         self.prewrite_db_with_data()
-        # new_node = self.add_zero_node_in_new_dc()
+        status = self.db_cluster.get_nodetool_status()
         self.reconfigure_keyspaces_to_use_network_topology_strategy(
-            keyspaces=system_keyspaces,
+            keyspaces=system_keyspaces + ["keyspace1"],
             replication_factors={dc: len(status[dc].keys()) for dc in status}
         )
-        only_data_nodes = {}
-        for dc, node_status in status.items():
-            if dc not in only_data_nodes.keys():
-                only_data_nodes[dc] = []
-            for key, value in node_status.items():
-                if int(value["tokens"]) != 0:
-                    only_data_nodes[dc].append(key)
-
-        self.reconfigure_keyspaces_to_use_network_topology_strategy(
-            keyspaces=["keyspace1"],
-            replication_factors={dc: len(value) for dc, value in only_data_nodes.items()}
-        )
-
-        # self.log.info("Running rebuild on each node in new DC")
-        # new_node.run_nodetool(sub_cmd=f"rebuild -- {list(status.keys())[0]}", publish_event=True)
         self.log.info("Running repair on all nodes")
         for node in self.db_cluster.nodes:
             node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
 
-        # self.verify_data_can_be_read_from_new_dc(new_node)
+        status = self.db_cluster.get_nodetool_status()
+        nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
+        regions = list(nodes_to_region.keys())
+        target_dc_name = regions[0]
+        alive_dc_name = regions[1]
+        for node in nodes_to_region[target_dc_name]:
+            node.stop_scylla()
+
+        node = nodes_to_region[alive_dc_name][0]
+        with pytest.raises(ReadBarrierErrorException):
+            node.raft.call_read_barrier()
+
+        for node in nodes_to_region[target_dc_name]:
+            node.start_scylla()
+
+        self.db_cluster.wait_all_nodes_un()
+
+        new_node = self.add_zero_node_in_new_dc()
+
+        status = self.db_cluster.get_nodetool_status()
         node_host_ids = []
+
         nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
-        for _, nodes in nodes_to_region.items():
-            for node in nodes:
-                node_host_ids.append(node.host_id)
-                node.stop_scylla()
-                break
-        node = self.db_cluster.data_nodes[-1]
-        result = node.raft.call_read_barrier()
-        assert result is not None, f"Read barrier call failed"
-
-        node.run_nodetool(
+        regions = list(nodes_to_region.keys())
+        target_dc_name = regions[0]
+
+        node_for_termination = []
+        for node in nodes_to_region[target_dc_name]:
+            node_host_ids.append(node.host_id)
+            node_for_termination.append(node)
+            node.stop_scylla()
+
+        new_node.raft.call_read_barrier()
+        new_node.run_nodetool(
             sub_cmd=f"removenode {node_host_ids[0]} --ignore-dead-nodes {','.join(node_host_ids[1:])}")
 
+        self.replace_cluster_node(new_node,
+                                  node_host_ids[1],
+                                  nodes_to_region[target_dc_name][-1].dc_idx,
+                                  dead_node_hostids=node_host_ids[2])
+
+        self.replace_cluster_node(new_node,
+                                  node_host_ids[2],
+                                  nodes_to_region[target_dc_name][-1].dc_idx)
+
+        new_data_node = self.add_node_in_new_dc(nodes_to_region[target_dc_name][-1].dc_idx, 3)
+        for node in node_for_termination:
+            self.db_cluster.terminate_node(node)
+
+        self.db_cluster.wait_all_nodes_un()
+        status = self.db_cluster.get_nodetool_status()
+        self.log.info("Running rebuild on each node in restored DC")
+        new_data_node.run_nodetool(sub_cmd=f"rebuild -- {list(status.keys())[-1]}", publish_event=True)
+
+        self.log.info("Running repair on all nodes")
+        for node in self.db_cluster.nodes:
+            node.run_nodetool(sub_cmd="repair -pr", publish_event=True)
+
+        self.verify_data_can_be_read_from_new_dc(new_data_node)
         self.log.info("Test completed.")
 
     def reconfigure_keyspaces_to_use_network_topology_strategy(self, keyspaces: List[str], replication_factors: dict[str, int]) -> None:
@@ -147,7 +173,7 @@ def start_stress_during_adding_new_dc(self) -> Tuple[CassandraStressThread, Cass
         self.log.info("Stress during adding DC started")
         return read_thread, write_thread
 
-    def add_node_in_new_dc(self, dc_idx: int = 0) -> BaseNode:
+    def add_node_in_new_dc(self, dc_idx: int = 0, num_of_dc: int = 2) -> BaseNode:
         self.log.info("Adding new node")
         new_node = self.db_cluster.add_nodes(1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0]  # add node
         self.db_cluster.wait_for_init(node_list=[new_node], timeout=900,
@@ -156,7 +182,7 @@ def add_node_in_new_dc(self, dc_idx: int = 0, num_of_dc: int = 2) -> BaseNode:
         self.monitors.reconfigure_scylla_monitoring()
 
         status = self.db_cluster.get_nodetool_status()
-        assert len(status.keys()) == 2, f"new datacenter was not registered. Cluster status: {status}"
+        assert len(status.keys()) == num_of_dc, f"new datacenter was not registered. Cluster status: {status}"
         self.log.info("New DC to cluster has been added")
         return new_node
 
@@ -177,7 +203,7 @@ def add_zero_node_in_new_dc(self) -> BaseNode:
 
     @optional_stage('post_test_load')
     def verify_data_can_be_read_from_new_dc(self, new_node: BaseNode) -> None:
-        self.log.info("Veryfing if data has been transferred successfully to the new DC")
+        self.log.info("Verifying if data has been transferred successfully to the new DC")
         stress_cmd = self.params.get('verify_data_after_entire_test') + f" -node {new_node.ip_address}"
         end_stress = self.run_stress_thread(stress_cmd=stress_cmd, stats_aggregate_cmds=False, round_robin=False)
         self.verify_stress_thread(cs_thread_pool=end_stress)
@@ -190,3 +216,40 @@ def querying_new_node_should_return_no_data(self, new_node: BaseNode) -> None:
                                             fetch_size=10)
             data = session.execute(statement).one()
             assert not data, f"no data should be returned when querying with CL=LOCAL_QUORUM and RF=0. {data}"
+
+    def replace_cluster_node(self, verification_node: BaseNode,
+                             host_id: str | None = None,
+                             dc_idx: int = 0,
+                             dead_node_hostids: str = "",
+                             timeout: int | float = 3600 * 8) -> BaseNode:
+        """When old_node_ip or host_id are not None then replacement node procedure is initiated"""
+        self.log.info("Adding new node to cluster...")
+        new_node: BaseNode = skip_on_capacity_issues(self.db_cluster.add_nodes)(
+            count=1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0]
+        self.monitors.reconfigure_scylla_monitoring()
+        with new_node.remote_scylla_yaml() as scylla_yaml:
+            scylla_yaml.ignore_dead_nodes_for_replace = dead_node_hostids
+        # since we need this logic before starting a node, and in `use_preinstalled_scylla: false` case
+        # scylla is not yet installed or target node was terminated, we should use an alive node without nemesis for version,
+        # it should be up and with scylla executable available
+
+        new_node.replacement_host_id = host_id
+
+        try:
+            with adaptive_timeout(Operations.NEW_NODE, node=verification_node, timeout=timeout):
+                self.db_cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
+            self.db_cluster.clean_replacement_node_options(new_node)
+            self.db_cluster.set_seeds()
+            self.db_cluster.update_seed_provider()
+        except (NodeSetupFailed, NodeSetupTimeout):
+            self.log.warning("TestConfig of the '%s' failed, removing it from list of nodes" % new_node)
+            self.db_cluster.nodes.remove(new_node)
+            self.log.warning("Node will not be terminated. Please terminate manually!!!")
+            raise
+
+        self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
+        new_node.wait_node_fully_start()
+        with new_node.remote_scylla_yaml() as scylla_yaml:
+            scylla_yaml.ignore_dead_nodes_for_replace = ""
+
+        return new_node
diff --git a/sdcm/exceptions.py b/sdcm/exceptions.py
index 1c0475fb35b..0461b35a930 100644
--- a/sdcm/exceptions.py
+++ b/sdcm/exceptions.py
@@ -83,3 +83,8 @@ class SstablesNotFound(Exception):
 
 class CapacityReservationError(Exception):
     pass
+
+
+class ReadBarrierErrorException(Exception):
+    """Raise exception if read barrier call failed"""
+    pass
diff --git a/sdcm/provision/scylla_yaml/scylla_yaml.py b/sdcm/provision/scylla_yaml/scylla_yaml.py
index 4d3845395e2..ede050a9797 100644
--- a/sdcm/provision/scylla_yaml/scylla_yaml.py
+++ b/sdcm/provision/scylla_yaml/scylla_yaml.py
@@ -265,6 +265,7 @@ def set_authorizer(cls, authorizer: str):
     replace_address: str = None  # ""
     replace_address_first_boot: str = None  # ""
     replace_node_first_boot: str = None  # ""
+    ignore_dead_nodes_for_replace: str = None  # ""
     override_decommission: bool = None  # False
     enable_repair_based_node_ops: bool = None  # True
     # NOTE: example for disabling RBNO for 'bootstrap' and 'decommission' operations:
diff --git a/sdcm/utils/raft/__init__.py b/sdcm/utils/raft/__init__.py
index 5c4001820ae..9b1de82bb5f 100644
--- a/sdcm/utils/raft/__init__.py
+++ b/sdcm/utils/raft/__init__.py
@@ -1,6 +1,7 @@
 import contextlib
 import logging
 import random
+import json
 
 from enum import Enum
 from abc import ABC, abstractmethod
@@ -14,6 +15,7 @@
 from sdcm.utils.health_checker import HealthEventsGenerator
 from sdcm.wait import wait_for
 from sdcm.rest.raft_api import RaftApi
+from sdcm.exceptions import ReadBarrierErrorException
 
 LOGGER = logging.getLogger(__name__)
 
@@ -357,7 +359,7 @@ def get_group0_id(self, session) -> str:
             LOGGER.error(err_msg)
             return ""
 
-    def call_read_barrier(self, timeout: int = 60) -> Any | None:
+    def call_read_barrier(self, timeout: int = 60):
         """ Wait until the node applies all previously committed Raft entries
 
         Any schema/topology changes are committed with Raft group0 on node. Before
@@ -371,14 +373,13 @@ def call_read_barrier(self, timeout: int = 60):
         with self._node.parent_cluster.cql_connection_patient_exclusive(node=self._node) as session:
             raft_group0_id = self.get_group0_id(session)
             assert raft_group0_id, "Group0 id was not found"
-        try:
-            api = RaftApi(self._node)
-            result = api.read_barrier(group_id=raft_group0_id, timeout=timeout)
-            LOGGER.debug("Api response %s", result)
-            return result
-        except Exception as exc:  # pylint: disable=broad-except  # noqa: BLE001
-            LOGGER.error("Trigger read-barrier via rest api failed %s", exc)
-            return None
+        api = RaftApi(self._node)
+        result = api.read_barrier(group_id=raft_group0_id, timeout=timeout)
+        LOGGER.debug("Api response %s", result)
+        if not result:
+            return
+        status = json.loads(result)
+        raise ReadBarrierErrorException(f"Error code: {status['code']}, Error message: {status['message']}")
 
 
 class NoRaft(RaftFeatureOperations):
diff --git a/test-cases/features/add-new-dc-with-zero-nodes.yaml b/test-cases/features/add-new-dc-with-zero-nodes.yaml
index 24faa0727f1..327c2f6d0b4 100644
--- a/test-cases/features/add-new-dc-with-zero-nodes.yaml
+++ b/test-cases/features/add-new-dc-with-zero-nodes.yaml
@@ -3,7 +3,7 @@ prepare_write_cmd: "cassandra-stress write cl=QUORUM n=20900 -schema 'replicati
 stress_cmd: ["cassandra-stress read cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5",
              "cassandra-stress write cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
             ]
-verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_ONE n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
+verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_QUORUM n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
 
 n_db_nodes: '3 3 0'  # make n_db_nodes configured as multi-dc with last dc set to 0 (so later easily new node can be added)
 region_name: 'eu-west-1 eu-west-2 eu-north-1'
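
Illustrative sketch, not part of the diff: the snippet below shows how the reworked call_read_barrier() contract is consumed by the test under the 3+3+0 multi-DC layout from the test case. On success the call now returns None; a non-empty REST response is parsed as an error payload and surfaced as ReadBarrierErrorException, which the test asserts with pytest.raises. The helper name and the alive_node/target_dc_nodes parameters are hypothetical stand-ins for SCT BaseNode objects.

# Illustrative sketch (not part of the diff); names below are hypothetical.
import pytest

from sdcm.exceptions import ReadBarrierErrorException


def check_read_barrier_contract(alive_node, target_dc_nodes):
    # With a whole DC stopped, group0 presumably loses its majority, so the
    # barrier is expected to fail and surface as ReadBarrierErrorException.
    for node in target_dc_nodes:
        node.stop_scylla()
    with pytest.raises(ReadBarrierErrorException):
        alive_node.raft.call_read_barrier()

    # Once the DC is brought back, the same call succeeds and returns None.
    for node in target_dc_nodes:
        node.start_scylla()
    assert alive_node.raft.call_read_barrier() is None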