From f14a05011f5e95fb1f8546728a7df2a727c9690b Mon Sep 17 00:00:00 2001 From: linl33 Date: Wed, 31 Jan 2024 17:44:56 +0800 Subject: [PATCH] Add linl33 v2 --- .github/workflows/maven.yml | 7 +- prepare_linl33.sh | 3 +- .../onebrc/CalculateAverage_linl33.java | 103 +++++++++--------- 3 files changed, 57 insertions(+), 56 deletions(-) diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 2014739f5..859795578 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -53,8 +53,13 @@ jobs: id: sdkman - name: 'Build project' + shell: bash run: | source "$HOME/.sdkman/bin/sdkman-init.sh" + if [ -f ${{ format('src/main/java-22/dev/morling/onebrc/CalculateAverage_{0}.java', github.event.pull_request.user.login || '') }} ]; then + sdk install java 22.ea.32-open || true + sdk use java 22.ea.32-open + fi ./mvnw --version ./mvnw -B clean verify -Pci @@ -63,5 +68,3 @@ jobs: run: | ./test_ci.sh ${{ github.event.pull_request.user.login }} if: github.event_name == 'pull_request' - - diff --git a/prepare_linl33.sh b/prepare_linl33.sh index 5fdf640a6..f943c90ef 100755 --- a/prepare_linl33.sh +++ b/prepare_linl33.sh @@ -17,8 +17,7 @@ source "$HOME/.sdkman/bin/sdkman-init.sh" -# TODO: bump to ea 32 when available -sdk use java 22.ea.31-open 1>&2 +sdk use java 22.ea.32-open 1>&2 CLASS_NAME="CalculateAverage_linl33" diff --git a/src/main/java-22/dev/morling/onebrc/CalculateAverage_linl33.java b/src/main/java-22/dev/morling/onebrc/CalculateAverage_linl33.java index 62d54546e..dc9fd23af 100644 --- a/src/main/java-22/dev/morling/onebrc/CalculateAverage_linl33.java +++ b/src/main/java-22/dev/morling/onebrc/CalculateAverage_linl33.java @@ -71,7 +71,7 @@ public static void main() throws InterruptedException, IOException { final var inputMapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size(), Arena.global()); final var chunkBounds = calcChunkBounds(inputMapped.address(), inputMapped.byteSize()); - final var maps = new SparseMap[N_THREADS]; + final var maps = new HashTable[N_THREADS]; try (final var threadPool = Executors.newFixedThreadPool(N_THREADS, THREAD_BUILDER.factory()); final var singleThreadExecutor = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory())) { @@ -104,11 +104,12 @@ private static long[] calcChunkBounds(final long mappedAddr, final long fileSize return chunkBounds; } - private static void printSorted(final SparseMap temperatureMeasurements) { + private static void printSorted(final HashTable temperatureMeasurements) { final var weatherStations = new AggregatedMeasurement[(int) temperatureMeasurements.size]; final var nameBuffer = new byte[WEATHER_STATION_LENGTH_MAX]; - var offset = temperatureMeasurements.denseAddress; - for (int i = 0; i < weatherStations.length; i++, offset += SparseMap.DATA_SCALE * Long.BYTES) { + + for (int i = 0; i < weatherStations.length; i++) { + final var offset = temperatureMeasurements.getOffset(i); final var nameAddr = UNSAFE.getLong(offset); final var nameLength = UNSAFE.getInt(offset + Integer.BYTES * 7); MemorySegment.copy(ALL, ValueLayout.JAVA_BYTE, nameAddr, nameBuffer, 0, nameLength); @@ -129,8 +130,8 @@ private static void printSorted(final SparseMap temperatureMeasurements) { } private static void printAggMeasurement(final AggregatedMeasurement aggMeasurement, - final SparseMap temperatureMeasurements) { - final var offset = temperatureMeasurements.denseAddress + SparseMap.DATA_SCALE * Long.BYTES * aggMeasurement.id(); + final HashTable temperatureMeasurements) { + final var offset = temperatureMeasurements.getOffset(aggMeasurement.id()); // name System.out.print(aggMeasurement.name()); @@ -162,15 +163,15 @@ private static double round(final double d) { private static class CalculateAverageTask implements Runnable { public static final int BATCH_SIZE_BYTES = BYTE_SPECIES.vectorByteSize(); - private final SparseMap[] maps; + private final HashTable[] maps; private final long[] chunkBounds; private final long chunkStart; private final long chunkEnd; private final int t; - private SparseMap map; + private HashTable map; - public CalculateAverageTask(SparseMap[] maps, long[] chunkBounds, int t) { + public CalculateAverageTask(HashTable[] maps, long[] chunkBounds, int t) { this.maps = maps; this.chunkBounds = chunkBounds; this.chunkStart = chunkBounds[t]; @@ -180,7 +181,7 @@ public CalculateAverageTask(SparseMap[] maps, long[] chunkBounds, int t) { @Override public void run() { - this.maps[this.t] = new SparseMap(); + this.maps[this.t] = new HashTable(); this.map = this.maps[this.t]; var lineStart = this.chunkBounds[0]; @@ -192,8 +193,8 @@ public void run() { } } - final var vectorLimit = this.chunkStart + ((this.chunkEnd - this.chunkStart) & -BYTE_SPECIES.vectorByteSize()); - for (long i = this.chunkStart; i < vectorLimit; i += BYTE_SPECIES.vectorByteSize()) { + final var vectorLimit = this.chunkStart + ((this.chunkEnd - this.chunkStart) & -BATCH_SIZE_BYTES); + for (long i = this.chunkStart; i < vectorLimit; i += BATCH_SIZE_BYTES) { var lfMask = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, i, ByteOrder.nativeOrder()) .eq((byte) '\n') .toLong(); @@ -272,31 +273,34 @@ private void processLine(final long lineStart, final long lfAddress) { /** * Open addressing, linear probing hash map backed by off-heap memory */ - private static class SparseMap { + private static class HashTable { private static final int TRUNCATED_HASH_BITS = 26; // max # of unique keys private static final long DENSE_SIZE = WEATHER_STATION_DISTINCT_MAX; // max hash code (exclusive) private static final long SPARSE_SIZE = 1L << (TRUNCATED_HASH_BITS + 1); - private static final long DATA_SCALE = 4; + public static final long SPARSE_SCALE = 32; + public static final long DENSE_SCALE = 8; public final long sparseAddress; public final long denseAddress; public long size; - public SparseMap() { + public HashTable() { var arena = new MallocArena(Arena.global()); var callocArena = new CallocArena(Arena.global()); - this.size = 0L; - - final var sparse = callocArena.allocate(ValueLayout.JAVA_LONG, SPARSE_SIZE); + final var sparse = callocArena.allocate(ValueLayout.JAVA_BYTE, SPARSE_SIZE * SPARSE_SCALE); this.sparseAddress = (sparse.address() + MallocArena.MAX_ALIGN) & -MallocArena.MAX_ALIGN; - final var dense = arena.allocate(ValueLayout.JAVA_LONG, DENSE_SIZE * DATA_SCALE); + final var dense = arena.allocate(ValueLayout.JAVA_BYTE, DENSE_SIZE * DENSE_SCALE); this.denseAddress = (dense.address() + MallocArena.MAX_ALIGN) & -MallocArena.MAX_ALIGN; } + public long getOffset(final long index) { + return UNSAFE.getLong(this.denseAddress + index * DENSE_SCALE); + } + public void putEntry(final long keyAddress, final int keyLength, final int value) { final var hash = hash(keyAddress, keyLength); this.putEntryInternal(hash, keyAddress, keyLength, value, 1, value, value); @@ -309,43 +313,46 @@ private void putEntryInternal(final long hash, final int count, final int temperatureMin, final int temperatureMax) { - final var sparseOffset = this.sparseAddress + truncateHash(hash) * Long.BYTES; + final var sparseOffset = this.sparseAddress + truncateHash(hash) * SPARSE_SCALE; + + for (long n = 0, sparseLinearOffset = sparseOffset; n < WEATHER_STATION_DISTINCT_MAX; n++, sparseLinearOffset += SPARSE_SCALE) { + final var entryKeyAddress = UNSAFE.getLong(sparseLinearOffset); - for (long n = 0, sparseLinearOffset = sparseOffset; n < WEATHER_STATION_DISTINCT_MAX; n++, sparseLinearOffset += Long.BYTES) { - final var denseOffset = UNSAFE.getLong(sparseLinearOffset); - if (denseOffset == 0L) { + if (entryKeyAddress == 0L) { this.add(sparseLinearOffset, keyAddress, keyLength, temperature, count, temperatureMin, temperatureMax); this.size++; return; } - if (isCollision(keyAddress, keyLength, denseOffset)) { + if (mismatch(keyAddress, entryKeyAddress, keyLength)) { continue; } - final var currTotal = UNSAFE.getLong(denseOffset + Integer.BYTES * 2); - UNSAFE.putLong(denseOffset + Integer.BYTES * 2, currTotal + temperature); // total + final var currMin = UNSAFE.getInt(sparseLinearOffset + Integer.BYTES * 5); + final var currMax = UNSAFE.getInt(sparseLinearOffset + Integer.BYTES * 6); + final var currTotal = UNSAFE.getLong(sparseLinearOffset + Integer.BYTES * 2); + final var currCount = UNSAFE.getInt(sparseLinearOffset + Integer.BYTES * 4); - final var currCount = UNSAFE.getInt(denseOffset + Integer.BYTES * 4); - UNSAFE.putInt(denseOffset + Integer.BYTES * 4, currCount + count); // count + UNSAFE.putLong(sparseLinearOffset + Integer.BYTES * 2, currTotal + temperature); + UNSAFE.putInt(sparseLinearOffset + Integer.BYTES * 4, currCount + count); - final var currMin = UNSAFE.getInt(denseOffset + Integer.BYTES * 5); if (temperatureMin < currMin) { - UNSAFE.putInt(denseOffset + Integer.BYTES * 5, temperatureMin); // min + UNSAFE.putInt(sparseLinearOffset + Integer.BYTES * 5, temperatureMin); } - final var currMax = UNSAFE.getInt(denseOffset + Integer.BYTES * 6); if (temperatureMax > currMax) { - UNSAFE.putInt(denseOffset + Integer.BYTES * 6, temperatureMax); // max + UNSAFE.putInt(sparseLinearOffset + Integer.BYTES * 6, temperatureMax); } return; } } - public void merge(final SparseMap other) { + public void merge(final HashTable other) { final var otherSize = other.size; - for (long i = 0, offset = other.denseAddress; i < otherSize; i++, offset += DATA_SCALE * Long.BYTES) { + for (long i = 0; i < otherSize; i++) { + final var offset = other.getOffset(i); + final var keyAddress = UNSAFE.getLong(offset); final var keyLength = UNSAFE.getInt(offset + Integer.BYTES * 7); final var hash = hash(keyAddress, keyLength); @@ -369,22 +376,15 @@ private void add(final long sparseOffset, final int temperatureMin, final int temperatureMax) { // new entry, initialize sparse and dense - final var denseOffset = this.denseAddress + this.size * DATA_SCALE * Long.BYTES; - UNSAFE.putLong(sparseOffset, denseOffset); - - UNSAFE.putLong(denseOffset, keyAddress); - UNSAFE.putLong(denseOffset + Integer.BYTES * 2, temperature); - UNSAFE.putInt(denseOffset + Integer.BYTES * 4, count); - UNSAFE.putInt(denseOffset + Integer.BYTES * 5, temperatureMin); - UNSAFE.putInt(denseOffset + Integer.BYTES * 6, temperatureMax); - UNSAFE.putInt(denseOffset + Integer.BYTES * 7, keyLength); - } - - private static boolean isCollision(final long keyAddress, final int keyLength, final long denseOffset) { - // key length compare is unnecessary - - final var entryKeyAddress = UNSAFE.getLong(denseOffset); - return mismatch(keyAddress, entryKeyAddress, keyLength); + final var denseOffset = this.denseAddress + this.size * DENSE_SCALE; + UNSAFE.putLong(denseOffset, sparseOffset); + + UNSAFE.putLong(sparseOffset, keyAddress); + UNSAFE.putLong(sparseOffset + Integer.BYTES * 2, temperature); + UNSAFE.putInt(sparseOffset + Integer.BYTES * 4, count); + UNSAFE.putInt(sparseOffset + Integer.BYTES * 5, temperatureMin); + UNSAFE.putInt(sparseOffset + Integer.BYTES * 6, temperatureMax); + UNSAFE.putInt(sparseOffset + Integer.BYTES * 7, keyLength); } private static boolean mismatch(final long leftAddr, final long rightAddr, final int length) { @@ -404,8 +404,7 @@ private static boolean mismatch(final long leftAddr, final long rightAddr, final final var r = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, rightAddr + loopBound, ByteOrder.nativeOrder()); final var eqMask = l.eq(r).toLong(); - // LE compare to add 1 to length - return Long.numberOfTrailingZeros(~eqMask) <= (length - loopBound); + return Long.numberOfTrailingZeros(~eqMask) < ((length + 1) & (BYTE_SPECIES.vectorByteSize() - 1)); // to support platforms without TZCNT, the check can be replaced with // a comparison to lowestZero = ~eqMask & (eqMask + 1) }