diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index c16a7a8279..b15468769f 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -21,4 +21,10 @@ jobs: - name: Test with pytest run: | + export SCYLLA_VERSION='release:5.1' ./ci/run_integration_test.sh tests/integration/standard/ tests/integration/cqlengine/ + + - name: Test tablets + run: | + export SCYLLA_VERSION='unstable/master:2023-07-31T05:54:06Z' + ./ci/run_integration_test.sh tests/integration/experiments/ diff --git a/ci/run_integration_test.sh b/ci/run_integration_test.sh index 4bcf4df1e1..6bdf9a5014 100755 --- a/ci/run_integration_test.sh +++ b/ci/run_integration_test.sh @@ -15,8 +15,6 @@ if (( aio_max_nr != aio_max_nr_recommended_value )); then fi fi -SCYLLA_RELEASE='release:5.1' - python3 -m venv .test-venv source .test-venv/bin/activate pip install -U pip wheel setuptools @@ -33,14 +31,11 @@ pip install https://github.com/scylladb/scylla-ccm/archive/master.zip # download version -ccm create scylla-driver-temp -n 1 --scylla --version ${SCYLLA_RELEASE} +ccm create scylla-driver-temp -n 1 --scylla --version ${SCYLLA_VERSION} ccm remove # run test -echo "export SCYLLA_VERSION=${SCYLLA_RELEASE}" -echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append tests/integration/standard/" -export SCYLLA_VERSION=${SCYLLA_RELEASE} +echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append $*" export MAPPED_SCYLLA_VERSION=3.11.4 PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=libev pytest -rf --import-mode append $* - diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index cc85289881..f10c534fa5 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -372,7 +372,8 @@ def _id_and_mark(f): # 1. unittest doesn't skip setUpClass when used on class and we need it sometimes # 2. unittest doesn't have conditional xfail, and I prefer to use pytest than custom decorator # 3. unittest doesn't have a reason argument, so you don't see the reason in pytest report -requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2'), +# TODO remove second check when we stop using unstable version in CI for tablets +requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and (len(SCYLLA_VERSION.split('/')) != 0 or Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2')), reason='Scylla supports collection indexes from 5.2 onwards') requires_custom_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None, reason='Scylla does not support SASI or any other CUSTOM INDEX class') @@ -501,7 +502,7 @@ def start_cluster_wait_for_up(cluster): def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, set_keyspace=True, ccm_options=None, - configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE): + configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE, use_tablets=False): configuration_options = configuration_options or {} dse_options = dse_options or {} workloads = workloads or [] @@ -611,7 +612,10 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, # CDC is causing an issue (can't start cluster with multiple seeds) # Selecting only features we need for tests, i.e. anything but CDC. CCM_CLUSTER = CCMScyllaCluster(path, cluster_name, **ccm_options) - CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True}) + if use_tablets: + CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf', 'consistent-topology-changes', 'tablets'], 'start_native_transport': True}) + else: + CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True}) else: CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options) CCM_CLUSTER.set_configuration_options({'start_native_transport': True}) diff --git a/tests/integration/experiments/test_tablets.py b/tests/integration/experiments/test_tablets.py new file mode 100644 index 0000000000..52e462aa4a --- /dev/null +++ b/tests/integration/experiments/test_tablets.py @@ -0,0 +1,148 @@ +import time +import unittest +import pytest +import os +from cassandra.cluster import Cluster +from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy + +from tests.integration import PROTOCOL_VERSION, use_cluster +from tests.unit.test_host_connection_pool import LOGGER + +def setup_module(): + use_cluster('tablets', [3], start=True, use_tablets=True) + +class TestTabletsIntegration(unittest.TestCase): + @classmethod + def setup_class(cls): + cls.cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION, + load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()), + reconnection_policy=ConstantReconnectionPolicy(1), experimental_tablet_feature_enabled=True, experimental_tablet_refresh_time=1) + cls.session = cls.cluster.connect() + cls.create_ks_and_cf(cls) + cls.create_data(cls.session) + + @classmethod + def teardown_class(cls): + cls.cluster.shutdown() + + def verify_same_host_in_tracing(self, results): + traces = results.get_query_trace() + events = traces.events + host_set = set() + for event in events: + LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description) + host_set.add(event.source) + + self.assertEqual(len(host_set), 1) + self.assertIn('querying locally', "\n".join([event.description for event in events])) + + trace_id = results.response_future.get_query_trace_ids()[0] + traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,)) + events = [event for event in traces] + host_set = set() + for event in events: + LOGGER.info("TRACE EVENT: %s %s", event.source, event.activity) + host_set.add(event.source) + + self.assertEqual(len(host_set), 1) + self.assertIn('querying locally', "\n".join([event.activity for event in events])) + + def verify_same_shard_in_tracing(self, results): + traces = results.get_query_trace() + events = traces.events + shard_set = set() + for event in events: + LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description) + shard_set.add(event.thread_name) + + self.assertEqual(len(shard_set), 1) + self.assertIn('querying locally', "\n".join([event.description for event in events])) + + trace_id = results.response_future.get_query_trace_ids()[0] + traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,)) + events = [event for event in traces] + shard_set = set() + for event in events: + LOGGER.info("TRACE EVENT: %s %s", event.thread, event.activity) + shard_set.add(event.thread) + + self.assertEqual(len(shard_set), 1) + self.assertIn('querying locally', "\n".join([event.activity for event in events])) + + def create_ks_and_cf(self): + self.session.execute( + """ + DROP KEYSPACE IF EXISTS test1 + """ + ) + self.session.execute( + """ + CREATE KEYSPACE test1 + WITH replication = { + 'class': 'NetworkTopologyStrategy', + 'replication_factor': 1, + 'initial_tablets': 8 + } + """) + + self.session.execute( + """ + CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck)); + """) + + @staticmethod + def create_data(session): + prepared = session.prepare( + """ + INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?) + """) + + for i in range(50): + bound = prepared.bind((i, i%5, i%2)) + session.execute(bound) + + def query_data_shard(self, session, verify_in_tracing=True): + prepared = session.prepare( + """ + SELECT pk, ck, v FROM test1.table1 WHERE pk = ? + """) + + bound = prepared.bind([(2)]) + results = session.execute(bound, trace=True) + self.assertEqual(results, [(2, 2, 0)]) + if verify_in_tracing: + self.verify_same_shard_in_tracing(results) + + def query_data_host(self, session, verify_in_tracing=True): + prepared = session.prepare( + """ + SELECT pk, ck, v FROM test1.table1 WHERE pk = ? + """) + + bound = prepared.bind([(2)]) + results = session.execute(bound, trace=True) + self.assertEqual(results, [(2, 2, 0)]) + if verify_in_tracing: + self.verify_same_host_in_tracing(results) + + def test_tablets(self): + self.query_data_host(self.session) + + def test_tablets_shard_awareness(self): + self.query_data_shard(self.session) + + def test_tablets_refresh(self): + tablets1 = self.cluster.metadata._tablets._tablets + + self.session.execute( + """ + CREATE TABLE test1.table2 (pk int, ck int, v int, PRIMARY KEY (pk, ck)); + """) + + time.sleep(2) + + tablets2 = self.cluster.metadata._tablets._tablets + + self.assertTrue(len(tablets1) < len(tablets2)) + + diff --git a/tests/unit/test_policies.py b/tests/unit/test_policies.py index ccfead0bc9..7fb7f56439 100644 --- a/tests/unit/test_policies.py +++ b/tests/unit/test_policies.py @@ -602,6 +602,9 @@ class FakeCluster: def __init__(self): self.metadata = Mock(spec=Metadata) + def check_tablets_enabled(self): + return False + def test_get_distance(self): """ Same test as DCAwareRoundRobinPolicyTest.test_get_distance()