diff --git a/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/benchmarks/BenchmarkParquetRead.java b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/benchmarks/BenchmarkParquetRead.java new file mode 100644 index 0000000000..67fd05c571 --- /dev/null +++ b/planetiler-benchmarks/src/main/java/com/onthegomap/planetiler/benchmarks/BenchmarkParquetRead.java @@ -0,0 +1,28 @@ +package com.onthegomap.planetiler.benchmarks; + +import com.onthegomap.planetiler.config.Arguments; +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.reader.parquet.ParquetInputFile; +import java.nio.file.Path; + +public class BenchmarkParquetRead { + + public static void main(String[] args) { + var arguments = Arguments.fromArgs(args); + var path = + arguments.inputFile("parquet", "parquet file to read", Path.of("data", "sources", "locality.zstd.parquet")); + long c = 0; + var file = new ParquetInputFile("parquet", "locality", path, null, Bounds.WORLD, null, tags -> tags.get("id")); + for (int i = 0; i < 20; i++) { + long start = System.currentTimeMillis(); + for (var block : file.get()) { + for (var item : block) { + c += item.tags().size(); + } + } + System.err.println(System.currentTimeMillis() - start); + } + + System.err.println(c); + } +} diff --git a/planetiler-core/pom.xml b/planetiler-core/pom.xml index cd5514da67..fb69511d09 100644 --- a/planetiler-core/pom.xml +++ b/planetiler-core/pom.xml @@ -154,12 +154,24 @@ <artifactId>geopackage</artifactId> <version>${geopackage.version}</version> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-nop</artifactId> - </exclusion> - </exclusions> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-nop</artifactId> + </exclusion> + </exclusions> </dependency> + <dependency> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>1.1.10.5</version> + </dependency> + <dependency> + <groupId>blue.strategic.parquet</groupId> + <artifactId>parquet-floor</artifactId> + <version>1.41</version> + </dependency> diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java index 7ad554095c..daa5490a68 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/Planetiler.java @@ -13,9 +13,11 @@ import com.onthegomap.planetiler.reader.GeoPackageReader; import com.onthegomap.planetiler.reader.NaturalEarthReader; import com.onthegomap.planetiler.reader.ShapefileReader; +import com.onthegomap.planetiler.reader.SourceFeature; import com.onthegomap.planetiler.reader.osm.OsmInputFile; import com.onthegomap.planetiler.reader.osm.OsmNodeBoundsProvider; import com.onthegomap.planetiler.reader.osm.OsmReader; +import com.onthegomap.planetiler.reader.parquet.ParquetReader; import com.onthegomap.planetiler.stats.ProcessInfo; import com.onthegomap.planetiler.stats.Stats; import com.onthegomap.planetiler.stats.Timers; @@ -39,6 +41,7 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.function.Function; import java.util.stream.IntStream; @@ -469,6 +472,51 @@ public Planetiler addNaturalEarthSource(String name, Path defaultPath, String de config, profile, stats, keepUnzipped))); } + + /** + * Adds a new geoparquet source that will be processed when + * {@link #run()} is called.
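 + * <p> + * For example (the profile and file name here are illustrative): + * <pre>{@code + * Planetiler.create(arguments) + *   .setProfile(new MyProfile()) + *   .addParquetSource("buildings", Path.of("data", "buildings.parquet")) + *   .run(); + * }</pre>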
+ * + * @param name string to use in stats and logs to identify this stage + * @param pattern path to the geoparquet file to read, possibly including + * {@linkplain FileSystem#getPathMatcher(String) glob patterns} + * @param hivePartitioning Set to true to parse extra feature tags from the file path, for example + * {@code {theme="buildings", type="part"}} from + * {@code base/theme=buildings/type=part/file.parquet} + * @param getId function that extracts a unique vector tile feature ID from each input feature; string or + * binary values will be hashed to a {@code long}. + * @param getLayer function that extracts {@link SourceFeature#getSourceLayer()} from the properties of each + * input feature + * @return this runner instance for chaining + * @see ParquetReader + */ + public Planetiler addParquetSource(String name, Path pattern, boolean hivePartitioning, + Function<Map<String, Object>, Object> getId, Function<Map<String, Object>, Object> getLayer) { + // TODO handle auto-downloading + Path path = getPath(name, "parquet", pattern, null, true); + return addStage(name, "Process features in " + path, ifSourceUsed(name, () -> { + var sourcePaths = FileUtils.walkPathWithPattern(path).stream().filter(Files::isRegularFile).toList(); + new ParquetReader(name, profile, stats, getId, getLayer, hivePartitioning).process(sourcePaths, featureGroup, + config); + })); + } + + /** + * Alias for {@link #addParquetSource(String, Path, boolean, Function, Function)} using the default layer and ID + * extractors. + */ + public Planetiler addParquetSource(String name, Path pattern, boolean hivePartitioning) { + return addParquetSource(name, pattern, hivePartitioning, null, null); + } + + /** + * Alias for {@link #addParquetSource(String, Path, boolean, Function, Function)} without hive partitioning and using + * the default layer and ID extractors. + */ + public Planetiler addParquetSource(String name, Path pattern) { + return addParquetSource(name, pattern, false); + } + /** * Adds a new stage that will be invoked when {@link #run()} is called. * @@ -770,7 +818,7 @@ public void run() throws Exception { for (var inputPath : inputPaths) { if (inputPath.freeAfterReading()) { LOGGER.info("Deleting {} ({}) to make room for output file", inputPath.id, inputPath.path); - FileUtils.delete(inputPath.path()); + inputPath.delete(); } } @@ -810,7 +858,7 @@ private void checkDiskSpace() { // if the user opts to remove an input source after reading to free up additional space for the output...
for (var input : inputPaths) { if (input.freeAfterReading()) { - writePhase.addDisk(input.path, -FileUtils.size(input.path), "delete " + input.id + " source after reading"); + writePhase.addDisk(input.path, -input.size(), "delete " + input.id + " source after reading"); } } @@ -893,18 +941,23 @@ private RunnableThatThrows ifSourceUsed(String name, RunnableThatThrows task) { } private Path getPath(String name, String type, Path defaultPath, String defaultUrl) { + return getPath(name, type, defaultPath, defaultUrl, false); + } + + private Path getPath(String name, String type, Path defaultPath, String defaultUrl, boolean wildcard) { Path path = arguments.file(name + "_path", name + " " + type + " path", defaultPath); boolean refresh = arguments.getBoolean("refresh_" + name, "Download new version of " + name + " if changed", refreshSources); boolean freeAfterReading = arguments.getBoolean("free_" + name + "_after_read", "delete " + name + " input file after reading to make space for output (reduces peak disk usage)", false); + var inputPath = new InputPath(name, path, freeAfterReading, wildcard); + inputPaths.add(inputPath); if (downloadSources || refresh) { String url = arguments.getString(name + "_url", name + " " + type + " url", defaultUrl); - if ((!Files.exists(path) || refresh) && url != null) { - toDownload.add(new ToDownload(name, url, path)); + if ((refresh || inputPath.isEmpty()) && url != null) { + toDownload.add(new ToDownload(name, url, path, wildcard)); } } - inputPaths.add(new InputPath(name, path, freeAfterReading)); return path; } @@ -922,7 +975,7 @@ private void download() { private void ensureInputFilesExist() { for (InputPath inputPath : inputPaths) { - if (profile.caresAboutSource(inputPath.id) && !Files.exists(inputPath.path)) { + if (profile.caresAboutSource(inputPath.id) && inputPath.isEmpty()) { throw new IllegalArgumentException(inputPath.path + " does not exist. Run with --download to fetch it"); } } @@ -935,7 +988,24 @@ private record Stage(String id, List<String> details, RunnableThatThrows task) { } } - private record ToDownload(String id, String url, Path path) {} + private record ToDownload(String id, String url, Path path, boolean wildcard) {} + + private record InputPath(String id, Path path, boolean freeAfterReading, boolean wildcard) { + + public boolean isEmpty() { + return wildcard ? FileUtils.walkPathWithPattern(path).isEmpty() : !Files.exists(path); + } + + public long size() { + return wildcard ?
FileUtils.size(FileUtils.getPatternBase(path)) : FileUtils.fileSize(path); + } - private record InputPath(String id, Path path, boolean freeAfterReading) {} + public void delete() { + if (wildcard) { + FileUtils.delete(FileUtils.getPatternBase(path)); + } else { + FileUtils.delete(path); + } + } + } } diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java index f6d0aceb7b..540933b4b8 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/geo/GeoUtils.java @@ -17,6 +17,8 @@ import org.locationtech.jts.geom.LineSegment; import org.locationtech.jts.geom.LineString; import org.locationtech.jts.geom.LinearRing; +import org.locationtech.jts.geom.MultiLineString; +import org.locationtech.jts.geom.MultiPoint; import org.locationtech.jts.geom.MultiPolygon; import org.locationtech.jts.geom.Point; import org.locationtech.jts.geom.Polygon; @@ -27,6 +29,7 @@ import org.locationtech.jts.geom.util.GeometryFixer; import org.locationtech.jts.geom.util.GeometryTransformer; import org.locationtech.jts.io.WKBReader; +import org.locationtech.jts.io.WKTReader; import org.locationtech.jts.precision.GeometryPrecisionReducer; /** @@ -40,8 +43,9 @@ public class GeoUtils { /** Rounding precision for 256x256px tiles encoded using 4096 values. */ public static final PrecisionModel TILE_PRECISION = new PrecisionModel(4096d / 256d); public static final GeometryFactory JTS_FACTORY = new GeometryFactory(PackedCoordinateSequenceFactory.DOUBLE_FACTORY); - public static final WKBReader WKB_READER = new WKBReader(JTS_FACTORY); + public static final Geometry EMPTY_GEOMETRY = JTS_FACTORY.createGeometryCollection(); + public static final CoordinateSequence EMPTY_COORDINATE_SEQUENCE = new PackedCoordinateSequence.Double(0, 2, 0); public static final Point EMPTY_POINT = JTS_FACTORY.createPoint(); public static final LineString EMPTY_LINE = JTS_FACTORY.createLineString(); public static final Polygon EMPTY_POLYGON = JTS_FACTORY.createPolygon(); @@ -247,11 +251,11 @@ public static Point point(Coordinate coord) { return JTS_FACTORY.createPoint(coord); } - public static Geometry createMultiLineString(List<LineString> lineStrings) { + public static MultiLineString createMultiLineString(List<LineString> lineStrings) { return JTS_FACTORY.createMultiLineString(lineStrings.toArray(EMPTY_LINE_STRING_ARRAY)); } - public static Geometry createMultiPolygon(List<Polygon> polygon) { + public static MultiPolygon createMultiPolygon(List<Polygon> polygon) { return JTS_FACTORY.createMultiPolygon(polygon.toArray(EMPTY_POLYGON_ARRAY)); } @@ -370,7 +374,7 @@ public static CoordinateSequence coordinateSequence(double... coords) { return new PackedCoordinateSequence.Double(coords, 2, 0); } - public static Geometry createMultiPoint(List<Point> points) { + public static MultiPoint createMultiPoint(List<Point> points) { return JTS_FACTORY.createMultiPoint(points.toArray(EMPTY_POINT_ARRAY)); } @@ -548,6 +552,14 @@ public static int minZoomForPixelSize(double worldGeometrySize, double minPixelS PlanetilerConfig.MAX_MAXZOOM); } + public static WKBReader wkbReader() { + return new WKBReader(JTS_FACTORY); + } + + public static WKTReader wktReader() { + return new WKTReader(JTS_FACTORY); + } + /** Helper class to sort polygons by area of their outer shell.
*/ private record PolyAndArea(Polygon poly, double area) implements Comparable<PolyAndArea> { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java index 2711fdb243..d2d4892610 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/NaturalEarthReader.java @@ -172,6 +172,7 @@ public void readFeatures(Consumer<SimpleFeature> next) throws Exception { } } if (geometryColumn >= 0) { + var wkbReader = GeoUtils.wkbReader(); while (rs.next()) { byte[] geometry = rs.getBytes(geometryColumn + 1); if (geometry == null) { @@ -179,7 +180,7 @@ } // create the feature and pass to next stage - Geometry latLonGeometry = GeoUtils.WKB_READER.read(geometry); + Geometry latLonGeometry = wkbReader.read(geometry); SimpleFeature readerGeometry = SimpleFeature.create(latLonGeometry, HashMap.newHashMap(column.length - 1), sourceName, table, ++id); for (int c = 0; c < column.length; c++) { diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoArrow.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoArrow.java new file mode 100644 index 0000000000..d935ab689d --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoArrow.java @@ -0,0 +1,102 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.onthegomap.planetiler.geo.GeoUtils; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.locationtech.jts.geom.CoordinateSequence; +import org.locationtech.jts.geom.LineString; +import org.locationtech.jts.geom.LinearRing; +import org.locationtech.jts.geom.MultiLineString; +import org.locationtech.jts.geom.MultiPoint; +import org.locationtech.jts.geom.MultiPolygon; +import org.locationtech.jts.geom.Point; +import org.locationtech.jts.geom.Polygon; +import org.locationtech.jts.geom.impl.PackedCoordinateSequence; + +/** + * Utilities for converting nested geoarrow + * coordinate lists to JTS geometries. + */ +class GeoArrow { + // TODO create packed coordinate arrays while reading parquet values to avoid creating so many intermediate objects + static MultiPolygon multipolygon(List<List<List<Object>>> list) { + return GeoUtils.createMultiPolygon(map(list, GeoArrow::polygon)); + } + + static Polygon polygon(List<List<Object>> input) { + return GeoUtils.createPolygon(ring(input.getFirst()), input.stream().skip(1).map(GeoArrow::ring).toList()); + } + + static MultiPoint multipoint(List<Object> input) { + return GeoUtils.createMultiPoint(map(input, GeoArrow::point)); + } + + static Point point(Object input) { + int dims = input instanceof List<?> l ? l.size() : input instanceof Map<?, ?> m ? m.size() : 0; + CoordinateSequence result = + new PackedCoordinateSequence.Double(1, dims, dims == 4 ?
1 : 0); + coordinate(input, result, 0); + return GeoUtils.JTS_FACTORY.createPoint(result); + } + + static MultiLineString multilinestring(List<List<Object>> input) { + return GeoUtils.createMultiLineString(map(input, GeoArrow::linestring)); + } + + static LineString linestring(List<Object> input) { + return GeoUtils.JTS_FACTORY.createLineString(coordinateSequence(input)); + } + + + private static CoordinateSequence coordinateSequence(List<Object> input) { + if (input.isEmpty()) { + return GeoUtils.EMPTY_COORDINATE_SEQUENCE; + } + Object first = input.getFirst(); + int dims = first instanceof List<?> l ? l.size() : first instanceof Map<?, ?> m ? m.size() : 0; + CoordinateSequence result = + new PackedCoordinateSequence.Double(input.size(), dims, dims == 4 ? 1 : 0); + for (int i = 0; i < input.size(); i++) { + Object item = input.get(i); + coordinate(item, result, i); + } + return result; + } + + private static LinearRing ring(List<Object> input) { + return GeoUtils.JTS_FACTORY.createLinearRing(coordinateSequence(input)); + } + + private static void coordinate(Object input, CoordinateSequence result, int index) { + switch (input) { + case List<?> list -> { + List<Number> l = (List<Number>) list; + for (int i = 0; i < l.size(); i++) { + result.setOrdinate(index, i, l.get(i).doubleValue()); + } + } + case Map<?, ?> map -> { + Map<String, Number> m = (Map<String, Number>) map; + + for (var entry : m.entrySet()) { + int ordinateIndex = switch (entry.getKey()) { + case "x" -> 0; + case "y" -> 1; + case "z" -> 2; + case "m" -> 3; + case null, default -> throw new IllegalArgumentException("Bad coordinate key: " + entry.getKey()); + }; + result.setOrdinate(index, ordinateIndex, entry.getValue().doubleValue()); + } + } + default -> throw new IllegalArgumentException("Expecting map or list, got: " + input); + } + } + + private static <I, O> List<O> map(List<I> in, Function<I, O> remap) { + return in.stream().map(remap).toList(); + } + +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadata.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadata.java new file mode 100644 index 0000000000..563176a426 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/GeoParquetMetadata.java @@ -0,0 +1,98 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Struct for deserializing a + * geoparquet + * metadata json string into.
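 + * <p> + * For example, a typical (abridged) value of the file's {@code geo} metadata key looks like: + * <pre>{@code + * {"version": "1.0.0", "primary_column": "geometry", + *  "columns": {"geometry": {"encoding": "WKB", "geometry_types": ["Polygon", "MultiPolygon"]}}} + * }</pre>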
*/ +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) +public record GeoParquetMetadata( + String version, + String primaryColumn, + Map<String, ColumnMetadata> columns +) { + + private static final Logger LOGGER = LoggerFactory.getLogger(GeoParquetMetadata.class); + + private static final ObjectMapper mapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) + public record ColumnMetadata( + String encoding, + List<String> geometryTypes, + Object crs, + String orientation, + String edges, + List<Double> bbox, + Double epoch, + Covering covering + ) { + ColumnMetadata(String encoding) { + this(encoding, List.of()); + } + + ColumnMetadata(String encoding, List<String> geometryTypes) { + this(encoding, geometryTypes, null, null, null, null, null, null); + } + } + + public record CoveringBbox( + List<String> xmin, + List<String> ymin, + List<String> xmax, + List<String> ymax + ) {} + + public record Covering( + CoveringBbox bbox + ) {} + + + public static GeoParquetMetadata parse(FileMetaData metadata) throws IOException { + String string = metadata.getKeyValueMetaData().get("geo"); + if (string != null) { + try { + return mapper.readValue(string, GeoParquetMetadata.class); + } catch (JsonProcessingException e) { + LOGGER.warn("Invalid geoparquet metadata", e); + } + } + // fallback: infer the geometry column from conventional column names + for (var field : metadata.getSchema().asGroupType().getFields()) { + if (field.isPrimitive() && + field.asPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY) { + switch (field.getName()) { + case "geometry", "wkb_geometry" -> { + return new GeoParquetMetadata("1.0.0", field.getName(), Map.of( + field.getName(), new ColumnMetadata("WKB"))); + } + case "wkt_geometry" -> { + return new GeoParquetMetadata("1.0.0", field.getName(), Map.of( + field.getName(), new ColumnMetadata("WKT"))); + } + default -> { + //ignore + } + } + } + } + throw new IOException( + "No valid geometry columns found: " + metadata.getSchema().asGroupType().getFields().stream().map( + Type::getName).toList()); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/Interval.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/Interval.java new file mode 100644 index 0000000000..f1cb4d464b --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/Interval.java @@ -0,0 +1,42 @@ +package com.onthegomap.planetiler.reader.parquet; + +import java.time.Duration; +import java.time.Period; +import java.time.temporal.Temporal; +import java.time.temporal.TemporalAmount; +import java.time.temporal.TemporalUnit; +import java.util.List; +import java.util.stream.Stream; + +/** + * Represents a parquet + * interval datatype which has a month, day, and millisecond part. + *

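+ * For example, {@code Interval.of(1, 2, 3000)} represents 1 month, 2 days, and 3 seconds; {@link #addTo(Temporal)} + * applies the period part first, then the duration part. + *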
+ * Built-in java {@link TemporalAmount} implementations can only store a period or duration amount, but not both. + */ +public record Interval(Period period, Duration duration) implements TemporalAmount { + + public static Interval of(int months, long days, long millis) { + return new Interval(Period.ofMonths(months).plusDays(days), Duration.ofMillis(millis)); + } + + @Override + public long get(TemporalUnit unit) { + // delegate to whichever part (period or duration) supports this unit; summing both would throw + // UnsupportedTemporalTypeException for every unit since neither part supports all of them + return period.getUnits().contains(unit) ? period.get(unit) : duration.get(unit); + } + + @Override + public List<TemporalUnit> getUnits() { + return Stream.concat(period.getUnits().stream(), duration.getUnits().stream()).toList(); + } + + @Override + public Temporal addTo(Temporal temporal) { + return temporal.plus(period).plus(duration); + } + + @Override + public Temporal subtractFrom(Temporal temporal) { + return temporal.minus(period).minus(duration); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/MapRecordMaterializer.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/MapRecordMaterializer.java new file mode 100644 index 0000000000..2de7ee0a75 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/MapRecordMaterializer.java @@ -0,0 +1,515 @@ +package com.onthegomap.planetiler.reader.parquet; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.Period; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.IntConsumer; +import java.util.function.LongFunction; +import java.util.stream.IntStream; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.EnumLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntervalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.JsonLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.StringLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +/** + * Simple converter for parquet datatypes that maps all structs to {@code Map<String, Object>} and handles deserializing + * list and map nested + * types into java {@link List Lists} and {@link Map Maps}.
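 + * <p> + * For example, a row from a (hypothetical) schema {@code {name: binary (STRING), tags: map<string, string>, + * refs: list<int64>}} materializes as a map like {@code {name=..., tags={...}, refs=[...]}} built from plain java + * {@code String}, {@code Map}, and {@code List} values.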
*/ +public class MapRecordMaterializer extends RecordMaterializer<Map<String, Object>> { + + private final StructConverter root; + private Map<String, Object> map; + + MapRecordMaterializer(MessageType schema) { + root = new StructConverter(null, schema) { + @Override + public void start() { + var group = new MapGroup(schema.getFieldCount()); + this.current = group; + map = group.getMap(); + } + }; + } + + @Override + public Map<String, Object> getCurrentRecord() { + return map; + } + + @Override + public void skipCurrentRecord() { + root.current = null; + } + + @Override + public GroupConverter getRootConverter() { + return root; + } + + + private static class ListConverter extends StructConverter { + + ListConverter(StructConverter parent, String fieldOnParent, GroupType schema) { + super(parent, schema, fieldOnParent, schema.isRepetition(Type.Repetition.REPEATED)); + } + + ListConverter(StructConverter parent, GroupType schema) { + this(parent, schema.getName(), schema); + } + + private boolean onlyField(Type type, String name) { + return !type.isPrimitive() && type.asGroupType().getFieldCount() == 1 && + type.asGroupType().getFieldName(0).equalsIgnoreCase(name); + } + + @Override + protected Converter makeConverter(int fieldIdx, String fieldOnParent) { + if (schema.getFieldCount() == 1) { + Type type = schema.getType(0); + if ((type.getName().equalsIgnoreCase("list") || type.getName().equalsIgnoreCase("array")) && + onlyField(type, "element")) { + return new ListElementConverter(this, this.fieldOnParent, type.asGroupType()); + } + } + return super.makeConverter(fieldIdx, fieldOnParent); + } + + @Override + public void start() { + this.current = new ListGroup(); + parent.current.add(this.fieldOnParent, current.value(), repeated); + } + } + + + private static class ListElementConverter extends StructConverter { + + ListElementConverter(StructConverter parent, String fieldOnParent, GroupType schema) { + super(parent, schema, fieldOnParent, true); + } + + @Override + public void start() { + this.current = new ItemGroup(); + } + + @Override + public void end() { + parent.current.add(this.fieldOnParent, current.value(), parent.repeated); + } + } + + private static class MapConverter extends StructConverter { + + MapConverter(StructConverter parent, String fieldOnParent, GroupType schema) { + super(parent, schema, fieldOnParent, schema.isRepetition(Type.Repetition.REPEATED)); + } + + @Override + protected Converter makeConverter(int fieldIdx, String fieldOnParent) { + if (schema.getFieldCount() == 1) { + Type type = schema.getType(fieldIdx); + String onlyFieldName = type.getName().toLowerCase(Locale.ROOT); + if (!type.isPrimitive() && type.asGroupType().getFieldCount() == 2 && + (onlyFieldName.equals("key_value") || onlyFieldName.equals("map"))) { + return new MapEntryConverter(this, fieldOnParent, type.asGroupType()); + } + } + return super.makeConverter(fieldIdx, fieldOnParent); + } + + @Override + public void start() { + this.current = new MapGroup(); + parent.current.add(this.fieldOnParent, current.value(), repeated); + } + } + + private static class MapEntryConverter extends StructConverter { + MapEntryGroup entry; + + MapEntryConverter(StructConverter parent, String fieldOnParent, GroupType schema) { + super(parent, schema, fieldOnParent, true); + } + + @Override + public void start() { + current = entry = new MapEntryGroup(); + } + + @Override + public void end() { + if (entry.v != null) { + parent.current.add(entry.k, entry.v, false); + } + } + } + + + private static class StructConverter extends GroupConverter { + + final
StructConverter parent; + final boolean repeated; + final GroupType schema; + final String fieldOnParent; + Group current; + private final Converter[] converters; + + StructConverter(StructConverter parent, GroupType schema) { + this(parent, schema, schema.getName(), schema.isRepetition(Type.Repetition.REPEATED)); + } + + StructConverter(StructConverter parent, GroupType schema, String fieldOnParent, boolean repeated) { + this.parent = parent; + this.schema = schema; + this.repeated = repeated; + this.fieldOnParent = fieldOnParent; + converters = IntStream.range(0, schema.getFieldCount()).mapToObj(this::makeConverter).toArray(Converter[]::new); + } + + protected Converter makeConverter(int fieldIdx) { + return makeConverter(fieldIdx, schema.getFieldName(fieldIdx)); + } + + protected Converter makeConverter(int fieldIdx, String fieldOnParent) { + return makeConverter(fieldIdx, fieldOnParent, schema.getType(fieldIdx).isRepetition(Type.Repetition.REPEATED)); + } + + protected Converter makeConverter(int fieldIdx, String fieldOnParent, boolean repeated) { + Type type = schema.getType(fieldIdx); + LogicalTypeAnnotation logical = type.getLogicalTypeAnnotation(); + if (!type.isPrimitive()) { + return switch (logical) { + case LogicalTypeAnnotation.ListLogicalTypeAnnotation list -> + // If the repeated field is not a group, then its type is the element type and elements are required. + // If the repeated field is a group with multiple fields, then its type is the element type and elements are required. + // If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required. + // Otherwise, the repeated field's type is the element type with the repeated field's repetition. + new ListConverter(this, type.asGroupType()); + case LogicalTypeAnnotation.MapLogicalTypeAnnotation m -> + // The outer-most level must be a group annotated with MAP that contains a single field named key_value. The repetition of this level must be either optional or required and determines whether the list is nullable. + // The middle level, named key_value, must be a repeated group with a key field for map keys and, optionally, a value field for map values. + // The key field encodes the map's key type. This field must have repetition required and must always be present. + // The value field encodes the map's value type and repetition. This field can be required, optional, or omitted. 
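 + // For example, a (hypothetical) map<string, string> column named tags is encoded as: + //   optional group tags (MAP) { + //     repeated group key_value { + //       required binary key (STRING); + //       optional binary value (STRING); + //     } + //   }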
+ new MapConverter(this, fieldOnParent, type.asGroupType()); + case LogicalTypeAnnotation.MapKeyValueTypeAnnotation m -> + new MapConverter(this, fieldOnParent, type.asGroupType()); + case null, default -> new StructConverter(this, type.asGroupType()); + }; + } + var primitiveType = type.asPrimitiveType().getPrimitiveTypeName(); + return switch (primitiveType) { + case BOOLEAN -> new Primitive(this, fieldOnParent, type, repeated) { + @Override + public void addBoolean(boolean value) { + add(value); + } + }; + case INT64, INT32 -> { + LongFunction<Object> remapper = switch (type.getLogicalTypeAnnotation()) { + case null -> null; + case IntLogicalTypeAnnotation x -> null; + case DecimalLogicalTypeAnnotation decimal -> { + double multiplier = Math.pow(10, -decimal.getScale()); + yield (value -> value * multiplier); + } + case DateLogicalTypeAnnotation date -> LocalDate::ofEpochDay; + case TimeLogicalTypeAnnotation time -> { + var unit = getUnit(time.getUnit()); + yield value -> LocalTime.ofNanoOfDay(Duration.of(value, unit).toNanos()); + } + case TimestampLogicalTypeAnnotation time -> { + var unit = getUnit(time.getUnit()); + yield value -> Instant.ofEpochMilli(Duration.of(value, unit).toMillis()); + } + default -> + throw new UnsupportedOperationException("Unsupported logical type for " + primitiveType + ": " + logical); + }; + yield new Primitive(this, fieldOnParent, type, repeated) { + @Override + public void addLong(long value) { + add(remapper == null ? value : remapper.apply(value)); + } + + @Override + public void addInt(int value) { + add(remapper == null ? value : remapper.apply(value)); + } + }; + } + case INT96 -> new Primitive(this, fieldOnParent, type, repeated) { + @Override + public void addBinary(Binary value) { + // INT96 timestamps store nanos-of-day then a Julian day; 2440588 is the Julian day of the unix epoch + var buf = value.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + LocalTime timeOfDay = LocalTime.ofNanoOfDay(buf.getLong()); + LocalDate day = LocalDate.ofEpochDay(buf.getInt() - 2440588L); + add(LocalDateTime.of(day, timeOfDay).toInstant(ZoneOffset.UTC)); + } + }; + case FLOAT -> new Primitive(this, fieldOnParent, type, repeated) { + @Override + public void addFloat(float value) { + add((double) value); + } + }; + case DOUBLE -> new Primitive(this, fieldOnParent, type, repeated) { + @Override + public void addDouble(double value) { + add(value); + } + }; + case FIXED_LEN_BYTE_ARRAY, BINARY -> { + Function<Binary, Object> remapper = switch (type.getLogicalTypeAnnotation()) { + case UUIDLogicalTypeAnnotation uuid -> binary -> { + ByteBuffer byteBuffer = binary.toByteBuffer(); + long msb = byteBuffer.getLong(); + long lsb = byteBuffer.getLong(); + return new UUID(msb, lsb); + }; + case IntervalLogicalTypeAnnotation interval -> binary -> { + ByteBuffer byteBuffer = binary.toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); + int months = byteBuffer.getInt(); + int days = byteBuffer.getInt(); + int millis = byteBuffer.getInt(); + return new Interval(Period.ofMonths(months).plusDays(days), Duration.ofMillis(millis)); + }; + case DecimalLogicalTypeAnnotation decimal -> { + // BigDecimal(unscaled, scale) already divides by 10^scale, so pass the scale through unchanged + int scale = decimal.getScale(); + yield binary -> new BigDecimal(new BigInteger(binary.getBytes()), scale).doubleValue(); + } + case StringLogicalTypeAnnotation string -> Binary::toStringUsingUTF8; + case EnumLogicalTypeAnnotation string -> Binary::toStringUsingUTF8; + case JsonLogicalTypeAnnotation json -> Binary::toStringUsingUTF8; + case null, default -> Binary::getBytes; + }; + yield new Primitive(this, fieldOnParent, type, repeated) { + @Override + public void addBinary(Binary value) { + add(remapper.apply(value)); + } + }; +
} + }; + } + + private static ChronoUnit getUnit(LogicalTypeAnnotation.TimeUnit unit) { + return switch (unit) { + case MILLIS -> ChronoUnit.MILLIS; + case MICROS -> ChronoUnit.MICROS; + case NANOS -> ChronoUnit.NANOS; + }; + } + + @Override + public Converter getConverter(int fieldIndex) { + return converters[fieldIndex]; + } + + @Override + public void start() { + current = new MapGroup(schema.getFieldCount()); + parent.current.add(schema.getName(), current.value(), repeated); + } + + @Override + public void end() {} + } + + private abstract static class Primitive extends PrimitiveConverter { + + private final StructConverter parent; + private final boolean repeated; + private final String fieldOnParent; + private Dictionary dictionary; + private final IntConsumer dictionaryHandler; + + public Primitive(StructConverter parent, String fieldOnParent, Type type, boolean repeated) { + this.parent = parent; + this.repeated = repeated; + this.fieldOnParent = fieldOnParent; + this.dictionaryHandler = + switch (type.asPrimitiveType().getPrimitiveTypeName()) { + case INT64 -> idx -> addLong(dictionary.decodeToLong(idx)); + case INT32 -> idx -> addInt(dictionary.decodeToInt(idx)); + case BOOLEAN -> idx -> addBoolean(dictionary.decodeToBoolean(idx)); + case FLOAT -> idx -> addFloat(dictionary.decodeToFloat(idx)); + case DOUBLE -> idx -> addDouble(dictionary.decodeToDouble(idx)); + case BINARY, FIXED_LEN_BYTE_ARRAY, INT96 -> idx -> addBinary(dictionary.decodeToBinary(idx)); + }; + } + + void add(Object value) { + parent.current.add(fieldOnParent, value, repeated); + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + dictionaryHandler.accept(dictionaryId); + } + + @Override + public void setDictionary(Dictionary dictionary) { + this.dictionary = dictionary; + } + + @Override + public boolean hasDictionarySupport() { + return true; + } + } + + private interface Group { + // TODO handle repeated when processing schema, not elements + void add(Object key, Object value, boolean repeated); + + Object value(); + } + + private static class MapGroup implements Group { + + private final Map<Object, Object> map; + + MapGroup() { + map = new HashMap<>(); + } + + MapGroup(int size) { + map = HashMap.newHashMap(size); + } + + @Override + public void add(Object key, Object value, boolean repeated) { + if (repeated) { + List<Object> items = (List<Object>) map.computeIfAbsent(key, n -> new ArrayList<>()); + items.add(value); + } else { + if (map.put(key, value) != null) { + throw new IllegalStateException("Multiple values for " + key); + } + } + } + + @Override + public String toString() { + return "MapGroup" + map; + } + + public Map<String, Object> getMap() { + return (Map<String, Object>) (Map<?, ?>) map; + } + + @Override + public Object value() { + return map; + } + } + + private static class ListGroup implements Group { + + private final List<Object> list = new ArrayList<>(); + + @Override + public void add(Object key, Object value, boolean repeated) { + list.add(value); + } + + @Override + public String toString() { + return "ListGroup" + list; + } + + @Override + public Object value() { + return list; + } + } + + private static class ItemGroup implements Group { + + private Object item; + + @Override + public void add(Object key, Object value, boolean repeated) { + if (repeated) { + if (item == null) { + item = new ArrayList<>(); + } + ((List<Object>) item).add(value); + } else { + item = value; + } + } + + @Override + public String toString() { + return "ItemGroup{" + item + '}'; + } + + @Override + public Object value() { + return item; + } + } + + private static
class MapEntryGroup implements Group { + + private Object k; + private Object v; + + @Override + public void add(Object key, Object value, boolean repeated) { + if ("key".equals(key)) { + k = value; + } else if ("value".equals(key)) { + v = value; + } else if (k == null) { + k = value; + } else { + v = value; + } + } + + @Override + public String toString() { + return "MapEntryGroup{" + k + '=' + v + '}'; + } + + @Override + public Object value() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetFeature.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetFeature.java new file mode 100644 index 0000000000..c64569437b --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetFeature.java @@ -0,0 +1,79 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.onthegomap.planetiler.geo.GeoUtils; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.reader.WithTags; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.Lineal; +import org.locationtech.jts.geom.Polygonal; +import org.locationtech.jts.geom.Puntal; + +/** + * A single record read from a geoparquet file. + */ +public class ParquetFeature extends SourceFeature { + + private final Function<WithTags, Geometry> geometryParser; + private final Path filename; + private Geometry latLon; + private Geometry world; + + ParquetFeature(String source, String sourceLayer, Path filename, + long id, Function<WithTags, Geometry> getGeometry, Map<String, Object> tags) { + super(tags, source, sourceLayer, List.of(), id); + this.geometryParser = getGeometry; + this.filename = filename; + } + + public Path getFilename() { + return filename; + } + + @Override + public Geometry latLonGeometry() throws GeometryException { + return latLon == null ? latLon = geometryParser.apply(this) : latLon; + } + + @Override + public Geometry worldGeometry() throws GeometryException { + return world != null ?
world : + (world = GeoUtils.sortPolygonsByAreaDescending(GeoUtils.latLonToWorldCoords(latLonGeometry()))); + } + + @Override + public boolean isPoint() { + try { + return latLonGeometry() instanceof Puntal; + } catch (GeometryException e) { + throw new IllegalStateException(e); + } + } + + @Override + public boolean canBePolygon() { + try { + return latLonGeometry() instanceof Polygonal; + } catch (GeometryException e) { + throw new IllegalStateException(e); + } + } + + @Override + public boolean canBeLine() { + try { + return latLonGeometry() instanceof Lineal; + } catch (GeometryException e) { + throw new IllegalStateException(e); + } + } + + @Override + public String toString() { + return tags().toString(); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFile.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFile.java new file mode 100644 index 0000000000..f6a1a0d7ee --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFile.java @@ -0,0 +1,360 @@ +package com.onthegomap.planetiler.reader.parquet; + +import blue.strategic.parquet.ParquetReader; +import com.google.common.collect.Iterators; +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.geo.GeoUtils; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.reader.WithTags; +import com.onthegomap.planetiler.util.FunctionThatThrows; +import com.onthegomap.planetiler.util.Hashing; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.ToLongFunction; +import java.util.stream.IntStream; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Filters; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.locationtech.jts.geom.Envelope; +import org.locationtech.jts.geom.Geometry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads {@link SourceFeature SourceFeatures} from a single geoparquet file. + */ + case "WKB" -> obj -> obj instanceof byte[] bytes ? GeoUtils.wkbReader().read(bytes) : null; + case "WKT" -> obj -> obj instanceof String string ? GeoUtils.wktReader().read(string) : null; + case "multipolygon", "geoarrow.multipolygon" -> + obj -> obj instanceof List<?> list ? GeoArrow.multipolygon((List<List<List<Object>>>) list) : null; + case "polygon", "geoarrow.polygon" -> + obj -> obj instanceof List<?> list ?
GeoArrow.polygon((List<List<Object>>) list) : null; + case "multilinestring", "geoarrow.multilinestring" -> + obj -> obj instanceof List<?> list ? GeoArrow.multilinestring((List<List<Object>>) list) : null; + case "linestring", "geoarrow.linestring" -> + obj -> obj instanceof List<?> list ? GeoArrow.linestring((List<Object>) list) : null; + case "multipoint", "geoarrow.multipoint" -> + obj -> obj instanceof List<?> list ? GeoArrow.multipoint((List<Object>) list) : null; + case "point", "geoarrow.point" -> GeoArrow::point; + default -> throw new IllegalArgumentException("Unhandled type: " + columnInfo.encoding()); + }; + + converters.put(column, converter); + + if (columnInfo.crs() != null) { + // TODO handle projjson + LOGGER.warn("Custom CRS not supported in {}", path); + } + + if (column.equals(geometryColumn)) { + if (columnInfo.bbox() != null && columnInfo.bbox().size() == 4) { + var bbox = columnInfo.bbox(); + Envelope env = new Envelope(bbox.get(0), bbox.get(2), bbox.get(1), bbox.get(3)); + // TODO apply projection + if (!bounds.latLon().intersects(env)) { + this.outOfBounds = true; + } + } + if (!this.outOfBounds && !bounds.isWorld()) { + var covering = columnInfo.covering(); + // if covering metadata missing, use default bbox:{xmin,xmax,ymin,ymax} + if (covering == null) { + var root = metadata.getFileMetaData().getSchema(); + if (hasNumericField(root, "bbox.xmin") && + hasNumericField(root, "bbox.xmax") && + hasNumericField(root, "bbox.ymin") && + hasNumericField(root, "bbox.ymax")) { + covering = new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox( + List.of("bbox.xmin"), + List.of("bbox.ymin"), + List.of("bbox.xmax"), + List.of("bbox.ymax") + )); + } else if (hasNumericField(root, "bbox", "xmin") && + hasNumericField(root, "bbox", "xmax") && + hasNumericField(root, "bbox", "ymin") && + hasNumericField(root, "bbox", "ymax")) { + covering = new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox( + List.of("bbox", "xmin"), + List.of("bbox", "ymin"), + List.of("bbox", "xmax"), + List.of("bbox", "ymax") + )); + } + } + if (covering != null) { + var latLonBounds = bounds.latLon(); + // TODO apply projection + var coveringBbox = covering.bbox(); + var coordinateType = + fileMetadata.getSchema().getColumnDescription(coveringBbox.xmax().toArray(String[]::new)) + .getPrimitiveType() + .getPrimitiveTypeName(); + BiFunction<List<String>, Number, FilterPredicate> gtEq = switch (coordinateType) { + case DOUBLE -> (p, v) -> FilterApi.gtEq(Filters.doubleColumn(p), v.doubleValue()); + case FLOAT -> (p, v) -> FilterApi.gtEq(Filters.floatColumn(p), v.floatValue()); + default -> throw new UnsupportedOperationException(); + }; + BiFunction<List<String>, Number, FilterPredicate> ltEq = switch (coordinateType) { + case DOUBLE -> (p, v) -> FilterApi.ltEq(Filters.doubleColumn(p), v.doubleValue()); + case FLOAT -> (p, v) -> FilterApi.ltEq(Filters.floatColumn(p), v.floatValue()); + default -> throw new UnsupportedOperationException(); + }; + var bboxFilter = FilterApi.and( + FilterApi.and( + gtEq.apply(coveringBbox.xmax(), latLonBounds.getMinX()), + ltEq.apply(coveringBbox.xmin(), latLonBounds.getMaxX()) + ), + FilterApi.and( + gtEq.apply(coveringBbox.ymax(), latLonBounds.getMinY()), + ltEq.apply(coveringBbox.ymin(), latLonBounds.getMaxY()) + ) + ); + filter = filter == null ? bboxFilter : FilterApi.and(filter, bboxFilter); + } else { + LOGGER.warn("No covering column specified in geoparquet metadata, falling back to post-filtering"); + postFilterBounds = bounds.latLon(); + } + } + } + } + count = outOfBounds ?
0 : file.getFilteredRecordCount(); + blockCount = outOfBounds ? 0 : metadata.getBlocks().size(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + this.filter = filter == null ? FilterCompat.NOOP : FilterCompat.get(filter); + } + + private static long hashToLong(Object o) { + return switch (o) { + case String s -> Hashing.fnv1a64(s.getBytes(StandardCharsets.UTF_8)); + case byte[] bs -> Hashing.fnv1a64(bs); + case Integer i -> i; + case Long l -> l; + case Float f -> Float.floatToIntBits(f); + case Double d -> Double.doubleToLongBits(d); + case null -> 0; + default -> Hashing.fnv1a64(o.toString().getBytes(StandardCharsets.UTF_8)); + }; + } + + public boolean hasFilter() { + return FilterCompat.isFilteringRequired(filter); + } + + public interface BlockReader extends Iterable<Block>, Closeable { + + @Override + default void close() throws IOException {} + } + + public interface Block extends Iterable<ParquetFeature> { + + Path getFileName(); + + String layer(); + } + + public BlockReader get() { + if (outOfBounds) { + return Collections::emptyIterator; + } + long fileHash = Hashing.fnv1a64(path.toString().getBytes(StandardCharsets.UTF_8)); + var schema = metadata.getFileMetaData().getSchema(); + var columnIOFactory = new ColumnIOFactory(metadata.getFileMetaData().getCreatedBy(), false); + return () -> IntStream.range(0, metadata.getBlocks().size()).mapToObj(blockIndex -> { + long blockHash = Hashing.fnv1a64(fileHash, ByteBuffer.allocate(4).putInt(blockIndex).array()); + // happens in reader thread + // TODO read smaller set of rows to reduce memory usage + return (Block) new Block() { + @Override + public Path getFileName() { + return path; + } + + @Override + public String layer() { + return layer; + } + + @Override + public Iterator<ParquetFeature> iterator() { + PageReadStore group; + try (var reader = open()) { + group = reader.readFilteredRowGroup(blockIndex); + if (group == null) { + return Collections.emptyIterator(); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + MessageColumnIO columnIO = columnIOFactory.getColumnIO(schema); + var recordReader = columnIO.getRecordReader(group, new MapRecordMaterializer(schema), filter); + long total = group.getRowCount(); + return Iterators.filter(new Iterator<>() { + long i = 0; + + @Override + public boolean hasNext() { + return i < total; + } + + @Override + public ParquetFeature next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + i++; + + var item = recordReader.read(); + + if (item == null) { + return null; + } + + if (extraFields != null) { + item.putAll(extraFields); + } + + var feature = new ParquetFeature( + source, + layer, + path, + idGenerator != null ?
idGenerator.applyAsLong(item) : + Hashing.fnv1a64(blockHash, ByteBuffer.allocate(8).putLong(i).array()), + ParquetInputFile.this::readPrimaryGeometry, + item + ); + + if (postFilterBounds != null) { + try { + if (!feature.latLonGeometry().getEnvelopeInternal().intersects(postFilterBounds)) { + return null; + } + } catch (GeometryException e) { + LOGGER.warn("Error reading geometry to post-filter by bounds", e); + return null; + } + } + + return feature; + } + }, Objects::nonNull); + } + }; + }).iterator(); + } + + private ParquetFileReader open() throws IOException { + return ParquetFileReader.open(inputFile, ParquetReadOptions.builder() + .withRecordFilter(filter) + .build()); + } + + private Geometry readPrimaryGeometry(WithTags tags) { + return readGeometry(tags, geometryColumn); + } + + private Geometry readGeometry(WithTags tags, String column) { + var value = tags.getTag(column); + var converter = converters.get(column); + if (value == null) { + LOGGER.warn("Missing {} column", column); + return GeoUtils.EMPTY_GEOMETRY; + } else if (converter == null) { + throw new IllegalArgumentException("No geometry converter for " + column); + } + try { + return converter.apply(value); + } catch (Exception e) { + LOGGER.warn("Error reading geometry {}", column, e); + return GeoUtils.EMPTY_GEOMETRY; + } + } + + public long getCount() { + return count; + } + + public long getBlockCount() { + return blockCount; + } + +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetReader.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetReader.java new file mode 100644 index 0000000000..e574e0e4f1 --- /dev/null +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/reader/parquet/ParquetReader.java @@ -0,0 +1,204 @@ +package com.onthegomap.planetiler.reader.parquet; + +import com.onthegomap.planetiler.FeatureCollector; +import com.onthegomap.planetiler.Profile; +import com.onthegomap.planetiler.collection.FeatureGroup; +import com.onthegomap.planetiler.collection.SortableFeature; +import com.onthegomap.planetiler.config.PlanetilerConfig; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.render.FeatureRenderer; +import com.onthegomap.planetiler.stats.Counter; +import com.onthegomap.planetiler.stats.ProgressLoggers; +import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.util.Format; +import com.onthegomap.planetiler.worker.WorkerPipeline; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads {@link SourceFeature SourceFeatures} from one or more geoparquet files. + */ +public class ParquetReader { + + private static final Logger LOGGER = LoggerFactory.getLogger(ParquetReader.class); + static final String DEFAULT_LAYER = "features"; + private final String sourceName; + private final Function<Map<String, Object>, Object> idGenerator; + private final Function<Map<String, Object>, Object> layerGenerator; + private final Profile profile; + private final Stats stats; + private final boolean hivePartitioning; + + public ParquetReader( + String sourceName, + Profile profile, + Stats stats + ) { + this(sourceName, profile, stats, null, null, false); + } + + public ParquetReader( + String sourceName, + Profile profile, + Stats stats, + Function<Map<String, Object>, Object> getId, + Function<Map<String, Object>, Object> getLayer, + boolean hivePartitioning + ) { + this.sourceName = sourceName; + this.layerGenerator = getLayer; + this.idGenerator = getId; +
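 // note: getId and getLayer may be null; feature ids then fall back to a per-row hash and the layer to the default +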
this.profile = profile; + this.stats = stats; + this.hivePartitioning = hivePartitioning; + } + + public void process(List<Path> sourcePath, FeatureGroup writer, PlanetilerConfig config) { + var timer = stats.startStage(sourceName); + var inputFiles = sourcePath.stream() + .filter(d -> !"_SUCCESS".equals(d.getFileName().toString())) + .map(path -> { + var hivePartitionFields = hivePartitioning ? getHivePartitionFields(path) : null; + String layer = getLayerName(path); + return new ParquetInputFile(sourceName, layer, path, null, config.bounds(), hivePartitionFields, idGenerator); + }).toList(); + // don't show % complete on features when a filter is present because to determine total # elements would + // take an expensive initial query, and % complete on blocks gives a good enough proxy + long featureCount = inputFiles.stream().anyMatch(ParquetInputFile::hasFilter) ? 0 : + inputFiles.stream().mapToLong(ParquetInputFile::getCount).sum(); + long blockCount = inputFiles.stream().mapToLong(ParquetInputFile::getBlockCount).sum(); + int processThreads = config.featureProcessThreads(); + int writeThreads = config.featureWriteThreads(); + var blocksRead = Counter.newMultiThreadCounter(); + var featuresRead = Counter.newMultiThreadCounter(); + var featuresWritten = Counter.newMultiThreadCounter(); + Map<String, Integer> workingOn = new ConcurrentHashMap<>(); + var inputBlocks = inputFiles.stream().<ParquetInputFile.Block>mapMulti((file, next) -> { + try (var blockReader = file.get()) { + for (var block : blockReader) { + next.accept(block); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).toList(); + + var pipeline = WorkerPipeline.start(sourceName, stats) + .readFromTiny("blocks", inputBlocks) + .addWorker("process", processThreads, (prev, next) -> { + var blocks = blocksRead.counterForThread(); + var elements = featuresRead.counterForThread(); + var featureCollectors = new FeatureCollector.Factory(config, stats); + try (FeatureRenderer renderer = newFeatureRenderer(writer, config, next)) { + for (var block : prev) { + String layer = block.layer(); + workingOn.merge(layer, 1, Integer::sum); + for (var sourceFeature : block) { + FeatureCollector features = featureCollectors.get(sourceFeature); + try { + profile.processFeature(sourceFeature, features); + for (FeatureCollector.Feature renderable : features) { + renderer.accept(renderable); + } + } catch (Exception e) { + LOGGER.error("Error processing {}", sourceFeature, e); + } + elements.inc(); + } + blocks.inc(); + workingOn.merge(layer, -1, Integer::sum); + } + } + }) + .addBuffer("write_queue", 50_000, 1_000) + .sinkTo("write", writeThreads, prev -> { + var features = featuresWritten.counterForThread(); + try (var threadLocalWriter = writer.writerForThread()) { + for (var item : prev) { + features.inc(); + threadLocalWriter.accept(item); + } + } + }); + + var loggers = ProgressLoggers.create() + .addRatePercentCounter("read", featureCount, featuresRead, true) + .addRatePercentCounter("blocks", blockCount, blocksRead, false) + .addRateCounter("write", featuresWritten) + .addFileSize(writer) + .newLine() + .add(() -> workingOn.entrySet().stream() + .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()) + .filter(d -> d.getValue() > 0) + .map(d -> d.getKey() + ": " + d.getValue()) + .collect(Collectors.joining(", "))) + .newLine() + .addProcessStats() + .newLine() + .addPipelineStats(pipeline); + + pipeline.awaitAndLog(loggers, config.logInterval()); + + // hook for profile to do any post-processing after this source is read + try ( + var threadLocalWriter =
writer.writerForThread(); + var featureRenderer = newFeatureRenderer(writer, config, threadLocalWriter) + ) { + profile.finish(sourceName, new FeatureCollector.Factory(config, stats), featureRenderer); + } catch (IOException e) { + LOGGER.warn("Error closing writer", e); + } + + LOGGER.atInfo().setMessage("Processed {} parquet features") + .addArgument(() -> Format.defaultInstance().integer(featuresRead.get())) + .log(); + timer.stop(); + } + + private String getLayerName(Path path) { + String layer = DEFAULT_LAYER; + if (hivePartitioning) { + var fields = getHivePartitionFields(path); + layer = layerGenerator.apply(fields == null ? Map.of() : fields) instanceof Object o ? o.toString() : layer; + } + return layer; + } + + static Map<String, Object> getHivePartitionFields(Path path) { + Map<String, Object> fields = new HashMap<>(); + for (var part : path) { + var string = part.toString(); + if (string.contains("=")) { + var parts = string.split("="); + fields.put(parts[0], parts[1]); + } + } + return fields.isEmpty() ? null : fields; + } + + private FeatureRenderer newFeatureRenderer(FeatureGroup writer, PlanetilerConfig config, + Consumer<SortableFeature> next) { + @SuppressWarnings("java:S2095") // closed by FeatureRenderer + var encoder = writer.newRenderedFeatureEncoder(); + return new FeatureRenderer( + config, + rendered -> next.accept(encoder.apply(rendered)), + stats, + encoder + ); + } +} diff --git a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java index 1cbbe13875..6874da68d1 100644 --- a/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java +++ b/planetiler-core/src/main/java/com/onthegomap/planetiler/util/FileUtils.java @@ -12,6 +12,7 @@ import java.nio.file.FileStore; import java.nio.file.FileSystem; import java.nio.file.FileSystems; +import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; @@ -22,6 +23,7 @@ import java.util.List; import java.util.Objects; import java.util.function.Function; +import java.util.regex.Pattern; import java.util.stream.Stream; import java.util.stream.StreamSupport; import java.util.zip.ZipEntry; @@ -41,6 +43,7 @@ public class FileUtils { private static final double ZIP_THRESHOLD_RATIO = 1_000; private static final Logger LOGGER = LoggerFactory.getLogger(FileUtils.class); + private static final Pattern GLOB_PATTERN = Pattern.compile("[?*{\\[].*$"); private FileUtils() {} @@ -49,7 +52,7 @@ public static Stream<Path> walkFileSystem(FileSystem fileSystem) { return StreamSupport.stream(fileSystem.getRootDirectories().spliterator(), false) .flatMap(rootDirectory -> { try { - return Files.walk(rootDirectory); + return Files.walk(rootDirectory, FileVisitOption.FOLLOW_LINKS); } catch (IOException e) { LOGGER.error("Unable to walk " + rootDirectory + " in " + fileSystem, e); return Stream.empty(); @@ -82,9 +85,9 @@ public static List<Path> walkPathWithPattern(Path basePath, String pattern, .toList(); } } else if (Files.isDirectory(basePath)) { - try (var walk = Files.walk(basePath)) { + try (var walk = Files.walk(basePath, FileVisitOption.FOLLOW_LINKS)) { return walk - .filter(path -> matcher.matches(path.getFileName())) + .filter(path -> matcher.matches(path.getFileName()) || matcher.matches(basePath.relativize(path))) .flatMap(path -> { if (FileUtils.hasExtension(path, "zip")) { return walkZipFile.apply(path).stream(); @@ -109,7 +112,40 @@ *
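 * For example, {@code walkPathWithPattern(Path.of("data"), "*.parquet")} returns every file under {@code data} whose name matches {@code *.parquet}. *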
@param pattern pattern to match filenames against, as described in {@link FileSystem#getPathMatcher(String)}. */ public static List<Path> walkPathWithPattern(Path basePath, String pattern) { - return walkPathWithPattern(basePath, pattern, zipPath -> List.of(zipPath)); + return walkPathWithPattern(basePath, pattern, List::of); + } + + /** + * Returns the list of paths matching {@code pathWithPattern}, which may contain glob patterns, for example + * {@code data/overture/*.parquet}. + * + * @param pathWithPattern path that can contain glob patterns + */ + public static List<Path> walkPathWithPattern(Path pathWithPattern) { + var parsed = parsePattern(pathWithPattern); + return parsed.pattern == null ? List.of(parsed.base) : walkPathWithPattern(parsed.base, parsed.pattern, List::of); + } + + /** + * Returns the base directory of {@code pathWithPattern}, i.e. everything before the first glob pattern. + */ + public static Path getPatternBase(Path pathWithPattern) { + return parsePattern(pathWithPattern).base; + } + + static BaseWithPattern parsePattern(Path pattern) { + String string = pattern.toString(); + var matcher = GLOB_PATTERN.matcher(string); + if (!matcher.find()) { + return new BaseWithPattern(pattern, null); + } + matcher.reset(); + String base = matcher.replaceAll(""); + int idx = base.lastIndexOf(pattern.getFileSystem().getSeparator()); + if (idx > 0) { + base = base.substring(0, idx); + } + return new BaseWithPattern(Path.of(base), string.substring(idx + 1)); } /** Returns true if {@code path} ends with ".extension" (case-insensitive). */ @@ -383,4 +419,6 @@ public static void setLength(Path path, long size) { throw new UncheckedIOException(e); } } + + record BaseWithPattern(Path base, String pattern) {} } diff --git a/planetiler-core/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java new file mode 100644 index 0000000000..48d29d1331 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java @@ -0,0 +1,16 @@ +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +/** Fixes the interface from parquet-floor so we can extend it with {@link GzipCodec} and {@link Lz4Codec}. */ +public interface CompressionCodec { + Decompressor createDecompressor(); + + Compressor createCompressor(); + + CompressionInputStream createInputStream(InputStream is, Decompressor d) throws IOException; + + CompressionOutputStream createOutputStream(OutputStream os, Compressor c) throws IOException; +} diff --git a/planetiler-core/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java new file mode 100644 index 0000000000..287ded26b9 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/GzipCodec.java @@ -0,0 +1,10 @@ +package org.apache.hadoop.io.compress; + +import io.airlift.compress.gzip.JdkGzipCodec; + +/** + * Make {@link JdkGzipCodec} available at the location expected by + * {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} to allow deserializing parquet files that use gzip + * compression.
+ */ +public class GzipCodec extends JdkGzipCodec {} diff --git a/planetiler-core/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java new file mode 100644 index 0000000000..14ea73af6e --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java @@ -0,0 +1,8 @@ +package org.apache.hadoop.io.compress; + +/** + * Make {@link io.airlift.compress.lz4.Lz4Codec} available at the location expected by + * {@link org.apache.parquet.hadoop.metadata.CompressionCodecName} to allow deserializing parquet files that use lz4 + * compression. + */ +public class Lz4Codec extends io.airlift.compress.lz4.Lz4Codec {} diff --git a/planetiler-core/src/main/java/org/apache/parquet/filter2/predicate/Filters.java b/planetiler-core/src/main/java/org/apache/parquet/filter2/predicate/Filters.java new file mode 100644 index 0000000000..62bc6272b2 --- /dev/null +++ b/planetiler-core/src/main/java/org/apache/parquet/filter2/predicate/Filters.java @@ -0,0 +1,18 @@ +package org.apache.parquet.filter2.predicate; + +import java.util.List; +import org.apache.parquet.hadoop.metadata.ColumnPath; + +/** + * Create {@link Operators.DoubleColumn} and {@link Operators.FloatColumn} instances with dots in the column names since + * their constructors are package-private. + */ +public class Filters { + public static Operators.DoubleColumn doubleColumn(List<String> parts) { + return new Operators.DoubleColumn(ColumnPath.get(parts.toArray(String[]::new))); + } + + public static Operators.FloatColumn floatColumn(List<String> parts) { + return new Operators.FloatColumn(ColumnPath.get(parts.toArray(String[]::new))); + } +} diff --git a/planetiler-core/src/main/resources/log4j2.properties b/planetiler-core/src/main/resources/log4j2.properties index 00e993bb1a..7d23fc3a9a 100644 --- a/planetiler-core/src/main/resources/log4j2.properties +++ b/planetiler-core/src/main/resources/log4j2.properties @@ -7,3 +7,10 @@ packages=com.onthegomap.planetiler.util.log4j rootLogger.level=debug rootLogger.appenderRefs=stdout rootLogger.appenderRef.stdout.ref=STDOUT + +logger.apache.name=org.apache +logger.apache.level=warn + +# suppress warning about unreadable duckdb statistics +logger.apachecorrupt.name=org.apache.parquet.CorruptStatistics +logger.apachecorrupt.level=error diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java index ffe58fffcd..b37d4bb955 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/PlanetilerTests.java @@ -2248,6 +2248,49 @@ public void processFeature(SourceFeature source, FeatureCollector features) { } } + @ParameterizedTest + @ValueSource(strings = { + "", + "--write-threads=2 --process-threads=2 --feature-read-threads=2 --threads=4" + }) + void testPlanetilerRunnerParquet(String args) throws Exception { + Path mbtiles = tempDir.resolve("output.mbtiles"); + + Planetiler.create(Arguments.fromArgs((args + " --tmpdir=" + tempDir.resolve("data")).split("\\s+"))) + .setProfile(new Profile.NullProfile() { + @Override + public void processFeature(SourceFeature source, FeatureCollector features) { + features.polygon("buildings") + .setZoomRange(0, 14) + .setMinPixelSize(0) + .setAttr("id", source.getString("id")); + } + }) + .addParquetSource("parquet", TestUtils.pathToResource("parquet").resolve("boston.parquet")) +
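// reads a single local file: no glob and no hive partitioning, so the one-argument overload keeps the default id and layer extractors +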
.setOutput(mbtiles) + .run(); + + try (Mbtiles db = Mbtiles.newReadOnlyDatabase(mbtiles)) { + Set<String> uniqueIds = new HashSet<>(); + long featureCount = 0; + var tileMap = TestUtils.getTileMap(db); + for (int z = 14; z >= 11; z--) { + var coord = TileCoord.aroundLngLat(-71.07448, 42.35626, z); + assertTrue(tileMap.containsKey(coord), "contain " + coord); + } + for (var tile : tileMap.values()) { + for (var feature : tile) { + feature.geometry().validate(); + featureCount++; + uniqueIds.add((String) feature.attrs().get("id")); + } + } + + assertTrue(featureCount > 0); + assertEquals(3, uniqueIds.size()); + } + } + private void runWithProfile(Path tempDir, Profile profile, boolean force) throws Exception { Planetiler.create(Arguments.of("tmpdir", tempDir, "force", Boolean.toString(force))) .setProfile(profile) diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoArrowTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoArrowTest.java new file mode 100644 index 0000000000..ca7e1e8b85 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/GeoArrowTest.java @@ -0,0 +1,231 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.onthegomap.planetiler.geo.GeoUtils; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.io.ParseException; + +class GeoArrowTest { + @Test + void testPointXY() throws ParseException { + assertSame( + "POINT(1 2)", + GeoArrow.point(Map.of("x", 1, "y", 2)), + GeoArrow.point(List.of(1, 2)) + ); + } + + @Test + void testPointXYZ() throws ParseException { + assertSame( + "POINT Z(1 2 3)", + GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3)), + GeoArrow.point(List.of(1, 2, 3)) + ); + } + + @Test + void testPointXYZM() throws ParseException { + assertSame( + "POINT ZM(1 2 3 4)", + GeoArrow.point(Map.of("x", 1, "y", 2, "z", 3, "m", 4)), + GeoArrow.point(List.of(1, 2, 3, 4)) + ); + } + + @Test + void testLine() throws ParseException { + assertSame( + "LINESTRING(1 2, 3 4)", + GeoArrow.linestring(List.of( + Map.of("x", 1, "y", 2), + Map.of("x", 3, "y", 4) + )), + GeoArrow.linestring(List.of( + List.of(1, 2), + List.of(3, 4) + )) + ); + } + + @Test + void testLineZ() throws ParseException { + assertSame( + "LINESTRING Z(1 2 3, 4 5 6)", + GeoArrow.linestring(List.of( + Map.of("x", 1, "y", 2, "z", 3), + Map.of("x", 4, "y", 5, "z", 6) + )), + GeoArrow.linestring(List.of( + List.of(1, 2, 3), + List.of(4, 5, 6) + )) + ); + } + + @Test + void testLineZM() throws ParseException { + assertSame( + "LINESTRING ZM(1 2 3 4, 5 6 7 8)", + GeoArrow.linestring(List.of( + Map.of("x", 1, "y", 2, "z", 3, "m", 4), + Map.of("x", 5, "y", 6, "z", 7, "m", 8) + )), + GeoArrow.linestring(List.of( + List.of(1, 2, 3, 4), + List.of(5, 6, 7, 8) + )) + ); + } + + @Test + void testPolygon() throws ParseException { + assertSame( + "POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))", + GeoArrow.polygon(List.of( + List.of( + Map.of("x", 0, "y", 0), + Map.of("x", 0, "y", 1), + Map.of("x", 1, "y", 1), + Map.of("x", 1, "y", 0), + Map.of("x", 0, "y", 0) + ))), + GeoArrow.polygon(List.of( + List.of( + List.of(0, 0), + List.of(0, 1), + List.of(1, 1), + List.of(1, 0), + List.of(0, 0) + ) + )) + ); + } + + @Test + void testPolygonWithHole() throws ParseException { + assertSame( + "POLYGON((-2 -2, 2 -2, 0 2, -2 -2), (-1 -1, 1 -1, 0 1, -1 -1))", +
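// each assertSame case feeds the same WKT through both geoarrow encodings: coordinates as {x,y} structs first, then as plain lists +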
GeoArrow.polygon(List.of( + List.of( + Map.of("x", -2, "y", -2), + Map.of("x", 2, "y", -2), + Map.of("x", 0, "y", 2), + Map.of("x", -2, "y", -2) + ), + List.of( + Map.of("x", -1, "y", -1), + Map.of("x", 1, "y", -1), + Map.of("x", 0, "y", 1), + Map.of("x", -1, "y", -1) + ) + )), + GeoArrow.polygon(List.of( + List.of( + List.of(-2, -2), + List.of(2, -2), + List.of(0, 2), + List.of(-2, -2) + ), + List.of( + List.of(-1, -1), + List.of(1, -1), + List.of(0, 1), + List.of(-1, -1) + ) + )) + ); + } + + @Test + void testMultipoint() throws ParseException { + assertSame( + "MULTIPOINT(1 2, 3 4)", + GeoArrow.multipoint(List.of( + Map.of("x", 1, "y", 2), + Map.of("x", 3, "y", 4) + )), + GeoArrow.multipoint(List.of( + List.of(1, 2), + List.of(3, 4) + )) + ); + } + + @Test + void testMultilinestring() throws ParseException { + assertSame( + "MULTILINESTRING((1 2, 3 4), (5 6, 7 8))", + GeoArrow.multilinestring(List.of( + List.of( + Map.of("x", 1, "y", 2), + Map.of("x", 3, "y", 4) + ), + List.of( + Map.of("x", 5, "y", 6), + Map.of("x", 7, "y", 8) + ) + )), + GeoArrow.multilinestring(List.of( + List.of( + List.of(1, 2), + List.of(3, 4) + ), + List.of( + List.of(5, 6), + List.of(7, 8) + ) + )) + ); + } + + @Test + void testMultipolygon() throws ParseException { + assertSame( + "MULTIPOLYGON(((0 0, 1 0, 1 1, 0 1, 0 0)), ((2 0, 3 0, 3 1, 2 1, 2 0)))", + GeoArrow.multipolygon(List.of( + List.of(List.of( + Map.of("x", 0, "y", 0), + Map.of("x", 1, "y", 0), + Map.of("x", 1, "y", 1), + Map.of("x", 0, "y", 1), + Map.of("x", 0, "y", 0) + )), + List.of(List.of( + Map.of("x", 2, "y", 0), + Map.of("x", 3, "y", 0), + Map.of("x", 3, "y", 1), + Map.of("x", 2, "y", 1), + Map.of("x", 2, "y", 0) + )) + )), + GeoArrow.multipolygon(List.of( + List.of(List.of( + List.of(0, 0), + List.of(1, 0), + List.of(1, 1), + List.of(0, 1), + List.of(0, 0) + )), + List.of(List.of( + List.of(2, 0), + List.of(3, 0), + List.of(3, 1), + List.of(2, 1), + List.of(2, 0) + )) + )) + ); + } + + private static void assertSame(String wkt, Geometry... 
geometry) throws ParseException { + Geometry expected = GeoUtils.wktReader().read(wkt); + for (int i = 0; i < geometry.length; i++) { + assertEquals(expected, geometry[i], "geometry #" + i); + } + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetConverterTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetConverterTest.java new file mode 100644 index 0000000000..60ccf197d8 --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetConverterTest.java @@ -0,0 +1,536 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.time.Duration; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.Period; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +class ParquetConverterTest { + @Test + void testIntPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + converter -> converter.addInt(1), + 1 + ); + } + + @ParameterizedTest + @CsvSource({ + "32, true, 100, 100", + "32, true, 2147483647, 2147483647", + "32, true, -2147483648, -2147483648", + "32, false, 100, 100", + "16, true, 100, 100", + "8, true, 256, 256", + }) + void testIntPrimitiveWithAnnotation(int bitWidth, boolean isSigned, int input, int expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.intType(bitWidth, isSigned), + converter -> converter.addInt(input), + expected + ); + } + + @Test + void testLongPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + converter -> converter.addLong(1), + 1L + ); + } + + @ParameterizedTest + @CsvSource({ + "64, true, 9223372036854775807, 9223372036854775807", + "64, false, 9223372036854775807, 9223372036854775807", + "64, true, -9223372036854775808, -9223372036854775808", + "64, true, 1, 1", + }) + void testLongPrimitiveWithAnnotation(int bitWidth, boolean isSigned, long input, long expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.intType(bitWidth, isSigned), + converter -> converter.addLong(input), + expected + ); + } + + @ParameterizedTest + @CsvSource({ + "0, 1, 10, 10", + "1, 9, 10, 1", + "2, 9, 10, 0.1", + }) + void testIntDecimal(int scale, int precision, int value, double expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.decimalType(scale, precision), + converter -> converter.addInt(value), + expected + ); + } + + @ParameterizedTest + @CsvSource({ + "0, 1, 10, 10", + "1, 18, 10, 1", + "2, 18, 10, 0.1", + }) + void testLongDecimal(int scale, int precision, long value, double expected) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.decimalType(scale, precision), + converter -> converter.addLong(value), + expected + ); + } + + @Test + void 
testBooleanPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.BOOLEAN, + converter -> converter.addBoolean(true), + true + ); + } + + @Test + void testFloatPrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.FLOAT, + converter -> converter.addFloat(1f), + 1.0 + ); + } + + @Test + void testDoublePrimitive() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.DOUBLE, + converter -> converter.addDouble(1.5), + 1.5 + ); + } + + @Test + void testInt96Timestamp() { + testPrimitive( + PrimitiveType.PrimitiveTypeName.INT96, + converter -> converter.addBinary(Binary.fromConstantByteArray(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})), + Instant.parse("-4713-11-24T00:00:00Z") + ); + } + + @Test + void testDate() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.dateType(), + converter -> converter.addInt(2), + LocalDate.of(1970, 1, 3) + ); + } + + @Test + void testTime() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT32, + LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS), + converter -> converter.addInt(61_000), + LocalTime.of(0, 1, 1) + ); + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS), + converter -> converter.addLong(61_000_000), + LocalTime.of(0, 1, 1) + ); + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timeType(true, LogicalTypeAnnotation.TimeUnit.NANOS), + converter -> converter.addLong(61_000_000_000L), + LocalTime.of(0, 1, 1) + ); + } + + @ParameterizedTest + @CsvSource({ + "true, MILLIS, 61000, 1970-01-01T00:01:01Z", + "true, MICROS, 61000000, 1970-01-01T00:01:01Z", + "true, NANOS, 61000000000, 1970-01-01T00:01:01Z", + }) + void testTimestamp(boolean utc, LogicalTypeAnnotation.TimeUnit unit, long input, String output) { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(utc, unit), + converter -> converter.addLong(input), + Instant.parse(output) + ); + } + + @Test + void testString() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.stringType(), + converter -> converter.addBinary(Binary.fromString("abcdef")), + "abcdef" + ); + } + + @Test + void testEnum() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.enumType(), + converter -> converter.addBinary(Binary.fromString("value")), + "value" + ); + } + + @Test + void testJson() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.BINARY, + LogicalTypeAnnotation.jsonType(), + converter -> converter.addBinary(Binary.fromString("[1,2,3]")), + "[1,2,3]" + ); + } + + @Test + void testUUID() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.uuidType(), + 16, + converter -> converter + .addBinary(Binary.fromConstantByteArray( + new byte[]{0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88, (byte) 0x99, (byte) 0xaa, (byte) 0xbb, + (byte) 0xcc, (byte) 0xdd, (byte) 0xee, (byte) 0xff})), + UUID.fromString("00112233-4455-6677-8899-aabbccddeeff") + ); + } + + @Test + void testInterval() { + testAnnotatedPrimitive( + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, + LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(), + 12, + converter -> converter.addBinary(Binary.fromConstantByteBuffer(ByteBuffer.allocate(12) + .order(ByteOrder.LITTLE_ENDIAN) + .putInt(1) + 
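// the 12-byte little-endian INTERVAL value packs three unsigned ints: months (1, above), then days (2) and milliseconds (3) below +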
.putInt(2) + .putInt(3) + .flip())), + new Interval( + Period.ofMonths(1).plusDays(2), + Duration.ofMillis(3) + ) + ); + } + + @Test + void testOptionalMissing() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .optional(PrimitiveType.PrimitiveTypeName.INT32).named("value") + .named("message")); + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + rootConverter.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + } + + @Test + void testListFromSimpleRepeatedElement() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .repeated(PrimitiveType.PrimitiveTypeName.INT32).named("value") + .named("message")); + + + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + rootConverter.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + + rootConverter.start(); + rootConverter.getConverter(0).asPrimitiveConverter().addInt(1); + rootConverter.end(); + assertEquals(Map.of("value", List.of(1)), materializer.getCurrentRecord()); + + rootConverter.start(); + rootConverter.getConverter(0).asPrimitiveConverter().addInt(1); + rootConverter.getConverter(0).asPrimitiveConverter().addInt(2); + rootConverter.end(); + assertEquals(Map.of("value", List.of(1, 2)), materializer.getCurrentRecord()); + } + + @Test + void testListFromListElementStructs() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .requiredList().optionalElement(PrimitiveType.PrimitiveTypeName.INT32).named("value") + .named("message")); + + var root = materializer.getRootConverter(); + var value = root.getConverter(0).asGroupConverter(); + var list = value.getConverter(0).asGroupConverter(); + var element = list.getConverter(0).asPrimitiveConverter(); + root.start(); + value.start(); + value.end(); + root.end(); + assertEquals(Map.of("value", List.of()), materializer.getCurrentRecord()); + + root.start(); + value.start(); + list.start(); + element.addInt(1); + list.end(); + list.start(); + list.end(); + list.start(); + element.addInt(3); + list.end(); + value.end(); + root.end(); + assertEquals(Map.of("value", Lists.newArrayList(1, null, 3)), materializer.getCurrentRecord()); + } + + @Test + void testListRepeatedAtTopAndBottomLevel() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .list(Type.Repetition.REPEATED).element(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REPEATED) + .named("value") + .named("message")); + + var root = materializer.getRootConverter(); + var value = root.getConverter(0).asGroupConverter(); + var list = value.getConverter(0).asGroupConverter(); + var element = list.getConverter(0).asPrimitiveConverter(); + root.start(); + value.start(); + value.end(); + value.start(); + list.start(); + element.addInt(1); + element.addInt(2); + list.end(); + list.start(); + element.addInt(3); + list.end(); + value.end(); + root.end(); + assertEquals(Map.of("value", List.of(List.of(), List.of(List.of(1, 2), List.of(3)))), + materializer.getCurrentRecord()); + } + + @Test + void testNestedList() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .optionalList() + .optionalListElement() + .optionalElement(PrimitiveType.PrimitiveTypeName.INT32) + .named("value") + .named("root")); + + //message root { + // optional group value (LIST) { + // repeated group list { + // optional group element (LIST) { + // repeated group list { + // optional int32 element; + // } + // } + // } + // } + //} + + var root = materializer.getRootConverter(); + var 
value = root.getConverter(0).asGroupConverter(); + var outerList = value.getConverter(0).asGroupConverter(); + var outerElement = outerList.getConverter(0).asGroupConverter(); + var innerList = outerElement.getConverter(0).asGroupConverter(); + var innerElement = innerList.getConverter(0).asPrimitiveConverter(); + root.start(); + root.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + + root.start(); + value.start(); + value.end(); + root.end(); + assertEquals(Map.of("value", List.of()), materializer.getCurrentRecord()); + + root.start(); + value.start(); + outerList.start(); + outerList.end(); + + outerList.start(); + outerElement.start(); + + innerList.start(); + innerElement.addInt(1); + innerList.end(); + + innerList.start(); + innerList.end(); + + innerList.start(); + innerElement.addInt(2); + innerList.end(); + + outerElement.end(); + outerList.end(); + value.end(); + root.end(); + + assertEquals(Map.of( + "value", Lists.newArrayList(null, Lists.newArrayList(1, null, 2)) + ), materializer.getCurrentRecord()); + } + + @Test + void testMapConverter() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .optionalMap() + .key(PrimitiveType.PrimitiveTypeName.INT32) + .optionalValue(PrimitiveType.PrimitiveTypeName.INT64) + .named("value") + .named("root")); + + //message root { + // optional group value (MAP) { + // repeated group key_value { + // required int32 key; + // optional int64 value; + // } + // } + //} + + var root = materializer.getRootConverter(); + var map = root.getConverter(0).asGroupConverter(); + var keyValue = map.getConverter(0).asGroupConverter(); + var key = keyValue.getConverter(0).asPrimitiveConverter(); + var value = keyValue.getConverter(1).asPrimitiveConverter(); + + root.start(); + root.end(); + assertEquals(Map.of(), materializer.getCurrentRecord()); + + root.start(); + map.start(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of()), materializer.getCurrentRecord()); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of()), materializer.getCurrentRecord()); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + value.addLong(2); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of(1, 2L)), materializer.getCurrentRecord()); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + value.addLong(2); + keyValue.end(); + keyValue.start(); + key.addInt(3); + value.addLong(4); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", Map.of(1, 2L, 3, 4L)), materializer.getCurrentRecord()); + } + + @Test + void testRepeatedMap() { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .map(Type.Repetition.REPEATED) + .key(PrimitiveType.PrimitiveTypeName.INT32) + .optionalValue(PrimitiveType.PrimitiveTypeName.INT64) + .named("value") + .named("root")); + + var root = materializer.getRootConverter(); + var map = root.getConverter(0).asGroupConverter(); + var keyValue = map.getConverter(0).asGroupConverter(); + var key = keyValue.getConverter(0).asPrimitiveConverter(); + + root.start(); + map.start(); + keyValue.start(); + key.addInt(1); + keyValue.end(); + map.end(); + root.end(); + assertEquals(Map.of("value", List.of(Map.of())), materializer.getCurrentRecord()); + } + + private void testPrimitive(PrimitiveType.PrimitiveTypeName type, Consumer<PrimitiveConverter> consumer, + Object expected) { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .required(type).named("value") + .named("message")); + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + consumer.accept(rootConverter.getConverter(0).asPrimitiveConverter()); + rootConverter.end(); + assertEquals(Map.of("value", expected), materializer.getCurrentRecord()); + } + + private void testAnnotatedPrimitive(PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation, + Consumer<PrimitiveConverter> consumer, Object expected) { + testAnnotatedPrimitive(type, annotation, 0, consumer, expected); + } + + private void testAnnotatedPrimitive(PrimitiveType.PrimitiveTypeName type, LogicalTypeAnnotation annotation, + int length, Consumer<PrimitiveConverter> consumer, Object expected) { + var materializer = new MapRecordMaterializer(Types.buildMessage() + .required(type).as(annotation).length(length).named("value") + .named("message")); + var rootConverter = materializer.getRootConverter(); + rootConverter.start(); + consumer.accept(rootConverter.getConverter(0).asPrimitiveConverter()); + rootConverter.end(); + assertEquals(Map.of("value", expected), materializer.getCurrentRecord()); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFileTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFileTest.java new file mode 100644 index 0000000000..46cdb470cf --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetInputFileTest.java @@ -0,0 +1,187 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.DynamicTest.dynamicTest; + +import com.onthegomap.planetiler.TestUtils; +import com.onthegomap.planetiler.config.Bounds; +import com.onthegomap.planetiler.util.FileUtils; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import org.apache.parquet.filter2.predicate.FilterApi; +import org.junit.jupiter.api.DynamicTest; +import org.junit.jupiter.api.TestFactory; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.locationtech.jts.geom.Envelope; + +class ParquetInputFileTest { + + static List<Path> bostons() { + return FileUtils.walkPathWithPattern(TestUtils.pathToResource("parquet").resolve("boston*.parquet")); + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBoston(Path path) { + for (int i = 0; i < 3; i++) { + Set<String> ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(3, ids.size(), "iter " + i); + } + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBostonWithBboxFilterCovering(Path path) { + Set<String> ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path, null, + new Bounds(new Envelope(-71.0747653629, -71.0741656634, 42.3560968301, 42.3564346282)), null, null) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(3, ids.size()); + } + + @ParameterizedTest + @MethodSource("bostons") +
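// stacks a height > 3 column predicate on top of the covering bbox filter; only 1 of the 3 buildings passes both +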
void testReadBostonWithBboxFilterCoveringAndOtherFilter(Path path) { + Set ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path, FilterApi.gt(FilterApi.doubleColumn("height"), 3d), + new Bounds(new Envelope(-71.0747653629, -71.0741656634, 42.3560968301, 42.3564346282)), null, null) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(1, ids.size()); + } + + @ParameterizedTest + @MethodSource("bostons") + void testReadBostonWithBboxFilterNotCovering(Path path) { + Set ids = new HashSet<>(); + for (var block : new ParquetInputFile("parquet", "layer", path, null, + new Bounds(new Envelope(-72.0747653629, -72.0741656634, 42.3560968301, 42.3564346282)), null, null) + .get()) { + for (var item : block) { + ids.add(item.getString("id")); + } + } + assertEquals(0, ids.size()); + } + + @TestFactory + List testReadAllDataTypes() { + + /* + ┌──────────────────────┬────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐ + │ column_name │ column_type │ null │ key │ default │ extra │ + │ varchar │ varchar │ varchar │ varchar │ varchar │ varchar │ + ├──────────────────────┼────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤ + │ geometry │ BLOB │ YES │ │ │ │ ST_AsWKB(ST_Point(1, 2)) + │ bigint │ BIGINT │ YES │ │ │ │ 9223372036854775807 + │ blob │ BLOB │ YES │ │ │ │ '1011' + │ boolean │ BOOLEAN │ YES │ │ │ │ true + │ date │ DATE │ YES │ │ │ │ '2000-01-01' + │ decimal │ DECIMAL(18,3) │ YES │ │ │ │ 123456.789 + │ double │ DOUBLE │ YES │ │ │ │ 123456.789 + │ hugeint │ HUGEINT │ YES │ │ │ │ 92233720368547758079223372036854775807 + │ integer │ INTEGER │ YES │ │ │ │ 123 + │ interval │ INTERVAL │ YES │ │ │ │ INTERVAL 1 MONTH + INTERVAL 1 DAY + INTERVAL 1 SECOND + │ real │ FLOAT │ YES │ │ │ │ 123.456 + │ smallint │ SMALLINT │ YES │ │ │ │ 1234 + │ time │ TIME │ YES │ │ │ │ 2000-01-01 05:30:10.123 + │ timestamp_with_tim… │ TIMESTAMP WITH TIME ZONE │ YES │ │ │ │ 2000-01-01 05:30:10.123 EST + │ timestamp │ TIMESTAMP │ YES │ │ │ │ 2000-01-01 05:30:10.123456 EST + │ tinyint │ TINYINT │ YES │ │ │ │ 123 + │ ubigint │ UBIGINT │ YES │ │ │ │ 9223372036854775807 + │ uhugeint │ UHUGEINT │ YES │ │ │ │ 92233720368547758079223372036854775807 + │ uinteger │ UINTEGER │ YES │ │ │ │ 123 + │ usmallint │ USMALLINT │ YES │ │ │ │ 123 + │ utinyint │ UTINYINT │ YES │ │ │ │ 123 + │ uuid │ UUID │ YES │ │ │ │ 606362d9-012a-4949-b91a-1ab439951671 + │ varchar │ VARCHAR │ YES │ │ │ │ "string" + │ list │ INTEGER[] │ YES │ │ │ │ [1,2,3,4] + │ map │ MAP(INTEGER, VARCHAR) │ YES │ │ │ │ map([1,2,3],['one','two','three']) + │ array │ INTEGER[3] │ YES │ │ │ │ [1,2,3] + │ struct │ STRUCT(i INTEGER, j VARCHAR) │ YES │ │ │ │ {'i': 42, 'j': 'a'} + │ complex │ MAP(VARCHAR, STRUCT(i INTEGE… │ YES │ │ │ │ [MAP(['a', 'b'], [[], [{'i': 43, 'j': 'a'}, {'i': 43, 'j': 'b'}]])]; + ├──────────────────────┴────────────────────────────────┴─────────┴─────────┴─────────┴─────────┤ + │ 29 rows 6 columns │ + └───────────────────────────────────────────────────────────────────────────────────────────────┘ + + */ + Map map = null; + int i = 0; + for (var block : new ParquetInputFile("parquet", "layer", + TestUtils.pathToResource("parquet").resolve("all_data_types.parquet")) + .get()) { + for (var item : block) { + map = item.tags(); + assertEquals(0, i++); + } + } + assertNotNull(map); + return List.of( + testEquals(map, "bigint", 9223372036854775807L), + test(map, "blob", v -> assertArrayEquals("1011".getBytes(), (byte[]) v)), + testEquals(map, "boolean", 
true), + testEquals(map, "date", LocalDate.of(2000, 1, 1)), + testEquals(map, "decimal", 123456.789), + testEquals(map, "double", 123456.789), + testEquals(map, "hugeint", 92233720368547758079223372036854775807.0), + testEquals(map, "integer", 123), + testEquals(map, "interval", Interval.of(1, 2, 3_000)), + test(map, "real", v -> assertEquals(123.456, (double) v, 1e-3)), + testEquals(map, "smallint", 1234), + testEquals(map, "time", LocalTime.parse("05:30:10.123")), + testEquals(map, "timestamp_with_timezone", Instant.parse("2000-01-01T10:30:10.123Z")), + testEquals(map, "timestamp", Instant.parse("2000-01-01T10:30:10.123Z")), + testEquals(map, "tinyint", 123), + testEquals(map, "ubigint", 9223372036854775807L), + testEquals(map, "uhugeint", 92233720368547758079223372036854775807.0), + testEquals(map, "uinteger", 123), + testEquals(map, "usmallint", 123), + testEquals(map, "utinyint", 123), + testEquals(map, "uuid", UUID.fromString("606362d9-012a-4949-b91a-1ab439951671")), + testEquals(map, "varchar", "string"), + testEquals(map, "list_of_items", List.of(1, 2, 3, 4)), + testEquals(map, "map", Map.of(1, "one", 2, "two", 3, "three")), + testEquals(map, "array", List.of(1, 2, 3)), + testEquals(map, "struct", Map.of("i", 42, "j", "a")), + testEquals(map, "complex", List.of(Map.of( + "a", List.of(), + "b", List.of( + Map.of("i", 43, "j", "a"), + Map.of("i", 43, "j", "b") + ) + ))) + ); + } + + private static DynamicTest testEquals(Map<String, Object> map, String key, Object expected) { + return test(map, key, v -> assertEquals(expected, map.get(key))); + } + + private static DynamicTest test(Map<String, Object> map, String key, Consumer<Object> test) { + return dynamicTest(key, () -> test.accept(map.get(key))); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetReaderTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetReaderTest.java new file mode 100644 index 0000000000..cbd9f04d3c --- /dev/null +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/reader/parquet/ParquetReaderTest.java @@ -0,0 +1,84 @@ +package com.onthegomap.planetiler.reader.parquet; + +import static com.onthegomap.planetiler.TestUtils.newPoint; +import static com.onthegomap.planetiler.TestUtils.round; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import com.onthegomap.planetiler.FeatureCollector; +import com.onthegomap.planetiler.Profile; +import com.onthegomap.planetiler.TestUtils; +import com.onthegomap.planetiler.collection.FeatureGroup; +import com.onthegomap.planetiler.config.PlanetilerConfig; +import com.onthegomap.planetiler.geo.GeoUtils; +import com.onthegomap.planetiler.geo.GeometryException; +import com.onthegomap.planetiler.geo.TileOrder; +import com.onthegomap.planetiler.reader.SourceFeature; +import com.onthegomap.planetiler.stats.Stats; +import com.onthegomap.planetiler.util.FileUtils; +import com.onthegomap.planetiler.util.Parse; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.locationtech.jts.geom.Geometry; + +class ParquetReaderTest { + private final PlanetilerConfig config = PlanetilerConfig.defaults(); + private final Stats stats = Stats.inMemory(); + + static List<Path> bostons() { + return FileUtils.walkPathWithPattern(TestUtils.pathToResource("parquet").resolve("boston*.parquet")); + } + + @ParameterizedTest + @MethodSource("bostons") + @Timeout(30) + void testReadOvertureParquet(Path path) { + List<String> ids = new CopyOnWriteArrayList<>(); + List<Geometry> geoms = new CopyOnWriteArrayList<>(); + + var profile = new Profile.NullProfile() { + volatile double height = 0; + + @Override + public synchronized void processFeature(SourceFeature sourceFeature, FeatureCollector features) { + try { + ids.add(sourceFeature.getString("id")); + height += Parse.parseDoubleOrNull(sourceFeature.getTag("height")) instanceof Double d ? d : 0; + geoms.add(sourceFeature.latLonGeometry()); + } catch (GeometryException e) { + throw new RuntimeException(e); + } + } + }; + var reader = new ParquetReader("source", profile, stats); + reader.process(List.of(path), + FeatureGroup.newInMemoryFeatureGroup(TileOrder.TMS, profile, config, stats), PlanetilerConfig.defaults()); + assertEquals(List.of( + "08b2a306638a0fff02001c5b97636c80", + "08b2a306638a0fff0200a75c80c3d54b", + "08b2a306638a0fff0200d1814977faca" + ), ids.stream().sorted().toList()); + var center = GeoUtils.combine(geoms.toArray(Geometry[]::new)).getCentroid(); + assertEquals(newPoint(-71.07448, 42.35626), round(center)); + assertEquals(4.7, profile.height); + } + + + @Test + void testHivePartitionFields() { + assertNull(ParquetReader.getHivePartitionFields(Path.of(""))); + assertNull(ParquetReader.getHivePartitionFields(Path.of("a"))); + assertNull(ParquetReader.getHivePartitionFields(Path.of("a", "b"))); + assertEquals(Map.of("c", "d"), ParquetReader.getHivePartitionFields(Path.of("a", "b", "c=d"))); + assertEquals(Map.of("c", "d", "e", "f"), + ParquetReader.getHivePartitionFields(Path.of("a", "b", "c=d", "e=f"))); + assertEquals(Map.of("a", "b", "c", "d", "e", "f"), + ParquetReader.getHivePartitionFields(Path.of("a=b", "b", "c=d", "e=f"))); + } +} diff --git a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java index 2f480cbdec..42391b048b 100644 --- a/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java +++ b/planetiler-core/src/test/java/com/onthegomap/planetiler/util/FileUtilsTest.java @@ -8,6 +8,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystems; import java.nio.file.Files; import java.nio.file.Path; import java.util.List; @@ -17,6 +18,8 @@ import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; class FileUtilsTest { @@ -119,6 +122,13 @@ void testWalkPathWithPatternDirectory() throws IOException { txtFiles.stream().sorted().toList(), matchingPaths.stream().sorted().toList() ); + + matchingPaths = FileUtils.walkPathWithPattern(parent.resolve("*.txt")); + + assertEquals( + txtFiles.stream().sorted().toList(), + matchingPaths.stream().sorted().toList() + ); } @Test @@ -140,6 +150,9 @@ void testWalkPathWithPatternDirectoryZip() throws IOException { // Otherwise, the files inside the zip should be returned.
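// (the new single-argument glob overload, exercised below, returns the zip file itself since it defaults to List::of as the zip-walker)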
assertEquals(List.of(zipFile.resolve("inner.txt")), FileUtils.walkPathWithPattern(parent, "*.zip", mockWalkZipFile)); + + + assertEquals(List.of(zipFile), FileUtils.walkPathWithPattern(parent.resolve("*.zip"))); } @Test @@ -148,6 +161,12 @@ void testWalkPathWithPatternSingleZip() { var matchingPaths = FileUtils.walkPathWithPattern(zipPath, "stations.sh[px]"); + assertEquals( + List.of("/shapefile/stations.shp", "/shapefile/stations.shx"), + matchingPaths.stream().map(Path::toString).sorted().toList()); + + matchingPaths = FileUtils.walkPathWithPattern(zipPath.resolve("stations.sh[px]")); + assertEquals( List.of("/shapefile/stations.shp", "/shapefile/stations.shx"), matchingPaths.stream().map(Path::toString).sorted().toList()); @@ -159,4 +178,38 @@ void testExpandFile() throws IOException { FileUtils.setLength(path, 1000); assertEquals(1000, Files.size(path)); } + + @ParameterizedTest + @CsvSource(value = { + "a/b/c; a/b/c;", + "a/b/*; a/b; *", + "a/*/b; a; */b", + "*/b/*; ; */b/*", + "/*/test; /; */test", + "a/b={c,d}/other; a; b={c,d}/other", + "./a/b=?/other; ./a; b=?/other", + }, delimiter = ';') + void testParsePathWithPattern(String input, String base, String pattern) { + var separator = FileSystems.getDefault().getSeparator(); + input = input.replace("/", separator); + base = base == null ? "" : base.replace("/", separator); + pattern = pattern == null ? null : pattern.replace("/", separator); + assertEquals( + new FileUtils.BaseWithPattern( + Path.of(base), + pattern + ), + FileUtils.parsePattern(Path.of(input)) + ); + } + + @Test + void testWalkPathWithPattern() throws IOException { + var path = tmpDir.resolve("a").resolve("b").resolve("c.txt"); + FileUtils.createParentDirectories(path); + Files.writeString(path, "test"); + assertEquals(List.of(path), FileUtils.walkPathWithPattern(tmpDir.resolve(Path.of("a", "*", "c.txt")))); + assertEquals(List.of(path), FileUtils.walkPathWithPattern(tmpDir.resolve(Path.of("*", "*", "c.txt")))); + assertEquals(List.of(path), FileUtils.walkPathWithPattern(tmpDir.resolve(Path.of("a", "b", "c.txt")))); + } } diff --git a/planetiler-core/src/test/resources/log4j2-test.properties b/planetiler-core/src/test/resources/log4j2-test.properties index 3140f703c0..7e7dcb0b5b 100644 --- a/planetiler-core/src/test/resources/log4j2-test.properties +++ b/planetiler-core/src/test/resources/log4j2-test.properties @@ -7,3 +7,7 @@ packages=com.onthegomap.planetiler.util.log4j rootLogger.level=warn rootLogger.appenderRefs=stdout rootLogger.appenderRef.stdout.ref=STDOUT + +# suppress warning about unreadable duckdb statistics +logger.apachecorrupt.name=org.apache.parquet.CorruptStatistics +logger.apachecorrupt.level=error diff --git a/planetiler-core/src/test/resources/parquet/all_data_types.parquet b/planetiler-core/src/test/resources/parquet/all_data_types.parquet new file mode 100644 index 0000000000..ac6776b607 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/all_data_types.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.customgeometryname.parquet b/planetiler-core/src/test/resources/parquet/boston.customgeometryname.parquet new file mode 100644 index 0000000000..f6dbabb4a1 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.customgeometryname.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_new.parquet b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_new.parquet new file mode 100644 index 0000000000..e3ed0ef6e8 Binary 
files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_new.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_old.parquet b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_old.parquet new file mode 100644 index 0000000000..3dd2e76045 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.geoarrow_from_gdal_old.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.gzip.parquet b/planetiler-core/src/test/resources/parquet/boston.gzip.parquet new file mode 100644 index 0000000000..9ca613914c Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.gzip.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.lz4hadoop.parquet b/planetiler-core/src/test/resources/parquet/boston.lz4hadoop.parquet new file mode 100644 index 0000000000..31a81492c3 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.lz4hadoop.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.lz4raw.parquet b/planetiler-core/src/test/resources/parquet/boston.lz4raw.parquet new file mode 100644 index 0000000000..b3ba777df8 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.lz4raw.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.none.parquet b/planetiler-core/src/test/resources/parquet/boston.none.parquet new file mode 100644 index 0000000000..17fc8cb7af Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.none.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.parquet b/planetiler-core/src/test/resources/parquet/boston.parquet new file mode 100644 index 0000000000..297da5e9dc Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.snappy.parquet b/planetiler-core/src/test/resources/parquet/boston.snappy.parquet new file mode 100644 index 0000000000..5c230ab5f0 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.snappy.parquet differ diff --git a/planetiler-core/src/test/resources/parquet/boston.zstd.parquet b/planetiler-core/src/test/resources/parquet/boston.zstd.parquet new file mode 100644 index 0000000000..c90b5924c3 Binary files /dev/null and b/planetiler-core/src/test/resources/parquet/boston.zstd.parquet differ diff --git a/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java b/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java index becc425ba0..5b8eee3241 100644 --- a/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java +++ b/planetiler-dist/src/main/java/com/onthegomap/planetiler/Main.java @@ -10,6 +10,7 @@ import com.onthegomap.planetiler.examples.OsmQaTiles; import com.onthegomap.planetiler.examples.ToiletsOverlay; import com.onthegomap.planetiler.examples.ToiletsOverlayLowLevelApi; +import com.onthegomap.planetiler.examples.overture.OvertureBasemap; import com.onthegomap.planetiler.mbtiles.Verify; import com.onthegomap.planetiler.util.CompareArchives; import com.onthegomap.planetiler.util.TileSizeStats; @@ -54,6 +55,8 @@ public class Main { entry("example-bikeroutes", BikeRouteOverlay::main), entry("example-toilets", ToiletsOverlay::main), entry("example-toilets-lowlevel", ToiletsOverlayLowLevelApi::main), + entry("example-overture", OvertureBasemap::main), + entry("overture", OvertureBasemap::main), 
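+ // both new entries above launch the same Overture example profile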
entry("example-qa", OsmQaTiles::main), entry("osm-qa", OsmQaTiles::main), diff --git a/planetiler-examples/src/main/java/com/onthegomap/planetiler/examples/overture/OvertureBasemap.java b/planetiler-examples/src/main/java/com/onthegomap/planetiler/examples/overture/OvertureBasemap.java new file mode 100644 index 0000000000..7a6315b933 --- /dev/null +++ b/planetiler-examples/src/main/java/com/onthegomap/planetiler/examples/overture/OvertureBasemap.java @@ -0,0 +1,65 @@ +package com.onthegomap.planetiler.examples.overture; + +import com.onthegomap.planetiler.FeatureCollector; +import com.onthegomap.planetiler.Planetiler; +import com.onthegomap.planetiler.Profile; +import com.onthegomap.planetiler.config.Arguments; +import com.onthegomap.planetiler.reader.SourceFeature; +import java.nio.file.Path; + +/** + * Example basemap using Overture Maps data. + */ +public class OvertureBasemap implements Profile { + + @Override + public void processFeature(SourceFeature source, FeatureCollector features) { + String layer = source.getSourceLayer(); + switch (layer) { + case "building" -> features.polygon("building") + .setMinZoom(13) + .inheritAttrFromSource("height") + .inheritAttrFromSource("roof_color"); + case null, default -> { + // ignore for now + } + } + } + + @Override + public String name() { + return "Overture"; + } + + @Override + public String description() { + return "A basemap generated from Overture data"; + } + + @Override + public String attribution() { + return """ + © OpenStreetMap + © Overture Foundation + """ + .replace("\n", " ") + .trim(); + } + + public static void main(String[] args) throws Exception { + run(Arguments.fromArgsOrConfigFile(args)); + } + + static void run(Arguments args) throws Exception { + Path input = args.inputFile("base", "overture base directory", Path.of("data", "overture")); + Planetiler.create(args) + .setProfile(new OvertureBasemap()) + .addParquetSource("overture-buildings", + input.resolve(Path.of("*", "type=building", "*.parquet")), + true, // hive-partitioning + fields -> fields.get("id"), // hash the ID field to generate unique long IDs + fields -> fields.get("type")) // extract "type={}" from the filename to get layer + .overwriteOutput(Path.of("data", "overture.pmtiles")) + .run(); + } +}