diff --git a/pom.xml b/pom.xml index 4b967ce..c24e007 100644 --- a/pom.xml +++ b/pom.xml @@ -18,12 +18,12 @@ fmt-maven-plugin 2.9.1 - - validate - - check - - + + validate + + check + + @@ -138,7 +138,11 @@ - + + org.apache.commons + commons-lang3 + 3.13.0 + com.fasterxml.jackson.core jackson-databind diff --git a/script/build b/script/build index adf7e00..9dc1a12 100755 --- a/script/build +++ b/script/build @@ -4,6 +4,12 @@ set -e cd "$(dirname "$0")/.." + +echo "==> Formatting files" +./mvnw com.coveo:fmt-maven-plugin:format + +echo "==> Validation files" ./mvnw validate +echo "==> Building jar" ./mvnw clean package -DskipTests=true mv $(ls target/dremio-stress-*-jar-with-dependencies.jar) target/dremio-stress.jar diff --git a/script/fmt b/script/fmt new file mode 100755 index 0000000..111d914 --- /dev/null +++ b/script/fmt @@ -0,0 +1,12 @@ +#!/bin/sh +# +# scripts/fmt: formats files + +set -e + +cd "$(dirname "$0")/.." + +[ -z "$DEBUG" ] || set -x + +echo "==> Formatting files" +./mvnw com.coveo:fmt-maven-plugin:format diff --git a/src/main/java/com/dremio/support/diagnostics/stress/ConnectApi.java b/src/main/java/com/dremio/support/diagnostics/stress/ConnectApi.java index 0a579a6..f806f45 100644 --- a/src/main/java/com/dremio/support/diagnostics/stress/ConnectApi.java +++ b/src/main/java/com/dremio/support/diagnostics/stress/ConnectApi.java @@ -20,7 +20,6 @@ DremioApi connect( String username, String password, String host, - FileMaker fileMaker, Integer timeoutSeconds, Protocol protocol, boolean ignoreSSL) diff --git a/src/main/java/com/dremio/support/diagnostics/stress/ConnectDremioApi.java b/src/main/java/com/dremio/support/diagnostics/stress/ConnectDremioApi.java index 4288b67..fd1ab74 100644 --- a/src/main/java/com/dremio/support/diagnostics/stress/ConnectDremioApi.java +++ b/src/main/java/com/dremio/support/diagnostics/stress/ConnectDremioApi.java @@ -22,7 +22,6 @@ public DremioApi connect( String username, String password, String host, - FileMaker fileMaker, Integer timeoutSeconds, Protocol protocol, boolean ignoreSSL) @@ -30,7 +29,7 @@ public DremioApi connect( final UsernamePasswordAuth auth = new UsernamePasswordAuth(username, password); if (protocol.equals(Protocol.HTTP)) { HttpApiCall apiCall = new HttpApiCall(ignoreSSL); - return new DremioV3Api(apiCall, auth, host, fileMaker, timeoutSeconds); + return new DremioV3Api(apiCall, auth, host, timeoutSeconds); } return new DremioArrowFlightJDBCDriver(host); } diff --git a/src/main/java/com/dremio/support/diagnostics/stress/DremioArrowFlightJDBCDriver.java b/src/main/java/com/dremio/support/diagnostics/stress/DremioArrowFlightJDBCDriver.java index 22325f8..10e23a8 100644 --- a/src/main/java/com/dremio/support/diagnostics/stress/DremioArrowFlightJDBCDriver.java +++ b/src/main/java/com/dremio/support/diagnostics/stress/DremioArrowFlightJDBCDriver.java @@ -53,45 +53,41 @@ public DremioArrowFlightJDBCDriver(String url) { */ @Override public DremioApiResponse runSQL(String sql, Collection table) throws IOException { - String context = String.join(".", table); - final DremioApiResponse response = new DremioApiResponse(); + final String context; + if (table == null) { + context = ""; + } else { + context = String.join(".", table); + } synchronized (currentContextLock) { if (!currentContext.equals(context)) { currentContext = context; + logger.info(() -> String.format("changing context %s", context)); try { - logger.info(() -> String.format("changing context %s", context)); if (!connection.createStatement().execute("USE " + context)) { - response.setErrorMessage("failed using USE"); - response.setSuccessful(false); - return response; + throw new RuntimeException("failed using USE"); } - if (connection.createStatement().execute(sql)) { - response.setSuccessful(true); - response.setErrorMessage(""); - return response; + final boolean success = connection.createStatement().execute(sql); + if (!success) { + throw new RuntimeException("unhandled exception executing sql"); } - response.setSuccessful(false); - response.setErrorMessage("unhandled error executing sql"); - return response; - } catch (SQLException e) { - response.setErrorMessage(e.getMessage()); + final DremioApiResponse response = new DremioApiResponse(); + response.setSuccessful(true); return response; + } catch (SQLException ex) { + throw new RuntimeException(ex); } } } try { if (connection.createStatement().execute(sql)) { + final DremioApiResponse response = new DremioApiResponse(); response.setSuccessful(true); - response.setErrorMessage(""); - } else { - response.setSuccessful(false); - response.setErrorMessage("unhandled exception"); + return response; } - return response; + throw new RuntimeException("unhandled exception"); } catch (SQLException e) { - response.setSuccessful(false); - response.setErrorMessage(e.getMessage()); - return response; + throw new RuntimeException(e); } } diff --git a/src/main/java/com/dremio/support/diagnostics/stress/DremioV3Api.java b/src/main/java/com/dremio/support/diagnostics/stress/DremioV3Api.java index 013a628..afa23d1 100644 --- a/src/main/java/com/dremio/support/diagnostics/stress/DremioV3Api.java +++ b/src/main/java/com/dremio/support/diagnostics/stress/DremioV3Api.java @@ -28,13 +28,13 @@ public class DremioV3Api implements DremioApi { /** unmodifiable map of base headers used in all requests that are authenticated */ private final Map baseHeaders; + private static final Logger logger = Logger.getLogger(DremioV3Api.class.getName()); + // base url for the api typically http/https hostname and port. Does not include the ending / private final String baseUrl; // the actual http implementation private final ApiCall apiCall; - private static final Logger logger = Logger.getLogger(DremioV3Api.class.getName()); - private final FileMaker fileMaker; private final int timeoutSeconds; /** @@ -50,15 +50,9 @@ public class DremioV3Api implements DremioApi { * @throws IOException throws when unable to read the response body or unable to attach a request * body */ - public DremioV3Api( - ApiCall apiCall, - UsernamePasswordAuth auth, - String baseUrl, - FileMaker fileMaker, - int timeoutSeconds) + public DremioV3Api(ApiCall apiCall, UsernamePasswordAuth auth, String baseUrl, int timeoutSeconds) throws IOException { this.apiCall = apiCall; - this.fileMaker = fileMaker; this.timeoutSeconds = timeoutSeconds; Map headers = new HashMap<>(); // working with json @@ -96,29 +90,26 @@ private JobStatusResponse checkJobStatus(String jobId) throws IOException { if (jobId == null || jobId.trim().isEmpty()) { throw new InvalidParameterException("jobId cannot be empty"); } + // v3 job api URL url = new URL(this.baseUrl + "/api/v3/job/" + jobId); // setup headers HttpApiResponse response = apiCall.submitGet(url, this.baseHeaders); // jobState is the necessary key - if (response == null - || response.getResponse() == null - || !response.getResponse().containsKey("jobState")) { - String error = tryParseError(response); - if (error != null) { - JobStatusResponse jobStatusResponse = new JobStatusResponse(); - jobStatusResponse.setStatus("UNKNOWN"); - jobStatusResponse.setMessage(error); - return jobStatusResponse; - } - return null; + if (response == null) { + throw new RuntimeException("no valid response"); + } + if (response.getResponse() == null) { + throw new RuntimeException("no valid response body"); + } + if (!response.getResponse().containsKey("jobState")) { + throw new RuntimeException("no jobState key present"); } Object jobState = response.getResponse().get("jobState"); if (jobState == null) { - JobStatusResponse jobStatus = new JobStatusResponse(); - jobStatus.setStatus("UNKNOWN"); - return jobStatus; + throw new RuntimeException("no valid jobState key present"); } + logger.info(() -> String.format("job %s job state %s", jobId, response.getResponse())); // for failed jobs if ("FAILED".equals(jobState)) { String error = @@ -156,30 +147,26 @@ public DremioApiResponse runSQL(String sql, Collection contexts) throws } String json = new ObjectMapper().writeValueAsString(params); HttpApiResponse response = apiCall.submitPost(url, this.baseHeaders, json); - if (response == null - || response.getResponse() == null - || !response.getResponse().containsKey("id")) { - String errorMessage = tryParseError(response); - if (errorMessage == null) { - errorMessage = String.format("id was not contained in the response '%s'", response); - } - - DremioApiResponse failed = new DremioApiResponse(); - failed.setSuccessful(false); - failed.setErrorMessage(errorMessage); - return failed; + if (response == null) { + throw new RuntimeException("missing response"); + } + if (response.getResponse() == null) { + throw new RuntimeException("missing response body"); } - JobStatusResponse status = new JobStatusResponse(); - status.setStatus("UNKNOWN"); + if (!response.getResponse().containsKey("id")) { + throw new RuntimeException("id"); + } + Instant timeout = Instant.now().plus(timeoutSeconds, ChronoUnit.SECONDS); + String jobId = String.valueOf(response.getResponse().get("id")); while (!Instant.now().isAfter(timeout)) { - String jobId = String.valueOf(response.getResponse().get("id")); - status = this.checkJobStatus(jobId); + JobStatusResponse status = this.checkJobStatus(jobId); if (status == null) { - continue; + throw new RuntimeException("unexpected job status critical error"); } final String statusString = status.getStatus(); if ("COMPLETED".equals(statusString)) { + logger.info(() -> statusString); DremioApiResponse success = new DremioApiResponse(); success.setSuccessful(true); return success; @@ -188,6 +175,7 @@ public DremioApiResponse runSQL(String sql, Collection contexts) throws || "INVALID_STATE".equals(statusString) || "CANCELLED".equals(statusString)) { DremioApiResponse failure = new DremioApiResponse(); + failure.setSuccessful(false); failure.setErrorMessage(String.format("Response status is '%s'", status.getMessage())); return failure; } @@ -197,13 +185,10 @@ public DremioApiResponse runSQL(String sql, Collection contexts) throws throw new RuntimeException(e); } } + // hit the timeout DremioApiResponse failed = new DremioApiResponse(); failed.setSuccessful(false); - if (status != null) { - failed.setErrorMessage(String.format("Response status is '%s'", status.getStatus())); - } else { - failed.setErrorMessage("unknown error"); - } + failed.setErrorMessage("timeout hit"); return failed; } catch (Exception ex) { DremioApiResponse failed = new DremioApiResponse(); @@ -218,13 +203,4 @@ public DremioApiResponse runSQL(String sql, Collection contexts) throws public String getUrl() { return this.baseUrl; } - - private String tryParseError(HttpApiResponse response) { - if (response != null - && response.getResponse() != null - && response.getResponse().containsKey("errorMessage")) { - return String.valueOf(response.getResponse().get("errorMessage")); - } - return null; - } } diff --git a/src/main/java/com/dremio/support/diagnostics/stress/QueryConfig.java b/src/main/java/com/dremio/support/diagnostics/stress/QueryConfig.java index bd68ec7..d2d49d5 100644 --- a/src/main/java/com/dremio/support/diagnostics/stress/QueryConfig.java +++ b/src/main/java/com/dremio/support/diagnostics/stress/QueryConfig.java @@ -21,7 +21,7 @@ public class QueryConfig { private String query; private String queryGroup; private int frequency; - private Map parameters; + private Map> parameters; private List sqlContext; public String getQuery() { @@ -48,11 +48,11 @@ public void setFrequency(int frequency) { this.frequency = frequency; } - public Map getParameters() { + public Map> getParameters() { return parameters; } - public void setParameters(Map parameters) { + public void setParameters(Map> parameters) { this.parameters = parameters; } diff --git a/src/main/java/com/dremio/support/diagnostics/stress/StressExec.java b/src/main/java/com/dremio/support/diagnostics/stress/StressExec.java index 7de64cf..8df9119 100644 --- a/src/main/java/com/dremio/support/diagnostics/stress/StressExec.java +++ b/src/main/java/com/dremio/support/diagnostics/stress/StressExec.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; -import java.nio.file.Paths; import java.security.InvalidParameterException; import java.security.SecureRandom; import java.time.Instant; @@ -30,11 +29,16 @@ import java.util.Random; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.commons.lang3.exception.ExceptionUtils; public class StressExec { @@ -158,20 +162,48 @@ private StressConfig getConfig() { } } + private void runQuery(DremioApi dremioApi, Query mappedSql) { + { + try { + Instant startTime = Instant.now(); + DremioApiResponse response = null; + submittedCounter.incrementAndGet(); + response = dremioApi.runSQL(mappedSql.getQueryText(), mappedSql.getContext()); + if (response == null) { + throw new RuntimeException( + String.format("query %s failed with an empty response", mappedSql)); + } + if (!response.isSuccessful()) { + final String errMsg = response.getErrorMessage(); + throw new RuntimeException( + String.format("query %s failed with error %s", mappedSql, errMsg)); + } + Instant endTime = Instant.now(); + long queryTime = endTime.toEpochMilli() - startTime.toEpochMilli(); + totalDurationMS.addAndGet(queryTime); + successfulCounter.incrementAndGet(); + logger.info(() -> String.format("query %s successful", mappedSql)); + } catch (final Exception e) { + failureCounter.incrementAndGet(); + logger.info( + () -> + String.format( + "query %s failed %s %s", mappedSql, e, ExceptionUtils.getStackTrace(e))); + } + } + } /** * The stress job * * @return exit code of the process */ public int run() { - final FileMaker noOpFileMaker = () -> Paths.get(""); try { final DremioApi dremioApi = this.connectApi.connect( dremioUser, dremioPassword, dremioHost, - noOpFileMaker, timeoutSeconds, protocol, skipSSLVerification); @@ -193,38 +225,7 @@ public int run() { final QueryConfig query = queryPool.get(nextQuery); final List mappedSqls = mapSql(query, queryGroups); for (final Query mappedSql : mappedSqls) { - final Runnable runnable = - () -> { - Instant startTime = Instant.now(); - DremioApiResponse response = null; - try { - submittedCounter.incrementAndGet(); - response = dremioApi.runSQL(mappedSql.getQueryText(), query.getSqlContext()); - } catch (final Exception e) { - failureCounter.incrementAndGet(); - logger.info( - () -> - String.format( - "query %s failed with error %s", mappedSql, e.getMessage())); - return; - } - if (response != null) { - Instant endTime = Instant.now(); - long queryTime = endTime.toEpochMilli() - startTime.toEpochMilli(); - totalDurationMS.addAndGet(queryTime); - if (response.isSuccessful()) { - successfulCounter.incrementAndGet(); - logger.info(() -> String.format("query %s successful", mappedSql)); - } else { - failureCounter.incrementAndGet(); - final String errMsg = response.getErrorMessage(); - logger.info( - () -> String.format("query %s failed with error %s", mappedSql, errMsg)); - } - } else { - failureCounter.incrementAndGet(); - } - }; + final Runnable runnable = () -> runQuery(dremioApi, mappedSql); executorService.submit(runnable); counter.incrementAndGet(); } @@ -325,20 +326,33 @@ public List mapSql(final QueryConfig q, final Map que } else if (q.getQuery() != null && !q.getQuery().isEmpty()) { rawQueries.add(q.getQuery()); } + final Map> parameters; + if (q.getParameters() == null) { + parameters = new HashMap<>(); + } else { + parameters = q.getParameters(); + } final List mappedQueries = new ArrayList<>(); for (final String sql : rawQueries) { final String[] tokens = sql.split(" "); final int words = tokens.length; for (int i = 0; i < words; i++) { final String word = tokens[i]; - for (final Entry x : q.getParameters().entrySet()) { + for (final Entry> x : parameters.entrySet()) { if (word.equals(":" + x.getKey())) { - final int valueCount = x.getValue().length; + final int valueCount = x.getValue().size(); if (valueCount > 0) { final int valueIndex = random.nextInt(valueCount); - final String v = String.valueOf(x.getValue()[valueIndex]); + final String v = String.valueOf(x.getValue().get(valueIndex)); tokens[i] = v; } + } else if (word.equals("':" + x.getKey() + "'")) { + final int valueCount = x.getValue().size(); + if (valueCount > 0) { + final int valueIndex = random.nextInt(valueCount); + final String v = String.valueOf(x.getValue().get(valueIndex)); + tokens[i] = "'" + v + "'"; + } } } }