Skip to content

Commit

Permalink
New Fresh Solution to Optimize Execution time (#641)
Browse files Browse the repository at this point in the history
* CalculateAverage_pdrakatos

* Rename to be valid with rules

* CalculateAverage_pdrakatos

* Rename to be valid with rules

* Changes on scripts execution

* Fixing bugs causing scripts not to be executed

* Changes to prepare to make it compatible

* Fixing passing all tests

* Increase direct memory allocation buffer

* Fixing a memory problem that causes a heap space exception

* Fresh solution to optimize performance of the execution
  • Loading branch information
PanagiotisDrakatos authored Jan 29, 2024
1 parent 1281e77 commit 31a6740
Showing 1 changed file with 56 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package dev.morling.onebrc;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
Expand All @@ -26,18 +28,27 @@
import java.util.stream.StreamSupport;

public class CalculateAverage_PanagiotisDrakatos {

// Path of the input file, relative to the working directory.
private static final String FILE = "./measurements.txt";
// 4 * 1024 * 1024 = 4194304. NOTE(review): presumably a size in bytes; no use is visible in this part of the file.
private static final long SEGMENT_SIZE = 4 * 1024 * 1024;
// 0x3B is ASCII ';' repeated in all eight bytes of the long; despite the name this is a semicolon pattern, not a comma.
private static final long COMMA_PATTERN = 0x3B3B3B3B3B3B3B3BL;
// Mask with bits 12, 20 and 28 set. NOTE(review): presumably used to locate a decimal point inside a packed word - confirm at the use site.
private static final long DOT_BITS = 0x10101000;
// Equals 100 * 2^24 + 10 * 2^16 + 1. NOTE(review): looks like per-digit decimal weights for a packed multiply - confirm at the use site.
private static final long MAGIC_MULTIPLIER = (100 * 0x1000000 + 10 * 0x10000 + 1);

// Map printed by main; a TreeMap iterates its keys in sorted order.
private static TreeMap<String, MeasurementObject> sortedCities;

/**
 * Program entry point: processes the measurements file and prints the {@code sortedCities} map.
 *
 * @param args command-line arguments; not used
 * @throws IOException if processing the measurements file fails
 */
public static void main(String[] args) throws IOException {
    // NOTE(review): SeekableByteRead presumably fills sortedCities; the assignment is not visible here.
    SeekableByteRead(FILE);
    System.out.println(sortedCities);
}

private static void SeekableByteRead(String path) throws IOException {
FileInputStream fileInputStream = new FileInputStream(FILE);
FileInputStream fileInputStream = new FileInputStream(new File(FILE));
FileChannel fileChannel = fileInputStream.getChannel();
Optional<Map<String, MeasurementObject>> optimistic = SplitSeekableByteChannel(fileChannel)
Optional<Map<String, MeasurementObject>> optimistic = getFileSegments(new File(FILE), fileChannel)
.stream()
.map(CalculateAverage_PanagiotisDrakatos::SplitSeekableByteChannel)
.parallel()
.map(CalculateAverage_PanagiotisDrakatos::MappingByteBufferToData)
.reduce(CalculateAverage_PanagiotisDrakatos::combineMaps);
Expand All @@ -46,37 +57,53 @@ private static void SeekableByteRead(String path) throws IOException {

}

private static Stream<ByteBuffer> SplitSeekableByteChannel(FileChannel channel) throws IOException {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new Iterator<ByteBuffer>() {
private static final long MAP_SIZE = 1024 * 1024 * 10L;

private long position = 0;
private long length = channel.size();
/**
 * A byte range of the input file together with the channel it is read from.
 *
 * @param start       offset of the first byte of the range
 * @param end         offset just past the last byte of the range; the range length is {@code end - start}
 * @param fileChannel channel from which the range is mapped
 */
record FileSegment(long start, long end, FileChannel fileChannel) {
}

@Override
public boolean hasNext() {
while (position < length) {
return true;
}
return false;
/**
 * Splits the file into consecutive, newline-aligned segments, aiming for one segment per
 * available processor.
 *
 * <p>When the per-processor share is under 1000 bytes the whole file is returned as a single
 * segment. Otherwise each segment ends just after a {@code '\n'} (or at end of file) and the
 * next segment starts exactly where the previous one ended.
 *
 * <p>The target segment size is capped below {@link Integer#MAX_VALUE} because segments are
 * mapped with {@code FileChannel.map}, which rejects larger regions. When the cap applies,
 * more segments than processors are produced.
 *
 * @param file        file to split; its length defines the range being covered
 * @param fileChannel channel stored in every returned segment
 * @return the segments in file order
 * @throws IOException if the file cannot be opened or read while searching for line ends
 */
private static List<FileSegment> getFileSegments(final File file, final FileChannel fileChannel) throws IOException {
    // A mapping may not exceed Integer.MAX_VALUE bytes. Keep 1 MiB of headroom because every
    // segment is extended past its target size up to the next newline.
    final long maxSegmentSize = Integer.MAX_VALUE - (1L << 20);
    final int numberOfSegments = Runtime.getRuntime().availableProcessors();
    final long fileSize = file.length();
    final long segmentSize = Math.min(fileSize / numberOfSegments, maxSegmentSize);
    final List<FileSegment> segments = new ArrayList<>();
    if (segmentSize < 1000) {
        // Too small to be worth splitting: hand back the whole file as one segment.
        segments.add(new FileSegment(0, fileSize, fileChannel));
        return segments;
    }
    try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) {
        long segStart = 0;
        long segEnd = segmentSize;
        while (segStart < fileSize) {
            // Move the tentative end forward so the segment finishes on a line boundary.
            segEnd = findSegment(randomAccessFile, segEnd, fileSize);
            segments.add(new FileSegment(segStart, segEnd, fileChannel));
            // The next segment begins where this one stopped.
            segStart = segEnd;
            segEnd = Math.min(fileSize, segEnd + segmentSize);
        }
    }
    return segments;
}

@Override
public ByteBuffer next() {
try {
MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, Math.min(MAP_SIZE, length - position));
int end = buffer.limit() - 1;
while (buffer.get(end) != '\n') {
end--;
}
position += end + 1;
return buffer.slice(0, end);
}
catch (IOException e) {
throw new RuntimeException(e);
}
/**
 * Finds the offset just past the first {@code '\n'} located at or after {@code location}.
 *
 * <p>Bytes are read one at a time starting from {@code location}; no byte at or beyond
 * {@code fileSize} is read.
 *
 * @param raf      file to scan; its file pointer is moved by this call
 * @param location offset at which the scan starts
 * @param fileSize exclusive upper bound of the scan
 * @return the offset following the newline that was found; {@code fileSize} when no newline
 *         occurs before it; or {@code location} itself when it is already at or past
 *         {@code fileSize}
 * @throws IOException if seeking or reading fails
 */
private static long findSegment(RandomAccessFile raf, long location, final long fileSize) throws IOException {
    raf.seek(location);
    for (long cursor = location; cursor < fileSize; cursor++) {
        final int current = raf.read();
        if (current == '\n') {
            return cursor + 1;
        }
    }
    // Nothing found: the scan either ran up to fileSize or never started.
    return Math.max(location, fileSize);
}

private static ByteBuffer SplitSeekableByteChannel(FileSegment segment) {
try {
MappedByteBuffer buffer = segment.fileChannel.map(FileChannel.MapMode.READ_ONLY, segment.start(), segment.end - segment.start());
int end = buffer.limit() - 1;
while (buffer.get(end) != '\n') {
end--;
}
}, Spliterator.IMMUTABLE), false);
return buffer.slice(0, end);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}

public static ByteBuffer concat(ByteBuffer[] buffers) {
Expand Down

0 comments on commit 31a6740

Please sign in to comment.