From e5d967c1f502e9ce526103a90f482efa2bb007b7 Mon Sep 17 00:00:00 2001 From: Dmitry Kropachev Date: Tue, 12 Nov 2024 22:39:29 -0400 Subject: [PATCH] Add rack/dc aware load balancing scylladb/scylladb#12147 implements a feature for `/locanodes` to filter out nodes by rack and/or datacenter. Now we can make dynamodb client to target particular rack and/or datacenter. This PR allows user to target dynamodb client to a particular rack/datacenter or rack+datacenter, via following API: ``` AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, datacenter, rack); ``` Changes in auxiliary `AlternatorLiveNodes` API: 1. Constructor input argument is an single URI now 2. Validation is introduced 3. Start is moved out from the constructor ``` liveNodes = new AlternatorLiveNodes(Collections.singletonList(seedURI), datacenter, rack); try { liveNodes.validate(); liveNodes.checkIfRackAndDatacenterSetCorrectly(); } catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) { throw new RuntimeException(e); } liveNodes.start(); ``` 1. Regular, old code: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri)` for `6.2.0` and `master` 2. Dc+Rack-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "dc1", "rack1")` for `6.2.0` and `master` 3. Dc-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "dc1", "")` for `6.2.0` and `master` 4. Rack-aware: `AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, "", "rack1")` for `6.2.0` and `master` All tests are done for following deployments: 1. MultiDc 2. MultiRack+MultiDc 3. SingleDc Closes #40 --- .../AlternatorEndpointProvider.java | 21 +- .../alternator/AlternatorLiveNodes.java | 212 +++++++++++++----- .../alternator/AlternatorRequestHandler.java | 22 +- .../com/scylladb/alternator/test/Demo1.java | 62 ++++- .../com/scylladb/alternator/test/Demo2.java | 61 ++++- .../com/scylladb/alternator/test/Demo3.java | 9 +- 6 files changed, 301 insertions(+), 86 deletions(-) diff --git a/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java b/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java index 0fed7e9..f9ac18d 100644 --- a/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java +++ b/java/src/main/java/com/scylladb/alternator/AlternatorEndpointProvider.java @@ -4,6 +4,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import java.util.logging.Logger; import software.amazon.awssdk.endpoints.Endpoint; import software.amazon.awssdk.services.dynamodb.endpoints.DynamoDbEndpointParams; @@ -17,16 +19,27 @@ public class AlternatorEndpointProvider implements DynamoDbEndpointProvider { private final AlternatorLiveNodes liveNodes; private final Map> futureCache; + private static Logger logger = Logger.getLogger(AlternatorEndpointProvider.class.getName()); public AlternatorEndpointProvider(URI seedURI) { + this(seedURI, "", ""); + } + + public AlternatorEndpointProvider(URI seedURI, String datacenter, String rack) { futureCache = new ConcurrentHashMap<>(); - liveNodes = new AlternatorLiveNodes(seedURI); + liveNodes = new AlternatorLiveNodes(seedURI, datacenter, rack); try { liveNodes.validate(); - } catch (AlternatorLiveNodes.ValidationError e) { + liveNodes.checkIfRackAndDatacenterSetCorrectly(); + if (!datacenter.isEmpty() || !rack.isEmpty()) { + if (!liveNodes.checkIfRackDatacenterFeatureIsSupported()) { + logger.log(Level.SEVERE, String.format("server %s does not support rack or datacenter filtering", seedURI)); + } + } + } catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) { throw new RuntimeException(e); - } - liveNodes.start(); + } + liveNodes.start(); } @Override diff --git a/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java b/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java index df36a39..4b7c3a4 100644 --- a/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java +++ b/java/src/main/java/com/scylladb/alternator/AlternatorLiveNodes.java @@ -1,15 +1,13 @@ package com.scylladb.alternator; import java.io.IOException; -import java.net.MalformedURLException; -import java.util.Collections; -import java.util.List; -import java.util.ArrayList; -import java.util.Scanner; -import java.util.concurrent.atomic.AtomicInteger; import java.net.URI; +import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.HttpURLConnection; +import java.net.ProtocolException; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import java.util.logging.Level; @@ -26,10 +24,11 @@ public class AlternatorLiveNodes extends Thread { private final String alternatorScheme; private final int alternatorPort; - private final AtomicReference> liveNodes; - private final List initialNodes; + private final List initialNodes; private final AtomicInteger nextLiveNodeIndex; + private final String rack; + private final String datacenter; private static Logger logger = Logger.getLogger(AlternatorLiveNodes.class.getName()); @@ -55,48 +54,57 @@ public void run() { } } - public AlternatorLiveNodes(String alternatorScheme, List liveNodes, int alternatorPort) { - this.alternatorScheme = alternatorScheme; - this.initialNodes = new ArrayList<>(liveNodes); - this.liveNodes = new AtomicReference<>(); - this.alternatorPort = alternatorPort; - this.nextLiveNodeIndex = new AtomicInteger(0); + public AlternatorLiveNodes(URI liveNode, String datacenter, String rack) { + this(Collections.singletonList(liveNode), liveNode.getScheme(), liveNode.getPort(), datacenter, rack); } - public AlternatorLiveNodes(URI uri) { - this(uri.getScheme(), Collections.singletonList(uri.getHost()), uri.getPort()); + public AlternatorLiveNodes(List liveNodes, String scheme, int port, String datacenter, String rack) { + if (liveNodes == null || liveNodes.isEmpty()) { + throw new RuntimeException("liveNodes cannot be null or empty"); + } + this.alternatorScheme = scheme; + this.initialNodes = liveNodes; + this.liveNodes = new AtomicReference<>(); + this.alternatorPort = port; + this.nextLiveNodeIndex = new AtomicInteger(0); + this.rack = rack; + this.datacenter = datacenter; } @Override public void start() { - List nodes = new ArrayList<>(); - for (String liveNode : initialNodes) { - try { - nodes.add(this.hostToURI(liveNode)); - } catch (URISyntaxException | MalformedURLException e) { - // Should not happen, initialLiveNodes should be validated at this point - throw new RuntimeException(e); - } + try { + this.validate(); + } catch (ValidationError e) { + throw new RuntimeException(e); } - this.liveNodes.set(nodes); + liveNodes.set(initialNodes); // setDaemon(true) allows the program to exit even if the thread is still running. this.setDaemon(true); super.start(); } + public void validateURI(URI uri) throws ValidationError { + try { + uri.toURL(); + } catch (MalformedURLException e) { + throw new ValidationError("Invalid URI: " + uri, e); + } + } + public void validate() throws ValidationError { this.validateConfig(); - for (String liveNode : initialNodes) { - try { - this.hostToURI(liveNode); - } catch (MalformedURLException | URISyntaxException e) { - throw new ValidationError(String.format("failed to validate initial node %s", liveNode), e); - } + for (URI liveNode : initialNodes) { + this.validateURI(liveNode); } } public static class ValidationError extends Exception { + public ValidationError(String message) { + super(message); + } + public ValidationError(String message, Throwable cause) { super(message, cause); } @@ -104,6 +112,7 @@ public ValidationError(String message, Throwable cause) { private void validateConfig() throws ValidationError { try { + // Make sure that `alternatorScheme` and `alternatorPort` are correct values this.hostToURI("1.1.1.1"); } catch (MalformedURLException | URISyntaxException e) { throw new ValidationError("failed to validate configuration", e); @@ -111,11 +120,7 @@ private void validateConfig() throws ValidationError { } private URI hostToURI(String host) throws URISyntaxException, MalformedURLException { - return hostToURI(host, null, null); - } - - private URI hostToURI(String host, String path, String query) throws URISyntaxException, MalformedURLException { - URI uri = new URI(alternatorScheme, null, host, alternatorPort, path, query, null); + URI uri = new URI(alternatorScheme, null, host, alternatorPort, null, null, null); // Make sure that URI to URL conversion works uri.toURL(); return uri; @@ -129,10 +134,10 @@ public URI nextAsURI() { return nodes.get(Math.abs(nextLiveNodeIndex.getAndIncrement() % nodes.size())); } - public URI nextAsURI(String file, String query) { + public URI nextAsURI(String path, String query) { try { URI uri = this.nextAsURI(); - return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), file, query, null); + return new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), path, query, null); } catch (URISyntaxException e) { // Should never happen, nextAsURI content is already validated throw new RuntimeException(e); @@ -147,38 +152,123 @@ private static String streamToString(java.io.InputStream is) { } private void updateLiveNodes() throws IOException { - List newHosts = new ArrayList<>(); - URI uri = nextAsURI("/localnodes", null); + List newHosts = getNodes(nextAsLocalNodesURI()); + if (!newHosts.isEmpty()) { + liveNodes.set(newHosts); + logger.log(Level.FINE, "Updated hosts to " + liveNodes); + } + } + + private List getNodes(URI uri) throws IOException { // Note that despite this being called HttpURLConnection, it actually // supports HTTPS as well. HttpURLConnection conn; + conn = (HttpURLConnection) uri.toURL().openConnection(); try { - conn = (HttpURLConnection) uri.toURL().openConnection(); - } catch (MalformedURLException e){ - // Should never happen, uri is already validated at this point + conn.setRequestMethod("GET"); + } catch (ProtocolException e) { + // It can happen only of conn is already connected or "GET" is not a valid method + // Both cases not true, os it should happen throw new RuntimeException(e); } - conn.setRequestMethod("GET"); int responseCode = conn.getResponseCode(); - if (responseCode == HttpURLConnection.HTTP_OK) { - String response = streamToString(conn.getInputStream()); - // response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"] - response = response.trim(); - response = response.substring(1, response.length() - 1); - String[] list = response.split(","); - for (String host : list) { - host = host.trim(); - host = host.substring(1, host.length() - 1); - try { - newHosts.add(this.hostToURI(host)); - } catch (URISyntaxException | MalformedURLException e) { - logger.log(Level.WARNING, "Invalid host: " + host, e); - } + if (responseCode != HttpURLConnection.HTTP_OK) { + return Collections.emptyList(); + } + String response = streamToString(conn.getInputStream()); + // response looks like: ["127.0.0.2","127.0.0.3","127.0.0.1"] + response = response.trim(); + response = response.substring(1, response.length() - 1); + String[] list = response.split(","); + List newHosts = new ArrayList<>(); + for (String host : list) { + if (host.isEmpty()){ + continue; + } + host = host.trim(); + host = host.substring(1, host.length() - 1); + try { + newHosts.add(this.hostToURI(host)); + } catch (URISyntaxException | MalformedURLException e) { + logger.log(Level.WARNING, "Invalid host: " + host, e); } } - if (!newHosts.isEmpty()) { - liveNodes.set(newHosts); - logger.log(Level.FINE, "Updated hosts to " + liveNodes); + return newHosts; + } + + private URI nextAsLocalNodesURI() { + if (this.rack.isEmpty() && this.datacenter.isEmpty()) { + return nextAsURI("/localnodes", null); + } + String query = ""; + if (!this.rack.isEmpty()) { + query = "rack=" + this.rack; + } + if (!this.datacenter.isEmpty()) { + if (query.isEmpty()) { + query = "dc=" + this.datacenter; + } else { + query += "&dc=" + this.datacenter; + } + } + return nextAsURI("/localnodes", query); + } + + public static class FailedToCheck extends Exception { + public FailedToCheck(String message, Throwable cause) { + super(message, cause); + } + + public FailedToCheck(String message) { + super(message); + } + } + + /** + * Checks if server returns non-empty node list for given datacenter/rack. + * throws {@link FailedToCheck} if it fails to reach server and {@link ValidationError} if list is empty + * otherwise do not throw + * + **/ + public void checkIfRackAndDatacenterSetCorrectly() throws FailedToCheck, ValidationError { + if (this.rack.isEmpty() && this.datacenter.isEmpty()) { + return; + } + try { + List nodes = getNodes(nextAsLocalNodesURI()); + if (nodes.isEmpty()) { + throw new ValidationError("node returned empty list, datacenter or rack are set incorrectly"); + } + } catch (IOException e) { + throw new FailedToCheck("failed to read list of nodes from the node", e); + } + } + + /** + * Returns true if remote node supports /localnodes?rack=<>&dc=. + * If it can't conclude by any reason it throws {@link FailedToCheck} + */ + public Boolean checkIfRackDatacenterFeatureIsSupported() throws FailedToCheck { + URI uri = nextAsURI("/localnodes", null); + URI fakeRackUrl; + try { + fakeRackUrl = new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getQuery(), "rack=fakeRack", ""); + } catch (URISyntaxException e) { + // Should not ever happen + throw new FailedToCheck("Invalid URI: " + uri, e); + } + try { + List hostsWithFakeRack = getNodes(fakeRackUrl); + List hostsWithoutRack = getNodes(uri); + if (hostsWithoutRack.isEmpty()) { + // This should not normally happen. + // If list of nodes is empty, it is impossible to conclude if it supports rack/datacenter filtering or not. + throw new FailedToCheck(String.format("host %s returned empty list", uri)); + } + // When rack filtering is not supported server returns same nodes. + return hostsWithFakeRack.size() != hostsWithoutRack.size(); + } catch (IOException e) { + throw new FailedToCheck("failed to read list of nodes from the node", e); } } } diff --git a/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java b/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java index a762c2a..41cd458 100644 --- a/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java +++ b/java/src/main/java/com/scylladb/alternator/AlternatorRequestHandler.java @@ -4,18 +4,34 @@ import com.amazonaws.Request; import java.net.URI; +import java.util.logging.Level; +import java.util.logging.Logger; /* AlternatorRequestHandler is RequestHandler2 implementation for AWS SDK * for Java v1. It tells the SDK to replace the endpoint in the request, * whatever it was, with the next Alternator node. */ public class AlternatorRequestHandler extends RequestHandler2 { + + private static Logger logger = Logger.getLogger(AlternatorRequestHandler.class.getName()); + AlternatorLiveNodes liveNodes; - public AlternatorRequestHandler(URI seedURI){ - liveNodes = new AlternatorLiveNodes(seedURI); + + public AlternatorRequestHandler(URI seedURI) { + this(seedURI, "", ""); + } + + public AlternatorRequestHandler(URI seedURI, String datacenter, String rack) { + liveNodes = new AlternatorLiveNodes(seedURI, datacenter, rack); try { liveNodes.validate(); - } catch (AlternatorLiveNodes.ValidationError e) { + liveNodes.checkIfRackAndDatacenterSetCorrectly(); + if (!datacenter.isEmpty() || !rack.isEmpty()) { + if (!liveNodes.checkIfRackDatacenterFeatureIsSupported()) { + logger.log(Level.SEVERE, String.format("server %s does not support rack or datacenter filtering", seedURI)); + } + } + } catch (AlternatorLiveNodes.ValidationError | AlternatorLiveNodes.FailedToCheck e) { throw new RuntimeException(e); } liveNodes.start(); diff --git a/java/src/test/java/com/scylladb/alternator/test/Demo1.java b/java/src/test/java/com/scylladb/alternator/test/Demo1.java index 02ac439..ce7d81a 100755 --- a/java/src/test/java/com/scylladb/alternator/test/Demo1.java +++ b/java/src/test/java/com/scylladb/alternator/test/Demo1.java @@ -19,7 +19,12 @@ import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; +import java.net.URISyntaxException; import java.util.Random; import java.util.Arrays; @@ -41,13 +46,10 @@ public class Demo1 { - // Set here the authentication credentials needed by the server: - static AWSCredentialsProvider myCredentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("alternator", "secret_pass")); - // The following is the "traditional" way to get a DynamoDB connection to // a specific endpoint URL, with no client-side load balancing, or any // Alternator-specific code. - static DynamoDB getTraditionalClient(URI url) { + static DynamoDB getTraditionalClient(URI url, AWSCredentialsProvider myCredentials) { AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( url.toString(), "region-doesnt-matter")) @@ -58,8 +60,8 @@ static DynamoDB getTraditionalClient(URI url) { // And this is the Alternator-specific way to get a DynamoDB connection // which load-balances several Scylla nodes. - static DynamoDB getAlternatorClient(URI uri) { - AlternatorRequestHandler handler = new AlternatorRequestHandler(uri); + static DynamoDB getAlternatorClient(URI uri, AWSCredentialsProvider myCredentials, String datacenter, String rack) { + AlternatorRequestHandler handler = new AlternatorRequestHandler(uri, datacenter, rack); AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() // The endpoint doesn't matter, we will override it anyway in the // RequestHandler, but without setting it the library will complain @@ -82,14 +84,56 @@ public static void main(String[] args) { logger.addHandler(handler); logger.setUseParentHandlers(false); + ArgumentParser parser = ArgumentParsers.newFor("Demo1").build() + .defaultHelp(true).description( + "Simple example of AWS SDK v1 alternator access"); + + try { + parser.addArgument("-e", "--endpoint") + .setDefault(new URI("http://localhost:8043")) + .help("DynamoDB/Alternator endpoint"); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + parser.addArgument("-u", "--user").setDefault("none") + .help("Credentials username"); + parser.addArgument("-p", "--password").setDefault("none") + .help("Credentials password"); + parser.addArgument("--datacenter").type(String.class).setDefault("") + .help("Target only nodes from particular datacenter. If it is not provided it is going to target datacenter of the endpoint."); + parser.addArgument("--rack").type(String.class).setDefault("") + .help("Target only nodes from particular rack"); + parser.addArgument("--no-lb").type(Boolean.class).setDefault(false) + .help("Turn off load balancing"); + + Namespace ns = null; + try { + ns = parser.parseArgs(args); + } catch (ArgumentParserException e) { + parser.handleError(e); + System.exit(1); + } + + String endpoint = ns.getString("endpoint"); + String user = ns.getString("user"); + String pass = ns.getString("password"); + String datacenter = ns.getString("datacenter"); + String rack = ns.getString("rack"); + Boolean disableLoadBalancing = ns.getBoolean("no-lb"); + + // In our test setup, the Alternator HTTPS server set up with a self- // signed certficate, so we need to disable certificate checking. // Obviously, this doesn't need to be done in production code. disableCertificateChecks(); - //DynamoDB ddb = getTraditionalClient(URI.create("https://localhost:8043")); - //DynamoDB ddb = getTraditionalClient(URI.create("http://localhost:8000")); - DynamoDB ddb = getAlternatorClient(URI.create("https://127.0.0.1:8043")); + AWSCredentialsProvider myCredentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials(user, pass)); + DynamoDB ddb; + if (disableLoadBalancing) { + ddb = getTraditionalClient(URI.create(endpoint), myCredentials); + } else { + ddb = getAlternatorClient(URI.create(endpoint), myCredentials, datacenter, rack); + } Random rand = new Random(); String tabName = "table" + rand.nextInt(1000000); diff --git a/java/src/test/java/com/scylladb/alternator/test/Demo2.java b/java/src/test/java/com/scylladb/alternator/test/Demo2.java index 9236b50..0f699a4 100755 --- a/java/src/test/java/com/scylladb/alternator/test/Demo2.java +++ b/java/src/test/java/com/scylladb/alternator/test/Demo2.java @@ -1,6 +1,10 @@ package com.scylladb.alternator.test; import com.scylladb.alternator.AlternatorEndpointProvider; +import net.sourceforge.argparse4j.ArgumentParsers; +import net.sourceforge.argparse4j.inf.ArgumentParser; +import net.sourceforge.argparse4j.inf.ArgumentParserException; +import net.sourceforge.argparse4j.inf.Namespace; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.DescribeEndpointsRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeEndpointsResponse; @@ -23,6 +27,8 @@ import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; +import java.net.URISyntaxException; +import java.net.URL; import java.security.cert.X509Certificate; // For enabling trace-level logging @@ -32,12 +38,11 @@ public class Demo2 { // Set here the authentication credentials needed by the server: - static AwsCredentialsProvider myCredentials = StaticCredentialsProvider.create(AwsBasicCredentials.create("alternator", "secret_pass")); // The following is the "traditional" way to get a DynamoDB connection to // a specific endpoint URL, with no client-side load balancing, or any // Alternator-specific code. - static DynamoDbClient getTraditionalClient(URI url) { + static DynamoDbClient getTraditionalClient(URI url, AwsCredentialsProvider myCredentials) { // To support HTTPS connections to a test server *without* checking // SSL certificates we need the httpClient() hack. It's of course not // needed in a production installation. @@ -57,7 +62,7 @@ static DynamoDbClient getTraditionalClient(URI url) { // Basically the only change is replacing the endpointOverride() call // with its fixed endpoind URL, with an endpointProvider() call, giving // an AlternatorEndpointProvider object. - static DynamoDbClient getAlternatorClient(URI url) { + static DynamoDbClient getAlternatorClient(URI url, AwsCredentialsProvider myCredentials, String datacenter, String rack) { // To support HTTPS connections to a test server *without* checking // SSL certificates we need the httpClient() hack. It's of course not // needed in a production installation. @@ -65,7 +70,7 @@ static DynamoDbClient getAlternatorClient(URI url) { AttributeMap.builder() .put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, true) .build()); - AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(url); + AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(url, datacenter, rack); return DynamoDbClient.builder() .credentialsProvider(myCredentials) .httpClient(http) @@ -85,13 +90,55 @@ public static void main(String[] args) { logger.addHandler(handler); logger.setUseParentHandlers(false); + ArgumentParser parser = ArgumentParsers.newFor("Demo2").build() + .defaultHelp(true).description( + "Simple example of AWS SDK v1 alternator access"); + + try { + parser.addArgument("-e", "--endpoint") + .setDefault(new URI("http://localhost:8043")) + .help("DynamoDB/Alternator endpoint"); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + parser.addArgument("-u", "--user").setDefault("none") + .help("Credentials username"); + parser.addArgument("-p", "--password").setDefault("none") + .help("Credentials password"); + parser.addArgument("--datacenter").type(String.class).setDefault("") + .help("Target only nodes from particular datacenter. If it is not provided it is going to target datacenter of the endpoint."); + parser.addArgument("--rack").type(String.class).setDefault("") + .help("Target only nodes from particular rack"); + parser.addArgument("--no-lb").type(Boolean.class).setDefault(false) + .help("Turn off load balancing"); + + Namespace ns = null; + try { + ns = parser.parseArgs(args); + } catch (ArgumentParserException e) { + parser.handleError(e); + System.exit(1); + } + + String endpoint = ns.getString("endpoint"); + String user = ns.getString("user"); + String pass = ns.getString("password"); + String datacenter = ns.getString("datacenter"); + String rack = ns.getString("rack"); + Boolean disableLoadBalancing = ns.getBoolean("no-lb"); + + // In our test setup, the Alternator HTTPS server set up with a self- // signed certificate, so we need to disable certificate checking. // Obviously, this doesn't need to be done in production code. disableCertificateChecks(); - - //DynamoDbClient ddb = getTraditionalClient(URI.create("https://localhost:8043")); - DynamoDbClient ddb = getAlternatorClient(URI.create("https://localhost:8043")); + AwsCredentialsProvider myCredentials = StaticCredentialsProvider.create(AwsBasicCredentials.create(user, pass)); + DynamoDbClient ddb; + if (disableLoadBalancing) { + ddb = getTraditionalClient(URI.create(endpoint), myCredentials); + } else { + ddb = getAlternatorClient(URI.create(endpoint), myCredentials, datacenter, rack); + } // run DescribeEndpoints several times for (int i=0; i<10; i++) { diff --git a/java/src/test/java/com/scylladb/alternator/test/Demo3.java b/java/src/test/java/com/scylladb/alternator/test/Demo3.java index 6f1686b..74ca326 100644 --- a/java/src/test/java/com/scylladb/alternator/test/Demo3.java +++ b/java/src/test/java/com/scylladb/alternator/test/Demo3.java @@ -55,6 +55,10 @@ public static void main(String[] args) throws MalformedURLException { .help("Max worker threads"); parser.addArgument("--trust-ssl").type(Boolean.class).setDefault(false) .help("Trust all certificates"); + parser.addArgument("--datacenter").type(String.class).setDefault("") + .help("Target only nodes from particular datacenter. If it is not provided it is going to target datacenter of the endpoint."); + parser.addArgument("--rack").type(String.class).setDefault("") + .help("Target only nodes from particular rack"); Namespace ns = null; try { @@ -70,6 +74,8 @@ public static void main(String[] args) throws MalformedURLException { int threads = ns.getInt("threads"); Region region = Region.of(ns.getString("region")); Boolean trustSSL = ns.getBoolean("trust-ssl"); + String datacenter = ns.getString("datacenter"); + String rack = ns.getString("rack"); // The load balancer library logs the list of live nodes, and hosts // it chooses to send requests to, if the FINE logging level is @@ -93,8 +99,7 @@ public static void main(String[] args) throws MalformedURLException { if (endpoint != null) { URI uri = URI.create(endpoint); - AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(uri); - + AlternatorEndpointProvider alternatorEndpointProvider = new AlternatorEndpointProvider(uri, datacenter, rack); if (trustSSL != null && trustSSL.booleanValue()) { // In our test setup, the Alternator HTTPS server set up with a