From fbe1ee0acbbff0abe0e67edb56a8dca9b9951f7c Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Wed, 29 Nov 2023 17:02:43 +0100 Subject: [PATCH] Tests --- dags/clean/clean.py | 39 +++++---------------------------- dags/clean/utils.py | 15 +++++++++++++ tests/units/clean/test_clean.py | 21 +++++++++--------- 3 files changed, 31 insertions(+), 44 deletions(-) create mode 100644 dags/clean/utils.py diff --git a/dags/clean/clean.py b/dags/clean/clean.py index 96fd94fc..f151687c 100644 --- a/dags/clean/clean.py +++ b/dags/clean/clean.py @@ -1,48 +1,19 @@ import os import pendulum -from airflow import DAG from airflow.decorators import dag from airflow.operators.bash import BashOperator from airflow.operators.bash_operator import BashOperator +from clean.utils import cleanup_command AIRFLOW_HOME = os.getenv("AIRFLOW_HOME") LOG_DIR = "logs" logs_dir = f"{AIRFLOW_HOME}/{LOG_DIR}" -default_args = { - "owner": "airflow", - "start_date": pendulum.today("UTC").add(days=-1), - "schedule": "@monthly", - "catchup": False, - "max_active_runs": 1, -} -dag = DAG( - "cleanup_logs", - default_args=default_args, - description="DAG to clean up old log directories", - schedule_interval="@monthly", -) +@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly", catchup=False) +def cleanup_task(): + BashOperator(task_id="cleanup_logs", bash_command=cleanup_command) -cleanup_command = f""" - logs_dir="{logs_dir}" - current_date=$(date "+%Y-%m-%dT%H:%M:%S.%N%z") - find "$logs_dir" -type d -path "*/run_id=*" -exec sh -c ' - run_id=$(dirname "$1" | grep -oP "run_id=\K[^/]+") - task_id=$(dirname "$1" | grep -oP "task_id=\K[^/]+") - if [ $(date -d "$run_id" "+%s") -lt $(date -d "$2" "+%s" --date="-30 days") ]; then - echo "Deleting: $1" - rm -r "$1" - fi - ' _ {{}} "$current_date" \; -""" - -cleanup_task = BashOperator( - task_id="cleanup_logs", - bash_command=cleanup_command, - dag=dag, -) - -cleanup_task +cleanup_task() diff --git a/dags/clean/utils.py b/dags/clean/utils.py new file mode 100644 index 00000000..3f98a75b --- /dev/null +++ b/dags/clean/utils.py @@ -0,0 +1,15 @@ + +def cleanup_command(logs_dir): + return f""" + logs_dir="{logs_dir}" + current_date=$(date "+%Y-%m-%dT%H:%M:%S.%N%z") + + find "$logs_dir" -type d -path "*/run_id=*" -exec sh -c ' + run_id=$(dirname "$1" | grep -oP "run_id=\K[^/]+") + task_id=$(dirname "$1" | grep -oP "task_id=\K[^/]+") + if [ $(date -d "$run_id" "+%s") -lt $(date -d "$2" "+%s" --date="-30 days") ]; then + echo "Deleting: $1" + rm -r "$1" + fi + ' _ {{}} "$current_date" \; +""" diff --git a/tests/units/clean/test_clean.py b/tests/units/clean/test_clean.py index 64a5f785..eca3d4fb 100644 --- a/tests/units/clean/test_clean.py +++ b/tests/units/clean/test_clean.py @@ -1,20 +1,21 @@ -import os -import subprocess import datetime import logging +import os +import subprocess from unittest import mock + +from clean.utils import cleanup_command from freezegun import freeze_time from pytest import fixture -from clean.clean import cleanup_command @fixture @freeze_time("2023-10-20") def create_log(tmpdir): logs_date = datetime.datetime.now() - log_path = tmpdir.join(f'dag_id=test/run_id=test__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}/task_id=test_task/attempt=1.log') - - # Create parent directories + log_path = tmpdir.join( + f'logs/dag_id=test/run_id=test__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}/task_id=test_task/attempt=1.log' + ) log_path.dirpath().ensure_dir() try: @@ -22,12 +23,12 @@ def create_log(tmpdir): f.write("test log") return log_path except: - logging.error('Failed file creation for tests') + logging.error("Failed file creation for tests") pass + @freeze_time("2023-10-20") -@mock.patch.dict(os.environ, {"LOG_DIR": "tests/units/clean/logs"}) def test_clean_up_command(create_log, tmpdir): - logs_dir = tmpdir.mkdir("logs") - subprocess.run(cleanup_command, shell=True) + logs_dir = tmpdir.join("logs") + subprocess.run(cleanup_command(logs_dir), shell=True) assert not os.path.exists(create_log)