From c5b7b19e57624d2c510acc9efe7d5b2884dec9e8 Mon Sep 17 00:00:00 2001 From: Yevhenii Melnyk Date: Wed, 31 Jan 2024 18:05:09 +0100 Subject: [PATCH] melgenek: minor improvements (#655) * melgenek: minor improvements * More memory --- calculate_average_melgenek.sh | 4 +- .../onebrc/CalculateAverage_melgenek.java | 110 +++++++++--------- 2 files changed, 58 insertions(+), 56 deletions(-) diff --git a/calculate_average_melgenek.sh b/calculate_average_melgenek.sh index e0a88a352..ad709c31b 100755 --- a/calculate_average_melgenek.sh +++ b/calculate_average_melgenek.sh @@ -29,8 +29,8 @@ logicalCpuCount=$([ $(uname) = 'Darwin' ] && sysctl -n hw.logicalcpu_max || lscpu -p | egrep -v '^#' | wc -l) # The required heap is proportional to the number of cores. -# There's roughly 3.5MB heap per thread required for the 10k problem. -requiredMemory=$(echo "(l(15 + 3.5 * $logicalCpuCount)/l(2))" | bc -l) +# There's roughly 6MB heap per thread required for the 10k problem. +requiredMemory=$(echo "(l(15 + 6 * $logicalCpuCount)/l(2))" | bc -l) heapSize=$(echo "scale=0; 2^(($requiredMemory+1)/1)" | bc) JAVA_OPTS="$JAVA_OPTS -Xms${heapSize}m -Xmx${heapSize}m" diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_melgenek.java b/src/main/java/dev/morling/onebrc/CalculateAverage_melgenek.java index 133573186..924cf15d8 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_melgenek.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_melgenek.java @@ -15,12 +15,8 @@ */ package dev.morling.onebrc; -import jdk.incubator.vector.ByteVector; -import jdk.incubator.vector.Vector; -import jdk.incubator.vector.VectorOperators; -import jdk.incubator.vector.VectorSpecies; +import jdk.incubator.vector.*; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.lang.invoke.MethodHandles; @@ -30,7 +26,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.TreeMap; -import java.util.concurrent.Executors; +import java.util.concurrent.*; /** * The implementation: @@ -47,7 +43,6 @@ public class CalculateAverage_melgenek { private static final int CORES_COUNT = Runtime.getRuntime().availableProcessors(); private static final String FILE = "./measurements.txt"; - /** * This is a prime number that gives pretty * good hash distributions @@ -63,28 +58,46 @@ public class CalculateAverage_melgenek { private static final Vector NEWLINE_VECTOR = BYTE_SPECIES.broadcast(NEWLINE); private static final Vector SEMICOLON_VECTOR = BYTE_SPECIES.broadcast(SEMICOLON); private static final int MAX_LINE_LENGTH = 107; // 100 + len(";-11.1\n") = 100+7 - private static final TreeMap RESULT = new TreeMap<>(); public static void main(String[] args) throws Throwable { long totalSize = Files.size(Path.of(FILE)); - try (var executor = Executors.newFixedThreadPool(CORES_COUNT - 1)) { - long chunkSize = Math.max(1, totalSize / CORES_COUNT); - long offset = 0; + long chunkSize = Math.max(MAX_LINE_LENGTH, totalSize / CORES_COUNT); + var result = new TreeMap(); + try (var executor = Executors.newFixedThreadPool(CORES_COUNT)) { + var service = new ExecutorCompletionService(executor); int i = 0; - for (; offset < totalSize && i < CORES_COUNT - 1; i++) { - long currentOffset = offset; + for (; i * chunkSize < totalSize; i++) { + long currentOffset = Math.max(0, i * chunkSize - 1); long maxOffset = Math.min((i + 1) * chunkSize, totalSize); - executor.submit(() -> processRange(currentOffset, maxOffset)); - offset = (i + 1) * chunkSize - 1; + service.submit(() -> processRange(currentOffset, maxOffset)); } - if (offset < totalSize) { - processRange(offset, totalSize); + for (; i > 0; i--) { + service.take().get().addRows(result); } } - System.out.println(RESULT); + System.out.println(printTree(result)); } - private static void processRange(long startOffset, long maxOffset) { + private static String printTree(TreeMap result) { + var sb = new StringBuilder(50 * result.size()); + sb.append("{"); + boolean first = true; + for (var entry : result.entrySet()) { + if (first) { + first = false; + } + else { + sb.append(", "); + } + sb.append(entry.getKey()); + sb.append('='); + entry.getValue().appendToStringBuffer(sb); + } + sb.append("}"); + return sb.toString(); + } + + private static CompositeTable processRange(long startOffset, long maxOffset) { final var table = new CompositeTable(); try (var file = new BufferedFile(startOffset, maxOffset)) { processChunk(file, table); @@ -92,12 +105,10 @@ private static void processRange(long startOffset, long maxOffset) { catch (Exception e) { throw new RuntimeException(e); } - synchronized (RESULT) { - table.addRows(RESULT); - } + return table; } - private static void processChunk(BufferedFile file, CompositeTable table) { + private static void processChunk(BufferedFile file, CompositeTable table) throws IOException { if (file.offset != 0) { file.refillBuffer(); int newlinePosition = findDelimiter(file, 0, NEWLINE_VECTOR, NEWLINE); @@ -223,18 +234,18 @@ private static int calculateHash(byte[] buffer, int startPosition, int endPositi long hash = 0; int position = startPosition; - for (; position + Long.BYTES <= endPosition; position += Long.BYTES) { + for (; position + Long.BYTES < endPosition; position += Long.BYTES) { long value = (long) LONG_VIEW.get(buffer, position); hash = hash * RANDOM_PRIME + value; } - if (position + Integer.BYTES <= endPosition) { + if (position + Integer.BYTES < endPosition) { int value = (int) INT_VIEW.get(buffer, position); hash = hash * RANDOM_PRIME + value; position += Integer.BYTES; } - for (; position <= endPosition; position++) { + for (; position < endPosition; position++) { hash = hash * RANDOM_PRIME + buffer[position]; } hash = hash * RANDOM_PRIME; @@ -261,8 +272,6 @@ private static final class LongTable { */ private final long[] buckets = new long[TABLE_CAPACITY * 3]; - int keysCount = 0; - public void add(long str, short value) { int hash = calculateLongHash(str); int bucketIdx = hash & TABLE_CAPACITY_MASK; @@ -273,7 +282,6 @@ public void add(long str, short value) { } else if (bucketStr == 0L) { createBucket(bucketIdx, str, value); - keysCount++; } else { addWithProbing(str, value, (bucketIdx + 1) & TABLE_CAPACITY_MASK); @@ -290,7 +298,6 @@ private void addWithProbing(long str, short value, int bucketIdx) { } else if (bucketStr == 0L) { createBucket(bucketIdx, str, value); - keysCount++; break; } else { @@ -367,15 +374,12 @@ private static final class RegularTable { private static final int TABLE_CAPACITY_MASK = TABLE_CAPACITY - 1; private final Bucket[] buckets = new Bucket[TABLE_CAPACITY]; - int keysCount = 0; - public void add(byte[] data, int start, int stringLength, int hash, short value) { int bucketIdx = hash & TABLE_CAPACITY_MASK; var bucket = buckets[bucketIdx]; if (bucket == null) { buckets[bucketIdx] = new Bucket(data, start, stringLength, hash, value); - keysCount++; } else if (hash == bucket.hash && bucket.isEqual(data, start, stringLength)) { bucket.update(value); @@ -391,7 +395,6 @@ private void addWithProbing(byte[] data, int start, int stringLength, int hash, var bucket = buckets[bucketIdx]; if (bucket == null) { buckets[bucketIdx] = new Bucket(data, start, stringLength, hash, value); - keysCount++; break; } else if (hash == bucket.hash && bucket.isEqual(data, start, stringLength)) { @@ -449,6 +452,14 @@ public boolean isEqual(byte[] data, int start, int length) { if (str.length != length) return false; int i = 0; + int vectorLoopBound = BYTE_SPECIES.loopBound(str.length); + for (; i < vectorLoopBound; i += BYTE_SPECIES_BYTE_SIZE) { + var vector1 = ByteVector.fromArray(BYTE_SPECIES, str, i); + var vector2 = ByteVector.fromArray(BYTE_SPECIES, data, start + i); + var comparisonResult = vector1.compare(VectorOperators.NE, vector2); + if (comparisonResult.anyTrue()) + return false; + } for (; i + Long.BYTES < str.length; i += Long.BYTES) { long value1 = (long) LONG_VIEW.get(str, i); long value2 = (long) LONG_VIEW.get(data, start + i); @@ -493,10 +504,12 @@ public void add(long anotherSum, int anotherCount, short anotherMin, short anoth min = anotherMin; } - public String toString() { - return Math.round((double) min) / 10.0 + "/" - + Math.round((double) sum / count) / 10.0 + "/" - + Math.round((double) max) / 10.0; + public void appendToStringBuffer(StringBuilder sb) { + sb.append(Math.round((double) min) / 10.0); + sb.append('/'); + sb.append(Math.round((double) sum / count) / 10.0); + sb.append('/'); + sb.append(Math.round((double) max) / 10.0); } } @@ -513,30 +526,19 @@ private static final class BufferedFile implements AutoCloseable { private final RandomAccessFile file; private long offset; - private BufferedFile(long startOffset, long maxOffset) throws FileNotFoundException { + private BufferedFile(long startOffset, long maxOffset) throws IOException { this.offset = startOffset; this.maxOffset = maxOffset; this.file = new RandomAccessFile(FILE, "r"); } - private void refillBuffer() { + private void refillBuffer() throws IOException { int remainingBytes = bufferLimit - bufferPosition; if (remainingBytes < MAX_LINE_LENGTH) { bufferPosition = 0; - int bytesRead; - try { - file.seek(offset); - bytesRead = file.read(buffer, 0, BUFFER_SIZE); - } - catch (IOException e) { - throw new RuntimeException(e); - } - if (bytesRead > 0) { - bufferLimit = bytesRead; - } - else { - bufferLimit = 0; - } + file.seek(offset); + int bytesRead = file.read(buffer, 0, BUFFER_SIZE); + bufferLimit = Math.max(bytesRead, 0); } }