From af693109bb2f102792a48a0ef0579c94ad0ced92 Mon Sep 17 00:00:00 2001 From: Andre Masella Date: Mon, 27 Nov 2023 20:22:42 -0500 Subject: [PATCH] Create a new priority scoring consumable resource Adds a new consumable resource that can collect data from other systems and then score the data to decide whether a job should launch. --- changes/add_priority_cr.md | 1 + plugin-guide.md | 420 ++++++++++++++++++ .../gsi/vidarr/core/AllPriorityScorer.java | 34 ++ .../gsi/vidarr/core/AnyPriorityScorer.java | 36 ++ .../gsi/vidarr/core/ArrayPriorityInput.java | 104 +++++ .../core/BaseAggregatePriorityFormula.java | 26 ++ .../core/BaseAggregatePriorityScorer.java | 53 +++ .../BaseInFlightCollectingPriorityScorer.java | 114 +++++ ...BaseTemporalEscalationPriorityFormula.java | 37 ++ .../vidarr/core/ConstantPriorityFormula.java | 22 + .../core/CoreConsumableResourceProvider.java | 17 + .../core/CorePriorityFormulaProvider.java | 23 + .../core/CorePriorityInputProvider.java | 20 + .../core/CorePriorityScorerProvider.java | 23 + .../gsi/vidarr/core/CutoffPriorityScorer.java | 44 ++ .../vidarr/core/DictionaryPriorityInput.java | 83 ++++ ...ghtCollectingByWorkflowPriorityScorer.java | 36 ++ ...ectingByWorkflowVersionPriorityScorer.java | 37 ++ .../InFlightCollectingPriorityScorer.java | 24 + .../ManualOverrideConsumableResource.java | 5 - .../core/MaxInFlightConsumableResource.java | 5 - .../vidarr/core/MaximumPriorityFormula.java | 11 + .../vidarr/core/MinimumPriorityFormula.java | 11 + ...lierTemporalEscalationPriorityFormula.java | 11 + ...fsetTemporalEscalationPriorityFormula.java | 10 + .../gsi/vidarr/core/OneOfPriorityInput.java | 80 ++++ .../core/PriorityConsumableResource.java | 140 ++++++ .../vidarr/core/PriorityFormulaVariable.java | 18 + .../vidarr/core/ProductPriorityFormula.java | 11 + .../gsi/vidarr/core/RawPriorityInput.java | 41 ++ .../gsi/vidarr/core/RemotePriorityInput.java | 110 +++++ .../vidarr/core/SubtractPriorityFormula.java | 31 ++ .../gsi/vidarr/core/SumPriorityFormula.java | 11 + .../gsi/vidarr/core/TuplePriorityInput.java | 46 ++ vidarr-core/src/main/java/module-info.java | 19 +- .../on/oicr/gsi/vidarr/PriorityFormula.java | 68 +++ .../gsi/vidarr/PriorityFormulaProvider.java | 10 + .../ca/on/oicr/gsi/vidarr/PriorityInput.java | 102 +++++ .../gsi/vidarr/PriorityInputProvider.java | 10 + .../ca/on/oicr/gsi/vidarr/PriorityScorer.java | 111 +++++ .../gsi/vidarr/PriorityScorerProvider.java | 10 + .../src/main/java/module-info.java | 10 +- .../prometheus/PrometheusPriorityInput.java | 129 ++++++ .../vidarr/prometheus/QueryResponseDto.java | 23 + .../gsi/vidarr/prometheus/VectorCache.java | 51 +++ .../gsi/vidarr/prometheus/VectorDataDto.java | 25 ++ .../vidarr/prometheus/VectorResultDto.java | 26 ++ .../src/main/java/module-info.java | 4 + .../ca/on/oicr/gsi/vidarr/server/Main.java | 40 +- .../gsi/vidarr/server/PriorityByWorkflow.java | 181 -------- vidarr-server/src/main/java/module-info.java | 4 - .../vidarr/server/PriorityByWorkflowTest.java | 341 -------------- 52 files changed, 2278 insertions(+), 581 deletions(-) create mode 100644 changes/add_priority_cr.md create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AllPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AnyPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ArrayPriorityInput.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseInFlightCollectingPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseTemporalEscalationPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ConstantPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CoreConsumableResourceProvider.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityFormulaProvider.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityInputProvider.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityScorerProvider.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CutoffPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/DictionaryPriorityInput.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowVersionPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingPriorityScorer.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaximumPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MinimumPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MultiplierTemporalEscalationPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OffsetTemporalEscalationPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OneOfPriorityInput.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityConsumableResource.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityFormulaVariable.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ProductPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RawPriorityInput.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RemotePriorityInput.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SubtractPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SumPriorityFormula.java create mode 100644 vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/TuplePriorityInput.java create mode 100644 vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormula.java create mode 100644 vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormulaProvider.java create mode 100644 vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInput.java create mode 100644 vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java create mode 100644 vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java create mode 100644 vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java create mode 100644 vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java create mode 100644 vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java create mode 100644 vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java create mode 100644 vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java create mode 100644 vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java delete mode 100644 vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java delete mode 100644 vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java diff --git a/changes/add_priority_cr.md b/changes/add_priority_cr.md new file mode 100644 index 00000000..a4470589 --- /dev/null +++ b/changes/add_priority_cr.md @@ -0,0 +1 @@ +* Add a new priority-based consumable resource diff --git a/plugin-guide.md b/plugin-guide.md index c6c6e298..b4a9c506 100644 --- a/plugin-guide.md +++ b/plugin-guide.md @@ -211,6 +211,26 @@ to filter on runs. A filter could query Pinery and get all the external identifiers associated with that run and then construct a query based on those to match workflow runs that use any of those identifiers. +# Priority Consumable Resource Inputs, Formulas, and Scorers +The priority consumable resource takes plugins for the inputs, formulas, and +scorers. These are used by the [Priority Consumable +Resource](#priority_consumable_resource). This follows the same pattern as the +other plugins: an implementation of `ca.on.oicr.gsi.vidarr.PriorityInput`, +`ca.on.oicr.gsi.vidarr.PriorityFormula`, or +`ca.on.oicr.gsi.vidarr.PriorityScorer` for the input, formula, and scorers, +respectively and there needs to be a corresponding implementation of +`PriorityFormulaProvider`, `PriorityInputProvider`, or +`PriorityScorerProvider`. + +In the priority consumable resource's configuration, the `"type"` property will +select the appropriate input, formula, or scorer and deserialize it as a JSON +object. + +Each component will be called for every pending workflow run, so the analysis +should be relatively fast. `PriorityInput` implementations should cache results +from external services. + + # Provided Implementations This core implementation provides several plugins independent of external systems. @@ -273,6 +293,37 @@ active. } ``` + +### Priority Consumable Resource +The priority consumable resource operates by computing a number, a priority, +for each workflow run and then allowing the workflow run to proceed +based on that number. + +The resource first takes data from the submission request and then +implementations of `PriorityInput` consume this data and produce a numeric +value. Those values are then consumed by `PriorityFormula` to produce a final +definitive score from all the numbers. If a default priority is provided, the +submission request can contain no information and the inputs and formula will +be skipped and the default priority will be used instead. + +The priority is then scored by a `PriorityScorer` which determines if the +workflow is allowed to run or not. + +See the other sections for the possible inputs, formulas, and scorers. + +``` +{ + "type": "priority", + "defaultPriority": null, + "inputs": { + "foo": ..., + "bar": ... + }, + "formula": ..., + "scorer": ... +} +``` + ## Input Provisioners Input provisioners provided in Víðarr core. @@ -291,12 +342,14 @@ provided in the metadata. ### Raw Allows input to be provided as a string that is assumed to be a path. + ``` { "type": "raw", "format": [ "FILE", "DIRECTORY" ] } ``` + This can be limited to a particular input type format. ## Output Provisioner @@ -315,3 +368,370 @@ provided in the metadata. } ``` +## Priority Input +Priority inputs provided in Víðarr core. + +### JSON Array +Takes input as an index into an array and returns the value in that array. If +the index is less than zero, `"underflowPriority"` is returned. If the index is +beyond the end of the array, `"overflowPriority"` is used. The priorities are +stored in `"file"` which must be a JSON file containing an array of integers. + +``` +{ + "type": "json-array", + "file": "/path/to/list.json" + "overflowPriority": 0, + "underflowPriority": 1000 +} +``` + +### JSON Dictionary +Takes input as a string and looks up the value of that in a dictionary. If +the input is not in the dictionary, `"defaultPriority"` is used. The priorities +are stored in `"file"` which must be a JSON object where all the values are +integers. + +``` +{ + "type": "json-dictionary", + "defaultPriority": 0, + "file": "/path/to/obj.json" +} +``` + +### One-Of +Allows the submitter to select one of multiple priority inputs using a tagged +union. + +``` +{ + "type": "oneOf", + "defaultPriority": 0, + "inputs": { + "FOO": {...}, + "BAR": {...} + } +} +``` + +The input will take a tagged union/algebraic data type with the appropriate +inputs. If the name provided by the submitter does not match one of the inputs, +`"defaultPriority"` is used instead. The names of the keys of `"inputs"` should +be capitalized for compatibility with Shesmu. + +### Prometheus Input +Reads a variable from Prometheus, filtering on the label set, and returns the +current value. + +``` +{ + "type": "prometheus", + "cacheRequestTimeout": 1, + "cacheTtl": 15, + "defaultPriority": 0, + "labels": ["bob"], + "query": "some_prometheus_variable", + "url": "http://prometheus.example.com:9090", + "workflowNameLabel": "workflow", + "workflowVersionLabel": null +} +``` + +The process this input provider uses is as follows: + +1. Execute `"query"` on the Prometheus instance at `"url"`. The query can be + any valid Prometheus query. If it takes longer than `"cacheRequestTimeout"` + minutes, then the query will be treated as a failure. The results will be + cached for `"cacheTtl"` minutes before being refreshed. +2. The submission request will be processed into a label set as described below. +3. All the records that were returned by the query are scanned for a matching + label set. +4. If a matching label set is found, the last recorded value will be used, + regardless of when Prometheus observed it. +5. If no matching label set is found, `"defaultPriority"` will be used. + +The label set is constructed from the submission request. For each string in +`"labels"`, the submitter must provide a string value. These labels and values +will be used as the label set. For example, with the configuration `"labels": +["bob"]`, the submission request could have `{"bob": "eggs"}` and the filtered +label set would look like `[bob=eggs]`. Additionally, special labels are +available for the workflow name and version. If `"workflowNameLabel": +"workflow"` and the submission request was for `bcl2fastq`, then the label set +would be `[workflow=bcl2fastq]`. This can be further refined with a workflow +version using `"workflowVersionLabel"`, which will only be used if +`"workflowNameLabel"` is not null. Both of these can be turned off by being set +to null. + +### Raw Priority Input +Takes an optional integer from the submission request and returns it raw, or +`"defaultPriority"` if not provided. + +``` +{ + "type": "raw", + "defaultPriority": 0 +} +``` + +### Remote Input +Takes an arbitrary JSON value and sends it to remote HTTP endpoint for +evaluation. That endpoint must return a single number. The result will be +cached. The `"schema"` is a standard Víðarr type that should be requested from +the submission request. + +``` +{ + "type": "remote", + "defaultPriority": 0, + "schema": "string", + "ttl": 15, + "url": "http://foo.com/api/get-priority" +} +``` + +The `"schema"` property defines a type, including an object types, that will be +required on submission. The data provided by the submission will be sent via +`POST` request as the body to the URL provided. The endpoint must respond with +an integer for the priority or null to use the default priority. The result +will be cached for `"ttl"` minutes before being reattempted. + + +### Tuple-Wrapping Input +This changes the type of an input provider for compatibility with Shesmu. The +crux is this: Shesmu's tagged unions are more limited than Víðarr's. Shesmu +requires that a tagged union have a tuple or object while Víðarr permits either +of those. When using the _one-of_ input source, this introduces the possibility +of creating a type that Shesmu cannot process. This allows wrapping an +priority input's type in a single element tuple, thereby making it compatible +with Shesmu. + +``` +{ + "type": "tuple", + "inner": {...} +} +``` + +## Priority Formula +Priority formulas provided in Víðarr core. + +### Constant +Returns a constant value. + +``` +{ + "type": "constant", + "value": 100 +} +``` + +# Input Variable +Accesses one of the input scores. If no input score has the identifier +`"name"`, the minimum integer value is used. + +``` +{ + "type": "input", + "name: "foo" +} +``` + +### Minimum and Maximum +Takes the minimum or maximum of other formulas. + +``` +{ + "type": "maximum", + "components": [ ... ] +} +``` + +or + +``` +{ + "type": "minimum", + "components": [ ... ] +} +``` + +### Product +Computes the product of other formulas (_i.e._, multiplies their scores). + +``` +{ + "type": "product", + "components": [ ... ] +} +``` + +### Subtraction +Computes the difference between two formulas; the result of `"left"` minus the +result of `"right"`. + +``` +{ + "type": "difference", + "left": ..., + "right": ... +} +``` + +### Summation +Computes the summation of other formulas (_i.e._, adds their scores). + +``` +{ + "type": "sum", + "components": [ ... ] +} +``` + +# Temporal Escalating with Multiplier +Increases the priority as a workflow run sits around. The duration the workflow +run has been waiting is looked up in the `"escalation"` object; the keys are an +[ISO-8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) and the +values are a floating point number. The smallest matching duration is used and +the score is multiplied by the value provided. Values need to be greater than 1 +to increase priority. If workflow run has been waiting less than the smallest +duration in the dictionary, the original priority is used. The original +priority is provided using the `"base"` formula. + +``` +{ + "type": "escalating-multiplier", + "base": ..., + "escalation": { + "PT1H": 1.2, + "PT12H": 2.0 + } +} +``` + +# Temporal Escalating with Offset +Increases the priority as a workflow run sits around. The duration the workflow +run has been waiting is looked up in the `"escalation"` object; the keys are an +[ISO-8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) and the +values are a integer. The smallest matching duration is used and the value +provided is added to the original score. Values need to be greater than 1 to +increase priority. If workflow run has been waiting less than the smallest +duration in the dictionary, the original priority is used. The original +priority is provided using the `"base"` formula. + +``` +{ + "type": "escalating-offset", + "base": ..., + "escalation": { + "PT1H": 10, + "PT12H": 100 + } +} +``` + +## Priority Scorer +Priority scorers provided in Víðarr core. + +### All Of +Checks several priority scorers and allows permits the workflow run to proceed +if all scorers allow it to proceed. + +``` +{ + "type": "all", + "scorers": [ ... ] +} +``` + +This can be combined with the ranked max-in-flight family to allow a global +limit with per-workflow limits. For example: + +``` +{ + "scorers": [ + { + "maxInFlight": 500, + "type": "ranked-max-in-flight" + }, + { + "maxInFlight": 20, + "useCustom": true, + "type": "ranked-max-in-flight-by-workflow" + } + ], + "type": "all" +} +``` + +This would let the top 500 workflow runs to execute as long as they are also +among the top 20 workflow run in their respective workflow type. + +### Any Of +Checks several priority scorers and allows permits the workflow run to proceed +if any scorer would allow it to proceed. + +``` +{ + "type": "any", + "scorers": [ ... ] +} +``` + +### Cut-off +Allows the workflow run to start if the score is strictly greater than `"cutoff"`. + +``` +{ + "type": "cutoff", + "cutoff": 9000 +} +``` + +### Ranked Max-in-flight +Ranks workflow runs by score and allows the top ones to run, where the number +allowed to run is `"maxInFlight"`. This workflow makes a best effort to keep +the total number running at or below that limit, but various conditions, +including server relaunch or being used in an `"any"` scorer, may cause it to +exceed that bound. + +This scorer comes in a few flavours: + +- `"ranked-max-in-flight"`: the limit is applied to all workflow runs +- `"ranked-max-in-flight-by-workflow"`: the limit is applied per workflow type +- `"ranked-max-in-flight-by-workflow-version"`: the limit is applied per workflow type + including version + +The limit cannot be set individually per workflow in this configuration. +However, `"ranked-max-in-flight-by-workflow"` and +`"ranked-max-in-flight-by-workflow-version"` have an additional property +`"useCustom"`, which will use the max-in-flight values set when a workflow is +created, as is visible through the `/api/max-in-flight` endpoint. In that case +`"maxInFlight"` is treated as a fallback. + +``` +{ + "type": "ranked-max-in-flight", + "maxInFlight": 500 +} +``` + +or + +``` +{ + "type": "ranked-max-in-flight-by-workflow", + "useCustom": true, + "maxInFlight": 50 +} +``` + +or + +``` +{ + "type": "ranked-max-in-flight-by-workflow", + "useCustom": false, + "maxInFlight": 50 +} +``` diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AllPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AllPriorityScorer.java new file mode 100644 index 00000000..d2686357 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AllPriorityScorer.java @@ -0,0 +1,34 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.time.Instant; +import java.util.OptionalInt; + +public final class AllPriorityScorer extends BaseAggregatePriorityScorer { + + @Override + public boolean compute( + String workflowName, + String workflowVersion, + String vidarrId, + Instant created, + OptionalInt workflowMaxInFlight, + int score) { + for (var i = 0; i < scorers.size(); i++) { + if (!scorers + .get(i) + .compute(workflowName, workflowVersion, vidarrId, created, workflowMaxInFlight, score)) { + for (var j = 0; j < i; j++) { + scorers.get(j).release(workflowName, workflowVersion, vidarrId); + } + return false; + } + } + + return true; + } + + @Override + protected void startupInner() { + // Do nothing + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AnyPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AnyPriorityScorer.java new file mode 100644 index 00000000..41e149f8 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/AnyPriorityScorer.java @@ -0,0 +1,36 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.time.Instant; +import java.util.OptionalInt; + +public final class AnyPriorityScorer extends BaseAggregatePriorityScorer { + + @Override + public boolean compute( + String workflowName, + String workflowVersion, + String vidarrId, + Instant created, + OptionalInt workflowMaxInFlight, + int score) { + for (var i = 0; i < scorers.size(); i++) { + if (scorers + .get(i) + .compute(workflowName, workflowVersion, vidarrId, created, workflowMaxInFlight, score)) { + for (var j = 0; j < scorers.size(); j++) { + if (j != i) { + scorers.get(j).recover(workflowName, workflowVersion, vidarrId); + } + } + return true; + } + } + + return false; + } + + @Override + protected void startupInner() { + // Do nothing. + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ArrayPriorityInput.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ArrayPriorityInput.java new file mode 100644 index 00000000..72e99ede --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ArrayPriorityInput.java @@ -0,0 +1,104 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.JsonPost; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.Headers; +import io.undertow.util.StatusCodes; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; +import java.util.List; +import java.util.Optional; + +public final class ArrayPriorityInput implements PriorityInput { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private Path file; + private int overflowPriority; + private int underflowPriority; + private List values = List.of(); + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + // Create a local reference for thread safety + final var values = this.values; + if (input.isInt()) { + final var index = input.asInt(); + if (index < 0) { + return underflowPriority; + } else if (index >= values.size()) { + return overflowPriority; + } else { + return values.get(index); + } + } else { + return underflowPriority; + } + } + + private void dump(HttpServerExchange exchange) throws IOException { + exchange.setStatusCode(StatusCodes.OK); + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json"); + exchange.getResponseSender().send(MAPPER.writeValueAsString(values)); + } + + public Path getFile() { + return file; + } + + public int getOverflowPriority() { + return overflowPriority; + } + + public int getUnderflowPriority() { + return underflowPriority; + } + + @Override + public Optional httpHandler() { + return Optional.of( + Handlers.routing() + .get("/", this::dump) + .put("/", JsonPost.parse(MAPPER, new TypeReference<>() {}, this::update))); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.STRING; + } + + public void setFile(Path file) { + this.file = file; + } + + public void setOverflowPriority(int overflowPriority) { + this.overflowPriority = overflowPriority; + } + + public void setUnderflowPriority(int underflowPriority) { + this.underflowPriority = underflowPriority; + } + + @Override + public void startup(String resourceName, String inputName) { + try { + values = MAPPER.readValue(file.toFile(), new TypeReference<>() {}); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void update(HttpServerExchange exchange, List newValues) throws IOException { + values = newValues; + MAPPER.writeValue(file.toFile(), values); + exchange.setStatusCode(StatusCodes.OK); + exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, 0); + exchange.getResponseSender().send(""); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityFormula.java new file mode 100644 index 00000000..4ea5f291 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityFormula.java @@ -0,0 +1,26 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import java.time.Instant; +import java.util.List; +import java.util.function.ToIntFunction; +import java.util.stream.IntStream; + +public abstract class BaseAggregatePriorityFormula implements PriorityFormula { + private List components; + + protected abstract int aggregate(IntStream stream); + + @Override + public final int compute(ToIntFunction inputs, Instant created) { + return aggregate(components.stream().mapToInt(c -> c.compute(inputs, created))); + } + + public final List getComponents() { + return components; + } + + public final void setComponents(List components) { + this.components = components; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityScorer.java new file mode 100644 index 00000000..e3699d1c --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseAggregatePriorityScorer.java @@ -0,0 +1,53 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityScorer; +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import java.util.List; +import java.util.Optional; + +public abstract class BaseAggregatePriorityScorer implements PriorityScorer { + protected List scorers; + + public final List getScorers() { + return scorers; + } + + @Override + public final Optional httpHandler() { + final var routes = Handlers.path(); + for (var i = 0; i < scorers.size(); i++) { + final var prefix = "/" + i; + scorers.get(i).httpHandler().ifPresent(h -> routes.addPrefixPath(prefix, h)); + } + return Optional.of(routes); + } + + @Override + public final void recover(String workflowName, String workflowVersion, String vidarrId) { + for (final var scorer : scorers) { + scorer.recover(workflowName, workflowVersion, vidarrId); + } + } + + @Override + public final void release(String workflowName, String workflowVersion, String vidarrId) { + for (final var scorer : scorers) { + scorer.release(workflowName, workflowVersion, vidarrId); + } + } + + public final void setScorers(List scorers) { + this.scorers = scorers; + } + + @Override + public final void startup() { + for (final var scorer : scorers) { + scorer.startup(); + } + startupInner(); + } + + protected abstract void startupInner(); +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseInFlightCollectingPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseInFlightCollectingPriorityScorer.java new file mode 100644 index 00000000..1532498f --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseInFlightCollectingPriorityScorer.java @@ -0,0 +1,114 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityScorer; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.SortedSet; + +public abstract class BaseInFlightCollectingPriorityScorer implements PriorityScorer { + protected record WorkflowRunScore(String vidarrId, int priority) + implements Comparable { + + @Override + public int compareTo(WorkflowRunScore workflowRunScore) { + var result = Integer.compare(workflowRunScore.priority, this.priority); + if (result == 0) { + result = this.vidarrId.compareTo(workflowRunScore.vidarrId()); + } + return result; + } + } + + private int maxInFlight; + + @Override + public final boolean compute( + String workflowName, + String workflowVersion, + String vidarrId, + Instant created, + OptionalInt workflowMaxInFlight, + int score) { + if (score == Integer.MAX_VALUE) { + score--; + } + final var active = get(workflowName, workflowVersion); + final var limit = getLimit(workflowName, workflowVersion, maxInFlight, workflowMaxInFlight); + synchronized (active) { + final var existing = active.stream().filter(e -> e.vidarrId().equals(vidarrId)).findFirst(); + if (existing.isPresent()) { + final var existingPriority = existing.get().priority(); + if (existingPriority == Integer.MAX_VALUE) { + return true; + } else if (existingPriority == score) { + if (active.headSet(existing.get()).size() < limit) { + active.remove(existing.get()); + active.add(new WorkflowRunScore(vidarrId, Integer.MAX_VALUE)); + return true; + } else { + return false; + } + + } else { + active.remove(existing.get()); + } + } + final var wfrScore = new WorkflowRunScore(vidarrId, score); + active.add(wfrScore); + // Checking that an existing record was present ensures that we don't allow first-come low + // priority jobs to take all the tokens + if (existing.isPresent() && active.headSet(wfrScore).size() < limit) { + active.remove(wfrScore); + active.add(new WorkflowRunScore(vidarrId, Integer.MAX_VALUE)); + return true; + } else { + return false; + } + } + } + + protected abstract SortedSet get(String workflowName, String workflowVersion); + + protected abstract int getLimit( + String workflowName, + String workflowVersion, + int maxInFlight, + OptionalInt workflowMaxInFlight); + + public final int getMaxInFlight() { + return maxInFlight; + } + + @Override + public final Optional httpHandler() { + return Optional.empty(); + } + + @Override + public final void recover(String workflowName, String workflowVersion, String vidarrId) { + final var active = get(workflowName, workflowVersion); + synchronized (active) { + active.removeIf(a -> a.vidarrId().equals(vidarrId)); + active.add(new WorkflowRunScore(vidarrId, Integer.MAX_VALUE)); + } + } + + @Override + public final void release(String workflowName, String workflowVersion, String vidarrId) { + final var active = get(workflowName, workflowVersion); + synchronized (active) { + active.removeIf(a -> a.vidarrId().equals(vidarrId)); + } + } + + public final void setMaxInFlight(int maxInFlight) { + this.maxInFlight = maxInFlight; + } + + @Override + public final void startup() { + // Do nothing + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseTemporalEscalationPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseTemporalEscalationPriorityFormula.java new file mode 100644 index 00000000..67de4ed2 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/BaseTemporalEscalationPriorityFormula.java @@ -0,0 +1,37 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import java.time.Duration; +import java.time.Instant; +import java.util.TreeMap; +import java.util.function.ToIntFunction; + +public abstract class BaseTemporalEscalationPriorityFormula implements PriorityFormula { + private PriorityFormula base; + private TreeMap escalation; + + @Override + public final int compute(ToIntFunction inputs, Instant created) { + final var entry = escalation.floorEntry(Duration.between(created, Instant.now())); + final var original = base.compute(inputs, created); + return entry == null ? original : escalate(original, entry.getValue()); + } + + protected abstract int escalate(int original, T escalation); + + public final PriorityFormula getBase() { + return base; + } + + public final TreeMap getEscalation() { + return escalation; + } + + public final void setBase(PriorityFormula base) { + this.base = base; + } + + public final void setEscalation(TreeMap escalation) { + this.escalation = escalation; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ConstantPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ConstantPriorityFormula.java new file mode 100644 index 00000000..febfc096 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ConstantPriorityFormula.java @@ -0,0 +1,22 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import java.time.Instant; +import java.util.function.ToIntFunction; + +public final class ConstantPriorityFormula implements PriorityFormula { + private int value; + + @Override + public int compute(ToIntFunction inputs, Instant created) { + return value; + } + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CoreConsumableResourceProvider.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CoreConsumableResourceProvider.java new file mode 100644 index 00000000..7bef8a7b --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CoreConsumableResourceProvider.java @@ -0,0 +1,17 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.ConsumableResource; +import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; +import java.util.stream.Stream; + +public final class CoreConsumableResourceProvider implements ConsumableResourceProvider { + + @Override + public Stream>> types() { + return Stream.of( + new Pair<>("manual-override", ManualOverrideConsumableResource.class), + new Pair<>("max-in-flight", MaxInFlightConsumableResource.class), + new Pair<>("priority", PriorityConsumableResource.class)); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityFormulaProvider.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityFormulaProvider.java new file mode 100644 index 00000000..df8e7f23 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityFormulaProvider.java @@ -0,0 +1,23 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import ca.on.oicr.gsi.vidarr.PriorityFormulaProvider; +import java.util.stream.Stream; + +public final class CorePriorityFormulaProvider implements PriorityFormulaProvider { + + @Override + public Stream>> types() { + return Stream.of( + new Pair<>("constant", ConstantPriorityFormula.class), + new Pair<>("difference", SubtractPriorityFormula.class), + new Pair<>("escalating-multiplier", MultiplierTemporalEscalationPriorityFormula.class), + new Pair<>("escalating-offset", OffsetTemporalEscalationPriorityFormula.class), + new Pair<>("input", PriorityFormulaVariable.class), + new Pair<>("maximum", MaximumPriorityFormula.class), + new Pair<>("minimum", MinimumPriorityFormula.class), + new Pair<>("product", ProductPriorityFormula.class), + new Pair<>("sum", SumPriorityFormula.class)); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityInputProvider.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityInputProvider.java new file mode 100644 index 00000000..c42ebeeb --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityInputProvider.java @@ -0,0 +1,20 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import ca.on.oicr.gsi.vidarr.PriorityInputProvider; +import java.util.stream.Stream; + +public final class CorePriorityInputProvider implements PriorityInputProvider { + + @Override + public Stream>> types() { + return Stream.of( + new Pair<>("json-array", ArrayPriorityInput.class), + new Pair<>("json-dictionary", DictionaryPriorityInput.class), + new Pair<>("oneOf", OneOfPriorityInput.class), + new Pair<>("raw", RawPriorityInput.class), + new Pair<>("remote", RemotePriorityInput.class), + new Pair<>("tuple", TuplePriorityInput.class)); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityScorerProvider.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityScorerProvider.java new file mode 100644 index 00000000..14f498a7 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CorePriorityScorerProvider.java @@ -0,0 +1,23 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.PriorityScorer; +import ca.on.oicr.gsi.vidarr.PriorityScorerProvider; +import java.util.stream.Stream; + +public class CorePriorityScorerProvider implements PriorityScorerProvider { + + @Override + public Stream>> types() { + return Stream.of( + new Pair<>("all", AllPriorityScorer.class), + new Pair<>("any", AnyPriorityScorer.class), + new Pair<>("cutoff", CutoffPriorityScorer.class), + new Pair<>("ranked-max-in-flight", InFlightCollectingPriorityScorer.class), + new Pair<>( + "ranked-max-in-flight-by-workflow", InFlightCollectingByWorkflowPriorityScorer.class), + new Pair<>( + "ranked-max-in-flight-by-workflow-version", + InFlightCollectingByWorkflowVersionPriorityScorer.class)); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CutoffPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CutoffPriorityScorer.java new file mode 100644 index 00000000..b3f6287d --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/CutoffPriorityScorer.java @@ -0,0 +1,44 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityScorer; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.Optional; +import java.util.OptionalInt; + +public final class CutoffPriorityScorer implements PriorityScorer { + private int cutoff; + + @Override + public boolean compute( + String workflowName, + String workflowVersion, + String vidarrId, + Instant created, + OptionalInt workflowMaxInFlight, + int score) { + return score > cutoff; + } + + public int getCutoff() { + return cutoff; + } + + @Override + public Optional httpHandler() { + return Optional.empty(); + } + + @Override + public void recover(String workflowName, String workflowVersion, String vidarrId) {} + + @Override + public void release(String workflowName, String workflowVersion, String vidarrId) {} + + public void setCutoff(int cutoff) { + this.cutoff = cutoff; + } + + @Override + public void startup() {} +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/DictionaryPriorityInput.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/DictionaryPriorityInput.java new file mode 100644 index 00000000..9b2f2ff6 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/DictionaryPriorityInput.java @@ -0,0 +1,83 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.JsonPost; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.Headers; +import io.undertow.util.StatusCodes; +import java.io.IOException; +import java.nio.file.Path; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; + +public final class DictionaryPriorityInput implements PriorityInput { + private static final ObjectMapper MAPPER = new ObjectMapper(); + private int defaultPriority; + private Path file; + private Map values = Map.of(); + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + return values.getOrDefault(input.asText(), defaultPriority); + } + + private void dump(HttpServerExchange exchange) throws IOException { + exchange.setStatusCode(StatusCodes.OK); + exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json"); + exchange.getResponseSender().send(MAPPER.writeValueAsString(values)); + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public Path getFile() { + return file; + } + + @Override + public Optional httpHandler() { + return Optional.of( + Handlers.routing() + .get("/", this::dump) + .put("/", JsonPost.parse(MAPPER, new TypeReference<>() {}, this::update))); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.STRING; + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setFile(Path file) { + this.file = file; + } + + @Override + public void startup(String resourceName, String inputName) { + try { + values = MAPPER.readValue(file.toFile(), new TypeReference<>() {}); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void update(HttpServerExchange exchange, Map newValues) + throws IOException { + values = newValues; + MAPPER.writeValue(file.toFile(), values); + exchange.setStatusCode(StatusCodes.OK); + exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, 0); + exchange.getResponseSender().send(""); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowPriorityScorer.java new file mode 100644 index 00000000..c1e0e234 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowPriorityScorer.java @@ -0,0 +1,36 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.util.Map; +import java.util.OptionalInt; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +public final class InFlightCollectingByWorkflowPriorityScorer + extends BaseInFlightCollectingPriorityScorer { + + private final Map> active = new ConcurrentHashMap<>(); + private boolean useCustom; + + @Override + protected SortedSet get(String workflowName, String workflowVersion) { + return active.computeIfAbsent(workflowName, k -> new TreeSet<>()); + } + + @Override + protected int getLimit( + String workflowName, + String workflowVersion, + int maxInFlight, + OptionalInt workflowMaxInFlight) { + return useCustom ? workflowMaxInFlight.orElse(maxInFlight) : maxInFlight; + } + + public boolean isUseCustom() { + return useCustom; + } + + public void setUseCustom(boolean useCustom) { + this.useCustom = useCustom; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowVersionPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowVersionPriorityScorer.java new file mode 100644 index 00000000..f7ed71dc --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingByWorkflowVersionPriorityScorer.java @@ -0,0 +1,37 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import java.util.Map; +import java.util.OptionalInt; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +public final class InFlightCollectingByWorkflowVersionPriorityScorer + extends BaseInFlightCollectingPriorityScorer { + private final Map, SortedSet> active = + new ConcurrentHashMap<>(); + private boolean useCustom; + + @Override + protected SortedSet get(String workflowName, String workflowVersion) { + return active.computeIfAbsent(new Pair<>(workflowName, workflowVersion), k -> new TreeSet<>()); + } + + @Override + protected int getLimit( + String workflowName, + String workflowVersion, + int maxInFlight, + OptionalInt workflowMaxInFlight) { + return useCustom ? workflowMaxInFlight.orElse(maxInFlight) : maxInFlight; + } + + public boolean isUseCustom() { + return useCustom; + } + + public void setUseCustom(boolean useCustom) { + this.useCustom = useCustom; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingPriorityScorer.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingPriorityScorer.java new file mode 100644 index 00000000..00a231ae --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/InFlightCollectingPriorityScorer.java @@ -0,0 +1,24 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.util.OptionalInt; +import java.util.SortedSet; +import java.util.TreeSet; + +public final class InFlightCollectingPriorityScorer extends BaseInFlightCollectingPriorityScorer { + + private final SortedSet active = new TreeSet<>(); + + @Override + protected SortedSet get(String workflowName, String workflowVersion) { + return active; + } + + @Override + protected int getLimit( + String workflowName, + String workflowVersion, + int maxInFlight, + OptionalInt workflowMaxInFlight) { + return maxInFlight; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ManualOverrideConsumableResource.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ManualOverrideConsumableResource.java index d648ff58..e5d54c72 100644 --- a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ManualOverrideConsumableResource.java +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ManualOverrideConsumableResource.java @@ -3,7 +3,6 @@ import ca.on.oicr.gsi.Pair; import ca.on.oicr.gsi.vidarr.BasicType; import ca.on.oicr.gsi.vidarr.ConsumableResource; -import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; @@ -20,12 +19,8 @@ import java.util.Set; import java.util.TreeSet; import java.util.function.BiPredicate; -import java.util.stream.Stream; public final class ManualOverrideConsumableResource implements ConsumableResource { - public static ConsumableResourceProvider provider() { - return () -> Stream.of(new Pair<>("manual-override", ManualOverrideConsumableResource.class)); - } private static final ObjectMapper MAPPER = new ObjectMapper(); private final Set allowList = new TreeSet<>(); diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaxInFlightConsumableResource.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaxInFlightConsumableResource.java index 9414f535..3a8b86fa 100644 --- a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaxInFlightConsumableResource.java +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaxInFlightConsumableResource.java @@ -3,7 +3,6 @@ import ca.on.oicr.gsi.Pair; import ca.on.oicr.gsi.vidarr.BasicType; import ca.on.oicr.gsi.vidarr.ConsumableResource; -import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.JsonNode; @@ -12,13 +11,9 @@ import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Stream; // This is the global one. Per-Workflow is MaxInFlightByWorkflow public final class MaxInFlightConsumableResource implements ConsumableResource { - public static ConsumableResourceProvider provider() { - return () -> Stream.of(new Pair<>("max-in-flight", MaxInFlightConsumableResource.class)); - } @JsonIgnore private final Set inFlight = ConcurrentHashMap.newKeySet(); private int maximum; diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaximumPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaximumPriorityFormula.java new file mode 100644 index 00000000..0790132c --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MaximumPriorityFormula.java @@ -0,0 +1,11 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.util.stream.IntStream; + +public final class MaximumPriorityFormula extends BaseAggregatePriorityFormula { + + @Override + protected int aggregate(IntStream stream) { + return stream.max().orElse(Integer.MIN_VALUE); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MinimumPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MinimumPriorityFormula.java new file mode 100644 index 00000000..a6a2053f --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MinimumPriorityFormula.java @@ -0,0 +1,11 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.util.stream.IntStream; + +public final class MinimumPriorityFormula extends BaseAggregatePriorityFormula { + + @Override + protected int aggregate(IntStream stream) { + return stream.min().orElse(Integer.MAX_VALUE); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MultiplierTemporalEscalationPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MultiplierTemporalEscalationPriorityFormula.java new file mode 100644 index 00000000..c59d6ef9 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/MultiplierTemporalEscalationPriorityFormula.java @@ -0,0 +1,11 @@ +package ca.on.oicr.gsi.vidarr.core; + +public final class MultiplierTemporalEscalationPriorityFormula + extends BaseTemporalEscalationPriorityFormula { + + @Override + protected int escalate(int original, Double escalation) { + final var result = original * escalation; + return result > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) (result); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OffsetTemporalEscalationPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OffsetTemporalEscalationPriorityFormula.java new file mode 100644 index 00000000..6c061f51 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OffsetTemporalEscalationPriorityFormula.java @@ -0,0 +1,10 @@ +package ca.on.oicr.gsi.vidarr.core; + +public final class OffsetTemporalEscalationPriorityFormula + extends BaseTemporalEscalationPriorityFormula { + + @Override + protected int escalate(int original, Integer escalation) { + return original + escalation; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OneOfPriorityInput.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OneOfPriorityInput.java new file mode 100644 index 00000000..3a043f2d --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/OneOfPriorityInput.java @@ -0,0 +1,80 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; + +public final class OneOfPriorityInput implements PriorityInput { + private int defaultPriority; + private Map inputs; + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + if (!input.has("type")) { + return defaultPriority; + } + final var type = input.get("type"); + if (!type.isTextual()) { + return defaultPriority; + } + final var priorityInput = inputs.get(type.asText()); + if (priorityInput == null) { + return defaultPriority; + } + final var contents = input.get("contents"); + return priorityInput.compute( + workflowName, + workflowVersion, + created, + contents == null ? NullNode.getInstance() : contents); + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public Map getInputs() { + return inputs; + } + + @Override + public Optional httpHandler() { + final var routes = Handlers.path(); + for (final var input : inputs.entrySet()) { + input + .getValue() + .httpHandler() + .ifPresent(handler -> routes.addPrefixPath(input.getKey(), handler)); + } + return Optional.of(routes); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.taggedUnionFromPairs( + inputs.entrySet().stream() + .map(e -> new Pair<>(e.getKey(), e.getValue().inputFromSubmitter()))); + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setInputs(Map inputs) { + this.inputs = inputs; + } + + @Override + public void startup(String resourceName, String inputName) { + for (final var input : inputs.entrySet()) { + input.getValue().startup(resourceName, inputName + " " + input.getKey()); + } + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityConsumableResource.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityConsumableResource.java new file mode 100644 index 00000000..6a603392 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityConsumableResource.java @@ -0,0 +1,140 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.ConsumableResource; +import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse; +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import ca.on.oicr.gsi.vidarr.PriorityScorer; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.stream.Collectors; + +public final class PriorityConsumableResource implements ConsumableResource { + private Integer defaultPriority; + private PriorityFormula formula; + private Map inputs; + private String name; + private PriorityScorer scorer; + + @Override + public Optional httpHandler() { + final var routes = scorer.httpHandler().map(Handlers::path).orElseGet(Handlers::path); + for (final var entry : inputs.entrySet()) { + entry.getValue().httpHandler().ifPresent(h -> routes.addPrefixPath("/" + entry.getKey(), h)); + } + return Optional.of(routes); + } + + @Override + public Optional> inputFromSubmitter() { + final var type = + BasicType.object( + inputs.entrySet().stream() + .map(e -> new Pair<>(e.getKey(), e.getValue().inputFromSubmitter()))); + return Optional.of(new Pair<>(name, defaultPriority == null ? type : type.asOptional())); + } + + @Override + public boolean isInputFromSubmitterRequired() { + return defaultPriority == null; + } + + @Override + public int priority() { + return -1000; + } + + @Override + public void recover( + String workflowName, + String workflowVersion, + String vidarrId, + Optional resourceJson) { + scorer.recover(workflowName, workflowVersion, vidarrId); + } + + @Override + public void release( + String workflowName, String workflowVersion, String vidarrId, Optional input) { + scorer.release(workflowName, workflowVersion, vidarrId); + } + + @Override + public ConsumableResourceResponse request( + String workflowName, + String workflowVersion, + String vidarrId, + Instant createdTime, + OptionalInt workflowMaxInFlight, + Optional input) { + final Optional> inputValues = + input.map( + i -> + inputs.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> { + final var value = i.get(e.getKey()); + return e.getValue() + .compute( + workflowName, + workflowVersion, + createdTime, + value == null ? NullNode.getInstance() : value); + }))); + final int priority = + inputValues + .map( + iv -> + formula.compute(name -> iv.getOrDefault(name, Integer.MIN_VALUE), createdTime)) + .orElse(defaultPriority); + final var available = + scorer.compute( + workflowName, workflowVersion, vidarrId, createdTime, workflowMaxInFlight, priority); + return new ConsumableResourceResponse() { + + @Override + public T apply(Visitor visitor) { + inputValues.ifPresentOrElse( + iv -> iv.forEach((varName, value) -> visitor.set(name + "-" + varName, value)), + () -> inputs.keySet().forEach(varName -> visitor.clear(name + "-" + varName))); + visitor.set(name, priority); + return available ? visitor.available() : visitor.unavailable(); + } + }; + } + + public void setDefaultPriority(Integer defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setFormula(PriorityFormula formula) { + this.formula = formula; + } + + public void setInputs(Map inputs) { + this.inputs = inputs; + } + + public void setScorer(PriorityScorer scorer) { + this.scorer = scorer; + } + + @Override + public void startup(String name) { + this.name = name; + for (final var input : inputs.entrySet()) { + input.getValue().startup(name, input.getKey()); + } + scorer.startup(); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityFormulaVariable.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityFormulaVariable.java new file mode 100644 index 00000000..a9384091 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/PriorityFormulaVariable.java @@ -0,0 +1,18 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import java.time.Instant; +import java.util.function.ToIntFunction; + +public final class PriorityFormulaVariable implements PriorityFormula { + private String name; + + @Override + public int compute(ToIntFunction inputs, Instant created) { + return inputs.applyAsInt(name); + } + + public void setName(String name) { + this.name = name; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ProductPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ProductPriorityFormula.java new file mode 100644 index 00000000..ea74ed97 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/ProductPriorityFormula.java @@ -0,0 +1,11 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.util.stream.IntStream; + +public final class ProductPriorityFormula extends BaseAggregatePriorityFormula { + + @Override + protected int aggregate(IntStream stream) { + return stream.reduce(1, (x, y) -> x * y); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RawPriorityInput.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RawPriorityInput.java new file mode 100644 index 00000000..651c1b95 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RawPriorityInput.java @@ -0,0 +1,41 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import com.fasterxml.jackson.databind.JsonNode; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.Optional; + +public final class RawPriorityInput implements PriorityInput { + + private int defaultPriority; + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + return input.asInt(defaultPriority); + } + + public int getDefaultPriority() { + return defaultPriority; + } + + @Override + public Optional httpHandler() { + return Optional.empty(); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.INTEGER.asOptional(); + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + @Override + public void startup(String resourceName, String inputName) { + // Do nothing. + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RemotePriorityInput.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RemotePriorityInput.java new file mode 100644 index 00000000..35578dc6 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/RemotePriorityInput.java @@ -0,0 +1,110 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.cache.KeyValueCache; +import ca.on.oicr.gsi.cache.SimpleRecord; +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.JsonBodyHandler; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import com.fasterxml.jackson.core.JacksonException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import io.undertow.Handlers; +import io.undertow.server.HttpHandler; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.StatusCodes; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Optional; + +public final class RemotePriorityInput implements PriorityInput { + private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient(); + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true); + private int defaultPriority; + private BasicType schema; + private int ttl = 15; + private String url; + private KeyValueCache> values; + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + try { + return values.get(MAPPER.writeValueAsString(input)).orElse(defaultPriority); + } catch (JacksonException e) { + e.printStackTrace(); + return defaultPriority; + } + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public BasicType getSchema() { + return schema; + } + + public int getTtl() { + return ttl; + } + + public String getUrl() { + return url; + } + + @Override + public Optional httpHandler() { + return Optional.of(Handlers.routing().delete("/", this::reset)); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.STRING; + } + + private void reset(HttpServerExchange exchange) { + exchange.setStatusCode(StatusCodes.NO_CONTENT); + values.invalidateAll(); + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setSchema(BasicType schema) { + this.schema = schema; + } + + public void setTtl(int ttl) { + this.ttl = ttl; + } + + public void setUrl(String url) { + this.url = url; + } + + @Override + public void startup(String resourceName, String inputName) { + values = + new KeyValueCache<>(resourceName + " " + inputName, ttl, SimpleRecord::new) { + @Override + protected Optional fetch(String body, Instant instant) throws Exception { + return HTTP_CLIENT + .send( + HttpRequest.newBuilder(URI.create(url)) + .header("Content-type", "application/json") + .POST(BodyPublishers.ofString(body, StandardCharsets.UTF_8)) + .build(), + new JsonBodyHandler<>(MAPPER, new TypeReference>() {})) + .body() + .get(); + } + }; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SubtractPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SubtractPriorityFormula.java new file mode 100644 index 00000000..9d45da88 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SubtractPriorityFormula.java @@ -0,0 +1,31 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.PriorityFormula; +import java.time.Instant; +import java.util.function.ToIntFunction; + +public final class SubtractPriorityFormula implements PriorityFormula { + private PriorityFormula left; + private PriorityFormula right; + + @Override + public int compute(ToIntFunction inputs, Instant created) { + return left.compute(inputs, created) - right.compute(inputs, created); + } + + public PriorityFormula getLeft() { + return left; + } + + public PriorityFormula getRight() { + return right; + } + + public void setLeft(PriorityFormula left) { + this.left = left; + } + + public void setRight(PriorityFormula right) { + this.right = right; + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SumPriorityFormula.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SumPriorityFormula.java new file mode 100644 index 00000000..65957f5b --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/SumPriorityFormula.java @@ -0,0 +1,11 @@ +package ca.on.oicr.gsi.vidarr.core; + +import java.util.stream.IntStream; + +public final class SumPriorityFormula extends BaseAggregatePriorityFormula { + + @Override + protected int aggregate(IntStream stream) { + return stream.sum(); + } +} diff --git a/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/TuplePriorityInput.java b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/TuplePriorityInput.java new file mode 100644 index 00000000..86140a79 --- /dev/null +++ b/vidarr-core/src/main/java/ca/on/oicr/gsi/vidarr/core/TuplePriorityInput.java @@ -0,0 +1,46 @@ +package ca.on.oicr.gsi.vidarr.core; + +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.NullNode; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.Optional; + +public final class TuplePriorityInput implements PriorityInput { + + private PriorityInput inner; + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + return inner.compute( + workflowName, + workflowVersion, + created, + input.has(0) ? input.get(0) : NullNode.getInstance()); + } + + public PriorityInput getInner() { + return inner; + } + + @Override + public Optional httpHandler() { + return inner.httpHandler(); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.tuple(inner.inputFromSubmitter()); + } + + public void setInner(PriorityInput inner) { + this.inner = inner; + } + + @Override + public void startup(String resourceName, String inputName) { + inner.startup(resourceName, inputName); + } +} diff --git a/vidarr-core/src/main/java/module-info.java b/vidarr-core/src/main/java/module-info.java index 70efe383..8f341e2c 100644 --- a/vidarr-core/src/main/java/module-info.java +++ b/vidarr-core/src/main/java/module-info.java @@ -1,8 +1,13 @@ import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; import ca.on.oicr.gsi.vidarr.InputProvisionerProvider; import ca.on.oicr.gsi.vidarr.OutputProvisionerProvider; -import ca.on.oicr.gsi.vidarr.core.ManualOverrideConsumableResource; -import ca.on.oicr.gsi.vidarr.core.MaxInFlightConsumableResource; +import ca.on.oicr.gsi.vidarr.PriorityFormulaProvider; +import ca.on.oicr.gsi.vidarr.PriorityInputProvider; +import ca.on.oicr.gsi.vidarr.PriorityScorerProvider; +import ca.on.oicr.gsi.vidarr.core.CoreConsumableResourceProvider; +import ca.on.oicr.gsi.vidarr.core.CorePriorityFormulaProvider; +import ca.on.oicr.gsi.vidarr.core.CorePriorityInputProvider; +import ca.on.oicr.gsi.vidarr.core.CorePriorityScorerProvider; import ca.on.oicr.gsi.vidarr.core.OneOfInputProvisioner; import ca.on.oicr.gsi.vidarr.core.OneOfOutputProvisioner; import ca.on.oicr.gsi.vidarr.core.RawInputProvisioner; @@ -23,14 +28,20 @@ requires java.naming; requires java.sql; requires java.xml; + requires java.net.http; requires simpleclient; provides ConsumableResourceProvider with - ManualOverrideConsumableResource, - MaxInFlightConsumableResource; + CoreConsumableResourceProvider; provides InputProvisionerProvider with OneOfInputProvisioner, RawInputProvisioner; provides OutputProvisionerProvider with OneOfOutputProvisioner; + provides PriorityInputProvider with + CorePriorityInputProvider; + provides PriorityFormulaProvider with + CorePriorityFormulaProvider; + provides PriorityScorerProvider with + CorePriorityScorerProvider; } diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormula.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormula.java new file mode 100644 index 00000000..0d46f7aa --- /dev/null +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormula.java @@ -0,0 +1,68 @@ +package ca.on.oicr.gsi.vidarr; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.PriorityFormula.PriorityFormulaIdResolver; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; +import java.util.function.ToIntFunction; +import java.util.stream.Collectors; + +/** Computes a new score from the input scores to determine a workflow run's priority */ +@JsonTypeIdResolver(PriorityFormulaIdResolver.class) +@JsonTypeInfo(use = JsonTypeInfo.Id.CUSTOM, include = As.PROPERTY, property = "type") +public interface PriorityFormula { + + final class PriorityFormulaIdResolver extends TypeIdResolverBase { + + private final Map> knownIds = + ServiceLoader.load(PriorityFormulaProvider.class).stream() + .map(Provider::get) + .flatMap(PriorityFormulaProvider::types) + .collect(Collectors.toMap(Pair::first, Pair::second)); + + @Override + public Id getMechanism() { + return Id.CUSTOM; + } + + @Override + public String idFromValue(Object o) { + return knownIds.entrySet().stream() + .filter(known -> known.getValue().isInstance(o)) + .map(Entry::getKey) + .findFirst() + .orElseThrow(); + } + + @Override + public String idFromValueAndType(Object o, Class aClass) { + return idFromValue(o); + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) throws IOException { + final var clazz = knownIds.get(id); + return clazz == null ? null : context.constructType(clazz); + } + } + + /** + * Calculate the appropriate score + * + * @param inputs the scores from the user-provided input + * @param created the time when the workflow was first submitted + * @return the score + */ + int compute(ToIntFunction inputs, Instant created); +} diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormulaProvider.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormulaProvider.java new file mode 100644 index 00000000..f241cd28 --- /dev/null +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityFormulaProvider.java @@ -0,0 +1,10 @@ +package ca.on.oicr.gsi.vidarr; + +import ca.on.oicr.gsi.Pair; +import java.util.stream.Stream; + +/** Reads JSON configuration and instantiates priority formulae appropriately */ +public interface PriorityFormulaProvider { + /** Provides the type names and classes this plugin provides */ + Stream>> types(); +} diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInput.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInput.java new file mode 100644 index 00000000..1708668b --- /dev/null +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInput.java @@ -0,0 +1,102 @@ +package ca.on.oicr.gsi.vidarr; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.PriorityInput.PriorityInputIdResolver; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import io.undertow.server.HttpHandler; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; +import java.util.stream.Collectors; + +/** + * Consume data from a submission request to prepare a number for use in the priority scoring + * consumable resource. + */ +@JsonTypeIdResolver(PriorityInputIdResolver.class) +@JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = "type") +public interface PriorityInput { + + final class PriorityInputIdResolver extends TypeIdResolverBase { + + private final Map> knownIds = + ServiceLoader.load(PriorityInputProvider.class).stream() + .map(Provider::get) + .flatMap(PriorityInputProvider::types) + .collect(Collectors.toMap(Pair::first, Pair::second)); + + @Override + public Id getMechanism() { + return Id.CUSTOM; + } + + @Override + public String idFromValue(Object o) { + return knownIds.entrySet().stream() + .filter(known -> known.getValue().isInstance(o)) + .map(Entry::getKey) + .findFirst() + .orElseThrow(); + } + + @Override + public String idFromValueAndType(Object o, Class aClass) { + return idFromValue(o); + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) throws IOException { + final var clazz = knownIds.get(id); + return clazz == null ? null : context.constructType(clazz); + } + } + + /** + * Compute the integer priority for the submission request + * + * @param workflowName the name of the workflow + * @param workflowVersion the version of the workflow + * @param created the time the workflow was first submitted + * @param input the data included as part of the submission + * @return the score + */ + int compute(String workflowName, String workflowVersion, Instant created, JsonNode input); + + /** + * Priority inputs may provide an optional HTTP API to extend their functionality that will be + * accessible through their parent consumable resource. + * + *

Paths will be automatically prefixed with the instance's name, so the HTTP handler can + * assume it is at the root URL (i.e., /). + * + * @return the HTTP handler to use + */ + Optional httpHandler(); + + /** + * The type of data that must be provided as part of the submission request + * + * @return the type of data required + */ + BasicType inputFromSubmitter(); + + /** + * Perform any initialization required by this input + * + * @param resourceName the name of the consumable resource that ultimately owns this input + * @param inputName a unique identifier; depending on the configuration, this name may not be a + * valid Shesmu identifier. It should only be used for logging/caching purposes. + */ + void startup(String resourceName, String inputName); +} diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java new file mode 100644 index 00000000..a0e65fe6 --- /dev/null +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityInputProvider.java @@ -0,0 +1,10 @@ +package ca.on.oicr.gsi.vidarr; + +import ca.on.oicr.gsi.Pair; +import java.util.stream.Stream; + +/** Reads JSON configuration and instantiates priority inputs appropriately */ +public interface PriorityInputProvider { + /** Provides the type names and classes this plugin provides */ + Stream>> types(); +} diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java new file mode 100644 index 00000000..0351d3dc --- /dev/null +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorer.java @@ -0,0 +1,111 @@ +package ca.on.oicr.gsi.vidarr; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.PriorityScorer.PriorityScorerIdResolver; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; +import com.fasterxml.jackson.annotation.JsonTypeInfo.Id; +import com.fasterxml.jackson.databind.DatabindContext; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.annotation.JsonTypeIdResolver; +import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase; +import io.undertow.server.HttpHandler; +import java.io.IOException; +import java.time.Instant; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.ServiceLoader; +import java.util.ServiceLoader.Provider; +import java.util.stream.Collectors; + +/** Examine a score to determine if a workflow run should be allowed to proceed or wait */ +@JsonTypeIdResolver(PriorityScorerIdResolver.class) +@JsonTypeInfo(use = Id.CUSTOM, include = As.PROPERTY, property = "type") +public interface PriorityScorer { + + final class PriorityScorerIdResolver extends TypeIdResolverBase { + + private final Map> knownIds = + ServiceLoader.load(PriorityScorerProvider.class).stream() + .map(Provider::get) + .flatMap(PriorityScorerProvider::types) + .collect(Collectors.toMap(Pair::first, Pair::second)); + + @Override + public Id getMechanism() { + return Id.CUSTOM; + } + + @Override + public String idFromValue(Object o) { + return knownIds.entrySet().stream() + .filter(known -> known.getValue().isInstance(o)) + .map(Entry::getKey) + .findFirst() + .orElseThrow(); + } + + @Override + public String idFromValueAndType(Object o, Class aClass) { + return idFromValue(o); + } + + @Override + public JavaType typeFromId(DatabindContext context, String id) throws IOException { + final var clazz = knownIds.get(id); + return clazz == null ? null : context.constructType(clazz); + } + } + + /** + * Determine if the workflow run can begin now + * + * @param workflowName the name of the workflow + * @param workflowVersion the version of the workflow + * @param vidarrId the workflow run identifier + * @param created the time when the workflow run was first submitted + * @param workflowMaxInFlight the max-in-flight value for this workflow, if available + * @param score the computed score for this workflow run + * @return true if the workflow run may proceed; false if it should wait + */ + boolean compute( + String workflowName, + String workflowVersion, + String vidarrId, + Instant created, + OptionalInt workflowMaxInFlight, + int score); + /** + * Priority scorers may provide an optional HTTP API to extend their functionality that will be + * accessible through their parent consumable resource. + * + *

Paths will be automatically prefixed with the instance's name, so the HTTP handler can + * assume it is at the root URL (i.e., /). + * + * @return the HTTP handler to use + */ + Optional httpHandler(); + + /** + * Indicate that Vidarr has restarted and this workflow run will be started even if this scorer + * would make it wait. + * + * @param workflowName the name of the workflow + * @param workflowVersion the version of the workflow + * @param vidarrId the identifier of the workflow run + */ + void recover(String workflowName, String workflowVersion, String vidarrId); + + /** + * Indicate that the workflow run has completed and the score should purge any state about it + * + * @param workflowName the name of the workflow + * @param workflowVersion the version of the workflow + * @param vidarrId the identifier of the workflow run + */ + void release(String workflowName, String workflowVersion, String vidarrId); + /** Perform any initialization required by this input */ + void startup(); +} diff --git a/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java new file mode 100644 index 00000000..a08e23d7 --- /dev/null +++ b/vidarr-pluginapi/src/main/java/ca/on/oicr/gsi/vidarr/PriorityScorerProvider.java @@ -0,0 +1,10 @@ +package ca.on.oicr.gsi.vidarr; + +import ca.on.oicr.gsi.Pair; +import java.util.stream.Stream; + +/** Reads JSON configuration and instantiates priority scorer appropriately */ +public interface PriorityScorerProvider { + /** Provides the type names and classes this plugin provides */ + Stream>> types(); +} diff --git a/vidarr-pluginapi/src/main/java/module-info.java b/vidarr-pluginapi/src/main/java/module-info.java index 34709020..95575f80 100644 --- a/vidarr-pluginapi/src/main/java/module-info.java +++ b/vidarr-pluginapi/src/main/java/module-info.java @@ -1,6 +1,9 @@ import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; import ca.on.oicr.gsi.vidarr.InputProvisionerProvider; import ca.on.oicr.gsi.vidarr.OutputProvisionerProvider; +import ca.on.oicr.gsi.vidarr.PriorityFormulaProvider; +import ca.on.oicr.gsi.vidarr.PriorityInputProvider; +import ca.on.oicr.gsi.vidarr.PriorityScorerProvider; import ca.on.oicr.gsi.vidarr.RuntimeProvisionerProvider; import ca.on.oicr.gsi.vidarr.UnloadFilterProvider; import ca.on.oicr.gsi.vidarr.WorkflowEngineProvider; @@ -12,12 +15,15 @@ * plugins are expected to provide as well as accessory data required or provided. */ module ca.on.oicr.gsi.vidarr.pluginapi { - uses UnloadFilterProvider; - uses WorkflowEngineProvider; uses ConsumableResourceProvider; uses InputProvisionerProvider; uses OutputProvisionerProvider; + uses PriorityFormulaProvider; + uses PriorityInputProvider; + uses PriorityScorerProvider; uses RuntimeProvisionerProvider; + uses UnloadFilterProvider; + uses WorkflowEngineProvider; exports ca.on.oicr.gsi.vidarr; exports ca.on.oicr.gsi.vidarr.api; diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java new file mode 100644 index 00000000..c7e8f618 --- /dev/null +++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/PrometheusPriorityInput.java @@ -0,0 +1,129 @@ +package ca.on.oicr.gsi.vidarr.prometheus; + +import ca.on.oicr.gsi.Pair; +import ca.on.oicr.gsi.vidarr.BasicType; +import ca.on.oicr.gsi.vidarr.PriorityInput; +import ca.on.oicr.gsi.vidarr.PriorityInputProvider; +import com.fasterxml.jackson.databind.JsonNode; +import io.undertow.server.HttpHandler; +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +public final class PrometheusPriorityInput implements PriorityInput { + private static boolean checkLabel(Map metric, String label, String value) { + if (label == null) { + return true; + } + final var entry = metric.get(label); + return entry != null && entry.equals(value); + } + + public static PriorityInputProvider provider() { + return () -> Stream.of(new Pair<>("prometheus", PrometheusPriorityInput.class)); + } + + private VectorCache cache; + private Integer cacheRequestTimeout; + private Integer cacheTtl; + private int defaultPriority; + private List labels; + private String query; + private String url; + private String workflowNameLabel; + private String workflowVersionLabel; + + @Override + public int compute(String workflowName, String workflowVersion, Instant created, JsonNode input) { + return cache + .get() + .filter( + result -> + labels.stream() + .allMatch( + label -> { + final var entry = result.getMetric().get(label); + return entry != null && entry.equals(input.get(label).asText()); + }) + && checkLabel(result.getMetric(), workflowNameLabel, workflowName) + && checkLabel(result.getMetric(), workflowVersionLabel, workflowVersion)) + .flatMap(result -> result.getValue().stream()) + .mapToInt(Float::intValue) + .findFirst() + .orElse(defaultPriority); + } + + public int getDefaultPriority() { + return defaultPriority; + } + + public List getLabels() { + return labels; + } + + public String getQuery() { + return query; + } + + public String getUrl() { + return url; + } + + public String getWorkflowNameLabel() { + return workflowNameLabel; + } + + public String getWorkflowVersionLabel() { + return workflowVersionLabel; + } + + @Override + public Optional httpHandler() { + return Optional.empty(); + } + + @Override + public BasicType inputFromSubmitter() { + return BasicType.object(labels.stream().map(label -> new Pair<>(label, BasicType.STRING))); + } + + public void setCacheRequestTimeout(Integer cacheRequestTimeout) { + this.cacheRequestTimeout = cacheRequestTimeout; + } + + public void setCacheTtl(Integer cacheTtl) { + this.cacheTtl = cacheTtl; + } + + public void setDefaultPriority(int defaultPriority) { + this.defaultPriority = defaultPriority; + } + + public void setLabels(List labels) { + this.labels = labels; + } + + public void setQuery(String query) { + this.query = query; + } + + public void setUrl(String url) { + this.url = url; + } + + public void setWorkflowNameLabel(String workflowNameLabel) { + this.workflowNameLabel = workflowNameLabel; + } + + public void setWorkflowVersionLabel(String workflowVersionLabel) { + this.workflowVersionLabel = workflowVersionLabel; + } + + @Override + public void startup(String resourceName, String inputName) { + cache = + new VectorCache(resourceName + " " + inputName, url, query, cacheRequestTimeout, cacheTtl); + } +} diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java new file mode 100644 index 00000000..ee800dcf --- /dev/null +++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/QueryResponseDto.java @@ -0,0 +1,23 @@ +package ca.on.oicr.gsi.vidarr.prometheus; + +public final class QueryResponseDto { + + private VectorDataDto data; + private String status; + + public VectorDataDto getData() { + return data; + } + + public String getStatus() { + return status; + } + + public void setData(VectorDataDto data) { + this.data = data; + } + + public void setStatus(String status) { + this.status = status; + } +} diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java new file mode 100644 index 00000000..1dfc992a --- /dev/null +++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorCache.java @@ -0,0 +1,51 @@ +package ca.on.oicr.gsi.vidarr.prometheus; + +import static ca.on.oicr.gsi.vidarr.prometheus.AlertmanagerAutoInhibitConsumableResource.MAPPER; + +import ca.on.oicr.gsi.cache.ReplacingRecord; +import ca.on.oicr.gsi.cache.ValueCache; +import ca.on.oicr.gsi.vidarr.JsonBodyHandler; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublishers; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.stream.Stream; + +public final class VectorCache extends ValueCache> { + private static final HttpClient HTTP_CLIENT = HttpClient.newHttpClient(); + private final String prometheusUrl; + private final String query; + private final Integer requestTimeout; + + public VectorCache( + String name, String prometheusUrl, String query, Integer requestTimeout, Integer ttl) { + super("prometheus " + name, ttl, ReplacingRecord::new); + this.prometheusUrl = prometheusUrl; + this.query = query; + this.requestTimeout = requestTimeout; + } + + protected Stream fetch(Instant lastUpdated) throws Exception { + if (prometheusUrl == null) { + return Stream.empty(); + } + var response = + HTTP_CLIENT.send( + HttpRequest.newBuilder(URI.create(String.format("%s/api/v1/query", prometheusUrl))) + .POST( + BodyPublishers.ofString( + "query=" + URLEncoder.encode(query, StandardCharsets.UTF_8))) + .timeout(Duration.ofMinutes(requestTimeout)) + .build(), + new JsonBodyHandler<>(MAPPER, QueryResponseDto.class)); + final var result = response.body().get(); + if (result == null || result.getData() == null) { + return Stream.empty(); + } + return result.getData().getResult().stream(); + } +} diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java new file mode 100644 index 00000000..daacf7c0 --- /dev/null +++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorDataDto.java @@ -0,0 +1,25 @@ +package ca.on.oicr.gsi.vidarr.prometheus; + +import java.util.List; + +public final class VectorDataDto { + + private List result; + private String resultType; + + public List getResult() { + return result; + } + + public String getResultType() { + return resultType; + } + + public void setResult(List result) { + this.result = result; + } + + public void setResultType(String resultType) { + this.resultType = resultType; + } +} diff --git a/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java new file mode 100644 index 00000000..7f75edaf --- /dev/null +++ b/vidarr-prometheus/src/main/java/ca/on/oicr/gsi/vidarr/prometheus/VectorResultDto.java @@ -0,0 +1,26 @@ +package ca.on.oicr.gsi.vidarr.prometheus; + +import java.util.List; +import java.util.Map; + +public final class VectorResultDto { + + private Map metric; + private List value; + + public Map getMetric() { + return metric; + } + + public List getValue() { + return value; + } + + public void setMetric(Map metric) { + this.metric = metric; + } + + public void setValue(List value) { + this.value = value; + } +} diff --git a/vidarr-prometheus/src/main/java/module-info.java b/vidarr-prometheus/src/main/java/module-info.java index f66bb1f6..1ff53d4e 100644 --- a/vidarr-prometheus/src/main/java/module-info.java +++ b/vidarr-prometheus/src/main/java/module-info.java @@ -1,5 +1,7 @@ import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; +import ca.on.oicr.gsi.vidarr.PriorityInputProvider; import ca.on.oicr.gsi.vidarr.prometheus.AlertmanagerAutoInhibitConsumableResource; +import ca.on.oicr.gsi.vidarr.prometheus.PrometheusPriorityInput; module ca.on.oicr.gsi.vidarr.prometheus { requires ca.on.oicr.gsi.vidarr.pluginapi; @@ -13,4 +15,6 @@ provides ConsumableResourceProvider with AlertmanagerAutoInhibitConsumableResource; + provides PriorityInputProvider with + PrometheusPriorityInput; } diff --git a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java index b2cbe7b2..401a87ab 100644 --- a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java +++ b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/Main.java @@ -446,7 +446,6 @@ private static HttpHandler monitor(HttpHandler handler) { private final Map inputProvisioners; private final Semaphore loadCounter = new Semaphore(3); private final MaxInFlightByWorkflow maxInFlightPerWorkflow = new MaxInFlightByWorkflow(); - private final PriorityByWorkflow priorityPerWorkflow = new PriorityByWorkflow(); private final Map otherServers; private final Map outputProvisioners; private final int port; @@ -559,9 +558,7 @@ public void emit(SectionRenderer sectionRenderer) new Pair<>(name, consumableResources.get(name))), Stream.of( new Pair( - "", maxInFlightPerWorkflow), - new Pair( - "priority", priorityPerWorkflow))) + "", maxInFlightPerWorkflow))) .collect(Collectors.toList()); private final WorkflowEngine engine = workflowEngines.get(e.getValue().getWorkflowEngine()); @@ -719,41 +716,6 @@ protected Optional targetByName(String name) { .select(WORKFLOW.NAME, WORKFLOW.MAX_IN_FLIGHT) .from(WORKFLOW) .forEach(record -> maxInFlightPerWorkflow.set(record.value1(), record.value2())); - - DSL.using(connection) - .select( - WORKFLOW.NAME, - WORKFLOW_RUN.HASH_ID, - ACTIVE_WORKFLOW_RUN.CONSUMABLE_RESOURCES.cast(String.class)) - .from(WORKFLOW) - .join(WORKFLOW_VERSION) - .on(WORKFLOW.NAME.eq(WORKFLOW_VERSION.NAME)) - .join(WORKFLOW_RUN) - .on(WORKFLOW_RUN.WORKFLOW_VERSION_ID.eq(WORKFLOW_VERSION.ID)) - .join(ACTIVE_WORKFLOW_RUN) - .on(WORKFLOW_RUN.ID.eq(ACTIVE_WORKFLOW_RUN.ID)) - .where(ACTIVE_WORKFLOW_RUN.ENGINE_PHASE.eq(Phase.WAITING_FOR_RESOURCES)) - .forEach( - record -> { - try { - priorityPerWorkflow.set( - record.value1(), - record.value2(), - Optional.ofNullable( - MAPPER.readTree( - (record.value3() == null || record.value3() == null) - ? "{}" - : record.value3()))); - } catch (JsonProcessingException e) { - // not a disaster; we might just get some things running out of priority - // until max-in-flight gets saturated - System.out.println( - "Failed to serialize the consumable resources field" - + " on active workflow run for priority by workflow on startup:" - + " some actions may temporarily be run out of priority."); - System.out.println(e.getMessage()); - } - }); } unloadDirectory = Path.of(configuration.getUnloadDirectory()); diff --git a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java b/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java deleted file mode 100644 index 38333e73..00000000 --- a/vidarr-server/src/main/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflow.java +++ /dev/null @@ -1,181 +0,0 @@ -package ca.on.oicr.gsi.vidarr.server; - -import static java.util.Map.Entry.comparingByValue; - -import ca.on.oicr.gsi.Pair; -import ca.on.oicr.gsi.vidarr.BasicType; -import ca.on.oicr.gsi.vidarr.ConsumableResource; -import ca.on.oicr.gsi.vidarr.ConsumableResourceProvider; -import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse; -import com.fasterxml.jackson.databind.JsonNode; -import io.prometheus.client.Gauge; -import java.time.Instant; -import java.util.AbstractMap.SimpleEntry; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.SortedSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -public final class PriorityByWorkflow implements ConsumableResource { - - // 1 is lowest priority - // 4 is highest priority (will be given access to resources first) - private final List acceptedPriorities = Arrays.asList(1, 2, 3, 4); - - public static ConsumableResourceProvider provider() { - return () -> Stream.of(new Pair<>("priority", PriorityByWorkflow.class)); - } - - private static final class WaitingState { - - private final SortedSet> waiting = - new ConcurrentSkipListSet>(comparingByValue()) { - - // Replace if key matches, even if entire pair doesn't - @Override - public boolean add(SimpleEntry simpleEntry) { - SimpleEntry entry = this.getByKey(simpleEntry.getKey()); - if (entry != null) { - super.remove(entry); - } - return super.add(simpleEntry); - } - - public SimpleEntry getByKey(String simpleEntryKey) { - for (SimpleEntry entry : this) { - if (entry.getKey().equals(simpleEntryKey)) { - return (entry); - } - } - return null; - } - - // Override to remove by key instead of pair - @Override - public boolean remove(Object o) { - try { - String oEntry = (String) o; - for (SimpleEntry entry : this) { - if (entry.getKey().equals(oEntry)) { - super.remove(entry); - } - } - } catch (Exception e) { - return false; - } - return true; - } - }; - } - - // not currently monitored by anything - potentially remove? - private static final Gauge currentInPriorityWaitingCount = - Gauge.build( - "vidarr_in_priority_waiting_per_workflow_current", - "The current number of workflows that are on priority waiting to run.") - .labelNames("workflow") - .register(); - - private final Map workflows = new ConcurrentHashMap<>(); - - @Override - public Optional> inputFromSubmitter() { - return Optional.of(new Pair<>("priority", BasicType.INTEGER.asOptional())); - } - - @Override - public void recover( - String workflowName, - String workflowVersion, - String vidarrId, - Optional resourceJson) { - // Do nothing, as once the workflow run launches, it is no longer tracked in PriorityByWorkflow - } - - @Override - public void release( - String workflowName, String workflowVersion, String vidarrId, Optional input) { - // If workflow run did not launch, re-add to waiting list - if (input.isPresent()) { - set(workflowName, vidarrId, input); - } - // Otherwise do nothing - } - - @Override - public synchronized ConsumableResourceResponse request( - String workflowName, - String workflowVersion, - String vidarrId, - Instant created, - OptionalInt workflowMaxInFlight, - Optional input) { - - int workflowPriority = Collections.min(acceptedPriorities); - if (input.isPresent()) { - workflowPriority = input.get().asInt(); - } - - if (!acceptedPriorities.contains(workflowPriority)) { - return ConsumableResourceResponse.error( - String.format( - "Vidarr error: The workflow '%s' run's priority (%d) is invalid. Priority " - + "values should be one of the following: %s", - workflowName, - workflowPriority, - acceptedPriorities.stream().map(String::valueOf).collect(Collectors.joining(", ")))); - } - - final var state = workflows.get(workflowName); - if (state == null || state.waiting.isEmpty()) { - return ConsumableResourceResponse.AVAILABLE; - } - - // If this workflow has already been seen - // Add the current run to the waitlist - // ensuring it replaces previous runs with the same ID, accounting for if the priority has - // changed - set(workflowName, vidarrId, input); - - if (workflowPriority >= state.waiting.last().getValue()) { - state.waiting.remove(vidarrId); - currentInPriorityWaitingCount.labels(workflowName).set(state.waiting.size()); - return ConsumableResourceResponse.AVAILABLE; - } else { - currentInPriorityWaitingCount.labels(workflowName).set(state.waiting.size()); - return ConsumableResourceResponse.error( - String.format( - "There are %s workflows currently queued up with higher priority.", workflowName)); - } - } - - public void set(String workflowName, String vidarrId, Optional input) { - - int workflowPriority = Collections.min(acceptedPriorities); - if (input.isPresent()) { - workflowPriority = input.get().asInt(); - } - - final var stateWaiting = - workflows.computeIfAbsent(workflowName, k -> new WaitingState()).waiting; - stateWaiting.add(new SimpleEntry(vidarrId, workflowPriority)); - currentInPriorityWaitingCount.labels(workflowName).set(stateWaiting.size()); - } - - @Override - public void startup(String name) { - // Always ok. - } - - @Override - public boolean isInputFromSubmitterRequired() { - return false; - } -} diff --git a/vidarr-server/src/main/java/module-info.java b/vidarr-server/src/main/java/module-info.java index 56b8f6b6..d063b4a2 100644 --- a/vidarr-server/src/main/java/module-info.java +++ b/vidarr-server/src/main/java/module-info.java @@ -4,7 +4,6 @@ import ca.on.oicr.gsi.vidarr.RuntimeProvisionerProvider; import ca.on.oicr.gsi.vidarr.UnloadFilterProvider; import ca.on.oicr.gsi.vidarr.WorkflowEngineProvider; -import ca.on.oicr.gsi.vidarr.server.PriorityByWorkflow; module ca.on.oicr.gsi.vidarr.server { exports ca.on.oicr.gsi.vidarr.server; @@ -42,9 +41,6 @@ com.fasterxml.jackson.databind; opens db.migration; - provides ConsumableResourceProvider with - PriorityByWorkflow; - uses ConsumableResourceProvider; uses InputProvisionerProvider; uses OutputProvisionerProvider; diff --git a/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java b/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java deleted file mode 100644 index a68ebecc..00000000 --- a/vidarr-server/src/test/java/ca/on/oicr/gsi/vidarr/server/PriorityByWorkflowTest.java +++ /dev/null @@ -1,341 +0,0 @@ -package ca.on.oicr.gsi.vidarr.server; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse; -import ca.on.oicr.gsi.vidarr.ConsumableResourceResponse.Visitor; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.time.Instant; -import java.util.Optional; -import java.util.OptionalInt; -import org.junit.Before; -import org.junit.Test; - -public class PriorityByWorkflowTest { - - private PriorityByWorkflow sut; - private ObjectMapper mapper = new ObjectMapper(); - - String workflow = "test"; - String version = "1.0"; - ConsumableResourceResponse.Visitor> consumableResourceCheckerVisitor = - new Visitor>() { - @Override - public Optional available() { - return Optional.empty(); - } - - @Override - public void clear(String name) {} - - @Override - public Optional error(String message) { - return Optional.of(message); - } - - @Override - public void set(String name, long value) {} - - @Override - public Optional unavailable() { - return Optional.of(String.format("Resource is not available")); - } - }; - - @Before - public void instantiate() { - sut = new PriorityByWorkflow(); - } - - @Test - public void testRequest_invalidPriorityReturnsError() { - JsonNode invalidJson = mapper.valueToTree(5); - - Optional requestError = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(invalidJson)) - .apply(consumableResourceCheckerVisitor); - assertTrue(requestError.isPresent()); - assertEquals( - requestError.get(), - "Vidarr error: The workflow 'test' run's priority (5) is " - + "invalid. Priority values should be one of the following: 1, 2, 3, 4"); - } - - @Test - public void testRequest_emptyInputAndEmptyWaitingNoError() { - Optional requestError = - sut.request( - workflow, version, "abcdef", Instant.EPOCH, OptionalInt.empty(), Optional.empty()) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestError.isEmpty()); - } - - @Test - public void testRequest_emptyInputAndEmptyWaitingIsAvailable() { - - var response = - sut.request( - workflow, version, "abcdef", Instant.EPOCH, OptionalInt.empty(), Optional.empty()); - - assertEquals(response, ConsumableResourceResponse.AVAILABLE); - } - - @Test - public void testRequest_validInputAndEmptyWaitingIsOk() { - JsonNode validJson = mapper.valueToTree(2); - Optional requestError = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(validJson)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestError.isEmpty()); - } - - @Test - public void testIfWorkflowRunWithHigherPriorityExists_thenWorkflowRunDoesNotLaunch() { - JsonNode higherPriority = mapper.valueToTree(4); - JsonNode lowerPriority = mapper.valueToTree(2); - - sut.set(workflow, "qwerty", Optional.of(higherPriority)); - - Optional requestError = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(lowerPriority)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestError.isPresent()); - assertEquals( - requestError.get(), - "There are test workflows currently queued up with higher " + "priority."); - } - - @Test - public void testIfWorkflowRunWithLowerPriorityExists_thenWorkflowRunDoesNotError() { - JsonNode higherPriority = mapper.valueToTree(4); - JsonNode lowerPriority = mapper.valueToTree(2); - - sut.set(workflow, "qwerty", Optional.of(lowerPriority)); - - Optional requestError = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(higherPriority)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestError.isEmpty()); - } - - @Test - public void testIfWorkflowRunWithLowerPriorityExists_thenWorkflowRunLaunches() { - JsonNode higherPriority = mapper.valueToTree(4); - JsonNode lowerPriority = mapper.valueToTree(2); - - sut.set(workflow, "qwerty", Optional.of(lowerPriority)); - - var response = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(higherPriority)); - - assertEquals(response, ConsumableResourceResponse.AVAILABLE); - } - - @Test - public void testIfWorkflowRunWithSamePriorityExists_thenWorkflowDoesNotError() { - JsonNode twoPriority = mapper.valueToTree(2); - JsonNode twoPriorityAlso = mapper.valueToTree(2); - - sut.set(workflow, "qwerty", Optional.of(twoPriority)); - - Optional requestError = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(twoPriorityAlso)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestError.isEmpty()); - } - - @Test - public void testIfWorkflowRunWithSamePriorityExists_thenWorkflowRunLaunches() { - JsonNode twoPriority = mapper.valueToTree(2); - JsonNode twoPriorityAlso = mapper.valueToTree(2); - - sut.set(workflow, "qwerty", Optional.of(twoPriority)); - - var response = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(twoPriorityAlso)); - - assertEquals(response, ConsumableResourceResponse.AVAILABLE); - } - - @Test - public void testIfWorkflowRunWIthEmptyPriorityIsSet_thenWorkflowRunIsAddedWithLowestPriority() { - JsonNode lowestPriority = mapper.valueToTree(1); - - Optional requestErrorForEmpty = - sut.request( - workflow, version, "abcdef", Instant.EPOCH, OptionalInt.empty(), Optional.empty()) - .apply(consumableResourceCheckerVisitor); - - Optional requestErrorForLowest = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(lowestPriority)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestErrorForEmpty.isEmpty()); - assertTrue(requestErrorForLowest.isEmpty()); - } - - @Test - public void testIfResourceIsReleasedButNotLaunched_thenLowerPriorityCannotLaunch() { - - JsonNode higherPriority = mapper.valueToTree(4); - JsonNode lowerPriority = mapper.valueToTree(1); - - Optional requestErrorHigher = - sut.request( - workflow, - version, - "qwerty", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(higherPriority)) - .apply(consumableResourceCheckerVisitor); - - sut.release(workflow, version, "qwerty", Optional.of(higherPriority)); - - Optional requestErrorLower = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(lowerPriority)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestErrorHigher.isEmpty()); - assertTrue(requestErrorLower.isPresent()); - assertEquals( - requestErrorLower.get(), - "There are test workflows currently queued up with higher " + "priority."); - } - - @Test - public void testIfResourceIsReleasedAndLaunched_thenLowerPriorityCanLaunch() { - - JsonNode higherPriority = mapper.valueToTree(4); - JsonNode lowerPriority = mapper.valueToTree(1); - - Optional requestErrorHigher = - sut.request( - workflow, - version, - "qwerty", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(higherPriority)) - .apply(consumableResourceCheckerVisitor); - - sut.release(workflow, version, "qwerty", Optional.empty()); - - Optional requestErrorLower = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(lowerPriority)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestErrorHigher.isEmpty()); - assertTrue(requestErrorLower.isEmpty()); - } - - @Test - public void testIfResourcePassesRequest_thenSamePriorityCanLaunch() { - - JsonNode validJson = mapper.valueToTree(2); - - sut.request( - workflow, version, "qwerty", Instant.EPOCH, OptionalInt.empty(), Optional.of(validJson)) - .apply(consumableResourceCheckerVisitor); - - Optional requestError = - sut.request( - workflow, - version, - "abcdef", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(validJson)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestError.isEmpty()); - } - - @Test - public void testIfWorkflowLowersPriority_WorkflowIsNotBlockedBySelf() { - - JsonNode higherPriority = mapper.valueToTree(4); - JsonNode lowerPriority = mapper.valueToTree(2); - - sut.set(workflow, "qwerty", Optional.of(higherPriority)); - - Optional requestErrorLower = - sut.request( - workflow, - version, - "qwerty", - Instant.EPOCH, - OptionalInt.empty(), - Optional.of(lowerPriority)) - .apply(consumableResourceCheckerVisitor); - - assertTrue(requestErrorLower.isEmpty()); - } -}