Skip to content

Commit

Permalink
Move async handling from cache to http client
Browse files Browse the repository at this point in the history
  • Loading branch information
SmylerMC committed Sep 14, 2024
1 parent e29cefa commit ba73a80
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ public enum CacheType {
*/
HYBRID,

/**
* Cache error.
*/
ERROR,

}

}
69 changes: 28 additions & 41 deletions fabric/src/main/java/net/smyler/terramap/http/DiskCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
Expand All @@ -32,12 +29,10 @@ public class DiskCache implements HttpCache {

private static final int FILE_FORMAT_VERSION = 1;

private final ExecutorService executor;
private final Logger logger;
private final Path directory;

public DiskCache(Path directory, Logger logger, ExecutorService executor) {
this.executor = executor;
public DiskCache(Path directory, Logger logger) {
this.logger = logger;
this.directory = directory;
}
Expand Down Expand Up @@ -86,44 +81,36 @@ public void put(@NotNull URI uri, long lastModified, long maxAge, @Nullable Stri
}

@Override
public CompletableFuture<CacheStatistics> statistics() {
return CompletableFuture.supplyAsync(() -> {
try (Stream<Path> paths = Files.list(this.directory)) {
AtomicLong counter = new AtomicLong(0L);
long size = paths.parallel()
.map(Path::toFile)
.filter(File::isFile)
.peek(f -> counter.incrementAndGet())
.map(File::length)
.reduce(Long::sum)
.orElse(0L);
return new CacheStatistics(counter.get(), size, DISK);
} catch (IOException e) {
throw new CompletionException(e);
}
}, this.executor);
public CacheStatistics statistics() throws IOException {
try (Stream<Path> paths = Files.list(this.directory)) {
AtomicLong counter = new AtomicLong(0L);
long size = paths.parallel()
.map(Path::toFile)
.filter(File::isFile)
.peek(f -> counter.incrementAndGet())
.map(File::length)
.reduce(Long::sum)
.orElse(0L);
return new CacheStatistics(counter.get(), size, DISK);
}
}

@Override
public CompletableFuture<CacheStatistics> cleanup(Predicate<CacheEntry> predicate) {
return CompletableFuture.supplyAsync(() -> {
try (Stream<Path> paths = Files.list(this.directory)) {
AtomicLong entries = new AtomicLong(0L);
AtomicLong size = new AtomicLong(0L);
paths.parallel()
.map(Path::toFile)
.map(f -> new EntryCleanup(f, this.readEntry(f), f.length()))
.filter(p -> p.entry() == null || predicate.test(p.entry()))
.filter(EntryCleanup::delete)
.forEach(e -> {
entries.incrementAndGet();
size.addAndGet(e.size());
});
return new CacheStatistics(entries.get(), size.get(), DISK);
} catch (IOException e) {
throw new CompletionException(e);
}
}, this.executor);
public CacheStatistics cleanup(Predicate<CacheEntry> predicate) throws IOException {
try (Stream<Path> paths = Files.list(this.directory)) {
AtomicLong entries = new AtomicLong(0L);
AtomicLong size = new AtomicLong(0L);
paths.parallel()
.map(Path::toFile)
.map(f -> new EntryCleanup(f, this.readEntry(f), f.length()))
.filter(p -> p.entry() == null || predicate.test(p.entry()))
.filter(EntryCleanup::delete)
.forEach(e -> {
entries.incrementAndGet();
size.addAndGet(e.size());
});
return new CacheStatistics(entries.get(), size.get(), DISK);
}
}

private @Nullable CacheEntry readEntry(@NotNull File file) {
Expand Down
6 changes: 3 additions & 3 deletions fabric/src/main/java/net/smyler/terramap/http/HttpCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public interface HttpCache {
Expand All @@ -13,8 +13,8 @@ public interface HttpCache {

@Nullable CacheEntry lookup(URI uri);

CompletableFuture<CacheStatistics> statistics();
CacheStatistics statistics() throws IOException;

CompletableFuture<CacheStatistics> cleanup(Predicate<CacheEntry> predicate);
CacheStatistics cleanup(Predicate<CacheEntry> predicate) throws IOException;

}
48 changes: 19 additions & 29 deletions fabric/src/main/java/net/smyler/terramap/http/MemoryCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,16 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

import static net.smyler.terramap.http.CacheStatistics.CacheType.MEMORY;

public class MemoryCache implements HttpCache {

private final ExecutorService executorService;
private final ConcurrentHashMap<URI, MemoryCacheEntry> cache = new ConcurrentHashMap<>();

public MemoryCache(ExecutorService executorService) {
this.executorService = executorService;
}

@Override
public void put(@NotNull URI uri, long lastModified, long maxAge, @Nullable String etag, boolean immutable, boolean mustRevalidate, byte @NotNull [] body) {
Expand Down Expand Up @@ -52,33 +46,29 @@ public void put(@NotNull URI uri, long lastModified, long maxAge, @Nullable Stri
}

@Override
public CompletableFuture<CacheStatistics> statistics() {
return CompletableFuture.supplyAsync(() -> {
AtomicLong counter = new AtomicLong();
long size = this.cache.values().stream()
.peek(e -> counter.getAndIncrement())
.map(MemoryCacheEntry::size)
.reduce(0L, Long::sum);
return new CacheStatistics(counter.get(), size, MEMORY);
}, this.executorService);
public CacheStatistics statistics() {
AtomicLong counter = new AtomicLong();
long size = this.cache.values().stream()
.peek(e -> counter.getAndIncrement())
.map(MemoryCacheEntry::size)
.reduce(0L, Long::sum);
return new CacheStatistics(counter.get(), size, MEMORY);
}

@Override
public CompletableFuture<CacheStatistics> cleanup(Predicate<CacheEntry> predicate) {
return CompletableFuture.supplyAsync(() -> {
Collection<MemoryCacheEntry> values = this.cache.values();
long removedCount = 0;
long removedSize = 0;
for (Iterator<MemoryCacheEntry> iterator = values.iterator(); iterator.hasNext(); ) {
MemoryCacheEntry entry = iterator.next();
if (predicate.test(entry.entry)) {
iterator.remove();
removedSize += entry.size();
removedCount++;
}
public CacheStatistics cleanup(Predicate<CacheEntry> predicate) {
Collection<MemoryCacheEntry> values = this.cache.values();
long removedCount = 0;
long removedSize = 0;
for (Iterator<MemoryCacheEntry> iterator = values.iterator(); iterator.hasNext(); ) {
MemoryCacheEntry entry = iterator.next();
if (predicate.test(entry.entry)) {
iterator.remove();
removedSize += entry.size();
removedCount++;
}
return new CacheStatistics(removedCount, removedSize, MEMORY);
}, this.executorService);
}
return new CacheStatistics(removedCount, removedSize, MEMORY);
}

private record MemoryCacheEntry(CacheEntry entry, long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static java.util.Arrays.stream;
import static java.util.Objects.requireNonNull;
import static net.smyler.smylib.Objects.optionalBiMapSupplier;
import static net.smyler.terramap.http.CacheStatistics.CacheType.ERROR;
import static net.smyler.terramap.http.HttpStatusCodes.*;


Expand Down Expand Up @@ -53,11 +54,11 @@ public TerramapHttpClient(Logger logger, Path cacheDirectory) {
try {
requireNonNull(cacheDirectory);
Files.createDirectories(cacheDirectory);
cache = new DiskCache(cacheDirectory, this.logger, this.forkJoinPool);
cache = new DiskCache(cacheDirectory, this.logger);
} catch (Exception e) {
this.logger.warn("Failed to create cache directory, falling back to memory cache");
this.logger.catching(e);
cache = new MemoryCache(this.forkJoinPool);
cache = new MemoryCache();
}
this.cache = cache;
}
Expand Down Expand Up @@ -244,17 +245,41 @@ private void log(HttpRequest request) {

@Override
public CompletableFuture<CacheStatistics> cacheStatistics() {
return this.cache.statistics();
return CompletableFuture.supplyAsync(() -> {
try {
return this.cache.statistics();
} catch (IOException e) {
this.logger.warn("Computing cache statistics failed");
this.logger.catching(e);
return new CacheStatistics(0, 0, ERROR);
}
}, this.forkJoinPool);
}

@Override
public CompletableFuture<CacheStatistics> cacheCleanup() {
return this.cache.cleanup(e -> !e.isFresh() && e.age() > e.maxAge() * 10);
return CompletableFuture.supplyAsync(() -> {
try {
return this.cache.cleanup(e -> !e.isFresh() && e.age() > e.maxAge() * 10);
} catch (IOException e) {
this.logger.warn("Cache cleanup failed");
this.logger.catching(e);
return new CacheStatistics(0, 0, ERROR);
}
}, this.forkJoinPool);
}

@Override
public CompletableFuture<CacheStatistics> cacheClear() {
return this.cache.cleanup(e -> true);
return CompletableFuture.supplyAsync(() -> {
try {
return this.cache.cleanup(e -> true);
} catch (IOException e) {
this.logger.warn("Cache clear failed");
this.logger.catching(e);
return new CacheStatistics(0, 0, ERROR);
}
}, this.forkJoinPool);
}

private class HttpWorkerThread extends ForkJoinWorkerThread {
Expand Down

0 comments on commit ba73a80

Please sign in to comment.