Skip to content

Commit

Permalink
Adjust for zero token nodes
Browse files Browse the repository at this point in the history
Adds ZeroTokenNodesIT that checks the behaviour of the driver when zero-token
nodes are involved.

Changes behavior of the driver in regards to zero-token nodes. Those nodes
will be ignored if encountered in `system.peers` table. If provided as a
contact point, the driver will attempt to connect to it, but will not populate
metadata with it. This results in zero-token nodes being not included in
query planning.
  • Loading branch information
Bouncheck committed Dec 12, 2024
1 parent 204be3a commit a557147
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,11 @@ CloseFuture closeAsync() {

Host connectedHost() {
Connection current = connectionRef.get();
return (current == null) ? null : cluster.metadata.getHost(current.endPoint);
if (current == null) return null;
Host host = cluster.metadata.getHost(current.endPoint);
// If the host is not in metadata, then it may be zero-token contact point
if (host == null) host = cluster.metadata.getContactPoint(current.endPoint);
return host;
}

void triggerReconnect() {
Expand Down Expand Up @@ -392,6 +396,11 @@ static void refreshSchema(
throws ConnectionException, BusyConnectionException, ExecutionException,
InterruptedException {
Host host = cluster.metadata.getHost(connection.endPoint);
// Host may have been deliberately not added to metadata, because it's a zero-token node
// Try checking contact points if its null:
if (host == null) {
host = cluster.metadata.getContactPoint(connection.endPoint);
}
// Neither host, nor it's version should be null. But instead of dying if there is a race or
// something, we can kind of try to infer
// a Cassandra version from the protocol version (this is not full proof, we can have the
Expand Down Expand Up @@ -826,7 +835,13 @@ private void refreshNodeListAndTokenMap(
}
}
if (isInitialConnection) {
cluster.metadata.addIfAbsent(controlHost);
if (localRow.isNull("tokens")) {
logger.warn(
"Control host ({}) is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.",
connection.endPoint);
} else {
cluster.metadata.addIfAbsent(controlHost);
}
}
}

Expand Down Expand Up @@ -984,7 +999,10 @@ private static Set<Token> toTokens(Token.Factory factory, Set<String> tokensStr)

private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
boolean isValid =
peerRow.getColumnDefinitions().contains("host_id") && !peerRow.isNull("host_id");
peerRow.getColumnDefinitions().contains("host_id")
&& !peerRow.isNull("host_id")
&& peerRow.getColumnDefinitions().contains("tokens")
&& !peerRow.isNull("tokens");

if (isPeersV2) {
isValid &=
Expand All @@ -1006,14 +1024,12 @@ private boolean isValidPeer(Row peerRow, boolean logIfInvalid) {
peerRow.getColumnDefinitions().contains("data_center")
&& !peerRow.isNull("data_center")
&& peerRow.getColumnDefinitions().contains("rack")
&& !peerRow.isNull("rack")
&& peerRow.getColumnDefinitions().contains("tokens")
&& !peerRow.isNull("tokens");
&& !peerRow.isNull("rack");
}
if (!isValid && logIfInvalid)
logger.warn(
"Found invalid row in system.peers: {}. "
+ "This is likely a gossip or snitch issue, this host will be ignored.",
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
formatInvalidPeer(peerRow));
return isValid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,13 @@ static void run_with_null_peer_info(String columns, boolean expectPeer2, boolean
expectedError =
String.format(
"Found invalid row in system.peers: [peer=%s, %s]. "
+ "This is likely a gossip or snitch issue, this host will be ignored.",
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
node2Address, columnData);
} else {
expectedError =
String.format(
"Found invalid row in system.peers: [peer=%s, %s%s%s%s]. "
+ "This is likely a gossip or snitch issue, this host will be ignored.",
+ "This is likely a gossip or snitch issue or a zero-token node, this host will be ignored.",
node2Address,
!splitColumnsSet.contains("native_transport_address")
? "missing native_transport_address, "
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
package com.datastax.driver.core;

import static org.apache.log4j.Level.WARN;
import static org.assertj.core.api.Assertions.assertThat;

import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
import com.datastax.driver.core.utils.ScyllaVersion;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ZeroTokenNodesIT {
private Logger logger = Logger.getLogger(ControlConnection.class);
private MemoryAppender appender;
private Level originalLevel;

@DataProvider(name = "loadBalancingPolicies")
public static Object[][] loadBalancingPolicies() {
return new Object[][] {
{DCAwareRoundRobinPolicy.builder().build()},
{new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())},
{new TokenAwarePolicy(new RoundRobinPolicy())}
};
}

@BeforeMethod(groups = "short")
public void startCapturingLogs() {
originalLevel = logger.getLevel();
logger.setLevel(WARN);
logger.addAppender(appender = new MemoryAppender());
}

@AfterMethod(groups = "short")
public void stopCapturingLogs() {
logger.setLevel(originalLevel);
logger.removeAppender(appender);
}

@Test(groups = "short")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_ignore_zero_token_peer() {
// Given 4 node cluster with 1 zero-token node and normal contact point,
// make sure that it's not included in the metadata.
// By extension, it won't be included in the query planning.
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;
try {
ccmBridge =
CCMBridge.builder().withNodes(3).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(4);
ccmBridge.updateNodeConfig(4, "join_ring", false);
ccmBridge.start(4);
ccmBridge.waitForUp(4);

cluster =
Cluster.builder()
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(1));
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line).contains("Found invalid row in system.peers");
assertThat(line).contains("tokens=null");
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

@Test(groups = "short")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_ignore_zero_token_DC() {
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;
try {
ccmBridge =
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(2, 3);
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.start(3);
ccmBridge.add(2, 4);
ccmBridge.updateNodeConfig(4, "join_ring", false);
ccmBridge.start(4);
ccmBridge.waitForUp(3);
ccmBridge.waitForUp(4);

cluster =
Cluster.builder()
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(1), 9042))
.withPort(9042)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(1));
for (int i = 0; i < 2; i++) {
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line).contains("Found invalid row in system.peers");
assertThat(line).contains("tokens=null");
}
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

@Test(groups = "short", dataProvider = "loadBalancingPolicies")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_connect_to_zero_token_contact_point(LoadBalancingPolicy loadBalancingPolicy) {
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;

try {
ccmBridge =
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(3);
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.start(3);
ccmBridge.waitForUp(3);

cluster =
Cluster.builder()
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
.withPort(9042)
.withLoadBalancingPolicy(loadBalancingPolicy)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(3));
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line)
.contains(
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
assertThat(line).contains(ccmBridge.ipOfNode(3));

session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
session.execute(
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
for (int i = 0; i < 30; i++) {
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isNotEqualTo(ccmBridge.addressOfNode(3));
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
}
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}

@Test(groups = "short", dataProvider = "loadBalancingPolicies")
@ScyllaVersion(
minOSS = "6.2.0",
minEnterprise = "2024.2.2",
description = "Zero-token nodes introduced in scylladb/scylladb#19684")
public void should_connect_to_zero_token_DC(LoadBalancingPolicy loadBalancingPolicy) {
Cluster cluster = null;
Session session = null;
CCMBridge ccmBridge = null;

try {
ccmBridge =
CCMBridge.builder().withNodes(2).withIpPrefix("127.0.1.").withBinaryPort(9042).build();
ccmBridge.start();
ccmBridge.add(2, 3);
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.start(3);
ccmBridge.add(2, 4);
ccmBridge.updateNodeConfig(4, "join_ring", false);
ccmBridge.start(4);
ccmBridge.waitForUp(3);
ccmBridge.waitForUp(4);

cluster =
Cluster.builder()
.addContactPointsWithPorts(new InetSocketAddress(ccmBridge.ipOfNode(3), 9042))
.withPort(9042)
.withLoadBalancingPolicy(loadBalancingPolicy)
.withoutAdvancedShardAwareness()
.build();
session = cluster.connect();

assertThat(cluster.manager.controlConnection.connectedHost().getEndPoint().resolve())
.isEqualTo(ccmBridge.addressOfNode(3));
String line = null;
try {
line = appender.waitAndGet(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertThat(line)
.contains(
"is a zero-token node. It will not be added to metadata hosts and will be excluded from regular query planning.");
assertThat(line).contains(ccmBridge.ipOfNode(3));

session.execute("DROP KEYSPACE IF EXISTS ZeroTokenNodesIT");
session.execute(
"CREATE KEYSPACE ZeroTokenNodesIT WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 2};");
session.execute("CREATE TABLE ZeroTokenNodesIT.tab (id INT PRIMARY KEY)");
for (int i = 0; i < 30; i++) {
ResultSet rs = session.execute("INSERT INTO ZeroTokenNodesIT.tab (id) VALUES (" + i + ")");
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isNotEqualTo(ccmBridge.addressOfNode(3));
assertThat(rs.getExecutionInfo().getQueriedHost().getEndPoint().resolve())
.isIn(ccmBridge.addressOfNode(1), ccmBridge.addressOfNode(2));
}
Set<Host> hosts = cluster.getMetadata().getAllHosts();
Set<String> toStrings = hosts.stream().map(Host::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
} finally {
if (ccmBridge != null) ccmBridge.close();
if (session != null) session.close();
if (cluster != null) cluster.close();
}
}
}

0 comments on commit a557147

Please sign in to comment.