Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide more information to consumable resources #353

Merged
merged 1 commit into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/add_cr_extra.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* Provide more information to consumable resources (the database workflow run creation time and max-in-flight-per-workflow)
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import io.undertow.util.Headers;
import io.undertow.util.PathTemplateMatch;
import io.undertow.util.StatusCodes;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -77,10 +79,16 @@ public void release(

@Override
public ConsumableResourceResponse request(
String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
String workflowName,
String workflowVersion,
String vidarrId,
Instant createdTime,
OptionalInt workflowMaxInFlight,
Optional<JsonNode> input) {
return allowList.contains(vidarrId)
? ConsumableResourceResponse.AVAILABLE
: inner.request(workflowName, workflowVersion, vidarrId, input);
: inner.request(
workflowName, workflowVersion, vidarrId, createdTime, OptionalInt.empty(), input);
}

public void setInner(ConsumableResource inner) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Instant;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
Expand All @@ -33,18 +35,28 @@ public Optional<Pair<String, BasicType>> inputFromSubmitter() {
}

@Override
public void recover(String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> resourceJson) {
public void recover(
String workflowName,
String workflowVersion,
String vidarrId,
Optional<JsonNode> resourceJson) {
inFlight.add(vidarrId);
}

@Override
public void release(String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
public void release(
String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
inFlight.remove(vidarrId);
}

@Override
public synchronized ConsumableResourceResponse request(
String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
String workflowName,
String workflowVersion,
String vidarrId,
Instant createdTime,
OptionalInt workflowMaxInFlight,
Optional<JsonNode> input) {
if (inFlight.size() <= maximum) {
inFlight.add(vidarrId);
return ConsumableResourceResponse.AVAILABLE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
import io.undertow.server.HttpHandler;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -121,12 +123,20 @@ void release(
* @param workflowName the name of the workflow
* @param workflowVersion the version of the workflow
* @param vidarrId the identifier of the workflow run
* @param createdTime the time the workflow run was created
* @param workflowMaxInFlight this is the max-in-flight value stored in the database for this
* workflow
* @param input the consumable resource information requested from the submitter, if applicable
* and provided.
* @return whether this resource is available
*/
ConsumableResourceResponse request(
String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input);
String workflowName,
String workflowVersion,
String vidarrId,
Instant createdTime,
OptionalInt workflowMaxInFlight,
Optional<JsonNode> input);

/**
* Called to initialise this consumable resource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -110,18 +112,28 @@ public Optional<Pair<String, BasicType>> inputFromSubmitter() {
}

@Override
public void recover(String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> resourceJson) {
public void recover(
String workflowName,
String workflowVersion,
String vidarrId,
Optional<JsonNode> resourceJson) {
// Do nothing, as there's no intermediate state that needs to be tracked
}

@Override
public void release(String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
public void release(
String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
// Do nothing, as this doesn't actually hold onto any resources
}

@Override
public ConsumableResourceResponse request(
String workflowName, String workflowVersion, String vidarrId, Optional<JsonNode> input) {
String workflowName,
String workflowVersion,
String vidarrId,
Instant createdTime,
OptionalInt workflowMaxInFlight,
Optional<JsonNode> input) {
String workflowVersionWithUnderscores = workflowVersion.replaceAll("\\.", "_");
final var matchedAlertValues =
cache
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package ca.on.oicr.gsi.vidarr.server;

import java.time.Instant;

record Candidate(long id, String workflowRun, Instant created) {}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ final class ConsumableResourceChecker implements Runnable {
.register();

private final Map<String, JsonNode> consumableResources;
private final Instant createdTime;
private final HikariDataSource dataSource;
private final long dbId;
private final ScheduledExecutorService executor;
private final AtomicBoolean isLive;
private final MaxInFlightByWorkflow maxInFlightByWorkflow;
private final Runnable next;
private final Instant startTime = Instant.now();
private final Target target;
private final ObjectNode tracing = Main.MAPPER.createObjectNode();
private final String vidarrId;
Expand All @@ -49,10 +50,12 @@ public ConsumableResourceChecker(
ScheduledExecutorService executor,
long dbId,
AtomicBoolean isLive,
MaxInFlightByWorkflow maxInFlightByWorkflow,
String workflow,
String workflowVersion,
String vidarrId,
Map<String, JsonNode> consumableResources,
Instant createdTime,
Runnable next) {
this.target = target;
this.dataSource = dataSource;
Expand All @@ -63,7 +66,9 @@ public ConsumableResourceChecker(
this.workflowVersion = workflowVersion;
this.vidarrId = vidarrId;
this.consumableResources = consumableResources;
this.createdTime = createdTime;
this.next = next;
this.maxInFlightByWorkflow = maxInFlightByWorkflow;
}

@Override
Expand All @@ -82,6 +87,8 @@ public void run() {
workflow,
workflowVersion,
vidarrId,
createdTime,
maxInFlightByWorkflow.getMaximumFor(workflow),
broker.inputFromSubmitter().map(def -> consumableResources.get(def.first())))
.apply(
new Visitor<Optional<String>>() {
Expand Down Expand Up @@ -133,7 +140,7 @@ public Optional<String> unavailable() {
return;
}
}
final var waiting = Duration.between(startTime, Instant.now()).toSeconds();
final var waiting = Duration.between(createdTime, Instant.now()).toSeconds();
tracing.put("vidarr-waiting", waiting);
updateBlockedResource(null);
waitTime.labels(workflow).observe(waiting);
Expand Down
Loading
Loading