Skip to content

Commit

Permalink
Add integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Aug 14, 2023
1 parent 5edc5da commit 4ba3fc6
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 10 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,10 @@ jobs:

- name: Test with pytest
run: |
export SCYLLA_VERSION='release:5.1'
./ci/run_integration_test.sh tests/integration/standard/ tests/integration/cqlengine/
- name: Test tablets
run: |
export SCYLLA_VERSION='unstable/master:2023-07-31T05:54:06Z'
./ci/run_integration_test.sh tests/integration/experiments/
9 changes: 2 additions & 7 deletions ci/run_integration_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ if (( aio_max_nr != aio_max_nr_recommended_value )); then
fi
fi

SCYLLA_RELEASE='release:5.1'

python3 -m venv .test-venv
source .test-venv/bin/activate
pip install -U pip wheel setuptools
Expand All @@ -33,14 +31,11 @@ pip install https://github.com/scylladb/scylla-ccm/archive/master.zip

# download version

ccm create scylla-driver-temp -n 1 --scylla --version ${SCYLLA_RELEASE}
ccm create scylla-driver-temp -n 1 --scylla --version ${SCYLLA_VERSION}
ccm remove

# run test

echo "export SCYLLA_VERSION=${SCYLLA_RELEASE}"
echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append tests/integration/standard/"
export SCYLLA_VERSION=${SCYLLA_RELEASE}
echo "PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=asyncio pytest --import-mode append $*"
export MAPPED_SCYLLA_VERSION=3.11.4
PROTOCOL_VERSION=4 EVENT_LOOP_MANAGER=libev pytest -rf --import-mode append $*

10 changes: 7 additions & 3 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,8 @@ def _id_and_mark(f):
# 1. unittest doesn't skip setUpClass when used on class and we need it sometimes
# 2. unittest doesn't have conditional xfail, and I prefer to use pytest than custom decorator
# 3. unittest doesn't have a reason argument, so you don't see the reason in pytest report
requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2'),
# TODO remove second check when we stop using unstable version in CI for tablets
requires_collection_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None and (len(SCYLLA_VERSION.split('/')) != 0 or Version(SCYLLA_VERSION.split(':')[1]) < Version('5.2')),
reason='Scylla supports collection indexes from 5.2 onwards')
requires_custom_indexes = pytest.mark.skipif(SCYLLA_VERSION is not None,
reason='Scylla does not support SASI or any other CUSTOM INDEX class')
Expand Down Expand Up @@ -501,7 +502,7 @@ def start_cluster_wait_for_up(cluster):


def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None, set_keyspace=True, ccm_options=None,
configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE):
configuration_options=None, dse_options=None, use_single_interface=USE_SINGLE_INTERFACE, use_tablets=False):
configuration_options = configuration_options or {}
dse_options = dse_options or {}
workloads = workloads or []
Expand Down Expand Up @@ -611,7 +612,10 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,
# CDC is causing an issue (can't start cluster with multiple seeds)
# Selecting only features we need for tests, i.e. anything but CDC.
CCM_CLUSTER = CCMScyllaCluster(path, cluster_name, **ccm_options)
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True})
if use_tablets:
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf', 'consistent-topology-changes', 'tablets'], 'start_native_transport': True})
else:
CCM_CLUSTER.set_configuration_options({'experimental_features': ['lwt', 'udf'], 'start_native_transport': True})
else:
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
CCM_CLUSTER.set_configuration_options({'start_native_transport': True})
Expand Down
148 changes: 148 additions & 0 deletions tests/integration/experiments/test_tablets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import time
import unittest
import pytest
import os
from cassandra.cluster import Cluster
from cassandra.policies import ConstantReconnectionPolicy, RoundRobinPolicy, TokenAwarePolicy

from tests.integration import PROTOCOL_VERSION, use_cluster
from tests.unit.test_host_connection_pool import LOGGER

def setup_module():
use_cluster('tablets', [3], start=True, use_tablets=True)

class TestTabletsIntegration(unittest.TestCase):
@classmethod
def setup_class(cls):
cls.cluster = Cluster(contact_points=["127.0.0.1", "127.0.0.2", "127.0.0.3"], protocol_version=PROTOCOL_VERSION,
load_balancing_policy=TokenAwarePolicy(RoundRobinPolicy()),
reconnection_policy=ConstantReconnectionPolicy(1), experimental_tablet_feature_enabled=True, experimental_tablet_refresh_time=1)
cls.session = cls.cluster.connect()
cls.create_ks_and_cf(cls)
cls.create_data(cls.session)

@classmethod
def teardown_class(cls):
cls.cluster.shutdown()

def verify_same_host_in_tracing(self, results):
traces = results.get_query_trace()
events = traces.events
host_set = set()
for event in events:
LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
host_set.add(event.source)

self.assertEqual(len(host_set), 1)
self.assertIn('querying locally', "\n".join([event.description for event in events]))

trace_id = results.response_future.get_query_trace_ids()[0]
traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,))
events = [event for event in traces]
host_set = set()
for event in events:
LOGGER.info("TRACE EVENT: %s %s", event.source, event.activity)
host_set.add(event.source)

self.assertEqual(len(host_set), 1)
self.assertIn('querying locally', "\n".join([event.activity for event in events]))

def verify_same_shard_in_tracing(self, results):
traces = results.get_query_trace()
events = traces.events
shard_set = set()
for event in events:
LOGGER.info("TRACE EVENT: %s %s %s", event.source, event.thread_name, event.description)
shard_set.add(event.thread_name)

self.assertEqual(len(shard_set), 1)
self.assertIn('querying locally', "\n".join([event.description for event in events]))

trace_id = results.response_future.get_query_trace_ids()[0]
traces = self.session.execute("SELECT * FROM system_traces.events WHERE session_id = %s", (trace_id,))
events = [event for event in traces]
shard_set = set()
for event in events:
LOGGER.info("TRACE EVENT: %s %s", event.thread, event.activity)
shard_set.add(event.thread)

self.assertEqual(len(shard_set), 1)
self.assertIn('querying locally', "\n".join([event.activity for event in events]))

def create_ks_and_cf(self):
self.session.execute(
"""
DROP KEYSPACE IF EXISTS test1
"""
)
self.session.execute(
"""
CREATE KEYSPACE test1
WITH replication = {
'class': 'NetworkTopologyStrategy',
'replication_factor': 1,
'initial_tablets': 8
}
""")

self.session.execute(
"""
CREATE TABLE test1.table1 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
""")

@staticmethod
def create_data(session):
prepared = session.prepare(
"""
INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?)
""")

for i in range(50):
bound = prepared.bind((i, i%5, i%2))
session.execute(bound)

def query_data_shard(self, session, verify_in_tracing=True):
prepared = session.prepare(
"""
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
""")

bound = prepared.bind([(2)])
results = session.execute(bound, trace=True)
self.assertEqual(results, [(2, 2, 0)])
if verify_in_tracing:
self.verify_same_shard_in_tracing(results)

def query_data_host(self, session, verify_in_tracing=True):
prepared = session.prepare(
"""
SELECT pk, ck, v FROM test1.table1 WHERE pk = ?
""")

bound = prepared.bind([(2)])
results = session.execute(bound, trace=True)
self.assertEqual(results, [(2, 2, 0)])
if verify_in_tracing:
self.verify_same_host_in_tracing(results)

def test_tablets(self):
self.query_data_host(self.session)

def test_tablets_shard_awareness(self):
self.query_data_shard(self.session)

def test_tablets_refresh(self):
tablets1 = self.cluster.metadata._tablets._tablets

self.session.execute(
"""
CREATE TABLE test1.table2 (pk int, ck int, v int, PRIMARY KEY (pk, ck));
""")

time.sleep(2)

tablets2 = self.cluster.metadata._tablets._tablets

self.assertTrue(len(tablets1) < len(tablets2))


3 changes: 3 additions & 0 deletions tests/unit/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,9 @@ class FakeCluster:
def __init__(self):
self.metadata = Mock(spec=Metadata)

def check_tablets_enabled(self):
return False

def test_get_distance(self):
"""
Same test as DCAwareRoundRobinPolicyTest.test_get_distance()
Expand Down

0 comments on commit 4ba3fc6

Please sign in to comment.