diff --git a/calculate_average_breejesh.sh b/calculate_average_breejesh.sh new file mode 100755 index 000000000..0f0738b2b --- /dev/null +++ b/calculate_average_breejesh.sh @@ -0,0 +1,19 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_breejesh diff --git a/prepare_breejesh.sh b/prepare_breejesh.sh new file mode 100755 index 000000000..4cda7b411 --- /dev/null +++ b/prepare_breejesh.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +# source "$HOME/.sdkman/bin/sdkman-init.sh" +# sdk use java 21.0.1-graal 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_breejesh.java b/src/main/java/dev/morling/onebrc/CalculateAverage_breejesh.java new file mode 100644 index 000000000..3ee87c943 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_breejesh.java @@ -0,0 +1,180 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.File; +import java.io.IOException; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardOpenOption; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class CalculateAverage_breejesh { + private static final String FILE = "./measurements.txt"; + private static final int TWO_BYTE_TO_INT = 480 + 48; // 48 is the ASCII code for '0' + private static final int THREE_BYTE_TO_INT = 4800 + 480 + 48; + + private static final class Measurement { + + private int min; + private int max; + private int total; + private int count; + + public Measurement(int value) { + this.min = value; + this.max = value; + this.total = value; + this.count = 1; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append(min / 10.0); + result.append("/"); + result.append(Math.round(((double) total) / count) / 10.0); + result.append("/"); + result.append(max / 10.0); + return result.toString(); + } + + private void append(int min, int max, int total, int count) { + if (min < this.min) + this.min = min; + if (max > this.max) + this.max = max; + this.total += total; + this.count += count; + } + + public void append(int value) { + append(value, value, value, 1); + } + + public void merge(Measurement other) { + append(other.min, other.max, other.total, other.count); + } + } + + public static void main(String[] args) throws Exception { + // long start = System.currentTimeMillis(); + // Find system details to determine cores and + var file = new File(args.length > 0 ? args[0] : FILE); + long fileSize = file.length(); + var numberOfCores = fileSize > 1_000_000 ? Runtime.getRuntime().availableProcessors() : 1; + var splitSectionSize = (int) Math.min(Integer.MAX_VALUE, fileSize / numberOfCores); // bytebuffer position is an int, so can be max Integer.MAX_VALUE + var segmentCount = (int) (fileSize / splitSectionSize); + + // Divide file into segments + ExecutorService executor = Executors.newFixedThreadPool(segmentCount); + List>> futures = new ArrayList<>(); + for (int i = 0; i < segmentCount; i++) { + long sectionStart = i * (long) splitSectionSize; + long sectionEnd = Math.min(fileSize, sectionStart + splitSectionSize + 100); + var fileChannel = (FileChannel) Files.newByteChannel(file.toPath(), StandardOpenOption.READ); + CompletableFuture> future = CompletableFuture.supplyAsync(() -> { + MappedByteBuffer currentBuffer = null; + try { + currentBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, sectionStart, sectionEnd - sectionStart); + } + catch (IOException e) { + throw new RuntimeException(e); + } + // Skip till new line for unequal segments, not to be done for first section + if (sectionStart > 0) { + while (currentBuffer.get() != '\n') + ; + } + Map map = new HashMap<>(); + while (currentBuffer.position() < splitSectionSize) { + // Read station + String str = getStationFromBuffer(currentBuffer); + // Read number + int value = getValueFromBuffer(currentBuffer); + if (map.containsKey(str)) { + map.get(str).append(value); + } + else { + map.put(str, new Measurement(value)); + } + } + return map; + }, executor); + futures.add(future); + } + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + Map finalMap = new TreeMap<>(); + for (CompletableFuture> future : futures) { + Map map = future.get(); + map.keySet().stream().forEach( + key -> { + if (finalMap.containsKey(key)) { + finalMap.get(key).merge(map.get(key)); + } + else { + finalMap.put(key, map.get(key)); + } + }); + } + + System.out.println(finalMap); + // System.out.printf("Time %s", System.currentTimeMillis() - start); + System.exit(0); + } + + private static String getStationFromBuffer(MappedByteBuffer currentBuffer) { + byte currentByte; + var byteCounter = 0; + var buffer = new byte[100]; + while ((currentByte = currentBuffer.get()) != ';') { + buffer[byteCounter++] = currentByte; + } + return new String(buffer, 0, byteCounter, StandardCharsets.UTF_8); + } + + private static int getValueFromBuffer(MappedByteBuffer currentBuffer) { + int value; + byte[] nums = new byte[4]; + currentBuffer.get(nums); + if (nums[1] == '.') { + // case of n.n + value = (nums[0] * 10 + nums[2] - TWO_BYTE_TO_INT); + } + else { + if (nums[3] == '.') { + // case of -nn.n + value = -(nums[1] * 100 + nums[2] * 10 + currentBuffer.get() - THREE_BYTE_TO_INT); + } + else if (nums[0] == '-') { + // case of -n.n + value = -(nums[1] * 10 + nums[3] - TWO_BYTE_TO_INT); + } + else { + // case of nn.n + value = (nums[0] * 100 + nums[1] * 10 + nums[3] - THREE_BYTE_TO_INT); + } + currentBuffer.get(); // new line + } + return value; + } +}