From bbd11496f3031e91636bcde4a26355a8dbed02e1 Mon Sep 17 00:00:00 2001 From: Joshua Matsuoka Date: Wed, 11 Dec 2024 15:22:47 -0500 Subject: [PATCH] feat(async): long-running async job API (#698) --- schema/openapi.yaml | 80 +++--- src/main/java/io/cryostat/Producers.java | 8 + .../cryostat/recordings/ActiveRecordings.java | 58 +++-- .../recordings/ArchivedRecordings.java | 18 +- .../LongRunningRequestGenerator.java | 245 ++++++++++++++++++ .../cryostat/recordings/RecordingHelper.java | 23 -- .../reports/MemoryCachingReportsService.java | 17 ++ .../java/io/cryostat/reports/Reports.java | 88 ++++++- .../io/cryostat/reports/ReportsService.java | 4 + .../cryostat/reports/ReportsServiceImpl.java | 10 + .../reports/StorageCachingReportsService.java | 15 ++ .../java/io/cryostat/reports/ReportsTest.java | 16 +- .../java/itest/RecordingWorkflowTest.java | 77 ++++-- src/test/java/itest/ReportGenerationTest.java | 56 ++-- .../java/itest/TargetRecordingPatchTest.java | 27 +- src/test/java/itest/UploadRecordingTest.java | 10 +- 16 files changed, 593 insertions(+), 159 deletions(-) create mode 100644 src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java diff --git a/schema/openapi.yaml b/schema/openapi.yaml index 7ccd335f9..8c8c378a7 100644 --- a/schema/openapi.yaml +++ b/schema/openapi.yaml @@ -1,18 +1,6 @@ --- components: schemas: - AnalysisResult: - properties: - evaluation: - $ref: '#/components/schemas/Evaluation' - name: - type: string - score: - format: double - type: number - topic: - type: string - type: object Annotations: properties: cryostat: @@ -239,19 +227,6 @@ components: - id - realm type: object - Evaluation: - properties: - explanation: - type: string - solution: - type: string - suggestions: - items: - $ref: '#/components/schemas/Suggestion' - type: array - summary: - type: string - type: object Event: properties: clazz: @@ -309,6 +284,16 @@ components: hash: type: string type: object + HttpServerResponse: + properties: + chunked: + type: boolean + statusCode: + format: int32 + type: integer + statusMessage: + type: string + type: object Instant: example: 2022-03-10T16:15:50Z format: date-time @@ -554,15 +539,6 @@ components: name: type: string type: object - Suggestion: - properties: - name: - type: string - setting: - type: string - value: - type: string - type: object Target: properties: agent: @@ -1225,6 +1201,11 @@ paths: required: true schema: type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": content: @@ -1456,14 +1437,13 @@ paths: required: true schema: type: string + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": - content: - application/json: - schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object description: OK "401": description: Not Authorized @@ -2127,9 +2107,9 @@ paths: type: integer requestBody: content: - text/plain: + application/json: schema: - type: string + $ref: '#/components/schemas/HttpServerResponse' responses: "200": content: @@ -2160,6 +2140,11 @@ paths: schema: format: int64 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": content: @@ -2190,14 +2175,13 @@ paths: schema: format: int64 type: integer + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/HttpServerResponse' responses: "200": - content: - application/json: - 
schema: - additionalProperties: - $ref: '#/components/schemas/AnalysisResult' - type: object description: OK "401": description: Not Authorized diff --git a/src/main/java/io/cryostat/Producers.java b/src/main/java/io/cryostat/Producers.java index 035836753..4bef25407 100644 --- a/src/main/java/io/cryostat/Producers.java +++ b/src/main/java/io/cryostat/Producers.java @@ -22,6 +22,7 @@ import io.cryostat.core.reports.InterruptibleReportGenerator; import io.cryostat.libcryostat.sys.Clock; import io.cryostat.libcryostat.sys.FileSystem; +import io.cryostat.recordings.LongRunningRequestGenerator; import io.quarkus.arc.DefaultBean; import io.vertx.mutiny.core.Vertx; @@ -76,6 +77,13 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator() singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool()); } + @Produces + @RequestScoped + @DefaultBean + public static LongRunningRequestGenerator produceArchiveRequestGenerator() { + return new LongRunningRequestGenerator(); + } + @Produces @DefaultBean public WebClient produceWebClient(Vertx vertx) { diff --git a/src/main/java/io/cryostat/recordings/ActiveRecordings.java b/src/main/java/io/cryostat/recordings/ActiveRecordings.java index 2d7ea05ca..de745182d 100644 --- a/src/main/java/io/cryostat/recordings/ActiveRecordings.java +++ b/src/main/java/io/cryostat/recordings/ActiveRecordings.java @@ -15,7 +15,6 @@ */ package io.cryostat.recordings; -import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.time.Duration; @@ -25,10 +24,13 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.libcryostat.templates.Template; import io.cryostat.libcryostat.templates.TemplateType; +import io.cryostat.recordings.LongRunningRequestGenerator.ArchiveRequest; +import io.cryostat.recordings.LongRunningRequestGenerator.GrafanaActiveUploadRequest; import io.cryostat.recordings.RecordingHelper.RecordingOptions; import io.cryostat.recordings.RecordingHelper.RecordingReplace; import io.cryostat.targets.Target; @@ -36,7 +38,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.inject.Inject; import jakarta.transaction.Transactional; @@ -64,6 +67,8 @@ public class ActiveRecordings { @Inject ObjectMapper mapper; @Inject RecordingHelper recordingHelper; + @Inject LongRunningRequestGenerator generator; + @Inject EventBus bus; @Inject Logger logger; @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) @@ -102,7 +107,11 @@ public RestResponse download(@RestPath long targetId, @RestPath lon @Blocking @Path("/{remoteId}") @RolesAllowed("write") - public String patch(@RestPath long targetId, @RestPath long remoteId, String body) + public String patch( + HttpServerResponse response, + @RestPath long targetId, + @RestPath long remoteId, + String body) throws Exception { Target target = Target.find("id", targetId).singleResult(); Optional recording = @@ -121,18 +130,17 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod .atMost(connectionFailedTimeout); return null; case "save": - try { - // FIXME this operation might take a long time to complete, depending on the - // 
amount of JFR data in the target and the speed of the connection between the - // target and Cryostat. We should not make the client wait until this operation - // completes before sending a response - it should be async. Here we should just - // return an Accepted response, and if a failure occurs that should be indicated - // as a websocket notification. - return recordingHelper.archiveRecording(activeRecording, null, null).name(); - } catch (IOException ioe) { - logger.warn(ioe); - return null; - } + ArchiveRequest request = + new ArchiveRequest(UUID.randomUUID().toString(), activeRecording); + logger.tracev( + "Request created: (" + + request.getId() + + ", " + + request.recording().name + + ")"); + response.endHandler( + (e) -> bus.publish(LongRunningRequestGenerator.ARCHIVE_ADDRESS, request)); + return request.getId(); default: throw new BadRequestException(body); } @@ -222,9 +230,25 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce @Blocking @Path("/{remoteId}/upload") @RolesAllowed("write") - public Uni uploadToGrafana(@RestPath long targetId, @RestPath long remoteId) + public String uploadToGrafana( + HttpServerResponse response, @RestPath long targetId, @RestPath long remoteId) throws Exception { - return recordingHelper.uploadToJFRDatasource(targetId, remoteId); + // Send an intermediate response back to the client while another thread handles the upload + // request + logger.trace("Creating grafana upload request"); + GrafanaActiveUploadRequest request = + new GrafanaActiveUploadRequest(UUID.randomUUID().toString(), remoteId, targetId); + logger.trace( + "Request created: (" + + request.getId() + + ", " + + request.getRemoteId() + + ", " + + request.getTargetId() + + ")"); + response.endHandler( + (e) -> bus.publish(LongRunningRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request)); + return request.getId(); } public record LinkedRecordingDescriptor( diff --git a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java index 8d7302828..c7f73957e 100644 --- a/src/main/java/io/cryostat/recordings/ArchivedRecordings.java +++ b/src/main/java/io/cryostat/recordings/ArchivedRecordings.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.Producers; @@ -34,6 +35,7 @@ import io.cryostat.libcryostat.sys.Clock; import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent; import io.cryostat.recordings.ActiveRecordings.Metadata; +import io.cryostat.recordings.LongRunningRequestGenerator.GrafanaArchiveUploadRequest; import io.cryostat.targets.Target; import io.cryostat.util.HttpMimeType; import io.cryostat.ws.MessagingServer; @@ -42,7 +44,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.handler.HttpException; import io.vertx.mutiny.core.eventbus.EventBus; @@ -473,7 +475,8 @@ public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil @Blocking @Path("/api/v4/grafana/{encodedKey}") @RolesAllowed("write") - public Uni uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception { + public String uploadArchivedToGrafana(HttpServerResponse response, @RestPath String encodedKey) + throws 
Exception { var pair = recordingHelper.decodedKey(encodedKey); var key = recordingHelper.archivedRecordingKey(pair); storage.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build()) @@ -487,7 +490,16 @@ public Uni uploadArchivedToGrafana(@RestPath String encodedKey) throws E if (!found) { throw new NotFoundException(); } - return recordingHelper.uploadToJFRDatasource(pair); + // Send an intermediate response back to the client while another thread handles the upload + // request + logger.trace("Creating grafana upload request"); + GrafanaArchiveUploadRequest request = + new GrafanaArchiveUploadRequest(UUID.randomUUID().toString(), pair); + logger.trace( + "Request created: (" + request.getId() + ", " + request.getPair().toString() + ")"); + response.endHandler( + (e) -> bus.publish(LongRunningRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request)); + return request.getId(); } @GET diff --git a/src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java b/src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java new file mode 100644 index 000000000..fae8525a8 --- /dev/null +++ b/src/main/java/io/cryostat/recordings/LongRunningRequestGenerator.java @@ -0,0 +1,245 @@ +/* + * Copyright The Cryostat Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.cryostat.recordings; + +import java.time.Duration; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletionException; + +import io.cryostat.ConfigProperties; +import io.cryostat.reports.ReportsService; +import io.cryostat.ws.MessagingServer; +import io.cryostat.ws.Notification; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.quarkus.vertx.ConsumeEvent; +import io.vertx.mutiny.core.eventbus.EventBus; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.jboss.logging.Logger; + +@ApplicationScoped +public class LongRunningRequestGenerator { + + public static final String ARCHIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ArchiveRequest"; + public static final String GRAFANA_ARCHIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaArchiveUploadRequest"; + public static final String GRAFANA_ACTIVE_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.GrafanaActiveUploadRequest"; + public static final String ARCHIVE_REPORT_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ArchiveReportRequest"; + public static final String ACTIVE_REPORT_ADDRESS = + "io.cryostat.recordings.ArchiveRequestGenerator.ActiveReportRequest"; + private static final String ARCHIVE_RECORDING_SUCCESS = "ArchiveRecordingSuccess"; + private static final String ARCHIVE_RECORDING_FAIL = "ArchiveRecordingFailed"; + private static final String GRAFANA_UPLOAD_SUCCESS = "GrafanaUploadSuccess"; + private static final String GRAFANA_UPLOAD_FAIL = "GrafanaUploadFailed"; + private static final String REPORT_SUCCESS = "ReportSuccess"; + private static final String REPORT_FAILURE = "ReportFailure"; + + @Inject Logger logger; + @Inject private EventBus bus; + @Inject private RecordingHelper recordingHelper; + @Inject private ReportsService reportsService; + + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + + public LongRunningRequestGenerator() {} + + @ConsumeEvent(value = ARCHIVE_ADDRESS, blocking = true) + public void onMessage(ArchiveRequest request) { + logger.trace("Job ID: " + request.getId() + " submitted."); + try { + String rec = recordingHelper.archiveRecording(request.recording, null, null).name(); + logger.trace("Recording archived, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification( + ARCHIVE_RECORDING_SUCCESS, + Map.of("jobId", request.getId(), "recording", rec))); + } catch (Exception e) { + logger.warn("Archiving failed"); + bus.publish( + MessagingServer.class.getName(), + new Notification(ARCHIVE_RECORDING_FAIL, Map.of("jobId", request.getId()))); + throw new CompletionException(e); + } + } + + @ConsumeEvent(value = GRAFANA_ARCHIVE_ADDRESS, blocking = true) + public void onMessage(GrafanaArchiveUploadRequest request) { + try { + logger.trace("Job ID: " + request.getId() + " submitted."); + recordingHelper.uploadToJFRDatasource(request.getPair()).await().atMost(timeout); + logger.trace("Grafana upload complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", 
request.getId()))); + } + } + + @ConsumeEvent(value = GRAFANA_ACTIVE_ADDRESS, blocking = true) + public void onMessage(GrafanaActiveUploadRequest request) { + try { + logger.trace("Job ID: " + request.getId() + " submitted."); + recordingHelper + .uploadToJFRDatasource(request.getTargetId(), request.getRemoteId()) + .await() + .atMost(timeout); + logger.trace("Grafana upload complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(GRAFANA_UPLOAD_FAIL, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = ACTIVE_REPORT_ADDRESS, blocking = true) + public void onMessage(ActiveReportRequest request) { + try { + logger.trace("Job ID: " + request.getId() + " submitted."); + reportsService.reportFor(request.recording).await().atMost(timeout); + logger.trace("Report generation complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_FAILURE, Map.of("jobId", request.getId()))); + } + } + + @ConsumeEvent(value = ARCHIVE_REPORT_ADDRESS, blocking = true) + public void onMessage(ArchivedReportRequest request) { + try { + logger.trace("Job ID: " + request.getId() + " submitted."); + reportsService + .reportFor(request.getPair().getKey(), request.getPair().getValue()) + .await() + .atMost(timeout); + logger.trace("Report generation complete, firing notification"); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_SUCCESS, Map.of("jobId", request.getId()))); + } catch (Exception e) { + logger.warn("Exception thrown while servicing request: ", e); + bus.publish( + MessagingServer.class.getName(), + new Notification(REPORT_FAILURE, Map.of("jobId", request.getId()))); + } + } + + // Spotbugs doesn't like us storing an ActiveRecording here as part + // of the record. It shouldn't be a problem and we do similar things + // elswhere with other records. 
+ @SuppressFBWarnings(value = {"EI_EXPOSE_REP", "EI_EXPOSE_REP2"}) + public record ArchiveRequest(String id, ActiveRecording recording) { + + public ArchiveRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(recording); + } + + public String getId() { + return id; + } + } + + public record GrafanaArchiveUploadRequest(String id, Pair pair) { + + public GrafanaArchiveUploadRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(pair); + } + + public String getId() { + return id; + } + + public Pair getPair() { + return pair; + } + } + + public record GrafanaActiveUploadRequest(String id, long remoteId, long targetId) { + + public GrafanaActiveUploadRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(remoteId); + Objects.requireNonNull(targetId); + } + + public String getId() { + return id; + } + + public long getRemoteId() { + return remoteId; + } + + public long getTargetId() { + return targetId; + } + } + + public record ArchivedReportRequest(String id, Pair pair) { + + public ArchivedReportRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(pair); + } + + public String getId() { + return id; + } + + public Pair getPair() { + return pair; + } + } + + // Spotbugs doesn't like us storing an ActiveRecording here as part + // of the record. It shouldn't be a problem and we do similar things + // elswhere with other records. + @SuppressFBWarnings(value = {"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) + public record ActiveReportRequest(String id, ActiveRecording recording) { + + public ActiveReportRequest { + Objects.requireNonNull(id); + Objects.requireNonNull(recording); + } + + public String getId() { + return id; + } + } +} diff --git a/src/main/java/io/cryostat/recordings/RecordingHelper.java b/src/main/java/io/cryostat/recordings/RecordingHelper.java index dd9912491..8a7677099 100644 --- a/src/main/java/io/cryostat/recordings/RecordingHelper.java +++ b/src/main/java/io/cryostat/recordings/RecordingHelper.java @@ -18,7 +18,6 @@ import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; -import java.net.URI; import java.net.URISyntaxException; import java.net.URL; import java.net.URLDecoder; @@ -892,28 +891,6 @@ public ArchivedRecording archiveRecording( // couldn't parse the response from Amazon S3. 
throw e; } - if (expiry == null) { - ArchivedRecording archivedRecording = - new ArchivedRecording( - recording.target.jvmId, - filename, - downloadUrl(recording.target.jvmId, filename), - reportUrl(recording.target.jvmId, filename), - recording.metadata, - accum, - now.getEpochSecond()); - - URI connectUrl = recording.target.connectUrl; - - var event = - new ArchivedRecordingEvent( - ActiveRecordings.RecordingEventCategory.ARCHIVED_CREATED, - ArchivedRecordingEvent.Payload.of(connectUrl, archivedRecording)); - bus.publish(event.category().category(), event.payload().recording()); - bus.publish( - MessagingServer.class.getName(), - new Notification(event.category().category(), event.payload())); - } return new ArchivedRecording( recording.target.jvmId, filename, diff --git a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java index 433538ff2..0b2e1a8a2 100644 --- a/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/MemoryCachingReportsService.java @@ -27,6 +27,7 @@ import io.quarkus.cache.Cache; import io.quarkus.cache.CacheName; +import io.quarkus.cache.CaffeineCache; import io.smallrye.mutiny.Uni; import jakarta.annotation.Priority; import jakarta.decorator.Decorator; @@ -108,4 +109,20 @@ public Uni> reportFor(ActiveRecording recording) { public Uni> reportFor(String jvmId, String filename) { return reportFor(jvmId, filename, r -> true); } + + @Override + public boolean keyExists(ActiveRecording recording) { + String key = ReportsService.key(recording); + return (quarkusCache && memoryCache) + && (activeCache.as(CaffeineCache.class).keySet().contains(key) + || delegate.keyExists(recording)); + } + + @Override + public boolean keyExists(String jvmId, String filename) { + String key = recordingHelper.archivedRecordingKey(jvmId, filename); + return (quarkusCache && memoryCache) + && (archivedCache.as(CaffeineCache.class).keySet().contains(key) + || delegate.keyExists(jvmId, filename)); + } } diff --git a/src/main/java/io/cryostat/reports/Reports.java b/src/main/java/io/cryostat/reports/Reports.java index 5fd7e7e0f..83270eca3 100644 --- a/src/main/java/io/cryostat/reports/Reports.java +++ b/src/main/java/io/cryostat/reports/Reports.java @@ -15,25 +15,30 @@ */ package io.cryostat.reports; -import java.util.Map; +import java.time.Duration; +import java.util.UUID; import io.cryostat.ConfigProperties; import io.cryostat.StorageBuckets; -import io.cryostat.core.reports.InterruptibleReportGenerator.AnalysisResult; +import io.cryostat.recordings.LongRunningRequestGenerator; +import io.cryostat.recordings.LongRunningRequestGenerator.ActiveReportRequest; +import io.cryostat.recordings.LongRunningRequestGenerator.ArchivedReportRequest; import io.cryostat.recordings.RecordingHelper; import io.cryostat.targets.Target; import io.quarkus.runtime.StartupEvent; import io.smallrye.common.annotation.Blocking; -import io.smallrye.mutiny.Uni; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.mutiny.core.eventbus.EventBus; import jakarta.annotation.security.RolesAllowed; import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import org.eclipse.microprofile.config.inject.ConfigProperty; import 
org.jboss.logging.Logger; import org.jboss.resteasy.reactive.RestPath; @@ -47,9 +52,14 @@ public class Reports { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_STORAGE_CACHE_NAME) String bucket; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + + @Inject LongRunningRequestGenerator generator; @Inject StorageBuckets storageBuckets; @Inject RecordingHelper helper; @Inject ReportsService reportsService; + @Inject EventBus bus; @Inject Logger logger; // FIXME this observer cannot be declared on the StorageCachingReportsService decorator. @@ -63,27 +73,83 @@ void onStart(@Observes StartupEvent evt) { @GET @Blocking @Path("/api/v4/reports/{encodedKey}") - @Produces(MediaType.APPLICATION_JSON) @RolesAllowed("read") - public Uni> get(@RestPath String encodedKey) { + // Response isn't strongly typed which allows us to return either the Analysis result + // or a job ID String along with setting different Status codes. + // TODO: Is there a cleaner way to accomplish this? + public Response get(HttpServerResponse response, @RestPath String encodedKey) { // TODO implement query parameter for evaluation predicate var pair = helper.decodedKey(encodedKey); - return reportsService.reportFor(pair.getKey(), pair.getValue()); + + // Check if we have a cached result already for this report + if (reportsService.keyExists(pair.getKey(), pair.getValue())) { + return Response.ok( + reportsService + .reportFor(pair.getKey(), pair.getValue()) + .await() + .atMost(timeout), + MediaType.APPLICATION_JSON) + .status(200) + .build(); + } + + // If we don't have a cached result, delegate to the ArchiveRequestGenerator + // and return the job ID with a location header. + logger.trace("Cache miss. Creating archived reports request"); + ArchivedReportRequest request = + new ArchivedReportRequest(UUID.randomUUID().toString(), pair); + response.bodyEndHandler( + (e) -> bus.publish(LongRunningRequestGenerator.ARCHIVE_REPORT_ADDRESS, request)); + return Response.ok(request.getId(), MediaType.TEXT_PLAIN) + .status(202) + .location( + UriBuilder.fromUri( + String.format( + "/api/v4/targets/%d/reports/%d", + pair.getLeft(), pair.getRight())) + .build()) + .build(); } @GET @Blocking @Path("/api/v4/targets/{targetId}/reports/{recordingId}") - @Produces({MediaType.APPLICATION_JSON}) @RolesAllowed("read") - public Uni> getActive( - @RestPath long targetId, @RestPath long recordingId) throws Exception { + // Response isn't strongly typed which allows us to return either the Analysis result + // or a job ID String along with setting different Status codes. + // TODO: Is there a cleaner way to accomplish this? + public Response getActive( + HttpServerResponse response, @RestPath long targetId, @RestPath long recordingId) + throws Exception { var target = Target.getTargetById(targetId); var recording = target.getRecordingById(recordingId); if (recording == null) { throw new NotFoundException(); } + + // Check if we've already cached a result for this report, return it if so + if (reportsService.keyExists(recording)) { + return Response.ok(reportsService.reportFor(recording).await().atMost(timeout)) + .status(200) + .build(); + } + + // If there isn't a cached result available, delegate to the ArchiveRequestGenerator + // and return the job ID with a location header. + logger.trace("Cache miss. 
Creating active reports request"); + ActiveReportRequest request = + new ActiveReportRequest(UUID.randomUUID().toString(), recording); + response.bodyEndHandler( + (e) -> bus.publish(LongRunningRequestGenerator.ACTIVE_REPORT_ADDRESS, request)); // TODO implement query parameter for evaluation predicate - return reportsService.reportFor(recording); + return Response.ok(request.getId(), MediaType.TEXT_PLAIN) + .status(202) + .location( + UriBuilder.fromUri( + String.format( + "/api/v4/targets/%d/reports/%d", + target.id, recordingId)) + .build()) + .build(); } } diff --git a/src/main/java/io/cryostat/reports/ReportsService.java b/src/main/java/io/cryostat/reports/ReportsService.java index 14a22764d..4d982d45b 100644 --- a/src/main/java/io/cryostat/reports/ReportsService.java +++ b/src/main/java/io/cryostat/reports/ReportsService.java @@ -39,4 +39,8 @@ Uni> reportFor( static String key(ActiveRecording recording) { return String.format("%s/%d", recording.target.jvmId, recording.id); } + + public boolean keyExists(ActiveRecording recording); + + public boolean keyExists(String jvmId, String filename); } diff --git a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java index a11f0ec10..fe449c2ec 100644 --- a/src/main/java/io/cryostat/reports/ReportsServiceImpl.java +++ b/src/main/java/io/cryostat/reports/ReportsServiceImpl.java @@ -117,4 +117,14 @@ public ReportGenerationException(Throwable cause) { super(cause); } } + + @Override + public boolean keyExists(ActiveRecording recording) { + return false; + } + + @Override + public boolean keyExists(String jvmId, String filename) { + return false; + } } diff --git a/src/main/java/io/cryostat/reports/StorageCachingReportsService.java b/src/main/java/io/cryostat/reports/StorageCachingReportsService.java index 1a490960f..d68d82853 100644 --- a/src/main/java/io/cryostat/reports/StorageCachingReportsService.java +++ b/src/main/java/io/cryostat/reports/StorageCachingReportsService.java @@ -64,6 +64,9 @@ class StorageCachingReportsService implements ReportsService { @ConfigProperty(name = ConfigProperties.ARCHIVED_REPORTS_EXPIRY_DURATION) Duration expiry; + @ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT) + Duration timeout; + @Inject S3Client storage; @Inject RecordingHelper recordingHelper; @Inject ObjectMapper mapper; @@ -165,4 +168,16 @@ public Uni> reportFor(ActiveRecording recording) { public Uni> reportFor(String jvmId, String filename) { return reportFor(jvmId, filename, r -> true); } + + @Override + public boolean keyExists(ActiveRecording recording) { + String key = ReportsService.key(recording); + return enabled && checkStorage(key).await().atMost(timeout); + } + + @Override + public boolean keyExists(String jvmId, String filename) { + String key = recordingHelper.archivedRecordingKey(jvmId, filename); + return enabled && checkStorage(key).await().atMost(timeout); + } } diff --git a/src/test/java/io/cryostat/reports/ReportsTest.java b/src/test/java/io/cryostat/reports/ReportsTest.java index a64545f64..9669d2f4e 100644 --- a/src/test/java/io/cryostat/reports/ReportsTest.java +++ b/src/test/java/io/cryostat/reports/ReportsTest.java @@ -85,7 +85,7 @@ void testGetReportByTargetAndRemoteId() { .all() .when() .pathParams(Map.of("targetId", targetId)) - .formParam("recordingName", "activeRecordingsTest") + .formParam("recordingName", "activeRecordingsTestReports") .formParam("events", "template=Continuous") .pathParam("targetId", targetId) 
.post("/api/v4/targets/{targetId}/recordings") @@ -111,9 +111,9 @@ void testGetReportByTargetAndRemoteId() { .all() .and() .assertThat() - .statusCode(200) - .contentType(ContentType.JSON) - .body("size()", Matchers.greaterThan(0)); + .statusCode(202) + .contentType(ContentType.TEXT) + .body(Matchers.any(String.class)); given().log() .all() @@ -136,7 +136,7 @@ void testGetReportByUrl() { .all() .when() .pathParams(Map.of("targetId", targetId)) - .formParam("recordingName", "activeRecordingsTest") + .formParam("recordingName", "activeRecordingsTestReportsURL") .formParam("events", "template=Continuous") .pathParam("targetId", targetId) .post("/api/v4/targets/{targetId}/recordings") @@ -163,9 +163,9 @@ void testGetReportByUrl() { .all() .and() .assertThat() - .statusCode(200) - .contentType(ContentType.JSON) - .body("size()", Matchers.greaterThan(0)); + .statusCode(202) + .contentType(ContentType.TEXT) + .body(Matchers.any(String.class)); given().log() .all() diff --git a/src/test/java/itest/RecordingWorkflowTest.java b/src/test/java/itest/RecordingWorkflowTest.java index b0db8d8e3..27d8ff510 100644 --- a/src/test/java/itest/RecordingWorkflowTest.java +++ b/src/test/java/itest/RecordingWorkflowTest.java @@ -18,9 +18,12 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -35,7 +38,6 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.codec.BodyCodec; import itest.bases.StandardSelfTest; import itest.util.ITestCleanupFailedException; import jdk.jfr.consumer.RecordedEvent; @@ -49,6 +51,8 @@ @QuarkusTestResource(LocalStackResource.class) public class RecordingWorkflowTest extends StandardSelfTest { + private final ExecutorService worker = ForkJoinPool.commonPool(); + static final String TEST_RECORDING_NAME = "workflow_itest"; static final String TARGET_ALIAS = "selftest"; static long TEST_REMOTE_ID; @@ -113,19 +117,36 @@ public void testWorkflow() throws Exception { // save a copy of the partial recording dump MultiMap saveHeaders = MultiMap.caseInsensitiveMultiMap(); saveHeaders.add(HttpHeaders.CONTENT_TYPE.toString(), HttpMimeType.PLAINTEXT.mime()); - String archivedRecordingFilename = - webClient - .extensions() - .patch( - String.format( - "/api/v4/targets/%d/recordings/%d", - getSelfReferenceTargetId(), TEST_REMOTE_ID), - saveHeaders, - Buffer.buffer("SAVE"), - REQUEST_TIMEOUT_SECONDS) - .bodyAsString(); - archivedRecordingFilenames.add(archivedRecordingFilename); - + webClient + .extensions() + .patch( + String.format( + "/api/v4/targets/%d/recordings/%d", + getSelfReferenceTargetId(), TEST_REMOTE_ID), + saveHeaders, + Buffer.buffer("SAVE"), + REQUEST_TIMEOUT_SECONDS) + .bodyAsString(); + // Wait for the archive request to conclude, the server won't block the client + // while it performs the archive so we need to wait. 
+ CountDownLatch archiveLatch = new CountDownLatch(1); + Future future = + worker.submit( + () -> { + try { + return expectNotification( + "ArchiveRecordingSuccess", 15, TimeUnit.SECONDS) + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + archiveLatch.countDown(); + } + }); + archiveLatch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + JsonObject archiveNotification = future.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + archivedRecordingFilenames.add( + archiveNotification.getJsonObject("message").getString("recording").toString()); // check that the in-memory recording list hasn't changed CompletableFuture listRespFuture3 = new CompletableFuture<>(); webClient @@ -219,10 +240,9 @@ public void testWorkflow() throws Exception { String reportUrl = recordingInfo.getString("reportUrl"); - HttpResponse reportResponse = + HttpResponse reportResponse = webClient .get(reportUrl) - .as(BodyCodec.jsonObject()) .send() .toCompletionStage() .toCompletableFuture() @@ -230,12 +250,27 @@ public void testWorkflow() throws Exception { MatcherAssert.assertThat( reportResponse.statusCode(), Matchers.both(Matchers.greaterThanOrEqualTo(200)).and(Matchers.lessThan(300))); - JsonObject report = reportResponse.body(); + MatcherAssert.assertThat(reportResponse.bodyAsString(), Matchers.notNullValue()); + + // Check that report generation concludes + CountDownLatch latch = new CountDownLatch(1); + Future f = + worker.submit( + () -> { + try { + return expectNotification("ReportSuccess", 15, TimeUnit.SECONDS) + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + }); - Map response = report.getMap(); - MatcherAssert.assertThat(response, Matchers.notNullValue()); + latch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + JsonObject notification = f.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); MatcherAssert.assertThat( - response, Matchers.is(Matchers.aMapWithSize(Matchers.greaterThan(8)))); + notification.getJsonObject("message"), Matchers.notNullValue()); } finally { // Clean up what we created try { diff --git a/src/test/java/itest/ReportGenerationTest.java b/src/test/java/itest/ReportGenerationTest.java index a6c009d5a..5354e8e57 100644 --- a/src/test/java/itest/ReportGenerationTest.java +++ b/src/test/java/itest/ReportGenerationTest.java @@ -16,7 +16,11 @@ package itest; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import io.cryostat.resources.LocalStackResource; @@ -28,6 +32,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpHeaders; import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.handler.HttpException; import itest.bases.StandardSelfTest; import itest.util.ITestCleanupFailedException; @@ -41,6 +46,8 @@ @QuarkusTestResource(LocalStackResource.class) public class ReportGenerationTest extends StandardSelfTest { + private final ExecutorService worker = ForkJoinPool.commonPool(); + static final String TEST_RECORDING_NAME = "someRecording"; private String archivedReportRequestURL() { @@ -82,29 +89,38 @@ void testGetActiveReport() throws Exception { Thread.sleep(5_000); // Get a report for the above recording - CompletableFuture reportResponse = new CompletableFuture<>(); - webClient - 
.get(activeRecording.getString("reportUrl")) - .putHeader(HttpHeaders.ACCEPT.toString(), HttpMimeType.JSON.mime()) - .send( - ar -> { - if (assertRequestStatus(ar, reportResponse)) { - MatcherAssert.assertThat( - ar.result().statusCode(), - Matchers.both(Matchers.greaterThanOrEqualTo(200)) - .and(Matchers.lessThan(400))); - MatcherAssert.assertThat( - ar.result() - .getHeader(HttpHeaders.CONTENT_TYPE.toString()), - Matchers.equalTo("application/json;charset=UTF-8")); - reportResponse.complete(ar.result().bodyAsJsonObject()); + HttpResponse resp = + webClient + .get(activeRecording.getString("reportUrl")) + .putHeader(HttpHeaders.ACCEPT.toString(), HttpMimeType.JSON.mime()) + .send() + .toCompletionStage() + .toCompletableFuture() + .get(); + MatcherAssert.assertThat( + resp.statusCode(), + Matchers.both(Matchers.greaterThanOrEqualTo(200)).and(Matchers.lessThan(400))); + MatcherAssert.assertThat(resp, Matchers.notNullValue()); + + // Check that report generation concludes + CountDownLatch latch = new CountDownLatch(1); + Future f = + worker.submit( + () -> { + try { + return expectNotification("ReportSuccess", 15, TimeUnit.SECONDS) + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + latch.countDown(); } }); - JsonObject jsonResponse = reportResponse.get(); - MatcherAssert.assertThat(jsonResponse, Matchers.notNullValue()); + + latch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); + JsonObject notification = f.get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); MatcherAssert.assertThat( - jsonResponse.getMap(), - Matchers.is(Matchers.aMapWithSize(Matchers.greaterThan(0)))); + notification.getJsonObject("message"), Matchers.notNullValue()); } finally { if (activeRecording != null) { // Clean up recording diff --git a/src/test/java/itest/TargetRecordingPatchTest.java b/src/test/java/itest/TargetRecordingPatchTest.java index 30b5667d3..a534bacad 100644 --- a/src/test/java/itest/TargetRecordingPatchTest.java +++ b/src/test/java/itest/TargetRecordingPatchTest.java @@ -16,6 +16,10 @@ package itest; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import io.cryostat.util.HttpStatusCodeIdentifier; @@ -36,6 +40,8 @@ @QuarkusTest public class TargetRecordingPatchTest extends StandardSelfTest { + private final ExecutorService worker = ForkJoinPool.commonPool(); + static final String TEST_RECORDING_NAME = "someRecording"; String recordingRequestUrl() { @@ -84,8 +90,25 @@ void testSaveEmptyRecordingDoesNotArchiveRecordingFile() throws Exception { null, Buffer.buffer("SAVE"), 5); - MatcherAssert.assertThat(saveResponse.statusCode(), Matchers.equalTo(204)); - MatcherAssert.assertThat(saveResponse.body(), Matchers.equalTo(null)); + MatcherAssert.assertThat(saveResponse.statusCode(), Matchers.equalTo(200)); + MatcherAssert.assertThat(saveResponse.bodyAsString(), Matchers.any(String.class)); + + CountDownLatch latch = new CountDownLatch(1); + Future f = + worker.submit( + () -> { + try { + return expectNotification( + "ArchiveRecordingFailed", 15, TimeUnit.SECONDS) + .get(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + latch.countDown(); + } + }); + + latch.await(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); // Assert that no recording was archived CompletableFuture listRespFuture1 = new CompletableFuture<>(); diff --git 
a/src/test/java/itest/UploadRecordingTest.java b/src/test/java/itest/UploadRecordingTest.java index ba76b0ed1..3d356e2d2 100644 --- a/src/test/java/itest/UploadRecordingTest.java +++ b/src/test/java/itest/UploadRecordingTest.java @@ -109,14 +109,12 @@ public void shouldLoadRecordingToDatasource() throws Exception { getSelfReferenceTargetId(), RECORDING_REMOTE_ID), (Buffer) null, 0); - + // The endpoint should send back a job ID, while it kicks off the upload. MatcherAssert.assertThat(resp.statusCode(), Matchers.equalTo(200)); + MatcherAssert.assertThat(resp.bodyAsString(), Matchers.notNullValue()); - final String expectedUploadResponse = - String.format("Uploaded: %s\nSet: %s", DATASOURCE_FILENAME, DATASOURCE_FILENAME); - - MatcherAssert.assertThat( - resp.bodyAsString().trim(), Matchers.equalTo(expectedUploadResponse)); + // Sleep for a bit to give the upload time to complete + Thread.sleep(2000); HttpRequest req = webClient.get("/api/v4/grafana_datasource_url"); CompletableFuture respFuture = new CompletableFuture<>();
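
For illustration only (not part of the patch): a minimal client-side sketch of how the new async job API introduced above might be consumed, assuming a Cryostat instance at http://localhost:8181, placeholder target/recording IDs of 1, and a notifications WebSocket at /api/notifications. The endpoint path, the plain-text job ID response, and the "ArchiveRecordingSuccess"/"ArchiveRecordingFailed" notification categories with their "jobId" field come from the handlers and Notification payloads in this patch; the base URL, IDs, and WebSocket path are assumptions.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class AsyncArchiveJobSketch {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        String base = "http://localhost:8181"; // assumed Cryostat URL
        long targetId = 1L; // placeholder IDs for illustration
        long remoteId = 1L;

        // 1. PATCH "SAVE": the server no longer blocks until archiving finishes;
        //    it immediately returns the generated job ID as plain text.
        HttpRequest save = HttpRequest.newBuilder(
                        URI.create(base + "/api/v4/targets/" + targetId
                                + "/recordings/" + remoteId))
                .header("Content-Type", "text/plain")
                .method("PATCH", HttpRequest.BodyPublishers.ofString("SAVE"))
                .build();
        String jobId = http.send(save, HttpResponse.BodyHandlers.ofString()).body();

        // 2. Watch the notifications WebSocket (path assumed here) for the terminal
        //    event carrying the same job ID: "ArchiveRecordingSuccess" includes the
        //    archived recording name, "ArchiveRecordingFailed" signals an error.
        CompletableFuture<String> done = new CompletableFuture<>();
        http.newWebSocketBuilder()
                .buildAsync(
                        URI.create("ws://localhost:8181/api/notifications"),
                        new WebSocket.Listener() {
                            private final StringBuilder buf = new StringBuilder();

                            @Override
                            public CompletionStage<?> onText(
                                    WebSocket ws, CharSequence data, boolean last) {
                                buf.append(data);
                                if (last) {
                                    String msg = buf.toString();
                                    buf.setLength(0);
                                    // crude string matching for brevity; a real client
                                    // would parse the JSON and compare message.jobId
                                    if (msg.contains(jobId)
                                            && (msg.contains("ArchiveRecordingSuccess")
                                                    || msg.contains("ArchiveRecordingFailed"))) {
                                        done.complete(msg);
                                    }
                                }
                                ws.request(1);
                                return null;
                            }
                        })
                .join();

        // A robust client would subscribe before issuing the PATCH so a notification
        // that fires before the WebSocket connects is not missed.
        System.out.println("Archive job " + jobId + " finished: "
                + done.orTimeout(30, TimeUnit.SECONDS).join());
    }
}

The report endpoints added in Reports.java follow the same pattern: on a cache miss they return 202 Accepted with the job ID and a Location header, so a client can either wait for the ReportSuccess notification or repeat the GET against the Location URI until it returns 200 with the analysis JSON.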