Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

melgenek: minor improvements #655

Merged
merged 2 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions calculate_average_melgenek.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ logicalCpuCount=$([ $(uname) = 'Darwin' ] &&
sysctl -n hw.logicalcpu_max ||
lscpu -p | egrep -v '^#' | wc -l)
# The required heap is proportional to the number of cores.
# There's roughly 3.5MB heap per thread required for the 10k problem.
requiredMemory=$(echo "(l(15 + 3.5 * $logicalCpuCount)/l(2))" | bc -l)
# There's roughly 6MB heap per thread required for the 10k problem.
requiredMemory=$(echo "(l(15 + 6 * $logicalCpuCount)/l(2))" | bc -l)
heapSize=$(echo "scale=0; 2^(($requiredMemory+1)/1)" | bc)

JAVA_OPTS="$JAVA_OPTS -Xms${heapSize}m -Xmx${heapSize}m"
Expand Down
110 changes: 56 additions & 54 deletions src/main/java/dev/morling/onebrc/CalculateAverage_melgenek.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@
*/
package dev.morling.onebrc;

import jdk.incubator.vector.ByteVector;
import jdk.incubator.vector.Vector;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;
import jdk.incubator.vector.*;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.invoke.MethodHandles;
Expand All @@ -30,7 +26,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.*;

/**
* The implementation:
Expand All @@ -47,7 +43,6 @@ public class CalculateAverage_melgenek {
private static final int CORES_COUNT = Runtime.getRuntime().availableProcessors();

private static final String FILE = "./measurements.txt";

/**
* This is a prime number that gives pretty
* <a href="https://vanilla-java.github.io/2018/08/15/Looking-at-randomness-and-performance-for-hash-codes.html">good hash distributions</a>
Expand All @@ -63,41 +58,57 @@ public class CalculateAverage_melgenek {
private static final Vector<Byte> NEWLINE_VECTOR = BYTE_SPECIES.broadcast(NEWLINE);
private static final Vector<Byte> SEMICOLON_VECTOR = BYTE_SPECIES.broadcast(SEMICOLON);
private static final int MAX_LINE_LENGTH = 107; // 100 + len(";-11.1\n") = 100+7
private static final TreeMap<String, ResultRow> RESULT = new TreeMap<>();

public static void main(String[] args) throws Throwable {
long totalSize = Files.size(Path.of(FILE));
try (var executor = Executors.newFixedThreadPool(CORES_COUNT - 1)) {
long chunkSize = Math.max(1, totalSize / CORES_COUNT);
long offset = 0;
long chunkSize = Math.max(MAX_LINE_LENGTH, totalSize / CORES_COUNT);
var result = new TreeMap<String, ResultRow>();
try (var executor = Executors.newFixedThreadPool(CORES_COUNT)) {
var service = new ExecutorCompletionService<CompositeTable>(executor);
int i = 0;
for (; offset < totalSize && i < CORES_COUNT - 1; i++) {
long currentOffset = offset;
for (; i * chunkSize < totalSize; i++) {
long currentOffset = Math.max(0, i * chunkSize - 1);
long maxOffset = Math.min((i + 1) * chunkSize, totalSize);
executor.submit(() -> processRange(currentOffset, maxOffset));
offset = (i + 1) * chunkSize - 1;
service.submit(() -> processRange(currentOffset, maxOffset));
}
if (offset < totalSize) {
processRange(offset, totalSize);
for (; i > 0; i--) {
service.take().get().addRows(result);
}
}
System.out.println(RESULT);
System.out.println(printTree(result));
}

private static void processRange(long startOffset, long maxOffset) {
private static String printTree(TreeMap<String, ResultRow> result) {
    // Renders the sorted station->stats map as "{name=min/mean/max, ...}",
    // matching TreeMap.toString() layout without the per-row String churn.
    var builder = new StringBuilder(50 * result.size());
    builder.append('{');
    String separator = "";
    for (var entry : result.entrySet()) {
        builder.append(separator);
        builder.append(entry.getKey()).append('=');
        entry.getValue().appendToStringBuffer(builder);
        separator = ", ";
    }
    builder.append('}');
    return builder.toString();
}

private static CompositeTable processRange(long startOffset, long maxOffset) {
final var table = new CompositeTable();
try (var file = new BufferedFile(startOffset, maxOffset)) {
processChunk(file, table);
}
catch (Exception e) {
throw new RuntimeException(e);
}
synchronized (RESULT) {
table.addRows(RESULT);
}
return table;
}

private static void processChunk(BufferedFile file, CompositeTable table) {
private static void processChunk(BufferedFile file, CompositeTable table) throws IOException {
if (file.offset != 0) {
file.refillBuffer();
int newlinePosition = findDelimiter(file, 0, NEWLINE_VECTOR, NEWLINE);
Expand Down Expand Up @@ -223,18 +234,18 @@ private static int calculateHash(byte[] buffer, int startPosition, int endPositi
long hash = 0;

int position = startPosition;
for (; position + Long.BYTES <= endPosition; position += Long.BYTES) {
for (; position + Long.BYTES < endPosition; position += Long.BYTES) {
long value = (long) LONG_VIEW.get(buffer, position);
hash = hash * RANDOM_PRIME + value;
}

if (position + Integer.BYTES <= endPosition) {
if (position + Integer.BYTES < endPosition) {
int value = (int) INT_VIEW.get(buffer, position);
hash = hash * RANDOM_PRIME + value;
position += Integer.BYTES;
}

for (; position <= endPosition; position++) {
for (; position < endPosition; position++) {
hash = hash * RANDOM_PRIME + buffer[position];
}
hash = hash * RANDOM_PRIME;
Expand All @@ -261,8 +272,6 @@ private static final class LongTable {
*/
private final long[] buckets = new long[TABLE_CAPACITY * 3];

int keysCount = 0;

public void add(long str, short value) {
int hash = calculateLongHash(str);
int bucketIdx = hash & TABLE_CAPACITY_MASK;
Expand All @@ -273,7 +282,6 @@ public void add(long str, short value) {
}
else if (bucketStr == 0L) {
createBucket(bucketIdx, str, value);
keysCount++;
}
else {
addWithProbing(str, value, (bucketIdx + 1) & TABLE_CAPACITY_MASK);
Expand All @@ -290,7 +298,6 @@ private void addWithProbing(long str, short value, int bucketIdx) {
}
else if (bucketStr == 0L) {
createBucket(bucketIdx, str, value);
keysCount++;
break;
}
else {
Expand Down Expand Up @@ -367,15 +374,12 @@ private static final class RegularTable {
private static final int TABLE_CAPACITY_MASK = TABLE_CAPACITY - 1;
private final Bucket[] buckets = new Bucket[TABLE_CAPACITY];

int keysCount = 0;

public void add(byte[] data, int start, int stringLength, int hash, short value) {
int bucketIdx = hash & TABLE_CAPACITY_MASK;

var bucket = buckets[bucketIdx];
if (bucket == null) {
buckets[bucketIdx] = new Bucket(data, start, stringLength, hash, value);
keysCount++;
}
else if (hash == bucket.hash && bucket.isEqual(data, start, stringLength)) {
bucket.update(value);
Expand All @@ -391,7 +395,6 @@ private void addWithProbing(byte[] data, int start, int stringLength, int hash,
var bucket = buckets[bucketIdx];
if (bucket == null) {
buckets[bucketIdx] = new Bucket(data, start, stringLength, hash, value);
keysCount++;
break;
}
else if (hash == bucket.hash && bucket.isEqual(data, start, stringLength)) {
Expand Down Expand Up @@ -449,6 +452,14 @@ public boolean isEqual(byte[] data, int start, int length) {
if (str.length != length)
return false;
int i = 0;
int vectorLoopBound = BYTE_SPECIES.loopBound(str.length);
for (; i < vectorLoopBound; i += BYTE_SPECIES_BYTE_SIZE) {
var vector1 = ByteVector.fromArray(BYTE_SPECIES, str, i);
var vector2 = ByteVector.fromArray(BYTE_SPECIES, data, start + i);
var comparisonResult = vector1.compare(VectorOperators.NE, vector2);
if (comparisonResult.anyTrue())
return false;
}
for (; i + Long.BYTES < str.length; i += Long.BYTES) {
long value1 = (long) LONG_VIEW.get(str, i);
long value2 = (long) LONG_VIEW.get(data, start + i);
Expand Down Expand Up @@ -493,10 +504,12 @@ public void add(long anotherSum, int anotherCount, short anotherMin, short anoth
min = anotherMin;
}

public String toString() {
return Math.round((double) min) / 10.0 + "/"
+ Math.round((double) sum / count) / 10.0 + "/"
+ Math.round((double) max) / 10.0;
public void appendToStringBuffer(StringBuilder sb) {
    // Appends "min/mean/max" with each value rounded to one decimal place
    // (stats are stored as tenths of a degree, hence the /10.0 scaling).
    double minValue = Math.round((double) min) / 10.0;
    double meanValue = Math.round((double) sum / count) / 10.0;
    double maxValue = Math.round((double) max) / 10.0;
    sb.append(minValue).append('/').append(meanValue).append('/').append(maxValue);
}
}

Expand All @@ -513,30 +526,19 @@ private static final class BufferedFile implements AutoCloseable {
private final RandomAccessFile file;
private long offset;

private BufferedFile(long startOffset, long maxOffset) throws FileNotFoundException {
private BufferedFile(long startOffset, long maxOffset) throws IOException {
this.offset = startOffset;
this.maxOffset = maxOffset;
this.file = new RandomAccessFile(FILE, "r");
}

private void refillBuffer() {
private void refillBuffer() throws IOException {
int remainingBytes = bufferLimit - bufferPosition;
if (remainingBytes < MAX_LINE_LENGTH) {
bufferPosition = 0;
int bytesRead;
try {
file.seek(offset);
bytesRead = file.read(buffer, 0, BUFFER_SIZE);
}
catch (IOException e) {
throw new RuntimeException(e);
}
if (bytesRead > 0) {
bufferLimit = bytesRead;
}
else {
bufferLimit = 0;
}
file.seek(offset);
int bytesRead = file.read(buffer, 0, BUFFER_SIZE);
bufferLimit = Math.max(bytesRead, 0);
}
}

Expand Down
Loading