Commit

fix(test): use replace
aleksbykov committed Nov 22, 2024
1 parent 830a1d5 commit bd79c21
Showing 5 changed files with 119 additions and 49 deletions.
141 changes: 102 additions & 39 deletions add_new_dc_test.py
@@ -2,14 +2,17 @@
from typing import Tuple, List

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement # pylint: disable=no-name-in-module
from cassandra.query import SimpleStatement
import pytest # pylint: disable=no-name-in-module

from longevity_test import LongevityTest
from sdcm.cluster import BaseNode
from sdcm.cluster import BaseNode, NodeSetupFailed, NodeSetupTimeout
from sdcm.stress_thread import CassandraStressThread
from sdcm.utils.common import skip_optional_stage
from sdcm.utils.decorators import optional_stage
from sdcm.utils.decorators import optional_stage, skip_on_capacity_issues
from sdcm.utils.replication_strategy_utils import NetworkTopologyReplicationStrategy
from sdcm.exceptions import ReadBarrierErrorException
from sdcm.utils.adaptive_timeouts import Operations, adaptive_timeout

warnings.filterwarnings(action="ignore", message="unclosed", category=ResourceWarning)

@@ -71,54 +74,77 @@ def test_add_new_dc_with_zero_nodes(self):
if not self.db_cluster.nodes[0].raft.is_consistent_topology_changes_enabled:
system_keyspaces.insert(0, "system_auth")

status = self.db_cluster.get_nodetool_status()
self.reconfigure_keyspaces_to_use_network_topology_strategy(
keyspaces=system_keyspaces,
replication_factors={dc: len(status[dc].keys()) for dc in status}
)
self.prewrite_db_with_data()
# new_node = self.add_zero_node_in_new_dc()

status = self.db_cluster.get_nodetool_status()
self.reconfigure_keyspaces_to_use_network_topology_strategy(
keyspaces=system_keyspaces,
keyspaces=system_keyspaces + ["keyspace1"],
replication_factors={dc: len(status[dc].keys()) for dc in status}
)
only_data_nodes = {}
for dc, node_status in status.items():
if dc not in only_data_nodes.keys():
only_data_nodes[dc] = []
for key, value in node_status.items():
if int(value["tokens"]) != 0:
only_data_nodes[dc].append(key)

self.reconfigure_keyspaces_to_use_network_topology_strategy(
keyspaces=["keyspace1"],
replication_factors={dc: len(value) for dc, value in only_data_nodes.items()}
)

# self.log.info("Running rebuild on each node in new DC")
# new_node.run_nodetool(sub_cmd=f"rebuild -- {list(status.keys())[0]}", publish_event=True)

self.log.info("Running repair on all nodes")
for node in self.db_cluster.nodes:
node.run_nodetool(sub_cmd="repair -pr", publish_event=True)

# self.verify_data_can_be_read_from_new_dc(new_node)
status = self.db_cluster.get_nodetool_status()
nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
regions = list(nodes_to_region.keys())
target_dc_name = regions[0]
alive_dc_name = regions[1]
for node in nodes_to_region[target_dc_name]:
node.stop_scylla()

node = nodes_to_region[alive_dc_name][0]
with pytest.raises(ReadBarrierErrorException):
node.raft.call_read_barrier()

for node in nodes_to_region[target_dc_name]:
node.start_scylla()

self.db_cluster.wait_all_nodes_un()

new_node = self.add_zero_node_in_new_dc()

status = self.db_cluster.get_nodetool_status()
node_host_ids = []

nodes_to_region = self.db_cluster.nodes_by_region(nodes=self.db_cluster.data_nodes)
for _, nodes in nodes_to_region.items():
for node in nodes:
node_host_ids.append(node.host_id)
node.stop_scylla()
break
node = self.db_cluster.data_nodes[-1]
result = node.raft.call_read_barrier()
assert result is not None, f"Read barrier call failed"

node.run_nodetool(
regions = list(nodes_to_region.keys())
target_dc_name = regions[0]

node_for_termination = []
for node in nodes_to_region[target_dc_name]:
node_host_ids.append(node.host_id)
node_for_termination.append(node)
node.stop_scylla()

new_node.raft.call_read_barrier()
new_node.run_nodetool(
sub_cmd=f"removenode {node_host_ids[0]} --ignore-dead-nodes {','.join(node_host_ids[1:])}")

self.replace_cluster_node(new_node,
node_host_ids[1],
nodes_to_region[target_dc_name][-1].dc_idx,
dead_node_hostids=node_host_ids[2])

self.replace_cluster_node(new_node,
node_host_ids[2],
nodes_to_region[target_dc_name][-1].dc_idx)

new_data_node = self.add_node_in_new_dc(nodes_to_region[target_dc_name][-1].dc_idx, 3)
for node in node_for_termination:
self.db_cluster.terminate_node(node)

self.db_cluster.wait_all_nodes_un()
status = self.db_cluster.get_nodetool_status()
self.log.info("Running rebuild on each node in restored DC")
new_data_node.run_nodetool(sub_cmd=f"rebuild -- {list(status.keys())[-1]}", publish_event=True)

self.log.info("Running repair on all nodes")
for node in self.db_cluster.nodes:
node.run_nodetool(sub_cmd="repair -pr", publish_event=True)

self.verify_data_can_be_read_from_new_dc(new_data_node)
self.log.info("Test completed.")

def reconfigure_keyspaces_to_use_network_topology_strategy(self, keyspaces: List[str], replication_factors: dict[str, int]) -> None:
@@ -147,7 +173,7 @@ def start_stress_during_adding_new_dc(self) -> Tuple[CassandraStressThread, Cass
self.log.info("Stress during adding DC started")
return read_thread, write_thread

def add_node_in_new_dc(self, dc_idx: int = 0) -> BaseNode:
def add_node_in_new_dc(self, dc_idx: int = 0, num_of_dc: int = 2) -> BaseNode:
self.log.info("Adding new node")
new_node = self.db_cluster.add_nodes(1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0] # add node
self.db_cluster.wait_for_init(node_list=[new_node], timeout=900,
@@ -156,7 +182,7 @@ def add_node_in_new_dc(self, dc_idx: int = 0) -> BaseNode:
self.monitors.reconfigure_scylla_monitoring()

status = self.db_cluster.get_nodetool_status()
assert len(status.keys()) == 2, f"new datacenter was not registered. Cluster status: {status}"
assert len(status.keys()) == num_of_dc, f"new datacenter was not registered. Cluster status: {status}"
self.log.info("New DC to cluster has been added")
return new_node

@@ -177,7 +203,7 @@ def add_zero_node_in_new_dc(self) -> BaseNode:

@optional_stage('post_test_load')
def verify_data_can_be_read_from_new_dc(self, new_node: BaseNode) -> None:
self.log.info("Veryfing if data has been transferred successfully to the new DC")
self.log.info("Verifying if data has been transferred successfully to the new DC")
stress_cmd = self.params.get('verify_data_after_entire_test') + f" -node {new_node.ip_address}"
end_stress = self.run_stress_thread(stress_cmd=stress_cmd, stats_aggregate_cmds=False, round_robin=False)
self.verify_stress_thread(cs_thread_pool=end_stress)
@@ -190,3 +216,40 @@ def querying_new_node_should_return_no_data(self, new_node: BaseNode) -> None:
fetch_size=10)
data = session.execute(statement).one()
assert not data, f"no data should be returned when querying with CL=LOCAL_QUORUM and RF=0. {data}"

def replace_cluster_node(self, verification_node: BaseNode,
host_id: str | None = None,
dc_idx: int = 0,
dead_node_hostids: str = "",
timeout: int | float = 3600 * 8) -> BaseNode:
"""When old_node_ip or host_id are not None then replacement node procedure is initiated"""
self.log.info("Adding new node to cluster...")
new_node: BaseNode = skip_on_capacity_issues(self.db_cluster.add_nodes)(
count=1, dc_idx=dc_idx, enable_auto_bootstrap=True)[0]
self.monitors.reconfigure_scylla_monitoring()
with new_node.remote_scylla_yaml() as scylla_yaml:
scylla_yaml.ignore_dead_nodes_for_replace = dead_node_hostids
# Since we need this logic before starting the node, and with `use_preinstalled_scylla: false`
# Scylla is not yet installed (or the target node was terminated), an alive node without nemesis
# should be used to determine the version; it must be up and have the scylla executable available.

new_node.replacement_host_id = host_id

try:
with adaptive_timeout(Operations.NEW_NODE, node=verification_node, timeout=timeout):
self.db_cluster.wait_for_init(node_list=[new_node], timeout=timeout, check_node_health=False)
self.db_cluster.clean_replacement_node_options(new_node)
self.db_cluster.set_seeds()
self.db_cluster.update_seed_provider()
except (NodeSetupFailed, NodeSetupTimeout):
self.log.warning("TestConfig of the '%s' failed, removing it from list of nodes" % new_node)
self.db_cluster.nodes.remove(new_node)
self.log.warning("Node will not be terminated. Please terminate manually!!!")
raise

self.db_cluster.wait_for_nodes_up_and_normal(nodes=[new_node])
new_node.wait_node_fully_start()
with new_node.remote_scylla_yaml() as scylla_yaml:
scylla_yaml.ignore_dead_nodes_for_replace = ""

return new_node
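
Note: the `removenode` invocation above passes the first dead host ID as the target and the remaining ones via `--ignore-dead-nodes`. A minimal standalone sketch of how that sub-command string is assembled; the host IDs below are placeholders, not values from any test run:

    def build_removenode_cmd(dead_host_ids: list[str]) -> str:
        """Build the nodetool sub-command used above: remove the first dead node
        while telling Scylla to ignore the remaining unreachable host IDs."""
        target, *ignored = dead_host_ids
        sub_cmd = f"removenode {target}"
        if ignored:
            sub_cmd += f" --ignore-dead-nodes {','.join(ignored)}"
        return sub_cmd


    # placeholder host IDs for illustration only
    print(build_removenode_cmd(["1d5d1ce6-...", "8c0f2bb1-...", "f3a9e2d7-..."]))
    # -> removenode 1d5d1ce6-... --ignore-dead-nodes 8c0f2bb1-...,f3a9e2d7-...
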
5 changes: 5 additions & 0 deletions sdcm/exceptions.py
@@ -83,3 +83,8 @@ class SstablesNotFound(Exception):

class CapacityReservationError(Exception):
pass


class ReadBarrierErrorException(Exception):
"""Raise exception if read barrier call failed"""
pass
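
The new exception is what the test leans on when it stops every data node in one DC and expects a read barrier on a node in the surviving DC to fail (see the `pytest.raises` block in add_new_dc_test.py above). A minimal sketch of that usage, assuming `alive_node` is a hypothetical, caller-supplied SCT BaseNode-like object:

    import pytest

    from sdcm.exceptions import ReadBarrierErrorException


    def check_read_barrier_fails(alive_node) -> None:
        # With group0 majority lost, call_read_barrier() is expected to raise
        # instead of silently returning None as it did before this commit.
        with pytest.raises(ReadBarrierErrorException):
            alive_node.raft.call_read_barrier()
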
1 change: 1 addition & 0 deletions sdcm/provision/scylla_yaml/scylla_yaml.py
@@ -265,6 +265,7 @@ def set_authorizer(cls, authorizer: str):
replace_address: str = None # ""
replace_address_first_boot: str = None # ""
replace_node_first_boot: str = None # ""
ignore_dead_nodes_for_replace: str = None # ""
override_decommission: bool = None # False
enable_repair_based_node_ops: bool = None # True
# NOTE: example for disabling RBNO for 'bootstrap' and 'decommission' operations:
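
The new `ignore_dead_nodes_for_replace` field mirrors the scylla.yaml option of the same name; this commit passes host IDs as a plain string (a single ID in the test, presumably comma-separated for multiple, which is an assumption here). A rough sketch of its lifecycle around a replace, assuming `new_node` is an SCT BaseNode that has not been started yet:

    def configure_replace(new_node, replacement_host_id: str, dead_node_hostids: str = "") -> None:
        """Write the replace-related options before first boot, as replace_cluster_node() does."""
        with new_node.remote_scylla_yaml() as scylla_yaml:
            scylla_yaml.ignore_dead_nodes_for_replace = dead_node_hostids
        new_node.replacement_host_id = replacement_host_id


    def clear_replace_options(new_node) -> None:
        """Clear the option once the replacement node is up and normal."""
        with new_node.remote_scylla_yaml() as scylla_yaml:
            scylla_yaml.ignore_dead_nodes_for_replace = ""
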
19 changes: 10 additions & 9 deletions sdcm/utils/raft/__init__.py
@@ -1,6 +1,7 @@
import contextlib
import logging
import random
import json

from enum import Enum
from abc import ABC, abstractmethod
@@ -14,6 +15,7 @@
from sdcm.utils.health_checker import HealthEventsGenerator
from sdcm.wait import wait_for
from sdcm.rest.raft_api import RaftApi
from sdcm.exceptions import ReadBarrierErrorException


LOGGER = logging.getLogger(__name__)
@@ -357,7 +359,7 @@ def get_group0_id(self, session) -> str:
LOGGER.error(err_msg)
return ""

def call_read_barrier(self, timeout: int = 60) -> Any | None:
def call_read_barrier(self, timeout: int = 60):
""" Wait until the node applies all previously committed Raft entries
Any schema/topology changes are committed with Raft group0 on node. Before
@@ -371,14 +373,13 @@ def call_read_barrier(self, timeout: int = 60) -> Any | None:
with self._node.parent_cluster.cql_connection_patient_exclusive(node=self._node) as session:
raft_group0_id = self.get_group0_id(session)
assert raft_group0_id, "Group0 id was not found"
try:
api = RaftApi(self._node)
result = api.read_barrier(group_id=raft_group0_id, timeout=timeout)
LOGGER.debug("Api response %s", result)
return result
except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
LOGGER.error("Trigger read-barrier via rest api failed %s", exc)
return None
api = RaftApi(self._node)
result = api.read_barrier(group_id=raft_group0_id, timeout=timeout)
LOGGER.debug("Api response %s", result)
if not result:
return
status = json.loads(result)
raise ReadBarrierErrorException(f"Error code: {status['code']}, Error message: {status['message']}")


class NoRaft(RaftFeatureOperations):
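
With this change, `call_read_barrier()` treats an empty REST response as success and a non-empty body as a JSON error payload, which now surfaces as an exception rather than being swallowed. A self-contained sketch of the new error-handling contract; the exception class is a local stand-in and the error values are invented for illustration:

    import json


    class ReadBarrierErrorException(Exception):
        """Stand-in for sdcm.exceptions.ReadBarrierErrorException."""


    def raise_if_read_barrier_failed(api_response: str) -> None:
        # Empty body: the barrier succeeded, nothing to do.
        if not api_response:
            return
        # Non-empty body: assumed to be {"code": ..., "message": ...} from the Raft REST API.
        status = json.loads(api_response)
        raise ReadBarrierErrorException(
            f"Error code: {status['code']}, Error message: {status['message']}")


    raise_if_read_barrier_failed("")  # success: no exception
    try:
        raise_if_read_barrier_failed('{"code": 500, "message": "raft operation timed out"}')
    except ReadBarrierErrorException as exc:
        print(exc)  # Error code: 500, Error message: raft operation timed out
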
2 changes: 1 addition & 1 deletion test-cases/features/add-new-dc-with-zero-nodes.yaml
@@ -3,7 +3,7 @@ prepare_write_cmd: "cassandra-stress write cl=QUORUM n=20900 -schema 'replicati
stress_cmd: ["cassandra-stress read cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5",
"cassandra-stress write cl=LOCAL_QUORUM duration=20m -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
]
verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_ONE n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"
verify_data_after_entire_test: "cassandra-stress read cl=LOCAL_QUORUM n=20900 -mode cql3 native -rate threads=8 -pop seq=1..20900 -col 'n=FIXED(10) size=FIXED(512)' -log interval=5"

n_db_nodes: '3 3 0' # make n_db_nodes configured as multi-dc with last dc set to 0 (so later easily new node can be added)
region_name: 'eu-west-1 eu-west-2 eu-north-1'
