Skip to content

Commit

Permalink
geoparquet handling
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry committed May 19, 2024
1 parent 8f9b750 commit cf9190c
Show file tree
Hide file tree
Showing 38 changed files with 2,848 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.onthegomap.planetiler.benchmarks;

import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.reader.parquet.ParquetInputFile;
import java.nio.file.Path;

public class BenchmarkParquetRead {

public static void main(String[] args) {
var arguments = Arguments.fromArgs(args);
var path =
arguments.inputFile("parquet", "parquet file to read", Path.of("data", "sources", "locality.zstd.parquet"));
long c = 0;
var file = new ParquetInputFile("parquet", "locality", path, null, Bounds.WORLD, null, tags -> tags.get("id"));
for (int i = 0; i < 20; i++) {
long start = System.currentTimeMillis();
for (var block : file.get()) {
for (var item : block) {
c += item.tags().size();
}
}
System.err.println(System.currentTimeMillis() - start);
}

System.err.println(c);
}
}
22 changes: 17 additions & 5 deletions planetiler-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,24 @@
<artifactId>geopackage</artifactId>
<version>${geopackage.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
</exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Pin transitive snappy dependency to more recent version without vulnerability -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.10.5</version>
</dependency>
<dependency>
<groupId>blue.strategic.parquet</groupId>
<artifactId>parquet-floor</artifactId>
<version>1.41</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import com.onthegomap.planetiler.reader.GeoPackageReader;
import com.onthegomap.planetiler.reader.NaturalEarthReader;
import com.onthegomap.planetiler.reader.ShapefileReader;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.reader.osm.OsmNodeBoundsProvider;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.reader.parquet.ParquetReader;
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timers;
Expand All @@ -39,6 +41,7 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -469,6 +472,51 @@ public Planetiler addNaturalEarthSource(String name, Path defaultPath, String de
config, profile, stats, keepUnzipped)));
}


/**
* Adds a new <a href="https://github.com/opengeospatial/geoparquet">geoparquet</a> source that will be processed when
* {@link #run()} is called.
*
* @param name string to use in stats and logs to identify this stage
* @param pattern path to the geoparquet file to read, possibly including
* {@linkplain FileSystem#getPathMatcher(String) glob patterns}
* @param hivePartitioning Set to true to parse extra feature tags from the file path, for example
* {@code {them="buildings", type="part"}} from
* {@code base/theme=buildings/type=part/file.parquet}
* @param getId function that extracts a unique vector tile feature ID from each input feature, string or
* binary features will be hashed to a {@code long}.
* @param getLayer function that extracts {@link SourceFeature#getSourceLayer()} from the properties of each
* input feature
* @return this runner instance for chaining
* @see GeoPackageReader
*/
public Planetiler addParquetSource(String name, Path pattern, boolean hivePartitioning,
Function<Map<String, Object>, Object> getId, Function<Map<String, Object>, Object> getLayer) {
// TODO handle auto-downloading
Path path = getPath(name, "parquet", pattern, null, true);
return addStage(name, "Process features in " + path, ifSourceUsed(name, () -> {
var sourcePaths = FileUtils.walkPathWithPattern(path).stream().filter(Files::isRegularFile).toList();
new ParquetReader(name, profile, stats, getId, getLayer, hivePartitioning).process(sourcePaths, featureGroup,
config);
}));
}

/**
* Alias for {@link #addParquetSource(String, Path, boolean, Function, Function)} using the default layer and ID
* extractors.
*/
public Planetiler addParquetSource(String name, Path pattern, boolean hivePartitioning) {
return addParquetSource(name, pattern, hivePartitioning, null, null);
}

/**
* Alias for {@link #addParquetSource(String, Path, boolean, Function, Function)} without hive partitioning and using
* the default layer and ID extractors.
*/
public Planetiler addParquetSource(String name, Path pattern) {
return addParquetSource(name, pattern, false);
}

/**
* Adds a new stage that will be invoked when {@link #run()} is called.
*
Expand Down Expand Up @@ -770,7 +818,7 @@ public void run() throws Exception {
for (var inputPath : inputPaths) {
if (inputPath.freeAfterReading()) {
LOGGER.info("Deleting {} ({}) to make room for output file", inputPath.id, inputPath.path);
FileUtils.delete(inputPath.path());
inputPath.delete();
}
}

Expand Down Expand Up @@ -810,7 +858,7 @@ private void checkDiskSpace() {
// if the user opts to remove an input source after reading to free up additional space for the output...
for (var input : inputPaths) {
if (input.freeAfterReading()) {
writePhase.addDisk(input.path, -FileUtils.size(input.path), "delete " + input.id + " source after reading");
writePhase.addDisk(input.path, -input.size(), "delete " + input.id + " source after reading");
}
}

Expand Down Expand Up @@ -893,18 +941,23 @@ private RunnableThatThrows ifSourceUsed(String name, RunnableThatThrows task) {
}

private Path getPath(String name, String type, Path defaultPath, String defaultUrl) {
return getPath(name, type, defaultPath, defaultUrl);
}

private Path getPath(String name, String type, Path defaultPath, String defaultUrl, boolean wildcard) {
Path path = arguments.file(name + "_path", name + " " + type + " path", defaultPath);
boolean refresh =
arguments.getBoolean("refresh_" + name, "Download new version of " + name + " if changed", refreshSources);
boolean freeAfterReading = arguments.getBoolean("free_" + name + "_after_read",
"delete " + name + " input file after reading to make space for output (reduces peak disk usage)", false);
var inputPath = new InputPath(name, path, freeAfterReading, wildcard);
inputPaths.add(inputPath);
if (downloadSources || refresh) {
String url = arguments.getString(name + "_url", name + " " + type + " url", defaultUrl);
if ((!Files.exists(path) || refresh) && url != null) {
toDownload.add(new ToDownload(name, url, path));
if ((refresh || inputPath.isEmpty()) && url != null) {
toDownload.add(new ToDownload(name, url, path, wildcard));
}
}
inputPaths.add(new InputPath(name, path, freeAfterReading));
return path;
}

Expand All @@ -922,7 +975,7 @@ private void download() {

private void ensureInputFilesExist() {
for (InputPath inputPath : inputPaths) {
if (profile.caresAboutSource(inputPath.id) && !Files.exists(inputPath.path)) {
if (profile.caresAboutSource(inputPath.id) && inputPath.isEmpty()) {
throw new IllegalArgumentException(inputPath.path + " does not exist. Run with --download to fetch it");
}
}
Expand All @@ -935,7 +988,24 @@ private record Stage(String id, List<String> details, RunnableThatThrows task) {
}
}

private record ToDownload(String id, String url, Path path) {}
private record ToDownload(String id, String url, Path path, boolean wildcard) {}

private record InputPath(String id, Path path, boolean freeAfterReading, boolean wildcard) {

public boolean isEmpty() {
return wildcard ? FileUtils.walkPathWithPattern(path).isEmpty() : !Files.exists(path);
}

public long size() {
return wildcard ? FileUtils.size(FileUtils.getPatternBase(path)) : FileUtils.fileSize(path);
}

private record InputPath(String id, Path path, boolean freeAfterReading) {}
public void delete() {
if (wildcard) {
FileUtils.delete(FileUtils.getPatternBase(path));
} else {
FileUtils.delete(path);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import org.locationtech.jts.geom.LineSegment;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.geom.LinearRing;
import org.locationtech.jts.geom.MultiLineString;
import org.locationtech.jts.geom.MultiPoint;
import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
Expand All @@ -27,6 +29,7 @@
import org.locationtech.jts.geom.util.GeometryFixer;
import org.locationtech.jts.geom.util.GeometryTransformer;
import org.locationtech.jts.io.WKBReader;
import org.locationtech.jts.io.WKTReader;
import org.locationtech.jts.precision.GeometryPrecisionReducer;

/**
Expand All @@ -40,8 +43,9 @@ public class GeoUtils {
/** Rounding precision for 256x256px tiles encoded using 4096 values. */
public static final PrecisionModel TILE_PRECISION = new PrecisionModel(4096d / 256d);
public static final GeometryFactory JTS_FACTORY = new GeometryFactory(PackedCoordinateSequenceFactory.DOUBLE_FACTORY);
public static final WKBReader WKB_READER = new WKBReader(JTS_FACTORY);

public static final Geometry EMPTY_GEOMETRY = JTS_FACTORY.createGeometryCollection();
public static final CoordinateSequence EMPTY_COORDINATE_SEQUENCE = new PackedCoordinateSequence.Double(0, 2, 0);
public static final Point EMPTY_POINT = JTS_FACTORY.createPoint();
public static final LineString EMPTY_LINE = JTS_FACTORY.createLineString();
public static final Polygon EMPTY_POLYGON = JTS_FACTORY.createPolygon();
Expand Down Expand Up @@ -247,11 +251,11 @@ public static Point point(Coordinate coord) {
return JTS_FACTORY.createPoint(coord);
}

public static Geometry createMultiLineString(List<LineString> lineStrings) {
public static MultiLineString createMultiLineString(List<LineString> lineStrings) {
return JTS_FACTORY.createMultiLineString(lineStrings.toArray(EMPTY_LINE_STRING_ARRAY));
}

public static Geometry createMultiPolygon(List<Polygon> polygon) {
public static MultiPolygon createMultiPolygon(List<Polygon> polygon) {
return JTS_FACTORY.createMultiPolygon(polygon.toArray(EMPTY_POLYGON_ARRAY));
}

Expand Down Expand Up @@ -370,7 +374,7 @@ public static CoordinateSequence coordinateSequence(double... coords) {
return new PackedCoordinateSequence.Double(coords, 2, 0);
}

public static Geometry createMultiPoint(List<Point> points) {
public static MultiPoint createMultiPoint(List<Point> points) {
return JTS_FACTORY.createMultiPoint(points.toArray(EMPTY_POINT_ARRAY));
}

Expand Down Expand Up @@ -548,6 +552,14 @@ public static int minZoomForPixelSize(double worldGeometrySize, double minPixelS
PlanetilerConfig.MAX_MAXZOOM);
}

public static WKBReader wkbReader() {
return new WKBReader(JTS_FACTORY);
}

public static WKTReader wktReader() {
return new WKTReader(JTS_FACTORY);
}

/** Helper class to sort polygons by area of their outer shell. */
private record PolyAndArea(Polygon poly, double area) implements Comparable<PolyAndArea> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,15 @@ public void readFeatures(Consumer<SimpleFeature> next) throws Exception {
}
}
if (geometryColumn >= 0) {
var wkbReader = GeoUtils.wkbReader();
while (rs.next()) {
byte[] geometry = rs.getBytes(geometryColumn + 1);
if (geometry == null) {
continue;
}

// create the feature and pass to next stage
Geometry latLonGeometry = GeoUtils.WKB_READER.read(geometry);
Geometry latLonGeometry = wkbReader.read(geometry);
SimpleFeature readerGeometry = SimpleFeature.create(latLonGeometry, HashMap.newHashMap(column.length - 1),
sourceName, table, ++id);
for (int c = 0; c < column.length; c++) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.onthegomap.planetiler.reader.parquet;

import com.onthegomap.planetiler.geo.GeoUtils;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.locationtech.jts.geom.CoordinateSequence;
import org.locationtech.jts.geom.LineString;
import org.locationtech.jts.geom.LinearRing;
import org.locationtech.jts.geom.MultiLineString;
import org.locationtech.jts.geom.MultiPoint;
import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.geom.impl.PackedCoordinateSequence;

/**
* Utilities for converting nested <a href=
* "https://github.com/opengeospatial/geoparquet/blob/main/format-specs/geoparquet.md#native-encodings-based-on-geoarrow">geoarrow</a>
* coordinate lists to JTS geometries.
*/
class GeoArrow {
// TODO create packed coordinate arrays while reading parquet values to avoid creating so many intermediate objects
static MultiPolygon multipolygon(List<List<List<Object>>> list) {
return GeoUtils.createMultiPolygon(map(list, GeoArrow::polygon));
}

static Polygon polygon(List<List<Object>> input) {
return GeoUtils.createPolygon(ring(input.getFirst()), input.stream().skip(1).map(GeoArrow::ring).toList());
}

static MultiPoint multipoint(List<Object> input) {
return GeoUtils.createMultiPoint(map(input, GeoArrow::point));
}

static Point point(Object input) {
int dims = input instanceof List<?> l ? l.size() : input instanceof Map<?, ?> m ? m.size() : 0;
CoordinateSequence result =
new PackedCoordinateSequence.Double(1, dims, dims == 4 ? 1 : 0);
coordinate(input, result, 0);
return GeoUtils.JTS_FACTORY.createPoint(result);
}

static MultiLineString multilinestring(List<List<Object>> input) {
return GeoUtils.createMultiLineString(map(input, GeoArrow::linestring));
}

static LineString linestring(List<Object> input) {
return GeoUtils.JTS_FACTORY.createLineString(coordinateSequence(input));
}


private static CoordinateSequence coordinateSequence(List<Object> input) {
if (input.isEmpty()) {
return GeoUtils.EMPTY_COORDINATE_SEQUENCE;
}
Object first = input.getFirst();
int dims = first instanceof List<?> l ? l.size() : first instanceof Map<?, ?> m ? m.size() : 0;
CoordinateSequence result =
new PackedCoordinateSequence.Double(input.size(), dims, dims == 4 ? 1 : 0);
for (int i = 0; i < input.size(); i++) {
Object item = input.get(i);
coordinate(item, result, i);
}
return result;
}

private static LinearRing ring(List<Object> input) {
return GeoUtils.JTS_FACTORY.createLinearRing(coordinateSequence(input));
}

private static void coordinate(Object input, CoordinateSequence result, int index) {
switch (input) {
case List<?> list -> {
List<Number> l = (List<Number>) list;
for (int i = 0; i < l.size(); i++) {
result.setOrdinate(index, i, l.get(i).doubleValue());
}
}
case Map<?, ?> map -> {
Map<String, Number> m = (Map<String, Number>) map;

for (var entry : m.entrySet()) {
int ordinateIndex = switch (entry.getKey()) {
case "x" -> 0;
case "y" -> 1;
case "z" -> 2;
case "m" -> 3;
case null, default -> throw new IllegalArgumentException("Bad coordinate key: " + entry.getKey());
};
result.setOrdinate(index, ordinateIndex, entry.getValue().doubleValue());
}
}
default -> throw new IllegalArgumentException("Expecting map or list, got: " + input);
}
}

private static <I, O> List<O> map(List<I> in, Function<I, O> remap) {
return in.stream().map(remap).toList();
}

}
Loading

0 comments on commit cf9190c

Please sign in to comment.