From 14b217d6f6bb1ac205b7905380fd6a7d630c02c1 Mon Sep 17 00:00:00 2001 From: Michael Barry Date: Sun, 14 Jan 2024 12:08:20 -0500 Subject: [PATCH] Fix feature merge consistent ordering (#789) --- .../collection/StressTestKWayMerge.java | 57 +++++++++++++++++++ .../planetiler/archive/TileArchiveWriter.java | 3 +- .../collection/ArrayLongMinHeap.java | 2 +- .../collection/ExternalMergeSort.java | 5 ++ .../planetiler/collection/FeatureGroup.java | 4 ++ .../planetiler/collection/FeatureSort.java | 7 +++ .../planetiler/collection/LongMerger.java | 2 +- .../planetiler/collection/LongMergerTest.java | 33 +++++++++++ .../collection/LongMinHeapTest.java | 20 ++++--- 9 files changed, 122 insertions(+), 11 deletions(-) create mode 100644 planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/StressTestKWayMerge.java diff --git a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/StressTestKWayMerge.java b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/StressTestKWayMerge.java new file mode 100644 index 0000000000..af1c2fa5ef --- /dev/null +++ b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/collection/StressTestKWayMerge.java @@ -0,0 +1,57 @@ +package com.onthegomap.planetiler.collection; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +public class StressTestKWayMerge { + + public static void main(String[] args) throws InterruptedException { + for (int i = 1; i < 20; i++) { + test(i, 100_000, 200_000); + } + for (int i = 50; i <= 500; i += 50) { + test(i, 10_000, 20_000); + } + test(5_000, 1000, 2000); + } + + private static void test(int n, long items, long maxKey) throws InterruptedException { + System.out.println("test(" + n + ")"); + var random = new Random(0); + List> featureLists = new ArrayList<>(); + ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + for (int i = 0; i < n; i++) { + List list = new ArrayList<>(); + featureLists.add(list); + for (int j = 0; j < items; j++) { + byte[] bytes = new byte[random.nextInt(1, 10)]; + random.nextBytes(bytes); + list.add(new SortableFeature(random.nextLong(maxKey), bytes)); + } + executorService.submit(() -> list.sort(Comparator.naturalOrder())); + } + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.DAYS); + + + var iter = + LongMerger.mergeIterators(featureLists.stream().map(List::iterator).toList(), SortableFeature.COMPARE_BYTES); + var last = iter.next(); + int i = 1; + while (iter.hasNext()) { + i++; + var item = iter.next(); + if (last.compareTo(item) > 0) { + System.err + .println("items out of order lists=" + n + " last=" + last + " item=" + item + " i=" + i); + return; + } + last = item; + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java index 86e0af7f7c..8c3fc2bd48 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/archive/TileArchiveWriter.java @@ -88,7 +88,8 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu TileArchiveMetadata tileArchiveMetadata, Path layerStatsPath, PlanetilerConfig config, Stats stats) { var timer = stats.startStage("archive"); - int readThreads = config.featureReadThreads(); + int chunksToRead = Math.max(1, features.chunksToRead()); + int readThreads = Math.min(config.featureReadThreads(), chunksToRead); int threads = config.threads(); int processThreads = threads < 10 ? threads : threads - readThreads; int tileWriteThreads = config.tileWriteThreads(); diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ArrayLongMinHeap.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ArrayLongMinHeap.java index f2464ae29c..39389efe34 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ArrayLongMinHeap.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ArrayLongMinHeap.java @@ -213,7 +213,7 @@ private void percolateDown(int pos) { } } } - if (comparePosPos(value, minValue, pos, minChild) <= 0) { + if (compareIdPos(value, minValue, id, minChild) <= 0) { break; } posToValue[pos] = minValue; diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java index 15cf934527..ef4f51eee9 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/ExternalMergeSort.java @@ -256,6 +256,11 @@ public Iterator iterator(int shard, int shards) { return LongMerger.mergeIterators(iterators, SortableFeature.COMPARE_BYTES); } + @Override + public int chunksToRead() { + return chunks.size(); + } + public int chunks() { return chunks.size(); } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java index ba074ef75c..63dfa6fbbd 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureGroup.java @@ -314,6 +314,10 @@ public void prepare() { } } + public int chunksToRead() { + return sorter.chunksToRead(); + } + public interface RenderedFeatureEncoder extends Function, Closeable {} public record Reader(Worker readWorker, Iterable result) {} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java index 4257f31f21..ce2028d1d8 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/FeatureSort.java @@ -74,6 +74,11 @@ public Iterator iterator(int shard, int shards) { .mapToObj(list::get) .iterator(); } + + @Override + public int chunksToRead() { + return list.size(); + } }; } @@ -134,6 +139,8 @@ default ParallelIterator parallelIterator(Stats stats, int threads) { return new ParallelIterator(reader, LongMerger.mergeSuppliers(queues, SortableFeature.COMPARE_BYTES)); } + int chunksToRead(); + record ParallelIterator(Worker reader, @Override Iterator iterator) implements Iterable {} } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java index 1303924c11..2ba220dce6 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/collection/LongMerger.java @@ -230,8 +230,8 @@ public T next() { items[id] = next; heap.updateHead(next.key()); } else { - items[id] = null; heap.poll(); + items[id] = null; } return result; } diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java index 96e5d7f9e0..e4b4bb0999 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMergerTest.java @@ -2,11 +2,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.NoSuchElementException; +import java.util.Random; import java.util.function.Supplier; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -199,6 +201,37 @@ void testMerge4(String a, String b, String c, String d, String output) { } } + @ParameterizedTest + @ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 100}) + void stressTest(int n) { + int items = 10; + int maxKey = 20; + var random = new Random(0); + List> featureLists = new ArrayList<>(); + for (int i = 0; i < n; i++) { + List list = new ArrayList<>(); + featureLists.add(list); + for (int j = 0; j < items; j++) { + byte[] bytes = new byte[]{(byte) random.nextInt(256)}; + random.nextBytes(bytes); + list.add(new SortableFeature(random.nextLong(maxKey), bytes)); + } + list.sort(Comparator.naturalOrder()); + } + + var iter = + LongMerger.mergeIterators(featureLists.stream().map(List::iterator).toList(), SortableFeature.COMPARE_BYTES); + var last = iter.next(); + int i = 1; + while (iter.hasNext()) { + i++; + var item = iter.next(); + assertTrue(last.compareTo(item) <= 0, + "items out of order i=" + i + " lists=" + n + " last=" + last + " item=" + item); + last = item; + } + } + private static long[] parse(String in) { return in == null ? new long[0] : Stream.of(in.split("\\s+")) .map(String::strip) diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMinHeapTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMinHeapTest.java index 65ece71268..7b0a6c4a5a 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMinHeapTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/collection/LongMinHeapTest.java @@ -81,21 +81,25 @@ void duplicateElements() { @ParameterizedTest @CsvSource({ - "0, 1, 2, 3, 4, 5", - "5, 4, 3, 2, 1, 0", - "0, 1, 2, 5, 4, 3", - "0, 1, 5, 2, 4, 3", - "0, 5, 1, 2, 4, 3", - "5, 0, 1, 2, 4, 3", + "0, 1, 2, 3, 4, 5, 6, 7", + "7, 6, 5, 4, 3, 2, 1, 0", + "0, 1, 2, 6, 7, 5, 4, 3", + "0, 1, 5, 2, 4, 3, 6, 7", + "0, 5, 6, 7, 1, 2, 4, 3", + "5, 0, 1, 2, 7, 6, 4, 3", }) - void tieBreaker(int a, int b, int c, int d, int e, int f) { - heap = LongMinHeap.newArrayHeap(6, (id1, id2) -> -Integer.compare(id1, id2)); + void tieBreaker(int a, int b, int c, int d, int e, int f, int g, int h) { + heap = LongMinHeap.newArrayHeap(9, (id1, id2) -> -Integer.compare(id1, id2)); heap.push(a, 0L); heap.push(b, 0L); heap.push(c, 0L); heap.push(d, 0L); heap.push(e, 0L); heap.push(f, 0L); + heap.push(g, 0L); + heap.push(h, 0L); + assertEquals(7, heap.poll()); + assertEquals(6, heap.poll()); assertEquals(5, heap.poll()); assertEquals(4, heap.poll()); assertEquals(3, heap.poll());