diff --git a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java index 7576b6d56..5c5b72a6b 100644 --- a/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java +++ b/src/main/java/org/aksw/iguana/cc/worker/impl/SPARQLProtocolWorker.java @@ -164,8 +164,8 @@ public boolean successful() { } - private final HttpClient httpClient; - private final ExecutorService executor; + private HttpClient httpClient; + private final ThreadPoolExecutor executor; private final XXHashFactory hasherFactory = XXHashFactory.fastestJavaInstance(); private final RequestFactory requestFactory; @@ -178,22 +178,18 @@ public boolean successful() { // used to read the http response body private byte[] buffer = new byte[4096]; - + @Override public Config config() { - return (Config) config; + return (SPARQLProtocolWorker.Config) config; } public SPARQLProtocolWorker(long workerId, ResponseBodyProcessor responseBodyProcessor, Config config) { super(workerId, responseBodyProcessor, config); this.responseBodyProcessor = responseBodyProcessor; - this.executor = Executors.newFixedThreadPool(2); + this.executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1); this.requestFactory = new RequestFactory(config().requestType()); - this.httpClient = HttpClient.newBuilder() - .executor(this.executor) - .followRedirects(HttpClient.Redirect.ALWAYS) - .connectTimeout(config().timeout()) - .build(); + this.httpClient = buildHttpClient(); } @@ -224,10 +220,10 @@ public CompletableFuture start() { } } } catch (IOException | URISyntaxException e) { - throw new RuntimeException(e); + throw new RuntimeException(e); // TODO: better error handling } - return new Result(this.workerId, executionStats); + return new Result(this.workerID, executionStats); }, executor); } @@ -237,7 +233,7 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) if (result.response().isPresent()) statuscode = Optional.of(result.response().get().statusCode()); - if (result.successful()) { // 2xx + if (result.successful() && this.config.parseResults()) { // 2xx // process result if (!responseBodyProcessor.add(result.actualContentLength().getAsLong(), result.hash().getAsLong(), result.outputStream().get())) { this.responseBodybbaos = result.outputStream().get(); @@ -245,6 +241,7 @@ private ExecutionStats executeQuery(Duration timeout, boolean discardOnFailure) this.responseBodybbaos = new BigByteArrayOutputStream(); } } + this.responseBodybbaos.reset(); if (!result.completed() && discardOnFailure) { return null; @@ -271,6 +268,11 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept config().acceptHeader() ); + if (((ThreadPoolExecutor) this.httpClient.executor().get()).getActiveCount() != 0) { + ((ThreadPoolExecutor) this.httpClient.executor().get()).shutdownNow(); + this.httpClient = buildHttpClient(); + } + final Instant requestStart = Instant.now(); BiFunction, Exception, HttpExecutionResult> createFailedResult = (response, e) -> { final Duration requestDuration = Duration.between(requestStart, Instant.now()); @@ -295,6 +297,9 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept try (var hasher = hasherFactory.newStreamingHash64(0)) { int readBytes; while ((readBytes = bodyStream.readNBytes(this.buffer, 0, this.buffer.length)) != 0) { + if (Duration.between(requestStart, requestStart.plus(timeout)).isNegative()) { + return createFailedResult.apply(httpResponse, new TimeoutException()); + } hasher.update(this.buffer, 0, readBytes); this.responseBodybbaos.write(this.buffer, 0, readBytes); } @@ -302,7 +307,7 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept if (contentLength.isPresent() && (this.responseBodybbaos.size() < contentLength.getAsLong() || this.responseBodybbaos.size() > contentLength.getAsLong())) { - return createFailedResult.apply(httpResponse, null); // TODO: custom exception maybe? + return createFailedResult.apply(httpResponse, new ProtocolException("Content-Length header value doesn't match actual content length.")); } return new HttpExecutionResult( @@ -328,4 +333,12 @@ private HttpExecutionResult executeHttpRequest(Duration timeout) throws IOExcept return createFailedResult.apply(null, e); } } + + private HttpClient buildHttpClient() { + return HttpClient.newBuilder() + .executor(Executors.newFixedThreadPool(1)) + .followRedirects(HttpClient.Redirect.ALWAYS) + .connectTimeout(config().timeout()) + .build(); + } }