This repository has been archived by the owner on Nov 21, 2024. It is now read-only.

airflow: add dag integrity test, clear warnings, fix tests
DonHaul authored and drjova committed Aug 19, 2024
1 parent d67febd commit be93e90
Showing 10 changed files with 29 additions and 13 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/test-workflows.yml
@@ -51,6 +51,7 @@ jobs:
-v "$(pwd)"/tests:/opt/airflow/tests
-v "$(pwd)"/requirements-test.txt:/opt/airflow/requirements-test.txt
-v "$(pwd)"/data:/opt/airflow/data
-v "$(pwd)"/scripts/variables/variables.json:/opt/airflow/variables.json
-e AIRFLOW__CORE__EXECUTOR=CeleryExecutor
-e AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:[email protected]:5432/airflow
-e AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:[email protected]:5432/airflow
@@ -60,4 +61,4 @@
-e AIRFLOW__CORE__LOAD_EXAMPLES="false"
-e AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
registry.cern.ch/cern-sis/inspire/workflows@${{ needs.build.outputs.image-id }}
bash -c "pip install -r requirements-test.txt && airflow db init && pytest /opt/airflow/tests"
bash -c "pip install -r requirements-test.txt && airflow db init && airflow variables import /opt/airflow/variables.json && pytest /opt/airflow/tests"
4 changes: 2 additions & 2 deletions docker-compose.yaml
@@ -48,10 +48,9 @@ x-airflow-common: &airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
- image: apache/airflow:2.8.3-python3.11
build:
context: workflows
- dockerfile: Dockerfile
+ dockerfile: Dockerfile.local
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres-airflow/airflow
@@ -76,6 +75,7 @@ x-airflow-common: &airflow-common
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
- ${AIRFLOW_PROJ_DIR:-.}/scripts:/opt/airflow/scripts
+ - ${AIRFLOW_PROJ_DIR:-.}/tests:/opt/airflow/tests

user: "${AIRFLOW_UID:-50000}:0"
depends_on: &airflow-common-depends-on
10 changes: 10 additions & 0 deletions workflows/Dockerfile.local
@@ -0,0 +1,10 @@
FROM apache/airflow:2.9.3-python3.11

WORKDIR /opt/airflow

COPY --chown=airflow:root dags ./dags/
COPY --chown=airflow:root plugins ./plugins/
COPY --chown=airflow:root requirements.txt ./requirements.txt
COPY --chown=airflow:root requirements-test.txt ./requirements-test.txt

RUN pip install --no-cache-dir -r requirements.txt -r requirements-test.txt
2 changes: 1 addition & 1 deletion
@@ -27,7 +27,7 @@
"create_ticket": Param(type="boolean", default=False),
},
start_date=datetime.datetime(2024, 5, 5),
- schedule_interval=None,
+ schedule=None,
catchup=False,
on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date!
)
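
Note: schedule_interval is deprecated since Airflow 2.4 in favour of the single schedule argument, and passing it on recent 2.x releases emits a deprecation warning, which is presumably part of the "clear warnings" cleanup in this commit. A minimal sketch of the replacement (the DAG name here is hypothetical):

import datetime

from airflow.decorators import dag

@dag(
    start_date=datetime.datetime(2024, 5, 5),
    schedule=None,  # replaces schedule_interval=None; the DAG runs only when triggered
    catchup=False,
)
def example_manual_dag():
    ...

example_manual_dag()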
2 changes: 1 addition & 1 deletion workflows/dags/author/author_create/author_create_init.py
@@ -20,7 +20,7 @@
"data": Param(type="object", default={}),
},
start_date=datetime.datetime(2024, 5, 5),
- schedule_interval=None,
+ schedule=None,
catchup=False,
# TODO: what if callback fails? Data in backoffice not up to date!
on_failure_callback=set_workflow_status_to_error,
2 changes: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
"data": Param(type="object", default={}),
},
start_date=datetime.datetime(2024, 5, 5),
- schedule_interval=None,
+ schedule=None,
catchup=False,
# TODO: what if callback fails? Data in backoffice not up to date!
on_failure_callback=set_workflow_status_to_error,
4 changes: 2 additions & 2 deletions workflows/dags/author/author_update/author_update.py
@@ -18,7 +18,7 @@

@dag(
start_date=datetime.datetime(2024, 5, 5),
- schedule_interval=None,
+ schedule=None,
params={
"workflow_id": Param(type="string", default=""),
"data": Param(type="object", default={}),
@@ -99,7 +99,7 @@ def set_author_update_workflow_status_to_completed(**context):
collection=AUTHORS,
)

- @task.branch(provide_context=True)
+ @task.branch()
def author_update_success_branch(**context):
ti = context["ti"]
workflow_status = ti.xcom_pull(task_ids="update_author_on_inspire")
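
Note: on Airflow 2.x the execution context is injected automatically into callables that accept **kwargs, so provide_context is a deprecated no-op that only produces a warning, and TaskFlow decorators such as task.branch do not need it. A minimal sketch of the idiom (the downstream task ids returned here are hypothetical):

from airflow.decorators import task

@task.branch()
def author_update_success_branch(**context):
    # Context arrives automatically; no provide_context flag is needed.
    ti = context["ti"]
    workflow_status = ti.xcom_pull(task_ids="update_author_on_inspire")
    # Return the task_id (or list of task_ids) that should run next.
    return "set_status_completed" if workflow_status else "set_status_error"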
4 changes: 2 additions & 2 deletions workflows/dags/happy_flow_dag.py
@@ -2,10 +2,10 @@
import json

from airflow.decorators import dag, task
- from airflow.sensors.sql import SqlSensor
+ from airflow.providers.common.sql.sensors.sql import SqlSensor


- @dag(start_date=datetime.datetime(2021, 1, 1), schedule_interval=None)
+ @dag(start_date=datetime.datetime(2021, 1, 1), schedule=None)
def happy_flow_dag():
@task
def fetch_document(filename: str) -> dict:
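
Note: the airflow.sensors.sql import path is a deprecated shim in Airflow 2; SqlSensor now lives in the apache-airflow-providers-common-sql package, so the provider import avoids another deprecation warning. A minimal usage sketch under assumed names (the connection id, table and surrounding DAG are hypothetical):

from airflow.providers.common.sql.sensors.sql import SqlSensor

# Inside a DAG definition: wait until the query returns at least one row.
wait_for_document = SqlSensor(
    task_id="wait_for_document",
    conn_id="example_db",  # assumed Airflow connection id
    sql="SELECT 1 FROM documents WHERE status = 'ready' LIMIT 1",
    poke_interval=30,
)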
5 changes: 2 additions & 3 deletions workflows/dags/process_until_breakpoint.py
@@ -8,7 +8,7 @@

@dag(
start_date=datetime.datetime(2021, 1, 1),
- schedule_interval=None,
+ schedule=None,
params={"approved": True},
)
def process_untill_breakpoint():
@@ -52,12 +52,11 @@ def validate():
task_id="check_approval",
ignore_downstream_trigger_rules=False,
python_callable=check_approval,
- provide_context=True,
)
fetch_document_task = fetch_document("test.json")
normalize_affiliations_task = normalize_affiliations(fetch_document_task)
auto_approval = ShortCircuitOperator(
task_id="auto_approval", python_callable=auto_approval, provide_context=True
task_id="auto_approval", python_callable=auto_approval
)
validation = validate()

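Note: the same provide_context cleanup applies to classic operators. Airflow 2 inspects the python_callable signature and passes in whatever context keys it accepts, so provide_context=True on ShortCircuitOperator is redundant and only triggers a warning. A minimal sketch based on the surrounding task (assumed to sit inside the DAG definition above):

from airflow.operators.python import ShortCircuitOperator

def check_approval(**context):
    # Context is supplied automatically; skip downstream tasks
    # unless the "approved" param is set.
    return bool(context["params"].get("approved"))

check_approval_task = ShortCircuitOperator(
    task_id="check_approval",
    ignore_downstream_trigger_rules=False,
    python_callable=check_approval,
)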
6 changes: 6 additions & 0 deletions workflows/tests/test_dags_integrity.py
@@ -0,0 +1,6 @@
from airflow.models import DagBag


def test_dagbag():
dag_bag = DagBag(include_examples=False)
assert not dag_bag.import_errors
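
Note on the new integrity test: DagBag(include_examples=False) parses every file in the configured DAG folder, and import_errors maps each failing file to its traceback, so an empty dict means every DAG loads cleanly. A slightly more verbose variant (a sketch, not part of this commit) that reports which file failed and guards against an accidentally empty DAG folder:

from airflow.models import DagBag


def test_dagbag_verbose():
    dag_bag = DagBag(include_examples=False)
    # Show the offending file and its traceback when an import fails.
    assert not dag_bag.import_errors, "\n".join(
        f"{path}: {error}" for path, error in dag_bag.import_errors.items()
    )
    # Guard against a misconfigured dags folder that silently loads nothing.
    assert dag_bag.dags, "No DAGs were collected from the DAG folder"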
