Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Add execution concurrency #5659

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
165 changes: 165 additions & 0 deletions rfc/system/RFC-0000-execution-concurrency.md
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please change the filename to include the PR number?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, thanks

Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# [RFC Template] Title

**Authors:**

- @eapolinario
- @katrogan

## 1 Executive Summary

This is a proposal to implement workflow execution concurrency, defined at the launch plan level.

## 2 Motivation

See the following issues
1. https://github.com/flyteorg/flyte/issues/267
2. https://github.com/flyteorg/flyte/issues/420
3. https://github.com/flyteorg/flyte/discussions/3754
4. https://github.com/flyteorg/flyte/issues/5125

## 3 Proposed Implementation

Introduce a new attribute in [LaunchPlan.get_or_create](https://github.com/flyteorg/flytekit/blob/bc2e000cc8d710ed3d135cdbf3cbf257c5da8100/flytekit/core/launch_plan.py#L195) to allow specifying execution concurrency

e.g.
```python
my_lp = LaunchPlan.get_or_create(
name="my_serial_lp",
workflow=my_wf,
...
concurrency=Concurrency(
max=1, # defines how many executions with this launch plan can run in parallel
policy=ConcurrencyPolicy.WAIT # defines the policy to apply when the max concurrency is reached
)
)
```

### FlyteIDL
We propose adding a new IDL message to capture concurrency behavior at CreateExecutionTime

```protobuf
message Concurrency {
// Defines how many executions with this launch plan can run in parallel
uint32 max = 1;

// Defines how to handle the execution when the max concurrency is reached.
ConcurrencyPolicy policy = 2;
}

enum ConcurrencyPolicy {
UNSPECIFIED = 0;

// wait for previous executions to terminate before starting a new one
WAIT = 1;

// fail the CreateExecution request and do not permit the execution to start
ABORT = 2;
}

message LaunchPlanSpec {
...

Concurrency concurrency = X;
}

// embedded in the ExecutionClosure
message ExecutionStateChangeDetails {
...

// Includes the reason for the `PENDING` phase
string description = X;


}

// Can also add to ExecutionSpec to specify execution time overrides

```

### FlyteAdmin
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During last week's contributors meeting someone asked a question about having this concurrency control work across versions. Can we either have a discussion in this PR about it or list that use case as not being supported explicitly in the RFC?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can say that something that works across versions would be really useful for us.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For us too because we very often pyflyte run which means we often don't have two executions of the same version.

This could be made configurable here:

 concurrency=Concurrency(
        max=1,  # defines how many executions with this launch plan can run in parallel
        policy=ConcurrencyPolicy.WAIT  # defines the policy to apply when the max concurrency is reached,
        level=ConcurrencyLevel.Version, # or ConcurrencyLevel.LaunchPlan
    )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @eapolinario @corleyma @fg91 for the feedback, I don't think this will be too much of a lift but added a proposal for different levels of precision here too

At a broad level
1. At CreateExecution time, if the launch plan in the ExecutionSpec has a concurrency policy
1. Create the execution in the database with a new `PENDING` execution phase and reason populated in `ExecutionStateChangeDetails`.
1. or fail the request when the concurrency policy is set to `ABORT`
1. Do not create the workflow CRD

Introduce an async reconciliation loop in FlyteAdmin to poll for all pending executions:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have prior art for this kind of reconciliation loop in flyteadmin?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the scheduler!

1. Query all pending executions by timestamp ascending (open question, should we prefer more recent executions instead? should we make this configurable?)
1. as an optimization, could even parallelize this into goroutines, one per distinct launch plan ID that has any `PENDING` execution
2. Check the database to see if there are fewer than `MAX_CONCURRENCY` non-terminal executions with an identical launch plan ID
3. If there are fewer than `MAX_CONCURRENCY` executions running, select the oldest pending execution for that launch plan
1. create the workflow CRD
1. open question: also update its phase in the database to `QUEUED`?
1. let execution proceed

We should consider adding an index to the executions table to include
eapolinario marked this conversation as resolved.
Show resolved Hide resolved
- launch_plan_id
- phase
- created_at

#### Open Questions
- Should we always attempt to schedule pending executions in ascending order of creation time?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make it configurable? FIFO, FILO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I wasn't sure! any suggestions here? we could introduce an enum and choose fifo to begin with and expand support

Copy link

@granthamtaylor granthamtaylor Aug 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have mixed thoughts on making the queue's order of execution configurable.

If we support a limited number of parallel executions (more than 1), the order of these executions would naturally start as FIFO up until that limit is reached.

To me, providing an option to begin executing FILO after that limit is reached feels confusing to me.

However, that brings a different question to mind: If multiple workflows are queued up, should we provide an option to enable loud notifications?

In other words, if backlogged executions have the possibility of impacting downstream operations, can we enable users to receive loud notifications, including the number of queued executions?

I can imagine a use case where: holiday shopping -> increased purchase volume -> increased data size -> multiple, consecutive execution delays -> cascading backlog of executions. In this scenario, the owners of the workflow may be out on leave and not be aware of the growing backlog.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting, we have workflow notifications enabled for terminal state but we've talked more about richer, customizable notifications and I think this slates neatly into that

I think for a v1 having the default behavior be fifo with an extended description/explanation for the pending state may provide some visibility here to start off with

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this suggestion of having an enum listing the policies to the Implementation details section?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can add a customers feedback here, where the desired behaviour is to actually replace (terminate) the current executions by subsequent executions. Sounds like too much for the initial scope but still interested if this would be possible to add later with the current approach?

Copy link
Contributor Author

@katrogan katrogan Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fiedlerNr9 added a section under Alternatives. I don't think this is precluded by this implementation but not in scope for this proposal atm

- Should we propagate concurrency policies to child executions?

## 4 Metrics & Dashboards

*What are the main metrics we should be measuring? For example, when interacting with an external system, it might be the external system latency. When adding a new table, how fast would it fill up?*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this feature going to be rolled out? Should we have an explicit list of metrics used to help the health of the feature? (e.g. total number of attempts of a given launchplan )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting question. I think scheduling attempts here is based on the polling interval right? But could be useful to understand time spent in PENDING


## 5 Drawbacks

*Are there any reasons why we should not do this? Here we aim to evaluate risk and check ourselves.*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have any reservations about more load on the DB (even with indexes, etc)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, we already have a ton of indices on executions - there is definitely a tradeoff to adding a new one


## 6 Alternatives

*What are other ways of achieving the same outcome?*

## 7 Potential Impact and Dependencies

*Here, we aim to be mindful of our environment and generate empathy towards others who may be impacted by our decisions.*

- *What other systems or teams are affected by this proposal?*
- *How could this be exploited by malicious attackers?*

## 8 Unresolved questions

*What parts of the proposal are still being defined or not covered by this proposal?*

## 9 Conclusion

*Here, we briefly outline why this is the right decision to make at this time and move forward!*

## 10 RFC Process Guide, remove this section when done

*By writing an RFC, you're giving insight to your team on the direction you're taking. There may not be a right or better decision in many cases, but we will likely learn from it. By authoring, you're making a decision on where you want us to go and are looking for feedback on this direction from your team members, but ultimately the decision is yours.*

This document is a:

- thinking exercise, prototype with words.
- historical record, its value may decrease over time.
- way to broadcast information.
- mechanism to build trust.
- tool to empower.
- communication channel.

This document is not:

- a request for permission.
- the most up to date representation of any process or system

**Checklist:**

- [ ] Copy template
- [ ] Draft RFC (think of it as a wireframe)
- [ ] Share as WIP with folks you trust to gut-check
- [ ] Send pull request when comfortable
- [ ] Label accordingly
- [ ] Assign reviewers
- [ ] Merge PR

**Recommendations**

- Tag RFC title with [WIP] if you're still ironing out details.
- Tag RFC title with [Newbie] if you're trying out something experimental or you're not entirely convinced of what you're proposing.
- Tag RFC title with [RR] if you'd like to schedule a review request to discuss the RFC.
- If there are areas that you're not convinced on, tag people who you consider may know about this and ask for their input.
- If you have doubts, ask on [#feature-discussions](https://slack.com/app_redirect?channel=CPQ3ZFQ84&team=TN89P6GGK) for help moving something forward.
Loading