diff --git a/fabric/src/main/java/net/smyler/terramap/http/TerramapHttpClient.java b/fabric/src/main/java/net/smyler/terramap/http/TerramapHttpClient.java index 39320283..de1dc1cf 100644 --- a/fabric/src/main/java/net/smyler/terramap/http/TerramapHttpClient.java +++ b/fabric/src/main/java/net/smyler/terramap/http/TerramapHttpClient.java @@ -1,8 +1,9 @@ package net.smyler.terramap.http; -import net.smyler.smylib.Pair; import net.smyler.smylib.Strings; import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.io.IOException; import java.io.InputStream; @@ -11,10 +12,7 @@ import java.net.http.HttpResponse; import java.nio.file.Files; import java.nio.file.Path; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; +import java.time.*; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.*; @@ -28,11 +26,13 @@ import static net.smyler.smylib.Objects.optionalBiMapSupplier; import static net.smyler.terramap.http.CacheStatistics.CacheType.ERROR; import static net.smyler.terramap.http.HttpStatusCodes.*; +import static net.smyler.smylib.Strings.isNullOrEmpty; public class TerramapHttpClient implements CachingHttpClient { - private final ForkJoinPool forkJoinPool = new ForkJoinPool(20, HttpWorkerThread::new, this::unhandledException, true); + private final ForkJoinPool forkJoinPool = new ForkJoinPool(1, HttpWorkerThread::new, this::unhandledException, true); + private final ForkJoinPool semaphoreAcquireExecutor = new ForkJoinPool(1, HttpWorkerThread::new, this::unhandledException, true); private final AtomicLong workerCounter = new AtomicLong(0); private static final String USER_AGENT = "Experimental Terramap version https://github.com/SmylerMC/terramap"; @@ -40,6 +40,8 @@ public class TerramapHttpClient implements CachingHttpClient { .ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ENGLISH) .withZone(ZoneId.of("GMT")); + private final ConcurrentHashMap concurrentRequestsCounters = new ConcurrentHashMap<>(); + private final java.net.http.HttpClient client = java.net.http.HttpClient.newBuilder() .followRedirects(java.net.http.HttpClient.Redirect.NORMAL) .executor(this.forkJoinPool) @@ -67,6 +69,7 @@ public TerramapHttpClient(Logger logger, Path cacheDirectory) { public CompletableFuture get(String url) { URI uri = URI.create(url); + String hostname = uri.getHost(); // Lookup cache, use it if fresh CacheEntry cache = this.cache.lookup(uri); @@ -78,24 +81,49 @@ public CompletableFuture get(String url) { }, this.forkJoinPool); } - // Send HTTP request + //final Semaphore semaphore = null; + final Semaphore semaphore = this.concurrentRequestsCounters.get(hostname); + + // Prepare request HttpRequest request = this.createRequest(uri, cache); - this.log(request); - CompletableFuture> response = this.client.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()); - CompletableFuture, byte[]>> content = response.thenApplyAsync(r -> this.getContent(r, cache), this.forkJoinPool); - CompletableFuture bytes = content.thenApply(Pair::right); - content.thenAcceptAsync(p -> this.cacheResponse(p.left(), p.right()), this.forkJoinPool); + final RequestContext context = new RequestContext( + uri, hostname, request, cache, semaphore + ); + + // Request semaphore to respect concurrent request limits + CompletableFuture permitAcquired = CompletableFuture.runAsync(context::acquirePermit, this.semaphoreAcquireExecutor); + + // Send the request + CompletableFuture> response = permitAcquired.thenComposeAsync(context::sendRequest, + this.forkJoinPool + ); + + response.exceptionally(t -> { + this.logger.error(t); + return null; + }); + + // Release concurrent request semaphore + response.whenCompleteAsync(context::releasePermit, this.forkJoinPool); + + // Get resource content, either from the request or the cache + CompletableFuture content = response.thenApplyAsync(context::readContent, this.forkJoinPool); + + // Update cache + content.thenCombineAsync(response, context::cache, this.forkJoinPool); // Make sure a cancellation of the downstream future is propagated to the request - bytes.exceptionally(t -> { + content.exceptionally(t -> { if (t instanceof CancellationException) { - response.cancel(true); + this.logger.trace("Canceled request to {}", uri); + //response.cancel(true); } return null; }); - return bytes; + this.logger.trace("Queued request to {}", uri); + return content; } private HttpRequest createRequest(URI uri, CacheEntry cache) { @@ -113,12 +141,12 @@ private HttpRequest createRequest(URI uri, CacheEntry cache) { return builder.build(); } - private Pair, byte[]> getContent(HttpResponse response, CacheEntry cache) { + private byte @Nullable [] getContent(HttpResponse response, CacheEntry cache) { byte[] content; if (response.statusCode() == HTTP_NOT_MODIFIED) { if (cache == null) { this.logger.warn("Unexpected response: 304 not modified but cache is null"); - return Pair.of(response, new byte[0]); + return null; } content = this.readCache(cache); } else { @@ -129,7 +157,7 @@ private Pair, byte[]> getContent(HttpResponse respo response.statusCode(), content.length, response.uri(), cache == null ? "not cached": cache.isFresh() ? "fresh" : "stale" ); - return Pair.of(response, content); + return content; } private void cacheResponse(HttpResponse response, byte[] content) { @@ -228,19 +256,15 @@ private byte[] readCache(final CacheEntry cache) { @Override public void setMaxConcurrentRequests(String host, int maxConcurrentRequests) { - logger.warn( - "Trying to set max concurrent requests to {}: {}. This is not yet supported", - host, - maxConcurrentRequests - ); - } - - private void log(HttpRequest request) { - this.logger.trace("{} {} {}", - request.method(), - request.uri(), - request.version().map(Objects::toString).orElse("") - ); + // We need to replace the old semaphore, we can't just release to the desired number, + // that could cause a thread condition if a request is pending + host = URI.create(host).getHost(); + if (isNullOrEmpty(host)) { + return; + } + this.logger.debug("Setting max concurrent request to host {} to {}", host, maxConcurrentRequests); + Semaphore newSemaphore = new Semaphore(maxConcurrentRequests); + this.concurrentRequestsCounters.put(host, newSemaphore); } @Override @@ -333,4 +357,76 @@ public static CacheControlResponseDirectives from(String headerValue) { } + private class RequestContext { + + private final @NotNull URI uri; + private final @NotNull String host; + + private final @NotNull HttpRequest request; + private final @Nullable CacheEntry cache; + private final @Nullable Semaphore semaphore; + + private final Logger logger = TerramapHttpClient.this.logger; + + RequestContext(@NotNull URI uri, @NotNull String host, @NotNull HttpRequest request, @Nullable CacheEntry cache, @Nullable Semaphore semaphore) { + this.uri = uri; + this.host = host; + this.request = request; + this.cache = cache; + this.semaphore = semaphore; + } + + void acquirePermit() { + try { + if (this.semaphore != null) { + this.semaphore.acquire(); + this.logger.trace("Acquired semaphore for {}", this.uri); + } + } catch (InterruptedException e) { + this.logger.error("Interrupted when acquiring request semaphore for host {}", this.host); + this.logger.error(e); + } + } + + CompletableFuture> sendRequest(Void v) { + this.logger.trace( + "{} {} {}", + this.request.method(), + this.request.version().map(Object::toString).orElse(""), + this.request.uri() + ); + return TerramapHttpClient.this.client.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()); + } + + void releasePermit(HttpResponse response, Throwable throwable) { + if (this.semaphore != null) { + this.semaphore.release(); + this.logger.trace("Released semaphore for {}", this.host); + } + } + + byte[] readContent(HttpResponse response) { + return TerramapHttpClient.this.getContent(response, this.cache); + } + + Void cache(byte[] content, HttpResponse response) { + // Content might have been read from cache + + if (content == null) { + // HTTP errors should null + return null; + } + + TerramapHttpClient.this.cacheResponse(response, content); + return null; + } + + } + + private Thread createThread(Runnable task) { + return Thread.ofVirtual() + .name("Terramap HTTP " + this.workerCounter.incrementAndGet()) + .unstarted(task); + } + } diff --git a/fabric/src/test/java/net/smyler/terramap/http/TerramapHttpClientTest.java b/fabric/src/test/java/net/smyler/terramap/http/TerramapHttpClientTest.java index 9f8faea5..ea2faa2a 100644 --- a/fabric/src/test/java/net/smyler/terramap/http/TerramapHttpClientTest.java +++ b/fabric/src/test/java/net/smyler/terramap/http/TerramapHttpClientTest.java @@ -3,7 +3,10 @@ import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Test; +import java.io.IOException; +import java.nio.file.Files; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; import static org.apache.logging.log4j.LogManager.getLogger; @@ -11,9 +14,9 @@ public class TerramapHttpClientTest { @Test - void canRequest() throws ExecutionException, InterruptedException { + void canRequest() throws ExecutionException, InterruptedException, IOException { Logger logger = getLogger("HTTP test"); - HttpClient client = new TerramapHttpClient(logger, new MemoryCache()); + HttpClient client = new TerramapHttpClient(logger, Files.createTempDirectory("terramap-tests")); String content = new String(client.get("https://tile-c.openstreetmap.fr/hot/7/66/38.png").get()); content = new String(client.get("https://tile-c.openstreetmap.fr/hot/7/66/38.png").get()); }