-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AIP 72: Handling "deferrable" tasks in execution_api and task SDK #44241
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A good start!
"state": "deferred", | ||
"classpath": "my-class-path", | ||
"kwargs": {}, | ||
"created_date": "2024-10-31T12:00:00Z", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, why is this being sent in the payload? Should it be the time the server received the request instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is handled this way:
self.created_date = created_date or timezone.utcnow()
In case we do not pass it, it will take the UTC now. Should we allow the option to override it or remove it entirely?
Co-authored-by: Ash Berlin-Taylor <[email protected]>
# hmmm, can we do better here | ||
# setting to "\n" as we do not have a response to return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two options:
- Move
self.stdin.write(resp + b"\n")
to all the if's where we are sending a msg - Set
resp
toNone
. and change L529 to:
if resp:
self.stdin.write(resp + b"\n")
if mock_response == "": | ||
# for task instance endpoints, there won't be any response |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should handle this based on how we handle the client response for task instance endpoints
self.client.task_instances.finish( | ||
id=self.ti_id, state=self.final_state, when=datetime.now(tz=timezone.utc) | ||
) | ||
if self.final_state in TerminalTIState: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not 100% certain if we need this check -- I am thinking by defn self.final_state
has to be in TerminalState -- if not it is an error!
@final_state.setter | ||
def final_state(self, value): | ||
"""Setter for final_state for certain task instance stated present in IntermediateTIState.""" | ||
if value not in TerminalTIState: | ||
self._final_state = value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need a setter
? Shouldn't we set TI state to falil if exit_code != 0 like we already in
airflow/task_sdk/src/airflow/sdk/execution_time/supervisor.py
Lines 472 to 474 in 4d3140d
if self._exit_code == 0: | |
return self._terminal_state or TerminalTIState.SUCCESS | |
return TerminalTIState.FAILED |
# hmmm, can we do better here | ||
# setting to "\n" as we do not have a response to return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two options:
- Move
self.stdin.write(resp + b"\n")
to all the if's where we are sending a msg - Set
resp
toNone
. and change L529 to:
if resp:
self.stdin.write(resp + b"\n")
except TaskDeferred: | ||
... | ||
except TaskDeferred as defer: | ||
trigger = Trigger.from_object(defer.trigger) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Trigger
model here?
i.e. why do we need L164 to L169
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is needed because the trigger
object provides the classpath, timeout and the kwargs from the exception. Done similarly in
airflow/airflow/models/taskinstance.py
Lines 1631 to 1634 in acf106b
ti.state = TaskInstanceState.DEFERRED | |
ti.trigger_id = trigger_row.id | |
ti.next_method = next_method | |
ti.next_kwargs = next_kwargs or {} |
closes: #44137
This PR is trying to port the "deferral" logic from airflow 2 to the airflow 3 (execution api + task sdk)
Summary of changes:
Server side changes (execution api):
TIDeferredStatePayload
inti_update_state
-> covered by unit test:test_ti_update_state_to_deferred
a. Didn't piggy back on
ti.defer_task()
as it extracts the trigger out ofTaskDeferred
exception. It is much more expensive to send across multiple models likeTaskInstance
,TaskDeferred
,Trigger
instead of just the required minimal propertiesb.
returning
and not proceeding with query execution as we already do it above https://github.com/apache/airflow/pull/44241/files#diff-d44a72566870079ee943e24bac2af74fb84c426c54d210561a251549a7078ed7L129Client side changes (task sdk):
HTTP client:
Added a new function defer that sends a patch request to the
task-instances/{id}/state execution
api with payload:PatchTIToDeferred
Comms:
Defining a new data model to send a request to patch ti as "deferred" from task runner to supervisor:
PatchTIToDeferred
(Added to ToSupervisor)Supervisor:
_final_state
to support@property
final_state
which is final state of a TIa. Added a setter to set values for this final state for cases like deferred so that the finish is not called for tasks those aren't in terminal stage: https://github.com/apache/airflow/pull/44241/files#diff-c2651fdee1a25e091e2a9d4f937f8032ca3d289d0de76f38ed88aee5df0f880dL392-L394
finish()
whenfinal_state
is notTerminalTIState
handle_requests
to receive requests from task runner and forwarding the message to http client to call deferTaskRunner:
Task runner executes: ti.task.execute and raises
TaskDeferred
for deferral. This sends a request to supervisor usingSUPERVISOR_COMMS
How was this tested?
test_handle_requests
covers the supervisor + client side of things along with a mock of the message from task runnertest_ti_update_state_to_deferred
covers the scenario for execution API^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst
or{issue_number}.significant.rst
, in newsfragments.