Skip to content

Commit

Permalink
cleanup and test geoparquet metadata converter
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry committed May 20, 2024
1 parent 9f9e6fd commit 716d7f3
Show file tree
Hide file tree
Showing 4 changed files with 625 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import com.onthegomap.planetiler.config.Bounds;
import com.onthegomap.planetiler.geo.GeoUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Filters;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.locationtech.jts.geom.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,6 +40,11 @@ public record GeoParquetMetadata(
private static final ObjectMapper mapper =
new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

/**
 * Looks up the metadata entry for the primary geometry column.
 *
 * @return the {@code columns} entry keyed by {@code primaryColumn}
 * @throws NullPointerException if {@code columns} holds no non-null entry for {@code primaryColumn}
 */
public ColumnMetadata primaryColumnMetadata() {
  var primary = columns.get(primaryColumn);
  return Objects.requireNonNull(primary, "No geoparquet metadata for primary column " + primaryColumn);
}

@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public record ColumnMetadata(
String encoding,
Expand All @@ -49,6 +63,78 @@ public record ColumnMetadata(
/**
 * Convenience constructor that sets only {@code encoding} and {@code geometryTypes}, passing {@code null} for the six
 * remaining arguments of the full constructor.
 */
ColumnMetadata(String encoding, List<String> geometryTypes) {
this(encoding, geometryTypes, null, null, null, null, null, null);
}

/**
 * Returns the 2D extent described by this column's {@code bbox} metadata.
 * <p>
 * {@code bbox} follows the RFC 7946 section 5 layout: every axis of the minimum corner followed by every axis of the
 * maximum corner. Both the 2D form {@code [xmin, ymin, xmax, ymax]} and the 3D form
 * {@code [xmin, ymin, zmin, xmax, ymax, zmax]} are understood; the z range is dropped for the latter.
 * <p>
 * NOTE: the fallback is the shared {@link GeoUtils#WORLD_LAT_LON_BOUNDS} instance itself, not a copy.
 *
 * @return envelope built from {@code bbox}, or {@link GeoUtils#WORLD_LAT_LON_BOUNDS} when {@code bbox} is absent or
 *         has an unexpected number of elements
 */
public Envelope envelope() {
  if (bbox != null) {
    // JTS argument order is (x1, x2, y1, y2), hence the interleaved indices below
    if (bbox.size() == 4) {
      return new Envelope(bbox.get(0), bbox.get(2), bbox.get(1), bbox.get(3));
    }
    if (bbox.size() == 6) {
      // 3D bbox: indices 2 and 5 hold zmin/zmax, which a 2D envelope cannot represent
      return new Envelope(bbox.get(0), bbox.get(3), bbox.get(1), bbox.get(4));
    }
  }
  return GeoUtils.WORLD_LAT_LON_BOUNDS;
}

/**
 * Returns a parquet filter that limits the records read to those whose covering bbox overlaps {@code bounds}, or
 * {@code null} when no filter is needed or none can be inferred from the metadata.
 * <p>
 * If covering bbox metadata is missing from the geoparquet metadata, this falls back to the flat columns
 * {@code bbox.xmin}, {@code bbox.ymin}, {@code bbox.xmax}, {@code bbox.ymax}, or to fields with those names nested
 * inside a {@code bbox} group, provided all four exist in {@code schema} as FLOAT or DOUBLE.
 *
 * @param schema parquet schema of the file, used to locate the bbox columns and determine their primitive types
 * @param bounds bounds to filter by; no filter is produced when {@code bounds.isWorld()} is true
 * @return predicate matching records whose bbox intersects {@code bounds}, or {@code null}
 * @throws UnsupportedOperationException if a covering bbox column is neither FLOAT nor DOUBLE
 */
public FilterPredicate bboxFilter(MessageType schema, Bounds bounds) {
  if (bounds.isWorld()) {
    return null;
  }
  var covering = covering();
  // if covering metadata missing, use default bbox:{xmin,xmax,ymin,ymax}
  if (covering == null || covering.bbox() == null) {
    covering = inferCovering(schema);
  }
  if (covering == null) {
    return null;
  }
  var latLonBounds = bounds.latLon();
  // TODO apply projection
  var coveringBbox = covering.bbox();
  // two boxes overlap when, on each axis, the record's max reaches the query's min and its min does not pass the
  // query's max
  return FilterApi.and(
    FilterApi.and(
      atLeast(schema, coveringBbox.xmax(), latLonBounds.getMinX()),
      atMost(schema, coveringBbox.xmin(), latLonBounds.getMaxX())
    ),
    FilterApi.and(
      atLeast(schema, coveringBbox.ymax(), latLonBounds.getMinY()),
      atMost(schema, coveringBbox.ymin(), latLonBounds.getMaxY())
    )
  );
}

/** Builds covering metadata from conventional bbox columns found in {@code schema}, or returns null if absent. */
private static GeoParquetMetadata.Covering inferCovering(MessageType schema) {
  // flat columns whose names literally contain a dot
  if (hasNumericField(schema, "bbox.xmin") &&
    hasNumericField(schema, "bbox.xmax") &&
    hasNumericField(schema, "bbox.ymin") &&
    hasNumericField(schema, "bbox.ymax")) {
    return new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox(
      List.of("bbox.xmin"),
      List.of("bbox.ymin"),
      List.of("bbox.xmax"),
      List.of("bbox.ymax")
    ));
  }
  // fields nested inside a "bbox" group
  if (hasNumericField(schema, "bbox", "xmin") &&
    hasNumericField(schema, "bbox", "xmax") &&
    hasNumericField(schema, "bbox", "ymin") &&
    hasNumericField(schema, "bbox", "ymax")) {
    return new GeoParquetMetadata.Covering(new GeoParquetMetadata.CoveringBbox(
      List.of("bbox", "xmin"),
      List.of("bbox", "ymin"),
      List.of("bbox", "xmax"),
      List.of("bbox", "ymax")
    ));
  }
  return null;
}

/** Returns the predicate {@code column >= bound}, typed to match the column at {@code path}. */
private static FilterPredicate atLeast(MessageType schema, List<String> path, double bound) {
  var type = coordinateType(schema, path);
  return switch (type) {
    case DOUBLE -> FilterApi.gtEq(Filters.doubleColumn(path), bound);
    case FLOAT -> FilterApi.gtEq(Filters.floatColumn(path), (float) bound);
    default -> throw new UnsupportedOperationException(
      "Unsupported type " + type + " for bbox column " + String.join(".", path));
  };
}

/** Returns the predicate {@code column <= bound}, typed to match the column at {@code path}. */
private static FilterPredicate atMost(MessageType schema, List<String> path, double bound) {
  var type = coordinateType(schema, path);
  return switch (type) {
    case DOUBLE -> FilterApi.ltEq(Filters.doubleColumn(path), bound);
    case FLOAT -> FilterApi.ltEq(Filters.floatColumn(path), (float) bound);
    default -> throw new UnsupportedOperationException(
      "Unsupported type " + type + " for bbox column " + String.join(".", path));
  };
}

/** Looks up the primitive type of the column at {@code path} in {@code schema}. */
private static PrimitiveType.PrimitiveTypeName coordinateType(MessageType schema, List<String> path) {
  return schema.getColumnDescription(path.toArray(String[]::new))
    .getPrimitiveType()
    .getPrimitiveTypeName();
}
}

public record CoveringBbox(
Expand All @@ -63,6 +149,10 @@ public record Covering(
) {}


/**
* Extracts geoparquet metadata from the {@code "geo"} key value metadata field for the file, or tries to generate a
* default one if missing that uses geometry, wkb_geometry, or wkt_geometry column.
*/
public static GeoParquetMetadata parse(FileMetaData metadata) throws IOException {
String string = metadata.getKeyValueMetaData().get("geo");
if (string != null) {
Expand Down Expand Up @@ -95,4 +185,16 @@ public static GeoParquetMetadata parse(FileMetaData metadata) throws IOException
"No valid geometry columns found: " + metadata.getSchema().asGroupType().getFields().stream().map(
Type::getName).toList());
}

/**
 * Reports whether {@code root} contains a primitive field at {@code path} whose type is DOUBLE or FLOAT.
 *
 * @param root schema to inspect
 * @param path field path, one element per nesting level
 * @return {@code true} only when the path exists, is primitive, and is DOUBLE or FLOAT
 */
private static boolean hasNumericField(MessageType root, String... path) {
  if (!root.containsPath(path)) {
    return false;
  }
  var field = root.getType(path);
  if (!field.isPrimitive()) {
    return false;
  }
  return switch (field.asPrimitiveType().getPrimitiveTypeName()) {
    case DOUBLE, FLOAT -> true;
    default -> false;
  };
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.onthegomap.planetiler.reader.parquet;

import com.onthegomap.planetiler.geo.GeoUtils;
import com.onthegomap.planetiler.reader.WithTags;
import com.onthegomap.planetiler.util.FunctionThatThrows;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decodes geometry column values into JTS {@link Geometry} instances, using the encoding declared for each column in
 * a file's {@link GeoParquetMetadata}.
 */
class GeometryReader {
  private static final Logger LOGGER = LoggerFactory.getLogger(GeometryReader.class);
  /** Decoder for each column listed in the geoparquet metadata, keyed by column name. */
  private final Map<String, FunctionThatThrows<Object, Geometry>> converters = new HashMap<>();
  /** Name of the primary geometry column, used by {@link #readPrimaryGeometry(WithTags)}. */
  private final String geometryColumn;

  /**
   * Registers one converter per column listed in {@code geoparquet}, chosen by that column's declared encoding.
   *
   * @param geoparquet metadata naming the primary column and the encoding of every geometry column
   * @throws IllegalArgumentException if any column declares an encoding that is not handled here
   */
  @SuppressWarnings("unchecked") // the List<?> values are narrowed to the nesting each encoding is expected to have
  GeometryReader(GeoParquetMetadata geoparquet) {
    this.geometryColumn = geoparquet.primaryColumn();
    for (var entry : geoparquet.columns().entrySet()) {
      String column = entry.getKey();
      GeoParquetMetadata.ColumnMetadata columnInfo = entry.getValue();
      // each list/bytes/string converter yields null when the value is not of the Java type its encoding expects
      FunctionThatThrows<Object, Geometry> converter = switch (columnInfo.encoding()) {
        case "WKB" -> obj -> obj instanceof byte[] bytes ? GeoUtils.wkbReader().read(bytes) : null;
        case "WKT" -> obj -> obj instanceof String string ? GeoUtils.wktReader().read(string) : null;
        case "multipolygon", "geoarrow.multipolygon" ->
          obj -> obj instanceof List<?> list ? GeoArrow.multipolygon((List<List<List<Object>>>) list) : null;
        case "polygon", "geoarrow.polygon" ->
          obj -> obj instanceof List<?> list ? GeoArrow.polygon((List<List<Object>>) list) : null;
        case "multilinestring", "geoarrow.multilinestring" ->
          obj -> obj instanceof List<?> list ? GeoArrow.multilinestring((List<List<Object>>) list) : null;
        case "linestring", "geoarrow.linestring" ->
          obj -> obj instanceof List<?> list ? GeoArrow.linestring((List<Object>) list) : null;
        case "multipoint", "geoarrow.multipoint" ->
          obj -> obj instanceof List<?> list ? GeoArrow.multipoint((List<Object>) list) : null;
        case "point", "geoarrow.point" -> GeoArrow::point;
        default -> throw new IllegalArgumentException("Unhandled type: " + columnInfo.encoding());
      };

      converters.put(column, converter);
    }
  }

  /** Reads the geometry stored in the primary geometry column of {@code tags}. */
  Geometry readPrimaryGeometry(WithTags tags) {
    return readGeometry(tags, geometryColumn);
  }

  /**
   * Reads and decodes the value of {@code column} from {@code tags}.
   *
   * @param tags   record to read the raw column value from
   * @param column name of the geometry column to decode
   * @return the decoded geometry, or {@link GeoUtils#EMPTY_GEOMETRY} (after logging a warning) when the value is
   *         missing, has an unexpected Java type, or fails to decode
   * @throws IllegalArgumentException if a value is present but no converter was registered for {@code column}
   */
  Geometry readGeometry(WithTags tags, String column) {
    var value = tags.getTag(column);
    var converter = converters.get(column);
    if (value == null) {
      LOGGER.warn("Missing {} column", column);
      return GeoUtils.EMPTY_GEOMETRY;
    } else if (converter == null) {
      throw new IllegalArgumentException("No geometry converter for " + column);
    }
    try {
      var geometry = converter.apply(value);
      if (geometry == null) {
        // keep the return value non-null, consistent with the missing-value and decode-error paths
        LOGGER.warn("Unexpected {} value for geometry column {}", value.getClass().getName(), column);
        return GeoUtils.EMPTY_GEOMETRY;
      }
      return geometry;
    } catch (Exception e) {
      LOGGER.warn("Error reading geometry {}", column, e);
      return GeoUtils.EMPTY_GEOMETRY;
    }
  }
}
Loading

0 comments on commit 716d7f3

Please sign in to comment.