
[Feature] Refactor and add support for schedule conditions in DAG configuration: #320

Merged

Conversation

@ErickSeo (Contributor) commented Dec 16, 2024

Description

This feature introduces an enhancement to DAG scheduling in Airflow, adding support for dynamic schedules based on dataset conditions. By combining dataset filters with logical conditions, users can create more flexible and precise scheduling rules tailored to their workflows.

Key Features:

  • Condition-Based Scheduling: Allows defining schedules using logical conditions between datasets (e.g., ('dataset_1' & 'dataset_2') | 'dataset_3'), enabling workflows to trigger dynamically based on dataset availability.

  • Dynamic Dataset Processing: Introduced the process_file_with_datasets function to evaluate and process dataset URIs from external files, supporting both simple and condition-based schedules.

  • Improved Dataset Evaluation: Developed the evaluate_condition_with_datasets function to transform dataset URIs into valid variable names and evaluate logical conditions securely.

Workflow Example:
Given the following condition:

example_custom_config_condition_dataset_consumer_dag:
  description: "Example DAG consuming condition datasets from a custom config file"
  schedule:
    file: $CONFIG_ROOT_DIR/datasets/example_config_datasets.yml
    datasets: "((dataset_custom_1 & dataset_custom_2) | dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
example_without_custom_config_condition_dataset_consumer_dag:
  description: "Example DAG consuming condition datasets without a custom config file"
  schedule:
    datasets: "((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
example_yaml_tags_condition_dataset_consumer_dag:
  description: "Example DAG consuming condition datasets defined with YAML tags"
  schedule:
    datasets:
      !or
        - !and
          - "s3://bucket-cjmm/raw/dataset_custom_1"
          - "s3://bucket-cjmm/raw/dataset_custom_2"
        - "s3://bucket-cjmm/raw/dataset_custom_3"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
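The `!and` / `!or` tags in the last example can be handled with custom PyYAML constructors. Below is a minimal sketch of that idea; the constructor functions and the `any`/`all` dict shapes are illustrative assumptions, not the actual dag-factory implementation:

```python
import yaml

# Hypothetical constructors (not dag-factory's code): parse `!and`/`!or`
# tagged sequences into nested dicts that a scheduler could walk.
def and_constructor(loader, node):
    return {"all": loader.construct_sequence(node, deep=True)}

def or_constructor(loader, node):
    return {"any": loader.construct_sequence(node, deep=True)}

yaml.SafeLoader.add_constructor("!and", and_constructor)
yaml.SafeLoader.add_constructor("!or", or_constructor)

doc = """
datasets:
  !or
    - !and
      - "s3://bucket-cjmm/raw/dataset_custom_1"
      - "s3://bucket-cjmm/raw/dataset_custom_2"
    - "s3://bucket-cjmm/raw/dataset_custom_3"
"""
parsed = yaml.safe_load(doc)
# parsed["datasets"] has the shape {"any": [{"all": [uri_1, uri_2]}, uri_3]}
```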

The system evaluates the datasets, ensuring valid references, and schedules the DAG dynamically when the condition resolves to True.
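The URI-to-identifier mapping and restricted evaluation that the PR describes for `evaluate_condition_with_datasets` can be sketched as follows, with plain booleans standing in for Airflow `Dataset` objects; the function names and signatures here are illustrative assumptions, not the merged code:

```python
import re

def make_valid_name(uri: str) -> str:
    # Replace every non-identifier character so a dataset URI can be
    # used as a variable name inside the condition expression.
    name = re.sub(r"\W", "_", uri)
    return f"_{name}" if name[0].isdigit() else name

def evaluate_condition(condition: str, datasets: dict):
    # Substitute each URI with its sanitized name, then evaluate the
    # expression in a restricted namespace instead of using globals().
    namespace = {}
    for uri, value in datasets.items():
        name = make_valid_name(uri)
        condition = condition.replace(uri, name)
        namespace[name] = value
    return eval(condition, {"__builtins__": {}}, namespace)

updated = {
    "s3://bucket-cjmm/raw/dataset_custom_1": True,
    "s3://bucket-cjmm/raw/dataset_custom_2": False,
    "s3://bucket-cjmm/raw/dataset_custom_3": True,
}
condition = ("((s3://bucket-cjmm/raw/dataset_custom_1 & "
             "s3://bucket-cjmm/raw/dataset_custom_2) | "
             "s3://bucket-cjmm/raw/dataset_custom_3)")
print(evaluate_condition(condition, updated))  # → True
```

In the real feature the names would be bound to `Dataset` objects, whose `&` and `|` operators build dataset conditions on Airflow 2.9+; the sanitization and restricted-namespace `eval` shown here illustrate the mechanism.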

Example Use Case:
Consider a data pipeline that processes files only when multiple interdependent datasets are updated. With this feature, users can create dynamic DAG schedules that automatically adjust based on dataset availability and conditions, optimizing resource allocation and execution timing.

Images: three screenshots dated 2024-12-16 (not reproduced here).

- Added support for schedules defined by conditions, enabling dynamic scheduling based on dataset filters and conditions.
- Introduced `configure_schedule` function to streamline DAG schedule setup based on Airflow version and parameters.
- Created `process_file_with_datasets` function to handle dataset processing and conditional evaluation from files.
- Implemented `evaluate_condition_with_datasets` to evaluate schedule conditions while ensuring valid variable names for dataset URIs.
- Replaced repetitive code with reusable functions for better modularity and maintainability.
- Enhanced code readability by adding detailed docstrings for all functions, following a standard format.
- Improved safety by avoiding reliance on `globals()` in `evaluate_condition_with_datasets`.
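As a rough illustration of how the `configure_schedule` dispatch described above could work (the function name comes from the PR, but the body, the stub `Dataset` class, and the `(2, 9)` version gate shown here are assumptions for illustration):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Dataset:
    """Stand-in for airflow.datasets.Dataset (illustrative only)."""
    uri: str

def configure_schedule(schedule, airflow_version=(2, 10)):
    # Hypothetical sketch of the dispatch logic described in the PR.
    if isinstance(schedule, dict) and "datasets" in schedule:
        datasets = schedule["datasets"]
        if isinstance(datasets, str):
            # Condition string such as "((a & b) | c)"; per the PR,
            # condition evaluation needs Airflow 2.9 or above.
            if airflow_version < (2, 9):
                raise ValueError("Condition schedules require Airflow >= 2.9")
            return datasets  # evaluated downstream into a dataset condition
        # Plain list of URIs: trigger when all datasets are updated.
        return [Dataset(uri) for uri in datasets]
    # Cron strings, timedeltas, None, etc. pass through unchanged.
    return schedule
```

For example, `configure_schedule("@daily")` returns the cron preset untouched, while a `{"datasets": [...]}` dict is mapped to dataset objects.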
ErickSeo added 7 commits December 17, 2024 09:32
- remove self from unit test
- Implemented logic to handle schedules with both file and datasets attributes.
- Added support for evaluating conditions with datasets for Airflow version 2.9 and above.
- Cleaned up schedule dictionary by removing processed keys.
- Added logic to handle schedules with both file and datasets attributes.
- Implemented support for evaluating conditions with datasets for Airflow version 2.9 and above.
- Cleaned up schedule dictionary by removing processed keys after use.
@codecov-commenter commented Dec 19, 2024

Codecov Report

Attention: Patch coverage is 97.43590% with 3 lines in your changes missing coverage. Please review.

Project coverage is 94.01%. Comparing base (48e5575) to head (fb33b67).
Report is 1 commit behind head on main.

Files with missing lines   Patch %   Lines
dagfactory/utils.py        88.00%    3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #320      +/-   ##
==========================================
+ Coverage   93.62%   94.01%   +0.39%     
==========================================
  Files          10       11       +1     
  Lines         784      886     +102     
==========================================
+ Hits          734      833      +99     
- Misses         50       53       +3     


@tatiana tatiana added this to the DAG Factory 0.22.0 milestone Dec 30, 2024
@tatiana tatiana self-assigned this Dec 30, 2024
dagfactory/dagbuilder.py: review comment (outdated, resolved)
@tatiana (Collaborator) left a comment

Hi @ErickSeo, this feature is super exciting. It significantly improves how DAG factory handles datasets. I'm sorry for the delay in reviewing it. I've left some comments in-line and am happy to discuss alternatives.

If we're able to reach a consensus on the implementation, we can aim to release this feature in DAG Factory 0.22, planned to be released on 10 January 2025.

dagfactory/__init__.py: review comment (outdated, resolved)
dagfactory/dagbuilder.py: review comment (outdated, resolved)
@pankajkoti pankajkoti mentioned this pull request Jan 10, 2025
@tatiana (Collaborator) left a comment

This is a very exciting feature, @ErickSeo ! Thank you very much for adding support to a more powerful way of scheduling DAGs in DAG Factory.

Thank you for addressing all the feedback. I'm sure there will be use-cases we did not think about - but we can always iterate over them.

@tatiana tatiana merged commit 4f2a57f into astronomer:main Jan 10, 2025
68 checks passed
@pankajkoti (Contributor) commented

Yes, indeed, thanks a lot for the contribution and for quickly addressing the review comments; very helpful, @ErickSeo 🚀

tatiana pushed a commit that referenced this pull request Jan 10, 2025
### Added

- Propagate provided dag_display_name to built dag by @pankajkoti in
#326
- Add incipient documentation tooling by @tatiana in #328
- Support loading `default_args` from shared `defaults.yml` by
@pankajastro in #330
- Add security policy by @tatiana in #339
- Add Robust Support for Callbacks at Task and TaskGroup Level by
@jroach-astronomer in #322
- Support `ExternalTaskSensor` `execution_date_fn` and `execution_delta`
by @tatiana in #354
- Refactor and add support for schedule conditions in DAG configuration
by @ErickSeo in #320

### Fixed

- Handle gracefully exceptions during telemetry collection by @tatiana
in #335
- Adjust `markdownlint` configuration to enforce 4-space indentation for
proper `mkdocs` rendering by @pankajkoti in #345

### Docs

- Create initial documentation index by @tatiana in #325
- Use absolute URLs for failing links in docs/index.md by @pankajkoti in
#331
- Add quick start docs by @pankajastro in #324
- Add docs comparing Python and YAML-based DAGs by @tatiana in #327
- Add docs about project contributors and their roles by @tatiana in
#341
- Add documentation to support developers by @tatiana in #343
- Add docs for configuring workflows, environment variables and defaults
by @pankajkoti in #338
- Add code of conduct for contributors and DAG factory community by
@tatiana in #340
- Document Dynamic Task Mapping feature by @pankajkoti in #344
- Fix warning message 404 in code_of_conduct docs by @pankajastro in
#346
- Update theme for documentation by @pankajastro in #348
- Fix markdownlint errors and some rendering improvements by
@pankajastro in #356
- Reword content in documentation by @yanmastin-astro in #336

### Others

- Improve integration tests scripts by @tatiana in #316
- Add Markdown pre-commit checks by @tatiana in #329
- Remove Airflow <> 2.0.0 check by @pankajastro in #334
- Reduce telemetry timeout from 5 to 1 second by @tatiana in #337
- Add GH action job to deploy docs by @pankajastro in #342
- Enable Depandabot to scan outdated Github Actions dependencies by
@tatiana in #347
- Improve docs deploy job by @pankajastro in #352
- Unify how we build dagfactory by @tatiana in #353
- Fix running make docker run when previous versions were run locally by
@tatiana in #362
- Install `jq` in `dev` container by @pankajastro in #363
- Dependabot GitHub actions version upgrades in #349, #350, #351


Closes: #306
4 participants