Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP 72: Handling "deferrable" tasks in execution_api and task SDK #44241

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Nov 21, 2024

closes: #44137

This PR is trying to port the "deferral" logic from airflow 2 to the airflow 3 (execution api + task sdk)

Summary of changes:

Server side changes (execution api):

  1. Handling TIDeferredStatePayload in ti_update_state -> covered by unit test: test_ti_update_state_to_deferred
    a. Didn't piggy back on ti.defer_task() as it extracts the trigger out of TaskDeferred exception. It is much more expensive to send across multiple models like TaskInstance, TaskDeferred, Trigger instead of just the required minimal properties
    b. returning and not proceeding with query execution as we already do it above https://github.com/apache/airflow/pull/44241/files#diff-d44a72566870079ee943e24bac2af74fb84c426c54d210561a251549a7078ed7L129
  2. Defining a datamodel for TIDeferredStatePayload and adding it to the discriminator: ti_state_discriminator

Client side changes (task sdk):

HTTP client:

Added a new function defer that sends a patch request to the task-instances/{id}/state execution api with payload: PatchTIToDeferred

Comms:

Defining a new data model to send a request to patch ti as "deferred" from task runner to supervisor: PatchTIToDeferred (Added to ToSupervisor)

Supervisor:

  1. Adding a new class property as _final_state to support @property final_state which is final state of a TI
    a. Added a setter to set values for this final state for cases like deferred so that the finish is not called for tasks those aren't in terminal stage: https://github.com/apache/airflow/pull/44241/files#diff-c2651fdee1a25e091e2a9d4f937f8032ca3d289d0de76f38ed88aee5df0f880dL392-L394
  2. Deferred tasks first enter the "wait" in supervisor and then mark themselves as deferred by setting up a trigger. So skipping finish() when final_state is not TerminalTIState
  3. Extended handle_requests to receive requests from task runner and forwarding the message to http client to call defer

TaskRunner:

Task runner executes: ti.task.execute and raises TaskDeferred for deferral. This sends a request to supervisor using SUPERVISOR_COMMS

How was this tested?

  1. The end to end flow isn't available as of now, but I have tried to cover up the ground using unit tests
  2. New case under test_handle_requests covers the supervisor + client side of things along with a mock of the message from task runner
  3. test_ti_update_state_to_deferred covers the scenario for execution API

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

Copy link
Member

@ashb ashb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A good start!

airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
"state": "deferred",
"classpath": "my-class-path",
"kwargs": {},
"created_date": "2024-10-31T12:00:00Z",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why is this being sent in the payload? Should it be the time the server received the request instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is handled this way:

self.created_date = created_date or timezone.utcnow()

In case we do not pass it, it will take the UTC now. Should we allow the option to override it or remove it entirely?

@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Nov 22, 2024
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
Comment on lines +522 to +523
# hmmm, can we do better here
# setting to "\n" as we do not have a response to return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs some thought here @ashb @kaxil

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two options:

  1. Move self.stdin.write(resp + b"\n") to all the if's where we are sending a msg
  2. Set resp to None. and change L529 to:
if resp:
     self.stdin.write(resp + b"\n")

Comment on lines +336 to +337
if mock_response == "":
# for task instance endpoints, there won't be any response
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should handle this based on how we handle the client response for task instance endpoints

@amoghrajesh amoghrajesh changed the title AIP 72: Handling deferrable tasks in execution API as well as TASK SDK AIP 72: Handling "deferrable" tasks in execution_api and task SDK Nov 22, 2024
@amoghrajesh amoghrajesh marked this pull request as ready for review November 22, 2024 11:39
@amoghrajesh amoghrajesh self-assigned this Nov 25, 2024
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
airflow/api_fastapi/execution_api/routes/task_instances.py Outdated Show resolved Hide resolved
self.client.task_instances.finish(
id=self.ti_id, state=self.final_state, when=datetime.now(tz=timezone.utc)
)
if self.final_state in TerminalTIState:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% certain if we need this check -- I am thinking by defn self.final_state has to be in TerminalState -- if not it is an error!

Comment on lines +476 to +480
@final_state.setter
def final_state(self, value):
"""Setter for final_state for certain task instance stated present in IntermediateTIState."""
if value not in TerminalTIState:
self._final_state = value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a setter? Shouldn't we set TI state to falil if exit_code != 0 like we already in

if self._exit_code == 0:
return self._terminal_state or TerminalTIState.SUCCESS
return TerminalTIState.FAILED

Comment on lines +522 to +523
# hmmm, can we do better here
# setting to "\n" as we do not have a response to return
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two options:

  1. Move self.stdin.write(resp + b"\n") to all the if's where we are sending a msg
  2. Set resp to None. and change L529 to:
if resp:
     self.stdin.write(resp + b"\n")

except TaskDeferred:
...
except TaskDeferred as defer:
trigger = Trigger.from_object(defer.trigger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Trigger model here?

i.e. why do we need L164 to L169

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is needed because the trigger object provides the classpath, timeout and the kwargs from the exception. Done similarly in

ti.state = TaskInstanceState.DEFERRED
ti.trigger_id = trigger_row.id
ti.next_method = next_method
ti.next_kwargs = next_kwargs or {}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Handle "deferral" in ti_update_state endpoint in Execution API
3 participants