diff --git a/.github/workflows/dep-lic-scan.yaml b/.github/workflows/dep-lic-scan.yaml deleted file mode 100644 index b2a5b8e1925..00000000000 --- a/.github/workflows/dep-lic-scan.yaml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -name: Dependency and License Scan -on: - push: - branches: - - '4.x' - - '3.x' - paths-ignore: - - 'manual/**' - - 'faq/**' - - 'upgrade_guide/**' - - 'changelog/**' -jobs: - scan-repo: - runs-on: ubuntu-latest - steps: - - name: Check out code - uses: actions/checkout@v4 - - name: Install Fossa CLI - run: | - curl -H 'Cache-Control: no-cache' https://raw.githubusercontent.com/fossas/fossa-cli/master/install-latest.sh | bash -s -- -b . - - name: Scan for dependencies and licenses - run: | - FOSSA_API_KEY=${{ secrets.FOSSA_PUSH_ONLY_API_KEY }} ./fossa analyze diff --git a/Jenkinsfile b/Jenkinsfile index c8247769631..d38b7c63849 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -61,12 +61,6 @@ def initializeEnvironment() { . ${JABBA_SHELL} jabba which 1.8''', returnStdout: true).trim() - env.TEST_JAVA_HOME = sh(label: 'Get TEST_JAVA_HOME',script: '''#!/bin/bash -le - . ${JABBA_SHELL} - jabba which ${JABBA_VERSION}''', returnStdout: true).trim() - env.TEST_JAVA_VERSION = sh(label: 'Get TEST_JAVA_VERSION',script: '''#!/bin/bash -le - echo "${JABBA_VERSION##*.}"''', returnStdout: true).trim() - sh label: 'Download Apache CassandraⓇ or DataStax Enterprise',script: '''#!/bin/bash -le . ${JABBA_SHELL} jabba use 1.8 @@ -104,18 +98,25 @@ ENVIRONMENT_EOF } def buildDriver(jabbaVersion) { - withEnv(["BUILD_JABBA_VERSION=${jabbaVersion}"]) { - sh label: 'Build driver', script: '''#!/bin/bash -le - . ${JABBA_SHELL} - jabba use ${BUILD_JABBA_VERSION} + def buildDriverScript = '''#!/bin/bash -le - mvn -B -V install -DskipTests -Dmaven.javadoc.skip=true - ''' - } + . ${JABBA_SHELL} + jabba use '''+jabbaVersion+''' + + echo "Building with Java version '''+jabbaVersion+'''" + + mvn -B -V install -DskipTests -Dmaven.javadoc.skip=true + ''' + sh label: 'Build driver', script: buildDriverScript } def executeTests() { - sh label: 'Execute tests', script: '''#!/bin/bash -le + def testJavaHome = sh(label: 'Get TEST_JAVA_HOME',script: '''#!/bin/bash -le + . ${JABBA_SHELL} + jabba which ${JABBA_VERSION}''', returnStdout: true).trim() + def testJavaVersion = (JABBA_VERSION =~ /.*\.(\d+)/)[0][1] + + def executeTestScript = '''#!/bin/bash -le # Load CCM environment variables set -o allexport . 
${HOME}/environment.txt @@ -137,8 +138,8 @@ def executeTests() { printenv | sort mvn -B -V ${INTEGRATION_TESTS_FILTER_ARGUMENT} -T 1 verify \ - -Ptest-jdk-${TEST_JAVA_VERSION} \ - -DtestJavaHome=${TEST_JAVA_HOME} \ + -Ptest-jdk-'''+testJavaVersion+''' \ + -DtestJavaHome='''+testJavaHome+''' \ -DfailIfNoTests=false \ -Dmaven.test.failure.ignore=true \ -Dmaven.javadoc.skip=${SKIP_JAVADOCS} \ @@ -149,6 +150,8 @@ def executeTests() { ${ISOLATED_ITS_ARGUMENT} \ ${PARALLELIZABLE_ITS_ARGUMENT} ''' + echo "Invoking Maven with parameters test-jdk-${testJavaVersion} and testJavaHome = ${testJavaHome}" + sh label: 'Execute tests', script: executeTestScript } def executeCodeCoverage() { @@ -255,8 +258,10 @@ pipeline { choices: ['2.1', // Legacy Apache CassandraⓇ '2.2', // Legacy Apache CassandraⓇ '3.0', // Previous Apache CassandraⓇ - '3.11', // Current Apache CassandraⓇ - '4.0', // Development Apache CassandraⓇ + '3.11', // Previous Apache CassandraⓇ + '4.0', // Previous Apache CassandraⓇ + '4.1', // Current Apache CassandraⓇ + '5.0', // Development Apache CassandraⓇ 'dse-4.8.16', // Previous EOSL DataStax Enterprise 'dse-5.0.15', // Long Term Support DataStax Enterprise 'dse-5.1.35', // Legacy DataStax Enterprise @@ -290,7 +295,11 @@ pipeline { 4.0 - Apache Cassandra® v4.x (CURRENTLY UNDER DEVELOPMENT) + Apache Cassandra® v4.0.x + + + 4.1 + Apache Cassandra® v4.1.x dse-4.8.16 @@ -444,7 +453,7 @@ pipeline { axis { name 'SERVER_VERSION' values '3.11', // Latest stable Apache CassandraⓇ - '4.0', // Development Apache CassandraⓇ + '4.1', // Development Apache CassandraⓇ 'dse-6.8.30' // Current DataStax Enterprise } axis { @@ -477,7 +486,7 @@ pipeline { } stage('Build-Driver') { steps { - buildDriver('default') + buildDriver('1.8') } } stage('Execute-Tests') { @@ -553,8 +562,10 @@ pipeline { name 'SERVER_VERSION' values '2.1', // Legacy Apache CassandraⓇ '3.0', // Previous Apache CassandraⓇ - '3.11', // Current Apache CassandraⓇ - '4.0', // Development Apache CassandraⓇ + '3.11', // Previous Apache CassandraⓇ + '4.0', // Previous Apache CassandraⓇ + '4.1', // Current Apache CassandraⓇ + '5.0', // Development Apache CassandraⓇ 'dse-4.8.16', // Previous EOSL DataStax Enterprise 'dse-5.0.15', // Last EOSL DataStax Enterprise 'dse-5.1.35', // Legacy DataStax Enterprise @@ -591,8 +602,7 @@ pipeline { } stage('Build-Driver') { steps { - // Jabba default should be a JDK8 for now - buildDriver('default') + buildDriver('1.8') } } stage('Execute-Tests') { diff --git a/README.md b/README.md index 0ba5adf5f00..64a3bd992fe 100644 --- a/README.md +++ b/README.md @@ -63,13 +63,14 @@ See the [upgrade guide](upgrade_guide/) for details. 
* [Manual](manual/) * [API docs] -* Bug tracking: [JIRA] +* Bug tracking: [GITHUB] * [Mailing list] * Training: [Scylla University] * [Changelog] * [FAQ] [API docs]: https://java-driver.docs.scylladb.com/scylla-4.17.0.x/api/overview-summary.html +[GITHUB]: https://github.com/scylladb/java-driver/issues [Scylla University]: https://university.scylladb.com [Changelog]: changelog/ [FAQ]: faq/ diff --git a/bom/pom.xml b/bom/pom.xml index 4f93166237e..c6288d601e2 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-bom pom @@ -38,42 +38,42 @@ com.scylladb java-driver-core - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-core-shaded - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-mapper-processor - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-mapper-runtime - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-query-builder - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-test-infra - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-metrics-micrometer - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.scylladb java-driver-metrics-microprofile - 4.18.0.2-SNAPSHOT + 4.18.1.0 com.datastax.oss diff --git a/changelog/README.md b/changelog/README.md index 2d859526586..02c88680e49 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -21,6 +21,26 @@ under the License. +### 4.18.1 + +- [improvement] JAVA-3142: Ability to specify ordering of remote local dc's via new configuration for graceful automatic failovers +- [bug] CASSANDRA-19457: Object reference in Micrometer metrics prevent GC from reclaiming Session instances +- [improvement] CASSANDRA-19468: Don't swallow exception during metadata refresh +- [bug] CASSANDRA-19333: Fix data corruption in VectorCodec when using heap buffers +- [improvement] CASSANDRA-19290: Replace uses of AttributeKey.newInstance +- [improvement] CASSANDRA-19352: Support native_transport_(address|port) + native_transport_port_ssl for DSE 6.8 (4.x edition) +- [improvement] CASSANDRA-19180: Support reloading keystore in cassandra-java-driver + +### 4.18.0 + +- [improvement] PR 1689: Add support for publishing percentile time series for the histogram metrics (nparaddi-walmart) +- [improvement] JAVA-3104: Do not eagerly pre-allocate array when deserializing CqlVector +- [improvement] JAVA-3111: upgrade jackson-databind to 2.13.4.2 to address gradle dependency issue +- [improvement] PR 1617: Improve ByteBufPrimitiveCodec readBytes (chibenwa) +- [improvement] JAVA-3095: Fix CREATE keyword in vector search example in upgrade guide +- [improvement] JAVA-3100: Update jackson-databind to 2.13.4.1 and jackson-jaxrs-json-provider to 2.13.4 to address recent CVEs +- [improvement] JAVA-3089: Forbid wildcard imports + ### 4.17.0 - [improvement] JAVA-3070: Make CqlVector and CqlDuration serializable diff --git a/core-shaded/pom.xml b/core-shaded/pom.xml index 389c2144f78..20789e02458 100644 --- a/core-shaded/pom.xml +++ b/core-shaded/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-core-shaded Java driver for Scylla and Apache Cassandra(R) - core with shaded deps diff --git a/core/pom.xml b/core/pom.xml index 0f6d8647e22..228730e1142 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-core bundle diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java 
b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java index 55e8d53dc66..224ed3b11bd 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/DefaultDriverOption.java @@ -982,7 +982,19 @@ public enum DefaultDriverOption implements DriverOption { *

Value-type: boolean */ METRICS_GENERATE_AGGREGABLE_HISTOGRAMS("advanced.metrics.histograms.generate-aggregable"), - ; + /** + * The duration between attempts to reload the keystore. + * + *
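+ * <p>If this option is not set, the keystore is loaded once at startup and is not reloaded afterwards (matching the default described for {@code keystore-reload-interval} in reference.conf in this change).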

Value-type: {@link java.time.Duration} + */ + SSL_KEYSTORE_RELOAD_INTERVAL("advanced.ssl-engine-factory.keystore-reload-interval"), + /** + * Ordered preference list of remote dcs optionally supplied for automatic failover. + * + *

Value type: {@link java.util.List List}<{@link String}> + */ + LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS( + "advanced.load-balancing-policy.dc-failover.preferred-remote-dcs"); private final String path; diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java index 8906e1dd349..98faf3e590c 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/OptionsMap.java @@ -381,6 +381,8 @@ protected static void fillWithDriverDefaults(OptionsMap map) { map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 0); map.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, false); map.put(TypedDriverOption.METRICS_GENERATE_AGGREGABLE_HISTOGRAMS, true); + map.put( + TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS, ImmutableList.of("")); } @Immutable diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java index 9be69d0424f..943c8f9fb75 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/config/TypedDriverOption.java @@ -238,6 +238,12 @@ public String toString() { /** The keystore password. */ public static final TypedDriverOption SSL_KEYSTORE_PASSWORD = new TypedDriverOption<>(DefaultDriverOption.SSL_KEYSTORE_PASSWORD, GenericType.STRING); + + /** The duration between attempts to reload the keystore. */ + public static final TypedDriverOption SSL_KEYSTORE_RELOAD_INTERVAL = + new TypedDriverOption<>( + DefaultDriverOption.SSL_KEYSTORE_RELOAD_INTERVAL, GenericType.DURATION); + /** The location of the truststore file. */ public static final TypedDriverOption SSL_TRUSTSTORE_PATH = new TypedDriverOption<>(DefaultDriverOption.SSL_TRUSTSTORE_PATH, GenericType.STRING); @@ -889,6 +895,16 @@ public String toString() { DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS, GenericType.BOOLEAN); + /** + * Ordered preference list of remote dcs optionally supplied for automatic failover and included + * in query plan. This feature is enabled only when max-nodes-per-remote-dc is greater than 0. 
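+ *
+ * <p>Illustrative sketch only (not part of this patch; the DC names are assumptions): together with
+ * {@code max-nodes-per-remote-dc}, the preference list could be supplied programmatically before the
+ * session is built, for example:
+ *
+ * <pre>{@code
+ * OptionsMap options = OptionsMap.driverDefaults();
+ * options.put(TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC, 2);
+ * options.put(
+ *     TypedDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS,
+ *     Arrays.asList("dc2", "dc3"));
+ * CqlSession session =
+ *     CqlSession.builder().withConfigLoader(DriverConfigLoader.fromMap(options)).build();
+ * }</pre>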
+ */ + public static final TypedDriverOption> + LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS = + new TypedDriverOption<>( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS, + GenericType.listOf(String.class)); + private static Iterable> introspectBuiltInValues() { try { ImmutableList.Builder> result = ImmutableList.builder(); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java index e1f00a0cd8a..c7913660cbb 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/DriverChannel.java @@ -58,11 +58,11 @@ @ThreadSafe public class DriverChannel { - static final AttributeKey CLUSTER_NAME_KEY = AttributeKey.newInstance("cluster_name"); + static final AttributeKey CLUSTER_NAME_KEY = AttributeKey.valueOf("cluster_name"); static final AttributeKey>> OPTIONS_KEY = - AttributeKey.newInstance("options"); + AttributeKey.valueOf("options"); static final AttributeKey SHARDING_INFO_KEY = - AttributeKey.newInstance("sharding_info"); + AttributeKey.valueOf("sharding_info"); static final AttributeKey LWT_INFO_KEY = AttributeKey.newInstance("lwt_info"); @SuppressWarnings("RedundantStringConstructorCall") diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 996c967b7f8..a65ab769096 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -56,10 +56,14 @@ import com.datastax.oss.driver.internal.core.util.collection.QueryPlan; import com.datastax.oss.driver.internal.core.util.collection.SimpleQueryPlan; import com.datastax.oss.driver.shaded.guava.common.base.Predicates; +import com.datastax.oss.driver.shaded.guava.common.collect.Lists; +import com.datastax.oss.driver.shaded.guava.common.collect.Sets; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -129,6 +133,7 @@ public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { private volatile String localDc; private volatile String localRack; private volatile NodeSet liveNodes; + private final LinkedHashSet preferredRemoteDcs; public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String profileName) { this.context = (InternalDriverContext) context; @@ -143,6 +148,11 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String this.context .getConsistencyLevelRegistry() .nameToLevel(profile.getString(DefaultDriverOption.REQUEST_CONSISTENCY)); + + preferredRemoteDcs = + new LinkedHashSet<>( + profile.getStringList( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS)); } /** @@ -370,27 +380,59 @@ protected Queue maybeAddDcFailover(@Nullable Request request, @NonNull Que return local; } } - QueryPlan remote = - new LazyQueryPlan() { - - @Override - protected Object[] computeNodes() { - Object[] remoteNodes = - liveNodes.dcs().stream() - 
.filter(Predicates.not(Predicates.equalTo(localDc))) - .flatMap(dc -> liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc)) - .toArray(); - - int remoteNodesLength = remoteNodes.length; - if (remoteNodesLength == 0) { - return EMPTY_NODES; - } - shuffleHead(remoteNodes, remoteNodesLength); - return remoteNodes; - } - }; + if (preferredRemoteDcs.isEmpty()) { + return new CompositeQueryPlan(local, buildRemoteQueryPlanAll()); + } + return new CompositeQueryPlan(local, buildRemoteQueryPlanPreferred()); + } + + private QueryPlan buildRemoteQueryPlanAll() { + + return new LazyQueryPlan() { + @Override + protected Object[] computeNodes() { + + Object[] remoteNodes = + liveNodes.dcs().stream() + .filter(Predicates.not(Predicates.equalTo(localDc))) + .flatMap(dc -> liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc)) + .toArray(); + if (remoteNodes.length == 0) { + return EMPTY_NODES; + } + shuffleHead(remoteNodes, remoteNodes.length); + return remoteNodes; + } + }; + } - return new CompositeQueryPlan(local, remote); + private QueryPlan buildRemoteQueryPlanPreferred() { + + Set dcs = liveNodes.dcs(); + List orderedDcs = Lists.newArrayListWithCapacity(dcs.size()); + orderedDcs.addAll(preferredRemoteDcs); + orderedDcs.addAll(Sets.difference(dcs, preferredRemoteDcs)); + + QueryPlan[] queryPlans = + orderedDcs.stream() + .filter(Predicates.not(Predicates.equalTo(localDc))) + .map( + (dc) -> { + return new LazyQueryPlan() { + @Override + protected Object[] computeNodes() { + Object[] rv = liveNodes.dc(dc).stream().limit(maxNodesPerRemoteDc).toArray(); + if (rv.length == 0) { + return EMPTY_NODES; + } + shuffleHead(rv, rv.length); + return rv; + } + }; + }) + .toArray(QueryPlan[]::new); + + return new CompositeQueryPlan(queryPlans); } /** Exposed as a protected method so that it can be accessed by tests */ diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index 87008b05cec..f3dc988cfbc 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -34,6 +34,7 @@ import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; +import com.datastax.oss.driver.shaded.guava.common.collect.Iterators; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.response.Error; import edu.umd.cs.findbugs.annotations.NonNull; @@ -69,6 +70,10 @@ public class DefaultTopologyMonitor implements TopologyMonitor { // Assume topology queries never need paging private static final int INFINITE_PAGE_SIZE = -1; + // A few system.peers columns which get special handling below + private static final String NATIVE_PORT = "native_port"; + private static final String NATIVE_TRANSPORT_PORT = "native_transport_port"; + private final String logPrefix; private final InternalDriverContext context; private final ControlConnection controlConnection; @@ -494,28 +499,65 @@ private void savePort(DriverChannel channel) { @Nullable protected InetSocketAddress getBroadcastRpcAddress( @NonNull AdminRow row, @NonNull EndPoint localEndPoint) { - // in system.peers or system.local - InetAddress broadcastRpcInetAddress = 
row.getInetAddress("rpc_address"); + + InetAddress broadcastRpcInetAddress = null; + Iterator addrCandidates = + Iterators.forArray( + // in system.peers_v2 (Cassandra >= 4.0) + "native_address", + // DSE 6.8 introduced native_transport_address and native_transport_port for the + // listen address. + "native_transport_address", + // in system.peers or system.local + "rpc_address"); + + while (broadcastRpcInetAddress == null && addrCandidates.hasNext()) + broadcastRpcInetAddress = row.getInetAddress(addrCandidates.next()); + // This could only happen if system tables are corrupted, but handle gracefully if (broadcastRpcInetAddress == null) { - // in system.peers_v2 (Cassandra >= 4.0) - broadcastRpcInetAddress = row.getInetAddress("native_address"); - if (broadcastRpcInetAddress == null) { - // This could only happen if system tables are corrupted, but handle gracefully - return null; + LOG.warn( + "[{}] Unable to determine broadcast RPC IP address, returning null. " + + "This is likely due to a misconfiguration or invalid system tables. " + + "Please validate the contents of system.local and/or {}.", + logPrefix, + getPeerTableName()); + return null; + } + + Integer broadcastRpcPort = null; + Iterator portCandidates = + Iterators.forArray( + // in system.peers_v2 (Cassandra >= 4.0) + NATIVE_PORT, + // DSE 6.8 introduced native_transport_address and native_transport_port for the + // listen address. + NATIVE_TRANSPORT_PORT, + // system.local for Cassandra >= 4.0 + "rpc_port"); + + while ((broadcastRpcPort == null || broadcastRpcPort == 0) && portCandidates.hasNext()) { + + String colName = portCandidates.next(); + broadcastRpcPort = row.getInteger(colName); + // Support override for SSL port (if enabled) in DSE + if (NATIVE_TRANSPORT_PORT.equals(colName) && context.getSslEngineFactory().isPresent()) { + + String sslColName = colName + "_ssl"; + broadcastRpcPort = row.getInteger(sslColName); } } - // system.local for Cassandra >= 4.0 - Integer broadcastRpcPort = row.getInteger("rpc_port"); + // use the default port if no port information was found in the row; + // note that in rare situations, the default port might not be known, in which case we + // report zero, as advertised in the javadocs of Node and NodeInfo. if (broadcastRpcPort == null || broadcastRpcPort == 0) { - // system.peers_v2 - broadcastRpcPort = row.getInteger("native_port"); - if (broadcastRpcPort == null || broadcastRpcPort == 0) { - // use the default port if no port information was found in the row; - // note that in rare situations, the default port might not be known, in which case we - // report zero, as advertised in the javadocs of Node and NodeInfo. - broadcastRpcPort = port == -1 ? 0 : port; - } + + LOG.warn( + "[{}] Unable to determine broadcast RPC port. " + + "Trying to fall back to port used by the control connection.", + logPrefix); + broadcastRpcPort = port == -1 ? 
0 : port; } + InetSocketAddress broadcastRpcAddress = new InetSocketAddress(broadcastRpcInetAddress, broadcastRpcPort); if (row.contains("peer") && broadcastRpcAddress.equals(localEndPoint.resolve())) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java index 7aa2fb13bcd..09070525f87 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/MetadataManager.java @@ -447,30 +447,35 @@ private void startSchemaRequest(CompletableFuture refreshFu if (agreementError != null) { refreshFuture.completeExceptionally(agreementError); } else { - schemaQueriesFactory - .newInstance() - .execute() - .thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor) - .whenComplete( - (newMetadata, metadataError) -> { - if (metadataError != null) { - refreshFuture.completeExceptionally(metadataError); - } else { - refreshFuture.complete( - new RefreshSchemaResult(newMetadata, schemaInAgreement)); - } - - firstSchemaRefreshFuture.complete(null); - - currentSchemaRefresh = null; - // If another refresh was enqueued during this one, run it now - if (queuedSchemaRefresh != null) { - CompletableFuture tmp = - this.queuedSchemaRefresh; - this.queuedSchemaRefresh = null; - startSchemaRequest(tmp); - } - }); + try { + schemaQueriesFactory + .newInstance() + .execute() + .thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor) + .whenComplete( + (newMetadata, metadataError) -> { + if (metadataError != null) { + refreshFuture.completeExceptionally(metadataError); + } else { + refreshFuture.complete( + new RefreshSchemaResult(newMetadata, schemaInAgreement)); + } + + firstSchemaRefreshFuture.complete(null); + + currentSchemaRefresh = null; + // If another refresh was enqueued during this one, run it now + if (queuedSchemaRefresh != null) { + CompletableFuture tmp = + this.queuedSchemaRefresh; + this.queuedSchemaRefresh = null; + startSchemaRequest(tmp); + } + }); + } catch (Throwable t) { + LOG.debug("[{}] Exception getting new metadata", logPrefix, t); + refreshFuture.completeExceptionally(t); + } } }); } else if (queuedSchemaRefresh == null) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/AbstractMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/AbstractMetricUpdater.java index fcfe56b605e..5e2392a2e7f 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/AbstractMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/AbstractMetricUpdater.java @@ -180,6 +180,4 @@ protected Timeout newTimeout() { expireAfter.toNanos(), TimeUnit.NANOSECONDS); } - - protected abstract void clearMetrics(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java index 8590917be21..9377fb3a17e 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java @@ -91,7 +91,7 @@ public void updateTimer( } @Override - protected void clearMetrics() { + public void clearMetrics() { for (MetricT metric : metrics.keySet()) { MetricId id = getMetricId(metric); 
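// Removing each metric from the registry drops the registry's reference to it, so a closed session's
// metrics can be garbage-collected (see the clearMetrics() calls added in DefaultSession below).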
registry.remove(id.getName()); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java index c4b432f3c50..c07d1b136af 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java @@ -46,4 +46,6 @@ default void markMeter(MetricT metric, @Nullable String profileName) { void updateTimer(MetricT metric, @Nullable String profileName, long duration, TimeUnit unit); boolean isEnabled(MetricT metric, @Nullable String profileName); + + void clearMetrics(); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java index 45f0797c7b5..8d216990331 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java @@ -53,4 +53,9 @@ public boolean isEnabled(NodeMetric metric, String profileName) { // since methods don't do anything, return false return false; } + + @Override + public void clearMetrics() { + // nothing to do + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java index 1666261590c..7099a8ddcac 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java @@ -53,4 +53,7 @@ public boolean isEnabled(SessionMetric metric, String profileName) { // since methods don't do anything, return false return false; } + + @Override + public void clearMetrics() {} } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java index 50ebca151ee..be375cd58e3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/session/DefaultSession.java @@ -41,10 +41,12 @@ import com.datastax.oss.driver.internal.core.channel.DriverChannel; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.LifecycleListener; +import com.datastax.oss.driver.internal.core.metadata.DefaultNode; import com.datastax.oss.driver.internal.core.metadata.MetadataManager; import com.datastax.oss.driver.internal.core.metadata.MetadataManager.RefreshSchemaResult; import com.datastax.oss.driver.internal.core.metadata.NodeStateEvent; import com.datastax.oss.driver.internal.core.metadata.NodeStateManager; +import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; import com.datastax.oss.driver.internal.core.pool.ChannelPool; import com.datastax.oss.driver.internal.core.util.Loggers; @@ -568,6 +570,14 @@ private void close() { closePolicies(); + // clear metrics to prevent memory leak + for (Node n : metadataManager.getMetadata().getNodes().values()) { + NodeMetricUpdater updater = ((DefaultNode) n).getMetricUpdater(); + if (updater != null) 
updater.clearMetrics(); + } + + if (metricUpdater != null) metricUpdater.clearMetrics(); + List> childrenCloseStages = new ArrayList<>(); for (AsyncAutoCloseable closeable : internalComponentsToClose()) { childrenCloseStages.add(closeable.closeAsync()); @@ -587,6 +597,14 @@ private void forceClose() { logPrefix, (closeWasCalled ? "" : "not ")); + // clear metrics to prevent memory leak + for (Node n : metadataManager.getMetadata().getNodes().values()) { + NodeMetricUpdater updater = ((DefaultNode) n).getMetricUpdater(); + if (updater != null) updater.clearMetrics(); + } + + if (metricUpdater != null) metricUpdater.clearMetrics(); + if (closeWasCalled) { // onChildrenClosed has already been scheduled for (AsyncAutoCloseable closeable : internalComponentsToClose()) { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java index 085b36dc539..bb95dc738c7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/DefaultSslEngineFactory.java @@ -27,11 +27,13 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.security.KeyStore; import java.security.SecureRandom; +import java.time.Duration; import java.util.List; -import javax.net.ssl.KeyManagerFactory; +import java.util.Optional; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; @@ -54,6 +56,7 @@ * truststore-password = password123 * keystore-path = /path/to/client.keystore * keystore-password = password123 + * keystore-reload-interval = 30 minutes * } * } * @@ -66,6 +69,7 @@ public class DefaultSslEngineFactory implements SslEngineFactory { private final SSLContext sslContext; private final String[] cipherSuites; private final boolean requireHostnameValidation; + private ReloadingKeyManagerFactory kmf; /** Builds a new instance from the driver configuration. */ public DefaultSslEngineFactory(DriverContext driverContext) { @@ -132,20 +136,8 @@ protected SSLContext buildContext(DriverExecutionProfile config) throws Exceptio } // initialize keystore if configured. - KeyManagerFactory kmf = null; if (config.isDefined(DefaultDriverOption.SSL_KEYSTORE_PATH)) { - try (InputStream ksf = - Files.newInputStream( - Paths.get(config.getString(DefaultDriverOption.SSL_KEYSTORE_PATH)))) { - KeyStore ks = KeyStore.getInstance("JKS"); - char[] password = - config.isDefined(DefaultDriverOption.SSL_KEYSTORE_PASSWORD) - ? 
config.getString(DefaultDriverOption.SSL_KEYSTORE_PASSWORD).toCharArray() - : null; - ks.load(ksf, password); - kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - kmf.init(ks, password); - } + kmf = buildReloadingKeyManagerFactory(config); } context.init( @@ -159,8 +151,19 @@ protected SSLContext buildContext(DriverExecutionProfile config) throws Exceptio } } + private ReloadingKeyManagerFactory buildReloadingKeyManagerFactory(DriverExecutionProfile config) + throws Exception { + Path keystorePath = Paths.get(config.getString(DefaultDriverOption.SSL_KEYSTORE_PATH)); + String password = config.getString(DefaultDriverOption.SSL_KEYSTORE_PASSWORD, null); + Optional reloadInterval = + Optional.ofNullable( + config.getDuration(DefaultDriverOption.SSL_KEYSTORE_RELOAD_INTERVAL, null)); + + return ReloadingKeyManagerFactory.create(keystorePath, password, reloadInterval); + } + @Override public void close() throws Exception { - // nothing to do + kmf.close(); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/ReloadingKeyManagerFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/ReloadingKeyManagerFactory.java new file mode 100644 index 00000000000..8a9e11bb2e9 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/ssl/ReloadingKeyManagerFactory.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.datastax.oss.driver.internal.core.ssl; + +import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.Principal; +import java.security.PrivateKey; +import java.security.Provider; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.net.ssl.KeyManager; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.KeyManagerFactorySpi; +import javax.net.ssl.ManagerFactoryParameters; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.X509ExtendedKeyManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReloadingKeyManagerFactory extends KeyManagerFactory implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(ReloadingKeyManagerFactory.class); + private static final String KEYSTORE_TYPE = "JKS"; + private Path keystorePath; + private String keystorePassword; + private ScheduledExecutorService executor; + private final Spi spi; + + // We're using a single thread executor so this shouldn't need to be volatile, since all updates + // to lastDigest should come from the same thread + private volatile byte[] lastDigest; + + /** + * Create a new {@link ReloadingKeyManagerFactory} with the given keystore file and password, + * reloading from the file's content at the given interval. This function will do an initial + * reload before returning, to confirm that the file exists and is readable. + * + * @param keystorePath the keystore file to reload + * @param keystorePassword the keystore password + * @param reloadInterval the duration between reload attempts. Set to {@link Optional#empty()} to + * disable scheduled reloading. 
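+ *     (Illustration, not part of this patch: {@code Optional.of(Duration.ofMinutes(30))} re-checks the
+ *     keystore file roughly every 30 minutes, while {@code Optional.empty()} performs only the initial load.)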
+ * @return + */ + static ReloadingKeyManagerFactory create( + Path keystorePath, String keystorePassword, Optional reloadInterval) + throws UnrecoverableKeyException, KeyStoreException, NoSuchAlgorithmException, + CertificateException, IOException { + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + + KeyStore ks; + try (InputStream ksf = Files.newInputStream(keystorePath)) { + ks = KeyStore.getInstance(KEYSTORE_TYPE); + ks.load(ksf, keystorePassword.toCharArray()); + } + kmf.init(ks, keystorePassword.toCharArray()); + + ReloadingKeyManagerFactory reloadingKeyManagerFactory = new ReloadingKeyManagerFactory(kmf); + reloadingKeyManagerFactory.start(keystorePath, keystorePassword, reloadInterval); + return reloadingKeyManagerFactory; + } + + @VisibleForTesting + protected ReloadingKeyManagerFactory(KeyManagerFactory initial) { + this( + new Spi((X509ExtendedKeyManager) initial.getKeyManagers()[0]), + initial.getProvider(), + initial.getAlgorithm()); + } + + private ReloadingKeyManagerFactory(Spi spi, Provider provider, String algorithm) { + super(spi, provider, algorithm); + this.spi = spi; + } + + private void start( + Path keystorePath, String keystorePassword, Optional reloadInterval) { + this.keystorePath = keystorePath; + this.keystorePassword = keystorePassword; + + // Ensure that reload is called once synchronously, to make sure the file exists etc. + reload(); + + if (!reloadInterval.isPresent() || reloadInterval.get().isZero()) { + final String msg = + "KeyStore reloading is disabled. If your Cassandra cluster requires client certificates, " + + "client application restarts are infrequent, and client certificates have short lifetimes, then your client " + + "may fail to re-establish connections to Cassandra hosts. To enable KeyStore reloading, see " + + "`advanced.ssl-engine-factory.keystore-reload-interval` in reference.conf."; + logger.info(msg); + } else { + logger.info("KeyStore reloading is enabled with interval {}", reloadInterval.get()); + + this.executor = + Executors.newScheduledThreadPool( + 1, + runnable -> { + Thread t = Executors.defaultThreadFactory().newThread(runnable); + t.setName(String.format("%s-%%d", this.getClass().getSimpleName())); + t.setDaemon(true); + return t; + }); + this.executor.scheduleWithFixedDelay( + this::reload, + reloadInterval.get().toMillis(), + reloadInterval.get().toMillis(), + TimeUnit.MILLISECONDS); + } + } + + @VisibleForTesting + void reload() { + try { + reload0(); + } catch (Exception e) { + String msg = + "Failed to reload KeyStore. 
If this continues to happen, your client may use stale identity" + + " certificates and fail to re-establish connections to Cassandra hosts."; + logger.warn(msg, e); + } + } + + private synchronized void reload0() + throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, + UnrecoverableKeyException { + logger.debug("Checking KeyStore file {} for updates", keystorePath); + + final byte[] keyStoreBytes = Files.readAllBytes(keystorePath); + final byte[] newDigest = digest(keyStoreBytes); + if (lastDigest != null && Arrays.equals(lastDigest, digest(keyStoreBytes))) { + logger.debug("KeyStore file content has not changed; skipping update"); + return; + } + + final KeyStore keyStore = KeyStore.getInstance(KEYSTORE_TYPE); + try (InputStream inputStream = new ByteArrayInputStream(keyStoreBytes)) { + keyStore.load(inputStream, keystorePassword.toCharArray()); + } + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(keyStore, keystorePassword.toCharArray()); + logger.info("Detected updates to KeyStore file {}", keystorePath); + + this.spi.keyManager.set((X509ExtendedKeyManager) kmf.getKeyManagers()[0]); + this.lastDigest = newDigest; + } + + @Override + public void close() throws Exception { + if (executor != null) { + executor.shutdown(); + } + } + + private static byte[] digest(byte[] payload) throws NoSuchAlgorithmException { + final MessageDigest digest = MessageDigest.getInstance("SHA-256"); + return digest.digest(payload); + } + + private static class Spi extends KeyManagerFactorySpi { + DelegatingKeyManager keyManager; + + Spi(X509ExtendedKeyManager initial) { + this.keyManager = new DelegatingKeyManager(initial); + } + + @Override + protected void engineInit(KeyStore ks, char[] password) { + throw new UnsupportedOperationException(); + } + + @Override + protected void engineInit(ManagerFactoryParameters spec) { + throw new UnsupportedOperationException(); + } + + @Override + protected KeyManager[] engineGetKeyManagers() { + return new KeyManager[] {keyManager}; + } + } + + private static class DelegatingKeyManager extends X509ExtendedKeyManager { + AtomicReference delegate; + + DelegatingKeyManager(X509ExtendedKeyManager initial) { + delegate = new AtomicReference<>(initial); + } + + void set(X509ExtendedKeyManager keyManager) { + delegate.set(keyManager); + } + + @Override + public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { + return delegate.get().chooseEngineClientAlias(keyType, issuers, engine); + } + + @Override + public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { + return delegate.get().chooseEngineServerAlias(keyType, issuers, engine); + } + + @Override + public String[] getClientAliases(String keyType, Principal[] issuers) { + return delegate.get().getClientAliases(keyType, issuers); + } + + @Override + public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { + return delegate.get().chooseClientAlias(keyType, issuers, socket); + } + + @Override + public String[] getServerAliases(String keyType, Principal[] issuers) { + return delegate.get().getServerAliases(keyType, issuers); + } + + @Override + public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { + return delegate.get().chooseServerAlias(keyType, issuers, socket); + } + + @Override + public X509Certificate[] getCertificateChain(String alias) { + return delegate.get().getCertificateChain(alias); + 
} + + @Override + public PrivateKey getPrivateKey(String alias) { + return delegate.get().getPrivateKey(alias); + } + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/VectorCodec.java b/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/VectorCodec.java index 1b663a29d9e..2c4d2200b13 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/VectorCodec.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/type/codec/VectorCodec.java @@ -127,17 +127,19 @@ Elements should at least precede themselves with their size (along the lines of cqlType.getDimensions(), bytes.remaining())); } + ByteBuffer slice = bytes.slice(); List rv = new ArrayList(cqlType.getDimensions()); for (int i = 0; i < cqlType.getDimensions(); ++i) { - ByteBuffer slice = bytes.slice(); - slice.limit(elementSize); + // Set the limit for the current element + int originalPosition = slice.position(); + slice.limit(originalPosition + elementSize); rv.add(this.subtypeCodec.decode(slice, protocolVersion)); - bytes.position(bytes.position() + elementSize); + // Move to the start of the next element + slice.position(originalPosition + elementSize); + // Reset the limit to the end of the buffer + slice.limit(slice.capacity()); } - /* Restore the input ByteBuffer to its original state */ - bytes.rewind(); - return CqlVector.newInstance(rv); } diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 75bed97e498..7b1c43f8bea 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -574,6 +574,13 @@ datastax-java-driver { # Modifiable at runtime: no # Overridable in a profile: yes allow-for-local-consistency-levels = false + + # Ordered preference list of remote dc's (in order) optionally supplied for automatic failover. While building a query plan, the driver uses the DC's supplied in order together with max-nodes-per-remote-dc + # Users are not required to specify all DCs, when listing preferences via this config + # Required: no + # Modifiable at runtime: no + # Overridable in a profile: no + preferred-remote-dcs = [""] } } @@ -790,6 +797,13 @@ datastax-java-driver { // truststore-password = password123 // keystore-path = /path/to/client.keystore // keystore-password = password123 + + # The duration between attempts to reload the keystore from the contents of the file specified + # by `keystore-path`. This is mainly relevant in environments where certificates have short + # lifetimes and applications are restarted infrequently, since an expired client certificate + # will prevent new connections from being established until the application is restarted. If + # not set, defaults to not reload the keystore. + // keystore-reload-interval = 30 minutes } # The generator that assigns a microsecond timestamp to each request. diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyPreferredRemoteDcsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyPreferredRemoteDcsTest.java new file mode 100644 index 00000000000..cefdfd31189 --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicyPreferredRemoteDcsTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.loadbalancing; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.internal.core.metadata.DefaultNode; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; +import java.util.Map; +import java.util.UUID; +import org.junit.Test; +import org.mockito.Mock; + +public class BasicLoadBalancingPolicyPreferredRemoteDcsTest + extends BasicLoadBalancingPolicyDcFailoverTest { + @Mock protected DefaultNode node10; + @Mock protected DefaultNode node11; + @Mock protected DefaultNode node12; + @Mock protected DefaultNode node13; + @Mock protected DefaultNode node14; + + @Override + @Test + public void should_prioritize_single_replica() { + when(request.getRoutingKeyspace()).thenReturn(KEYSPACE); + when(request.getRoutingKey()).thenReturn(ROUTING_KEY); + when(tokenMap.getReplicas(KEYSPACE, ROUTING_KEY)).thenReturn(ImmutableSet.of(node3)); + + // node3 always first, round-robin on the rest + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node3, node1, node2, node4, node5, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node3, node2, node4, node5, node1, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node3, node4, node5, node1, node2, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node3, node5, node1, node2, node4, node9, node10, node6, node7, node12, node13); + + // Should not shuffle replicas since there is only one + verify(policy, never()).shuffleHead(any(), eq(1)); + // But should shuffle remote nodes + verify(policy, times(12)).shuffleHead(any(), eq(2)); + } + + @Override + @Test + public void should_prioritize_and_shuffle_replicas() { + when(request.getRoutingKeyspace()).thenReturn(KEYSPACE); + when(request.getRoutingKey()).thenReturn(ROUTING_KEY); + when(tokenMap.getReplicas(KEYSPACE, ROUTING_KEY)) + .thenReturn(ImmutableSet.of(node1, node2, node3, node6, node9)); + + // node 6 
and 9 being in a remote DC, they don't get a boost for being a replica + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node1, node2, node3, node4, node5, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node1, node2, node3, node5, node4, node9, node10, node6, node7, node12, node13); + + // should shuffle replicas + verify(policy, times(2)).shuffleHead(any(), eq(3)); + // should shuffle remote nodes + verify(policy, times(6)).shuffleHead(any(), eq(2)); + // No power of two choices with only two replicas + verify(session, never()).getPools(); + } + + @Override + protected void assertRoundRobinQueryPlans() { + for (int i = 0; i < 3; i++) { + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node1, node2, node3, node4, node5, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node2, node3, node4, node5, node1, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node3, node4, node5, node1, node2, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node4, node5, node1, node2, node3, node9, node10, node6, node7, node12, node13); + assertThat(policy.newQueryPlan(request, session)) + .containsExactly( + node5, node1, node2, node3, node4, node9, node10, node6, node7, node12, node13); + } + + verify(policy, atLeast(15)).shuffleHead(any(), eq(2)); + } + + @Override + protected BasicLoadBalancingPolicy createAndInitPolicy() { + when(node4.getDatacenter()).thenReturn("dc1"); + when(node5.getDatacenter()).thenReturn("dc1"); + when(node6.getDatacenter()).thenReturn("dc2"); + when(node7.getDatacenter()).thenReturn("dc2"); + when(node8.getDatacenter()).thenReturn("dc2"); + when(node9.getDatacenter()).thenReturn("dc3"); + when(node10.getDatacenter()).thenReturn("dc3"); + when(node11.getDatacenter()).thenReturn("dc3"); + when(node12.getDatacenter()).thenReturn("dc4"); + when(node13.getDatacenter()).thenReturn("dc4"); + when(node14.getDatacenter()).thenReturn("dc4"); + + // Accept 2 nodes per remote DC + when(defaultProfile.getInt( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_MAX_NODES_PER_REMOTE_DC)) + .thenReturn(2); + when(defaultProfile.getBoolean( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_ALLOW_FOR_LOCAL_CONSISTENCY_LEVELS)) + .thenReturn(false); + + when(defaultProfile.getStringList( + DefaultDriverOption.LOAD_BALANCING_DC_FAILOVER_PREFERRED_REMOTE_DCS)) + .thenReturn(ImmutableList.of("dc3", "dc2")); + + // Use a subclass to disable shuffling, we just spy to make sure that the shuffling method was + // called (makes tests easier) + BasicLoadBalancingPolicy policy = + spy( + new BasicLoadBalancingPolicy(context, DriverExecutionProfile.DEFAULT_NAME) { + @Override + protected void shuffleHead(Object[] currentNodes, int headLength) { + // nothing (keep in same order) + } + }); + Map nodes = + ImmutableMap.builder() + .put(UUID.randomUUID(), node1) + .put(UUID.randomUUID(), node2) + .put(UUID.randomUUID(), node3) + .put(UUID.randomUUID(), node4) + .put(UUID.randomUUID(), node5) + .put(UUID.randomUUID(), node6) + .put(UUID.randomUUID(), node7) + .put(UUID.randomUUID(), node8) + .put(UUID.randomUUID(), node9) + .put(UUID.randomUUID(), node10) + .put(UUID.randomUUID(), node11) + .put(UUID.randomUUID(), node12) + .put(UUID.randomUUID(), node13) + .put(UUID.randomUUID(), node14) + 
.build(); + policy.init(nodes, distanceReporter); + assertThat(policy.getLiveNodes().dc("dc1")).containsExactly(node1, node2, node3, node4, node5); + assertThat(policy.getLiveNodes().dc("dc2")).containsExactly(node6, node7); // only 2 allowed + assertThat(policy.getLiveNodes().dc("dc3")).containsExactly(node9, node10); // only 2 allowed + assertThat(policy.getLiveNodes().dc("dc4")).containsExactly(node12, node13); // only 2 allowed + return policy; + } +} diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java index cc275eb1624..dd40f233518 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitorTest.java @@ -38,6 +38,7 @@ import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfig; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.ssl.SslEngineFactory; import com.datastax.oss.driver.internal.core.addresstranslation.PassThroughAddressTranslator; import com.datastax.oss.driver.internal.core.adminrequest.AdminResult; import com.datastax.oss.driver.internal.core.adminrequest.AdminRow; @@ -50,9 +51,11 @@ import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet; import com.datastax.oss.driver.shaded.guava.common.collect.Iterators; +import com.datastax.oss.driver.shaded.guava.common.collect.Maps; import com.datastax.oss.protocol.internal.Message; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.response.Error; +import com.google.common.collect.Streams; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; import com.tngtech.java.junit.dataprovider.UseDataProvider; @@ -95,6 +98,8 @@ public class DefaultTopologyMonitorTest { @Mock private Appender appender; @Captor private ArgumentCaptor loggingEventCaptor; + @Mock private SslEngineFactory sslEngineFactory; + private DefaultNode node1; private DefaultNode node2; @@ -414,18 +419,6 @@ public void should_skip_invalid_peers_row_v2(String columnToCheck) { + "This is likely a gossip or snitch issue, this node will be ignored."); } - @DataProvider - public static Object[][] columnsToCheckV1() { - return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}}; - } - - @DataProvider - public static Object[][] columnsToCheckV2() { - return new Object[][] { - {"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"} - }; - } - @Test public void should_stop_executing_queries_once_closed() { // Given @@ -443,9 +436,9 @@ public void should_stop_executing_queries_once_closed() { public void should_warn_when_control_host_found_in_system_peers() { // Given AdminRow local = mockLocalRow(1, node1.getHostId()); - AdminRow peer3 = mockPeersRow(3, UUID.randomUUID()); - AdminRow peer2 = mockPeersRow(2, node2.getHostId()); AdminRow peer1 = mockPeersRow(1, node2.getHostId()); // invalid + AdminRow peer2 = mockPeersRow(2, node2.getHostId()); + AdminRow peer3 = mockPeersRow(3, UUID.randomUUID()); topologyMonitor.stubQueries( new StubbedQuery("SELECT * FROM system.local", 
mockResult(local)), new StubbedQuery("SELECT * FROM system.peers_v2", Collections.emptyMap(), null, true), @@ -462,7 +455,7 @@ public void should_warn_when_control_host_found_in_system_peers() { .hasSize(3) .extractingResultOf("getEndPoint") .containsOnlyOnce(node1.getEndPoint())); - assertLog( + assertLogContains( Level.WARN, "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers: " + "this entry will be ignored. This is likely due to a misconfiguration; " @@ -492,7 +485,7 @@ public void should_warn_when_control_host_found_in_system_peers_v2() { .hasSize(3) .extractingResultOf("getEndPoint") .containsOnlyOnce(node1.getEndPoint())); - assertLog( + assertLogContains( Level.WARN, "[null] Control node /127.0.0.1:9042 has an entry for itself in system.peers_v2: " + "this entry will be ignored. This is likely due to a misconfiguration; " @@ -500,6 +493,116 @@ public void should_warn_when_control_host_found_in_system_peers_v2() { + "all nodes in your cluster."); } + // Confirm the base case of extracting peer info from peers_v2, no SSL involved + @Test + public void should_get_peer_address_info_peers_v2() { + // Given + AdminRow local = mockLocalRow(1, node1.getHostId()); + AdminRow peer2 = mockPeersV2Row(3, node2.getHostId()); + AdminRow peer1 = mockPeersV2Row(2, node1.getHostId()); + topologyMonitor.isSchemaV2 = true; + topologyMonitor.stubQueries( + new StubbedQuery("SELECT * FROM system.local", mockResult(local)), + new StubbedQuery("SELECT * FROM system.peers_v2", mockResult(peer2, peer1))); + when(context.getSslEngineFactory()).thenReturn(Optional.empty()); + + // When + CompletionStage> futureInfos = topologyMonitor.refreshNodeList(); + + // Then + assertThatStage(futureInfos) + .isSuccess( + infos -> { + Iterator iterator = infos.iterator(); + // First NodeInfo is for local, skip past that + iterator.next(); + NodeInfo peer2nodeInfo = iterator.next(); + assertThat(peer2nodeInfo.getEndPoint().resolve()) + .isEqualTo(new InetSocketAddress("127.0.0.3", 9042)); + NodeInfo peer1nodeInfo = iterator.next(); + assertThat(peer1nodeInfo.getEndPoint().resolve()) + .isEqualTo(new InetSocketAddress("127.0.0.2", 9042)); + }); + } + + // Confirm the base case of extracting peer info from DSE peers table, no SSL involved + @Test + public void should_get_peer_address_info_peers_dse() { + // Given + AdminRow local = mockLocalRow(1, node1.getHostId()); + AdminRow peer2 = mockPeersRowDse(3, node2.getHostId()); + AdminRow peer1 = mockPeersRowDse(2, node1.getHostId()); + topologyMonitor.isSchemaV2 = true; + topologyMonitor.stubQueries( + new StubbedQuery("SELECT * FROM system.local", mockResult(local)), + new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), null, true), + new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, peer1))); + when(context.getSslEngineFactory()).thenReturn(Optional.empty()); + + // When + CompletionStage> futureInfos = topologyMonitor.refreshNodeList(); + + // Then + assertThatStage(futureInfos) + .isSuccess( + infos -> { + Iterator iterator = infos.iterator(); + // First NodeInfo is for local, skip past that + iterator.next(); + NodeInfo peer2nodeInfo = iterator.next(); + assertThat(peer2nodeInfo.getEndPoint().resolve()) + .isEqualTo(new InetSocketAddress("127.0.0.3", 9042)); + NodeInfo peer1nodeInfo = iterator.next(); + assertThat(peer1nodeInfo.getEndPoint().resolve()) + .isEqualTo(new InetSocketAddress("127.0.0.2", 9042)); + }); + } + + // Confirm the base case of extracting peer info from DSE peers table, this time with SSL + 
@Test + public void should_get_peer_address_info_peers_dse_with_ssl() { + // Given + AdminRow local = mockLocalRow(1, node1.getHostId()); + AdminRow peer2 = mockPeersRowDseWithSsl(3, node2.getHostId()); + AdminRow peer1 = mockPeersRowDseWithSsl(2, node1.getHostId()); + topologyMonitor.isSchemaV2 = true; + topologyMonitor.stubQueries( + new StubbedQuery("SELECT * FROM system.local", mockResult(local)), + new StubbedQuery("SELECT * FROM system.peers_v2", Maps.newHashMap(), null, true), + new StubbedQuery("SELECT * FROM system.peers", mockResult(peer2, peer1))); + when(context.getSslEngineFactory()).thenReturn(Optional.of(sslEngineFactory)); + + // When + CompletionStage> futureInfos = topologyMonitor.refreshNodeList(); + + // Then + assertThatStage(futureInfos) + .isSuccess( + infos -> { + Iterator iterator = infos.iterator(); + // First NodeInfo is for local, skip past that + iterator.next(); + NodeInfo peer2nodeInfo = iterator.next(); + assertThat(peer2nodeInfo.getEndPoint().resolve()) + .isEqualTo(new InetSocketAddress("127.0.0.3", 9043)); + NodeInfo peer1nodeInfo = iterator.next(); + assertThat(peer1nodeInfo.getEndPoint().resolve()) + .isEqualTo(new InetSocketAddress("127.0.0.2", 9043)); + }); + } + + @DataProvider + public static Object[][] columnsToCheckV1() { + return new Object[][] {{"rpc_address"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"}}; + } + + @DataProvider + public static Object[][] columnsToCheckV2() { + return new Object[][] { + {"native_address"}, {"native_port"}, {"host_id"}, {"data_center"}, {"rack"}, {"tokens"} + }; + } + /** Mocks the query execution logic. */ private static class TestTopologyMonitor extends DefaultTopologyMonitor { @@ -641,6 +744,43 @@ private AdminRow mockPeersV2Row(int i, UUID hostId) { } } + // Mock row for DSE ~6.8 + private AdminRow mockPeersRowDse(int i, UUID hostId) { + try { + AdminRow row = mock(AdminRow.class); + when(row.contains("peer")).thenReturn(true); + when(row.isNull("data_center")).thenReturn(false); + when(row.getString("data_center")).thenReturn("dc" + i); + when(row.getString("dse_version")).thenReturn("6.8.30"); + when(row.contains("graph")).thenReturn(true); + when(row.isNull("host_id")).thenReturn(hostId == null); + when(row.getUuid("host_id")).thenReturn(hostId); + when(row.getInetAddress("peer")).thenReturn(InetAddress.getByName("127.0.0." + i)); + when(row.isNull("rack")).thenReturn(false); + when(row.getString("rack")).thenReturn("rack" + i); + when(row.isNull("native_transport_address")).thenReturn(false); + when(row.getInetAddress("native_transport_address")) + .thenReturn(InetAddress.getByName("127.0.0." + i)); + when(row.isNull("native_transport_port")).thenReturn(false); + when(row.getInteger("native_transport_port")).thenReturn(9042); + when(row.isNull("tokens")).thenReturn(false); + when(row.getSetOfString("tokens")).thenReturn(ImmutableSet.of("token" + i)); + when(row.isNull("rpc_address")).thenReturn(false); + + return row; + } catch (UnknownHostException e) { + fail("unexpected", e); + return null; + } + } + + private AdminRow mockPeersRowDseWithSsl(int i, UUID hostId) { + AdminRow row = mockPeersRowDse(i, hostId); + when(row.isNull("native_transport_port_ssl")).thenReturn(false); + when(row.getInteger("native_transport_port_ssl")).thenReturn(9043); + return row; + } + private AdminResult mockResult(AdminRow... 
rows) { AdminResult result = mock(AdminResult.class); when(result.iterator()).thenReturn(Iterators.forArray(rows)); @@ -654,4 +794,12 @@ private void assertLog(Level level, String message) { assertThat(logs).hasSize(1); assertThat(logs.iterator().next().getFormattedMessage()).contains(message); } + + private void assertLogContains(Level level, String message) { + verify(appender, atLeast(1)).doAppend(loggingEventCaptor.capture()); + Iterable logs = + filter(loggingEventCaptor.getAllValues()).with("level", level).get(); + assertThat( + Streams.stream(logs).map(ILoggingEvent::getFormattedMessage).anyMatch(message::contains)); + } } diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java index 460f99abd85..f9a909400f9 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/metadata/MetadataManagerTest.java @@ -20,6 +20,7 @@ import static com.datastax.oss.driver.Assertions.assertThat; import static com.datastax.oss.driver.Assertions.assertThatStage; import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; @@ -33,6 +34,7 @@ import com.datastax.oss.driver.internal.core.context.EventBus; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.NettyOptions; +import com.datastax.oss.driver.internal.core.control.ControlConnection; import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory; import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory; import com.datastax.oss.driver.internal.core.metrics.MetricsFactory; @@ -64,6 +66,7 @@ public class MetadataManagerTest { @Mock private InternalDriverContext context; @Mock private NettyOptions nettyOptions; + @Mock private ControlConnection controlConnection; @Mock private TopologyMonitor topologyMonitor; @Mock private DriverConfig config; @Mock private DriverExecutionProfile defaultProfile; @@ -85,6 +88,7 @@ public void setup() { when(context.getNettyOptions()).thenReturn(nettyOptions); when(context.getTopologyMonitor()).thenReturn(topologyMonitor); + when(context.getControlConnection()).thenReturn(controlConnection); when(defaultProfile.getDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW)) .thenReturn(Duration.ZERO); @@ -286,6 +290,29 @@ public void should_remove_node() { assertThat(refresh.broadcastRpcAddressToRemove).isEqualTo(broadcastRpcAddress2); } + @Test + public void refreshSchema_should_work() { + // Given + IllegalStateException expectedException = new IllegalStateException("Error we're testing"); + when(schemaQueriesFactory.newInstance()).thenThrow(expectedException); + when(topologyMonitor.refreshNodeList()) + .thenReturn(CompletableFuture.completedFuture(ImmutableList.of(mock(NodeInfo.class)))); + when(topologyMonitor.checkSchemaAgreement()) + .thenReturn(CompletableFuture.completedFuture(Boolean.TRUE)); + when(controlConnection.init(anyBoolean(), anyBoolean(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(null)); + metadataManager.refreshNodes(); // required internal state setup for this + waitForPendingAdminTasks(() -> metadataManager.refreshes.size() == 1); // sanity 
check + + // When + CompletionStage result = + metadataManager.refreshSchema("foo", true, true); + + // Then + waitForPendingAdminTasks(() -> result.toCompletableFuture().isDone()); + assertThatStage(result).isFailed(t -> assertThat(t).isEqualTo(expectedException)); + } + private static class TestMetadataManager extends MetadataManager { private List refreshes = new CopyOnWriteArrayList<>(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/ssl/ReloadingKeyManagerFactoryTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/ssl/ReloadingKeyManagerFactoryTest.java new file mode 100644 index 00000000000..d07b45c21df --- /dev/null +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/ssl/ReloadingKeyManagerFactoryTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.ssl; + +import static java.nio.file.StandardCopyOption.COPY_ATTRIBUTES; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + +import java.io.IOException; +import java.io.InputStream; +import java.math.BigInteger; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.KeyManagementException; +import java.security.KeyStore; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.cert.X509Certificate; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.TrustManagerFactory; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReloadingKeyManagerFactoryTest { + private static final Logger logger = + LoggerFactory.getLogger(ReloadingKeyManagerFactoryTest.class); + + static final Path CERT_BASE = + Paths.get( + ReloadingKeyManagerFactoryTest.class + .getResource( + String.format("/%s/certs/", ReloadingKeyManagerFactoryTest.class.getSimpleName())) + .getPath()); + static final Path SERVER_KEYSTORE_PATH = CERT_BASE.resolve("server.keystore"); + static final Path SERVER_TRUSTSTORE_PATH = CERT_BASE.resolve("server.truststore"); + + static final Path ORIGINAL_CLIENT_KEYSTORE_PATH = CERT_BASE.resolve("client-original.keystore"); + static final Path ALTERNATE_CLIENT_KEYSTORE_PATH = 
CERT_BASE.resolve("client-alternate.keystore"); + static final BigInteger ORIGINAL_CLIENT_KEYSTORE_CERT_SERIAL = + convertSerial("7372a966"); // 1936894310 + static final BigInteger ALTERNATE_CLIENT_KEYSTORE_CERT_SERIAL = + convertSerial("e50bf31"); // 240172849 + + // File at this path will change content + static final Path TMP_CLIENT_KEYSTORE_PATH; + + static { + try { + TMP_CLIENT_KEYSTORE_PATH = + Files.createTempFile(ReloadingKeyManagerFactoryTest.class.getSimpleName(), null); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static final Path CLIENT_TRUSTSTORE_PATH = CERT_BASE.resolve("client.truststore"); + static final String CERTSTORE_PASSWORD = "changeit"; + + private static TrustManagerFactory buildTrustManagerFactory() { + TrustManagerFactory tmf; + try (InputStream tsf = Files.newInputStream(CLIENT_TRUSTSTORE_PATH)) { + KeyStore ts = KeyStore.getInstance("JKS"); + char[] password = CERTSTORE_PASSWORD.toCharArray(); + ts.load(tsf, password); + tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + } catch (Exception e) { + throw new RuntimeException(e); + } + return tmf; + } + + private static SSLContext buildServerSslContext() { + try { + SSLContext context = SSLContext.getInstance("SSL"); + + TrustManagerFactory tmf; + try (InputStream tsf = Files.newInputStream(SERVER_TRUSTSTORE_PATH)) { + KeyStore ts = KeyStore.getInstance("JKS"); + char[] password = CERTSTORE_PASSWORD.toCharArray(); + ts.load(tsf, password); + tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(ts); + } + + KeyManagerFactory kmf; + try (InputStream ksf = Files.newInputStream(SERVER_KEYSTORE_PATH)) { + KeyStore ks = KeyStore.getInstance("JKS"); + char[] password = CERTSTORE_PASSWORD.toCharArray(); + ks.load(ksf, password); + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, password); + } + + context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom()); + return context; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void client_certificates_should_reload() throws Exception { + Files.copy( + ORIGINAL_CLIENT_KEYSTORE_PATH, TMP_CLIENT_KEYSTORE_PATH, REPLACE_EXISTING, COPY_ATTRIBUTES); + + final BlockingQueue> peerCertificates = + new LinkedBlockingQueue<>(1); + + // Create a listening socket. Make sure there's no backlog so each accept is in order. 
+ SSLContext serverSslContext = buildServerSslContext(); + final SSLServerSocket server = + (SSLServerSocket) serverSslContext.getServerSocketFactory().createServerSocket(); + server.bind(new InetSocketAddress(0), 1); + server.setUseClientMode(false); + server.setNeedClientAuth(true); + Thread serverThread = + new Thread( + () -> { + while (true) { + try { + logger.info("Server accepting client"); + final SSLSocket conn = (SSLSocket) server.accept(); + logger.info("Server accepted client {}", conn); + conn.addHandshakeCompletedListener( + event -> { + boolean offer; + try { + // Transfer certificates to client thread once handshake is complete, so + // it can safely close + // the socket + offer = + peerCertificates.offer( + Optional.of((X509Certificate[]) event.getPeerCertificates())); + } catch (SSLPeerUnverifiedException e) { + offer = peerCertificates.offer(Optional.empty()); + } + Assert.assertTrue(offer); + }); + logger.info("Server starting handshake"); + // Without this, client handshake blocks + conn.startHandshake(); + } catch (IOException e) { + // Not sure why I sometimes see ~thousands of these locally + if (e instanceof SocketException && e.getMessage().contains("Socket closed")) + return; + logger.info("Server accept error", e); + } + } + }); + serverThread.setName(String.format("%s-serverThread", this.getClass().getSimpleName())); + serverThread.setDaemon(true); + serverThread.start(); + + final ReloadingKeyManagerFactory kmf = + ReloadingKeyManagerFactory.create( + TMP_CLIENT_KEYSTORE_PATH, CERTSTORE_PASSWORD, Optional.empty()); + // Need a tmf that tells the server to send its certs + final TrustManagerFactory tmf = buildTrustManagerFactory(); + + // Check original client certificate + testClientCertificates( + kmf, + tmf, + server.getLocalSocketAddress(), + () -> { + try { + return peerCertificates.poll(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + certs -> { + Assert.assertEquals(1, certs.length); + X509Certificate cert = certs[0]; + Assert.assertEquals(ORIGINAL_CLIENT_KEYSTORE_CERT_SERIAL, cert.getSerialNumber()); + }); + + // Update keystore content + logger.info("Updating keystore file with new content"); + Files.copy( + ALTERNATE_CLIENT_KEYSTORE_PATH, + TMP_CLIENT_KEYSTORE_PATH, + REPLACE_EXISTING, + COPY_ATTRIBUTES); + kmf.reload(); + + // Check that alternate client certificate was applied + testClientCertificates( + kmf, + tmf, + server.getLocalSocketAddress(), + () -> { + try { + return peerCertificates.poll(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, + certs -> { + Assert.assertEquals(1, certs.length); + X509Certificate cert = certs[0]; + Assert.assertEquals(ALTERNATE_CLIENT_KEYSTORE_CERT_SERIAL, cert.getSerialNumber()); + }); + + kmf.close(); + server.close(); + } + + private static void testClientCertificates( + KeyManagerFactory kmf, + TrustManagerFactory tmf, + SocketAddress serverAddress, + Supplier> certsSupplier, + Consumer certsConsumer) + throws NoSuchAlgorithmException, KeyManagementException, IOException { + SSLContext clientSslContext = SSLContext.getInstance("TLS"); + clientSslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + final SSLSocket client = (SSLSocket) clientSslContext.getSocketFactory().createSocket(); + logger.info("Client connecting"); + client.connect(serverAddress); + logger.info("Client doing handshake"); + client.startHandshake(); + + final Optional lastCertificate = certsSupplier.get(); + 
logger.info("Client got its certificate back from the server; closing socket"); + client.close(); + Assert.assertNotNull(lastCertificate); + Assert.assertTrue(lastCertificate.isPresent()); + logger.info("Client got its certificate back from server: {}", lastCertificate); + + certsConsumer.accept(lastCertificate.get()); + } + + private static BigInteger convertSerial(String hex) { + final BigInteger serial = new BigInteger(Integer.valueOf(hex, 16).toString()); + logger.info("Serial hex {} is {}", hex, serial); + return serial; + } +} diff --git a/core/src/test/resources/ReloadingKeyManagerFactoryTest/README.md b/core/src/test/resources/ReloadingKeyManagerFactoryTest/README.md new file mode 100644 index 00000000000..9ff9b622e5b --- /dev/null +++ b/core/src/test/resources/ReloadingKeyManagerFactoryTest/README.md @@ -0,0 +1,39 @@ +# How to create cert stores for ReloadingKeyManagerFactoryTest + +Need the following cert stores: +- `server.keystore` +- `client-original.keystore` +- `client-alternate.keystore` +- `server.truststore`: trusts `client-original.keystore` and `client-alternate.keystore` +- `client.truststore`: trusts `server.keystore` + +We shouldn't need any signing requests or chains of trust, since truststores are just including certs directly. + +First create the three keystores: +``` +$ keytool -genkeypair -keyalg RSA -alias server -keystore server.keystore -dname "CN=server" -storepass changeit -keypass changeit +$ keytool -genkeypair -keyalg RSA -alias client-original -keystore client-original.keystore -dname "CN=client-original" -storepass changeit -keypass changeit +$ keytool -genkeypair -keyalg RSA -alias client-alternate -keystore client-alternate.keystore -dname "CN=client-alternate" -storepass changeit -keypass changeit +``` + +Note that we need to use `-keyalg RSA` because keytool's default keyalg is DSA, which TLS 1.3 doesn't support. If DSA is +used, the handshake will fail due to the server not being able to find any authentication schemes compatible with its +x509 certificate ("Unavailable authentication scheme"). 
+ +Then export all the certs: +``` +$ keytool -exportcert -keystore server.keystore -alias server -file server.cert -storepass changeit +$ keytool -exportcert -keystore client-original.keystore -alias client-original -file client-original.cert -storepass changeit +$ keytool -exportcert -keystore client-alternate.keystore -alias client-alternate -file client-alternate.cert -storepass changeit +``` + +Then create the server.truststore that trusts the two client certs: +``` +$ keytool -import -file client-original.cert -alias client-original -keystore server.truststore -storepass changeit +$ keytool -import -file client-alternate.cert -alias client-alternate -keystore server.truststore -storepass changeit +``` + +Then create the client.truststore that trusts the server cert: +``` +$ keytool -import -file server.cert -alias server -keystore client.truststore -storepass changeit +``` diff --git a/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client-alternate.keystore b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client-alternate.keystore new file mode 100644 index 00000000000..91cee636a0b Binary files /dev/null and b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client-alternate.keystore differ diff --git a/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client-original.keystore b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client-original.keystore new file mode 100644 index 00000000000..74e31f7bc6f Binary files /dev/null and b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client-original.keystore differ diff --git a/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client.truststore b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client.truststore new file mode 100644 index 00000000000..3ce9a720dbc Binary files /dev/null and b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/client.truststore differ diff --git a/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/server.keystore b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/server.keystore new file mode 100644 index 00000000000..7d279638a34 Binary files /dev/null and b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/server.keystore differ diff --git a/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/server.truststore b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/server.truststore new file mode 100644 index 00000000000..c9b06b5fbe1 Binary files /dev/null and b/core/src/test/resources/ReloadingKeyManagerFactoryTest/certs/server.truststore differ diff --git a/distribution-source/pom.xml b/distribution-source/pom.xml new file mode 100644 index 00000000000..f68ca1b4564 --- /dev/null +++ b/distribution-source/pom.xml @@ -0,0 +1,125 @@ + + + + 4.0.0 + + org.apache.cassandra + java-driver-parent + 4.18.1.0 + + java-driver-distribution-source + pom + Apache Cassandra Java Driver - source distribution + + apache-cassandra-java-driver-${project.version}-source + + + maven-jar-plugin + + + + default-jar + none + + + + + maven-source-plugin + + true + + + + maven-install-plugin + + true + + + + maven-deploy-plugin + + true + + + + org.revapi + revapi-maven-plugin + + true + + + + org.sonatype.plugins + nexus-staging-maven-plugin + + true + + + + + + + release + + + + maven-assembly-plugin + + + assemble-source-tarball + package + + single + + + + + false + + src/assembly/source-tarball.xml + + posix + + + + net.nicoulaj.maven.plugins + checksum-maven-plugin + 
1.7 + + + + artifacts + + + + + true + + sha256 + sha512 + + + + + + + + diff --git a/distribution-tests/pom.xml b/distribution-tests/pom.xml new file mode 100644 index 00000000000..1c0dd09a1b3 --- /dev/null +++ b/distribution-tests/pom.xml @@ -0,0 +1,122 @@ + + + + 4.0.0 + + org.apache.cassandra + java-driver-parent + 4.18.1.0 + + java-driver-distribution-tests + Apache Cassandra Java Driver - distribution tests + + + + ${project.groupId} + java-driver-bom + ${project.version} + pom + import + + + + + + org.apache.cassandra + java-driver-test-infra + test + + + org.apache.cassandra + java-driver-query-builder + test + + + org.apache.cassandra + java-driver-mapper-processor + test + + + org.apache.cassandra + java-driver-mapper-runtime + test + + + org.apache.cassandra + java-driver-core + test + + + org.apache.cassandra + java-driver-metrics-micrometer + test + + + org.apache.cassandra + java-driver-metrics-microprofile + test + + + junit + junit + test + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${testing.jvm}/bin/java + ${mockitoopens.argline} + 1 + + + + org.revapi + revapi-maven-plugin + + true + + + + maven-install-plugin + + true + + + + maven-deploy-plugin + + true + + + + org.sonatype.plugins + nexus-staging-maven-plugin + + true + + + + + diff --git a/distribution/pom.xml b/distribution/pom.xml index 502b7ecb1ff..ee54fa44e77 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-distribution diff --git a/examples/pom.xml b/examples/pom.xml index 391d334fc89..92ed8c723d4 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ java-driver-parent com.scylladb - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-examples Java driver for Scylla and Apache Cassandra(R) - examples. 
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 60631228e77..8e3bb7ebad6 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-integration-tests jar diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/DropwizardMetricsIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/DropwizardMetricsIT.java index 6cbe443f2a6..e0184516e21 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/DropwizardMetricsIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/DropwizardMetricsIT.java @@ -198,6 +198,12 @@ protected void assertNodeMetricsNotEvicted(CqlSession session, Node node) { } } + @Override + protected void assertMetricsNotPresent(Object registry) { + MetricRegistry dropwizardRegistry = (MetricRegistry) registry; + assertThat(dropwizardRegistry.getMetrics()).isEmpty(); + } + @Override protected void assertNodeMetricsEvicted(CqlSession session, Node node) { InternalDriverContext context = (InternalDriverContext) session.getContext(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java index 7fac3f98f52..e6121217619 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/core/metrics/MetricsITBase.java @@ -83,8 +83,10 @@ public void resetSimulacron() { @Test @UseDataProvider("descriptorsAndPrefixes") - public void should_expose_metrics_if_enabled(Class metricIdGenerator, String prefix) { + public void should_expose_metrics_if_enabled_and_clear_metrics_if_closed( + Class metricIdGenerator, String prefix) { + Object registry = newMetricRegistry(); Assume.assumeFalse( "Cannot use metric tags with Dropwizard", metricIdGenerator.getSimpleName().contains("Tagging") @@ -101,12 +103,14 @@ public void should_expose_metrics_if_enabled(Class metricIdGenerator, String CqlSession.builder() .addContactEndPoints(simulacron().getContactPoints()) .withConfigLoader(loader) - .withMetricRegistry(newMetricRegistry()) + .withMetricRegistry(registry) .build()) { session.prepare("irrelevant"); queryAllNodes(session); assertMetricsPresent(session); + } finally { + assertMetricsNotPresent(registry); } } @@ -262,4 +266,6 @@ private DefaultNode findNode(CqlSession session, int id) { return (DefaultNode) session.getMetadata().findNode(address1).orElseThrow(IllegalStateException::new); } + + protected abstract void assertMetricsNotPresent(Object registry); } diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/metrics/micrometer/MicrometerMetricsIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/metrics/micrometer/MicrometerMetricsIT.java index 8f9c0a801d5..ef87040f2a2 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/metrics/micrometer/MicrometerMetricsIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/metrics/micrometer/MicrometerMetricsIT.java @@ -188,6 +188,12 @@ protected void assertNodeMetricsNotEvicted(CqlSession session, Node node) { } } + @Override + protected void assertMetricsNotPresent(Object registry) { + MeterRegistry micrometerRegistry = (MeterRegistry) registry; + assertThat(micrometerRegistry.getMeters()).isEmpty(); + } + @Override protected void 
assertNodeMetricsEvicted(CqlSession session, Node node) { InternalDriverContext context = (InternalDriverContext) session.getContext(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/metrics/microprofile/MicroProfileMetricsIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/metrics/microprofile/MicroProfileMetricsIT.java index 1294be3deae..aa04c058a49 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/metrics/microprofile/MicroProfileMetricsIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/metrics/microprofile/MicroProfileMetricsIT.java @@ -188,6 +188,12 @@ protected void assertNodeMetricsNotEvicted(CqlSession session, Node node) { } } + @Override + protected void assertMetricsNotPresent(Object registry) { + MetricRegistry metricRegistry = (MetricRegistry) registry; + assertThat(metricRegistry.getMetrics()).isEmpty(); + } + @Override protected void assertNodeMetricsEvicted(CqlSession session, Node node) { InternalDriverContext context = (InternalDriverContext) session.getContext(); diff --git a/manual/core/ssl/README.md b/manual/core/ssl/README.md index b8aa9b89192..913c7bc6c9a 100644 --- a/manual/core/ssl/README.md +++ b/manual/core/ssl/README.md @@ -94,11 +94,13 @@ If you're using a CA, sign the client certificate with it (see the blog post lin this page). Then the nodes' truststores only need to contain the CA's certificate (which should already be the case if you've followed the steps for inter-node encryption). +`DefaultSslEngineFactory` supports client keystore reloading; see property +`advanced.ssl-engine-factory.keystore-reload-interval`. ### Driver configuration By default, the driver's SSL support is based on the JDK's built-in implementation: JSSE (Java -Secure Socket Extension),. +Secure Socket Extension). To enable it, you need to define an engine factory in the [configuration](../configuration/). @@ -126,6 +128,12 @@ datastax-java-driver { // truststore-password = password123 // keystore-path = /path/to/client.keystore // keystore-password = password123 + + # The duration between attempts to reload the keystore from the contents of the file specified + # by `keystore-path`. This is mainly relevant in environments where certificates have short + # lifetimes and applications are restarted infrequently, since an expired client certificate + # will prevent new connections from being established until the application is restarted. 
+ // keystore-reload-interval = 30 minutes } } ``` diff --git a/mapper-processor/pom.xml b/mapper-processor/pom.xml index 7ef41b00963..99ae9ec2fee 100644 --- a/mapper-processor/pom.xml +++ b/mapper-processor/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-mapper-processor Java driver for Scylla and Apache Cassandra(R) - object mapper processor diff --git a/mapper-runtime/pom.xml b/mapper-runtime/pom.xml index 42b7a968c7f..a8ee2faa8df 100644 --- a/mapper-runtime/pom.xml +++ b/mapper-runtime/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-mapper-runtime bundle diff --git a/metrics/micrometer/pom.xml b/metrics/micrometer/pom.xml index d96c7cb4720..a04fe3e2553 100644 --- a/metrics/micrometer/pom.xml +++ b/metrics/micrometer/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 ../../ java-driver-metrics-micrometer diff --git a/metrics/micrometer/src/main/java/com/datastax/oss/driver/internal/metrics/micrometer/MicrometerMetricUpdater.java b/metrics/micrometer/src/main/java/com/datastax/oss/driver/internal/metrics/micrometer/MicrometerMetricUpdater.java index 7a4a27991e3..b9507c8b7cf 100644 --- a/metrics/micrometer/src/main/java/com/datastax/oss/driver/internal/metrics/micrometer/MicrometerMetricUpdater.java +++ b/metrics/micrometer/src/main/java/com/datastax/oss/driver/internal/metrics/micrometer/MicrometerMetricUpdater.java @@ -83,7 +83,7 @@ public void updateTimer( } @Override - protected void clearMetrics() { + public void clearMetrics() { for (Meter metric : metrics.values()) { registry.remove(metric); } diff --git a/metrics/microprofile/pom.xml b/metrics/microprofile/pom.xml index 7771e0ad6e5..ff68bc6fbbd 100644 --- a/metrics/microprofile/pom.xml +++ b/metrics/microprofile/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 ../../ java-driver-metrics-microprofile diff --git a/metrics/microprofile/src/main/java/com/datastax/oss/driver/internal/metrics/microprofile/MicroProfileMetricUpdater.java b/metrics/microprofile/src/main/java/com/datastax/oss/driver/internal/metrics/microprofile/MicroProfileMetricUpdater.java index a46e82ee624..df44fd69c51 100644 --- a/metrics/microprofile/src/main/java/com/datastax/oss/driver/internal/metrics/microprofile/MicroProfileMetricUpdater.java +++ b/metrics/microprofile/src/main/java/com/datastax/oss/driver/internal/metrics/microprofile/MicroProfileMetricUpdater.java @@ -83,7 +83,7 @@ public void updateTimer( } @Override - protected void clearMetrics() { + public void clearMetrics() { for (MetricT metric : metrics.keySet()) { MetricId id = getMetricId(metric); Tag[] tags = MicroProfileTags.toMicroProfileTags(id.getTags()); diff --git a/osgi-tests/pom.xml b/osgi-tests/pom.xml index 96d4784b99c..36ea8714c12 100644 --- a/osgi-tests/pom.xml +++ b/osgi-tests/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-osgi-tests jar diff --git a/pom.xml b/pom.xml index 414ec6b5eb2..ae7eb9a1e3f 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ 4.0.0 com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 pom Java Driver for Scylla and Apache Cassandra A driver for Scylla and Apache Cassandra(R) 2.1+ that works exclusively with the Cassandra Query Language version 3 (CQL3) and Cassandra's native protocol versions 3 and above. 
@@ -686,6 +686,7 @@ maven-surefire-plugin + ${testing.jvm}/bin/java ${project.basedir}/src/test/resources/logback-test.xml @@ -712,9 +713,9 @@ true ossrh - https://oss.sonatype.org/ - true - + https://repository.apache.org/ + false + true @@ -1020,16 +1021,6 @@ height="0" width="0" style="display:none;visibility:hidden"> - - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - Apache 2 @@ -1042,7 +1033,7 @@ height="0" width="0" style="display:none;visibility:hidden"> scm:git:https://github.com/scylladb/java-driver scm:git:https://github.com/scylladb/java-driver https://github.com/scylladb/java-driver - HEAD + 4.18.1.0 diff --git a/query-builder/pom.xml b/query-builder/pom.xml index aa79ba1118e..1ad8ce86175 100644 --- a/query-builder/pom.xml +++ b/query-builder/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-query-builder bundle diff --git a/test-infra/pom.xml b/test-infra/pom.xml index aa3c07445bb..e14e2010af1 100644 --- a/test-infra/pom.xml +++ b/test-infra/pom.xml @@ -28,7 +28,7 @@ com.scylladb java-driver-parent - 4.18.0.2-SNAPSHOT + 4.18.1.0 java-driver-test-infra bundle diff --git a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java index b14d02455f8..05d1d129b16 100644 --- a/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java +++ b/test-infra/src/main/java/com/datastax/oss/driver/api/testinfra/ccm/CcmBridge.java @@ -327,23 +327,39 @@ public void create() { // Collect all cassandraConfiguration (and others) into a single "ccm updateconf" call. // Works around the behavior introduced in https://github.com/scylladb/scylla-ccm/pull/410 StringBuilder updateConfArguments = new StringBuilder(); + Version cassandraVersion = getCassandraVersion(); for (Map.Entry conf : cassandraConfiguration.entrySet()) { - updateConfArguments.append(conf.getKey()).append(':').append(conf.getValue()).append(' '); + String originalKey = conf.getKey(); + Object originalValue = conf.getValue(); + updateConfArguments.append( + String.join( + ":", + getConfigKey(originalKey, originalValue, cassandraVersion), + getConfigValue(originalKey, originalValue, cassandraVersion)) + + " "); } - if (getCassandraVersion().compareTo(Version.V2_2_0) >= 0 && !SCYLLA_ENABLEMENT) { - // @IntegrationTestDisabledScyllaJVMArgs @IntegrationTestDisabledScyllaUDF - if (getCassandraVersion().compareTo(Version.V4_1_0) >= 0) { - updateConfArguments.append("user_defined_functions_enabled:true").append(' '); - } else { - updateConfArguments.append("enable_user_defined_functions:true").append(' '); + if (!SCYLLA_ENABLEMENT) { + // If we're dealing with anything more recent than 2.2 explicitly enable UDF... but run it + // through our conversion process to make + // sure more recent versions don't have a problem. 
+ if (getCassandraVersion().compareTo(Version.V2_2_0) >= 0) { + String originalKey = "enable_user_defined_functions"; + Object originalValue = "true"; + updateConfArguments.append( + String.join( + ":", + getConfigKey(originalKey, originalValue, cassandraVersion), + getConfigValue(originalKey, originalValue, cassandraVersion))); } } if (updateConfArguments.length() > 0) { execute("updateconf", updateConfArguments.toString()); } + + // Note that we aren't performing any substitution on DSE key/value props (at least for now) if (DSE_ENABLEMENT) { for (Map.Entry conf : dseConfiguration.entrySet()) { execute("updatedseconf", String.format("%s:%s", conf.getKey(), conf.getValue())); @@ -517,6 +533,79 @@ private static File createTempStore(String storePath) { return f; } + /** + * Get the current JVM major version (1.8.0_372 -> 8, 11.0.19 -> 11) + * + * @return major version of current JVM + */ + private static int getCurrentJvmMajorVersion() { + String version = System.getProperty("java.version"); + if (version.startsWith("1.")) { + version = version.substring(2, 3); + } else { + int dot = version.indexOf("."); + if (dot != -1) { + version = version.substring(0, dot); + } + } + return Integer.parseInt(version); + } + + @SuppressWarnings("UnusedMethod") + private Optional overrideJvmVersionForDseWorkloads() { + if (getCurrentJvmMajorVersion() <= 8) { + return Optional.empty(); + } + + if (!DSE_ENABLEMENT || !getDseVersion().isPresent()) { + return Optional.empty(); + } + + if (getDseVersion().get().compareTo(Version.parse("6.8.19")) < 0) { + return Optional.empty(); + } + + if (dseWorkloads.contains("graph")) { + return Optional.of(8); + } + + return Optional.empty(); + } + + private static String IN_MS_STR = "_in_ms"; + private static int IN_MS_STR_LENGTH = IN_MS_STR.length(); + private static String ENABLE_STR = "enable_"; + private static int ENABLE_STR_LENGTH = ENABLE_STR.length(); + private static String IN_KB_STR = "_in_kb"; + private static int IN_KB_STR_LENGTH = IN_KB_STR.length(); + + @SuppressWarnings("unused") + private String getConfigKey(String originalKey, Object originalValue, Version cassandraVersion) { + + // At least for now we won't support substitutions on nested keys. This requires an extra + // traversal of the string + // but we'll live with that for now + if (originalKey.contains(".")) return originalKey; + if (cassandraVersion.compareTo(Version.V4_1_0) < 0) return originalKey; + if (originalKey.endsWith(IN_MS_STR)) + return originalKey.substring(0, originalKey.length() - IN_MS_STR_LENGTH); + if (originalKey.startsWith(ENABLE_STR)) + return originalKey.substring(ENABLE_STR_LENGTH) + "_enabled"; + if (originalKey.endsWith(IN_KB_STR)) + return originalKey.substring(0, originalKey.length() - IN_KB_STR_LENGTH); + return originalKey; + } + + private String getConfigValue( + String originalKey, Object originalValue, Version cassandraVersion) { + + String originalValueStr = originalValue.toString(); + if (cassandraVersion.compareTo(Version.V4_1_0) < 0) return originalValueStr; + if (originalKey.endsWith(IN_MS_STR)) return originalValueStr + "ms"; + if (originalKey.endsWith(IN_KB_STR)) return originalValueStr + "KiB"; + return originalValueStr; + } + public static Builder builder() { return new Builder(); } diff --git a/upgrade_guide/README.md b/upgrade_guide/README.md index d1b6f8c8f7f..f98770a5cc8 100644 --- a/upgrade_guide/README.md +++ b/upgrade_guide/README.md @@ -19,6 +19,17 @@ under the License. 
## Upgrade guide +### NEW VERSION PLACEHOLDER + +#### Keystore reloading in DefaultSslEngineFactory + +`DefaultSslEngineFactory` now supports an optional keystore reload interval for detecting changes in the local +client keystore file. This is most relevant in environments with mTLS and short-lived client certificates, where +an application restart does not always occur between a new keystore becoming available and the previous +keystore certificate expiring. + +This feature is disabled by default for backward compatibility. To enable it, see `keystore-reload-interval` in `reference.conf`; a configuration sketch follows below. + ### 4.17.0 #### Beta support for Java17
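To illustrate the keystore reloading option described in the upgrade note above, here is a minimal configuration sketch for the driver's `application.conf`. It assumes the built-in `DefaultSslEngineFactory`; the paths, password, and interval are placeholders to adapt to your environment:
```
datastax-java-driver {
  advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    keystore-path = /path/to/client.keystore
    keystore-password = password123
    # Re-read the keystore file at this interval so that a renewed client
    # certificate is picked up without restarting the application
    keystore-reload-interval = 30 minutes
  }
}
```
Leaving `keystore-reload-interval` unset preserves the previous behavior: the keystore is read once when the session is created.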