[Feature] Refactor and add support for schedule conditions in DAG configuration #320
Conversation
- Added support for schedules defined by conditions, enabling dynamic scheduling based on dataset filters and conditions.
- Introduced `configure_schedule` function to streamline DAG schedule setup based on Airflow version and parameters.
- Created `process_file_with_datasets` function to handle dataset processing and conditional evaluation from files.
- Implemented `evaluate_condition_with_datasets` to evaluate schedule conditions while ensuring valid variable names for dataset URIs.
- Replaced repetitive code with reusable functions for better modularity and maintainability.
- Enhanced code readability by adding detailed docstrings for all functions, following a standard format.
- Improved safety by avoiding reliance on `globals()` in `evaluate_condition_with_datasets`.
- Implemented logic to handle schedules with both file and datasets attributes.
- Added support for evaluating conditions with datasets for Airflow version 2.9 and above.
- Cleaned up schedule dictionary by removing processed keys after use.
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

```diff
@@            Coverage Diff            @@
##             main     #320      +/-  ##
=========================================
+ Coverage   93.62%   94.01%   +0.39%
=========================================
  Files          10       11       +1
  Lines         784      886     +102
=========================================
+ Hits          734      833      +99
- Misses         50       53       +3
```

☔ View full report in Codecov by Sentry.
Hi @ErickSeo, this feature is super exciting. It significantly improves how DAG factory handles datasets. I'm sorry for the delay in reviewing it. I've left some comments in-line and am happy to discuss alternatives.
If we're able to reach a consensus on the implementation, we can aim to release this feature in DAG Factory 0.22, planned to be released on 10 January 2025.
…uce utility for parsing dataset conditions
This is a very exciting feature, @ErickSeo! Thank you very much for adding support for a more powerful way of scheduling DAGs in DAG Factory.
Thank you for addressing all the feedback. I'm sure there will be use-cases we did not think about - but we can always iterate over them.
Yes, indeed, thanks a lot for the contribution and for quickly addressing the review comments, very helpful @ErickSeo 🚀
### Added

- Propagate provided dag_display_name to built dag by @pankajkoti in #326
- Add incipient documentation tooling by @tatiana in #328
- Support loading `default_args` from shared `defaults.yml` by @pankajastro in #330
- Add security policy by @tatiana in #339
- Add Robust Support for Callbacks at Task and TaskGroup Level by @jroach-astronomer in #322
- Support `ExternalTaskSensor` `execution_date_fn` and `execution_delta` by @tatiana in #354
- Refactor and add support for schedule conditions in DAG configuration by @ErickSeo in #320

### Fixed

- Handle gracefully exceptions during telemetry collection by @tatiana in #335
- Adjust `markdownlint` configuration to enforce 4-space indentation for proper `mkdocs` rendering by @pankajkoti in #345

### Docs

- Create initial documentation index by @tatiana in #325
- Use absolute URLs for failing links in docs/index.md by @pankajkoti in #331
- Add quick start docs by @pankajastro in #324
- Add docs comparing Python and YAML-based DAGs by @tatiana in #327
- Add docs about project contributors and their roles by @tatiana in #341
- Add documentation to support developers by @tatiana in #343
- Add docs for configuring workflows, environment variables and defaults by @pankajkoti in #338
- Add code of conduct for contributors and DAG factory community by @tatiana in #340
- Document Dynamic Task Mapping feature by @pankajkoti in #344
- Fix warning message 404 in code_of_conduct docs by @pankajastro in #346
- Update theme for documentation by @pankajastro in #348
- Fix markdownlint errors and some rendering improvements by @pankajastro in #356
- Reword content in documentation by @yanmastin-astro in #336

### Others

- Improve integration tests scripts by @tatiana in #316
- Add Markdown pre-commit checks by @tatiana in #329
- Remove Airflow <> 2.0.0 check by @pankajastro in #334
- Reduce telemetry timeout from 5 to 1 second by @tatiana in #337
- Add GH action job to deploy docs by @pankajastro in #342
- Enable Dependabot to scan outdated GitHub Actions dependencies by @tatiana in #347
- Improve docs deploy job by @pankajastro in #352
- Unify how we build dagfactory by @tatiana in #353
- Fix running `make docker run` when previous versions were run locally by @tatiana in #362
- Install `jq` in `dev` container by @pankajastro in #363
- Dependabot GitHub actions version upgrades in #349, #350, #351

Closes: #306
Description
This feature introduces an enhancement to DAG scheduling in Airflow, enabling support for dynamic schedules based on dataset conditions. By leveraging dataset filters and logical conditions, users can now create more flexible and precise scheduling rules tailored to their workflows.
Key Features:
- Condition-Based Scheduling: Allows defining schedules using logical conditions between datasets (e.g., `('dataset_1' & 'dataset_2') | 'dataset_3'`), enabling workflows to trigger dynamically based on dataset availability.
- Dynamic Dataset Processing: Introduced the `process_file_with_datasets` function to evaluate and process dataset URIs from external files, supporting both simple and condition-based schedules.
- Improved Dataset Evaluation: Developed the `evaluate_condition_with_datasets` function to transform dataset URIs into valid variable names and evaluate logical conditions securely.
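The sanitize-and-evaluate approach described above can be sketched in plain Python. This is a hypothetical illustration (the helper names `uri_to_identifier` and `evaluate_condition` are not the actual dag-factory functions): dataset URIs are rewritten into valid identifiers, and the condition string is evaluated against an explicit namespace rather than `globals()`.

```python
import re


def uri_to_identifier(uri: str) -> str:
    """Map a dataset URI to a valid Python identifier.

    Hypothetical helper illustrating the sanitization step the PR
    describes; the real implementation in dag-factory may differ.
    """
    name = re.sub(r"\W", "_", uri)
    # Identifiers cannot start with a digit
    return f"_{name}" if name[:1].isdigit() else name


def evaluate_condition(condition: str, datasets: dict) -> bool:
    """Rewrite quoted URIs in the condition as identifiers, then evaluate
    the expression against an explicit, restricted namespace."""
    namespace = {}
    for uri, value in datasets.items():
        var = uri_to_identifier(uri)
        condition = condition.replace(f"'{uri}'", var)
        namespace[var] = value
    # No builtins exposed; only the dataset variables are in scope
    return eval(condition, {"__builtins__": {}}, namespace)
```

In Airflow 2.9+, the namespace values would be `Dataset` objects combined with `&`/`|` into a dataset condition; booleans stand in for them here to show the evaluation mechanics.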
Workflow Example:
Given the following condition:
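(The original example block did not survive the page export.) A sketch of what such a schedule condition could look like in a DAG Factory YAML file, using the `file` and `datasets` attributes mentioned in the commit notes above; the exact key names and paths here are illustrative, not verified against the dag-factory docs:

```yaml
example_dag:
  schedule:
    # Hypothetical file listing the dataset URIs referenced below
    file: ./dags/datasets.yml
    datasets: "('s3://bucket/dataset_1' & 's3://bucket/dataset_2') | 's3://bucket/dataset_3'"
```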
The system evaluates the datasets, ensuring valid references, and schedules the DAG dynamically when the condition resolves to True.
Example Use Case:
Consider a data pipeline that processes files only when multiple interdependent datasets are updated. With this feature, users can create dynamic DAG schedules that automatically adjust based on dataset availability and conditions, optimizing resource allocation and execution timing.
Images:
Screenshots: "Captura de tela 2024-12-16 181059", "Captura de tela 2024-12-16 181103", "Captura de tela 2024-12-16 181131" (expiring signed image URLs omitted).