Skip to content

Commit

Permalink
Clean logs: delete old logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Nov 30, 2023
1 parent bf3f848 commit d2f9e84
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 6 deletions.
24 changes: 24 additions & 0 deletions dags/clean/cleanup_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import os

import pendulum
from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.operators.bash_operator import BashOperator

AIRFLOW_HOME = os.getenv("AIRFLOW_HOME")
LOG_DIR = "logs"
logs_dir = f"{AIRFLOW_HOME}/{LOG_DIR}"


@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly", catchup=False)
def cleanup_logs():
BashOperator(
task_id="cleanup_logs",
bash_command=f"""
logs_dir="{logs_dir}"
find "$logs_dir" -type d -mtime +30 -exec rm -r {{}} \; 2>/dev/null
""",
)


cleanup_logs()
5 changes: 3 additions & 2 deletions dags/common/pull_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os
import tarfile
import zipfile
from datetime import datetime
from datetime import datetime, timezone

from airflow.api.common import trigger_dag
from common.ftp_service import FTPService
Expand Down Expand Up @@ -186,4 +186,5 @@ def trigger_file_processing(


def _generate_id(publisher: str):
return datetime.utcnow().strftime(f"{publisher}_%Y-%m-%dT%H:%M:%S.%f")
logs_date = datetime.utcnow().astimezone(timezone.utc)
return f'{publisher}__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}'
4 changes: 2 additions & 2 deletions dags/elsevier/metadata_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ def _get_license(self, article):
def _get_local_files(self, article):
if self.file_path.endswith("A.tar"):
self.file_path = self.file_path.replace("A.tar", "")
if self.file_path.endswith(".zip"):
if self.file_path.endswith(".zip"):
self.file_path = self.file_path.replace(".zip", "")
if self.file_path.startswith("raw"):
if self.file_path.startswith("raw"):
self.file_path = self.file_path.replace("raw/", "")

pdf_file_path = os.path.join(
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/springer/test_dag_process_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_dag_loaded(dag: DAG):
@pytest.mark.skip(reason="It does not test anything.")
def test_dag_run(dag: DAG, dag_was_paused: bool, article: ET):
dag_run_id = datetime.datetime.utcnow().strftime(
"test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f"
"test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f%z"
)
if dag.get_is_paused():
DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)
Expand All @@ -82,7 +82,7 @@ def test_dag_run_no_input_file(dag: DAG, dag_was_paused: bool):
if dag.get_is_paused():
DagModel.get_dagmodel(dag.dag_id).set_is_paused(is_paused=False)
dag_run_id = datetime.datetime.utcnow().strftime(
"test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f"
"test_springer_dag_process_file_%Y-%m-%dT%H:%M:%S.%f%z"
)
dagrun = dag.create_dagrun(DagRunState.QUEUED, run_id=dag_run_id)
wait().at_most(60, SECOND).until(
Expand Down
39 changes: 39 additions & 0 deletions tests/units/clean/test_clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os
from datetime import datetime, timezone

from airflow import DAG
from airflow.models import DagBag
from freezegun import freeze_time
from pytest import fixture


@fixture
def dag():
dagbag = DagBag(dag_folder="dags/", include_examples=False)
assert dagbag.import_errors.get(f"dags/cleanup_logs.py") is None
clean_dag = dagbag.get_dag(dag_id="cleanup_logs")
return clean_dag


def test_dag_loaded(dag: DAG):
assert dag is not None
assert len(dag.tasks) == 1


@freeze_time("2023-09-20")
@fixture
def old_temp_dir(tmpdir, tmp_path):
logs_date = datetime.utcnow().astimezone(timezone.utc)
log_path = tmpdir.join(
f'logs/dag_id=test/run_id=test__{logs_date.strftime("%Y-%m-%dT%H:%M:%S.%f%z")}/task_id=test_task/attempt=1.log'
)
log_path.dirpath().ensure_dir()
yield log_path


@freeze_time("2023-09-20")
def test_clean_up_command(dag, old_temp_dir, monkeypatch):
monkeypatch.setenv("AIRFLOW_HOME", old_temp_dir)
dag.clear()
dag.test()
assert not os.path.exists(old_temp_dir)

0 comments on commit d2f9e84

Please sign in to comment.