Remove tablet test tag and run all integration tests on 3 node cluster #403

Merged · 1 commit · Feb 24, 2025
1 change: 0 additions & 1 deletion .github/workflows/main.yml
@@ -37,4 +37,3 @@ jobs:
- run: sudo sh -c "echo 2097152 >> /proc/sys/fs/aio-max-nr"
- run: ./integration.sh integration
- run: ./integration.sh ccm
- run: ./integration.sh tablet
90 changes: 13 additions & 77 deletions common_test.go
@@ -37,18 +37,17 @@ import (
)

var (
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
flagMultiNodeCluster = flag.String("multiCluster", "127.0.0.2", "a comma-separated list of host:port tuples")
flagProto = flag.Int("proto", 0, "protocol version")
flagCQL = flag.String("cql", "3.0.0", "CQL version")
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
flagRetry = flag.Int("retries", 5, "number of times to retry queries")
flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts pool")
flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
flagCompressTest = flag.String("compressor", "", "compressor to use")
flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
flagProto = flag.Int("proto", 0, "protocol version")
flagCQL = flag.String("cql", "3.0.0", "CQL version")
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
clusterSize = flag.Int("clusterSize", 1, "the expected size of the cluster")
flagRetry = flag.Int("retries", 5, "number of times to retry queries")
flagAutoWait = flag.Duration("autowait", 1000*time.Millisecond, "time to wait for autodiscovery to fill the hosts pool")
flagRunSslTest = flag.Bool("runssl", false, "Set to true to run ssl test")
flagRunAuthTest = flag.Bool("runauth", false, "Set to true to run authentication test")
flagCompressTest = flag.String("compressor", "", "compressor to use")
flagTimeout = flag.Duration("gocql.timeout", 5*time.Second, "sets the connection `timeout` for all operations")

flagCassVersion cassVersion
)
@@ -63,10 +62,6 @@ func getClusterHosts() []string {
return strings.Split(*flagCluster, ",")
}

func getMultiNodeClusterHosts() []string {
return strings.Split(*flagMultiNodeCluster, ",")
}

func addSslOptions(cluster *ClusterConfig) *ClusterConfig {
if *flagRunSslTest {
cluster.Port = 9142
@@ -153,35 +148,6 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
return cluster
}

func createMultiNodeCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
clusterHosts := getMultiNodeClusterHosts()
cluster := NewCluster(clusterHosts...)
cluster.ProtoVersion = *flagProto
cluster.CQLVersion = *flagCQL
cluster.Timeout = *flagTimeout
cluster.Consistency = Quorum
cluster.MaxWaitSchemaAgreement = 2 * time.Minute // travis might be slow
if *flagRetry > 0 {
cluster.RetryPolicy = &SimpleRetryPolicy{NumRetries: *flagRetry}
}

switch *flagCompressTest {
case "snappy":
cluster.Compressor = &SnappyCompressor{}
case "":
default:
panic("invalid compressor: " + *flagCompressTest)
}

cluster = addSslOptions(cluster)

for _, opt := range opts {
opt(cluster)
}

return cluster
}

func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string, disableTablets bool) {
// TODO: tb.Helper()
c := *cluster
@@ -206,6 +172,8 @@ func createKeyspace(tb testing.TB, cluster *ClusterConfig, keyspace string, disableTablets bool) {

if disableTablets {
query += " AND tablets = {'enabled': false}"
} else {
query += " AND tablets = {'enabled': true, 'initial': 8};"
}

err = createTable(session, query)
@@ -254,38 +222,6 @@ func createSessionFromCluster(cluster *ClusterConfig, tb testing.TB) *Session {
return createSessionFromClusterHelper(cluster, tb, testKeyspaceOpts{tabletsDisabled: false})
}

func createSessionFromMultiNodeCluster(cluster *ClusterConfig, tb testing.TB) *Session {
keyspace := "test1"

session, err := cluster.CreateSession()
if err != nil {
tb.Fatal("createSession:", err)
}

initKeyspaceOnce.GetOnce(keyspace).Do(func() {
if err = createTable(session, `DROP KEYSPACE IF EXISTS `+keyspace); err != nil {
panic(fmt.Sprintf("unable to drop keyspace: %v", err))
}

if err = createTable(session, fmt.Sprintf(`CREATE KEYSPACE %s
WITH replication = {
'class': 'NetworkTopologyStrategy',
'replication_factor': 1
} AND tablets = {
'initial': 8
};`, keyspace)); err != nil {
panic(fmt.Sprintf("unable to create keyspace: %v", err))
}

})

if err := session.control.awaitSchemaAgreement(); err != nil {
tb.Fatal(err)
}

return session
}

func createSession(tb testing.TB, opts ...func(config *ClusterConfig)) *Session {
cluster := createCluster(opts...)
return createSessionFromCluster(cluster, tb)
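With the multi-node helpers gone, every integration test builds its session through createCluster and createSessionFromCluster, and createKeyspace enables tablets unless a test opts out. A minimal sketch of the DDL the helper now assembles; the replication clause is an assumption carried over from the deleted multi-node helper, since this diff collapses that part of createKeyspace:

query := fmt.Sprintf(`CREATE KEYSPACE %s WITH replication = {
	'class': 'NetworkTopologyStrategy',
	'replication_factor': %d
}`, keyspace, *flagRF)

if disableTablets {
	query += " AND tablets = {'enabled': false}"
} else {
	query += " AND tablets = {'enabled': true, 'initial': 8};"
}

// runs through the existing createTable helper, which executes the DDL
err = createTable(session, query)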
2 changes: 1 addition & 1 deletion control_integration_test.go
@@ -21,7 +21,7 @@ func (d unixSocketDialer) DialContext(_ context.Context, _, _ string) (net.Conn, error) {
}

func TestUnixSockets(t *testing.T) {
socketPath := "/tmp/scylla/cql.m"
socketPath := "/tmp/scylla_node_1/cql.m"

c := createCluster()
c.NumConns = 1
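The socket path tracks node_1's data directory, which docker-compose.yml below now mounts at /tmp/scylla_node_1; with maintenance_socket: workdir in scylla.yaml, Scylla places cql.m inside that directory. A rough sketch of how such a dialer hooks into the driver, assuming imports of context and net; maintenanceSocketDialer and newMaintenanceSession are hypothetical names, while Dialer, NumConns, and DisableInitialHostLookup are standard gocql ClusterConfig fields:

// Hypothetical sketch: routing all driver traffic over the maintenance socket.
type maintenanceSocketDialer struct{ path string }

func (d maintenanceSocketDialer) DialContext(_ context.Context, _, _ string) (net.Conn, error) {
	return net.Dial("unix", d.path) // ignore network/address; always dial the local socket
}

func newMaintenanceSession() (*Session, error) {
	c := createCluster()
	c.NumConns = 1                    // a single connection suits a single socket
	c.DisableInitialHostLookup = true // skip discovery; the socket is the only endpoint
	c.Dialer = maintenanceSocketDialer{path: "/tmp/scylla_node_1/cql.m"}
	return c.CreateSession()
}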
63 changes: 40 additions & 23 deletions docker-compose.yml
@@ -3,10 +3,9 @@ version: "3.7"
services:
node_1:
image: ${SCYLLA_IMAGE}
privileged: true
command: |
--smp 2
--memory 768M
--memory 1G
--seeds 192.168.100.11
--overprovisioned 1
--experimental-features udf
@@ -15,7 +14,7 @@ services:
public:
ipv4_address: 192.168.100.11
volumes:
- /tmp/scylla:/var/lib/scylla/
- /tmp/scylla_node_1:/var/lib/scylla/
- type: bind
source: ./testdata/config/scylla.yaml
target: /etc/scylla/scylla.yaml
@@ -29,7 +28,7 @@ services:
source: ./testdata/pki/cassandra.key
target: /etc/scylla/db.key
healthcheck:
test: [ "CMD", "cqlsh", "-e", "select * from system.local where key='local'" ]
test: [ "CMD", "cqlsh", "192.168.100.11", "-e", "select * from system.local where key='local'" ]
interval: 5s
timeout: 5s
retries: 18
@@ -38,24 +37,59 @@
command: |
--smp 2
--memory 1G
--seeds 192.168.100.12
--seeds 192.168.100.11
--experimental-features udf
--enable-user-defined-functions true
networks:
public:
ipv4_address: 192.168.100.12
volumes:
- /tmp/scylla_node_2:/var/lib/scylla/
- type: bind
source: ./testdata/config/scylla.yaml
target: /etc/scylla/scylla.yaml
- type: bind
source: ./testdata/pki/ca.crt
target: /etc/scylla/ca.crt
- type: bind
source: ./testdata/pki/cassandra.crt
target: /etc/scylla/db.crt
- type: bind
source: ./testdata/pki/cassandra.key
target: /etc/scylla/db.key
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.12", "-e", "select * from system.local where key='local'" ]
interval: 5s
timeout: 5s
retries: 18
depends_on:
node_1:
condition: service_healthy
node_3:
image: ${SCYLLA_IMAGE}
command: |
--smp 2
--memory 1G
--seeds 192.168.100.12
--seeds 192.168.100.11
--experimental-features udf
--enable-user-defined-functions true
networks:
public:
ipv4_address: 192.168.100.13
volumes:
- /tmp/scylla_node_3:/var/lib/scylla/
- type: bind
source: ./testdata/config/scylla.yaml
target: /etc/scylla/scylla.yaml
- type: bind
source: ./testdata/pki/ca.crt
target: /etc/scylla/ca.crt
- type: bind
source: ./testdata/pki/cassandra.crt
target: /etc/scylla/db.crt
- type: bind
source: ./testdata/pki/cassandra.key
target: /etc/scylla/db.key
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.13", "-e", "select * from system.local where key='local'" ]
interval: 5s
@@ -64,23 +98,6 @@
depends_on:
node_2:
condition: service_healthy
node_4:
image: ${SCYLLA_IMAGE}
command: |
--smp 2
--memory 1G
--seeds 192.168.100.12
networks:
public:
ipv4_address: 192.168.100.14
healthcheck:
test: [ "CMD", "cqlsh", "192.168.100.14", "-e", "select * from system.local where key='local'" ]
interval: 5s
timeout: 5s
retries: 18
depends_on:
node_3:
condition: service_healthy
networks:
public:
driver: bridge
20 changes: 6 additions & 14 deletions integration.sh
@@ -27,25 +27,17 @@ function scylla_restart() {

scylla_restart

sudo chmod 0777 /tmp/scylla/cql.m
sudo chmod 0777 /tmp/scylla_node_1/cql.m
sudo chmod 0777 /tmp/scylla_node_2/cql.m
sudo chmod 0777 /tmp/scylla_node_3/cql.m

readonly clusterSize=1
readonly multiNodeClusterSize=3
readonly scylla_liveset="192.168.100.11"
readonly scylla_tablet_liveset="192.168.100.12"
readonly clusterSize=3
readonly scylla_liveset="192.168.100.11,192.168.100.12,192.168.100.13"
readonly cversion="3.11.4"
readonly proto=4
readonly args="-gocql.timeout=60s -proto=${proto} -rf=${clusterSize} -clusterSize=${clusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -cluster=${scylla_liveset}"
readonly tabletArgs="-gocql.timeout=60s -proto=${proto} -rf=1 -clusterSize=${multiNodeClusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -multiCluster=${scylla_tablet_liveset}"

if [[ "$*" == *"tablet"* ]];
then
echo "==> Running tablet tests with args: ${tabletArgs}"
go test -timeout=5m -race -tags="tablet" ${tabletArgs} ./...
fi
readonly args="-gocql.timeout=60s -proto=${proto} -rf=1 -clusterSize=${clusterSize} -autowait=2000ms -compressor=snappy -gocql.cversion=${cversion} -cluster=${scylla_liveset}"

TAGS=$*
TAGS=${TAGS//"tablet"/}

if [ ! -z "$TAGS" ];
then
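The single args line now drives every suite against the full three-node liveset; note that -rf stays at 1 while -clusterSize rises to 3, matching the tablets keyspace created in common_test.go. Roughly what the harness does with the -cluster value, per getClusterHosts and createCluster above (a sketch; the consistency and timeout values come from the flags and helpers shown earlier):

hosts := strings.Split("192.168.100.11,192.168.100.12,192.168.100.13", ",") // getClusterHosts
cluster := NewCluster(hosts...)    // all three nodes as contact points
cluster.Consistency = Quorum       // set in createCluster
cluster.Timeout = 60 * time.Second // from -gocql.timeout=60s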
28 changes: 14 additions & 14 deletions tablet_integration_test.go
@@ -1,5 +1,5 @@
//go:build tablet
// +build tablet
//go:build integration
// +build integration

package gocql

@@ -11,16 +11,16 @@ import (

// Check if TokenAwareHostPolicy works correctly when using tablets
func TestTablets(t *testing.T) {
cluster := createMultiNodeCluster()
cluster := createCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session := createSessionFromMultiNodeCluster(cluster, t)
session := createSessionFromCluster(cluster, t)
defer session.Close()

if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test1", "table1")); err != nil {
if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test_tablets")); err != nil {
panic(fmt.Sprintf("unable to create table: %v", err))
}

@@ -36,7 +36,7 @@ func TestTablets(t *testing.T) {
i := 0
for i < 50 {
i = i + 1
err := session.Query(`INSERT INTO test1.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
err := session.Query(`INSERT INTO test_tablets (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
if err != nil {
t.Fatal(err)
}
@@ -51,7 +51,7 @@ func TestTablets(t *testing.T) {
var ck int
var v int

err := session.Query(`SELECT pk, ck, v FROM test1.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
err := session.Query(`SELECT pk, ck, v FROM test_tablets WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
if err != nil {
t.Fatal(err)
}
@@ -101,16 +101,16 @@ func TestTablets(t *testing.T) {

// Check if shard awareness works correctly when using tablets
func TestTabletsShardAwareness(t *testing.T) {
cluster := createMultiNodeCluster()
cluster := createCluster()

fallback := RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = TokenAwareHostPolicy(fallback)

session := createSessionFromMultiNodeCluster(cluster, t)
session := createSessionFromCluster(cluster, t)
defer session.Close()

if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test1", "table_shard")); err != nil {
if err := createTable(session, fmt.Sprintf(`CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, ck));
`, "test_tablets_shard_awarness")); err != nil {
panic(fmt.Sprintf("unable to create table: %v", err))
}

@@ -119,7 +119,7 @@ func TestTabletsShardAwareness(t *testing.T) {
i := 0
for i < 50 {
i = i + 1
err := session.Query(`INSERT INTO test1.table_shard (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
err := session.Query(`INSERT INTO test_tablets_shard_awarness (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, i%2).WithContext(ctx).Exec()
if err != nil {
t.Fatal(err)
}
@@ -134,7 +134,7 @@ func TestTabletsShardAwareness(t *testing.T) {
var ck int
var v int

err := session.Query(`SELECT pk, ck, v FROM test1.table_shard WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
err := session.Query(`SELECT pk, ck, v FROM test_tablets_shard_awarness WHERE pk = ?;`, i).WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
if err != nil {
t.Fatal(err)
}
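Both tests follow the same pattern: insert 50 rows, read each back with tracing enabled, and inspect the trace to confirm that the token-aware policy reached a tablet replica directly (and, in the shard-awareness variant, the owning shard). A condensed sketch of that read loop; the trace parsing and assertions live in the collapsed portions of this diff, and NewTraceWriter is gocql's stock tracer, which writes trace events to an io.Writer:

var buf bytes.Buffer
trace := NewTraceWriter(session, &buf)

for i := 1; i <= 50; i++ {
	var pk, ck, v int
	err := session.Query(`SELECT pk, ck, v FROM test_tablets WHERE pk = ?;`, i).
		WithContext(ctx).Consistency(One).Trace(trace).Scan(&pk, &ck, &v)
	if err != nil {
		t.Fatal(err)
	}
	// The collapsed assertions read buf to see which host (and shard) served
	// the request; with tablets, the query should land on an owning replica
	// without an extra network hop.
}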
1 change: 1 addition & 0 deletions testdata/config/scylla.yaml
@@ -9,3 +9,4 @@ client_encryption_options:
truststore: /etc/scylla/ca.crt
require_client_auth: true
maintenance_socket: workdir
enable_tablets: true