From d5f29dfd923ff16e69976d2559249535288e55da Mon Sep 17 00:00:00 2001 From: ErnestaP Date: Thu, 15 Aug 2024 17:22:01 +0200 Subject: [PATCH] Inspire Matomo: visits per day DAG --- README.md | 2 +- .../models/inspire_matomo/inspire_matomo.py | 15 +++ .../inspire_visits_per_day_dag.py | 96 +++++++++++++++++++ dags/inspire_matomo/utils.py | 22 +++++ ...55_database_revision_for_insipre_matomo.py | 33 +++++++ 5 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 dags/common/models/inspire_matomo/inspire_matomo.py create mode 100644 dags/inspire_matomo/inspire_visits_per_day_dag.py create mode 100644 dags/inspire_matomo/utils.py create mode 100644 dags/migrations/versions/fc43c200a255_database_revision_for_insipre_matomo.py diff --git a/README.md b/README.md index 4f6eec5..afa2dcd 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ airflow standalone ### 6. Start Postgres with Docker Compose If you're using Docker to manage your Postgres database, start the service. -IMPORTANT: Please add CDS_TOKEN value in Docker compose file +IMPORTANT: Please add CDS_TOKEN and MATOMO_AUTH_TOKEN value in Docker compose file ```sh docker-compose -f docker-compose.standalone.yaml up diff --git a/dags/common/models/inspire_matomo/inspire_matomo.py b/dags/common/models/inspire_matomo/inspire_matomo.py new file mode 100644 index 0000000..e23162f --- /dev/null +++ b/dags/common/models/inspire_matomo/inspire_matomo.py @@ -0,0 +1,15 @@ +from sqlalchemy import Column, Date, DateTime, Integer, func +from sqlalchemy.ext.declarative import declarative_base + +Base = declarative_base() + + +class MatomoData(Base): + __tablename__ = "inspire_matomo_data" + + id = Column(Integer, primary_key=True) + date = Column(Date) + visits = Column(Integer) + unique_visitors = Column(Integer) + created_at = Column(DateTime, default=func.now()) + updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) diff --git a/dags/inspire_matomo/inspire_visits_per_day_dag.py b/dags/inspire_matomo/inspire_visits_per_day_dag.py new file mode 100644 index 0000000..dee016b --- /dev/null +++ b/dags/inspire_matomo/inspire_visits_per_day_dag.py @@ -0,0 +1,96 @@ +import json +import os +from datetime import datetime + +import pendulum +from airflow.decorators import dag, task +from airflow.exceptions import AirflowException +from airflow.providers.http.hooks.http import HttpHook +from common.models.inspire_matomo.inspire_matomo import MatomoData +from common.operators.sqlalchemy_operator import sqlalchemy_task +from executor_config import kubernetes_executor_config +from inspire_matomo.utils import get_parameters +from tenacity import retry_if_exception_type, stop_after_attempt + +now = datetime.now() +formatted_date = now.strftime("%Y-%m-%d") + + +@dag( + start_date=pendulum.today("UTC").add(days=-1), + schedule="@monthly", + params={"period": "day", "date": formatted_date}, +) +def inspire_visits_per_day_dag(): + @task(executor_config=kubernetes_executor_config) + def fetch_visits_per_day(**kwargs): + http_hook = HttpHook(http_conn_id="matomo", method="GET") + period = kwargs["params"].get("period") + date = kwargs["params"].get("date") + parameters = get_parameters( + period=period, date=date, endpoint_key="visits_per_day" + ) + token = os.environ.get("MATOMO_AUTH_TOKEN") + response = http_hook.run_with_advanced_retry( + endpoint="/index.php", + data=parameters, + headers={"Authorization": f"Bearer {token}"}, + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + return response.text + + @task(executor_config=kubernetes_executor_config) + def fetch_unique_visitors_per_day(**kwargs): + http_hook = HttpHook(http_conn_id="matomo", method="GET") + period = kwargs["params"].get("period") + date = kwargs["params"].get("date") + parameters = get_parameters( + period=period, date=date, endpoint_key="unique_visitors" + ) + token = os.environ.get("MATOMO_AUTH_TOKEN") + response = http_hook.run_with_advanced_retry( + endpoint="/index.php", + data=parameters, + headers={"Authorization": f"Bearer {token}"}, + _retry_args={ + "stop": stop_after_attempt(3), + "retry": retry_if_exception_type(AirflowException), + }, + ) + return response.text + + @sqlalchemy_task(conn_id="superset") + def populate_database(visits_per_day, unique_visitors_per_day, session, **kwargs): + visits_per_day_json = json.loads(visits_per_day) + unique_visitors_per_day_json = json.loads(unique_visitors_per_day) + date = kwargs["params"].get("date") + + record = ( + session.query(MatomoData) + .filter_by(date=visits_per_day_json.get("date")) + .first() + ) + if record: + record.visits = int(visits_per_day_json.visits) + record.unique_visitors = int(unique_visitors_per_day_json.unique_visitors) + else: + parsed_date = datetime.strptime(date, "%Y-%m-%d").date() + new_record = MatomoData( + visits=int(visits_per_day_json.get("value")), + unique_visitors=int(unique_visitors_per_day_json.get("value")), + date=parsed_date, + ) + session.add(new_record) + + visits_per_day = fetch_visits_per_day() + unique_visitors_per_day = fetch_unique_visitors_per_day() + populate_database( + visits_per_day=visits_per_day, + unique_visitors_per_day=unique_visitors_per_day, + ) + + +inspire_visits_per_day = inspire_visits_per_day_dag() diff --git a/dags/inspire_matomo/utils.py b/dags/inspire_matomo/utils.py new file mode 100644 index 0000000..40eca3c --- /dev/null +++ b/dags/inspire_matomo/utils.py @@ -0,0 +1,22 @@ +import os + + +def get_endpoint(key): + end_point_map = { + "visits_per_day": "VisitsSummary.getVisits", + "unique_visitors": "VisitsSummary.getUniqueVisitors", + } + return end_point_map[key] + + +def get_parameters(period, date, endpoint_key): + return { + "module": "API", + "token_auth": os.environ.get("MATOMO_AUTH_TOKEN"), + "idSite": os.environ.get("MATOMO_SITE_ID"), + "date": str(date), + "period": period, + "format": "json", + "module": "API", + "method": get_endpoint(endpoint_key), + } diff --git a/dags/migrations/versions/fc43c200a255_database_revision_for_insipre_matomo.py b/dags/migrations/versions/fc43c200a255_database_revision_for_insipre_matomo.py new file mode 100644 index 0000000..48e0a57 --- /dev/null +++ b/dags/migrations/versions/fc43c200a255_database_revision_for_insipre_matomo.py @@ -0,0 +1,33 @@ +"""Database revision for Insipre Matomo + +Revision ID: fc43c200a255 +Revises: db8ba01db969 +Create Date: 2024-08-15 17:20:55.998545 + +""" +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "fc43c200a255" +down_revision: Union[str, None] = "db8ba01db969" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "inspire_matomo_data", + sa.Column("id", sa.Integer, primary_key=True), + sa.Column("date", sa.Date, nullable=False), + sa.Column("visits", sa.Integer, nullable=False), + sa.Column("unique_visitors", sa.Integer, nullable=False), + sa.Column("created_at", sa.TIMESTAMP(timezone=True), nullable=False), + sa.Column("updated_at", sa.TIMESTAMP(timezone=True), nullable=False), + ) + + +def downgrade() -> None: + op.drop_table("inspire_matomo_data")