Skip to content

Commit

Permalink
Paweł Adamski - 1brc submission (#629)
Browse files Browse the repository at this point in the history
* Paweł Adamski - 1BRC challenge

* Paweł Adamski - 1BRC challenge

* Make files executable
  • Loading branch information
PawelAdamski authored Jan 29, 2024
1 parent 7d52a37 commit d9d2f3f
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 0 deletions.
19 changes: 19 additions & 0 deletions calculate_average_PawelAdamski.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/sh
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# JVM flags for the run; -Xnoclassgc disables garbage collection of classes.
JAVA_OPTS="-Xnoclassgc"
# Fully qualified name of the entry-point class inside the project jar.
MAIN_CLASS="dev.morling.onebrc.CalculateAverage_PawelAdamski"

# $JAVA_OPTS stays unquoted so that several flags, if added, split into separate arguments.
java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar "$MAIN_CLASS"
20 changes: 20 additions & 0 deletions prepare_PawelAdamski.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/bin/bash
#
# Copyright 2023 The original authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Uncomment below to use sdk
# source "$HOME/.sdkman/bin/sdkman-init.sh"
# sdk use java 21.0.1-graal 1>&2
209 changes: 209 additions & 0 deletions src/main/java/dev/morling/onebrc/CalculateAverage_PawelAdamski.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2023 The original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.morling.onebrc;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.stream.Collectors;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.groupingByConcurrent;

/**
 * Reads {@code ./measurements.txt}, where every line has the form {@code <station>;<temperature>},
 * and prints the minimum, mean and maximum temperature per station, sorted by station name.
 *
 * <p>The file is cut into slices of roughly {@link #READ_SIZE} bytes that always end on a line
 * boundary. Slices are parsed in parallel into per-slice maps which are then merged.
 * Temperatures are handled as integers (the digits of the value with the decimal point removed)
 * and only converted to {@code double} when the result rows are built.
 */
public class CalculateAverage_PawelAdamski {

    /** Target size, in bytes, of one file slice. A slice may be longer by at most one line. */
    private static final long READ_SIZE = 100_000_000;
    private static final String FILE = "./measurements.txt";

    /** Final per-station statistics in degrees, rendered as {@code min/mean/max}. */
    private record ResultRow(double min, double mean, double max) {

        /**
         * Converts an aggregate whose values are stored as integer tenths into degrees.
         *
         * @param ma aggregate with a non-zero {@code count}
         */
        public ResultRow(MeasurementAggregator ma) {
            this(ma.min / 10.0, ((Math.round(ma.sum * 100.0) / 100.0) / (double) ma.count) / 10.0, ma.max / 10.0);
        }

        @Override
        public String toString() {
            return round(min) + "/" + round(mean) + "/" + round(max);
        }

        /** Rounds to one decimal place using {@link Math#round(double)}. */
        private static double round(double value) {
            return Math.round(value * 10.0) / 10.0;
        }
    }

    /** Map key wrapping the raw bytes of a station name, with the hash computed once. */
    private static final class Station {
        private final byte[] bytes;
        private final int hash;

        /**
         * @param station bytes of the station name; the array is stored as-is and must not be
         *                modified afterwards
         */
        public Station(byte[] station) {
            this.bytes = station;
            this.hash = Arrays.hashCode(bytes);
        }

        @Override
        public int hashCode() {
            return hash;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            // The type test keeps the equals contract: null and foreign types compare unequal
            // instead of raising NullPointerException / ClassCastException.
            return o instanceof Station other && Arrays.equals(bytes, other.bytes);
        }
    }

    /** Running min / max / sum / count of the integer temperature values of one station. */
    private static class MeasurementAggregator {
        private long min;
        private long max;
        private long sum;
        private long count;

        /** Creates an aggregate holding a single measurement. */
        public MeasurementAggregator(long temp) {
            min = temp;
            max = temp;
            sum = temp;
            count = 1;
        }

        /** Creates the empty aggregate, which is the neutral element of {@link #merge}. */
        public MeasurementAggregator() {
            min = Long.MAX_VALUE;
            max = Long.MIN_VALUE;
            sum = 0;
            count = 0;
        }

        /**
         * Combines this aggregate with another one.
         *
         * <p>Neither operand is modified. The reducing collector in
         * {@code calculateTemperatureStats} shares one identity instance between all groups
         * and therefore depends on that.
         *
         * @param measurement the aggregate to combine with
         * @return a new aggregate covering the measurements of both operands
         */
        public MeasurementAggregator merge(MeasurementAggregator measurement) {
            MeasurementAggregator ma = new MeasurementAggregator();
            ma.min = Math.min(min, measurement.min);
            ma.max = Math.max(max, measurement.max);
            ma.sum = sum + measurement.sum;
            ma.count = count + measurement.count;
            return ma;
        }
    }

    /**
     * Entry point: processes {@code ./measurements.txt} and prints the sorted result map.
     *
     * @param args ignored
     * @throws IOException if the file cannot be opened or read
     */
    public static void main(String[] args) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) {
            List<FilePart> parts = splitFileIntoParts(raf);
            Map<Station, MeasurementAggregator> rr = calculateTemperatureStats(parts, raf);
            Map<String, ResultRow> results = prepareResults(rr);
            System.out.println(results);
        }
    }

    /** Decodes station names as UTF-8 and returns the result rows sorted by name. */
    private static Map<String, ResultRow> prepareResults(Map<Station, MeasurementAggregator> rr) {
        Map<String, ResultRow> measurements = new TreeMap<>();
        rr.forEach((k, v) -> measurements.put(new String(k.bytes, UTF_8), new ResultRow(v)));
        return measurements;
    }

    /** Parses all slices in parallel and merges the per-slice maps into one map per station. */
    private static Map<Station, MeasurementAggregator> calculateTemperatureStats(List<FilePart> parts, RandomAccessFile raf) {
        return parts.parallelStream()
                .map(filePart -> parse(filePart, raf))
                .flatMap(m -> m.entrySet().stream())
                .collect(groupingByConcurrent(
                        Map.Entry::getKey,
                        Collectors.reducing(
                                new MeasurementAggregator(),
                                Map.Entry::getValue,
                                MeasurementAggregator::merge)));
    }

    /**
     * Cuts the file into consecutive slices of about {@link #READ_SIZE} bytes, each extended to
     * the end of the line it would otherwise cut.
     *
     * @param raf the open measurements file; its file pointer is moved
     * @return the slices in file order, together covering every byte of the file
     * @throws IOException if the file cannot be read
     */
    private static List<FilePart> splitFileIntoParts(RandomAccessFile raf) throws IOException {
        long fileLength = raf.length();
        List<FilePart> parts = new ArrayList<>((int) (fileLength / READ_SIZE) + 1);
        long pointer = 0;
        while (pointer < fileLength) {
            long tentativeEnd = pointer + READ_SIZE;
            // ">=": when exactly READ_SIZE bytes remain there is nothing left to scan, and
            // probing for a newline at end of file must be avoided.
            long nextPointer = tentativeEnd >= fileLength ? fileLength : findNextLine(raf, tentativeEnd);
            parts.add(new FilePart(pointer, nextPointer - pointer));
            pointer = nextPointer;
        }
        return parts;
    }

    /** Loads one slice into memory and aggregates it; I/O failures are rethrown unchecked. */
    private static Map<Station, MeasurementAggregator> parse(FilePart filePart, RandomAccessFile raf) {
        try {
            byte[] bytes = readBytesFromFile(filePart, raf);
            return parseBytesIntoStationsMap(bytes);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Aggregates every {@code <station>;<temperature>} record contained in {@code bytes}.
     *
     * <p>Records are separated by {@code '\n'}. A final record that is not followed by a
     * newline is aggregated as well, provided it contains a semicolon.
     *
     * @param bytes slice content starting at the beginning of a record
     * @return aggregates keyed by station name bytes
     */
    private static Map<Station, MeasurementAggregator> parseBytesIntoStationsMap(byte[] bytes) {
        Map<Station, MeasurementAggregator> stations = new HashMap<>(500);
        int semicolonIndex = -1;
        int lineStart = 0;
        for (int i = 0; i < bytes.length; i++) {
            byte current = bytes[i];
            if (current == ';') {
                semicolonIndex = i;
            }
            else if (current == '\n') {
                addMeasurement(stations, bytes, lineStart, semicolonIndex, i);
                lineStart = i + 1;
            }
        }
        // The data may end without a trailing newline; do not lose that last record.
        if (lineStart < bytes.length && semicolonIndex >= lineStart) {
            addMeasurement(stations, bytes, lineStart, semicolonIndex, bytes.length);
        }
        return stations;
    }

    /** Folds the record spanning {@code [lineStart, lineEnd)} into {@code stations}. */
    private static void addMeasurement(Map<Station, MeasurementAggregator> stations, byte[] bytes,
                                       int lineStart, int semicolonIndex, int lineEnd) {
        byte[] name = Arrays.copyOfRange(bytes, lineStart, semicolonIndex);
        long temp = parseDouble(bytes, semicolonIndex + 1, lineEnd);
        stations.merge(new Station(name), new MeasurementAggregator(temp), MeasurementAggregator::merge);
    }

    /** Memory-maps the slice read-only and copies its content into a fresh byte array. */
    private static byte[] readBytesFromFile(FilePart filePart, RandomAccessFile raf) throws IOException {
        var bb = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, filePart.start(), filePart.len());
        byte[] bytes = new byte[bb.remaining()];
        bb.get(bytes);
        return bytes;
    }

    /**
     * Parses the decimal text in {@code text[start, end)} into an integer by concatenating its
     * digits and skipping the decimal point, e.g. {@code "-12.3"} becomes {@code -123}.
     *
     * <p>The result is in tenths of a degree only when the text has exactly one fractional
     * digit. No validation is performed; the input is assumed to consist of an optional
     * {@code '-'}, ASCII digits and {@code '.'}.
     */
    private static long parseDouble(byte[] text, int start, int end) {
        boolean negative = false;
        int result = 0;
        for (int i = start; i < end; i++) {
            byte c = text[i];
            if (c == '-') {
                negative = true;
            }
            else if (c != '.') {
                result = result * 10 + (c - '0');
            }
        }
        return negative ? -result : result;
    }

    /**
     * Returns the offset just past the first {@code '\n'} at or after {@code currentPosition},
     * or the end-of-file offset when no newline follows.
     */
    private static long findNextLine(RandomAccessFile raf, long currentPosition) throws IOException {
        raf.seek(currentPosition);
        int next = raf.read();
        // read() reports end of file as -1 instead of throwing like readByte() does.
        while (next != -1 && next != '\n') {
            next = raf.read();
        }
        return raf.getFilePointer();
    }

    /** A contiguous byte range of the input file: start offset and length. */
    record FilePart(long start, long len) {
    }
}

0 comments on commit d9d2f3f

Please sign in to comment.