Skip to content

Commit

Permalink
feat(async): long-running async job API (#698)
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh-Matsuoka authored Dec 11, 2024
1 parent 65e3787 commit bbd1149
Show file tree
Hide file tree
Showing 16 changed files with 593 additions and 159 deletions.
80 changes: 32 additions & 48 deletions schema/openapi.yaml
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
---
components:
schemas:
AnalysisResult:
properties:
evaluation:
$ref: '#/components/schemas/Evaluation'
name:
type: string
score:
format: double
type: number
topic:
type: string
type: object
Annotations:
properties:
cryostat:
Expand Down Expand Up @@ -239,19 +227,6 @@ components:
- id
- realm
type: object
Evaluation:
properties:
explanation:
type: string
solution:
type: string
suggestions:
items:
$ref: '#/components/schemas/Suggestion'
type: array
summary:
type: string
type: object
Event:
properties:
clazz:
Expand Down Expand Up @@ -309,6 +284,16 @@ components:
hash:
type: string
type: object
HttpServerResponse:
properties:
chunked:
type: boolean
statusCode:
format: int32
type: integer
statusMessage:
type: string
type: object
Instant:
example: 2022-03-10T16:15:50Z
format: date-time
Expand Down Expand Up @@ -554,15 +539,6 @@ components:
name:
type: string
type: object
Suggestion:
properties:
name:
type: string
setting:
type: string
value:
type: string
type: object
Target:
properties:
agent:
Expand Down Expand Up @@ -1225,6 +1201,11 @@ paths:
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/HttpServerResponse'
responses:
"200":
content:
Expand Down Expand Up @@ -1456,14 +1437,13 @@ paths:
required: true
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/HttpServerResponse'
responses:
"200":
content:
application/json:
schema:
additionalProperties:
$ref: '#/components/schemas/AnalysisResult'
type: object
description: OK
"401":
description: Not Authorized
Expand Down Expand Up @@ -2127,9 +2107,9 @@ paths:
type: integer
requestBody:
content:
text/plain:
application/json:
schema:
type: string
$ref: '#/components/schemas/HttpServerResponse'
responses:
"200":
content:
Expand Down Expand Up @@ -2160,6 +2140,11 @@ paths:
schema:
format: int64
type: integer
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/HttpServerResponse'
responses:
"200":
content:
Expand Down Expand Up @@ -2190,14 +2175,13 @@ paths:
schema:
format: int64
type: integer
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/HttpServerResponse'
responses:
"200":
content:
application/json:
schema:
additionalProperties:
$ref: '#/components/schemas/AnalysisResult'
type: object
description: OK
"401":
description: Not Authorized
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/io/cryostat/Producers.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.cryostat.core.reports.InterruptibleReportGenerator;
import io.cryostat.libcryostat.sys.Clock;
import io.cryostat.libcryostat.sys.FileSystem;
import io.cryostat.recordings.LongRunningRequestGenerator;

import io.quarkus.arc.DefaultBean;
import io.vertx.mutiny.core.Vertx;
Expand Down Expand Up @@ -76,6 +77,13 @@ public static InterruptibleReportGenerator produceInterruptibleReportGenerator()
singleThread ? Executors.newSingleThreadExecutor() : ForkJoinPool.commonPool());
}

@Produces
@RequestScoped
@DefaultBean
public static LongRunningRequestGenerator produceArchiveRequestGenerator() {
return new LongRunningRequestGenerator();
}

@Produces
@DefaultBean
public WebClient produceWebClient(Vertx vertx) {
Expand Down
58 changes: 41 additions & 17 deletions src/main/java/io/cryostat/recordings/ActiveRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.cryostat.recordings;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.time.Duration;
Expand All @@ -25,18 +24,22 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.cryostat.ConfigProperties;
import io.cryostat.libcryostat.templates.Template;
import io.cryostat.libcryostat.templates.TemplateType;
import io.cryostat.recordings.LongRunningRequestGenerator.ArchiveRequest;
import io.cryostat.recordings.LongRunningRequestGenerator.GrafanaActiveUploadRequest;
import io.cryostat.recordings.RecordingHelper.RecordingOptions;
import io.cryostat.recordings.RecordingHelper.RecordingReplace;
import io.cryostat.targets.Target;

import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.mutiny.core.eventbus.EventBus;
import jakarta.annotation.security.RolesAllowed;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
Expand Down Expand Up @@ -64,6 +67,8 @@ public class ActiveRecordings {

@Inject ObjectMapper mapper;
@Inject RecordingHelper recordingHelper;
@Inject LongRunningRequestGenerator generator;
@Inject EventBus bus;
@Inject Logger logger;

@ConfigProperty(name = ConfigProperties.CONNECTIONS_FAILED_TIMEOUT)
Expand Down Expand Up @@ -102,7 +107,11 @@ public RestResponse<InputStream> download(@RestPath long targetId, @RestPath lon
@Blocking
@Path("/{remoteId}")
@RolesAllowed("write")
public String patch(@RestPath long targetId, @RestPath long remoteId, String body)
public String patch(
HttpServerResponse response,
@RestPath long targetId,
@RestPath long remoteId,
String body)
throws Exception {
Target target = Target.find("id", targetId).singleResult();
Optional<ActiveRecording> recording =
Expand All @@ -121,18 +130,17 @@ public String patch(@RestPath long targetId, @RestPath long remoteId, String bod
.atMost(connectionFailedTimeout);
return null;
case "save":
try {
// FIXME this operation might take a long time to complete, depending on the
// amount of JFR data in the target and the speed of the connection between the
// target and Cryostat. We should not make the client wait until this operation
// completes before sending a response - it should be async. Here we should just
// return an Accepted response, and if a failure occurs that should be indicated
// as a websocket notification.
return recordingHelper.archiveRecording(activeRecording, null, null).name();
} catch (IOException ioe) {
logger.warn(ioe);
return null;
}
ArchiveRequest request =
new ArchiveRequest(UUID.randomUUID().toString(), activeRecording);
logger.tracev(
"Request created: ("
+ request.getId()
+ ", "
+ request.recording().name
+ ")");
response.endHandler(
(e) -> bus.publish(LongRunningRequestGenerator.ARCHIVE_ADDRESS, request));
return request.getId();
default:
throw new BadRequestException(body);
}
Expand Down Expand Up @@ -222,9 +230,25 @@ public void delete(@RestPath long targetId, @RestPath long remoteId) throws Exce
@Blocking
@Path("/{remoteId}/upload")
@RolesAllowed("write")
public Uni<String> uploadToGrafana(@RestPath long targetId, @RestPath long remoteId)
public String uploadToGrafana(
HttpServerResponse response, @RestPath long targetId, @RestPath long remoteId)
throws Exception {
return recordingHelper.uploadToJFRDatasource(targetId, remoteId);
// Send an intermediate response back to the client while another thread handles the upload
// request
logger.trace("Creating grafana upload request");
GrafanaActiveUploadRequest request =
new GrafanaActiveUploadRequest(UUID.randomUUID().toString(), remoteId, targetId);
logger.trace(
"Request created: ("
+ request.getId()
+ ", "
+ request.getRemoteId()
+ ", "
+ request.getTargetId()
+ ")");
response.endHandler(
(e) -> bus.publish(LongRunningRequestGenerator.GRAFANA_ACTIVE_ADDRESS, request));
return request.getId();
}

public record LinkedRecordingDescriptor(
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/io/cryostat/recordings/ArchivedRecordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

import io.cryostat.ConfigProperties;
import io.cryostat.Producers;
import io.cryostat.StorageBuckets;
import io.cryostat.libcryostat.sys.Clock;
import io.cryostat.recordings.ActiveRecording.Listener.ArchivedRecordingEvent;
import io.cryostat.recordings.ActiveRecordings.Metadata;
import io.cryostat.recordings.LongRunningRequestGenerator.GrafanaArchiveUploadRequest;
import io.cryostat.targets.Target;
import io.cryostat.util.HttpMimeType;
import io.cryostat.ws.MessagingServer;
Expand All @@ -42,7 +44,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.mutiny.core.eventbus.EventBus;
Expand Down Expand Up @@ -473,7 +475,8 @@ public void deleteArchivedRecording(@RestPath String jvmId, @RestPath String fil
@Blocking
@Path("/api/v4/grafana/{encodedKey}")
@RolesAllowed("write")
public Uni<String> uploadArchivedToGrafana(@RestPath String encodedKey) throws Exception {
public String uploadArchivedToGrafana(HttpServerResponse response, @RestPath String encodedKey)
throws Exception {
var pair = recordingHelper.decodedKey(encodedKey);
var key = recordingHelper.archivedRecordingKey(pair);
storage.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
Expand All @@ -487,7 +490,16 @@ public Uni<String> uploadArchivedToGrafana(@RestPath String encodedKey) throws E
if (!found) {
throw new NotFoundException();
}
return recordingHelper.uploadToJFRDatasource(pair);
// Send an intermediate response back to the client while another thread handles the upload
// request
logger.trace("Creating grafana upload request");
GrafanaArchiveUploadRequest request =
new GrafanaArchiveUploadRequest(UUID.randomUUID().toString(), pair);
logger.trace(
"Request created: (" + request.getId() + ", " + request.getPair().toString() + ")");
response.endHandler(
(e) -> bus.publish(LongRunningRequestGenerator.GRAFANA_ARCHIVE_ADDRESS, request));
return request.getId();
}

@GET
Expand Down
Loading

0 comments on commit bbd1149

Please sign in to comment.