Skip to content

Commit

Permalink
few more adjustments according to PR feeback
Browse files Browse the repository at this point in the history
1. use WriteableTileArchive#bytesWritten in summmary as well
2. call WriteableTileArchive#init in a safer manner

..and rebase
  • Loading branch information
bbilger committed Dec 30, 2023
1 parent 7d0bc26 commit e395518
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ public void run() throws Exception {
stats.monitorFile("nodes", nodeDbPath);
stats.monitorFile("features", featureDbPath);
stats.monitorFile("multipolygons", multipolygonPath);
stats.monitorFile("archive", output.getLocalPath());
stats.monitorFile("archive", output.getLocalPath(), archive::bytesWritten);

for (Stage stage : stages) {
stage.task.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ public static void writeOutput(FeatureGroup features, WriteableTileArchive outpu
.addBuffer("reader_queue", queueSize)
.sinkTo("encode", processThreads, writer::tileEncoderSink);

// ensure to initialize the archive BEFORE starting to write any tiles
output.initialize();

// the tile writer will wait on the result of each batch to ensure tiles are written in order
WorkerPipeline<TileBatch> writeBranch = pipeline.readFromQueue(writerQueue)
.sinkTo("write", tileWriteThreads, writer::tileWriter);
Expand Down Expand Up @@ -339,9 +342,6 @@ private void tileEncoderSink(Iterable<TileBatch> prev) throws IOException {
private void tileWriter(Iterable<TileBatch> tileBatches) throws ExecutionException, InterruptedException {

final boolean firstTileWriter = firstTileWriterTracker.compareAndExchange(true, false);
if (firstTileWriter) {
archive.initialize();
}

var f = NumberFormat.getNumberInstance(Locale.getDefault());
f.setMaximumFractionDigits(5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PrometheusStats implements Stats {
private ScheduledExecutorService executor;
private final String job;
private final Map<String, Path> filesToMonitor = new ConcurrentSkipListMap<>();
private final Map<String, LongSupplier> sizesOfFilesToMonitor = new ConcurrentSkipListMap<>();
private final Map<String, Long> dataErrorCounters = new ConcurrentHashMap<>();
private final Map<String, MemoryEstimator.HasEstimate> heapObjectsToMonitor = new ConcurrentSkipListMap<>();

Expand Down Expand Up @@ -175,6 +176,11 @@ public Map<String, Path> monitoredFiles() {
return filesToMonitor;
}

@Override
public Map<String, LongSupplier> monitoredFileSizes() {
return sizesOfFilesToMonitor;
}

@Override
public void monitorInMemoryObject(String name, MemoryEstimator.HasEstimate object) {
heapObjectsToMonitor.put(name, object);
Expand Down Expand Up @@ -251,8 +257,10 @@ public List<MetricFamilySamples> collect() {
for (var file : filesToMonitor.entrySet()) {
String name = sanitizeMetricName(file.getKey());
Path path = file.getValue();
var sizeSupplier = monitoredFileSizes().getOrDefault(file.getKey(), () -> FileUtils.size(path));
long size = sizeSupplier.getAsLong();
results.add(new GaugeMetricFamily(BASE + "file_" + name + "_size_bytes", "Size of " + name + " in bytes",
FileUtils.size(path)));
size));
if (Files.exists(path)) {
try {
FileStore fileStore = Files.getFileStore(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ default void printSummary() {
timers().printSummary();
logger.info("-".repeat(40));
for (var entry : monitoredFiles().entrySet()) {
long size = FileUtils.size(entry.getValue());
var sizeSupplier = monitoredFileSizes().getOrDefault(entry.getKey(), () -> FileUtils.size(entry.getValue()));
long size = sizeSupplier.getAsLong();
if (size > 0) {
logger.info("\t{}\t{}B", entry.getKey(), format.storage(size, false));
}
Expand Down Expand Up @@ -120,13 +121,23 @@ default Timers.Finishable startStage(String name, boolean log) {
/** Returns all the files being monitored. */
Map<String, Path> monitoredFiles();

Map<String, LongSupplier> monitoredFileSizes();

/** Adds a stat that will track the size of a file or directory located at {@code path}. */
default void monitorFile(String name, Path path) {
monitorFile(name, path, null);
}

default void monitorFile(String name, Path path, LongSupplier sizeProvider) {
if (path != null) {
monitoredFiles().put(name, path);
}
if (sizeProvider != null) {
monitoredFileSizes().put(name, sizeProvider);
}
}


/** Adds a stat that will track the estimated in-memory size of {@code object}. */
void monitorInMemoryObject(String name, MemoryEstimator.HasEstimate object);

Expand Down Expand Up @@ -189,6 +200,7 @@ private InMemory() {}

private final Timers timers = new Timers();
private final Map<String, Path> monitoredFiles = new ConcurrentSkipListMap<>();
private final Map<String, LongSupplier> monitoredFileSizes = new ConcurrentSkipListMap<>();
private final Map<String, Long> dataErrors = new ConcurrentHashMap<>();

@Override
Expand All @@ -204,6 +216,11 @@ public Map<String, Path> monitoredFiles() {
return monitoredFiles;
}

@Override
public Map<String, LongSupplier> monitoredFileSizes() {
return monitoredFileSizes;
}

@Override
public void monitorInMemoryObject(String name, MemoryEstimator.HasEstimate object) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ void testRoundtripMetadata() throws IOException {
"baselayer",
TileArchiveMetadata.MVT_FORMAT,
new Envelope(1.1, 2.2, 3.3, 4.4),
new CoordinateXY(5.5, 6.6),
7d,
new Coordinate(5.5, 6.6, 7d),
8,
9,
TileArchiveMetadata.TileArchiveMetadataJson.create(
Expand Down

0 comments on commit e395518

Please sign in to comment.