
Add FastAPI endpoint to reserialize all/specific DAGs #48174

Open
wants to merge 1 commit into main
Conversation

shubham-pyc

Why
#47844

What
Added a REST endpoint to replicate the CLI functionality of airflow dags reserialize

Incorporated @jedcunningham's suggestions about implementing this feature similarly to the reparse-a-file feature



@shubham-pyc
Author

@jedcunningham I have incorporated your suggestions from the reparse-a-file feature. Please take a look.

Member

@pierrejeambrun left a comment


Thanks for the PR, looking good overall. Just a few suggestions.



@dag_parsing_router.post(
"/manage/reserialize",
Member

Maybe we should change the router to /parseDagFiles.

And the route can just be a POST on /parseDagFiles (meaning we want to reparse multiple files).

Member

Also it should probably be a PUT too. I don't see why PUT for a single file and POST for multiple.

Author

So basically the router becomes:

for parsing multiple files
PUT /api/v2/parseDagFiles/

and for a single file
PUT /api/v2/parseDagFiles/{file_token}
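For illustration, a minimal sketch of what those two routes could look like on a dedicated router. The router name, handler names, and status codes are assumptions for the sketch, not the PR's actual code:

```python
from fastapi import APIRouter, status

# Hypothetical router for illustration only; the real PR wires this into
# Airflow's API app with its own dependencies and response models.
dag_parsing_router = APIRouter(prefix="/parseDagFiles", tags=["DAG Parsing"])


@dag_parsing_router.put("/", status_code=status.HTTP_201_CREATED)
def reparse_dag_files() -> None:
    """PUT /api/v2/parseDagFiles/ -- queue a reparse of multiple DAG files."""
    ...


@dag_parsing_router.put("/{file_token}", status_code=status.HTTP_201_CREATED)
def reparse_dag_file(file_token: str) -> None:
    """PUT /api/v2/parseDagFiles/{file_token} -- queue a reparse of a single file."""
    ...
```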

status.HTTP_400_BAD_REQUEST,
status.HTTP_404_NOT_FOUND,
status.HTTP_409_CONFLICT,
status.HTTP_500_INTERNAL_SERVER_ERROR,
Member

Suggested change
status.HTTP_500_INTERNAL_SERVER_ERROR,

This shouldn't be documented. That's unexpected, and any endpoint can throw an unexpected 500.
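As a hedged illustration of that point, only the expected error codes go into the endpoint's documented responses, with 500 left out of the OpenAPI docs (this dict is illustrative, not the helper the PR actually uses):

```python
from fastapi import status

# Only the expected error codes are documented; HTTP 500 is intentionally
# omitted because any endpoint can raise it unexpectedly.
documented_error_responses = {
    status.HTTP_400_BAD_REQUEST: {"description": "Bad Request"},
    status.HTTP_404_NOT_FOUND: {"description": "Not Found"},
    status.HTTP_409_CONFLICT: {"description": "Conflict"},
}
```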

status.HTTP_500_INTERNAL_SERVER_ERROR,
]
),
dependencies=[Depends(action_logging())],
Member

We are missing the permissions. Please check other endpoints.
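A hedged sketch of attaching an access check as a route dependency; requires_dag_edit_access below is a hypothetical stand-in, not Airflow's real security helper (the existing endpoints show the actual one to use):

```python
from fastapi import APIRouter, Depends, HTTPException, status

router = APIRouter()


def requires_dag_edit_access() -> None:
    """Hypothetical permission check; the real one lives in Airflow's security layer."""
    user_is_authorized = True  # assumption: decided by the auth manager in practice
    if not user_is_authorized:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Forbidden")


@router.put("/parseDagFiles/", dependencies=[Depends(requires_dag_edit_access)])
def reparse_dag_files() -> None:
    ...
```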

all_bundle_names = set(manager.get_all_bundle_names())

# Validate bundle names if specified
if request.bundle_names:
Member

There are utilities to fetch query parameters. Please check other endpoints; you will be able to do validation and provide default values too.
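For the general shape only, a hedged example using plain FastAPI query parameters with validation and a default; Airflow's API has its own parameter utilities, so the names here are assumptions:

```python
from fastapi import FastAPI, HTTPException, Query, status

app = FastAPI()


@app.put("/parseDagFiles/")
def reparse_dag_files(
    bundle_names: list[str] | None = Query(default=None),  # None means "all bundles"
) -> dict[str, list[str]]:
    known_bundles = {"dags-folder"}  # assumption: would come from the bundle manager
    requested = set(bundle_names) if bundle_names else known_bundles
    unknown = requested - known_bundles
    if unknown:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Unknown bundles: {sorted(unknown)}",
        )
    return {"bundles": sorted(requested)}
```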

Comment on lines +125 to +127
except HTTPException as e:
raise e

Member

This achieves nothing. To delete

Comment on lines +129 to +133
session.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to reserialize DAG bundles: {str(e)}",
)
Member

This is automatically done by the SessionDependency, can be removed.

Comment on lines +113 to +119
bundles_to_process = all_bundle_names

file_locations = session.scalars(
select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
)
# Process each bundle
parsing_requests = [DagPriorityParsingRequest(fileloc=fileloc) for fileloc in file_locations]
Member

I think we need to handle the case where there is no bundle configured in the installation (bundle_name is None and all_bundle_names is empty). And we need to reparse all, I guess.
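A hedged sketch of that fallback, reusing the PR's variable names as parameters; returning None here is just a stand-in for "no bundle filter, reparse every DAG file":

```python
def resolve_bundles_to_process(
    requested: list[str] | None, all_bundle_names: set[str]
) -> set[str] | None:
    """Decide which bundles to reparse; None means 'do not filter on bundle_name'."""
    if requested:
        return set(requested) & all_bundle_names
    if all_bundle_names:
        return all_bundle_names
    # No bundles configured in this installation (bundle_name is None on the DAGs),
    # so fall back to reparsing everything instead of filtering on bundle names.
    return None
```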



class DagReserializePostBody(BaseModel):
"""Dag Serializer for reserialzed bodies."""
Member

typo

Comment on lines +188 to +189
message: str
processed_bundles: list[str]
Member

I think we should unify with the other 'list' response types and "Collections". You can compare with other collection serializers.

Member

You can look for CollectionResponse to find them. AssetAliasCollectionResponse, AssetResponse, DAGResponse, etc....
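A hedged sketch of what a Collection-style response model could look like, mirroring the *CollectionResponse pattern; the class and field names below are illustrative, not the project's actual serializers:

```python
from pydantic import BaseModel


class DagReserializeResponse(BaseModel):
    """Illustrative per-bundle entry in the reserialize result."""

    bundle_name: str


class DagReserializeCollectionResponse(BaseModel):
    """Illustrative collection serializer, following AssetAliasCollectionResponse and friends."""

    dag_reserializes: list[DagReserializeResponse]
    total_entries: int
```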

bundles_to_process = all_bundle_names

file_locations = session.scalars(
select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
Member

Suggested change
select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))
select(DagModel.fileloc).where(DagModel.bundle_name.in_(list(bundles_to_process)))

We probably want a way to signal that we want to reparse the whole bundle vs going file by file like this. Especially since I think we should also force bundle refresh once per bundle vs per file like this would do.

Relatedly, I'd just pushed up some WIP changes I had to refactor the API side of this: #48216
I haven't done the dag processor side quite yet, but will in the next day or so. It might make sense to hold off on further changes until that lands, then we can determine how we modify the DagPriorityParsingRequest model and the dag processor to enable this.

Author

@shubham-pyc Mar 24, 2025

@jedcunningham thanks for the response.

I read through the code for #48216. One possible change we can make to DagPriorityParsingRequest is to make the relative_fileloc nullable and add another flag called parse_whole_bundle.

In the DAG processor logic, we can then add a condition: if parse_whole_bundle is True, we parse all the files in the bundle location. Otherwise, we parse just one file.

This would also take care of the hashing logic for the DagPriorityParsingRequest ID column, since if you want to parse the whole bundle the relative file location would be empty.
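A hedged sketch of that idea using a stand-in dataclass rather than the real DagPriorityParsingRequest model; the parse_whole_bundle flag and the helper below are assumptions for illustration:

```python
from dataclasses import dataclass


@dataclass
class ParsingRequest:
    """Stand-in for DagPriorityParsingRequest with the proposed changes."""

    bundle_name: str
    relative_fileloc: str | None = None  # nullable when the whole bundle is requested
    parse_whole_bundle: bool = False


def files_to_parse(request: ParsingRequest, bundle_files: list[str]) -> list[str]:
    """Return the files the dag processor should parse for this request."""
    if request.parse_whole_bundle or request.relative_fileloc is None:
        # Whole-bundle request: parse every file in the bundle location.
        return list(bundle_files)
    # Single-file request: parse only the requested file.
    return [request.relative_fileloc]
```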

Author

@jedcunningham I read through the changes made in these pull requests:

#48216
#48424

For us to signal to the dag processor to reparse all the files, do you think this approach works?

Labels
area:API Airflow's REST/HTTP API area:DAG-processing area:UI Related to UI/UX. For Frontend Developers.
3 participants