diff --git a/src/main/java/io/cryostat/Producers.java b/src/main/java/io/cryostat/Producers.java index 8b94dcd0b..5375e2dee 100644 --- a/src/main/java/io/cryostat/Producers.java +++ b/src/main/java/io/cryostat/Producers.java @@ -22,7 +22,7 @@ import io.cryostat.core.reports.InterruptibleReportGenerator; import io.cryostat.libcryostat.sys.Clock; import io.cryostat.libcryostat.sys.FileSystem; -import io.cryostat.recordings.ArchiveRequestGenerator; +import io.cryostat.recordings.LongRunningRequestGenerator; import io.quarkus.arc.DefaultBean; import io.vertx.mutiny.core.Vertx; @@ -80,9 +80,9 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator() @Produces @RequestScoped @DefaultBean - public static ArchiveRequestGenerator produceArchiveRequestGenerator() { + public static LongRunningRequestGenerator produceArchiveRequestGenerator() { boolean singleThread = Runtime.getRuntime().availableProcessors() < 2; - return new ArchiveRequestGenerator( + return new LongRunningRequestGenerator( singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); } diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 88f394fca..8b3f8692a 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -29,8 +29,8 @@ import io.cryostat.ConfigProperties; import io.cryostat.libcryostat.templates.Template; import io.cryostat.libcryostat.templates.TemplateType; -import io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest; -import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest; +import io.cryostat.recordings.LongRunningRequestGenerator.ArchiveRequest; +import io.cryostat.recordings.LongRunningRequestGenerator.GrafanaActiveUploadRequest; import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.targets.Target; @@ -67,7 +67,7 @@ public class ActiveRecordings { @Inject ObjectMapper mapper; @Inject RecordingHelper recordingHelper; - @Inject ArchiveRequestGenerator generator; + @Inject LongRunningRequestGenerator generator; @Inject EventBus bus; @Inject Logger logger; @@ -130,31 +130,16 @@ public String patch( .atMost(connectionFailedTimeout); return null; case "save": - // FIXME this operation might take a long time to complete, depending on the - // amount of JFR data in the target and the speed of the connection between the - // target and Cryostat. We should not make the client wait until this operation - // completes before sending a response - it should be async. Here we should just - // return an Accepted response, and if a failure occurs that should be indicated - // as a websocket notification. - - /* - * Desired workflow: - * Client sends a PATCH request to Cryostat - * Cryostat receives the PATCH and checks that the specified active recording exists and that the target JVM is reachable (ex. try to open a connection and do something relatively lightweight like compute its JVM ID). If this check succeeds respond to the PATCH with 202, if it fails respond with a 404 (recording not found) or 502 (target not reachable) etc. - * In the background, Cryostat creates the S3 file upload request, opens a target connection, pipes the bytes, etc. - same as steps 2-5 above - * Cryostat emits a WebSocket notification, either indicating task successful completion or task failure. - */ - logger.info("Ceating request"); ArchiveRequest request = new ArchiveRequest(UUID.randomUUID().toString(), activeRecording); - logger.info( + logger.tracev( "Request created: (" + request.getId() + ", " + request.recording().name + ")"); response.endHandler( - (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_ADDRESS, request)); + (e) -> bus.publish(LongRunningRequestGenerator.ARCHIVE_ADDRESS, request)); return request.getId(); default: throw new BadRequestException(body); @@ -262,7 +247,7 @@ public String uploadToGrafana( + request.getTargetId() + ")"); response.endHandler( - (e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request)); + (e) -> bus.publish(LongRunningRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request)); return request.getId(); } diff --git a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java index b594811cf..eaa9de3e2 100644 --- a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java +++ b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java @@ -35,7 +35,7 @@ import io.cryostat.libcryostat.sys.Clock; import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent; import io.cryostat.recordings.ActiveRecordings.Metadata; -import io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest; +import io.cryostat.recordings.LongRunningRequestGenerator.GrafanaArchiveUploadRequest; import io.cryostat.targets.Target; import io.cryostat.util.HttpMimeType; import io.cryostat.ws.MessagingServer; @@ -498,7 +498,7 @@ public String uploadArchivedToGrafana(HttpServerResponse response, @RestPath Str logger.info( "Request created: (" + request.getId() + ", " + request.getPair().toString() + ")"); response.endHandler( - (e) -> bus.publish(ArchiveRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request)); + (e) -> bus.publish(LongRunningRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request)); return request.getId(); } diff --git a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java b/src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java similarity index 98% rename from src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java rename to src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java index 74afa6c47..72a683b48 100644 --- a/src/main/java/io/cryostat/recordings/ArchiveRequestGenerator.java +++ b/src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; @ApplicationScoped -public class ArchiveRequestGenerator { +public class LongRunningRequestGenerator { public static final String ARCHIVE_ADDRESS = "io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest"; @@ -66,7 +66,7 @@ public class ArchiveRequestGenerator { @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration timeout; - public ArchiveRequestGenerator(ExecutorService executor) { + public LongRunningRequestGenerator(ExecutorService executor) { this.executor = executor; } @@ -84,7 +84,8 @@ public Future performArchive(ArchiveRequest request) { bus.publish( MessagingServer.class.getName(), new Notification( - ARCHIVE_RECORDING_SUCCESS, Map.of("recording", rec))); + ARCHIVE_RECORDING_SUCCESS, + Map.of("jobId", request.getId(), "recording", rec))); return request.getId(); } catch (Exception e) { logger.info("Archiving failed"); diff --git a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java index 63b19722d..ffdb43729 100644 --- a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java @@ -16,7 +16,6 @@ package io.cryostat.reports; import java.util.Map; -import java.util.Objects; import java.util.function.Predicate; import org.openjdk.jmc.flightrecorder.rules.IRule; @@ -115,15 +114,15 @@ public Uni> reportFor(String jvmId, String filename) public boolean keyExists(ActiveRecording recording) { String key = ReportsService.key(recording); return (quarkusCache || memoryCache) - && !Objects.isNull(activeCache.as(CaffeineCache.class).getIfPresent(key)) - && !delegate.keyExists(recording); + && (activeCache.as(CaffeineCache.class).keySet().contains(key) + || delegate.keyExists(recording)); } @Override public boolean keyExists(String jvmId, String filename) { String key = recordingHelper.archivedRecordingKey(jvmId, filename); return (quarkusCache || memoryCache) - && !Objects.isNull(archivedCache.as(CaffeineCache.class).getIfPresent(key)) - && !delegate.keyExists(jvmId, filename); + && (archivedCache.as(CaffeineCache.class).keySet().contains(key) + || delegate.keyExists(jvmId, filename)); } } diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index a402e0000..873643b9e 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -20,9 +20,9 @@ import io.cryostat.ConfigProperties; import io.cryostat.StorageBuckets; -import io.cryostat.recordings.ArchiveRequestGenerator; -import io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest; -import io.cryostat.recordings.ArchiveRequestGenerator.ArchivedReportRequest; +import io.cryostat.recordings.LongRunningRequestGenerator; +import io.cryostat.recordings.LongRunningRequestGenerator.ActiveReportRequest; +import io.cryostat.recordings.LongRunningRequestGenerator.ArchivedReportRequest; import io.cryostat.recordings.RecordingHelper; import io.cryostat.targets.Target; @@ -55,7 +55,7 @@ public class Reports { @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) Duration timeout; - @Inject ArchiveRequestGenerator generator; + @Inject LongRunningRequestGenerator generator; @Inject StorageBuckets storageBuckets; @Inject RecordingHelper helper; @Inject ReportsService reportsService; @@ -99,8 +99,8 @@ public Response get(HttpServerResponse response, @RestPath String encodedKey) { ArchivedReportRequest request = new ArchivedReportRequest(UUID.randomUUID().toString(), pair); response.bodyEndHandler( - (e) -> bus.publish(ArchiveRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); - return Response.ok(request.getId()) + (e) -> bus.publish(LongRunningRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); + return Response.ok(request.getId(), MediaType.TEXT_PLAIN) .status(202) .location( UriBuilder.fromUri( @@ -140,7 +140,7 @@ public Response getActive( ActiveReportRequest request = new ActiveReportRequest(UUID.randomUUID().toString(), recording); response.bodyEndHandler( - (e) -> bus.publish(ArchiveRequestGenerator.ACTIVE_REPORT_ADDRESS, request)); + (e) -> bus.publish(LongRunningRequestGenerator.ACTIVE_REPORT_ADDRESS, request)); // TODO implement query parameter for evaluation predicate return Response.ok(request.getId()) .status(202)