From be93e909cb7c17060592ecc8179bd73884118c66 Mon Sep 17 00:00:00 2001 From: DonHaul Date: Thu, 15 Aug 2024 16:45:42 +0200 Subject: [PATCH] airflow: add dag integrity test, clear warnings, fix tests * ref: cern-sis/issues-inspire/issues/537 --- .github/workflows/test-workflows.yml | 3 ++- docker-compose.yaml | 4 ++-- workflows/Dockerfile.local | 10 ++++++++++ .../author/author_create/author_create_approved.py | 2 +- .../dags/author/author_create/author_create_init.py | 2 +- .../author/author_create/author_create_rejected.py | 2 +- workflows/dags/author/author_update/author_update.py | 4 ++-- workflows/dags/happy_flow_dag.py | 4 ++-- workflows/dags/process_until_breakpoint.py | 5 ++--- workflows/tests/test_dags_integrity.py | 6 ++++++ 10 files changed, 29 insertions(+), 13 deletions(-) create mode 100644 workflows/Dockerfile.local create mode 100644 workflows/tests/test_dags_integrity.py diff --git a/.github/workflows/test-workflows.yml b/.github/workflows/test-workflows.yml index 1b91fd6c..cebc3667 100644 --- a/.github/workflows/test-workflows.yml +++ b/.github/workflows/test-workflows.yml @@ -51,6 +51,7 @@ jobs: -v "$(pwd)"/tests:/opt/airflow/tests -v "$(pwd)"/requirements-test.txt:/opt/airflow/requirements-test.txt -v "$(pwd)"/data:/opt/airflow/data + -v "$(pwd)"/scripts/variables/variables.json:/opt/airflow/variables.json -e AIRFLOW__CORE__EXECUTOR=CeleryExecutor -e AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@127.0.0.1:5432/airflow -e AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@127.0.0.1:5432/airflow @@ -60,4 +61,4 @@ jobs: -e AIRFLOW__CORE__LOAD_EXAMPLES="false" -e AIRFLOW__API__AUTH_BACKENDS="airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session" registry.cern.ch/cern-sis/inspire/workflows@${{ needs.build.outputs.image-id }} - bash -c "pip install -r requirements-test.txt && airflow db init && pytest /opt/airflow/tests" + bash -c "pip install -r requirements-test.txt && airflow db init && airflow variables import /opt/airflow/variables.json && pytest /opt/airflow/tests" diff --git a/docker-compose.yaml b/docker-compose.yaml index 12900b49..96b5111d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -48,10 +48,9 @@ x-airflow-common: &airflow-common # In order to add custom dependencies or upgrade provider packages you can use your extended image. # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml # and uncomment the "build" line below, Then run `docker-compose build` to build the images. - image: apache/airflow:2.8.3-python3.11 build: context: workflows - dockerfile: Dockerfile + dockerfile: Dockerfile.local environment: &airflow-common-env AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres-airflow/airflow @@ -76,6 +75,7 @@ x-airflow-common: &airflow-common - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins - ${AIRFLOW_PROJ_DIR:-.}/scripts:/opt/airflow/scripts + - ${AIRFLOW_PROJ_DIR:-.}/tests:/opt/airflow/tests user: "${AIRFLOW_UID:-50000}:0" depends_on: &airflow-common-depends-on diff --git a/workflows/Dockerfile.local b/workflows/Dockerfile.local new file mode 100644 index 00000000..b600cbec --- /dev/null +++ b/workflows/Dockerfile.local @@ -0,0 +1,10 @@ +FROM apache/airflow:2.9.3-python3.11 + +WORKDIR /opt/airflow + +COPY --chown=airflow:root dags ./dags/ +COPY --chown=airflow:root plugins ./plugins/ +COPY --chown=airflow:root requirements.txt ./requirements.txt +COPY --chown=airflow:root requirements-test.txt ./requirements-test.txt + +RUN pip install --no-cache-dir -r requirements.txt -r requirements-test.txt diff --git a/workflows/dags/author/author_create/author_create_approved.py b/workflows/dags/author/author_create/author_create_approved.py index 12ff699b..083ae905 100644 --- a/workflows/dags/author/author_create/author_create_approved.py +++ b/workflows/dags/author/author_create/author_create_approved.py @@ -27,7 +27,7 @@ "create_ticket": Param(type="boolean", default=False), }, start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, + schedule=None, catchup=False, on_failure_callback=set_workflow_status_to_error, # TODO: what if callback fails? Data in backoffice not up to date! ) diff --git a/workflows/dags/author/author_create/author_create_init.py b/workflows/dags/author/author_create/author_create_init.py index 18982b03..940d4ff3 100644 --- a/workflows/dags/author/author_create/author_create_init.py +++ b/workflows/dags/author/author_create/author_create_init.py @@ -20,7 +20,7 @@ "data": Param(type="object", default={}), }, start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, + schedule=None, catchup=False, # TODO: what if callback fails? Data in backoffice not up to date! on_failure_callback=set_workflow_status_to_error, diff --git a/workflows/dags/author/author_create/author_create_rejected.py b/workflows/dags/author/author_create/author_create_rejected.py index 9751856e..e492038a 100644 --- a/workflows/dags/author/author_create/author_create_rejected.py +++ b/workflows/dags/author/author_create/author_create_rejected.py @@ -16,7 +16,7 @@ "data": Param(type="object", default={}), }, start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, + schedule=None, catchup=False, # TODO: what if callback fails? Data in backoffice not up to date! on_failure_callback=set_workflow_status_to_error, diff --git a/workflows/dags/author/author_update/author_update.py b/workflows/dags/author/author_update/author_update.py index 0e319a9a..6d07ca1e 100644 --- a/workflows/dags/author/author_update/author_update.py +++ b/workflows/dags/author/author_update/author_update.py @@ -18,7 +18,7 @@ @dag( start_date=datetime.datetime(2024, 5, 5), - schedule_interval=None, + schedule=None, params={ "workflow_id": Param(type="string", default=""), "data": Param(type="object", default={}), @@ -99,7 +99,7 @@ def set_author_update_workflow_status_to_completed(**context): collection=AUTHORS, ) - @task.branch(provide_context=True) + @task.branch() def author_update_success_branch(**context): ti = context["ti"] workflow_status = ti.xcom_pull(task_ids="update_author_on_inspire") diff --git a/workflows/dags/happy_flow_dag.py b/workflows/dags/happy_flow_dag.py index 3b732579..6e7743b9 100644 --- a/workflows/dags/happy_flow_dag.py +++ b/workflows/dags/happy_flow_dag.py @@ -2,10 +2,10 @@ import json from airflow.decorators import dag, task -from airflow.sensors.sql import SqlSensor +from airflow.providers.common.sql.sensors.sql import SqlSensor -@dag(start_date=datetime.datetime(2021, 1, 1), schedule_interval=None) +@dag(start_date=datetime.datetime(2021, 1, 1), schedule=None) def happy_flow_dag(): @task def fetch_document(filename: str) -> dict: diff --git a/workflows/dags/process_until_breakpoint.py b/workflows/dags/process_until_breakpoint.py index 43690aa3..fddc06e7 100644 --- a/workflows/dags/process_until_breakpoint.py +++ b/workflows/dags/process_until_breakpoint.py @@ -8,7 +8,7 @@ @dag( start_date=datetime.datetime(2021, 1, 1), - schedule_interval=None, + schedule=None, params={"approved": True}, ) def process_untill_breakpoint(): @@ -52,12 +52,11 @@ def validate(): task_id="check_approval", ignore_downstream_trigger_rules=False, python_callable=check_approval, - provide_context=True, ) fetch_document_task = fetch_document("test.json") normalize_affiliations_task = normalize_affiliations(fetch_document_task) auto_approval = ShortCircuitOperator( - task_id="auto_approval", python_callable=auto_approval, provide_context=True + task_id="auto_approval", python_callable=auto_approval ) validation = validate() diff --git a/workflows/tests/test_dags_integrity.py b/workflows/tests/test_dags_integrity.py new file mode 100644 index 00000000..86d1b16d --- /dev/null +++ b/workflows/tests/test_dags_integrity.py @@ -0,0 +1,6 @@ +from airflow.models import DagBag + + +def test_dagbag(): + dag_bag = DagBag(include_examples=False) + assert not dag_bag.import_errors