Skip to content

Commit

Permalink
Discussion comments
Browse files Browse the repository at this point in the history
Signed-off-by: Katrina Rogan <[email protected]>
  • Loading branch information
katrogan committed Nov 25, 2024
1 parent cce3e69 commit dd58406
Showing 1 changed file with 66 additions and 102 deletions.
168 changes: 66 additions & 102 deletions rfc/system/RFC-5659-execution-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ my_lp = LaunchPlan.get_or_create(
```

### FlyteIDL
We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime
We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime and embedding it in the existing [Schedule](https://github.com/flyteorg/flyte/blob/master/flyteidl/protos/flyteidl/admin/schedule.proto) message

```protobuf
message Concurrency {
Expand All @@ -59,7 +59,7 @@ enum ConcurrencyPolicy {
REPLACE = 3;
}
message LaunchPlanSpec {
message Schedule {
...
Concurrency concurrency = X;
Expand All @@ -79,51 +79,75 @@ message ExecutionStateChangeDetails {
```

### FlyteAdmin
At a broad level
### Concurrency Controller Singleton
At a broad level, we'll follow the precedent of the [scheduler](https://github.com/flyteorg/flyte/tree/master/flyteadmin/scheduler) defined in FlyteAdmin and define a singleton to manage concurrency across all launch plans.

1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy
1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`.
1. or fail the request when the concurrency policy is set to `ABORT`
1. Do not create the workflow CRD

Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions:
1. 1x a minute: Query all pending executions by timestamp ascending, grouped by launch plan ID, roughly something like
```sql
SELECT e.*
FROM executions AS e
WHERE ( launch_plan_id, created_at ) IN (SELECT launch_plan_id,
Min(created_at)
FROM executions
WHERE phase = 'PENDING'
GROUP BY launch_plan_id);
```
2. For each execution returned by the above query, `Add()` the pending execution to a [rate limiting workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/rate_limiting_queue.go#L27-L40) (as a suggestion)
3. In a separate goroutine, fetch items from the workqueue and individually process each execution entry
1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model
```sql
select count(launch_plan_id) from executions where phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;
```
1. If there are fewer than `MAX_CONCURRENCY` executions running
1. check that the execution is still in `PENDING`
1. create the workflow CRD
1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully
1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim
1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop
1. If there are already `MAX_CONCURRENCY` executions running, simply proceed to (iii.)
1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33)

If we wanted further parallelization here, we could introduce a worker pool rather than having one async process read from the workqueue.
Introduce the Concurrency Controller to poll for all pending executions:
1. Upon start-up, initialize a launch plan informer and a worker pool and spawn N number of worker threads.
1. The launch plan informer will be responsible for keeping a map of launch plans, by [NamedEntityIdentifier](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteidl/protos/flyteidl/admin/common.proto) (that is across versions) and their concurrency policy: `map[admin.NamedEntityIdentifier]admin.Schedule`
1. Periodically query the DB for pending executions `SELECT * FROM executions WHERE phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;`
1. For each `PENDING` execution returned by the above query, `Add()` the pending execution to a [workqueue](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go). We can fine tune in the future to include differentiated priority.
1. For each non-`PENDING` execution returned by the above query, update the map of active executions by launch plan named entity into a thread-safe Map of type `rawActiveLaunchPlanExecutions map[admin.NamedEntityIdentifier]util.Set[admin.Execution]` (e.g. using this [set]("k8s.io/apimachinery/pkg/util/sets") library)
1. After processing the complete set of non-terminal executions, transform the `rawActiveLaunchPlanExecutions` map into a thread-safe, ordered list of executions by creation time: `activeLaunchPlanExecutions map[admin.NamedEntityIdentifier][]*core.WorkflowExecutionIdentifier` using an implementation where different keys can be accessed concurrently.
1. For each worker in the workqueue:
1. Check a separate in-memory map populated launch plan informer to see:
1. If the launch plan no longer has a concurrency policy, proceed to create the execution, see below
1. If the launch plan has an active concurrency policy and max executions has been reached: proceed to respect the concurrency policy:
1. `WAIT`: do nothing
1. `ABORT`: mark the execution as `FAILED` in the db with a sensible error explaining the concurrency policy was violated
1. `REPLACE`: terminate the oldest execution for the execution's launch plan in `activeLaunchPlanExecutions`. If this succeeds, or it's already terminated, then proceed to create the new execution: see below
1. If the launch plan has an active concurrency policy and max executions have not been reached:
1. Proceed to create the execution, see below
1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33)

Creating an execution
1. create the workflow CRD
1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully
1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim
1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop
2. Upon successful creation of the workflow CRD, append the execution identifier to `activeLaunchPlanExecutions` for the launch plan named entity

#### Launch Plan informer
This is an async process we run in the Concurrency Controller to ensure we have an eventually consistent view of launch plans.

Upon Concurrency Controller start-up, we'll query the DB for all active launch plans and populate a map of active launch plans: `map[admin.NamedEntityIdentifier]admin.Schedule

Periodically, the informer will re-issue the query, optionally filtering by [UpdatedAt](https://github.com/flyteorg/flyte/blob/master/datacatalog/pkg/repositories/models/base.go#L7) to only fetch launch plans that have been updated since the last query to repopulate the map.


### Flyte Admin changes
### Execution Manager
Because we fetch the launch plan to reconcile execution inputs at CreateExecution time, we'll have the concurrency policy available to us at the time of execution creation.
If there is no concurrency policy defined, we'll proceed as [normal](https://github.com/flyteorg/flyte/blob/f14348165ccdfb26f8509c0f1ef380a360e59c4d/flyteadmin/pkg/manager/impl/execution_manager.go#L1169-L1173) and create the workflow execution CRD and then create a database entry for the execution with phase `UNKNOWN`. This way, we don't incur any penalty for executions

If there is a concurrency policy defined, we'll create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails` _but will not create a workflow CRD_


#### Database
For performance, we can introduce new fields to denormalize the launch plan named entity the execution was launched by
In [models/execution.go](https://github.com/flyteorg/flyte/blob/25cfe16940f10f9bbef02e288c823db16eb37609/flyteadmin/pkg/repositories/models/execution.go)
```go
model Execution {
...
LaunchPlanProject string
LaunchPlanDomain string
LaunchPlanName string
}
````

We should consider adding an index to the executions table to include
- launch_plan_id
- phase==PENDING only (in order to safeguard for well-populated flyteadmin instances with lots of completed, historical executions)
- created_at
- phase in (`PENDING`, `QUEUED`, `RUNNING`) only (in order to safeguard for well-populated flyteadmin instances with lots of completed, historical executions)

##### Concurrency across launch plan versions
##### Concurrency by specified launch plan versions
Executions are always tied to the versioned launch plan that triggered them (see [here](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/execution.go#L26))
Therefore, this proposal only applies concurrency at the versioned launch plan level.
Therefore, this proposal only applies concurrency at the launch plan Named Entity level, that is across (project, domain, version).

If we wanted to support concurrency across launch plan versions:
If we wanted to support concurrency by launch plan versions, we'd introduce `LaunchPlanVersion` to the execution model and update the keys for the in memory maps to be by versioned launch plan rather than NamedEntityIdentifier.
We could update usage like so
Expand Down Expand Up @@ -155,49 +179,14 @@ message Concurrency {
}
enum ConcurrencyLevel {
UNSPECIFIED = 0;
// Applies concurrency limits across all launch plan versions.
LAUNCH_PLAN = 1;
LAUNCH_PLAN = 0;
// Applies concurrency at the versioned launch plan level
LAUNCH_PLAN_VERSION = 2;
LAUNCH_PLAN_VERSION = 1;
}
```

We could add another index to the Executions table to include the launch plan named entity, that is the entry in the [NamedEntity table](https://github.com/flyteorg/flyte/blob/38883c721dac2875bdd2333f4cd56e757e81ea5f/flyteadmin/pkg/repositories/models/named_entity.go#L39-L42) corresponding to the launch plan project, domain & name

In models/execution.go:
```go
type Execution struct {
...
// Already exists
LaunchPlanID uint `gorm:"index"`
// New field to make querying on the named entity
LaunchPlanNamedEntityID uint `gorm:"index"`
// New field to make querying on concurrency policy by the reconciliation loop easier
ConcurrencyLevel uint32
}
```

Then the reconciliation loop would query executions in a non-terminal phase matching the launch plan named entity ID instead of LaunchPlanID based on the ConcurrencyLevel.

```sql
SELECT e.*
FROM executions AS e
WHERE ( launch_plan_named_entity_id, created_at ) IN (SELECT launch_plan_named_entity_id,
Min(created_at)
FROM executions
WHERE phase = 'PENDING' AND concurrency_level = 2;
GROUP BY launch_plan_named_entity_id);
```

Note, in this proposal, registering a new version of the launch plan and setting it to active will determine the concurrency policy across all launch plan versions.
#### Prior Art
Expand All @@ -222,13 +211,11 @@ already has indices on
- state

Database performance suffers as new indices are added (ref [[1](https://use-the-index-luke.com/sql/dml/insert)] [[2](https://www.timescale.com/learn/postgresql-performance-tuning-optimizing-database-indexes)])
We could as an alternative, repurpose the existing launch plan index to include (launch plan id, phase, created at) to optimize the query for pending executions and not significantly affect queries on launch plan id leveraging the existing index.

## 6 Alternatives

### Scheduling
This proposal purposefully uses FIFO scheduling. But this does not preclude defining other scheduling orders or catch-up policies in the future.
This proposal purposefully uses random scheduling. But this does not preclude defining other scheduling orders or catch-up policies in the future.

To accomplish this, we can extend the `ConcurrenyPolicy` proto message to encapsulate scheduling behavior

Expand All @@ -251,32 +238,9 @@ type ConcurrencyScheduling enum {
}
```

Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long
When we process the pending executions in the Concurrency Controller, we can sort the pending executions by creation time in ascending or descending order based on the scheduling policy.

### Other concurrency policies: Terminate priors on execution
What if we actually want to terminate existing executions when the concurrency limit is reached?
In practice this could work by adding a new `ConcurrencyPolicy` enum for `RUN_IMMEDIATELY`
And the reconciliation loop would now proceed like so
In a separate goroutine, fetch items from the workqueue and individually process each execution entry
1. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions matching the launch plan ID in the pending execution model
```sql
select count(launch_plan_id) from executions where phase not in ('SUCCEEDED', 'FAILED', 'ABORTED', 'TIMED_OUT') group by launch_plan_id;
```
1. If there are fewer than `MAX_CONCURRENCY` executions running
1. check that the execution is still in `PENDING`
1. create the workflow CRD
1. if the CRD already exists because we've previously processed this pending execution before we had a chance to update the DB state, swallow the already exists error gracefully
1. conditionally mark the execution as `QUEUED` in the db if it's already in `PENDING` or `QUEUED` to not overwrite any events should the execution have already reported progress in the interim
1. If creating the workflow CRD fails: mark the execution as `FAILED` in the db (and swallow any errors if the workflow is already in a terminal state should we have previously reported the failure after re-enqueuing the pending execution a previous loop). This will remove its eligiblity from the pending loop
1. If there are already `MAX_CONCURRENCY` executions running
1. Retrieve n executions where n = count(actively running executions) - MAX_CONCURRENCY (ordered by creation time, ascending so we kill the oldest executions first)
2. Kill each execution
3. Proceed to (1) above.
1. Finally, always mark the queue item as [Done()](https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L33)
Furthermore, we may want to introduce a max pending period to fail executions that have been in `PENDING` for too long


## 7 Potential Impact and Dependencies
Expand Down

0 comments on commit dd58406

Please sign in to comment.