Skip to content

Commit

Permalink
refactor snapshot endpoints to use Unis
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewazores committed Jan 18, 2024
1 parent d54f194 commit 4fa2f90
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 67 deletions.
83 changes: 41 additions & 42 deletions src/main/java/io/cryostat/recordings/Recordings.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.mutiny.Uni;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.handler.HttpException;
import io.vertx.mutiny.core.eventbus.EventBus;
Expand Down Expand Up @@ -507,65 +508,63 @@ public Response patchV1(@RestPath URI connectUrl, @RestPath String recordingName

@POST
@Transactional
@Blocking
@Path("/api/v1/targets/{connectUrl}/snapshot")
@RolesAllowed("write")
public Response createSnapshotV1(@RestPath URI connectUrl) throws Exception {
public Uni<Response> createSnapshotV1(@RestPath URI connectUrl) throws Exception {
Target target = Target.getTargetByConnectUrl(connectUrl);
try {
ActiveRecording recording =
connectionManager.executeConnectedTask(
target,
connection -> recordingHelper.createSnapshot(target, connection));
return Response.status(Response.Status.OK).entity(recording.name).build();
} catch (SnapshotCreationException sce) {
return Response.status(Response.Status.ACCEPTED).build();
}
return connectionManager
.executeConnectedTaskUni(
target, connection -> recordingHelper.createSnapshot(target, connection))
.onItem()
.transform(
recording ->
Response.status(Response.Status.OK).entity(recording.name).build())
.onFailure(SnapshotCreationException.class)
.recoverWithItem(Response.status(Response.Status.ACCEPTED).build());
}

@POST
@Transactional
@Blocking
@Path("/api/v2/targets/{connectUrl}/snapshot")
@RolesAllowed("write")
public Response createSnapshotV2(@RestPath URI connectUrl) throws Exception {
public Uni<Response> createSnapshotV2(@RestPath URI connectUrl) throws Exception {
Target target = Target.getTargetByConnectUrl(connectUrl);
try {
ActiveRecording recording =
connectionManager.executeConnectedTask(
target,
connection -> recordingHelper.createSnapshot(target, connection));
return Response.status(Response.Status.CREATED)
.entity(
V2Response.json(
Response.Status.CREATED,
recordingHelper.toExternalForm(recording)))
.build();
} catch (SnapshotCreationException sce) {
return Response.status(Response.Status.ACCEPTED)
.entity(V2Response.json(Response.Status.ACCEPTED, null))
.build();
}
return connectionManager
.executeConnectedTaskUni(
target, connection -> recordingHelper.createSnapshot(target, connection))
.onItem()
.transform(
recording ->
Response.status(Response.Status.CREATED)
.entity(
V2Response.json(
Response.Status.CREATED,
recordingHelper.toExternalForm(recording)))
.build())
.onFailure(SnapshotCreationException.class)
.recoverWithItem(
Response.status(Response.Status.ACCEPTED)
.entity(V2Response.json(Response.Status.ACCEPTED, null))
.build());
}

@POST
@Transactional
@Blocking
@Path("/api/v3/targets/{id}/snapshot")
@RolesAllowed("write")
public Response createSnapshot(@RestPath long id) throws Exception {
public Uni<Response> createSnapshot(@RestPath long id) throws Exception {
Target target = Target.find("id", id).singleResult();
try {
ActiveRecording recording =
connectionManager.executeConnectedTask(
target,
connection -> recordingHelper.createSnapshot(target, connection));
return Response.status(Response.Status.OK)
.entity(recordingHelper.toExternalForm(recording))
.build();
} catch (SnapshotCreationException sce) {
return Response.status(Response.Status.ACCEPTED).build();
}
return connectionManager
.executeConnectedTaskUni(
target, connection -> recordingHelper.createSnapshot(target, connection))
.onItem()
.transform(
recording ->
Response.status(Response.Status.OK)
.entity(recordingHelper.toExternalForm(recording))
.build())
.onFailure(SnapshotCreationException.class)
.recoverWithItem(Response.status(Response.Status.ACCEPTED).build());
}

@POST
Expand Down
46 changes: 21 additions & 25 deletions src/main/java/io/cryostat/targets/TargetConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public class TargetConnectionManager {
private final JFRConnectionToolkit jfrConnectionToolkit;
private final MatchExpressionEvaluator matchExpressionEvaluator;
private final AgentConnectionFactory agentConnectionFactory;
private final Executor executor;
private final Logger logger;

private final AsyncLoadingCache<URI, JFRConnection> connections;
Expand All @@ -96,7 +95,6 @@ public class TargetConnectionManager {
this.jfrConnectionToolkit = jfrConnectionToolkit;
this.matchExpressionEvaluator = matchExpressionEvaluator;
this.agentConnectionFactory = agentConnectionFactory;
this.executor = executor;

int maxTargetConnections = 0; // TODO make configurable

Expand Down Expand Up @@ -175,33 +173,31 @@ void handleCredentialChange(Credential credential) {
}
}

public <T> Uni<T> executeConnectedTaskAsync(Target target, ConnectedTask<T> task) {
@Blocking
public <T> Uni<T> executeConnectedTaskUni(Target target, ConnectedTask<T> task) {
return Uni.createFrom()
.completionStage(
connections
.get(target.connectUrl)
.thenApplyAsync(
conn -> {
try {
synchronized (
targetLocks.computeIfAbsent(
target.connectUrl,
k -> new Object())) {
return task.execute(conn);
}
} catch (Exception e) {
logger.error("Connection failure", e);
throw new CompletionException(e);
}
},
executor));
.completionStage(connections.get(target.connectUrl))
.onItem()
.transform(
Unchecked.function(
conn -> {
synchronized (
targetLocks.computeIfAbsent(
target.connectUrl, k -> new Object())) {
return task.execute(conn);
}
}))
.onFailure(RuntimeException.class)
.transform(this::unwrapRuntimeException)
.onFailure(t -> isTargetConnectionFailure(t) || isUnknownTargetFailure(t))
.retry()
.withBackOff(Duration.ofSeconds(2))
.atMost(5);
}

@Blocking
public <T> T executeConnectedTask(Target target, ConnectedTask<T> task) throws Exception {
synchronized (targetLocks.computeIfAbsent(target.connectUrl, k -> new Object())) {
return task.execute(connections.get(target.connectUrl).get());
}
public <T> T executeConnectedTask(Target target, ConnectedTask<T> task) {
return executeConnectedTaskUni(target, task).await().atMost(Duration.ofSeconds(10));
}

@Blocking
Expand Down

0 comments on commit 4fa2f90

Please sign in to comment.