Added Fastapi endpoint for reserialize all/specific dags #48174
base: main
Conversation
@jedcunningham I have incorporated your suggestions for the reparse-a-file feature. Please take a look.
Thanks for the PR, looking good overall. Just a few suggestions.
@dag_parsing_router.post(
    "/manage/reserialize",
Maybe we should change the router to /parseDagFiles. And the route can just be a POST on /parseDagFiles (meaning we want to reparse multiple files).
Also it should probably be a PUT too. I don't see why PUT for a single file and POST for multiple.
So basically the router becomes:
for parsing multiple files: PUT /api/v2/parseDagFiles/
and for a single file: PUT /api/v2/parseDagFiles/{file_token}
status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
status.HTTP_500_INTERNAL_SERVER_ERROR,
status.HTTP_500_INTERNAL_SERVER_ERROR,
This shouldn't be documented. That's unexpected, and any endpoint can throw an unexpected 500.
        status.HTTP_500_INTERNAL_SERVER_ERROR,
    ]
),
dependencies=[Depends(action_logging())],
We are missing the permissions. Please check other endpoints.
all_bundle_names = set(manager.get_all_bundle_names())

# Validate bundle names if specified
if request.bundle_names:
There are utilities to fetch query parameters. Please check other endpoints. You will be able to do validation and provide default values too.
except HTTPException as e:
    raise e
This achieves nothing. To be deleted.
session.rollback()
raise HTTPException(
    status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
    detail=f"Failed to reserialize DAG bundles: {str(e)}",
)
This is automatically done by the SessionDependency, can be removed.
bundles_to_process = all_bundle_names

file_locations = session.scalars(
    select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
)
# Process each bundle
parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
I think we need to handle the case where no bundle is configured in the installation (bundle_name is None and all_bundle_names is empty). In that case we need to reparse everything, I guess.
class DagReserializePostBody(BaseModel):
    """Dag Serializer for reserialzed bodies."""
typo
message: str
processed_bundles: list[str]
I think we should unify this with the other "list" response types and "Collections". You can compare with the other collection serializers.
You can look for CollectionResponse to find them: AssetAliasCollectionResponse, AssetResponse, DAGResponse, etc.
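The convention looks roughly like this. A dataclass stands in for the Pydantic models Airflow actually uses, and the response class names here are hypothetical:

```python
from dataclasses import dataclass


@dataclass
class DagReserializeResponse:
    # One entry per processed bundle (hypothetical item shape).
    bundle_name: str
    message: str


@dataclass
class DagReserializeCollectionResponse:
    # Mirrors the <Name>CollectionResponse convention: a list of items
    # plus a total_entries count.
    bundles: list[DagReserializeResponse]
    total_entries: int


response = DagReserializeCollectionResponse(
    bundles=[DagReserializeResponse("example_bundle", "reserialized")],
    total_entries=1,
)
```

Following the shared shape keeps the OpenAPI spec and UI client consistent across endpoints.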
bundles_to_process = all_bundle_names

file_locations = session.scalars(
    select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
We probably want a way to signal that we want to reparse the whole bundle vs going file by file like this. Especially since I think we should also force bundle refresh once per bundle vs per file like this would do.
Relatedly, I'd just pushed up some WIP changes I had to refactor the API side of this: #48216
I haven't done the dag processor side quite yet, but will in the next day or so. It might make sense to hold off on further changes until that lands, then we can determine how we modify the DagPriorityParsingRequest model and the dag processor to enable this.
@jedcunningham thanks for the response.
I read through the code for #48216. One possible change we can make to DagPriorityParsingRequest is to make the relative_fileloc nullable and add another flag called parse_whole_bundle.
In the DAG processor logic, we can then add a condition: if parse_whole_bundle is True, we parse all the files in the bundle location. Otherwise, we parse just one file.
This would also take care of the hashing logic for the DagPriorityParsingRequest ID column, since if you want to parse the whole bundle the relative file location would be empty.
@jedcunningham I read through the changes made to the pull request.
To signal to the dag processor that it should reparse all the files, do you think this approach works?
Why
#47844
What
Added a REST endpoint to replicate the CLI functionality of airflow dags reserialize
Incorporated @jedcunningham's suggestions about implementing this feature similarly to the reparse-a-file feature
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.