Skip to content

Commit

Permalink
Add MockResolverIT#cannot_reconnect_with_resolved_socket()
Browse files Browse the repository at this point in the history
Adds a test ensuring that once we lose unresolved socket from metadata we
cannot reconnect to the cluster.
  • Loading branch information
Bouncheck committed Sep 9, 2024
1 parent 275bade commit fc38765
Showing 1 changed file with 182 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
package com.datastax.oss.driver.core.resolver;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.ResultSet;
Expand Down Expand Up @@ -251,6 +253,186 @@ public void run_replace_test_20_times() {
}
}

@Test(expected = NoNodeAvailableException.class)
public void cannot_reconnect_with_resolved_socket() {
final int numberOfNodes = 3;
DriverConfigLoader loader =
new DefaultProgrammaticDriverConfigLoaderBuilder()
.withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false)
.withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true)
.withStringList(
TypedDriverOption.CONTACT_POINTS.getRawOption(),
Collections.singletonList("test.cluster.fake:9042"))
.build();

CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader);
CqlSession session;
Collection<Node> nodes;
Set<Node> filteredNodes;
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(
new InetAddress[] {
getNodeInetAddress(ccmBridge, 1),
getNodeInetAddress(ccmBridge, 2),
getNodeInetAddress(ccmBridge, 3)
}));
ccmBridge.create();
ccmBridge.start();
session = builder.build();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();
nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(numberOfNodes);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
assertThat(filteredNodes).hasSize(1);
}
int counter = 0;
while (filteredNodes.size() == 1) {
counter++;
LOG.warn(
"Launching another cluster until we lose resolved socket from metadata (run {}).",
counter);
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) {
ccmBridge.create();
ccmBridge.start();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
ResultSet rs = session.execute("SELECT * FROM system.local");
assertThat(rs).isNotNull();
Row row = rs.one();
assertThat(row).isNotNull();

nodes = session.getMetadata().getNodes().values();
assertThat(nodes).hasSize(numberOfNodes);
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
LOG.trace("Metadata node: " + iterator.next().toString());
}
filteredNodes =
nodes.stream()
.filter(x -> x.toString().contains("test.cluster.fake"))
.collect(Collectors.toSet());
if (filteredNodes.size() > 1) {
fail(
"Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen.");
}
}
}
Iterator<Node> iterator = nodes.iterator();
while (iterator.hasNext()) {
InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve();
assertFalse(address.isUnresolved());
}
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.2.").build()) {
RESOLVER_FACTORY.updateResponse(
"test.cluster.fake",
new ValidResponse(
new InetAddress[] {
getNodeInetAddress(ccmBridge, 1),
getNodeInetAddress(ccmBridge, 2),
getNodeInetAddress(ccmBridge, 3)
}));
// Now the driver should fail to reconnect since unresolved hostname is gone.
ccmBridge.create();
ccmBridge.start();
boolean allNodesUp = false;
int nodesUp = 0;
for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) {
try {
nodes = session.getMetadata().getNodes().values();
nodesUp = 0;
for (Node node : nodes) {
if (node.getUpSinceMillis() > 0) {
nodesUp++;
}
}
if (nodesUp == numberOfNodes) {
allNodesUp = true;
break;
}
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
if (!allNodesUp) {
LOG.error(
"Driver sees only {} nodes UP instead of {} after waiting {}s",
nodesUp,
numberOfNodes,
CLUSTER_WAIT_SECONDS);
}
session.execute("SELECT * FROM system.local");
}
session.close();
}

private static InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) {
try {
return InetAddress.getByName(ccmBridge.getNodeIpAddress(nodeid));
Expand Down

0 comments on commit fc38765

Please sign in to comment.