diff --git a/NOTICE.md b/NOTICE.md index 25f81916d1..9a0546ed2e 100644 --- a/NOTICE.md +++ b/NOTICE.md @@ -18,6 +18,7 @@ The `planetiler-core` module includes the following software: , [EPSG](https://github.com/geotools/geotools/blob/main/licenses/EPSG.md)) - org.msgpack:msgpack-core (Apache license) - org.xerial:sqlite-jdbc (Apache license) + - org.xerial.snappy:snappy-java (Apache license) - com.ibm.icu:icu4j ([ICU license](https://github.com/unicode-org/icu/blob/main/icu4c/LICENSE)) - com.google.guava:guava (Apache license) - com.google.protobuf:protobuf-java (BSD 3-Clause License) @@ -29,6 +30,7 @@ The `planetiler-core` module includes the following software: - org.snakeyaml:snakeyaml-engine (Apache license) - org.commonmark:commonmark (BSD 2-clause license) - org.tukaani:xz (public domain) + - blue.strategic.parquet:parquet-floor (Apache license) - Adapted code: - `DouglasPeuckerSimplifier` from [JTS](https://github.com/locationtech/jts) (EDL) - `OsmMultipolygon` from [imposm3](https://github.com/omniscale/imposm3) (Apache license) @@ -65,4 +67,5 @@ The `planetiler-core` module includes the following software: | OSM Lakelines | [MIT](https://github.com/lukasmartinelli/osm-lakelines), data from OSM [ODBL](https://www.openstreetmap.org/copyright) | yes | no | | OSM Water Polygons | [acknowledgement](https://osmdata.openstreetmap.de/info/license.html), data from OSM [ODBL](https://www.openstreetmap.org/copyright) | yes | yes | | Wikidata name translations | [CCO](https://www.wikidata.org/wiki/Wikidata:Licensing) | no | no | +| Overture Maps | [Various](https://docs.overturemaps.org/attribution) | no | yes | diff --git a/README.md b/README.md index 0eb3e7f5c7..993284ecf8 100644 --- a/README.md +++ b/README.md @@ -334,6 +334,8 @@ Planetiler is made possible by these awesome open source projects: Google's [Common Expression Language](https://github.com/google/cel-spec) that powers dynamic expressions embedded in schema config files. - [PMTiles](https://github.com/protomaps/PMTiles) optimized tile storage format +- [Apache Parquet](https://github.com/apache/parquet-mr) to support reading geoparquet files in java (with dependencies + minimized by [parquet-floor](https://github.com/strategicblue/parquet-floor)) See [NOTICE.md](NOTICE.md) for a full list and license details. 
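As context for the dependency and reader changes below, a minimal sketch of how a build might register a geoparquet source through the new addParquetSource API added to Planetiler in this diff (the profile class, source name, and file paths are hypothetical placeholders):

  public static void main(String[] args) throws Exception {
    Planetiler.create(Arguments.fromArgs(args))
      .setProfile(new MyProfile()) // hypothetical Profile implementation
      // hivePartitioning=true parses extra tags like {theme=..., type=...} from path segments
      .addParquetSource("overture",
        List.of(Path.of("data", "theme=buildings", "type=building", "part-0.parquet")), true)
      .overwriteOutput(Path.of("data", "output.pmtiles"))
      .run();
  }
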
diff --git a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/benchmarks/BenchmarkParquetRead.java b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/benchmarks/BenchmarkParquetRead.java
new file mode 100644
index 0000000000..67fd05c571
--- /dev/null
+++ b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/benchmarks/BenchmarkParquetRead.java
@@ -0,0 +1,28 @@
+package com.onthegomap.planetiler.benchmarks;
+
+import com.onthegomap.planetiler.config.Arguments;
+import com.onthegomap.planetiler.config.Bounds;
+import com.onthegomap.planetiler.reader.parquet.ParquetInputFile;
+import java.nio.file.Path;
+
+public class BenchmarkParquetRead {
+
+  public static void main(String[] args) {
+    var arguments = Arguments.fromArgs(args);
+    var path =
+      arguments.inputFile("parquet", "parquet file to read", Path.of("data", "sources", "locality.zstd.parquet"));
+    long c = 0;
+    var file = new ParquetInputFile("parquet", "locality", path, null, Bounds.WORLD, null, tags -> tags.get("id"));
+    for (int i = 0; i < 20; i++) {
+      long start = System.currentTimeMillis();
+      for (var block : file.get()) {
+        for (var item : block) {
+          c += item.tags().size();
+        }
+      }
+      System.err.println(System.currentTimeMillis() - start);
+    }
+
+    System.err.println(c);
+  }
+}
diff --git a/planetiler-core/pom.xml b/planetiler-core/pom.xml
index cd5514da67..fb69511d09 100644
--- a/planetiler-core/pom.xml
+++ b/planetiler-core/pom.xml
@@ -154,12 +154,24 @@
       <artifactId>geopackage</artifactId>
       <version>${geopackage.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.slf4j</groupId>
-          <artifactId>slf4j-nop</artifactId>
-        </exclusion>
-      </exclusions>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-nop</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <version>1.1.10.5</version>
+    </dependency>
+    <dependency>
+      <groupId>blue.strategic.parquet</groupId>
+      <artifactId>parquet-floor</artifactId>
+      <version>1.41</version>
+    </dependency>
diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java
index 7ad554095c..59371024a5 100644
--- a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java
+++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java
@@ -13,9 +13,11 @@
 import com.onthegomap.planetiler.reader.GeoPackageReader;
 import com.onthegomap.planetiler.reader.NaturalEarthReader;
 import com.onthegomap.planetiler.reader.ShapefileReader;
+import com.onthegomap.planetiler.reader.SourceFeature;
 import com.onthegomap.planetiler.reader.osm.OsmInputFile;
 import com.onthegomap.planetiler.reader.osm.OsmNodeBoundsProvider;
 import com.onthegomap.planetiler.reader.osm.OsmReader;
+import com.onthegomap.planetiler.reader.parquet.ParquetReader;
 import com.onthegomap.planetiler.stats.ProcessInfo;
 import com.onthegomap.planetiler.stats.Stats;
 import com.onthegomap.planetiler.stats.Timers;
@@ -39,9 +41,12 @@
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.regex.Pattern;
 import java.util.stream.IntStream;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -469,6 +474,53 @@ public Planetiler addNaturalEarthSource(String name, Path defaultPath, String de
       config, profile, stats, keepUnzipped)));
   }
+
+  /**
+   * Adds a new geoparquet source that will be processed when
+   * {@link #run()} is called.
+   *
+   * @param name             string to use in stats and logs to identify this stage
+   * @param paths            paths to the geoparquet files to read.
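+   *                         (files named {@code _SUCCESS} are skipped, so the listing of a whole export directory
+   *                         can be passed in)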
+   * @param hivePartitioning Set to true to parse extra feature tags from the file path, for example
+   *                         {@code {theme="buildings", type="part"}} from
+   *                         {@code base/theme=buildings/type=part/file.parquet}
+   * @param getId            function that extracts a unique vector tile feature ID from each input feature; string or
+   *                         binary values will be hashed to a {@code long}.
+   * @param getLayer         function that extracts {@link SourceFeature#getSourceLayer()} from the properties of each
+   *                         input feature
+   * @return this runner instance for chaining
+   * @see ParquetReader
+   */
+  public Planetiler addParquetSource(String name, List<Path> paths, boolean hivePartitioning,
+    Function<Map<String, Object>, Object> getId, Function<Map<String, Object>, Object> getLayer) {
+    // TODO handle auto-downloading
+    for (var path : paths) {
+      inputPaths.add(new InputPath(name, path, false));
+    }
+    var separator = Pattern.quote(paths.isEmpty() ? "/" : paths.getFirst().getFileSystem().getSeparator());
+    String prefix = StringUtils.getCommonPrefix(paths.stream().map(Path::toString).toArray(String[]::new))
+      .replaceAll(separator + "[^" + separator + "]*$", "");
+    return addStage(name, "Process features in " + (prefix.isEmpty() ? (paths.size() + " files") : prefix),
+      ifSourceUsed(name, () -> new ParquetReader(name, profile, stats, getId, getLayer, hivePartitioning)
+        .process(paths, featureGroup, config)));
+  }
+
+  /**
+   * Alias for {@link #addParquetSource(String, List, boolean, Function, Function)} using the default layer and ID
+   * extractors.
+   */
+  public Planetiler addParquetSource(String name, List<Path> paths, boolean hivePartitioning) {
+    return addParquetSource(name, paths, hivePartitioning, null, null);
+  }
+
+  /**
+   * Alias for {@link #addParquetSource(String, List, boolean, Function, Function)} without hive partitioning and using
+   * the default layer and ID extractors.
+   */
+  public Planetiler addParquetSource(String name, List<Path> paths) {
+    return addParquetSource(name, paths, false);
+  }
+
   /**
    * Adds a new stage that will be invoked when {@link #run()} is called.
    *
diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java
index f6d0aceb7b..540933b4b8 100644
--- a/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java
+++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java
@@ -17,6 +17,8 @@
 import org.locationtech.jts.geom.LineSegment;
 import org.locationtech.jts.geom.LineString;
 import org.locationtech.jts.geom.LinearRing;
+import org.locationtech.jts.geom.MultiLineString;
+import org.locationtech.jts.geom.MultiPoint;
 import org.locationtech.jts.geom.MultiPolygon;
 import org.locationtech.jts.geom.Point;
 import org.locationtech.jts.geom.Polygon;
@@ -27,6 +29,7 @@
 import org.locationtech.jts.geom.util.GeometryFixer;
 import org.locationtech.jts.geom.util.GeometryTransformer;
 import org.locationtech.jts.io.WKBReader;
+import org.locationtech.jts.io.WKTReader;
 import org.locationtech.jts.precision.GeometryPrecisionReducer;
 /**
@@ -40,8 +43,9 @@ public class GeoUtils {
   /** Rounding precision for 256x256px tiles encoded using 4096 values.
*/ public static final PrecisionModel TILE_PRECISION = new PrecisionModel(4096d / 256d); public static final GeometryFactory JTS_FACTORY = new GeometryFactory(PackedCoordinateSequenceFactory.DOUBLE_FACTORY); - public static final WKBReader WKB_READER = new WKBReader(JTS_FACTORY); + public static final Geometry EMPTY_GEOMETRY = JTS_FACTORY.createGeometryCollection(); + public static final CoordinateSequence EMPTY_COORDINATE_SEQUENCE = new PackedCoordinateSequence.Double(0, 2, 0); public static final Point EMPTY_POINT = JTS_FACTORY.createPoint(); public static final LineString EMPTY_LINE = JTS_FACTORY.createLineString(); public static final Polygon EMPTY_POLYGON = JTS_FACTORY.createPolygon(); @@ -247,11 +251,11 @@ public static Point point(Coordinate coord) { return JTS_FACTORY.createPoint(coord); } - public static Geometry createMultiLineString(List lineStrings) { + public static MultiLineString createMultiLineString(List lineStrings) { return JTS_FACTORY.createMultiLineString(lineStrings.toArray(EMPTY_LINE_STRING_ARRAY)); } - public static Geometry createMultiPolygon(List polygon) { + public static MultiPolygon createMultiPolygon(List polygon) { return JTS_FACTORY.createMultiPolygon(polygon.toArray(EMPTY_POLYGON_ARRAY)); } @@ -370,7 +374,7 @@ public static CoordinateSequence coordinateSequence(double... coords) { return new PackedCoordinateSequence.Double(coords, 2, 0); } - public static Geometry createMultiPoint(List points) { + public static MultiPoint createMultiPoint(List points) { return JTS_FACTORY.createMultiPoint(points.toArray(EMPTY_POINT_ARRAY)); } @@ -548,6 +552,14 @@ public static int minZoomForPixelSize(double worldGeometrySize, double minPixelS PlanetilerConfig.MAX_MAXZOOM); } + public static WKBReader wkbReader() { + return new WKBReader(JTS_FACTORY); + } + + public static WKTReader wktReader() { + return new WKTReader(JTS_FACTORY); + } + /** Helper class to sort polygons by area of their outer shell. 
*/ private record PolyAndArea(Polygon poly, double area) implements Comparable { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java index 2711fdb243..d2d4892610 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java @@ -172,6 +172,7 @@ public void readFeatures(Consumer next) throws Exception { } } if (geometryColumn >= 0) { + var wkbReader = GeoUtils.wkbReader(); while (rs.next()) { byte[] geometry = rs.getBytes(geometryColumn + 1); if (geometry == null) { @@ -179,7 +180,7 @@ public void readFeatures(Consumer next) throws Exception { } // create the feature and pass to next stage - Geometry latLonGeometry = GeoUtils.WKB_READER.read(geometry); + Geometry latLonGeometry = wkbReader.read(geometry); SimpleFeature readerGeometry = SimpleFeature.create(latLonGeometry, HashMap.newHashMap(column.length - 1), sourceName, table, ++id); for (int c = 0; c < column.length; c++) { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoArrow.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoArrow.java new file mode 100644 index 0000000000..32b404ca74 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoArrow.java @@ -0,0 +1,104 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.onthegomap.planetiler.geo.GeoUtils; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.locationtech.jts.geom.CoordinateSequence; +import org.locationtech.jts.geom.LineString; +import org.locationtech.jts.geom.LinearRing; +import org.locationtech.jts.geom.MultiLineString; +import org.locationtech.jts.geom.MultiPoint; +import org.locationtech.jts.geom.MultiPolygon; +import org.locationtech.jts.geom.Point; +import org.locationtech.jts.geom.Polygon; +import org.locationtech.jts.geom.impl.PackedCoordinateSequence; + +/** + * Utilities for converting nested geoarrow + * coordinate lists to JTS geometries. + */ +class GeoArrow { + private GeoArrow() {} + + // TODO create packed coordinate arrays while reading parquet values to avoid creating so many intermediate objects + static MultiPolygon multipolygon(List>> list) { + return GeoUtils.createMultiPolygon(map(list, GeoArrow::polygon)); + } + + static Polygon polygon(List> input) { + return GeoUtils.createPolygon(ring(input.getFirst()), input.stream().skip(1).map(GeoArrow::ring).toList()); + } + + static MultiPoint multipoint(List input) { + return GeoUtils.createMultiPoint(map(input, GeoArrow::point)); + } + + static Point point(Object input) { + int dims = input instanceof List l ? l.size() : input instanceof Map m ? m.size() : 0; + CoordinateSequence result = + new PackedCoordinateSequence.Double(1, dims, dims == 4 ? 
1 : 0); + coordinate(input, result, 0); + return GeoUtils.JTS_FACTORY.createPoint(result); + } + + static MultiLineString multilinestring(List> input) { + return GeoUtils.createMultiLineString(map(input, GeoArrow::linestring)); + } + + static LineString linestring(List input) { + return GeoUtils.JTS_FACTORY.createLineString(coordinateSequence(input)); + } + + + private static CoordinateSequence coordinateSequence(List input) { + if (input.isEmpty()) { + return GeoUtils.EMPTY_COORDINATE_SEQUENCE; + } + Object first = input.getFirst(); + int dims = first instanceof List l ? l.size() : first instanceof Map m ? m.size() : 0; + CoordinateSequence result = + new PackedCoordinateSequence.Double(input.size(), dims, dims == 4 ? 1 : 0); + for (int i = 0; i < input.size(); i++) { + Object item = input.get(i); + coordinate(item, result, i); + } + return result; + } + + private static LinearRing ring(List input) { + return GeoUtils.JTS_FACTORY.createLinearRing(coordinateSequence(input)); + } + + private static void coordinate(Object input, CoordinateSequence result, int index) { + switch (input) { + case List list -> { + List l = (List) list; + for (int i = 0; i < l.size(); i++) { + result.setOrdinate(index, i, l.get(i).doubleValue()); + } + } + case Map map -> { + Map m = (Map) map; + + for (var entry : m.entrySet()) { + int ordinateIndex = switch (entry.getKey()) { + case "x" -> 0; + case "y" -> 1; + case "z" -> 2; + case "m" -> 3; + case null, default -> throw new IllegalArgumentException("Bad coordinate key: " + entry.getKey()); + }; + result.setOrdinate(index, ordinateIndex, entry.getValue().doubleValue()); + } + } + default -> throw new IllegalArgumentException("Expecting map or list, got: " + input); + } + } + + private static List map(List in, Function remap) { + return in.stream().map(remap).toList(); + } + +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadata.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadata.java new file mode 100644 index 0000000000..bbf4dab036 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadata.java @@ -0,0 +1,200 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.geo.GeoUtils; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.BiFunction; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Filters; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.locationtech.jts.geom.Envelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Struct for deserializing a + * geoparquet + * metadata json string into. 
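+ * <p>
+ * For example (abridged): {@code {"version": "1.0.0", "primary_column": "geometry", "columns": {"geometry":
+ * {"encoding": "WKB", "geometry_types": ["Polygon"]}}}}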
+ */ +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public record GeoParquetMetadata( + String version, + String primaryColumn, + Map columns +) { + + private static final Logger LOGGER = LoggerFactory.getLogger(GeoParquetMetadata.class); + + private static final ObjectMapper mapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + public record CoveringBbox( + List xmin, + List ymin, + List xmax, + List ymax + ) {} + + public record Covering( + CoveringBbox bbox + ) {} + + @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) + public record ColumnMetadata( + String encoding, + List geometryTypes, + Object crs, + String orientation, + String edges, + List bbox, + Double epoch, + Covering covering + ) { + ColumnMetadata(String encoding) { + this(encoding, List.of()); + } + + ColumnMetadata(String encoding, List geometryTypes) { + this(encoding, geometryTypes, null, null, null, null, null, null); + } + + public Envelope envelope() { + return (bbox == null || bbox.size() != 4) ? GeoUtils.WORLD_LAT_LON_BOUNDS : + new Envelope(bbox.get(0), bbox.get(2), bbox.get(1), bbox.get(3)); + } + + /** + * Returns a parquet filter that filters records read to only those where the covering bbox overlaps {@code bounds} + * or null if unable to infer that from the metadata. + *
+   * <p>
+ * If covering bbox metadata is missing from geoparquet metadata, it will try to use bbox.xmin, bbox.xmax, + * bbox.ymin, and bbox.ymax if present. + */ + public FilterPredicate bboxFilter(MessageType schema, Bounds bounds) { + if (!bounds.isWorld()) { + var covering = covering(); + // if covering metadata missing, use default bbox:{xmin,xmax,ymin,ymax} + if (covering == null) { + if (hasNumericField(schema, "bbox.xmin") && + hasNumericField(schema, "bbox.xmax") && + hasNumericField(schema, "bbox.ymin") && + hasNumericField(schema, "bbox.ymax")) { + covering = new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox( + List.of("bbox.xmin"), + List.of("bbox.ymin"), + List.of("bbox.xmax"), + List.of("bbox.ymax") + )); + } else if (hasNumericField(schema, "bbox", "xmin") && + hasNumericField(schema, "bbox", "xmax") && + hasNumericField(schema, "bbox", "ymin") && + hasNumericField(schema, "bbox", "ymax")) { + covering = new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox( + List.of("bbox", "xmin"), + List.of("bbox", "ymin"), + List.of("bbox", "xmax"), + List.of("bbox", "ymax") + )); + } + } + if (covering != null) { + var latLonBounds = bounds.latLon(); + // TODO apply projection + var coveringBbox = covering.bbox(); + var coordinateType = + schema.getColumnDescription(coveringBbox.xmax().toArray(String[]::new)) + .getPrimitiveType() + .getPrimitiveTypeName(); + BiFunction, Number, FilterPredicate> gtEq = switch (coordinateType) { + case DOUBLE -> (p, v) -> FilterApi.gtEq(Filters.doubleColumn(p), v.doubleValue()); + case FLOAT -> (p, v) -> FilterApi.gtEq(Filters.floatColumn(p), v.floatValue()); + default -> throw new UnsupportedOperationException(); + }; + BiFunction, Number, FilterPredicate> ltEq = switch (coordinateType) { + case DOUBLE -> (p, v) -> FilterApi.ltEq(Filters.doubleColumn(p), v.doubleValue()); + case FLOAT -> (p, v) -> FilterApi.ltEq(Filters.floatColumn(p), v.floatValue()); + default -> throw new UnsupportedOperationException(); + }; + return FilterApi.and( + FilterApi.and( + gtEq.apply(coveringBbox.xmax(), latLonBounds.getMinX()), + ltEq.apply(coveringBbox.xmin(), latLonBounds.getMaxX()) + ), + FilterApi.and( + gtEq.apply(coveringBbox.ymax(), latLonBounds.getMinY()), + ltEq.apply(coveringBbox.ymin(), latLonBounds.getMaxY()) + ) + ); + } + } + return null; + } + } + + public ColumnMetadata primaryColumnMetadata() { + return Objects.requireNonNull(columns.get(primaryColumn), + "No geoparquet metadata for primary column " + primaryColumn); + } + + + /** + * Extracts geoparquet metadata from the {@code "geo"} key value metadata field for the file, or tries to generate a + * default one if missing that uses geometry, wkb_geometry, or wkt_geometry column. 
+ */ + public static GeoParquetMetadata parse(FileMetaData metadata) throws IOException { + String string = metadata.getKeyValueMetaData().get("geo"); + if (string != null) { + try { + return mapper.readValue(string, GeoParquetMetadata.class); + } catch (JsonProcessingException e) { + LOGGER.warn("Invalid geoparquet metadata", e); + } + } + // fallback + for (var field : metadata.getSchema().asGroupType().getFields()) { + if (field.isPrimitive() && + field.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) { + switch (field.getName()) { + case "geometry", "wkb_geometry" -> { + return new GeoParquetMetadata("1.0.0", field.getName(), Map.of( + field.getName(), new ColumnMetadata("WKB"))); + } + case "wkt_geometry" -> { + return new GeoParquetMetadata("1.0.0", field.getName(), Map.of( + field.getName(), new ColumnMetadata("WKT"))); + } + default -> { + //ignore + } + } + } + } + throw new IOException( + "No valid geometry columns found: " + metadata.getSchema().asGroupType().getFields().stream().map( + Type::getName).toList()); + } + + private static boolean hasNumericField(MessageType root, String... path) { + if (root.containsPath(path)) { + var type = root.getType(path); + if (!type.isPrimitive()) { + return false; + } + var typeName = type.asPrimitiveType().getPrimitiveTypeName(); + return typeName == PrimitiveType.PrimitiveTypeName.DOUBLE || typeName == PrimitiveType.PrimitiveTypeName.FLOAT; + } + return false; + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeometryReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeometryReader.java new file mode 100644 index 0000000000..67deed9ced --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeometryReader.java @@ -0,0 +1,63 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.onthegomap.planetiler.geo.GeoUtils; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.reader.WithTags; +import com.onthegomap.planetiler.util.FunctionThatThrows; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.locationtech.jts.geom.Geometry; + +/** + * Decodes geometries from a parquet record based on the {@link GeoParquetMetadata} provided. + */ +class GeometryReader { + private final Map> converters = new HashMap<>(); + private final String geometryColumn; + + GeometryReader(GeoParquetMetadata geoparquet) { + this.geometryColumn = geoparquet.primaryColumn(); + for (var entry : geoparquet.columns().entrySet()) { + String column = entry.getKey(); + GeoParquetMetadata.ColumnMetadata columnInfo = entry.getValue(); + FunctionThatThrows converter = switch (columnInfo.encoding()) { + case "WKB" -> obj -> obj instanceof byte[] bytes ? GeoUtils.wkbReader().read(bytes) : null; + case "WKT" -> obj -> obj instanceof String string ? GeoUtils.wktReader().read(string) : null; + case "multipolygon", "geoarrow.multipolygon" -> + obj -> obj instanceof List list ? GeoArrow.multipolygon((List>>) list) : null; + case "polygon", "geoarrow.polygon" -> + obj -> obj instanceof List list ? GeoArrow.polygon((List>) list) : null; + case "multilinestring", "geoarrow.multilinestring" -> + obj -> obj instanceof List list ? GeoArrow.multilinestring((List>) list) : null; + case "linestring", "geoarrow.linestring" -> + obj -> obj instanceof List list ? 
GeoArrow.linestring((List) list) : null;
+      case "multipoint", "geoarrow.multipoint" ->
+        obj -> obj instanceof List list ? GeoArrow.multipoint((List) list) : null;
+      case "point", "geoarrow.point" -> GeoArrow::point;
+      default -> throw new IllegalArgumentException("Unhandled type: " + columnInfo.encoding());
+      };
+
+      converters.put(column, converter);
+    }
+  }
+
+  Geometry readPrimaryGeometry(WithTags tags) throws GeometryException {
+    return readGeometry(tags, geometryColumn);
+  }
+
+  Geometry readGeometry(WithTags tags, String column) throws GeometryException {
+    var value = tags.getTag(column);
+    var converter = converters.get(column);
+    if (value == null) {
+      throw new GeometryException("no_parquet_column", "Missing geometry column " + column);
+    } else if (converter == null) {
+      throw new GeometryException("no_converter", "No geometry converter for " + column);
+    }
+    try {
+      return converter.apply(value);
+    } catch (Exception e) {
+      throw new GeometryException("error_reading", "Error reading " + column, e);
+    }
+  }
+}
diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/Interval.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/Interval.java
new file mode 100644
index 0000000000..f1cb4d464b
--- /dev/null
+++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/Interval.java
@@ -0,0 +1,42 @@
+package com.onthegomap.planetiler.reader.parquet;
+
+import java.time.Duration;
+import java.time.Period;
+import java.time.temporal.Temporal;
+import java.time.temporal.TemporalAmount;
+import java.time.temporal.TemporalUnit;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * Represents a parquet interval datatype which has a month, day, and millisecond part.
+ * <p>
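+ * For example, {@code Interval.of(1, 2, 500)} represents 1 month, 2 days, and 500 milliseconds.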
+ * Built-in java {@link TemporalAmount} implementations can only store a period or duration amount, but not both. + */ +public record Interval(Period period, Duration duration) implements TemporalAmount { + + public static Interval of(int months, long days, long millis) { + return new Interval(Period.ofMonths(months).plusDays(days), Duration.ofMillis(millis)); + } + + @Override + public long get(TemporalUnit unit) { + return period.get(unit) + duration.get(unit); + } + + @Override + public List getUnits() { + return Stream.concat(period.getUnits().stream(), duration.getUnits().stream()).toList(); + } + + @Override + public Temporal addTo(Temporal temporal) { + return temporal.plus(period).plus(duration); + } + + @Override + public Temporal subtractFrom(Temporal temporal) { + return temporal.minus(period).minus(duration); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetFeature.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetFeature.java new file mode 100644 index 0000000000..6cbc826244 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetFeature.java @@ -0,0 +1,77 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.onthegomap.planetiler.geo.GeoUtils; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.reader.SourceFeature; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.Lineal; +import org.locationtech.jts.geom.Polygonal; +import org.locationtech.jts.geom.Puntal; + +/** + * A single record read from a geoparquet file. + */ +public class ParquetFeature extends SourceFeature { + + private final GeometryReader geometryParser; + private final Path filename; + private Geometry latLon; + private Geometry world; + + ParquetFeature(String source, String sourceLayer, Path filename, long id, GeometryReader geometryParser, + Map tags) { + super(tags, source, sourceLayer, List.of(), id); + this.geometryParser = geometryParser; + this.filename = filename; + } + + public Path getFilename() { + return filename; + } + + @Override + public Geometry latLonGeometry() throws GeometryException { + return latLon == null ? latLon = geometryParser.readPrimaryGeometry(this) : latLon; + } + + @Override + public Geometry worldGeometry() throws GeometryException { + return world != null ? 
world : + (world = GeoUtils.sortPolygonsByAreaDescending(GeoUtils.latLonToWorldCoords(latLonGeometry()))); + } + + @Override + public boolean isPoint() { + try { + return latLonGeometry() instanceof Puntal; + } catch (GeometryException e) { + throw new IllegalStateException(e); + } + } + + @Override + public boolean canBePolygon() { + try { + return latLonGeometry() instanceof Polygonal; + } catch (GeometryException e) { + throw new IllegalStateException(e); + } + } + + @Override + public boolean canBeLine() { + try { + return latLonGeometry() instanceof Lineal; + } catch (GeometryException e) { + throw new IllegalStateException(e); + } + } + + @Override + public String toString() { + return tags().toString(); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFile.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFile.java new file mode 100644 index 0000000000..d6b16b42e2 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFile.java @@ -0,0 +1,233 @@ +package com.onthegomap.planetiler.reader.parquet; + +import blue.strategic.parquet.ParquetReader; +import com.google.common.collect.Iterators; +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.util.Hashing; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.Function; +import java.util.function.ToLongFunction; +import java.util.stream.IntStream; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.MessageColumnIO; +import org.locationtech.jts.geom.Envelope; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads {@link SourceFeature SourceFeatures} from a single + * geoparquet file. 
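+ * <p>
+ * Iterate over {@link #get()} to read the file one row-group {@link Block} at a time, then iterate over each block to
+ * get the {@link ParquetFeature ParquetFeatures} it contains.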
+ */ +public class ParquetInputFile { + + private static final Logger LOGGER = LoggerFactory.getLogger(ParquetInputFile.class); + private final ParquetMetadata metadata; + private final InputFile inputFile; + private final Path path; + private final FilterCompat.Filter filter; + private final String source; + private final ToLongFunction> idGenerator; + private final String layer; + private final long count; + private final int blockCount; + private final GeometryReader geometryReader; + private final Map extraFields; + private Envelope postFilterBounds = null; + private boolean outOfBounds = false; + + public ParquetInputFile(String source, String layer, Path path) { + this(source, layer, path, null, Bounds.WORLD, null, null); + } + + public ParquetInputFile(String source, String layer, Path path, FilterPredicate filter, Bounds bounds, + Map extraFields, Function, Object> idGenerator) { + this.idGenerator = idGenerator == null ? null : map -> hashToLong(idGenerator.apply(map)); + this.layer = layer; + this.source = source; + this.path = path; + inputFile = ParquetReader.makeInputFile(path.toFile()); + this.extraFields = extraFields; + try (var file = open()) { + metadata = file.getFooter(); + var fileMetadata = metadata.getFileMetaData(); + var geoparquet = GeoParquetMetadata.parse(fileMetadata); + this.geometryReader = new GeometryReader(geoparquet); + if (!bounds.isWorld()) { + if (!geoparquet.primaryColumnMetadata().envelope().intersects(bounds.latLon())) { + outOfBounds = true; + } else { + var bboxFilter = geoparquet.primaryColumnMetadata().bboxFilter(fileMetadata.getSchema(), bounds); + if (bboxFilter != null) { + filter = filter == null ? bboxFilter : FilterApi.and(filter, bboxFilter); + } else { + LOGGER.warn("No covering column specified in geoparquet metadata, fall back to post-filtering"); + postFilterBounds = bounds.latLon(); + } + } + } + count = outOfBounds ? 0 : file.getFilteredRecordCount(); + blockCount = outOfBounds ? 0 : metadata.getBlocks().size(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + this.filter = filter == null ? 
FilterCompat.NOOP : FilterCompat.get(filter); + } + + private static long hashToLong(Object o) { + return switch (o) { + case String s -> Hashing.fnv1a64(s.getBytes(StandardCharsets.UTF_8)); + case byte[] bs -> Hashing.fnv1a64(bs); + case Integer i -> i; + case Long l -> l; + case Float f -> Float.floatToIntBits(f); + case Double d -> Double.doubleToLongBits(d); + case null -> 0; + default -> Hashing.fnv1a64(o.toString().getBytes(StandardCharsets.UTF_8)); + }; + } + + public boolean hasFilter() { + return FilterCompat.isFilteringRequired(filter); + } + + public boolean isOutOfBounds() { + return outOfBounds; + } + + public BlockReader get() { + if (outOfBounds) { + return Collections::emptyIterator; + } + long fileHash = Hashing.fnv1a64(path.toString().getBytes(StandardCharsets.UTF_8)); + var schema = metadata.getFileMetaData().getSchema(); + var columnIOFactory = new ColumnIOFactory(metadata.getFileMetaData().getCreatedBy(), false); + return () -> IntStream.range(0, metadata.getBlocks().size()).mapToObj(blockIndex -> { + long blockHash = Hashing.fnv1a64(fileHash, ByteBuffer.allocate(4).putInt(blockIndex).array()); + // happens in reader thread + return (Block) new Block() { + @Override + public Path getFileName() { + return path; + } + + @Override + public String layer() { + return layer; + } + + @Override + public Iterator iterator() { + PageReadStore group; + try (var reader = open()) { + group = reader.readFilteredRowGroup(blockIndex); + if (group == null) { + return Collections.emptyIterator(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema); + var recordReader = columnIO.getRecordReader(group, new ParquetRecordConverter(schema), filter); + long total = group.getRowCount(); + return Iterators.filter(new Iterator<>() { + long i = 0; + + @Override + public boolean hasNext() { + return i < total; + } + + @Override + public ParquetFeature next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + i++; + + var item = recordReader.read(); + + if (item == null) { + return null; + } + + if (extraFields != null) { + item.putAll(extraFields); + } + + var feature = new ParquetFeature( + source, + layer, + path, + idGenerator != null ? 
idGenerator.applyAsLong(item) : + Hashing.fnv1a64(blockHash, ByteBuffer.allocate(8).putLong(i).array()), + geometryReader, + item + ); + + if (postFilterBounds != null) { + try { + if (!feature.latLonGeometry().getEnvelopeInternal().intersects(postFilterBounds)) { + return null; + } + } catch (GeometryException e) { + LOGGER.warn("Error reading geometry to post-filter bounds", e); + return null; + } + } + + return feature; + } + }, Objects::nonNull); + } + }; + }).iterator(); + } + + private ParquetFileReader open() throws IOException { + return ParquetFileReader.open(inputFile, ParquetReadOptions.builder() + .withRecordFilter(filter) + .build()); + } + + public long getCount() { + return count; + } + + public long getBlockCount() { + return blockCount; + } + + public interface BlockReader extends Iterable, Closeable { + + @Override + default void close() throws IOException {} + } + + public interface Block extends Iterable { + + Path getFileName(); + + String layer(); + } + +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetPrimitiveConverter.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetPrimitiveConverter.java new file mode 100644 index 0000000000..e021bf9d84 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetPrimitiveConverter.java @@ -0,0 +1,214 @@ +package com.onthegomap.planetiler.reader.parquet; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.Period; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.LongFunction; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; + +/** + * Converts typed primitive values from parquet records to java objects: + * + *
+ * <ul>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#FLOAT} -> {@link Float}</li>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#DOUBLE} -> {@link Double}</li>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#INT32} -> {@link Integer}</li>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#INT64} -> {@link Long}</li>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#BOOLEAN} -> {@link Boolean}</li>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#INT96} -> {@link Instant}</li>
+ * <li>{@link LogicalTypeAnnotation.DateLogicalTypeAnnotation} -> {@link LocalDate}</li>
+ * <li>{@link LogicalTypeAnnotation.TimeLogicalTypeAnnotation} -> {@link LocalTime}</li>
+ * <li>{@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} -> {@link Instant}</li>
+ * <li>{@link LogicalTypeAnnotation.UUIDLogicalTypeAnnotation} -> {@link UUID}</li>
+ * <li>{@link LogicalTypeAnnotation.DecimalLogicalTypeAnnotation} -> {@link Double}</li>
+ * <li>{@link LogicalTypeAnnotation.StringLogicalTypeAnnotation} -> {@link String}</li>
+ * <li>{@link LogicalTypeAnnotation.JsonLogicalTypeAnnotation} -> {@link String}</li>
+ * <li>{@link LogicalTypeAnnotation.EnumLogicalTypeAnnotation} -> {@link String}</li>
+ * <li>{@link PrimitiveType.PrimitiveTypeName#BINARY} -> {@code byte[]}</li>
+ * </ul>
+ */ +class ParquetPrimitiveConverter extends PrimitiveConverter { + private final PrimitiveType.PrimitiveTypeName primitiveType; + private final ParquetRecordConverter.Context context; + private Dictionary dictionary; + + ParquetPrimitiveConverter(ParquetRecordConverter.Context context) { + this.context = context; + this.primitiveType = context.type.asPrimitiveType().getPrimitiveTypeName(); + } + + static ParquetPrimitiveConverter of(ParquetRecordConverter.Context context) { + var primitiveType = context.type().asPrimitiveType().getPrimitiveTypeName(); + return switch (primitiveType) { + case FLOAT, DOUBLE, BOOLEAN -> new ParquetPrimitiveConverter(context); + case INT64, INT32 -> switch (context.type().getLogicalTypeAnnotation()) { + case null -> new ParquetPrimitiveConverter(context); + case LogicalTypeAnnotation.IntLogicalTypeAnnotation ignored -> + new ParquetPrimitiveConverter(context); + case LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal -> { + var multiplier = Math.pow(10, -decimal.getScale()); + yield new IntegerConverter(context, value -> multiplier * value); + } + case LogicalTypeAnnotation.DateLogicalTypeAnnotation ignored -> + new IntegerConverter(context, LocalDate::ofEpochDay); + case LogicalTypeAnnotation.TimeLogicalTypeAnnotation time -> { + var unit = getUnit(time.getUnit()); + yield new IntegerConverter(context, value -> LocalTime.ofNanoOfDay(Duration.of(value, unit).toNanos())); + } + case LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp -> { + var unit = getUnit(timestamp.getUnit()); + yield new IntegerConverter(context, value -> Instant.ofEpochMilli(Duration.of(value, unit).toMillis())); + } + default -> throw new UnsupportedOperationException( + "Unsupported logical type for " + primitiveType + ": " + context.type().getLogicalTypeAnnotation()); + }; + case INT96 -> new BinaryConverer(context, value -> { + var buf = value.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + LocalTime timeOfDay = LocalTime.ofNanoOfDay(buf.getLong()); + LocalDate day = LocalDate.ofEpochDay(buf.getInt() - 2440588L); + return LocalDateTime.of(day, timeOfDay).toInstant(ZoneOffset.UTC); + }); + case FIXED_LEN_BYTE_ARRAY, BINARY -> switch (context.type().getLogicalTypeAnnotation()) { + case LogicalTypeAnnotation.UUIDLogicalTypeAnnotation ignored -> new BinaryConverer(context, binary -> { + ByteBuffer byteBuffer = binary.toByteBuffer(); + long msb = byteBuffer.getLong(); + long lsb = byteBuffer.getLong(); + return new UUID(msb, lsb); + }); + case LogicalTypeAnnotation.IntervalLogicalTypeAnnotation ignored -> new BinaryConverer(context, binary -> { + ByteBuffer byteBuffer = binary.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + int months = byteBuffer.getInt(); + int days = byteBuffer.getInt(); + int millis = byteBuffer.getInt(); + return new Interval(Period.ofMonths(months).plusDays(days), Duration.ofMillis(millis)); + }); + case LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal -> { + int scale = -decimal.getScale(); + yield new BinaryConverer(context, + binary -> new BigDecimal(new BigInteger(binary.getBytes()), scale).doubleValue()); + } + case LogicalTypeAnnotation.StringLogicalTypeAnnotation ignored -> + new BinaryConverer(context, Binary::toStringUsingUTF8); + case LogicalTypeAnnotation.EnumLogicalTypeAnnotation ignored -> + new BinaryConverer(context, Binary::toStringUsingUTF8); + case LogicalTypeAnnotation.JsonLogicalTypeAnnotation ignores -> + new BinaryConverer(context, Binary::toStringUsingUTF8); + case null, default -> new 
ParquetPrimitiveConverter(context); + }; + }; + } + + private static ChronoUnit getUnit(LogicalTypeAnnotation.TimeUnit unit) { + return switch (unit) { + case MILLIS -> ChronoUnit.MILLIS; + case MICROS -> ChronoUnit.MICROS; + case NANOS -> ChronoUnit.NANOS; + }; + } + + void add(Object value) { + context.accept(value); + } + + @Override + public void addFloat(float value) { + add((double) value); + } + + @Override + public void addDouble(double value) { + add(value); + } + + @Override + public void addInt(int value) { + add(value); + } + + @Override + public void addLong(long value) { + add(value); + } + + @Override + public void addBoolean(boolean value) { + add(value); + } + + @Override + public void addBinary(Binary value) { + add(value.getBytes()); + } + + @Override + public void addValueFromDictionary(int idx) { + switch (primitiveType) { + case INT64 -> addLong(dictionary.decodeToLong(idx)); + case INT32 -> addInt(dictionary.decodeToInt(idx)); + case BOOLEAN -> addBoolean(dictionary.decodeToBoolean(idx)); + case FLOAT -> addFloat(dictionary.decodeToFloat(idx)); + case DOUBLE -> addDouble(dictionary.decodeToDouble(idx)); + case BINARY, FIXED_LEN_BYTE_ARRAY, INT96 -> addBinary(dictionary.decodeToBinary(idx)); + } + } + + @Override + public void setDictionary(Dictionary dictionary) { + this.dictionary = dictionary; + } + + @Override + public boolean hasDictionarySupport() { + return true; + } + + private static class BinaryConverer extends ParquetPrimitiveConverter { + + private final Function remapper; + + BinaryConverer(ParquetRecordConverter.Context context, Function remapper) { + super(context); + this.remapper = remapper; + } + + @Override + public void addBinary(Binary value) { + add(remapper.apply(value)); + } + } + + + private static class IntegerConverter extends ParquetPrimitiveConverter { + private final LongFunction remapper; + + IntegerConverter(ParquetRecordConverter.Context context, LongFunction remapper) { + super(context); + this.remapper = remapper; + } + + @Override + public void addLong(long value) { + add(remapper.apply(value)); + } + + @Override + public void addInt(int value) { + addLong(value); + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetReader.java new file mode 100644 index 0000000000..ff24ce025a --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetReader.java @@ -0,0 +1,217 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND; + +import com.onthegomap.planetiler.FeatureCollector; +import com.onthegomap.planetiler.Profile; +import com.onthegomap.planetiler.collection.FeatureGroup; +import com.onthegomap.planetiler.collection.SortableFeature; +import com.onthegomap.planetiler.config.PlanetilerConfig; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.render.FeatureRenderer; +import com.onthegomap.planetiler.stats.Counter; +import com.onthegomap.planetiler.stats.ProgressLoggers; +import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.util.Format; +import com.onthegomap.planetiler.worker.WorkerPipeline; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import 
java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads {@link SourceFeature SourceFeatures} from one or more + * geoparquet files. + *
+ * <p>
+ * If files don't contain geoparquet metadata then try to get geometry from "geometry" "wkb_geometry" or "wkt_geometry" + * fields. + */ +public class ParquetReader { + public static final String DEFAULT_LAYER = "features"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ParquetReader.class); + private final String sourceName; + private final Function, Object> idGenerator; + private final Function, Object> layerGenerator; + private final Profile profile; + private final Stats stats; + private final boolean hivePartitioning; + + public ParquetReader( + String sourceName, + Profile profile, + Stats stats + ) { + this(sourceName, profile, stats, null, null, false); + } + + public ParquetReader( + String sourceName, + Profile profile, + Stats stats, + Function, Object> getId, + Function, Object> getLayer, + boolean hivePartitioning + ) { + this.sourceName = sourceName; + this.layerGenerator = getLayer; + this.idGenerator = getId; + this.profile = profile; + this.stats = stats; + this.hivePartitioning = hivePartitioning; + } + + static Map getHivePartitionFields(Path path) { + Map fields = new HashMap<>(); + for (var part : path) { + var string = part.toString(); + if (string.contains("=")) { + var parts = string.split("="); + fields.put(parts[0], parts[1]); + } + } + return fields.isEmpty() ? null : fields; + } + + public void process(List sourcePath, FeatureGroup writer, PlanetilerConfig config) { + var timer = stats.startStage(sourceName); + var inputFiles = sourcePath.stream() + .filter(d -> !"_SUCCESS".equals(d.getFileName().toString())) + .map(path -> { + var hivePartitionFields = hivePartitioning ? getHivePartitionFields(path) : null; + String layer = getLayerName(path); + return new ParquetInputFile(sourceName, layer, path, null, config.bounds(), hivePartitionFields, idGenerator); + }) + .filter(file -> !file.isOutOfBounds()) + .toList(); + // don't show % complete on features when a filter is present because to determine total # elements would + // take an expensive initial query, and % complete on blocks gives a good enough proxy + long featureCount = inputFiles.stream().anyMatch(ParquetInputFile::hasFilter) ? 
0 : + inputFiles.stream().mapToLong(ParquetInputFile::getCount).sum(); + long blockCount = inputFiles.stream().mapToLong(ParquetInputFile::getBlockCount).sum(); + int processThreads = config.featureProcessThreads(); + int writeThreads = config.featureWriteThreads(); + var blocksRead = Counter.newMultiThreadCounter(); + var featuresRead = Counter.newMultiThreadCounter(); + var featuresWritten = Counter.newMultiThreadCounter(); + Map workingOn = new ConcurrentHashMap<>(); + var inputBlocks = inputFiles.stream().mapMulti((file, next) -> { + try (var blockReader = file.get()) { + for (var block : blockReader) { + next.accept(block); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).toList(); + + var pipeline = WorkerPipeline.start(sourceName, stats) + .readFromTiny("blocks", inputBlocks) + .addWorker("process", processThreads, (prev, next) -> { + var blocks = blocksRead.counterForThread(); + var elements = featuresRead.counterForThread(); + var featureCollectors = new FeatureCollector.Factory(config, stats); + try (FeatureRenderer renderer = newFeatureRenderer(writer, config, next)) { + for (var block : prev) { + String layer = block.layer(); + workingOn.merge(layer, 1, Integer::sum); + for (var sourceFeature : block) { + FeatureCollector features = featureCollectors.get(sourceFeature); + try { + profile.processFeature(sourceFeature, features); + for (FeatureCollector.Feature renderable : features) { + renderer.accept(renderable); + } + } catch (Exception e) { + LOGGER.error("Error processing {}", sourceFeature, e); + } + elements.inc(); + } + blocks.inc(); + workingOn.merge(layer, -1, Integer::sum); + } + } + }) + .addBuffer("write_queue", 50_000, 1_000) + .sinkTo("write", writeThreads, prev -> { + var features = featuresWritten.counterForThread(); + try (var threadLocalWriter = writer.writerForThread()) { + for (var item : prev) { + features.inc(); + threadLocalWriter.accept(item); + } + } + }); + + var loggers = ProgressLoggers.create() + .addRatePercentCounter("read", featureCount, featuresRead, true) + .addRatePercentCounter("blocks", blockCount, blocksRead, false) + .addRateCounter("write", featuresWritten) + .addFileSize(writer) + .newLine() + .add(() -> workingOn.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .filter(d -> d.getValue() > 0) + .map(d -> d.getKey() + ": " + d.getValue()) + .collect(Collectors.joining(", "))) + .newLine() + .addProcessStats() + .newLine() + .addPipelineStats(pipeline); + + pipeline.awaitAndLog(loggers, config.logInterval()); + + if (LOGGER.isInfoEnabled()) { + var format = Format.defaultInstance(); + long count = featuresRead.get(); + var elapsed = timer.elapsed(); + LOGGER.info("Processed {} parquet features ({}/s, {} blocks, {} files) in {}", + format.integer(count), + format.numeric(count * NANOSECONDS_PER_SECOND / elapsed.wall().toNanos()), + format.integer(blocksRead.get()), + format.integer(inputFiles.size()), + elapsed + ); + } + timer.stop(); + + // hook for profile to do any post-processing after this source is read + try ( + var threadLocalWriter = writer.writerForThread(); + var featureRenderer = newFeatureRenderer(writer, config, threadLocalWriter) + ) { + profile.finish(sourceName, new FeatureCollector.Factory(config, stats), featureRenderer); + } catch (IOException e) { + LOGGER.warn("Error closing writer", e); + } + } + + private String getLayerName(Path path) { + String layer = DEFAULT_LAYER; + if (hivePartitioning) { + var fields = getHivePartitionFields(path); + layer = 
layerGenerator.apply(fields == null ? Map.of() : fields) instanceof Object o ? o.toString() : layer; + } + return layer; + } + + private FeatureRenderer newFeatureRenderer(FeatureGroup writer, PlanetilerConfig config, + Consumer next) { + @SuppressWarnings("java:S2095") // closed by FeatureRenderer + var encoder = writer.newRenderedFeatureEncoder(); + return new FeatureRenderer( + config, + rendered -> next.accept(encoder.apply(rendered)), + stats, + encoder + ); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetRecordConverter.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetRecordConverter.java new file mode 100644 index 0000000000..2cdae2b7ff --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetRecordConverter.java @@ -0,0 +1,391 @@ +package com.onthegomap.planetiler.reader.parquet; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +/** + * Simple converter for parquet datatypes that maps all structs to {@code Map} and handles deserializing + * list and map nested + * types into java {@link List Lists} and {@link Map Maps}. + */ +public class ParquetRecordConverter extends RecordMaterializer> { + + private final StructConverter root; + private Map map; + + ParquetRecordConverter(MessageType schema) { + root = new StructConverter(new Context(schema)) { + @Override + public void start() { + var group = new MapGroup(schema.getFieldCount()); + context.current = group; + map = group.getMap(); + } + }; + } + + @Override + public Map getCurrentRecord() { + return map; + } + + @Override + public void skipCurrentRecord() { + root.context.current = null; + } + + @Override + public GroupConverter getRootConverter() { + return root; + } + + + interface Group { + // TODO handle repeated when processing schema, not elements + void add(Object key, Object value, boolean repeated); + + Object value(); + } + + private static class ListConverter extends StructConverter { + + ListConverter(Context context) { + super(context); + } + + @Override + protected Converter makeConverter(Context child) { + if ((child.named("list") || child.named("array")) && child.onlyField("element")) { + return new ListElementConverter(child.hoist()); + } + return super.makeConverter(child); + } + + @Override + public void start() { + context.current = new ListGroup(); + context.acceptCurrentValue(); + } + } + + private static class ListElementConverter extends StructConverter { + + ListElementConverter(Context context) { + super(context); + } + + @Override + public void start() { + context.current = new ItemGroup(); + } + + @Override + public void end() { + context.acceptCurrentValue(); + } + } + + private static class MapConverter extends StructConverter { + + MapConverter(Context context) { + super(context); + } + + @Override + protected Converter makeConverter(Context child) { + if (context.getFieldCount() == 1) { + Type type = child.type; + String onlyFieldName = type.getName().toLowerCase(Locale.ROOT); + if (!type.isPrimitive() && type.asGroupType().getFieldCount() == 2 && + (onlyFieldName.equals("key_value") || 
onlyFieldName.equals("map"))) { + return new MapEntryConverter(child.repeated(false)); + } + } + return super.makeConverter(child); + } + + @Override + public void start() { + context.current = new MapGroup(); + context.acceptCurrentValue(); + } + } + + private static class MapEntryConverter extends StructConverter { + MapEntryGroup entry; + + MapEntryConverter(Context context) { + super(context); + } + + @Override + public void start() { + context.current = entry = new MapEntryGroup(); + } + + @Override + public void end() { + if (entry.v != null && entry.k != null) { + context.accept(entry.k, entry.v); + } + } + } + + static class StructConverter extends GroupConverter { + + final Context context; + private final Converter[] converters; + + StructConverter(Context context) { + this.context = context; + int count = context.type.asGroupType().getFieldCount(); + converters = new Converter[count]; + for (int i = 0; i < count; i++) { + converters[i] = makeConverter(context.field(i)); + } + } + + protected Converter makeConverter(Context child) { + Type type = child.type; + LogicalTypeAnnotation logical = type.getLogicalTypeAnnotation(); + if (!type.isPrimitive()) { + return switch (logical) { + case LogicalTypeAnnotation.ListLogicalTypeAnnotation ignored -> + // If the repeated field is not a group, then its type is the element type and elements are required. + // If the repeated field is a group with multiple fields, then its type is the element type and elements are required. + // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required. + // Otherwise, the repeated field's type is the element type with the repeated field's repetition. + new ListConverter(child); + case LogicalTypeAnnotation.MapLogicalTypeAnnotation ignored -> + // The outer-most level must be a group annotated with MAP that contains a single field named key_value. The repetition of this level must be either optional or required and determines whether the list is nullable. + // The middle level, named key_value, must be a repeated group with a key field for map keys and, optionally, a value field for map values. + // The key field encodes the map's key type. This field must have repetition required and must always be present. + // The value field encodes the map's value type and repetition. This field can be required, optional, or omitted. 
+ new MapConverter(child); + case LogicalTypeAnnotation.MapKeyValueTypeAnnotation ignored -> + new MapConverter(child); + case null, default -> new StructConverter(child); + }; + } + return ParquetPrimitiveConverter.of(child); + } + + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + @Override + public void start() { + context.current = new MapGroup(context.getFieldCount()); + context.acceptCurrentValue(); + } + + @Override + public void end() { + // by default, don't need to do anything + } + } + + private static class MapGroup implements Group { + + private final Map<Object, Object> map; + + MapGroup() { + this(10); + } + + MapGroup(int size) { + map = HashMap.newHashMap(size * 2); + } + + @Override + public void add(Object key, Object value, boolean repeated) { + if (repeated) { + List<Object> items = (List<Object>) map.computeIfAbsent(key, n -> new ArrayList<>()); + items.add(value); + } else { + if (map.put(key, value) != null) { + throw new IllegalStateException("Multiple values for " + key); + } + } + } + + @Override + public String toString() { + return "MapGroup" + map; + } + + public Map<String, Object> getMap() { + return (Map<String, Object>) (Map<?, ?>) map; + } + + @Override + public Object value() { + return map; + } + } + + private static class ListGroup implements Group { + + private final List<Object> list = new ArrayList<>(); + + @Override + public void add(Object key, Object value, boolean repeated) { + list.add(value); + } + + @Override + public String toString() { + return "ListGroup" + list; + } + + @Override + public Object value() { + return list; + } + } + + private static class ItemGroup implements Group { + + private Object item; + + @Override + public void add(Object key, Object value, boolean repeated) { + if (repeated) { + if (item == null) { + item = new ArrayList<>(); + } + ((List<Object>) item).add(value); + } else { + item = value; + } + } + + @Override + public String toString() { + return "ItemGroup{" + item + '}'; + } + + @Override + public Object value() { + return item; + } + } + + private static class MapEntryGroup implements Group { + + private Object k; + private Object v; + + @Override + public void add(Object key, Object value, boolean repeated) { + if ("key".equals(key)) { + k = value; + } else if ("value".equals(key)) { + v = value; + } else if (k == null) { + k = value; + } else { + v = value; + } + } + + @Override + public String toString() { + return "MapEntryGroup{" + k + '=' + v + '}'; + } + + @Override + public Object value() { + throw new UnsupportedOperationException(); + } + } + + /** Constructs java objects from parquet records at read-time. */ + static final class Context { + + final Context parent; + final String fieldOnParent; + final Type type; + final boolean repeated; + private final int fieldCount; + Group current; + + Context(Context parent, String fieldOnParent, Type type, boolean repeated) { + this.parent = parent; + this.fieldOnParent = fieldOnParent; + this.type = type; + this.repeated = repeated; + this.fieldCount = type.isPrimitive() ? 0 : type.asGroupType().getFieldCount(); + } + + public Context(Context newParent, Type type) { + this(newParent, type.getName(), type, type.isRepetition(Type.Repetition.REPEATED)); + } + + public Context(MessageType schema) { + this(null, schema); + } + + public Context field(int i) { + return new Context(this, type.asGroupType().getType(i)); + } + + /** Returns a new context that flattens-out this level of the hierarchy and writes values into the parent field.
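+ * (Used, for example, to skip the synthetic "list"/"element" wrapper groups of a LIST annotation so element values land directly in the field that owns the list.)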
*/ + public Context hoist() { + return new Context(parent, parent.fieldOnParent, type, repeated); + } + + public void acceptCurrentValue() { + accept(current.value()); + } + + public void accept(Object value) { + parent.current.add(fieldOnParent, value, repeated); + } + + public int getFieldCount() { + return fieldCount; + } + + public void accept(Object k, Object v) { + parent.current.add(k, v, repeated); + } + + public Context repeated(boolean newRepeated) { + return new Context(parent, fieldOnParent, type, newRepeated); + } + + public boolean named(String name) { + return type.getName().equalsIgnoreCase(name); + } + + boolean onlyField(String name) { + return !type.isPrimitive() && fieldCount == 1 && + type.asGroupType().getFieldName(0).equalsIgnoreCase(name); + } + + public Type type() { + return type; + } + + @Override + public String toString() { + return "Context[" + + "parent=" + parent + ", " + + "fieldOnParent=" + fieldOnParent + ", " + + "type=" + type + ", " + + "repeated=" + repeated + ']'; + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Stats.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Stats.java index c18ac1bbc2..4570c0a832 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Stats.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Stats.java @@ -94,10 +94,15 @@ default Timers.Finishable startStage(String name, boolean log) { LogUtil.setStage(name); } var timer = timers().startTimer(name, log); - return () -> { - timer.stop(); - if (log) { - LogUtil.clearStage(); + return new Timers.Finishable() { + @Override + public void stop() { + timer.stop(); + } + + @Override + public ProcessTime elapsed() { + return timer.elapsed(); } }; } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Timers.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Timers.java index 3ce21270b7..8e22e727c8 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Timers.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/stats/Timers.java @@ -103,12 +103,20 @@ public Finishable startTimer(String name, boolean logStart) { LOGGER.info(""); LOGGER.info("Starting..."); } - return () -> { - LOGGER.info("Finished in {}", timers.get(name).timer.stop()); - for (var details : getStageDetails(name, true)) { - LOGGER.info(" {}", details); + return new Finishable() { + @Override + public void stop() { + LOGGER.info("Finished in {}", timers.get(name).timer.stop()); + for (var details : getStageDetails(name, true)) { + LOGGER.info(" {}", details); + } + currentStage.set(last); + } + + @Override + public ProcessTime elapsed() { + return timer.elapsed(); } - currentStage.set(last); }; } @@ -129,6 +137,8 @@ public Map all() { /** A handle that callers can use to indicate a task has finished. 
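Also exposes {@link #elapsed()} so callers can check how long an in-progress stage has been running before calling {@code stop()}.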
*/ public interface Finishable { void stop(); + + ProcessTime elapsed(); } record ThreadInfo(ProcessInfo.ThreadState state, String prefix, Duration elapsed) {} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java index 1cbbe13875..431cc3ddea 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java @@ -12,6 +12,7 @@ import java.nio.file.FileStore; import java.nio.file.FileSystem; import java.nio.file.FileSystems; +import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -49,7 +50,7 @@ public static Stream<Path> walkFileSystem(FileSystem fileSystem) { return StreamSupport.stream(fileSystem.getRootDirectories().spliterator(), false) .flatMap(rootDirectory -> { try { - return Files.walk(rootDirectory); + return Files.walk(rootDirectory, FileVisitOption.FOLLOW_LINKS); } catch (IOException e) { LOGGER.error("Unable to walk " + rootDirectory + " in " + fileSystem, e); return Stream.empty(); @@ -82,9 +83,9 @@ public static List<Path> walkPathWithPattern(Path basePath, String pattern, .toList(); } } else if (Files.isDirectory(basePath)) { - try (var walk = Files.walk(basePath)) { + try (var walk = Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)) { return walk - .filter(path -> matcher.matches(path.getFileName())) + .filter(path -> matcher.matches(path.getFileName()) || matcher.matches(basePath.relativize(path))) .flatMap(path -> { if (FileUtils.hasExtension(path, "zip")) { return walkZipFile.apply(path).stream(); @@ -109,9 +110,10 @@ public static List<Path> walkPathWithPattern(Path basePath, String pattern, * @param pattern pattern to match filenames against, as described in {@link FileSystem#getPathMatcher(String)}. */ public static List<Path> walkPathWithPattern(Path basePath, String pattern) { - return walkPathWithPattern(basePath, pattern, zipPath -> List.of(zipPath)); + return walkPathWithPattern(basePath, pattern, List::of); } + /** Returns true if {@code path} ends with ".extension" (case-insensitive). */ public static boolean hasExtension(Path path, String extension) { return path.toString().toLowerCase().endsWith("." + extension.toLowerCase()); diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Glob.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Glob.java new file mode 100644 index 0000000000..f17d65eca0 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/Glob.java @@ -0,0 +1,55 @@ +package com.onthegomap.planetiler.util; + +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + + +/** + * Utility for constructing base+glob paths for matching many files. + */ +public record Glob(Path base, String pattern) { + + private static final Pattern GLOB_PATTERN = Pattern.compile("[?*{\\[].*$"); + + /** Wrap a base path with no globs in it yet. */ + public static Glob of(Path path) { + return new Glob(path, null); + } + + /** Resolves a subdirectory using parts separated by the platform file separator. */ + public Glob resolve(String...
subPath) { + String separator = "/"; + if (pattern != null) { + return new Glob(base, pattern + separator + String.join(separator, subPath)); + } else if (subPath == null || subPath.length == 0) { + return this; + } else if (GLOB_PATTERN.matcher(subPath[0]).find()) { + return new Glob(base, String.join(separator, subPath)); + } else { + return of(base.resolve(subPath[0])).resolve(Arrays.copyOfRange(subPath, 1, subPath.length)); + } + } + + /** Parse a string containing platform-specific file separators into a base+glob pattern. */ + public static Glob parse(String path) { + var matcher = GLOB_PATTERN.matcher(path); + if (!matcher.find()) { + return of(Path.of(path)); + } + matcher.reset(); + String base = matcher.replaceAll(""); + String separator = Path.of(base).getFileSystem().getSeparator(); + int idx = base.lastIndexOf(separator); + if (idx > 0) { + base = base.substring(0, idx); + } + return of(Path.of(base)).resolve(path.substring(idx + 1).split(Pattern.quote(separator))); + } + + /** Search the filesystem for all files beneath {@link #base()} matching {@link #pattern()}. */ + public List<Path> find() { + return pattern == null ? List.of(base) : FileUtils.walkPathWithPattern(base, pattern); + } +} diff --git a/planetiler-core/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java new file mode 100644 index 0000000000..48d29d1331 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** Fix interface from parquet floor so we can extend it with {@link GzipCodec} and {@link Lz4Codec} */ +public interface CompressionCodec { + Decompressor createDecompressor(); + + Compressor createCompressor(); + + CompressionInputStream createInputStream(InputStream is, Decompressor d) throws IOException; + + CompressionOutputStream createOutputStream(OutputStream os, Compressor c) throws IOException; +} diff --git a/planetiler-core/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java new file mode 100644 index 0000000000..287ded26b9 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -0,0 +1,10 @@ +package org.apache.hadoop.io.compress; + +import io.airlift.compress.gzip.JdkGzipCodec; + +/** + * Make {@link JdkGzipCodec} available at the location expected by + * {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} to allow deserializing parquet files that use gzip + * compression. + */ +public class GzipCodec extends JdkGzipCodec {} diff --git a/planetiler-core/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java new file mode 100644 index 0000000000..cab23a1515 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.io.compress; + +/** + * Make {@link io.airlift.compress.lz4.Lz4Codec} available at the location expected by + * {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} to allow deserializing parquet files that use lz4 + * compression.
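+ * (Parquet resolves codec classes by the fully-qualified names hard-coded in {@code CompressionCodecName}, so these shims satisfy that lookup with airlift's pure-java implementations instead of the Hadoop natives.)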
+ */ +@SuppressWarnings("java:S2176") +public class Lz4Codec extends io.airlift.compress.lz4.Lz4Codec {} diff --git a/planetiler-core/src/main/java/org/apache/parquet/filter2/predicate/Filters.java b/planetiler-core/src/main/java/org/apache/parquet/filter2/predicate/Filters.java new file mode 100644 index 0000000000..b268257327 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/parquet/filter2/predicate/Filters.java @@ -0,0 +1,20 @@ +package org.apache.parquet.filter2.predicate; + +import java.util.List; +import org.apache.parquet.hadoop.metadata.ColumnPath; + +/** + * Create {@link Operators.DoubleColumn} and {@link Operators.FloatColumn} instances with dots in the column names since + * their constructors are package-private. + */ +public class Filters { + private Filters() {} + + public static Operators.DoubleColumn doubleColumn(List<String> parts) { + return new Operators.DoubleColumn(ColumnPath.get(parts.toArray(String[]::new))); + } + + public static Operators.FloatColumn floatColumn(List<String> parts) { + return new Operators.FloatColumn(ColumnPath.get(parts.toArray(String[]::new))); + } +} diff --git a/planetiler-core/src/main/resources/log4j2.properties b/planetiler-core/src/main/resources/log4j2.properties index 00e993bb1a..7d23fc3a9a 100644 --- a/planetiler-core/src/main/resources/log4j2.properties +++ b/planetiler-core/src/main/resources/log4j2.properties @@ -7,3 +7,10 @@ packages=com.onthegomap.planetiler.util.log4j rootLogger.level=debug rootLogger.appenderRefs=stdout rootLogger.appenderRef.stdout.ref=STDOUT + +logger.apache.name=org.apache +logger.apache.level=warn + +# suppress warning about unreadable duckdb statistics +logger.apachecorrupt.name=org.apache.parquet.CorruptStatistics +logger.apachecorrupt.level=error diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java index ffe58fffcd..d8d71292f3 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java @@ -2248,6 +2248,49 @@ public void processFeature(SourceFeature source, FeatureCollector features) { } } + @ParameterizedTest + @ValueSource(strings = { + "", + "--write-threads=2 --process-threads=2 --feature-read-threads=2 --threads=4" + }) + void testPlanetilerRunnerParquet(String args) throws Exception { + Path mbtiles = tempDir.resolve("output.mbtiles"); + + Planetiler.create(Arguments.fromArgs((args + " --tmpdir=" + tempDir.resolve("data")).split("\\s+"))) + .setProfile(new Profile.NullProfile() { + @Override + public void processFeature(SourceFeature source, FeatureCollector features) { + features.polygon("buildings") + .setZoomRange(0, 14) + .setMinPixelSize(0) + .setAttr("id", source.getString("id")); + } + }) + .addParquetSource("parquet", List.of(TestUtils.pathToResource("parquet").resolve("boston.parquet"))) + .setOutput(mbtiles) + .run(); + + try (Mbtiles db = Mbtiles.newReadOnlyDatabase(mbtiles)) { + Set<String> uniqueIds = new HashSet<>(); + long featureCount = 0; + var tileMap = TestUtils.getTileMap(db); + for (int z = 14; z >= 11; z--) { + var coord = TileCoord.aroundLngLat(-71.07448, 42.35626, z); + assertTrue(tileMap.containsKey(coord), "contain " + coord); + } + for (var tile : tileMap.values()) { + for (var feature : tile) { + feature.geometry().validate(); + featureCount++; + uniqueIds.add((String) feature.attrs().get("id")); + } + } + + assertTrue(featureCount > 0); + assertEquals(3,
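+ // the boston.parquet fixture is expected to contain exactly 3 distinct building ids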
uniqueIds.size()); + } + } + private void runWithProfile(Path tempDir, Profile profile, boolean force) throws Exception { Planetiler.create(Arguments.of("tmpdir", tempDir, "force", Boolean.toString(force))) .setProfile(profile) diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoArrowTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoArrowTest.java new file mode 100644 index 0000000000..ca7e1e8b85 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoArrowTest.java @@ -0,0 +1,231 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.onthegomap.planetiler.geo.GeoUtils; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; + +class GeoArrowTest { + @Test + void testPointXY() throws ParseException { + assertSame( + "POINT(1 2)", + GeoArrow.point(Map.of("x", 1, "y", 2)), + GeoArrow.point(List.of(1, 2)) + ); + } + + @Test + void testPointXYZ() throws ParseException { + assertSame( + "POINT Z(1 2 3)", + GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3)), + GeoArrow.point(List.of(1, 2, 3)) + ); + } + + @Test + void testPointXYZM() throws ParseException { + assertSame( + "POINT ZM(1 2 3 4)", + GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3, "m", 4)), + GeoArrow.point(List.of(1, 2, 3, 4)) + ); + } + + @Test + void testLine() throws ParseException { + assertSame( + "LINESTRING(1 2, 3 4)", + GeoArrow.linestring(List.of( + Map.of("x", 1, "y", 2), + Map.of("x", 3, "y", 4) + )), + GeoArrow.linestring(List.of( + List.of(1, 2), + List.of(3, 4) + )) + ); + } + + @Test + void testLineZ() throws ParseException { + assertSame( + "LINESTRING Z(1 2 3, 4 5 6)", + GeoArrow.linestring(List.of( + Map.of("x", 1, "y", 2, "z", 3), + Map.of("x", 4, "y", 5, "z", 6) + )), + GeoArrow.linestring(List.of( + List.of(1, 2, 3), + List.of(4, 5, 6) + )) + ); + } + + @Test + void testLineZM() throws ParseException { + assertSame( + "LINESTRING ZM(1 2 3 4, 5 6 7 8)", + GeoArrow.linestring(List.of( + Map.of("x", 1, "y", 2, "z", 3, "m", 4), + Map.of("x", 5, "y", 6, "z", 7, "m", 8) + )), + GeoArrow.linestring(List.of( + List.of(1, 2, 3, 4), + List.of(5, 6, 7, 8) + )) + ); + } + + @Test + void testPolygon() throws ParseException { + assertSame( + "POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))", + GeoArrow.polygon(List.of( + List.of( + Map.of("x", 0, "y", 0), + Map.of("x", 0, "y", 1), + Map.of("x", 1, "y", 1), + Map.of("x", 1, "y", 0), + Map.of("x", 0, "y", 0) + ))), + GeoArrow.polygon(List.of( + List.of( + List.of(0, 0), + List.of(0, 1), + List.of(1, 1), + List.of(1, 0), + List.of(0, 0) + ) + )) + ); + } + + @Test + void testPolygonWithHole() throws ParseException { + assertSame( + "POLYGON((-2 -2, 2 -2, 0 2, -2 -2), (-1 -1, 1 -1, 0 1, -1 -1))", + GeoArrow.polygon(List.of( + List.of( + Map.of("x", -2, "y", -2), + Map.of("x", 2, "y", -2), + Map.of("x", 0, "y", 2), + Map.of("x", -2, "y", -2) + ), + List.of( + Map.of("x", -1, "y", -1), + Map.of("x", 1, "y", -1), + Map.of("x", 0, "y", 1), + Map.of("x", -1, "y", -1) + ) + )), + GeoArrow.polygon(List.of( + List.of( + List.of(-2, -2), + List.of(2, -2), + List.of(0, 2), + List.of(-2, -2) + ), + List.of( + List.of(-1, -1), + List.of(1, -1), + List.of(0, 1), + List.of(-1, -1) + ) + )) + ); + } + + @Test + void testMultipoint() throws ParseException { + assertSame( + "MULTIPOINT(1 2, 3 4)", + 
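+ // the struct {x, y} form and the bare ordinate-list form should decode to the same multipoint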
GeoArrow.multipoint(List.of( + Map.of("x", 1, "y", 2), + Map.of("x", 3, "y", 4) + )), + GeoArrow.multipoint(List.of( + List.of(1, 2), + List.of(3, 4) + )) + ); + } + + @Test + void testMultilinestring() throws ParseException { + assertSame( + "MULTILINESTRING((1 2, 3 4), (5 6, 7 8))", + GeoArrow.multilinestring(List.of( + List.of( + Map.of("x", 1, "y", 2), + Map.of("x", 3, "y", 4) + ), + List.of( + Map.of("x", 5, "y", 6), + Map.of("x", 7, "y", 8) + ) + )), + GeoArrow.multilinestring(List.of( + List.of( + List.of(1, 2), + List.of(3, 4) + ), + List.of( + List.of(5, 6), + List.of(7, 8) + ) + )) + ); + } + + @Test + void testMultipolygon() throws ParseException { + assertSame( + "MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ((2 0, 3 0, 3 1, 2 1, 2 0)))", + GeoArrow.multipolygon(List.of( + List.of(List.of( + Map.of("x", 0, "y", 0), + Map.of("x", 1, "y", 0), + Map.of("x", 1, "y", 1), + Map.of("x", 0, "y", 1), + Map.of("x", 0, "y", 0) + )), + List.of(List.of( + Map.of("x", 2, "y", 0), + Map.of("x", 3, "y", 0), + Map.of("x", 3, "y", 1), + Map.of("x", 2, "y", 1), + Map.of("x", 2, "y", 0) + )) + )), + GeoArrow.multipolygon(List.of( + List.of(List.of( + List.of(0, 0), + List.of(1, 0), + List.of(1, 1), + List.of(0, 1), + List.of(0, 0) + )), + List.of(List.of( + List.of(2, 0), + List.of(3, 0), + List.of(3, 1), + List.of(2, 1), + List.of(2, 0) + )) + )) + ); + } + + private static void assertSame(String wkt, Geometry... geometry) throws ParseException { + Geometry expected = GeoUtils.wktReader().read(wkt); + for (int i = 0; i < geometry.length; i++) { + assertEquals(expected, geometry[i], "geometry #" + i); + } + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadataTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadataTest.java new file mode 100644 index 0000000000..ddcf0daa69 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadataTest.java @@ -0,0 +1,447 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static com.onthegomap.planetiler.geo.GeoUtils.createMultiPoint; +import static com.onthegomap.planetiler.geo.GeoUtils.point; +import static org.apache.parquet.filter2.predicate.FilterApi.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.reader.WithTags; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.parquet.filter2.predicate.Filters; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestFactory; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.locationtech.jts.geom.Envelope; +import org.locationtech.jts.io.WKBWriter; +import org.locationtech.jts.io.WKTWriter; + +class GeoParquetMetadataTest { + // https://github.com/opengeospatial/geoparquet/blob/main/examples/example_metadata.json + private static final String EXAMPLE_METADATA = """ + { + "columns": { + "geometry": { + "bbox": [ 
+ -180.0, + -90.0, + 180.0, + 83.6451 + ], + "covering": { + "bbox": { + "xmax": [ + "bbox", + "xmax" + ], + "xmin": [ + "bbox", + "xmin" + ], + "ymax": [ + "bbox", + "ymax" + ], + "ymin": [ + "bbox", + "ymin" + ] + } + }, + "crs": { + "$schema": "https://proj.org/schemas/v0.6/projjson.schema.json", + "area": "World.", + "bbox": { + "east_longitude": 180, + "north_latitude": 90, + "south_latitude": -90, + "west_longitude": -180 + }, + "coordinate_system": { + "axis": [ + { + "abbreviation": "Lon", + "direction": "east", + "name": "Geodetic longitude", + "unit": "degree" + }, + { + "abbreviation": "Lat", + "direction": "north", + "name": "Geodetic latitude", + "unit": "degree" + } + ], + "subtype": "ellipsoidal" + }, + "datum_ensemble": { + "accuracy": "2.0", + "ellipsoid": { + "inverse_flattening": 298.257223563, + "name": "WGS 84", + "semi_major_axis": 6378137 + }, + "id": { + "authority": "EPSG", + "code": 6326 + }, + "members": [ + { + "id": { + "authority": "EPSG", + "code": 1166 + }, + "name": "World Geodetic System 1984 (Transit)" + }, + { + "id": { + "authority": "EPSG", + "code": 1152 + }, + "name": "World Geodetic System 1984 (G730)" + }, + { + "id": { + "authority": "EPSG", + "code": 1153 + }, + "name": "World Geodetic System 1984 (G873)" + }, + { + "id": { + "authority": "EPSG", + "code": 1154 + }, + "name": "World Geodetic System 1984 (G1150)" + }, + { + "id": { + "authority": "EPSG", + "code": 1155 + }, + "name": "World Geodetic System 1984 (G1674)" + }, + { + "id": { + "authority": "EPSG", + "code": 1156 + }, + "name": "World Geodetic System 1984 (G1762)" + }, + { + "id": { + "authority": "EPSG", + "code": 1309 + }, + "name": "World Geodetic System 1984 (G2139)" + } + ], + "name": "World Geodetic System 1984 ensemble" + }, + "id": { + "authority": "OGC", + "code": "CRS84" + }, + "name": "WGS 84 (CRS84)", + "scope": "Not known.", + "type": "GeographicCRS" + }, + "edges": "planar", + "encoding": "WKB", + "geometry_types": [ + "Polygon", + "MultiPolygon" + ] + } + }, + "primary_column": "geometry", + "version": "1.1.0-dev" + } + """; + + @Test + void testParseBasicMetadata() throws IOException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("geometry") + .named("root"), + Map.of("geo", EXAMPLE_METADATA), + "")); + assertEquals("geometry", parsed.primaryColumn()); + assertEquals("1.1.0-dev", parsed.version()); + assertEquals("planar", parsed.primaryColumnMetadata().edges()); + assertEquals("WKB", parsed.primaryColumnMetadata().encoding()); + assertEquals(new Envelope(-180.0, 180.0, -90.0, 83.6451), parsed.primaryColumnMetadata().envelope()); + assertEquals(new GeoParquetMetadata.CoveringBbox( + List.of("bbox", "xmin"), + List.of("bbox", "ymin"), + List.of("bbox", "xmax"), + List.of("bbox", "ymax") + ), parsed.primaryColumnMetadata().covering().bbox()); + assertEquals(List.of("Polygon", "MultiPolygon"), parsed.primaryColumnMetadata().geometryTypes()); + assertTrue(parsed.primaryColumnMetadata().crs() instanceof Map); + } + + @Test + void testFailsWhenNoGeometry() { + var fileMetadata = new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("not_geometry") + .named("root"), + Map.of(), + ""); + assertThrows(IOException.class, () -> GeoParquetMetadata.parse(fileMetadata)); + } + + @Test + void testFailsWhenBadGeometryType() { + var fileMetadata = new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.INT32) + 
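+ // an INT32 column can never hold an encoded geometry, so metadata parsing should reject it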
.named("geometry") + .named("root"), + Map.of(), + ""); + assertThrows(IOException.class, () -> GeoParquetMetadata.parse(fileMetadata)); + } + + @Test + void testInfersDefaultGeometry() throws IOException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("geometry") + .named("root"), + Map.of(), + "")); + assertEquals("geometry", parsed.primaryColumn()); + assertEquals("WKB", parsed.primaryColumnMetadata().encoding()); + assertEquals(Bounds.WORLD.latLon(), parsed.primaryColumnMetadata().envelope()); + assertNull(parsed.primaryColumnMetadata().covering()); + } + + @Test + void testGeometryReaderFromMetadata() throws IOException, GeometryException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("geometry") + .named("root"), + Map.of("geo", EXAMPLE_METADATA), + "")); + assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of( + "geometry", new WKBWriter().write(point(1, 2)) + )))); + } + + @Test + void testGeometryReaderFromMetadataDifferentName() throws IOException, GeometryException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("other") + .named("root"), + Map.of("geo", """ + { + "primary_column": "other", + "columns": { + "other": { + "encoding": "WKB" + } + } + } + """), + "")); + assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of( + "other", new WKBWriter().write(point(1, 2)) + )))); + } + + @ParameterizedTest + @ValueSource(strings = {"wkb_geometry", "geometry"}) + void testReadWKBGeometryNoMetadata(String name) throws IOException, GeometryException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named(name) + .named("root"), + Map.of(), + "")); + assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of( + name, new WKBWriter().write(point(1, 2)) + )))); + } + + @Test + void testReadWKTGeometryNoMetadata() throws IOException, GeometryException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("wkt_geometry") + .named("root"), + Map.of(), + "")); + assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of( + "wkt_geometry", new WKTWriter().write(point(1, 2)) + )))); + } + + @Test + void testReadGeoArrowPoint() throws IOException, GeometryException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage().named("root"), + Map.of("geo", """ + { + "primary_column": "geoarrow", + "columns": { + "geoarrow": { + "encoding": "point" + } + } + } + """), + "")); + assertEquals(point(1, 2), new GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of( + "geoarrow", Map.of("x", 1, "y", 2) + )))); + } + + @Test + void testReadGeoArrowMultiPoint() throws IOException, GeometryException { + var parsed = GeoParquetMetadata.parse(new FileMetaData( + Types.buildMessage().named("root"), + Map.of("geo", """ + { + "primary_column": "geoarrow", + "columns": { + "geoarrow": { + "encoding": "multipoint" + } + } + } + """), + "")); + assertEquals(createMultiPoint(List.of(point(1, 2))), + new
GeometryReader(parsed).readPrimaryGeometry(WithTags.from(Map.of( + "geoarrow", List.of(Map.of("x", 1, "y", 2)) + )))); + } + + @ParameterizedTest + @CsvSource({ + "bbox, true, DOUBLE", + "bbox, true, FLOAT", + "custom_bbox, true, DOUBLE", + "custom_bbox, true, FLOAT", + "bbox, false, DOUBLE", + "bbox, false, FLOAT", + }) + void testBboxFilterFromMetadata(String bbox, boolean hasMetadata, PrimitiveType.PrimitiveTypeName type) + throws IOException { + var schema = Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("geometry") + .requiredGroup() + .required(type).named("xmin") + .required(type).named("xmax") + .required(type).named("ymin") + .required(type).named("ymax") + .named(bbox) + .named("root"); + var parsed = GeoParquetMetadata.parse(new FileMetaData( + schema, + hasMetadata ? Map.of("geo", EXAMPLE_METADATA.replaceAll("\"bbox\",", "\"" + bbox + "\",")) : Map.of(), + "")); + var expected = type == PrimitiveType.PrimitiveTypeName.FLOAT ? + and( + and(gtEq(floatColumn(bbox + ".xmax"), 1f), ltEq(floatColumn(bbox + ".xmin"), 2f)), + and(gtEq(floatColumn(bbox + ".ymax"), 3f), ltEq(floatColumn(bbox + ".ymin"), 4f)) + ) : + and( + and(gtEq(doubleColumn(bbox + ".xmax"), 1.0), ltEq(doubleColumn(bbox + ".xmin"), 2.0)), + and(gtEq(doubleColumn(bbox + ".ymax"), 3.0), ltEq(doubleColumn(bbox + ".ymin"), 4.0)) + ); + assertEquals(expected, parsed.primaryColumnMetadata().bboxFilter(schema, new Bounds(new Envelope(1, 2, 3, 4)))); + } + + @ParameterizedTest + @CsvSource({ + "bbox, true, DOUBLE", + "bbox, true, FLOAT", + "custom_bbox, true, DOUBLE", + "custom_bbox, true, FLOAT", + "bbox, false, DOUBLE", + "bbox, false, FLOAT", + }) + void testBboxFilterFromMetadataOldGdalStyle(String bbox, boolean hasMetadata, PrimitiveType.PrimitiveTypeName type) + throws IOException { + var schema = Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("geometry") + .required(type).named(bbox + ".xmin") + .required(type).named(bbox + ".xmax") + .required(type).named(bbox + ".ymin") + .required(type).named(bbox + ".ymax") + .named("root"); + var parsed = GeoParquetMetadata.parse(new FileMetaData( + schema, + hasMetadata ? Map.of("geo", """ + { + "primary_column": "geometry", + "columns": { + "geometry": { + "covering": { + "bbox": { + "xmin": ["bbox.xmin"], + "xmax": ["bbox.xmax"], + "ymin": ["bbox.ymin"], + "ymax": ["bbox.ymax"] + } + } + } + } + } + """.replace("bbox.", bbox + ".")) : Map.of(), + "")); + var expected = type == PrimitiveType.PrimitiveTypeName.FLOAT ? 
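+ // a row can only intersect the query envelope (x: 1..2, y: 3..4) when xmax >= 1 && xmin <= 2 && ymax >= 3 && ymin <= 4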
+ and( + and(gtEq(Filters.floatColumn(List.of(bbox + ".xmax")), 1f), + ltEq(Filters.floatColumn(List.of(bbox + ".xmin")), 2f)), + and(gtEq(Filters.floatColumn(List.of(bbox + ".ymax")), 3f), + ltEq(Filters.floatColumn(List.of(bbox + ".ymin")), 4f)) + ) : + and( + and(gtEq(Filters.doubleColumn(List.of(bbox + ".xmax")), 1.0), + ltEq(Filters.doubleColumn(List.of(bbox + ".xmin")), 2.0)), + and(gtEq(Filters.doubleColumn(List.of(bbox + ".ymax")), 3.0), + ltEq(Filters.doubleColumn(List.of(bbox + ".ymin")), 4.0)) + ); + assertEquals(expected, parsed.primaryColumnMetadata().bboxFilter(schema, new Bounds(new Envelope(1, 2, 3, 4)))); + } + + @Test + void testNoBboxFilterFromDefault() throws IOException { + var schema = Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named("geometry") + .named("root"); + var parsed = GeoParquetMetadata.parse(new FileMetaData( + schema, + Map.of(), + "")); + assertNull(parsed.primaryColumnMetadata().bboxFilter(schema, new Bounds(new Envelope(1, 2, 3, 4)))); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetConverterTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetConverterTest.java new file mode 100644 index 0000000000..25a6840031 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetConverterTest.java @@ -0,0 +1,536 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.Period; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class ParquetConverterTest { + @Test + void testIntPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + converter -> converter.addInt(1), + 1 + ); + } + + @ParameterizedTest + @CsvSource({ + "32, true, 100, 100", + "32, true, 2147483647, 2147483647", + "32, true, -2147483648, -2147483648", + "32, false, 100, 100", + "16, true, 100, 100", + "8, true, 256, 256", + }) + void testIntPrimitiveWithAnnotation(int bitWidth, boolean isSigned, int input, int expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.intType(bitWidth, isSigned), + converter -> converter.addInt(input), + expected + ); + } + + @Test + void testLongPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + converter -> converter.addLong(1), + 1L + ); + } + + @ParameterizedTest + @CsvSource({ + "64, true, 9223372036854775807, 9223372036854775807", + "64, false, 9223372036854775807, 9223372036854775807", + "64, true, -9223372036854775808, -9223372036854775808", + "64, true, 1, 1", + }) + void testLongPrimitiveWithAnnotation(int bitWidth, boolean isSigned, long input, long expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + 
LogicalTypeAnnotation.intType(bitWidth, isSigned), + converter -> converter.addLong(input), + expected + ); + } + + @ParameterizedTest + @CsvSource({ + "0, 1, 10, 10", + "1, 9, 10, 1", + "2, 9, 10, 0.1", + }) + void testIntDecimal(int scale, int precision, int value, double expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.decimalType(scale, precision), + converter -> converter.addInt(value), + expected + ); + } + + @ParameterizedTest + @CsvSource({ + "0, 1, 10, 10", + "1, 18, 10, 1", + "2, 18, 10, 0.1", + }) + void testLongDecimal(int scale, int precision, long value, double expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.decimalType(scale, precision), + converter -> converter.addLong(value), + expected + ); + } + + @Test + void testBooleanPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.BOOLEAN, + converter -> converter.addBoolean(true), + true + ); + } + + @Test + void testFloatPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.FLOAT, + converter -> converter.addFloat(1f), + 1.0 + ); + } + + @Test + void testDoublePrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.DOUBLE, + converter -> converter.addDouble(1.5), + 1.5 + ); + } + + @Test + void testInt96Timestamp() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.INT96, + converter -> converter.addBinary(Binary.fromConstantByteArray(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})), + Instant.parse("-4713-11-24T00:00:00Z") + ); + } + + @Test + void testDate() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.dateType(), + converter -> converter.addInt(2), + LocalDate.of(1970, 1, 3) + ); + } + + @Test + void testTime() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS), + converter -> converter.addInt(61_000), + LocalTime.of(0, 1, 1) + ); + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS), + converter -> converter.addLong(61_000_000), + LocalTime.of(0, 1, 1) + ); + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS), + converter -> converter.addLong(61_000_000_000L), + LocalTime.of(0, 1, 1) + ); + } + + @ParameterizedTest + @CsvSource({ + "true, MILLIS, 61000, 1970-01-01T00:01:01Z", + "true, MICROS, 61000000, 1970-01-01T00:01:01Z", + "true, NANOS, 61000000000, 1970-01-01T00:01:01Z", + }) + void testTimestamp(boolean utc, LogicalTypeAnnotation.TimeUnit unit, long input, String output) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(utc, unit), + converter -> converter.addLong(input), + Instant.parse(output) + ); + } + + @Test + void testString() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.stringType(), + converter -> converter.addBinary(Binary.fromString("abcdef")), + "abcdef" + ); + } + + @Test + void testEnum() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.enumType(), + converter -> converter.addBinary(Binary.fromString("value")), + "value" + ); + } + + @Test + void testJson() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.jsonType(), + converter -> 
converter.addBinary(Binary.fromString("[1,2,3]")), + "[1,2,3]" + ); + } + + @Test + void testUUID() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.uuidType(), + 16, + converter -> converter + .addBinary(Binary.fromConstantByteArray( + new byte[]{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88, (byte) 0x99, (byte) 0xaa, (byte) 0xbb, + (byte) 0xcc, (byte) 0xdd, (byte) 0xee, (byte) 0xff})), + UUID.fromString("00112233-4455-6677-8899-aabbccddeeff") + ); + } + + @Test + void testInterval() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(), + 12, + converter -> converter.addBinary(Binary.fromConstantByteBuffer(ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putInt(1) + .putInt(2) + .putInt(3) + .flip())), + new Interval( + Period.ofMonths(1).plusDays(2), + Duration.ofMillis(3) + ) + ); + } + + @Test + void testOptionalMissing() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .optional(PrimitiveType.PrimitiveTypeName.INT32).named("value") + .named("message")); + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + rootConverter.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + } + + @Test + void testListFromSimpleRepeatedElement() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .repeated(PrimitiveType.PrimitiveTypeName.INT32).named("value") + .named("message")); + + + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + rootConverter.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + + rootConverter.start(); + rootConverter.getConverter(0).asPrimitiveConverter().addInt(1); + rootConverter.end(); + assertEquals(Map.of("value", List.of(1)), materializer.getCurrentRecord()); + + rootConverter.start(); + rootConverter.getConverter(0).asPrimitiveConverter().addInt(1); + rootConverter.getConverter(0).asPrimitiveConverter().addInt(2); + rootConverter.end(); + assertEquals(Map.of("value", List.of(1, 2)), materializer.getCurrentRecord()); + } + + @Test + void testListFromListElementStructs() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .requiredList().optionalElement(PrimitiveType.PrimitiveTypeName.INT32).named("value") + .named("message")); + + var root = materializer.getRootConverter(); + var value = root.getConverter(0).asGroupConverter(); + var list = value.getConverter(0).asGroupConverter(); + var element = list.getConverter(0).asPrimitiveConverter(); + root.start(); + value.start(); + value.end(); + root.end(); + assertEquals(Map.of("value", List.of()), materializer.getCurrentRecord()); + + root.start(); + value.start(); + list.start(); + element.addInt(1); + list.end(); + list.start(); + list.end(); + list.start(); + element.addInt(3); + list.end(); + value.end(); + root.end(); + assertEquals(Map.of("value", Lists.newArrayList(1, null, 3)), materializer.getCurrentRecord()); + } + + @Test + void testListRepeatedAtTopAndBottomLevel() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .list(Type.Repetition.REPEATED).element(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REPEATED) + .named("value") + .named("message")); + + var root = materializer.getRootConverter(); + var value = root.getConverter(0).asGroupConverter(); + var list = value.getConverter(0).asGroupConverter(); + var element = 
list.getConverter(0).asPrimitiveConverter(); + root.start(); + value.start(); + value.end(); + value.start(); + list.start(); + element.addInt(1); + element.addInt(2); + list.end(); + list.start(); + element.addInt(3); + list.end(); + value.end(); + root.end(); + assertEquals(Map.of("value", List.of(List.of(), List.of(List.of(1, 2), List.of(3)))), + materializer.getCurrentRecord()); + } + + @Test + void testNestedList() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .optionalList() + .optionalListElement() + .optionalElement(PrimitiveType.PrimitiveTypeName.INT32) + .named("value") + .named("root")); + + //message root { + // optional group value (LIST) { + // repeated group list { + // optional group element (LIST) { + // repeated group list { + // optional int32 element; + // } + // } + // } + // } + //} + + var root = materializer.getRootConverter(); + var value = root.getConverter(0).asGroupConverter(); + var outerList = value.getConverter(0).asGroupConverter(); + var outerElement = outerList.getConverter(0).asGroupConverter(); + var innerList = outerElement.getConverter(0).asGroupConverter(); + var innerElement = innerList.getConverter(0).asPrimitiveConverter(); + root.start(); + root.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + + root.start(); + value.start(); + value.end(); + root.end(); + assertEquals(Map.of("value", List.of()), materializer.getCurrentRecord()); + + root.start(); + value.start(); + outerList.start(); + outerList.end(); + + outerList.start(); + outerElement.start(); + + innerList.start(); + innerElement.addInt(1); + innerList.end(); + + innerList.start(); + innerList.end(); + + innerList.start(); + innerElement.addInt(2); + innerList.end(); + + outerElement.end(); + outerList.end(); + value.end(); + root.end(); + + assertEquals(Map.of( + "value", Lists.newArrayList(null, Lists.newArrayList(1, null, 2)) + ), materializer.getCurrentRecord()); + } + + @Test + void testMapConverter() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .optionalMap() + .key(PrimitiveType.PrimitiveTypeName.INT32) + .optionalValue(PrimitiveType.PrimitiveTypeName.INT64) + .named("value") + .named("root")); + + //message root { + // optional group value (MAP) { + // repeated group key_value { + // required int32 key; + // optional int64 value; + // } + // } + //} + + var root = materializer.getRootConverter(); + var map = root.getConverter(0).asGroupConverter(); + var keyValue = map.getConverter(0).asGroupConverter(); + var key = keyValue.getConverter(0).asPrimitiveConverter(); + var value = keyValue.getConverter(1).asPrimitiveConverter(); + + root.start(); + root.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + + root.start(); + map.start(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of()), materializer.getCurrentRecord()); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of()), materializer.getCurrentRecord()); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + value.addLong(2); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of(1, 2L)), materializer.getCurrentRecord()); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + value.addLong(2); + keyValue.end(); + keyValue.start(); + key.addInt(3); + value.addLong(4); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of(1, 2L, 3, 4L)), 
materializer.getCurrentRecord()); + } + + @Test + void testRepeatedMap() { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .map(Type.Repetition.REPEATED) + .key(PrimitiveType.PrimitiveTypeName.INT32) + .optionalValue(PrimitiveType.PrimitiveTypeName.INT64) + .named("value") + .named("root")); + + var root = materializer.getRootConverter(); + var map = root.getConverter(0).asGroupConverter(); + var keyValue = map.getConverter(0).asGroupConverter(); + var key = keyValue.getConverter(0).asPrimitiveConverter(); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", List.of(Map.of())), materializer.getCurrentRecord()); + } + + private void testPrimitive(PrimitiveType.PrimitiveTypeName type, Consumer<PrimitiveConverter> consumer, + Object expected) { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .required(type).named("value") + .named("message")); + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + consumer.accept(rootConverter.getConverter(0).asPrimitiveConverter()); + rootConverter.end(); + assertEquals(Map.of("value", expected), materializer.getCurrentRecord()); + } + + private void testAnnotatedPrimitive(PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation, + Consumer<PrimitiveConverter> consumer, Object expected) { + testAnnotatedPrimitive(type, annotation, 0, consumer, expected); + } + + private void testAnnotatedPrimitive(PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation, + int length, Consumer<PrimitiveConverter> consumer, Object expected) { + var materializer = new ParquetRecordConverter(Types.buildMessage() + .required(type).as(annotation).length(length).named("value") + .named("message")); + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + consumer.accept(rootConverter.getConverter(0).asPrimitiveConverter()); + rootConverter.end(); + assertEquals(Map.of("value", expected), materializer.getCurrentRecord()); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFileTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFileTest.java new file mode 100644 index 0000000000..994b1dc42b --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFileTest.java @@ -0,0 +1,188 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; + +import com.onthegomap.planetiler.TestUtils; +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.util.Glob; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.locationtech.jts.geom.Envelope; + +class ParquetInputFileTest { + + static List<Path> bostons() { + return
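+ // each boston*.parquet fixture variant (the same data, presumably written with different parquet options) runs through the same assertions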
Glob.of(TestUtils.pathToResource("parquet")).resolve("boston*.parquet").find(); + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBoston(Path path) { + for (int i = 0; i < 3; i++) { + Set ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(3, ids.size(), "iter " + i); + } + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBostonWithBboxFilterCovering(Path path) { + Set ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path, null, + new Bounds(new Envelope(-71.0747653629, -71.0741656634, 42.3560968301, 42.3564346282)), null, null) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(3, ids.size()); + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBostonWithBboxFilterCoveringAndOtherFilter(Path path) { + Set ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path, FilterApi.gt(FilterApi.doubleColumn("height"), 3d), + new Bounds(new Envelope(-71.0747653629, -71.0741656634, 42.3560968301, 42.3564346282)), null, null) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(1, ids.size()); + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBostonWithBboxFilterNotCovering(Path path) { + Set ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path, null, + new Bounds(new Envelope(-72.0747653629, -72.0741656634, 42.3560968301, 42.3564346282)), null, null) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(0, ids.size()); + } + + @TestFactory + @SuppressWarnings("java:S5961") + List testReadAllDataTypes() { + + /* + ┌──────────────────────┬────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐ + │ column_name │ column_type │ null │ key │ default │ extra │ + │ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │ + ├──────────────────────┼────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤ + │ geometry │ BLOB │ YES │ │ │ │ ST_AsWKB(ST_Point(1, 2)) + │ bigint │ BIGINT │ YES │ │ │ │ 9223372036854775807 + │ blob │ BLOB │ YES │ │ │ │ '1011' + │ boolean │ BOOLEAN │ YES │ │ │ │ true + │ date │ DATE │ YES │ │ │ │ '2000-01-01' + │ decimal │ DECIMAL(18,3) │ YES │ │ │ │ 123456.789 + │ double │ DOUBLE │ YES │ │ │ │ 123456.789 + │ hugeint │ HUGEINT │ YES │ │ │ │ 92233720368547758079223372036854775807 + │ integer │ INTEGER │ YES │ │ │ │ 123 + │ interval │ INTERVAL │ YES │ │ │ │ INTERVAL 1 MONTH + INTERVAL 1 DAY + INTERVAL 1 SECOND + │ real │ FLOAT │ YES │ │ │ │ 123.456 + │ smallint │ SMALLINT │ YES │ │ │ │ 1234 + │ time │ TIME │ YES │ │ │ │ 2000-01-01 05:30:10.123 + │ timestamp_with_tim… │ TIMESTAMP WITH TIME ZONE │ YES │ │ │ │ 2000-01-01 05:30:10.123 EST + │ timestamp │ TIMESTAMP │ YES │ │ │ │ 2000-01-01 05:30:10.123456 EST + │ tinyint │ TINYINT │ YES │ │ │ │ 123 + │ ubigint │ UBIGINT │ YES │ │ │ │ 9223372036854775807 + │ uhugeint │ UHUGEINT │ YES │ │ │ │ 92233720368547758079223372036854775807 + │ uinteger │ UINTEGER │ YES │ │ │ │ 123 + │ usmallint │ USMALLINT │ YES │ │ │ │ 123 + │ utinyint │ UTINYINT │ YES │ │ │ │ 123 + │ uuid │ UUID │ YES │ │ │ │ 606362d9-012a-4949-b91a-1ab439951671 + │ varchar │ VARCHAR │ YES │ │ │ │ "string" + │ list │ INTEGER[] │ YES │ │ │ │ [1,2,3,4] + │ map │ MAP(INTEGER, VARCHAR) │ YES │ │ │ │ 
map([1,2,3],['one','two','three']) + │ array │ INTEGER[3] │ YES │ │ │ │ [1,2,3] + │ struct │ STRUCT(i INTEGER, j VARCHAR) │ YES │ │ │ │ {'i': 42, 'j': 'a'} + │ complex │ MAP(VARCHAR, STRUCT(i INTEGE… │ YES │ │ │ │ [MAP(['a', 'b'], [[], [{'i': 43, 'j': 'a'}, {'i': 43, 'j': 'b'}]])]; + ├──────────────────────┴────────────────────────────────┴─────────┴─────────┴─────────┴─────────┤ + │ 29 rows 6 columns │ + └───────────────────────────────────────────────────────────────────────────────────────────────┘ + + */ + Map map = null; + int i = 0; + for (var block : new ParquetInputFile("parquet", "layer", + TestUtils.pathToResource("parquet").resolve("all_data_types.parquet")) + .get()) { + for (var item : block) { + map = item.tags(); + assertEquals(0, i++); + } + } + assertNotNull(map); + return List.of( + testEquals(map, "bigint", 9223372036854775807L), + test(map, "blob", v -> assertArrayEquals("1011".getBytes(), (byte[]) v)), + testEquals(map, "boolean", true), + testEquals(map, "date", LocalDate.of(2000, 1, 1)), + testEquals(map, "decimal", 123456.789), + testEquals(map, "double", 123456.789), + testEquals(map, "hugeint", 92233720368547758079223372036854775807.0), + testEquals(map, "integer", 123), + testEquals(map, "interval", Interval.of(1, 2, 3_000)), + test(map, "real", v -> assertEquals(123.456, (double) v, 1e-3)), + testEquals(map, "smallint", 1234), + testEquals(map, "time", LocalTime.parse("05:30:10.123")), + testEquals(map, "timestamp_with_timezone", Instant.parse("2000-01-01T10:30:10.123Z")), + testEquals(map, "timestamp", Instant.parse("2000-01-01T10:30:10.123Z")), + testEquals(map, "tinyint", 123), + testEquals(map, "ubigint", 9223372036854775807L), + testEquals(map, "uhugeint", 92233720368547758079223372036854775807.0), + testEquals(map, "uinteger", 123), + testEquals(map, "usmallint", 123), + testEquals(map, "utinyint", 123), + testEquals(map, "uuid", UUID.fromString("606362d9-012a-4949-b91a-1ab439951671")), + testEquals(map, "varchar", "string"), + testEquals(map, "list_of_items", List.of(1, 2, 3, 4)), + testEquals(map, "map", Map.of(1, "one", 2, "two", 3, "three")), + testEquals(map, "array", List.of(1, 2, 3)), + testEquals(map, "struct", Map.of("i", 42, "j", "a")), + testEquals(map, "complex", List.of(Map.of( + "a", List.of(), + "b", List.of( + Map.of("i", 43, "j", "a"), + Map.of("i", 43, "j", "b") + ) + ))) + ); + } + + private static DynamicTest testEquals(Map map, String key, Object expected) { + return test(map, key, v -> assertEquals(expected, map.get(key))); + } + + private static DynamicTest test(Map map, String key, Consumer test) { + return dynamicTest(key, () -> test.accept(map.get(key))); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetReaderTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetReaderTest.java new file mode 100644 index 0000000000..b01238c2da --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetReaderTest.java @@ -0,0 +1,84 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static com.onthegomap.planetiler.TestUtils.newPoint; +import static com.onthegomap.planetiler.TestUtils.round; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.onthegomap.planetiler.FeatureCollector; +import com.onthegomap.planetiler.Profile; +import com.onthegomap.planetiler.TestUtils; +import com.onthegomap.planetiler.collection.FeatureGroup; 
+import com.onthegomap.planetiler.config.PlanetilerConfig; +import com.onthegomap.planetiler.geo.GeoUtils; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.geo.TileOrder; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.util.Glob; +import com.onthegomap.planetiler.util.Parse; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.locationtech.jts.geom.Geometry; + +class ParquetReaderTest { + private final PlanetilerConfig config = PlanetilerConfig.defaults(); + private final Stats stats = Stats.inMemory(); + + static List<Path> bostons() { + return Glob.of(TestUtils.pathToResource("parquet")).resolve("boston*.parquet").find(); + } + + @ParameterizedTest + @MethodSource("bostons") + @Timeout(30) + void testReadOvertureParquet(Path path) { + List<String> ids = new CopyOnWriteArrayList<>(); + List<Geometry> geoms = new CopyOnWriteArrayList<>(); + + var profile = new Profile.NullProfile() { + volatile double height = 0; + + @Override + public synchronized void processFeature(SourceFeature sourceFeature, FeatureCollector features) { + try { + ids.add(sourceFeature.getString("id")); + height += Parse.parseDoubleOrNull(sourceFeature.getTag("height")) instanceof Double d ? d : 0; + geoms.add(sourceFeature.latLonGeometry()); + } catch (GeometryException e) { + throw new RuntimeException(e); + } + } + }; + var reader = new ParquetReader("source", profile, stats); + reader.process(List.of(path), + FeatureGroup.newInMemoryFeatureGroup(TileOrder.TMS, profile, config, stats), PlanetilerConfig.defaults()); + assertEquals(List.of( + "08b2a306638a0fff02001c5b97636c80", + "08b2a306638a0fff0200a75c80c3d54b", + "08b2a306638a0fff0200d1814977faca" + ), ids.stream().sorted().toList()); + var center = GeoUtils.combine(geoms.toArray(Geometry[]::new)).getCentroid(); + assertEquals(newPoint(-71.07448, 42.35626), round(center)); + assertEquals(4.7, profile.height); + } + + @Test + void testHivePartitionFields() { + // key=value segments anywhere in a path become hive partition fields; paths without any yield null + assertNull(ParquetReader.getHivePartitionFields(Path.of(""))); + assertNull(ParquetReader.getHivePartitionFields(Path.of("a"))); + assertNull(ParquetReader.getHivePartitionFields(Path.of("a", "b"))); + assertEquals(Map.of("c", "d"), ParquetReader.getHivePartitionFields(Path.of("a", "b", "c=d"))); + assertEquals(Map.of("c", "d", "e", "f"), + ParquetReader.getHivePartitionFields(Path.of("a", "b", "c=d", "e=f"))); + assertEquals(Map.of("a", "b", "c", "d", "e", "f"), + ParquetReader.getHivePartitionFields(Path.of("a=b", "b", "c=d", "e=f"))); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java index 2f480cbdec..9d08d2ba36 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java @@ -119,6 +119,13 @@ void testWalkPathWithPatternDirectory() throws IOException { txtFiles.stream().sorted().toList(), matchingPaths.stream().sorted().toList() ); + + matchingPaths = Glob.of(parent).resolve("*.txt").find(); + + assertEquals( + txtFiles.stream().sorted().toList(), + matchingPaths.stream().sorted().toList() +
); } @Test @@ -140,6 +147,9 @@ void testWalkPathWithPatternDirectoryZip() throws IOException { // Otherwise, the files inside the zip should be returned. assertEquals(List.of(zipFile.resolve("inner.txt")), FileUtils.walkPathWithPattern(parent, "*.zip", mockWalkZipFile)); + + + assertEquals(List.of(zipFile), Glob.of(parent).resolve("*.zip").find()); } @Test @@ -148,6 +158,12 @@ void testWalkPathWithPatternSingleZip() { var matchingPaths = FileUtils.walkPathWithPattern(zipPath, "stations.sh[px]"); + assertEquals( + List.of("/shapefile/stations.shp", "/shapefile/stations.shx"), + matchingPaths.stream().map(Path::toString).sorted().toList()); + + matchingPaths = Glob.of(zipPath).resolve("stations.sh[px]").find(); + assertEquals( List.of("/shapefile/stations.shp", "/shapefile/stations.shx"), matchingPaths.stream().map(Path::toString).sorted().toList()); diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/GlobTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/GlobTest.java new file mode 100644 index 0000000000..c7f7034d4e --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/GlobTest.java @@ -0,0 +1,66 @@ +package com.onthegomap.planetiler.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.nio.file.FileSystems; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class GlobTest { + @TempDir + Path tmpDir; + + @ParameterizedTest + @CsvSource(value = { + "a/b/c; a/b/c;", + "a/b/*; a/b; *", + "a/*/b; a; */b", + "*/b/*; ; */b/*", + "/*/test; /; */test", + "a/b={c,d}/other; a; b={c,d}/other", + "./a/b=?/other; ./a; b=?/other", + }, delimiter = ';') + void testParsePathWithPattern(String input, String base, String pattern) { + var separator = FileSystems.getDefault().getSeparator(); + input = input.replace("/", separator); + base = base == null ? 
"" : base.replace("/", separator); + assertEquals( + new Glob(Path.of(base), pattern), + Glob.parse(input) + ); + } + + @Test + void testWalkPathWithPattern() throws IOException { + var path = tmpDir.resolve("a").resolve("b").resolve("c.txt"); + FileUtils.createParentDirectories(path); + Files.writeString(path, "test"); + assertEquals(List.of(path), Glob.of(tmpDir).resolve("a", "*", "c.txt").find()); + assertEquals(List.of(path), Glob.of(tmpDir).resolve("*", "*", "c.txt").find()); + assertEquals(List.of(path), Glob.of(tmpDir).resolve("a", "b", "c.txt").find()); + } + + @Test + void testResolve() { + var base = Glob.of(Path.of("a", "b")); + assertEquals(new Glob(Path.of("a", "b", "c"), null), base.resolve("c")); + assertEquals(new Glob(Path.of("a", "b", "c", "d"), null), base.resolve("c", "d")); + assertEquals(new Glob(Path.of("a", "b"), "*/d"), base.resolve("*", "d")); + assertEquals(new Glob(tmpDir, "*/*/c.txt"), + Glob.of(tmpDir).resolve("*", "*", "c.txt")); + } + + @Test + void testParseAbsoluteString() { + var base = Glob.of(Path.of("a", "b")).resolve("*", "d"); + var separator = base.base().getFileSystem().getSeparator(); + assertEquals(new Glob(base.base().toAbsolutePath(), base.pattern()), + Glob.parse(base.base().toAbsolutePath() + separator + base.pattern())); + } +} diff --git a/planetiler-core/src/test/resources/log4j2-test.properties b/planetiler-core/src/test/resources/log4j2-test.properties index 3140f703c0..7e7dcb0b5b 100644 --- a/planetiler-core/src/test/resources/log4j2-test.properties +++ b/planetiler-core/src/test/resources/log4j2-test.properties @@ -7,3 +7,7 @@ packages=com.onthegomap.planetiler.util.log4j rootLogger.level=warn rootLogger.appenderRefs=stdout rootLogger.appenderRef.stdout.ref=STDOUT + +# suppress warning about unreadable duckdb statistics +logger.apachecorrupt.name=org.apache.parquet.CorruptStatistics +logger.apachecorrupt.level=error diff --git a/planetiler-core/src/test/resources/parquet/all_data_types.parquet b/planetiler-core/src/test/resources/parquet/all_data_types.parquet new file mode 100644 index 0000000000..ac6776b607 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/all_data_types.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.customgeometryname.parquet b/planetiler-core/src/test/resources/parquet/boston.customgeometryname.parquet new file mode 100644 index 0000000000..f6dbabb4a1 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.customgeometryname.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_new.parquet b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_new.parquet new file mode 100644 index 0000000000..e3ed0ef6e8 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_new.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_old.parquet b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_old.parquet new file mode 100644 index 0000000000..3dd2e76045 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_old.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.gzip.parquet b/planetiler-core/src/test/resources/parquet/boston.gzip.parquet new file mode 100644 index 0000000000..9ca613914c Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.gzip.parquet differ diff --git 
a/planetiler-core/src/test/resources/parquet/boston.lz4hadoop.parquet b/planetiler-core/src/test/resources/parquet/boston.lz4hadoop.parquet new file mode 100644 index 0000000000..31a81492c3 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.lz4hadoop.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.lz4raw.parquet b/planetiler-core/src/test/resources/parquet/boston.lz4raw.parquet new file mode 100644 index 0000000000..b3ba777df8 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.lz4raw.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.none.parquet b/planetiler-core/src/test/resources/parquet/boston.none.parquet new file mode 100644 index 0000000000..17fc8cb7af Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.none.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.parquet b/planetiler-core/src/test/resources/parquet/boston.parquet new file mode 100644 index 0000000000..297da5e9dc Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.snappy.parquet b/planetiler-core/src/test/resources/parquet/boston.snappy.parquet new file mode 100644 index 0000000000..5c230ab5f0 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.snappy.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.zstd.parquet b/planetiler-core/src/test/resources/parquet/boston.zstd.parquet new file mode 100644 index 0000000000..c90b5924c3 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.zstd.parquet differ diff --git a/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java b/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java index becc425ba0..5b8eee3241 100644 --- a/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java +++ b/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java @@ -10,6 +10,7 @@ import com.onthegomap.planetiler.examples.OsmQaTiles; import com.onthegomap.planetiler.examples.ToiletsOverlay; import com.onthegomap.planetiler.examples.ToiletsOverlayLowLevelApi; +import com.onthegomap.planetiler.examples.overture.OvertureBasemap; import com.onthegomap.planetiler.mbtiles.Verify; import com.onthegomap.planetiler.util.CompareArchives; import com.onthegomap.planetiler.util.TileSizeStats; @@ -54,6 +55,8 @@ public class Main { entry("example-bikeroutes", BikeRouteOverlay::main), entry("example-toilets", ToiletsOverlay::main), entry("example-toilets-lowlevel", ToiletsOverlayLowLevelApi::main), + entry("example-overture", OvertureBasemap::main), + entry("overture", OvertureBasemap::main), entry("example-qa", OsmQaTiles::main), entry("osm-qa", OsmQaTiles::main), diff --git a/planetiler-examples/src/main/java/com/onthegomap/planetiler/examples/overture/OvertureBasemap.java b/planetiler-examples/src/main/java/com/onthegomap/planetiler/examples/overture/OvertureBasemap.java new file mode 100644 index 0000000000..3723c5a1be --- /dev/null +++ b/planetiler-examples/src/main/java/com/onthegomap/planetiler/examples/overture/OvertureBasemap.java @@ -0,0 +1,66 @@ +package com.onthegomap.planetiler.examples.overture; + +import com.onthegomap.planetiler.FeatureCollector; +import com.onthegomap.planetiler.Planetiler; +import com.onthegomap.planetiler.Profile; +import com.onthegomap.planetiler.config.Arguments; +import 
com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.util.Glob; +import java.nio.file.Path; + +/** + * Example basemap that renders building polygons from Overture Maps geoparquet data. + */ +public class OvertureBasemap implements Profile { + + @Override + public void processFeature(SourceFeature source, FeatureCollector features) { + String layer = source.getSourceLayer(); + switch (layer) { + case "building" -> features.polygon("building") + .setMinZoom(13) + .inheritAttrFromSource("height") + .inheritAttrFromSource("roof_color"); + case null, default -> { + // ignore other layers for now + } + } + } + + @Override + public String name() { + return "Overture"; + } + + @Override + public String description() { + return "A basemap generated from Overture data"; + } + + @Override + public String attribution() { + return """ + © OpenStreetMap + © Overture Maps Foundation + """ + .replace("\n", " ") + .trim(); + } + + public static void main(String[] args) throws Exception { + run(Arguments.fromArgsOrConfigFile(args)); + } + + static void run(Arguments args) throws Exception { + Path base = args.inputFile("base", "overture base directory", Path.of("data", "overture")); + Planetiler.create(args) + .setProfile(new OvertureBasemap()) + .addParquetSource("overture-buildings", + Glob.of(base).resolve("*", "type=building", "*.parquet").find(), + true, // infer extra fields from hive-partitioned paths (e.g. "type=building") + fields -> fields.get("id"), // hash the ID field to generate unique long IDs + fields -> fields.get("type")) // use the "type={}" hive partition value as the source layer + .overwriteOutput(Path.of("data", "overture.pmtiles")) + .run(); + } +}
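A minimal sketch of invoking the example above, assuming the hive-partitioned Overture building files are already downloaded under data/overture (downloading them is a separate step not shown in this diff). The RunOvertureBasemap wrapper class and its location are hypothetical illustrations; only the OvertureBasemap.main() entrypoint it calls comes from this change:

package com.onthegomap.planetiler.examples.overture;

/**
 * Hypothetical launcher (illustration only): runs the OvertureBasemap profile
 * with an explicit --base argument instead of relying on the default.
 */
public class RunOvertureBasemap {
  public static void main(String[] args) throws Exception {
    // equivalent to: java -jar planetiler.jar overture --base=data/overture
    // reads data/overture/*/type=building/*.parquet and writes data/overture.pmtiles
    OvertureBasemap.main(new String[]{"--base=data/overture"});
  }
}

The same profile is also reachable through the new "overture" and "example-overture" entries registered in planetiler-dist's Main.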