Skip to content

Commit

Permalink
Fix hundredwatt's entry on 10k dataset (#558)
Browse files Browse the repository at this point in the history
* Improve hash function

* remove limit on number of cores

* fix calculation of boundaries between chunks

* fix IOOBE

---------

Co-authored-by: Jason Nochlin <[email protected]>
  • Loading branch information
hundredwatt and hundredwatt authored Jan 26, 2024
1 parent 09b0d75 commit 457a36b
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions src/main/java/dev/morling/onebrc/CalculateAverage_hundredwatt.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class CalculateAverage_hundredwatt {
private static final String FILE = "./measurements.txt";
private static final int MAX_ROW_SIZE = 100 + 1 + 5 + 1; // 100 for city name, 1 for ;, 5 for temperature, 1 for \n
private static final int THREAD_COUNT = Math.min(8, Runtime.getRuntime().availableProcessors());
private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final long BUFFER_SIZE = 128 * 1024 * 1024; // 128MB
private static final long CHUNK_SIZE = BUFFER_SIZE / THREAD_COUNT;
private static final long FILE_CHUNK_SIZE = CHUNK_SIZE - MAX_ROW_SIZE;
Expand Down Expand Up @@ -209,10 +209,10 @@ private static int processChunk(ByteBuffer bb, HashTable hashTable, long start,
short temperature_value;
int hashInt;

int i = 0;
int rc = 0;
int end = (int) (size - MAX_ROW_SIZE);
while (position < end) {
i++;
while (position <= end) {
// rc++;
offset = -1;

// Parse city name
Expand Down Expand Up @@ -257,11 +257,11 @@ private static int processChunk(ByteBuffer bb, HashTable hashTable, long start,

position = position + newlinePos / 8 + 2; // +1 for \n

hashInt = (int) (hash ^ (hash >> 32));
hashInt = (int) (hash ^ (hash >> 32) ^ (hash >> 17));

hashTable.putOrMerge(hashInt, offset + 1, key, temperature_value);
}
return i;
return rc;
}

public static void main(String[] args) throws IOException {
Expand All @@ -282,7 +282,7 @@ public static void main(String[] args) throws IOException {
byte[] trailing = new byte[MAX_ROW_SIZE * 2];
fileChannel.read(ByteBuffer.wrap(trailing), Math.max(0, fileSize - MAX_ROW_SIZE));
var rc = processChunk(ByteBuffer.wrap(trailing), hashTable, Math.max(0, fileSize - MAX_ROW_SIZE),
MAX_ROW_SIZE + Math.min(fileSize, MAX_ROW_SIZE));
MAX_ROW_SIZE + Math.min(fileSize, MAX_ROW_SIZE) - 1);
// rowCount.addAndGet(rc);
return hashTable;

Expand All @@ -292,11 +292,16 @@ public static void main(String[] args) throws IOException {
}
}

// if file is smaller than max row size, we're done b/c the trailing bytes handler processed the whole file
if (fileSize <= MAX_ROW_SIZE) {
return hashTable;
}

while (start < fileSize) {
long end = Math.min(start + CHUNK_SIZE, fileSize);
MappedByteBuffer bb = null;
try {
bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, start, end - start);
bb = fileChannel.map(FileChannel.MapMode.READ_ONLY, start, Math.min(end - start + 8, fileSize - start));
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit 457a36b

Please sign in to comment.