diff --git a/dags/clean/cleanup_logs.py b/dags/clean/cleanup_logs.py
new file mode 100644
index 00000000..c9e5229d
--- /dev/null
+++ b/dags/clean/cleanup_logs.py
@@ -0,0 +1,23 @@
+import os
+
+import pendulum
+from airflow.decorators import dag
+from airflow.operators.bash import BashOperator
+
+AIRFLOW_HOME = os.getenv("AIRFLOW_HOME")
+LOG_DIR = "logs"
+logs_dir = f"{AIRFLOW_HOME}/{LOG_DIR}"
+
+
+@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly", catchup=False)
+def cleanup_logs():
+    # Delete task-log directories untouched for more than 30 days.
+    BashOperator(
+        task_id="cleanup_logs",
+        bash_command=f"""
+        logs_dir="{logs_dir}"
+        find "$logs_dir" -type d -mtime +30 -exec rm -r {{}} \; 2>/dev/null
+        """,
+    )
+
+
+cleanup_logs()
diff --git a/dags/common/pull_ftp.py b/dags/common/pull_ftp.py
index b687296c..b60ad9f5 100644
--- a/dags/common/pull_ftp.py
+++ b/dags/common/pull_ftp.py
@@ -3,7 +3,7 @@
 import os
 import tarfile
 import zipfile
-from datetime import datetime
+from datetime import datetime, timezone
 
 from airflow.api.common import trigger_dag
 from common.ftp_service import FTPService
@@ -186,4 +186,5 @@ def trigger_file_processing(
 
 
 def _generate_id(publisher: str):
-    return datetime.utcnow().strftime(f"{publisher}_%Y-%m-%dT%H:%M:%S.%f")
+    logs_date = datetime.now(timezone.utc)
+    return f'{publisher}__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}'
diff --git a/dags/elsevier/metadata_parser.py b/dags/elsevier/metadata_parser.py
index fe112be3..7d40c8ad 100644
--- a/dags/elsevier/metadata_parser.py
+++ b/dags/elsevier/metadata_parser.py
@@ -117,9 +117,9 @@ def _get_license(self, article):
     def _get_local_files(self, article):
         if self.file_path.endswith("A.tar"):
             self.file_path = self.file_path.replace("A.tar", "")
-        if self.file_path.endswith(".zip"): 
+        if self.file_path.endswith(".zip"):
             self.file_path = self.file_path.replace(".zip", "")
-        if self.file_path.startswith("raw"): 
+        if self.file_path.startswith("raw"):
             self.file_path = self.file_path.replace("raw/", "")
 
         pdf_file_path = os.path.join(
diff --git a/tests/integration/springer/test_dag_process_file.py b/tests/integration/springer/test_dag_process_file.py
index b50111e5..40c01e4c 100644
--- a/tests/integration/springer/test_dag_process_file.py
+++ b/tests/integration/springer/test_dag_process_file.py
@@ -61,7 +61,7 @@ def test_dag_loaded(dag: DAG):
 @pytest.mark.skip(reason="It does not test anything.")
 def test_dag_run(dag: DAG, dag_was_paused: bool, article: ET):
-    dag_run_id = datetime.datetime.utcnow().strftime(
-        "test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f"
+    dag_run_id = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f%z"
     )
     if dag.get_is_paused():
         DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)
@@ -82,7 +82,7 @@ def test_dag_run_no_input_file(dag: DAG, dag_was_paused: bool):
     if dag.get_is_paused():
         DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)
-    dag_run_id = datetime.datetime.utcnow().strftime(
-        "test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f"
+    dag_run_id = datetime.datetime.now(datetime.timezone.utc).strftime(
+        "test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f%z"
     )
     dagrun = dag.create_dagrun(DagRunState.QUEUED, run_id=dag_run_id)
     wait().at_most(60, SECOND).until(
diff --git a/tests/units/clean/test_clean.py b/tests/units/clean/test_clean.py
new file mode 100644
index 00000000..c701141e
--- /dev/null
+++ b/tests/units/clean/test_clean.py
@@ -0,0 +1,39 @@
+import os
+from datetime import datetime, timezone
+
+from airflow import DAG
+from airflow.models import DagBag
+from freezegun import freeze_time
+from pytest import fixture
+
+
+@fixture
+def dag():
+    dagbag = DagBag(dag_folder="dags/", include_examples=False)
+    assert dagbag.import_errors.get("dags/cleanup_logs.py") is None
+    clean_dag = dagbag.get_dag(dag_id="cleanup_logs")
+    return clean_dag
+
+
+def test_dag_loaded(dag: DAG):
+    assert dag is not None
+    assert len(dag.tasks) == 1
+
+
+@fixture
+@freeze_time("2023-09-20")
+def old_temp_dir(tmpdir, tmp_path):
+    logs_date = datetime.now(timezone.utc)
+    log_path = tmpdir.join(
+        f'logs/dag_id=test/run_id=test__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}/task_id=test_task/attempt=1.log'
+    )
+    log_path.dirpath().ensure_dir()
+    yield log_path
+
+
+@freeze_time("2023-09-20")
+def test_clean_up_command(dag, old_temp_dir, monkeypatch):
+    monkeypatch.setenv("AIRFLOW_HOME", old_temp_dir)
+    dag.clear()
+    dag.test()
+    assert not os.path.exists(old_temp_dir)