Skip to content

Commit

Permalink
Enforce max concurrent requests limitations in http client
Browse files Browse the repository at this point in the history
  • Loading branch information
SmylerMC committed Sep 14, 2024
1 parent ba73a80 commit 1b20069
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 33 deletions.
158 changes: 127 additions & 31 deletions fabric/src/main/java/net/smyler/terramap/http/TerramapHttpClient.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package net.smyler.terramap.http;

import net.smyler.smylib.Pair;
import net.smyler.smylib.Strings;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -11,10 +12,7 @@
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.*;
Expand All @@ -28,18 +26,22 @@
import static net.smyler.smylib.Objects.optionalBiMapSupplier;
import static net.smyler.terramap.http.CacheStatistics.CacheType.ERROR;
import static net.smyler.terramap.http.HttpStatusCodes.*;
import static net.smyler.smylib.Strings.isNullOrEmpty;


public class TerramapHttpClient implements CachingHttpClient {

private final ForkJoinPool forkJoinPool = new ForkJoinPool(20, HttpWorkerThread::new, this::unhandledException, true);
private final ForkJoinPool forkJoinPool = new ForkJoinPool(1, HttpWorkerThread::new, this::unhandledException, true);
private final ForkJoinPool semaphoreAcquireExecutor = new ForkJoinPool(1, HttpWorkerThread::new, this::unhandledException, true);
private final AtomicLong workerCounter = new AtomicLong(0);

private static final String USER_AGENT = "Experimental Terramap version https://github.com/SmylerMC/terramap";
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter
.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ENGLISH)
.withZone(ZoneId.of("GMT"));

private final ConcurrentHashMap<String, Semaphore> concurrentRequestsCounters = new ConcurrentHashMap<>();

private final java.net.http.HttpClient client = java.net.http.HttpClient.newBuilder()
.followRedirects(java.net.http.HttpClient.Redirect.NORMAL)
.executor(this.forkJoinPool)
Expand Down Expand Up @@ -67,6 +69,7 @@ public TerramapHttpClient(Logger logger, Path cacheDirectory) {
public CompletableFuture<byte[]> get(String url) {

URI uri = URI.create(url);
String hostname = uri.getHost();

// Lookup cache, use it if fresh
CacheEntry cache = this.cache.lookup(uri);
Expand All @@ -78,24 +81,49 @@ public CompletableFuture<byte[]> get(String url) {
}, this.forkJoinPool);
}

// Send HTTP request
//final Semaphore semaphore = null;
final Semaphore semaphore = this.concurrentRequestsCounters.get(hostname);

// Prepare request
HttpRequest request = this.createRequest(uri, cache);
this.log(request);
CompletableFuture<HttpResponse<byte[]>> response = this.client.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray());
CompletableFuture<Pair<HttpResponse<byte[]>, byte[]>> content = response.thenApplyAsync(r -> this.getContent(r, cache), this.forkJoinPool);
CompletableFuture<byte[]> bytes = content.thenApply(Pair::right);

content.thenAcceptAsync(p -> this.cacheResponse(p.left(), p.right()), this.forkJoinPool);
final RequestContext context = new RequestContext(
uri, hostname, request, cache, semaphore
);

// Request semaphore to respect concurrent request limits
CompletableFuture<Void> permitAcquired = CompletableFuture.runAsync(context::acquirePermit, this.semaphoreAcquireExecutor);

// Send the request
CompletableFuture<HttpResponse<byte[]>> response = permitAcquired.thenComposeAsync(context::sendRequest,
this.forkJoinPool
);

response.exceptionally(t -> {
this.logger.error(t);
return null;
});

// Release concurrent request semaphore
response.whenCompleteAsync(context::releasePermit, this.forkJoinPool);

// Get resource content, either from the request or the cache
CompletableFuture<byte[]> content = response.thenApplyAsync(context::readContent, this.forkJoinPool);

// Update cache
content.thenCombineAsync(response, context::cache, this.forkJoinPool);

// Make sure a cancellation of the downstream future is propagated to the request
bytes.exceptionally(t -> {
content.exceptionally(t -> {
if (t instanceof CancellationException) {
response.cancel(true);
this.logger.trace("Canceled request to {}", uri);
//response.cancel(true);
}
return null;
});

return bytes;
this.logger.trace("Queued request to {}", uri);
return content;
}

private HttpRequest createRequest(URI uri, CacheEntry cache) {
Expand All @@ -113,12 +141,12 @@ private HttpRequest createRequest(URI uri, CacheEntry cache) {
return builder.build();
}

private Pair<HttpResponse<byte[]>, byte[]> getContent(HttpResponse<byte[]> response, CacheEntry cache) {
private byte @Nullable [] getContent(HttpResponse<byte[]> response, CacheEntry cache) {
byte[] content;
if (response.statusCode() == HTTP_NOT_MODIFIED) {
if (cache == null) {
this.logger.warn("Unexpected response: 304 not modified but cache is null");
return Pair.of(response, new byte[0]);
return null;
}
content = this.readCache(cache);
} else {
Expand All @@ -129,7 +157,7 @@ private Pair<HttpResponse<byte[]>, byte[]> getContent(HttpResponse<byte[]> respo
response.statusCode(), content.length, response.uri(),
cache == null ? "not cached": cache.isFresh() ? "fresh" : "stale"
);
return Pair.of(response, content);
return content;
}

private void cacheResponse(HttpResponse<byte[]> response, byte[] content) {
Expand Down Expand Up @@ -228,19 +256,15 @@ private byte[] readCache(final CacheEntry cache) {

@Override
public void setMaxConcurrentRequests(String host, int maxConcurrentRequests) {
logger.warn(
"Trying to set max concurrent requests to {}: {}. This is not yet supported",
host,
maxConcurrentRequests
);
}

private void log(HttpRequest request) {
this.logger.trace("{} {} {}",
request.method(),
request.uri(),
request.version().map(Objects::toString).orElse("")
);
// We need to replace the old semaphore, we can't just release to the desired number,
// that could cause a thread condition if a request is pending
host = URI.create(host).getHost();
if (isNullOrEmpty(host)) {
return;
}
this.logger.debug("Setting max concurrent request to host {} to {}", host, maxConcurrentRequests);
Semaphore newSemaphore = new Semaphore(maxConcurrentRequests);
this.concurrentRequestsCounters.put(host, newSemaphore);
}

@Override
Expand Down Expand Up @@ -333,4 +357,76 @@ public static CacheControlResponseDirectives from(String headerValue) {

}

private class RequestContext {

private final @NotNull URI uri;
private final @NotNull String host;

private final @NotNull HttpRequest request;
private final @Nullable CacheEntry cache;
private final @Nullable Semaphore semaphore;

private final Logger logger = TerramapHttpClient.this.logger;

RequestContext(@NotNull URI uri, @NotNull String host, @NotNull HttpRequest request, @Nullable CacheEntry cache, @Nullable Semaphore semaphore) {
this.uri = uri;
this.host = host;
this.request = request;
this.cache = cache;
this.semaphore = semaphore;
}

void acquirePermit() {
try {
if (this.semaphore != null) {
this.semaphore.acquire();
this.logger.trace("Acquired semaphore for {}", this.uri);
}
} catch (InterruptedException e) {
this.logger.error("Interrupted when acquiring request semaphore for host {}", this.host);
this.logger.error(e);
}
}

CompletableFuture<HttpResponse<byte[]>> sendRequest(Void v) {
this.logger.trace(
"{} {} {}",
this.request.method(),
this.request.version().map(Object::toString).orElse(""),
this.request.uri()
);
return TerramapHttpClient.this.client.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray());
}

void releasePermit(HttpResponse<byte[]> response, Throwable throwable) {
if (this.semaphore != null) {
this.semaphore.release();
this.logger.trace("Released semaphore for {}", this.host);
}
}

byte[] readContent(HttpResponse<byte[]> response) {
return TerramapHttpClient.this.getContent(response, this.cache);
}

Void cache(byte[] content, HttpResponse<byte[]> response) {
// Content might have been read from cache

if (content == null) {
// HTTP errors should null
return null;
}

TerramapHttpClient.this.cacheResponse(response, content);
return null;
}

}

private Thread createThread(Runnable task) {
return Thread.ofVirtual()
.name("Terramap HTTP " + this.workerCounter.incrementAndGet())
.unstarted(task);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

import static org.apache.logging.log4j.LogManager.getLogger;


public class TerramapHttpClientTest {

@Test
void canRequest() throws ExecutionException, InterruptedException {
void canRequest() throws ExecutionException, InterruptedException, IOException {
Logger logger = getLogger("HTTP test");
HttpClient client = new TerramapHttpClient(logger, new MemoryCache());
HttpClient client = new TerramapHttpClient(logger, Files.createTempDirectory("terramap-tests"));
String content = new String(client.get("https://tile-c.openstreetmap.fr/hot/7/66/38.png").get());
content = new String(client.get("https://tile-c.openstreetmap.fr/hot/7/66/38.png").get());
}
Expand Down

0 comments on commit 1b20069

Please sign in to comment.