Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix tile size stats memory leak #861

Merged
merged 2 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
boolean lastIsFill = false;
List<TileSizeStats.LayerStats> lastLayerStats = null;
boolean skipFilled = config.skipFilledTiles();
var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer();

var tileStatsUpdater = tileStats.threadLocalUpdater();
var layerAttrStatsUpdater = layerAttrStats.handlerForThread();
Expand Down Expand Up @@ -320,7 +321,7 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
if ((!skipFilled || !lastIsFill) && bytes != null) {
tileStatsUpdater.recordTile(tileFeatures.tileCoord(), bytes.length, layerStats);
List<String> layerStatsRows = config.outputLayerStats() ?
TileSizeStats.formatOutputRows(tileFeatures.tileCoord(), bytes.length, layerStats) :
layerStatsSerializer.formatOutputRows(tileFeatures.tileCoord(), bytes.length, layerStats) :
List.of();
result.add(
new TileEncodingResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* Utilities for extracting tile and layer size summaries from encoded vector tiles.
* <p>
* {@link #computeTileStats(VectorTileProto.Tile)} extracts statistics about each layer in a tile and
* {@link #formatOutputRows(TileCoord, int, List)} formats them as rows of a TSV file to write.
* {@link TsvSerializer} formats them as rows of a TSV file to write.
* <p>
* To generate a tsv.gz file with stats for each tile, you can add the {@code --output-layerstats} option when
* generating an archive, or run the following on an existing archive:
Expand All @@ -52,13 +52,11 @@
public class TileSizeStats {

private static final int BATCH_SIZE = 1_000;
private static final CsvMapper MAPPER = new CsvMapper();
private static final CsvSchema SCHEMA = MAPPER
private static final CsvSchema SCHEMA = new CsvMapper()
.schemaFor(OutputRow.class)
.withoutHeader()
.withColumnSeparator('\t')
.withLineSeparator("\n");
private static final ObjectWriter WRITER = MAPPER.writer(SCHEMA);

/** Returns the default path that a layerstats file should go relative to an existing archive. */
public static Path getDefaultLayerstatsPath(Path archive) {
Expand Down Expand Up @@ -120,6 +118,7 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
List<LayerStats> layerStats = null;

var updater = tileStats.threadLocalUpdater();
var layerStatsSerializer = TileSizeStats.newThreadLocalSerializer();
for (var batch : prev) {
List<String> lines = new ArrayList<>(batch.tiles.size());
for (var tile : batch.tiles) {
Expand All @@ -130,7 +129,7 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
layerStats = computeTileStats(decoded);
}
updater.recordTile(tile.coord(), zipped.length, layerStats);
lines.addAll(TileSizeStats.formatOutputRows(tile.coord(), zipped.length, layerStats));
lines.addAll(layerStatsSerializer.formatOutputRows(tile.coord(), zipped.length, layerStats));
}
batch.stats.complete(lines);
}
Expand Down Expand Up @@ -161,28 +160,32 @@ record Batch(List<Tile> tiles, CompletableFuture<List<String>> stats) {}
stats.printSummary();
}

/** Returns the TSV rows to output for all the layers in a tile. */
public static List<String> formatOutputRows(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats)
throws IOException {
int hilbert = tileCoord.hilbertEncoded();
List<String> result = new ArrayList<>(layerStats.size());
for (var layer : layerStats) {
result.add(lineToString(new OutputRow(
tileCoord.z(),
tileCoord.x(),
tileCoord.y(),
hilbert,
archivedBytes,
layer.layer,
layer.layerBytes,
layer.layerFeatures,
layer.layerGeometries,
layer.layerAttrBytes,
layer.layerAttrKeys,
layer.layerAttrValues
)));
}
return result;
/**
 * Returns a {@link TsvSerializer} that can be used by a single thread to convert tile layer stats to TSV rows.
 * <p>
 * Every call builds a new {@link CsvMapper}-backed writer, so a serializing thread should obtain one serializer up
 * front and reuse it for all of the tiles it processes.
 *
 * @return a serializer that emits one TSV row string per layer of the tile passed to it
 */
public static TsvSerializer newThreadLocalSerializer() {
// CsvMapper is not entirely thread safe, and can end up with a BufferRecycler memory leak when writeValueAsString
// is called billions of times from multiple threads, so we generate a new instance per serializing thread
ObjectWriter writer = new CsvMapper().writer(SCHEMA);
return (tileCoord, archivedBytes, layerStats) -> {
// the hilbert index is the same for every layer of the tile, so compute it once outside the loop
int hilbert = tileCoord.hilbertEncoded();
// exactly one output row is produced per layer
List<String> result = new ArrayList<>(layerStats.size());
for (var layer : layerStats) {
// tile-level columns (z, x, y, hilbert, archivedBytes) are repeated on each layer's row
result.add(writer.writeValueAsString(new OutputRow(
tileCoord.z(),
tileCoord.x(),
tileCoord.y(),
hilbert,
archivedBytes,
layer.layer,
layer.layerBytes,
layer.layerFeatures,
layer.layerGeometries,
layer.layerAttrBytes,
layer.layerAttrKeys,
layer.layerAttrValues
)));
}
return result;
};
}

/**
Expand All @@ -195,11 +198,6 @@ public static Writer newWriter(Path path) throws IOException {
StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE))));
}

/**
 * Returns {@code output} encoded as a TSV row string.
 * <p>
 * Serialization goes through the single static {@code WRITER} instance of this class.
 *
 * @param output the row to encode
 * @return the encoded row as produced by {@code WRITER}
 * @throws IOException if the writer fails to serialize {@code output}
 */
public static String lineToString(OutputRow output) throws IOException {
return WRITER.writeValueAsString(output);
}

/** Returns the header row for the output TSV file. */
public static String headerRow() {
return String.join(
Expand Down Expand Up @@ -240,6 +238,14 @@ public static List<LayerStats> computeTileStats(VectorTileProto.Tile proto) {
return result;
}

/**
 * Converts the layer stats of a single tile into TSV row strings.
 * <p>
 * Instances come from {@link TileSizeStats#newThreadLocalSerializer()}, which hands them out for use by a single
 * thread.
 */
@FunctionalInterface
public interface TsvSerializer {

/**
 * Returns the TSV rows to output for all the layers in a tile.
 *
 * @param tileCoord     coordinate of the tile that {@code layerStats} was computed from
 * @param archivedBytes size in bytes of the encoded tile (callers pass the length of the tile's byte array)
 * @param layerStats    stats for each layer in the tile
 * @return the encoded TSV row strings for the layers in {@code layerStats}
 * @throws IOException if a row cannot be serialized
 */
List<String> formatOutputRows(TileCoord tileCoord, int archivedBytes, List<LayerStats> layerStats)
throws IOException;
}

/** Model for the data contained in each row in the TSV. */
@JsonPropertyOrder({
"z",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void computeStatsOneFeature() throws IOException {
assertEquals(2, entry1.layerAttrKeys());
assertEquals(2, entry1.layerAttrValues());

var formatted = TileSizeStats.formatOutputRows(TileCoord.ofXYZ(1, 2, 3), 999, stats);
var formatted = TileSizeStats.newThreadLocalSerializer().formatOutputRows(TileCoord.ofXYZ(1, 2, 3), 999, stats);
assertEquals(
"""
z x y hilbert archived_tile_bytes layer layer_bytes layer_features layer_geometries layer_attr_bytes layer_attr_keys layer_attr_values
Expand Down Expand Up @@ -86,7 +86,7 @@ void computeStats2Features() throws IOException {
assertEquals("b", entry2.layer());
assertEquals(1, entry2.layerFeatures());

var formatted = TileSizeStats.formatOutputRows(TileCoord.ofXYZ(1, 2, 3), 999, stats);
var formatted = TileSizeStats.newThreadLocalSerializer().formatOutputRows(TileCoord.ofXYZ(1, 2, 3), 999, stats);
assertEquals(
"""
z x y hilbert archived_tile_bytes layer layer_bytes layer_features layer_geometries layer_attr_bytes layer_attr_keys layer_attr_values
Expand Down
Loading