From de81ca10294a5c2854ae152709f4cce9ab34bfa8 Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Mon, 19 Aug 2024 14:45:36 +0200 Subject: [PATCH] DAGs: fixes * Inspire matomo record update fix. * Annual Reports scheduler date fix to every 15 days. --- dags/annual_reports/annual_reports_categories.py | 2 +- dags/annual_reports/anuual_reports_publications.py | 5 ++++- dags/inspire_matomo/inspire_visits_per_day_dag.py | 13 +++++-------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/dags/annual_reports/annual_reports_categories.py b/dags/annual_reports/annual_reports_categories.py index d48fb86..0291b42 100644 --- a/dags/annual_reports/annual_reports_categories.py +++ b/dags/annual_reports/annual_reports_categories.py @@ -17,7 +17,7 @@ @dag( start_date=pendulum.today("UTC").add(days=-1), - schedule="@monthly", + schedule="0 0 */15 * *", ) def annual_reports_categories_dag(): @task(executor_config=kubernetes_executor_config) diff --git a/dags/annual_reports/anuual_reports_publications.py b/dags/annual_reports/anuual_reports_publications.py index 9a84183..bb97ce8 100644 --- a/dags/annual_reports/anuual_reports_publications.py +++ b/dags/annual_reports/anuual_reports_publications.py @@ -15,7 +15,10 @@ years = list(range(2004, current_year + 1)) -@dag(start_date=pendulum.today("UTC").add(days=-1), schedule="@monthly") +@dag( + start_date=pendulum.today("UTC").add(days=-1), + schedule="0 0 */15 * *", +) def annual_reports_publications_dag(): @task(executor_config=kubernetes_executor_config) def fetch_publication_report_count(year, **kwargs): diff --git a/dags/inspire_matomo/inspire_visits_per_day_dag.py b/dags/inspire_matomo/inspire_visits_per_day_dag.py index 48866ce..13a017d 100644 --- a/dags/inspire_matomo/inspire_visits_per_day_dag.py +++ b/dags/inspire_matomo/inspire_visits_per_day_dag.py @@ -64,20 +64,17 @@ def fetch_unique_visitors_per_day(**kwargs): @sqlalchemy_task(conn_id="superset") def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs): + print(json.loads(visits_per_day)) visits_per_day_json = json.loads(visits_per_day) unique_visitors_per_day_json = json.loads(unique_visitors_per_day) date = kwargs["params"].get("date") + parsed_date = datetime.strptime(date, "%Y-%m-%d").date() - record = ( - session.query(MatomoData) - .filter_by(date=visits_per_day_json.get("date")) - .first() - ) + record = session.query(MatomoData).filter_by(date=parsed_date).first() if record: - record.visits = int(visits_per_day_json.visits) - record.unique_visitors = int(unique_visitors_per_day_json.unique_visitors) + record.visits = (int(visits_per_day_json.get("value")),) + record.unique_visitors = (int(unique_visitors_per_day_json.get("value")),) else: - parsed_date = datetime.strptime(date, "%Y-%m-%d").date() new_record = MatomoData( visits=int(visits_per_day_json.get("value")), unique_visitors=int(unique_visitors_per_day_json.get("value")),