Skip to content

Commit

Permalink
Fix feature merge consistent ordering (#789)
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry authored Jan 14, 2024
1 parent 062528b commit 14b217d
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.onthegomap.planetiler.collection;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class StressTestKWayMerge {

public static void main(String[] args) throws InterruptedException {
for (int i = 1; i < 20; i++) {
test(i, 100_000, 200_000);
}
for (int i = 50; i <= 500; i += 50) {
test(i, 10_000, 20_000);
}
test(5_000, 1000, 2000);
}

private static void test(int n, long items, long maxKey) throws InterruptedException {
System.out.println("test(" + n + ")");
var random = new Random(0);
List<List<SortableFeature>> featureLists = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
for (int i = 0; i < n; i++) {
List<SortableFeature> list = new ArrayList<>();
featureLists.add(list);
for (int j = 0; j < items; j++) {
byte[] bytes = new byte[random.nextInt(1, 10)];
random.nextBytes(bytes);
list.add(new SortableFeature(random.nextLong(maxKey), bytes));
}
executorService.submit(() -> list.sort(Comparator.naturalOrder()));
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);


var iter =
LongMerger.mergeIterators(featureLists.stream().map(List::iterator).toList(), SortableFeature.COMPARE_BYTES);
var last = iter.next();
int i = 1;
while (iter.hasNext()) {
i++;
var item = iter.next();
if (last.compareTo(item) > 0) {
System.err
.println("items out of order lists=" + n + " last=" + last + " item=" + item + " i=" + i);
return;
}
last = item;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu
TileArchiveMetadata tileArchiveMetadata, Path layerStatsPath, PlanetilerConfig config, Stats stats) {
var timer = stats.startStage("archive");

int readThreads = config.featureReadThreads();
int chunksToRead = Math.max(1, features.chunksToRead());
int readThreads = Math.min(config.featureReadThreads(), chunksToRead);
int threads = config.threads();
int processThreads = threads < 10 ? threads : threads - readThreads;
int tileWriteThreads = config.tileWriteThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ private void percolateDown(int pos) {
}
}
}
if (comparePosPos(value, minValue, pos, minChild) <= 0) {
if (compareIdPos(value, minValue, id, minChild) <= 0) {
break;
}
posToValue[pos] = minValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ public Iterator<SortableFeature> iterator(int shard, int shards) {
return LongMerger.mergeIterators(iterators, SortableFeature.COMPARE_BYTES);
}

@Override
public int chunksToRead() {
return chunks.size();
}

public int chunks() {
return chunks.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ public void prepare() {
}
}

public int chunksToRead() {
return sorter.chunksToRead();
}

public interface RenderedFeatureEncoder extends Function<RenderedFeature, SortableFeature>, Closeable {}

public record Reader(Worker readWorker, Iterable<TileFeatures> result) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public Iterator<SortableFeature> iterator(int shard, int shards) {
.mapToObj(list::get)
.iterator();
}

@Override
public int chunksToRead() {
return list.size();
}
};
}

Expand Down Expand Up @@ -134,6 +139,8 @@ default ParallelIterator parallelIterator(Stats stats, int threads) {
return new ParallelIterator(reader, LongMerger.mergeSuppliers(queues, SortableFeature.COMPARE_BYTES));
}

int chunksToRead();

record ParallelIterator(Worker reader, @Override Iterator<SortableFeature> iterator)
implements Iterable<SortableFeature> {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ public T next() {
items[id] = next;
heap.updateHead(next.key());
} else {
items[id] = null;
heap.poll();
items[id] = null;
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.LongStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -199,6 +201,37 @@ void testMerge4(String a, String b, String c, String d, String output) {
}
}

@ParameterizedTest
@ValueSource(ints = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 100})
void stressTest(int n) {
int items = 10;
int maxKey = 20;
var random = new Random(0);
List<List<SortableFeature>> featureLists = new ArrayList<>();
for (int i = 0; i < n; i++) {
List<SortableFeature> list = new ArrayList<>();
featureLists.add(list);
for (int j = 0; j < items; j++) {
byte[] bytes = new byte[]{(byte) random.nextInt(256)};
random.nextBytes(bytes);
list.add(new SortableFeature(random.nextLong(maxKey), bytes));
}
list.sort(Comparator.naturalOrder());
}

var iter =
LongMerger.mergeIterators(featureLists.stream().map(List::iterator).toList(), SortableFeature.COMPARE_BYTES);
var last = iter.next();
int i = 1;
while (iter.hasNext()) {
i++;
var item = iter.next();
assertTrue(last.compareTo(item) <= 0,
"items out of order i=" + i + " lists=" + n + " last=" + last + " item=" + item);
last = item;
}
}

private static long[] parse(String in) {
return in == null ? new long[0] : Stream.of(in.split("\\s+"))
.map(String::strip)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,25 @@ void duplicateElements() {

@ParameterizedTest
@CsvSource({
"0, 1, 2, 3, 4, 5",
"5, 4, 3, 2, 1, 0",
"0, 1, 2, 5, 4, 3",
"0, 1, 5, 2, 4, 3",
"0, 5, 1, 2, 4, 3",
"5, 0, 1, 2, 4, 3",
"0, 1, 2, 3, 4, 5, 6, 7",
"7, 6, 5, 4, 3, 2, 1, 0",
"0, 1, 2, 6, 7, 5, 4, 3",
"0, 1, 5, 2, 4, 3, 6, 7",
"0, 5, 6, 7, 1, 2, 4, 3",
"5, 0, 1, 2, 7, 6, 4, 3",
})
void tieBreaker(int a, int b, int c, int d, int e, int f) {
heap = LongMinHeap.newArrayHeap(6, (id1, id2) -> -Integer.compare(id1, id2));
void tieBreaker(int a, int b, int c, int d, int e, int f, int g, int h) {
heap = LongMinHeap.newArrayHeap(9, (id1, id2) -> -Integer.compare(id1, id2));
heap.push(a, 0L);
heap.push(b, 0L);
heap.push(c, 0L);
heap.push(d, 0L);
heap.push(e, 0L);
heap.push(f, 0L);
heap.push(g, 0L);
heap.push(h, 0L);
assertEquals(7, heap.poll());
assertEquals(6, heap.poll());
assertEquals(5, heap.poll());
assertEquals(4, heap.poll());
assertEquals(3, heap.poll());
Expand Down

0 comments on commit 14b217d

Please sign in to comment.