From fc38765a232a63e20dddb2e32405dd8621aad55c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Wed, 4 Sep 2024 16:17:55 +0200 Subject: [PATCH] Add MockResolverIT#cannot_reconnect_with_resolved_socket() Adds a test ensuring that once we lose unresolved socket from metadata we cannot reconnect to the cluster. --- .../driver/core/resolver/MockResolverIT.java | 182 ++++++++++++++++++ 1 file changed, 182 insertions(+) diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index 5b8c3c907b9..a078389dace 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -24,11 +24,13 @@ package com.datastax.oss.driver.core.resolver; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.CqlSessionBuilder; +import com.datastax.oss.driver.api.core.NoNodeAvailableException; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.TypedDriverOption; import com.datastax.oss.driver.api.core.cql.ResultSet; @@ -251,6 +253,186 @@ public void run_replace_test_20_times() { } } + @Test(expected = NoNodeAvailableException.class) + public void cannot_reconnect_with_resolved_socket() { + final int numberOfNodes = 3; + DriverConfigLoader loader = + new DefaultProgrammaticDriverConfigLoaderBuilder() + .withBoolean(TypedDriverOption.RESOLVE_CONTACT_POINTS.getRawOption(), false) + .withBoolean(TypedDriverOption.RECONNECT_ON_INIT.getRawOption(), true) + .withStringList( + TypedDriverOption.CONTACT_POINTS.getRawOption(), + Collections.singletonList("test.cluster.fake:9042")) + .build(); + + CqlSessionBuilder builder = new CqlSessionBuilder().withConfigLoader(loader); + CqlSession session; + Collection nodes; + Set filteredNodes; + try (CcmBridge ccmBridge = + CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { + RESOLVER_FACTORY.updateResponse( + "test.cluster.fake", + new ValidResponse( + new InetAddress[] { + getNodeInetAddress(ccmBridge, 1), + getNodeInetAddress(ccmBridge, 2), + getNodeInetAddress(ccmBridge, 3) + })); + ccmBridge.create(); + ccmBridge.start(); + session = builder.build(); + boolean allNodesUp = false; + int nodesUp = 0; + for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) { + try { + nodes = session.getMetadata().getNodes().values(); + nodesUp = 0; + for (Node node : nodes) { + if (node.getUpSinceMillis() > 0) { + nodesUp++; + } + } + if (nodesUp == numberOfNodes) { + allNodesUp = true; + break; + } + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + if (!allNodesUp) { + LOG.error( + "Driver sees only {} nodes UP instead of {} after waiting {}s", + nodesUp, + numberOfNodes, + CLUSTER_WAIT_SECONDS); + } + ResultSet rs = session.execute("SELECT * FROM system.local"); + assertThat(rs).isNotNull(); + Row row = rs.one(); + assertThat(row).isNotNull(); + nodes = session.getMetadata().getNodes().values(); + assertThat(nodes).hasSize(numberOfNodes); + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + LOG.trace("Metadata node: " + iterator.next().toString()); + } + filteredNodes = + nodes.stream() + .filter(x -> x.toString().contains("test.cluster.fake")) + .collect(Collectors.toSet()); + assertThat(filteredNodes).hasSize(1); + } + int counter = 0; + while (filteredNodes.size() == 1) { + counter++; + LOG.warn( + "Launching another cluster until we lose resolved socket from metadata (run {}).", + counter); + try (CcmBridge ccmBridge = + CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.1.").build()) { + ccmBridge.create(); + ccmBridge.start(); + boolean allNodesUp = false; + int nodesUp = 0; + for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) { + try { + nodes = session.getMetadata().getNodes().values(); + nodesUp = 0; + for (Node node : nodes) { + if (node.getUpSinceMillis() > 0) { + nodesUp++; + } + } + if (nodesUp == numberOfNodes) { + allNodesUp = true; + break; + } + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + if (!allNodesUp) { + LOG.error( + "Driver sees only {} nodes UP instead of {} after waiting {}s", + nodesUp, + numberOfNodes, + CLUSTER_WAIT_SECONDS); + } + ResultSet rs = session.execute("SELECT * FROM system.local"); + assertThat(rs).isNotNull(); + Row row = rs.one(); + assertThat(row).isNotNull(); + + nodes = session.getMetadata().getNodes().values(); + assertThat(nodes).hasSize(numberOfNodes); + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + LOG.trace("Metadata node: " + iterator.next().toString()); + } + filteredNodes = + nodes.stream() + .filter(x -> x.toString().contains("test.cluster.fake")) + .collect(Collectors.toSet()); + if (filteredNodes.size() > 1) { + fail( + "Somehow there is more than 1 node in metadata with unresolved hostname. This should not ever happen."); + } + } + } + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve(); + assertFalse(address.isUnresolved()); + } + try (CcmBridge ccmBridge = + CcmBridge.builder().withNodes(numberOfNodes).withIpPrefix("127.0.2.").build()) { + RESOLVER_FACTORY.updateResponse( + "test.cluster.fake", + new ValidResponse( + new InetAddress[] { + getNodeInetAddress(ccmBridge, 1), + getNodeInetAddress(ccmBridge, 2), + getNodeInetAddress(ccmBridge, 3) + })); + // Now the driver should fail to reconnect since unresolved hostname is gone. + ccmBridge.create(); + ccmBridge.start(); + boolean allNodesUp = false; + int nodesUp = 0; + for (int i = 0; i < CLUSTER_WAIT_SECONDS; i++) { + try { + nodes = session.getMetadata().getNodes().values(); + nodesUp = 0; + for (Node node : nodes) { + if (node.getUpSinceMillis() > 0) { + nodesUp++; + } + } + if (nodesUp == numberOfNodes) { + allNodesUp = true; + break; + } + Thread.sleep(1000); + } catch (InterruptedException e) { + break; + } + } + if (!allNodesUp) { + LOG.error( + "Driver sees only {} nodes UP instead of {} after waiting {}s", + nodesUp, + numberOfNodes, + CLUSTER_WAIT_SECONDS); + } + session.execute("SELECT * FROM system.local"); + } + session.close(); + } + private static InetAddress getNodeInetAddress(CcmBridge ccmBridge, int nodeid) { try { return InetAddress.getByName(ccmBridge.getNodeIpAddress(nodeid));