From 2b280a2ba17b093a09bc16ea15cf7efe785ac4fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20B=C4=85czkowski?= Date: Thu, 12 Sep 2024 15:28:58 +0200 Subject: [PATCH] Add optional fallback for `ControlConnection#reconnect()` Adds an experimental option to allow `ControlConnection` to try reconnecting to the original contact points held by `MetadataManager`, in case of getting empty query plan from the load balancing policy. In order to separate this logic from query plans of other queries `LoadBalancingPolicyWrapper#newControlReconnectionQueryPlan()` was introduced and is called during reconnection in place of `newQueryPlan()`. --- .../api/core/config/DefaultDriverOption.java | 9 +++++ .../driver/api/core/config/OptionsMap.java | 1 + .../api/core/config/TypedDriverOption.java | 4 +++ .../core/control/ControlConnection.java | 5 +-- .../metadata/LoadBalancingPolicyWrapper.java | 22 +++++++++++-- core/src/main/resources/reference.conf | 11 +++++++ .../control/ControlConnectionTestBase.java | 16 ++++++++- .../LoadBalancingPolicyWrapperTest.java | 33 +++++++++++++++++-- .../driver/core/resolver/MockResolverIT.java | 19 ----------- 9 files changed, 94 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 55e8d53dc66..241185d121b 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -668,6 +668,15 @@ public enum DefaultDriverOption implements DriverOption { */ CONTROL_CONNECTION_AGREEMENT_WARN("advanced.control-connection.schema-agreement.warn-on-failure"), + /** + * Whether to forcibly add original contact points held by MetadataManager to the reconnection + * plan, in case there is no live nodes available according to LBP. Experimental. + * + *

Value-type: boolean + */ + CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS( + "advanced.control-connection.reconnection.fallback-to-original-contact-points"), + /** * Whether `Session.prepare` calls should be sent to all nodes in the cluster. * diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 8906e1dd349..53e5f4caa6f 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -360,6 +360,7 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, Duration.ofMillis(200)); map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofSeconds(10)); map.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true); + map.put(TypedDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, false); map.put(TypedDriverOption.PREPARE_ON_ALL_NODES, true); map.put(TypedDriverOption.REPREPARE_ENABLED, true); map.put(TypedDriverOption.REPREPARE_CHECK_SYSTEM_TABLE, false); diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 9be69d0424f..64f4bd5a224 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -566,6 +566,10 @@ public String toString() { public static final TypedDriverOption CONTROL_CONNECTION_AGREEMENT_WARN = new TypedDriverOption<>( DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, GenericType.BOOLEAN); + /** Whether to forcibly try original contacts if no live nodes are available */ + public static final TypedDriverOption CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS = + new TypedDriverOption<>( + DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS, GenericType.BOOLEAN); /** Whether `Session.prepare` calls should be sent to all nodes in the cluster. */ public static final TypedDriverOption PREPARE_ON_ALL_NODES = new TypedDriverOption<>(DefaultDriverOption.PREPARE_ON_ALL_NODES, GenericType.BOOLEAN); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 7e9592c64d3..6cf16f9c4de 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -300,7 +300,8 @@ private void init( .withOwnerLogPrefix(logPrefix + "|control") .build(); - Queue nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan(); + Queue nodes = + context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan(); connect( nodes, @@ -336,7 +337,7 @@ private void init( private CompletionStage reconnect() { assert adminExecutor.inEventLoop(); - Queue nodes = context.getLoadBalancingPolicyWrapper().newQueryPlan(); + Queue nodes = context.getLoadBalancingPolicyWrapper().newControlReconnectionQueryPlan(); CompletableFuture result = new CompletableFuture<>(); connect( nodes, diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java index 20d045d4e72..7922b7b5780 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapper.java @@ -17,6 +17,7 @@ */ package com.datastax.oss.driver.internal.core.metadata; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; @@ -161,8 +162,25 @@ public Queue newQueryPlan( } @NonNull - public Queue newQueryPlan() { - return newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null); + public Queue newControlReconnectionQueryPlan() { + // First try the original way + Queue regularQueryPlan = newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null); + if (!regularQueryPlan.isEmpty()) return regularQueryPlan; + + if (context + .getConfig() + .getDefaultProfile() + .getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) { + Set originalNodes = context.getMetadataManager().getContactPoints(); + List nodes = new ArrayList<>(); + for (DefaultNode node : originalNodes) { + nodes.add(new DefaultNode(node.getEndPoint(), context)); + } + Collections.shuffle(nodes); + return new ConcurrentLinkedQueue<>(nodes); + } else { + return regularQueryPlan; + } } // when it comes in from the outside diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 75bed97e498..601543790a0 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -2113,6 +2113,17 @@ datastax-java-driver { # Overridable in a profile: no warn-on-failure = true } + + reconnection { + # Whether to forcibly add original contact points held by MetadataManager to the reconnection plan, + # in case there is no live nodes available according to LBP. + # Experimental. + # + # Required: yes + # Modifiable at runtime: yes, the new value will be used for checks issued after the change. + # Overridable in a profile: no + fallback-to-original-contact-points = false + } } advanced.prepared-statements { diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java index c52199465a8..64c9f06b42e 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java @@ -132,11 +132,25 @@ public void setup() { when(defaultProfile.getBoolean(DefaultDriverOption.CONNECTION_WARN_INIT_ERROR)) .thenReturn(false); + when(context.getConfig()).thenReturn(config); + when(config.getDefaultProfile()).thenReturn(defaultProfile); + when(defaultProfile.getBoolean(DefaultDriverOption.CONTROL_CONNECTION_RECONNECT_CONTACT_POINTS)) + .thenReturn(false); + controlConnection = new ControlConnection(context); } protected void mockQueryPlan(Node... nodes) { - when(loadBalancingPolicyWrapper.newQueryPlan()) + when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan()) + .thenAnswer( + i -> { + ConcurrentLinkedQueue queryPlan = new ConcurrentLinkedQueue<>(); + for (Node node : nodes) { + queryPlan.offer(node); + } + return queryPlan; + }); + when(loadBalancingPolicyWrapper.newControlReconnectionQueryPlan()) .thenAnswer( i -> { ConcurrentLinkedQueue queryPlan = new ConcurrentLinkedQueue<>(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java index 1a0292e3947..8ad325a1f31 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/LoadBalancingPolicyWrapperTest.java @@ -118,10 +118,22 @@ public void setup() { policy3)); } + @Test + public void should_build_control_connection_query_plan_from_contact_points_before_init() { + // When + Queue queryPlan = wrapper.newControlReconnectionQueryPlan(); + + // Then + for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) { + verify(policy, never()).newQueryPlan(null, null); + } + assertThat(queryPlan).hasSameElementsAs(contactPoints); + } + @Test public void should_build_query_plan_from_contact_points_before_init() { // When - Queue queryPlan = wrapper.newQueryPlan(); + Queue queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null); // Then for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) { @@ -139,7 +151,24 @@ public void should_fetch_query_plan_from_policy_after_init() { } // When - Queue queryPlan = wrapper.newQueryPlan(); + Queue queryPlan = wrapper.newControlReconnectionQueryPlan(); + + // Then + // no-arg newQueryPlan() uses the default profile + verify(policy1).newQueryPlan(null, null); + assertThat(queryPlan).isEqualTo(defaultPolicyQueryPlan); + } + + @Test + public void should_fetch_control_connection_query_plan_from_policy_after_init() { + // Given + wrapper.init(); + for (LoadBalancingPolicy policy : ImmutableList.of(policy1, policy2, policy3)) { + verify(policy).init(anyMap(), any(DistanceReporter.class)); + } + + // When + Queue queryPlan = wrapper.newQueryPlan(null, DriverExecutionProfile.DEFAULT_NAME, null); // Then // no-arg newQueryPlan() uses the default profile diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java index d351821dc51..93ecbf1815c 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/resolver/MockResolverIT.java @@ -353,12 +353,6 @@ public void cannot_reconnect_with_resolved_socket() { break; } } - /* - ResultSet rs = session.execute("SELECT * FROM system.local"); - assertThat(rs).isNotNull(); - Row row = rs.one(); - assertThat(row).isNotNull(); - */ nodes = session.getMetadata().getNodes().values(); assertThat(nodes).hasSize(3); Iterator iterator = nodes.iterator(); @@ -415,19 +409,6 @@ public void cannot_reconnect_with_resolved_socket() { break; } } - /* - for (int i = 0; i < 15; i++) { - try { - nodes = session.getMetadata().getNodes().values(); - if (nodes.size() == 3) { - break; - } - Thread.sleep(1000); - } catch (InterruptedException e) { - break; - } - } - */ session.execute("SELECT * FROM system.local"); } session.close();