Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airflow dags): Externally triggered DAG template (foph: 'done'). #150

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions containers/airflow/dags/covidch_dashboard_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
@author
@date Last change on 2022-10-24

The ability of triggering a DAG based on a External Successful Task.

Task Summary
------------

wait_for_foph (ExternalTaskSensor) :
This task will be triggered when the task `done` on the DAG `foph` is
marked as successful.
@note: `schedule_interval` needs to match the `foph` interval.
@note: `execution_date_fn` is the time window the task should sensor
for a external event.

start (EmptyOperator) :
This task does nothing. Used for representing the start of the flow.

end (EmptyOperator) :
This task does nothing. Used for representing that the update flow has
finish successfully. Marked as success only if all dependencies ran
successfully.

"""
from datetime import timedelta
import pendulum

from airflow.models import DagRun
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

default_args = {
"owner": "epigraphhub",
"depends_on_past": False,
"start_date": pendulum.datetime(2022, 8, 26, 0, 0),
"email": ["[email protected]"],
"email_on_failure": False, # TODO: Set to True before merge
"email_on_retry": False,
"retries": 2,
"retry_delay": timedelta(minutes=1),
}


@dag(
schedule_interval="@weekly",
default_args=default_args,
catchup=False,
)
def update_covidch():
"""

"""

def _most_recent_foph_dag_run(dt):
"""
This internal method is capable of getting the `foph` dag runs.
Uses a airflow datetime macro received by the Foph DAG itself and
returns the DAG most recent execution date scheduled.
@warning: will auto update every Scheduler pulse.
"""
foph_runs = DagRun.find(dag_id="foph")
foph_runs.sort(key=lambda x: x.execution_date, reverse=True)
return foph_runs[0].execution_date

triggered_by_foph = ExternalTaskSensor(
task_id="wait_for_foph",
external_dag_id="foph",
allowed_states=["success"],
external_task_ids=["done"],
execution_date_fn=_most_recent_foph_dag_run,
check_existence=True,
timeout=15,
)

start = EmptyOperator(
task_id="start",
)

@task
def say_hi():
print("hi!")

end = EmptyOperator(
task_id="done",
trigger_rule="all_success",
)

"""
Task Dependencies
-----------------

This area defines the task dependencies. A task depends on
another one if followed by a right bit shift (>>).
"""

triggered_by_foph >> start >> say_hi() >> end


dag = update_covidch()
2 changes: 1 addition & 1 deletion containers/airflow/dags/foph_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@
default_args = {
"owner": "epigraphhub",
"depends_on_past": False,
"start_date": pendulum.datetime(2022, 8, 26),
"start_date": pendulum.datetime(2022, 8, 26, 0, 0),
"email": ["[email protected]"],
"email_on_failure": True,
"email_on_retry": False,
Expand Down