Skip to content

Commit

Permalink
Async cache/protocollib (#51)
Browse files Browse the repository at this point in the history
* feat: WIP - async obfuscation

* feat: WIP - async ChunkSerializer + ChunkCache

* feat: async ChunkCache

* feat: improve AsyncChunkSerializer (task grouping)

* feat: improve AsyncChunkSerializer efficiency

* fix: memory-leak on reload in ProximityHider

* feat: ofc thread group + thread names

* feat: added new config options for new async cache
  • Loading branch information
Ingrim4 authored Oct 4, 2020
1 parent d90a52f commit b803d33
Show file tree
Hide file tree
Showing 28 changed files with 528 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,14 @@ public interface CacheConfig {
* @throws IllegalArgumentException When the expire value is lower than one
*/
void expireAfterAccess(long expire);

int maximumTaskQueueSize();
/**
* @param size
* @throws IllegalArgumentException When the expire value is lower than one
*/
void maximumTaskQueueSize(int size);

int protocolLibThreads();
void protocolLibThreads(int threads);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,21 @@

import java.util.Objects;

import org.bukkit.World;

public class ChunkPosition {

private final String world;
private final World world;
private final int x;
private final int z;

public ChunkPosition(String world, int x, int z) {
public ChunkPosition(World world, int x, int z) {
this.world = Objects.requireNonNull(world);
this.x = x;
this.z = z;
}

public String getWorld() {
public World getWorld() {
return this.world;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,9 @@ protected final T get(Path path) throws IOException {
}

public final void close(Path path) throws IOException {
T t = null;

this.lock.writeLock().lock();
try {
t = this.regionFiles.remove(path);
T t = this.regionFiles.remove(path);
if (t != null) {
this.closeRegionFile(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ cache:
baseDirectory: 'orebfuscator_cache/'
maximumOpenRegionFiles: 256
deleteRegionFilesAfterAccess: 172800000‬
maximumSize: 4096
maximumSize: 8192
expireAfterAccess: 30000
maximumTaskQueueSize: 32768
protocolLibThreads: -1
world:
- worlds:
- world
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

public class Orebfuscator extends JavaPlugin implements Listener {

public static final ThreadGroup THREAD_GROUP = new ThreadGroup("ofc");

private final Thread mainThread = Thread.currentThread();

private OrebfuscatorConfig config;
private ChunkCache chunkCache;
private Obfuscator obfuscator;
Expand Down Expand Up @@ -71,8 +75,8 @@ public void onEnable() {

@Override
public void onDisable() {
this.chunkCache.invalidateAll(true);
NmsInstance.close();
this.chunkCache.close();
this.config.store();

this.packetListener.unregister();

Expand All @@ -83,6 +87,8 @@ public void onDisable() {

this.getServer().getScheduler().cancelTasks(this);


NmsInstance.close();
this.config = null;
}

Expand All @@ -95,6 +101,10 @@ public void onEnableFailed(Listener listener, Event event) {
}
}

public boolean isMainThread() {
return Thread.currentThread() == this.mainThread;
}

public OrebfuscatorConfig getOrebfuscatorConfig() {
return this.config;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package net.imprex.orebfuscator.cache;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import net.imprex.orebfuscator.Orebfuscator;
import net.imprex.orebfuscator.obfuscation.ObfuscatedChunk;
import net.imprex.orebfuscator.util.ChunkPosition;

public class AsyncChunkSerializer implements Runnable {

private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

private final Map<ChunkPosition, Runnable> tasks = new HashMap<>();
private final Queue<ChunkPosition> positions = new LinkedList<>();
private final int maxTaskQueueSize;

private final Thread thread;
private volatile boolean running = true;

public AsyncChunkSerializer(Orebfuscator orebfuscator) {
this.maxTaskQueueSize = orebfuscator.getOrebfuscatorConfig().cache().maximumTaskQueueSize();

this.thread = new Thread(Orebfuscator.THREAD_GROUP, this, "ofc-chunk-serializer");
this.thread.setDaemon(true);
this.thread.start();
}

public CompletableFuture<ObfuscatedChunk> read(ChunkPosition position) {
this.lock.lock();
try {
Runnable task = this.tasks.get(position);
if (task instanceof WriteTask) {
return CompletableFuture.completedFuture(((WriteTask) task).chunk);
} else if (task instanceof ReadTask) {
return ((ReadTask) task).future;
} else {
CompletableFuture<ObfuscatedChunk> future = new CompletableFuture<>();
this.queueTask(position, new ReadTask(position, future));
return future;
}
} finally {
this.lock.unlock();
}
}

public void write(ChunkPosition position, ObfuscatedChunk chunk) {
this.lock.lock();
try {
Runnable prevTask = this.queueTask(position, new WriteTask(position, chunk));
if (prevTask instanceof ReadTask) {
((ReadTask) prevTask).future.complete(chunk);
}
} finally {
this.lock.unlock();
}
}

private Runnable queueTask(ChunkPosition position, Runnable nextTask) {
while (this.positions.size() >= this.maxTaskQueueSize) {
this.notFull.awaitUninterruptibly();
}

if (!this.running) {
throw new IllegalStateException("AsyncChunkSerializer already closed");
}

Runnable prevTask = this.tasks.put(position, nextTask);
if (prevTask == null) {
this.positions.offer(position);
}

this.notEmpty.signal();
return prevTask;
}

@Override
public void run() {
while (this.running) {
this.lock.lock();
try {
if (this.positions.isEmpty()) {
this.notEmpty.await();
}

this.tasks.remove(this.positions.poll()).run();

this.notFull.signal();
} catch (InterruptedException e) {
break;
} finally {
this.lock.unlock();
}
}
}

public void close() {
this.lock.lock();
try {
this.running = false;
this.thread.interrupt();

while (!this.positions.isEmpty()) {
Runnable task = this.tasks.remove(this.positions.poll());
if (task instanceof WriteTask) {
task.run();
}
}
} finally {
this.lock.unlock();
}
}

private class WriteTask implements Runnable {
private final ChunkPosition position;
private final ObfuscatedChunk chunk;

public WriteTask(ChunkPosition position, ObfuscatedChunk chunk) {
this.position = position;
this.chunk = chunk;
}

@Override
public void run() {
try {
ChunkSerializer.write(position, chunk);
} catch (IOException e) {
e.printStackTrace();
}
}
}

private class ReadTask implements Runnable {
private final ChunkPosition position;
private final CompletableFuture<ObfuscatedChunk> future;

public ReadTask(ChunkPosition position, CompletableFuture<ObfuscatedChunk> future) {
this.position = position;
this.future = future;
}

@Override
public void run() {
try {
future.complete(ChunkSerializer.read(position));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Loading

0 comments on commit b803d33

Please sign in to comment.