Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
msbarry committed May 22, 2024
1 parent fbb9139 commit c482e8a
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -495,7 +497,10 @@ public Planetiler addParquetSource(String name, List<Path> paths, boolean hivePa
for (var path : paths) {
inputPaths.add(new InputPath(name, path, false));
}
return addStage(name, "Process features in " + paths,
var separator = Pattern.quote(paths.isEmpty() ? "/" : paths.getFirst().getFileSystem().getSeparator());
String prefix = StringUtils.getCommonPrefix(paths.stream().map(Path::toString).toArray(String[]::new))
.replaceAll(separator + "[^" + separator + "]*$", "");
return addStage(name, "Process features in " + (prefix.isEmpty() ? (paths.size() + " files") : prefix),
ifSourceUsed(name, () -> new ParquetReader(name, profile, stats, getId, getLayer, hivePartitioning)
.process(paths, featureGroup, config)));
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public boolean hasFilter() {
return FilterCompat.isFilteringRequired(filter);
}

/**
 * Returns true when this input file can be skipped entirely; when set, {@code get()} returns a
 * {@code BlockReader} over an empty iterator.
 * <p>
 * NOTE(review): how {@code outOfBounds} is computed is not visible in this hunk — presumably the
 * file's bounds do not intersect the configured query bounds; confirm against the constructor.
 */
public boolean isOutOfBounds() {
  return outOfBounds;
}

public BlockReader get() {
if (outOfBounds) {
return Collections::emptyIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
*/
class ParquetPrimitiveConverter extends PrimitiveConverter {
private final PrimitiveType.PrimitiveTypeName primitiveType;
private final ParquetConverterContext context;
private final ParquetRecordConverter.Context context;
private Dictionary dictionary;

ParquetPrimitiveConverter(ParquetConverterContext context) {
/**
 * Creates a converter for the primitive parquet type carried by {@code context}; caches the
 * {@link PrimitiveType.PrimitiveTypeName} so per-value dispatch doesn't re-derive it.
 */
ParquetPrimitiveConverter(ParquetRecordConverter.Context context) {
  this.context = context;
  this.primitiveType = context.type.asPrimitiveType().getPrimitiveTypeName();
}

static ParquetPrimitiveConverter of(ParquetConverterContext context) {
static ParquetPrimitiveConverter of(ParquetRecordConverter.Context context) {
var primitiveType = context.type().asPrimitiveType().getPrimitiveTypeName();
return switch (primitiveType) {
case FLOAT, DOUBLE, BOOLEAN -> new ParquetPrimitiveConverter(context);
Expand Down Expand Up @@ -181,7 +181,7 @@ private static class BinaryConverer extends ParquetPrimitiveConverter {

private final Function<Binary, ?> remapper;

BinaryConverer(ParquetConverterContext context, Function<Binary, ?> remapper) {
BinaryConverer(ParquetRecordConverter.Context context, Function<Binary, ?> remapper) {
super(context);
this.remapper = remapper;
}
Expand All @@ -196,7 +196,7 @@ public void addBinary(Binary value) {
private static class IntegerConverter extends ParquetPrimitiveConverter {
private final LongFunction<?> remapper;

IntegerConverter(ParquetConverterContext context, LongFunction<?> remapper) {
IntegerConverter(ParquetRecordConverter.Context context, LongFunction<?> remapper) {
super(context);
this.remapper = remapper;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.onthegomap.planetiler.reader.parquet;

import static io.prometheus.client.Collector.NANOSECONDS_PER_SECOND;

import com.onthegomap.planetiler.FeatureCollector;
import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.FeatureGroup;
Expand Down Expand Up @@ -87,7 +89,9 @@ public void process(List<Path> sourcePath, FeatureGroup writer, PlanetilerConfig
var hivePartitionFields = hivePartitioning ? getHivePartitionFields(path) : null;
String layer = getLayerName(path);
return new ParquetInputFile(sourceName, layer, path, null, config.bounds(), hivePartitionFields, idGenerator);
}).toList();
})
.filter(file -> !file.isOutOfBounds())
.toList();
// don't show % complete on features when a filter is present because to determine total # elements would
// take an expensive initial query, and % complete on blocks gives a good enough proxy
long featureCount = inputFiles.stream().anyMatch(ParquetInputFile::hasFilter) ? 0 :
Expand Down Expand Up @@ -165,6 +169,20 @@ public void process(List<Path> sourcePath, FeatureGroup writer, PlanetilerConfig

pipeline.awaitAndLog(loggers, config.logInterval());

if (LOGGER.isInfoEnabled()) {
var format = Format.defaultInstance();
long count = featuresRead.get();
var elapsed = timer.elapsed();
LOGGER.info("Processed {} parquet features ({}/s, {} blocks, {} files) in {}",
format.integer(count),
format.numeric(count * NANOSECONDS_PER_SECOND / elapsed.wall().toNanos()),
format.integer(blocksRead.get()),
format.integer(inputFiles.size()),
elapsed
);
}
timer.stop();

// hook for profile to do any post-processing after this source is read
try (
var threadLocalWriter = writer.writerForThread();
Expand All @@ -174,11 +192,6 @@ public void process(List<Path> sourcePath, FeatureGroup writer, PlanetilerConfig
} catch (IOException e) {
LOGGER.warn("Error closing writer", e);
}

LOGGER.atInfo().setMessage("Processed {} parquet features")
.addArgument(() -> Format.defaultInstance().integer(featuresRead.get()))
.log();
timer.stop();
}

private String getLayerName(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class ParquetRecordConverter extends RecordMaterializer<Map<String, Objec
private Map<String, Object> map;

ParquetRecordConverter(MessageType schema) {
root = new StructConverter(new ParquetConverterContext(schema)) {
root = new StructConverter(new Context(schema)) {
@Override
public void start() {
var group = new MapGroup(schema.getFieldCount());
Expand Down Expand Up @@ -58,12 +58,12 @@ interface Group {

private static class ListConverter extends StructConverter {

ListConverter(ParquetConverterContext context) {
ListConverter(Context context) {
super(context);
}

@Override
protected Converter makeConverter(ParquetConverterContext child) {
protected Converter makeConverter(Context child) {
if ((child.named("list") || child.named("array")) && child.onlyField("element")) {
return new ListElementConverter(child.hoist());
}
Expand All @@ -79,7 +79,7 @@ public void start() {

private static class ListElementConverter extends StructConverter {

ListElementConverter(ParquetConverterContext context) {
ListElementConverter(Context context) {
super(context);
}

Expand All @@ -96,12 +96,12 @@ public void end() {

private static class MapConverter extends StructConverter {

MapConverter(ParquetConverterContext context) {
MapConverter(Context context) {
super(context);
}

@Override
protected Converter makeConverter(ParquetConverterContext child) {
protected Converter makeConverter(Context child) {
if (context.getFieldCount() == 1) {
Type type = child.type;
String onlyFieldName = type.getName().toLowerCase(Locale.ROOT);
Expand All @@ -123,7 +123,7 @@ public void start() {
private static class MapEntryConverter extends StructConverter {
MapEntryGroup entry;

MapEntryConverter(ParquetConverterContext context) {
MapEntryConverter(Context context) {
super(context);
}

Expand All @@ -142,10 +142,10 @@ public void end() {

static class StructConverter extends GroupConverter {

final ParquetConverterContext context;
final Context context;
private final Converter[] converters;

StructConverter(ParquetConverterContext context) {
StructConverter(Context context) {
this.context = context;
int count = context.type.asGroupType().getFieldCount();
converters = new Converter[count];
Expand All @@ -154,7 +154,7 @@ static class StructConverter extends GroupConverter {
}
}

protected Converter makeConverter(ParquetConverterContext child) {
protected Converter makeConverter(Context child) {
Type type = child.type;
LogicalTypeAnnotation logical = type.getLogicalTypeAnnotation();
if (!type.isPrimitive()) {
Expand Down Expand Up @@ -311,4 +311,81 @@ public Object value() {
}
}

/**
 * Tracks one level of the parquet schema hierarchy while a record is materialized: each context
 * points at its {@link #parent}, the schema {@link #type} at this level, and the name of the
 * field on the parent that finished values are written into.
 * <p>
 * NOTE(review): the previous javadoc ("Constructs java objects from parquet records at
 * read-time") describes the enclosing record converter, not this class.
 */
static final class Context {

  // Context one level up the hierarchy, or null at the root (see Context(MessageType)).
  final Context parent;
  // Name of the field on the parent group that values produced here are stored under.
  final String fieldOnParent;
  // Schema type (primitive or group) at this position.
  final Type type;
  // Whether the schema marks this field with REPEATED repetition.
  final boolean repeated;
  // Number of child fields; always 0 for primitive types.
  private final int fieldCount;
  // Group being filled at this level; mutable, assigned externally while a record is being
  // read (the assignment site is not visible in this hunk).
  Group current;

  Context(Context parent, String fieldOnParent, Type type, boolean repeated) {
    this.parent = parent;
    this.fieldOnParent = fieldOnParent;
    this.type = type;
    this.repeated = repeated;
    // asGroupType() would throw on a primitive type, so guard with isPrimitive().
    this.fieldCount = type.isPrimitive() ? 0 : type.asGroupType().getFieldCount();
  }

  public Context(Context newParent, Type type) {
    // The field name on the parent defaults to this type's own name in the schema.
    this(newParent, type.getName(), type, type.isRepetition(Type.Repetition.REPEATED));
  }

  public Context(MessageType schema) {
    // Root of the hierarchy: no parent context.
    this(null, schema);
  }

  /** Returns the child context for field {@code i} of this group type. */
  public Context field(int i) {
    return new Context(this, type.asGroupType().getType(i));
  }

  /** Returns a new context that flattens-out this level of the hierarchy and writes values into the parent field. */
  public Context hoist() {
    return new Context(parent, parent.fieldOnParent, type, repeated);
  }

  /** Writes the value assembled in {@link #current} into the parent's group. */
  public void acceptCurrentValue() {
    accept(current.value());
  }

  /** Adds {@code value} to the parent's current group under this context's field name. */
  public void accept(Object value) {
    parent.current.add(fieldOnParent, value, repeated);
  }

  public int getFieldCount() {
    return fieldCount;
  }

  /**
   * Adds an explicit key/value pair to the parent's current group — presumably used for map
   * entries (the call site is not visible in this hunk; confirm).
   */
  public void accept(Object k, Object v) {
    parent.current.add(k, v, repeated);
  }

  /** Returns a copy of this context with the repeated flag overridden to {@code newRepeated}. */
  public Context repeated(boolean newRepeated) {
    return new Context(parent, fieldOnParent, type, newRepeated);
  }

  /** Returns true if this type's schema name equals {@code name}, ignoring case. */
  public boolean named(String name) {
    return type.getName().equalsIgnoreCase(name);
  }

  /** Returns true if this is a group with exactly one field whose name matches {@code name}, ignoring case. */
  boolean onlyField(String name) {
    return !type.isPrimitive() && fieldCount == 1 &&
      type.asGroupType().getFieldName(0).equalsIgnoreCase(name);
  }

  public Type type() {
    return type;
  }

  @Override
  public String toString() {
    // Debug representation; recurses through parent contexts via their own toString().
    return "Context[" +
      "parent=" + parent + ", " +
      "fieldOnParent=" + fieldOnParent + ", " +
      "type=" + type + ", " +
      "repeated=" + repeated + ']';
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,15 @@ default Timers.Finishable startStage(String name, boolean log) {
LogUtil.setStage(name);
}
var timer = timers().startTimer(name, log);
return () -> {
timer.stop();
if (log) {
LogUtil.clearStage();
return new Timers.Finishable() {
@Override
public void stop() {
timer.stop();
}

@Override
public ProcessTime elapsed() {
return timer.elapsed();
}
};
}
Expand Down
Loading

0 comments on commit c482e8a

Please sign in to comment.