Skip to content

Commit

Permalink
Deterministic merging (#785)
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry authored Jan 11, 2024
1 parent 96eae61 commit 0dc2ee8
Show file tree
Hide file tree
Showing 12 changed files with 285 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class BenchmarkKWayMerge {
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
System.err.println();
testMinHeap("quaternary", LongMinHeap::newArrayHeap);
testMinHeap("quaternary", n -> LongMinHeap.newArrayHeap(n, Integer::compare));
System.err.println(String.join("\t",
"priorityqueue",
Long.toString(testPriorityQueue(10).toMillis()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -80,7 +81,8 @@ public class VectorTile {
// TODO make these configurable
private static final int EXTENT = 4096;
private static final double SIZE = 256d;
// use a treemap to ensure that layers are encoded in a consistent order
// (TreeMap iterates its keys in sorted order, so iteration does not depend on insertion order)
private final Map<String, Layer> layers = new TreeMap<>();
private LayerAttrStats.Updater.ForZoom layerStatsTracker = LayerAttrStats.Updater.ForZoom.NOOP;

private static int[] getCommands(Geometry input, int scale) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.onthegomap.planetiler.collection;

import java.util.Arrays;
import java.util.function.IntBinaryOperator;

/**
* A min-heap stored in an array where each element has 4 children.
Expand All @@ -38,24 +39,26 @@
*/
class ArrayLongMinHeap implements LongMinHeap {
/** Marker stored in {@link #idToPos} for ids that are not currently in the heap. */
protected static final int NOT_PRESENT = -1;
/** Id of the element stored at each heap position; positions start at 1, slot 0 is unused. */
protected final int[] posToId;
/** Heap position of each id, or {@link #NOT_PRESENT} when the id is not in the heap. */
protected final int[] idToPos;
/** Value of the element stored at each heap position; slot 0 holds a {@code Long.MIN_VALUE} sentinel. */
protected final long[] posToValue;
/** The {@code elements} argument this heap was created with. */
protected final int max;
/** Number of elements currently stored. */
protected int size;
/** Applied to two ids when their values are equal; its result is used as the comparison result. */
private final IntBinaryOperator tieBreaker;

/**
 * Creates an empty heap with a fixed capacity.
 *
 * @param elements   the number of elements that can be stored in this heap. Currently the heap cannot be resized or
 *                   shrunk/trimmed after initial creation. elements-1 is the maximum id that can be stored in this
 *                   heap
 * @param tieBreaker applied to two ids whose values are equal; its result is used directly as the comparison result,
 *                   so a negative return orders the first id ahead of the second
 */
ArrayLongMinHeap(int elements, IntBinaryOperator tieBreaker) {
  // we use an offset of one to make the arithmetic a bit simpler/more efficient, the 0th elements are not used!
  posToId = new int[elements + 1];
  idToPos = new int[elements + 1];
  Arrays.fill(idToPos, NOT_PRESENT);
  posToValue = new long[elements + 1];
  // sentinel in the unused slot 0: no stored value can compare less than it
  posToValue[0] = Long.MIN_VALUE;
  this.max = elements;
  this.tieBreaker = tieBreaker;
}

private static int firstChild(int index) {
Expand Down Expand Up @@ -87,58 +90,59 @@ public void push(int id, long value) {
" was pushed already, you need to use the update method if you want to change its value");
}
size++;
tree[size] = id;
positions[id] = size;
vals[size] = value;
posToId[size] = id;
idToPos[id] = size;
posToValue[size] = value;
percolateUp(size);
}

/**
 * Reports whether {@code id} is currently stored in this heap.
 *
 * @param id the id to look up; it is first validated by {@code checkIdInRange}
 * @return {@code true} if a position is recorded for {@code id}
 */
@Override
public boolean contains(int id) {
  checkIdInRange(id);
  return idToPos[id] != NOT_PRESENT;
}

@Override
public void update(int id, long value) {
checkIdInRange(id);
int index = positions[id];
if (index < 0) {
int pos = idToPos[id];
if (pos < 0) {
throw new IllegalStateException(
"The heap does not contain: " + id + ". Use the contains method to check this before calling update");
}
long prev = vals[index];
vals[index] = value;
if (value > prev) {
percolateDown(index);
} else if (value < prev) {
percolateUp(index);
long prev = posToValue[pos];
posToValue[pos] = value;
int cmp = compareIdPos(value, prev, id, pos);
if (cmp > 0) {
percolateDown(pos);
} else if (cmp < 0) {
percolateUp(pos);
}
}

/**
 * Replaces the value of the element at the top of the heap (position 1) and sinks it to its new position.
 *
 * @param value the new value for the current head element
 */
@Override
public void updateHead(long value) {
  posToValue[1] = value;
  percolateDown(1);
}

/**
 * Returns the id stored at the top of the heap (position 1) without removing it.
 */
@Override
public int peekId() {
  return posToId[1];
}

/**
 * Returns the value stored at the top of the heap (position 1) without removing it.
 */
@Override
public long peekValue() {
  return posToValue[1];
}

@Override
public int poll() {
int id = peekId();
tree[1] = tree[size];
vals[1] = vals[size];
positions[tree[1]] = 1;
positions[id] = NOT_PRESENT;
posToId[1] = posToId[size];
posToValue[1] = posToValue[size];
idToPos[posToId[1]] = 1;
idToPos[id] = NOT_PRESENT;
size--;
percolateDown(1);
return id;
Expand All @@ -147,29 +151,29 @@ public int poll() {
/**
 * Empties the heap: every id currently stored is marked as not present and the size is reset to zero.
 */
@Override
public void clear() {
  for (int i = 1; i <= size; i++) {
    idToPos[posToId[i]] = NOT_PRESENT;
  }
  size = 0;
}

private void percolateUp(int index) {
assert index != 0;
if (index == 1) {
private void percolateUp(int pos) {
assert pos != 0;
if (pos == 1) {
return;
}
final int el = tree[index];
final long val = vals[index];
final int id = posToId[pos];
final long val = posToValue[pos];
// the finish condition (index==0) is covered here automatically because we set vals[0]=-inf
int parent;
long parentValue;
while (val < (parentValue = vals[parent = parent(index)])) {
vals[index] = parentValue;
positions[tree[index] = tree[parent]] = index;
index = parent;
while (compareIdPos(val, parentValue = posToValue[parent = parent(pos)], id, parent) < 0) {
posToValue[pos] = parentValue;
idToPos[posToId[pos] = posToId[parent]] = pos;
pos = parent;
}
tree[index] = el;
vals[index] = val;
positions[tree[index]] = index;
posToId[pos] = id;
posToValue[pos] = val;
idToPos[posToId[pos]] = pos;
}

private void checkIdInRange(int id) {
Expand All @@ -178,45 +182,65 @@ private void checkIdInRange(int id) {
}
}

private void percolateDown(int index) {
private void percolateDown(int pos) {
if (size == 0) {
return;
}
assert index > 0;
assert index <= size;
final int el = tree[index];
final long val = vals[index];
assert pos > 0;
assert pos <= size;
final int id = posToId[pos];
final long value = posToValue[pos];
int child;
while ((child = firstChild(index)) <= size) {
while ((child = firstChild(pos)) <= size) {
// optimization: this is a very hot code path for performance of k-way merging,
// so manually-unroll the loop over the 4 child elements to find the minimum value
int minChild = child;
long minValue = vals[child], value;
long minValue = posToValue[child], childValue;
if (++child <= size) {
if ((value = vals[child]) < minValue) {
if (comparePosPos(childValue = posToValue[child], minValue, child, minChild) < 0) {
minChild = child;
minValue = value;
minValue = childValue;
}
if (++child <= size) {
if ((value = vals[child]) < minValue) {
if (comparePosPos(childValue = posToValue[child], minValue, child, minChild) < 0) {
minChild = child;
minValue = value;
minValue = childValue;
}
if (++child <= size && (value = vals[child]) < minValue) {
if (++child <= size &&
comparePosPos(childValue = posToValue[child], minValue, child, minChild) < 0) {
minChild = child;
minValue = value;
minValue = childValue;
}
}
}
if (minValue >= val) {
if (comparePosPos(value, minValue, pos, minChild) <= 0) {
break;
}
vals[index] = minValue;
positions[tree[index] = tree[minChild]] = index;
index = minChild;
posToValue[pos] = minValue;
idToPos[posToId[pos] = posToId[minChild]] = pos;
pos = minChild;
}
tree[index] = el;
vals[index] = val;
positions[el] = index;
posToId[pos] = id;
posToValue[pos] = value;
idToPos[id] = pos;
}

/**
 * Compares two heap slots by value, consulting the tie-breaker with the ids stored at those slots when the values
 * are equal.
 *
 * @param val1 value of the first slot
 * @param val2 value of the second slot
 * @param pos1 position of the first slot, used to look up its id on a tie
 * @param pos2 position of the second slot, used to look up its id on a tie
 * @return -1 if {@code val1 < val2}; the tie-breaker result if the values are equal and not
 *         {@code Long.MIN_VALUE}; 1 otherwise
 */
private int comparePosPos(long val1, long val2, int pos1, int pos2) {
  if (val1 < val2) {
    return -1;
  }
  // Long.MIN_VALUE is excluded from tie-breaking - presumably so that the sentinel kept in slot 0 is never
  // looked up by id
  boolean tied = val1 == val2 && val1 != Long.MIN_VALUE;
  return tied ? tieBreaker.applyAsInt(posToId[pos1], posToId[pos2]) : 1;
}

/**
 * Compares an element that is identified directly by its id against the element stored at a heap slot, consulting
 * the tie-breaker when the values are equal.
 *
 * @param val1 value of the first element
 * @param val2 value of the element at {@code pos2}
 * @param id1  id of the first element, passed to the tie-breaker as-is
 * @param pos2 position of the second element, used to look up its id on a tie
 * @return -1 if {@code val1 < val2}; the tie-breaker result if the values are equal and not
 *         {@code Long.MIN_VALUE}; 1 otherwise
 */
private int compareIdPos(long val1, long val2, int id1, int pos2) {
  if (val1 < val2) {
    return -1;
  }
  // Long.MIN_VALUE is excluded from tie-breaking - presumably so that the sentinel kept in slot 0 is never
  // looked up by id
  boolean tied = val1 == val2 && val1 != Long.MIN_VALUE;
  return tied ? tieBreaker.applyAsInt(id1, posToId[pos2]) : 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public Iterator<SortableFeature> iterator(int shard, int shards) {
}
}

return LongMerger.mergeIterators(iterators);
return LongMerger.mergeIterators(iterators, SortableFeature.COMPARE_BYTES);
}

public int chunks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ default ParallelIterator parallelIterator(Stats stats, int threads) {
}
}
});
return new ParallelIterator(reader, LongMerger.mergeSuppliers(queues));
return new ParallelIterator(reader, LongMerger.mergeSuppliers(queues, SortableFeature.COMPARE_BYTES));
}

record ParallelIterator(Worker reader, @Override Iterator<SortableFeature> iterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

/**
* An item with a {@code long key} that can be used for sorting/grouping.
*
* <p>
* These items can be sorted or grouped by {@link FeatureSort}/{@link FeatureGroup} implementations. Sorted lists can
* also be merged using {@link LongMerger}.
*/
Expand Down
Loading

0 comments on commit 0dc2ee8

Please sign in to comment.