Skip to content

Commit

Permalink
Add support for "files"-archive (#761)
Browse files Browse the repository at this point in the history
* Add support for "files"-archive

i.e. write individual pbf-files to disk in the format <base>/z/x/y.pbf

in order to use that format it must be passed as "--output=/path/to/tiles?format=files"

Fixes #536

* default to files format

...if no explicit format query param is given, the path ends with a slash, or no extension is given

* output metadata.json in files-archive

and refactor TileArchiveMetadata
1. put zoom into center (CoordinateXY->Coordinate) - in sync with mbtiles-format
2. add (De-)Serializer for Coordinate+Envelope => avoids duplication and is cleaner
3. change the json and proto output for TileArchiveMetadata to be (more) in sync with mbtiles-format

* add support for custom tile scheme in files-archive

{z}/{x}/{y}.pbf is the default and can be configured as needed - e.g.:
- different order: {x}/{y}/{z}.pbf
- with intermediate dirs: {x}/a/{y}/b/{z}.pbf
- with different extension: {z}/{x}/{y}.pbf.gz

instead of {x} and {y}, {xs} and {ys} can be used, which breaks up
x and y into 2 directories each and ensures that each directory has <1000 children

* fix issues with multiple writers

1. call finish archive only once after all writers are finished
   ...and not every time a writer finishes
2. log "zoom-progress" for the first tile write only
   (Finished z11 ... now starting z12)
3. remove file/dir-size progress logger bottleneck for files archive
   => each archive now reports the bytes written, which also fixes
   the issues of stream-archives reporting the size incorrectly
4. introduce printStats-hook on archive-level

* add async file write support to files archive

...allow using a virtual-threads ExecutorService (bound only!) for tile writing

also add some benchmark for writing tiles to disk: fixed, bound virtual, async, unbound virtual

* Revert "add async file write support to files archive"

This reverts commit b8cfa56.

* few improvements

- extract TileSchemeEncoding
- use Counter.MultithreadCounter rather than LongAdder to count bytes written
- add some JavaDoc

* simplify files archive usage

1. allow to pass tile scheme directly via output: --output=tiles/{x}/{y}/{z}.pbf
2. auto-encode { (%7B) and } (%7D) => no need to encode them in the URI on the CLI

* few more adjustments according to PR feedback

1. use WriteableTileArchive#bytesWritten in summary as well
2. call WriteableTileArchive#init in a safer manner

..and a few more adjustments

* more PR feedback
  • Loading branch information
bbilger authored Jan 3, 2024
1 parent 389ccab commit c480b35
Show file tree
Hide file tree
Showing 37 changed files with 2,330 additions and 422 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.onthegomap.planetiler.stats.ProcessInfo;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timers;
import com.onthegomap.planetiler.stream.StreamArchiveUtils;
import com.onthegomap.planetiler.util.AnsiColors;
import com.onthegomap.planetiler.util.BuildInfo;
import com.onthegomap.planetiler.util.ByteBufferUtil;
Expand Down Expand Up @@ -683,15 +682,15 @@ public void run() throws Exception {
throw new IllegalArgumentException(output.format() + " doesn't support concurrent writes");
}
IntStream.range(1, config.tileWriteThreads())
.mapToObj(index -> StreamArchiveUtils.constructIndexedPath(output.getLocalPath(), index))
.mapToObj(output::getPathForMultiThreadedWriter)
.forEach(p -> {
if (!config.append() && (overwrite || config.force())) {
FileUtils.delete(p);
}
if (config.append() && !Files.exists(p)) {
throw new IllegalArgumentException("indexed file \"" + p + "\" must exist when appending");
} else if (!config.append() && Files.exists(p)) {
throw new IllegalArgumentException("indexed file \"" + p + "\" must not exist when not appending");
if (config.append() && !output.exists(p)) {
throw new IllegalArgumentException("indexed archive \"" + p + "\" must exist when appending");
} else if (!config.append() && output.exists(p)) {
throw new IllegalArgumentException("indexed archive \"" + p + "\" must not exist when not appending");
}
});
}
Expand Down Expand Up @@ -719,7 +718,7 @@ public void run() throws Exception {
// in case any temp files are left from a previous run...
FileUtils.delete(tmpDir, nodeDbPath, featureDbPath, multipolygonPath);
Files.createDirectories(tmpDir);
FileUtils.createParentDirectories(nodeDbPath, featureDbPath, multipolygonPath, output.getLocalPath());
FileUtils.createParentDirectories(nodeDbPath, featureDbPath, multipolygonPath, output.getLocalBasePath());

if (!toDownload.isEmpty()) {
download();
Expand Down Expand Up @@ -757,7 +756,7 @@ public void run() throws Exception {
stats.monitorFile("nodes", nodeDbPath);
stats.monitorFile("features", featureDbPath);
stats.monitorFile("multipolygons", multipolygonPath);
stats.monitorFile("archive", output.getLocalPath());
stats.monitorFile("archive", output.getLocalPath(), archive::bytesWritten);

for (Stage stage : stages) {
stage.task.run();
Expand All @@ -774,8 +773,8 @@ public void run() throws Exception {

featureGroup.prepare();

TileArchiveWriter.writeOutput(featureGroup, archive, output::size, tileArchiveMetadata, layerStatsPath, config,
stats);
TileArchiveWriter.writeOutput(featureGroup, archive, archive::bytesWritten, tileArchiveMetadata, layerStatsPath,
config, stats);
} catch (IOException e) {
throw new IllegalStateException("Unable to write to " + output, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import static com.onthegomap.planetiler.util.LanguageUtils.nullIfEmpty;

import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.files.FilesArchiveUtils;
import com.onthegomap.planetiler.stream.StreamArchiveUtils;
import com.onthegomap.planetiler.util.FileUtils;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

/**
* Definition for a tileset, parsed from a URI-like string.
Expand Down Expand Up @@ -39,6 +44,12 @@ public record TileArchiveConfig(
Map<String, String> options
) {

// Be generous to CLI users: from(String) replaces every key of this map with its percent-encoded
// value before handing the string to URI.create, so template braces do not need to be encoded by hand.
private static final Map<String, String> URI_ENCODINGS = Map.of(
"{", "%7B",
"}", "%7D"
);

private static TileArchiveConfig.Scheme getScheme(URI uri) {
String scheme = uri.getScheme();
if (scheme == null) {
Expand Down Expand Up @@ -77,18 +88,20 @@ private static Map<String, String> parseQuery(URI uri) {

private static TileArchiveConfig.Format getFormat(URI uri) {
String format = parseQuery(uri).get("format");
if (format == null) {
format = getExtension(uri);
for (var value : TileArchiveConfig.Format.values()) {
if (value.isQueryFormatSupported(format)) {
return value;
}
}
if (format == null) {
return TileArchiveConfig.Format.MBTILES;
if (format != null) {
throw new IllegalArgumentException("Unsupported format " + format + " from " + uri);
}
for (var value : TileArchiveConfig.Format.values()) {
if (value.id().equals(format)) {
if (value.isUriSupported(uri)) {
return value;
}
}
throw new IllegalArgumentException("Unsupported format " + format + " from " + uri);
throw new IllegalArgumentException("Unsupported format " + getExtension(uri) + " from " + uri);
}

/**
Expand All @@ -103,6 +116,10 @@ public static TileArchiveConfig from(String string) {
string += "?" + parts[1];
}
}
for (Map.Entry<String, String> uriEncoding : URI_ENCODINGS.entrySet()) {
string = string.replace(uriEncoding.getKey(), uriEncoding.getValue());
}

return from(URI.create(string));
}

Expand All @@ -111,7 +128,11 @@ public static TileArchiveConfig from(String string) {
*/
public static TileArchiveConfig from(URI uri) {
if (uri.getScheme() == null) {
String base = Path.of(uri.getPath()).toAbsolutePath().toUri().normalize().toString();
final String path = uri.getPath();
String base = Path.of(path).toAbsolutePath().toUri().normalize().toString();
if (path.endsWith("/")) {
base = base + "/";
}
if (uri.getRawQuery() != null) {
base += "?" + uri.getRawQuery();
}
Expand All @@ -133,21 +154,55 @@ public Path getLocalPath() {
return scheme == Scheme.FILE ? Path.of(URI.create(uri.toString().replaceAll("\\?.*$", ""))) : null;
}

/**
 * Returns the local <b>base</b> path of this archive, i.e. the path whose directories should be pre-created.
 * <p>
 * For the {@link Format#FILES} format the local path is passed through {@code FilesArchiveUtils.cleanBasePath};
 * for every other format the local path is returned unchanged.
 */
public Path getLocalBasePath() {
  final Path localPath = getLocalPath();
  return format() == Format.FILES ? FilesArchiveUtils.cleanBasePath(localPath) : localPath;
}


/**
* Deletes the archive if possible.
*/
public void delete() {
if (scheme == Scheme.FILE) {
FileUtils.delete(getLocalPath());
FileUtils.delete(getLocalBasePath());
}
}

/**
* Returns {@code true} if the archive already exists, {@code false} otherwise.
*/
public boolean exists() {
return getLocalPath() != null && Files.exists(getLocalPath());
return exists(getLocalBasePath());
}

/**
 * Checks whether an archive is present at the given location.
 * <p>
 * For every format other than {@link Format#FILES} this is a plain existence check of {@code p}. A files-archive
 * is only considered to exist if {@code p} exists <b>and</b> contains at least one entry.
 *
 * @param p path to the archive, may be {@code null}
 * @return {@code true} if the archive already exists, {@code false} otherwise (including for a {@code null} path)
 * @throws UncheckedIOException if the contents of a files-archive cannot be listed
 */
public boolean exists(Path p) {
  if (p == null || !Files.exists(p)) {
    return false;
  }
  if (format() != Format.FILES) {
    return true;
  }
  // file-archive exists only if it has any contents
  try (Stream<Path> children = Files.list(p)) {
    return children.findAny().isPresent();
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}

/**
Expand All @@ -165,12 +220,30 @@ public Arguments applyFallbacks(Arguments arguments) {
return Arguments.of(options).orElse(arguments.withPrefix(format.id));
}

/**
 * Returns the path that the additional writer with the given index should write to.
 * <p>
 * For CSV, TSV, JSON, PROTO and PBF the path is derived from the local path and the index via
 * {@code StreamArchiveUtils.constructIndexedPath}; for {@link Format#FILES} every writer gets the local path itself.
 *
 * @param index index of the writer
 * @return the path for that writer
 * @throws UnsupportedOperationException if the format is none of the above
 */
public Path getPathForMultiThreadedWriter(int index) {
  switch (format) {
    case CSV, TSV, JSON, PROTO, PBF:
      return StreamArchiveUtils.constructIndexedPath(getLocalPath(), index);
    case FILES:
      return getLocalPath();
    default:
      throw new UnsupportedOperationException("not supported by " + format);
  }
}

public enum Format {
MBTILES("mbtiles",
false /* TODO mbtiles could support append in the future by using insert statements with an "on conflict"-clause (i.e. upsert) and by creating tables only if they don't exist, yet */,
false),
PMTILES("pmtiles", false, false),

// keep this constant ahead of PBF in order to avoid collisions
FILES("files", true, true) {
  /**
   * A URI is treated as a files-archive if its path ends with a slash, contains a template string ("{"), or has
   * no extension at all.
   */
  @Override
  boolean isUriSupported(URI uri) {
    final String uriPath = uri.getPath();
    if (uriPath == null) {
      return false;
    }
    // trailing slash, template string, or no extension => assume files
    return uriPath.endsWith("/") || uriPath.contains("{") || !uriPath.contains(".");
  }
},

CSV("csv", true, true),
/** identical to {@link Format#CSV} - except for the column separator */
TSV("tsv", true, true),
Expand Down Expand Up @@ -202,6 +275,15 @@ public boolean supportsAppend() {
public boolean supportsConcurrentWrites() {
return supportsConcurrentWrites;
}

/**
 * Returns {@code true} if the path of the given URI ends with {@code "." + id}, i.e. carries this format's
 * extension; a URI without a path is never supported.
 */
boolean isUriSupported(URI uri) {
  final String uriPath = uri.getPath();
  if (uriPath == null) {
    return false;
  }
  return uriPath.endsWith("." + id);
}

/**
 * Returns {@code true} if the given value of the {@code format} query parameter equals this format's id;
 * a {@code null} value never matches.
 */
boolean isQueryFormatSupported(String queryFormat) {
  return this.id.equals(queryFormat);
}
}

public enum Scheme {
Expand Down
Loading

0 comments on commit c480b35

Please sign in to comment.