From d9d2f3f97f4229ce327cd46ad129ffe10e3b9ba6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Adamski?= Date: Mon, 29 Jan 2024 21:35:51 +0100 Subject: [PATCH] =?UTF-8?q?Pawe=C5=82=20Adamski=20=20-=201brc=20submission?= =?UTF-8?q?=20=20(#629)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Paweł Adamski - 1BRC challenge * Paweł Adamski - 1BRC challenge * Make files executabe --- calculate_average_PawelAdamski.sh | 19 ++ prepare_PawelAdamski.sh | 20 ++ .../onebrc/CalculateAverage_PawelAdamski.java | 209 ++++++++++++++++++ 3 files changed, 248 insertions(+) create mode 100755 calculate_average_PawelAdamski.sh create mode 100755 prepare_PawelAdamski.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_PawelAdamski.java diff --git a/calculate_average_PawelAdamski.sh b/calculate_average_PawelAdamski.sh new file mode 100755 index 000000000..e8d4bd4ce --- /dev/null +++ b/calculate_average_PawelAdamski.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="-Xnoclassgc" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_PawelAdamski diff --git a/prepare_PawelAdamski.sh b/prepare_PawelAdamski.sh new file mode 100755 index 000000000..4cda7b411 --- /dev/null +++ b/prepare_PawelAdamski.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +# source "$HOME/.sdkman/bin/sdkman-init.sh" +# sdk use java 21.0.1-graal 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_PawelAdamski.java b/src/main/java/dev/morling/onebrc/CalculateAverage_PawelAdamski.java new file mode 100644 index 000000000..45470558f --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_PawelAdamski.java @@ -0,0 +1,209 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.*; +import java.util.stream.Collectors; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.groupingByConcurrent; + +public class CalculateAverage_PawelAdamski { + + private static final long READ_SIZE = 100_000_000; + private static final String FILE = "./measurements.txt"; + + private static record ResultRow(double min, double mean, double max) { + + public ResultRow(MeasurementAggregator ma) { + this(ma.min / 10.0, ((Math.round(ma.sum * 100.0) / 100.0) / (double) ma.count) / 10.0, ma.max / 10.0); + } + + public String toString() { + return round(min) + "/" + round(mean) + "/" + round(max); + } + + private double round(double value) { + return Math.round(value * 10.0) / 10.0; + } + } + + private static class Station { + byte[] bytes; + int hash; + + public Station(byte[] station) { + this.bytes = station; + this.hash = Arrays.hashCode(bytes); + } + + @Override + public int hashCode() { + return hash; + } + + @Override + public boolean equals(Object o) { + return Arrays.equals(bytes, ((Station) o).bytes); + } + + } + + private static class MeasurementAggregator { + private long min; + private long max; + private long sum; + private long count; + + public MeasurementAggregator(long temp) { + min = temp; + max = temp; + sum = temp; + count = 1; + } + + public MeasurementAggregator() { + min = Long.MAX_VALUE; + max = Long.MIN_VALUE; + sum = 0; + count = 0; + } + + public MeasurementAggregator merge(MeasurementAggregator measurement) { + MeasurementAggregator ma = new MeasurementAggregator(); + ma.min = Math.min(min, measurement.min); + ma.max = Math.max(max, measurement.max); + ma.sum = sum + measurement.sum; + ma.count = count + measurement.count; + return ma; + } + } + + public static void main(String[] args) throws IOException { + try (RandomAccessFile raf = new RandomAccessFile(FILE, "r")) { + List parts = splitFileIntoParts(raf); + Map rr = calculateTemperatureStats(parts, raf); + Map results = prepareResults(rr); + System.out.println(results); + } + } + + private static Map prepareResults(Map rr) { + Map measurements = new TreeMap<>(); + rr.forEach((k, v) -> measurements.put(new String(k.bytes, UTF_8), new ResultRow(v))); + return measurements; + } + + private static Map calculateTemperatureStats(List parts, RandomAccessFile raf) { + return parts.parallelStream() + .map(filePart -> parse(filePart, raf)) + .flatMap(m -> m.entrySet().stream()) + .collect(groupingByConcurrent( + Map.Entry::getKey, + Collectors.reducing( + new MeasurementAggregator(), + Map.Entry::getValue, + MeasurementAggregator::merge))); + } + + private static ArrayList splitFileIntoParts(RandomAccessFile raf) throws IOException { + ArrayList parts = new ArrayList<>((int) (raf.length() / READ_SIZE)); + long pointer = 0; + long nextPointer = 0; + long fileLength = raf.length(); + while (pointer < fileLength) { + if (pointer + READ_SIZE > fileLength) { + nextPointer = fileLength; + } + else { + nextPointer = findNextLine(raf, pointer + READ_SIZE); + } + parts.add(new FilePart(pointer, nextPointer - pointer)); + pointer = nextPointer; + } + return parts; + } + + private static Map parse(FilePart filePart, RandomAccessFile raf) { + try { + byte[] bytes = readBytesFromFile(filePart, raf); + return parseBytesIntoStationsMap(bytes); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static HashMap parseBytesIntoStationsMap(byte[] bytes) { + HashMap measurementAggregator = new HashMap<>(500); + int semicolonIndex = 0; + int newLineIndex = -1; + for (int i = 0; i < bytes.length; i++) { + if (bytes[i] == ';') { + semicolonIndex = i; + } + else if (bytes[i] == '\n') { + byte[] station = Arrays.copyOfRange(bytes, newLineIndex + 1, semicolonIndex); + long temp = parseDouble(bytes, semicolonIndex + 1, i); + MeasurementAggregator measurement = new MeasurementAggregator(temp); + measurementAggregator.compute(new Station(station), (k, prevV) -> prevV == null ? measurement : prevV.merge(measurement)); + newLineIndex = i; + } + } + return measurementAggregator; + } + + private static byte[] readBytesFromFile(FilePart filePart, RandomAccessFile raf) throws IOException { + var bb = raf.getChannel().map(FileChannel.MapMode.READ_ONLY, filePart.start(), filePart.len()); + byte[] bytes = new byte[bb.remaining()]; + bb.get(bytes); + return bytes; + } + + private static long parseDouble(byte[] text, int start, int end) { + boolean negative = false; + int result = 0; + for (int i = start; i < end; i++) { + byte c = text[i]; + if (c == '-') { + negative = true; + } + else if (c != '.') { + result *= 10; + result += c - '0'; + } + } + if (negative) { + return -result; + } + else { + return result; + } + } + + private static long findNextLine(RandomAccessFile raf, long currentPosition) throws IOException { + raf.seek(currentPosition); + while (raf.readByte() != '\n') + ; + return raf.getFilePointer(); + } + + record FilePart(long start, long len) { + } +}