Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Nov 29, 2023
1 parent 06e37dc commit fbe1ee0
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 44 deletions.
39 changes: 5 additions & 34 deletions dags/clean/clean.py
Original file line number Diff line number Diff line change
@@ -1,48 +1,19 @@
import os

import pendulum
from airflow import DAG
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.bash_operator import BashOperator
from clean.utils import cleanup_command

AIRFLOW_HOME = os.getenv("AIRFLOW_HOME")
LOG_DIR = "logs"
logs_dir = f"{AIRFLOW_HOME}/{LOG_DIR}"

default_args = {
"owner": "airflow",
"start_date": pendulum.today("UTC").add(days=-1),
"schedule": "@monthly",
"catchup": False,
"max_active_runs": 1,
}

dag = DAG(
"cleanup_logs",
default_args=default_args,
description="DAG to clean up old log directories",
schedule_interval="@monthly",
)
@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly", catchup=False)
def cleanup_task():
BashOperator(task_id="cleanup_logs", bash_command=cleanup_command)

cleanup_command = f"""
logs_dir="{logs_dir}"
current_date=$(date "+%Y-%m-%dT%H:%M:%S.%N%z")

find "$logs_dir" -type d -path "*/run_id=*" -exec sh -c '
run_id=$(dirname "$1" | grep -oP "run_id=\K[^/]+")
task_id=$(dirname "$1" | grep -oP "task_id=\K[^/]+")
if [ $(date -d "$run_id" "+%s") -lt $(date -d "$2" "+%s" --date="-30 days") ]; then
echo "Deleting: $1"
rm -r "$1"
fi
' _ {{}} "$current_date" \;
"""

cleanup_task = BashOperator(
task_id="cleanup_logs",
bash_command=cleanup_command,
dag=dag,
)

cleanup_task
cleanup_task()
15 changes: 15 additions & 0 deletions dags/clean/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@

def cleanup_command(logs_dir):
return f"""
logs_dir="{logs_dir}"
current_date=$(date "+%Y-%m-%dT%H:%M:%S.%N%z")
find "$logs_dir" -type d -path "*/run_id=*" -exec sh -c '
run_id=$(dirname "$1" | grep -oP "run_id=\K[^/]+")
task_id=$(dirname "$1" | grep -oP "task_id=\K[^/]+")
if [ $(date -d "$run_id" "+%s") -lt $(date -d "$2" "+%s" --date="-30 days") ]; then
echo "Deleting: $1"
rm -r "$1"
fi
' _ {{}} "$current_date" \;
"""
21 changes: 11 additions & 10 deletions tests/units/clean/test_clean.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
import os
import subprocess
import datetime
import logging
import os
import subprocess
from unittest import mock

from clean.utils import cleanup_command
from freezegun import freeze_time
from pytest import fixture

from clean.clean import cleanup_command

@fixture
@freeze_time("2023-10-20")
def create_log(tmpdir):
logs_date = datetime.datetime.now()
log_path = tmpdir.join(f'dag_id=test/run_id=test__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}/task_id=test_task/attempt=1.log')

# Create parent directories
log_path = tmpdir.join(
f'logs/dag_id=test/run_id=test__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}/task_id=test_task/attempt=1.log'
)
log_path.dirpath().ensure_dir()

try:
with open(log_path, "w") as f:
f.write("test log")
return log_path
except:
logging.error('Failed file creation for tests')
logging.error("Failed file creation for tests")
pass


@freeze_time("2023-10-20")
@mock.patch.dict(os.environ, {"LOG_DIR": "tests/units/clean/logs"})
def test_clean_up_command(create_log, tmpdir):
logs_dir = tmpdir.mkdir("logs")
subprocess.run(cleanup_command, shell=True)
logs_dir = tmpdir.join("logs")
subprocess.run(cleanup_command(logs_dir), shell=True)
assert not os.path.exists(create_log)

0 comments on commit fbe1ee0

Please sign in to comment.